1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#![forbid(unsafe_code)]
use anyhow::Result;
use diem_infallible::Mutex;
use diem_logger::warn;
use diem_secure_net::NetworkClient;
use diem_types::{
account_address::AccountAddress,
account_state_blob::AccountStateBlob,
ledger_info::LedgerInfoWithSignatures,
proof::SparseMerkleProof,
transaction::{TransactionToCommit, Version},
};
use serde::de::DeserializeOwned;
use std::net::SocketAddr;
use storage_interface::{
DbReader, DbWriter, Error, GetAccountStateWithProofByVersionRequest, SaveTransactionsRequest,
StartupInfo, StorageRequest,
};
pub struct StorageClient {
network_client: Mutex<NetworkClient>,
}
impl StorageClient {
pub fn new(server_address: &SocketAddr, timeout: u64) -> Self {
Self {
network_client: Mutex::new(NetworkClient::new("storage", *server_address, timeout)),
}
}
fn process_one_message(&self, input: &[u8]) -> Result<Vec<u8>, Error> {
let mut client = self.network_client.lock();
client.write(input)?;
client.read().map_err(|e| e.into())
}
fn request<T: DeserializeOwned>(&self, input: StorageRequest) -> std::result::Result<T, Error> {
let input_message = bcs::to_bytes(&input)?;
let result = loop {
match self.process_one_message(&input_message) {
Err(err) => warn!(
error = ?err,
request = ?input,
"Failed to communicate with storage service.",
),
Ok(value) => break value,
}
};
bcs::from_bytes(&result)?
}
pub fn get_account_state_with_proof_by_version(
&self,
address: AccountAddress,
version: Version,
) -> std::result::Result<
(
Option<AccountStateBlob>,
SparseMerkleProof<AccountStateBlob>,
),
Error,
> {
self.request(StorageRequest::GetAccountStateWithProofByVersionRequest(
Box::new(GetAccountStateWithProofByVersionRequest::new(
address, version,
)),
))
}
pub fn get_startup_info(&self) -> std::result::Result<Option<StartupInfo>, Error> {
self.request(StorageRequest::GetStartupInfoRequest)
}
pub fn save_transactions(
&self,
txns_to_commit: Vec<TransactionToCommit>,
first_version: Version,
ledger_info_with_sigs: Option<LedgerInfoWithSignatures>,
) -> std::result::Result<(), Error> {
self.request(StorageRequest::SaveTransactionsRequest(Box::new(
SaveTransactionsRequest::new(txns_to_commit, first_version, ledger_info_with_sigs),
)))
}
}
impl DbReader for StorageClient {
fn get_account_state_with_proof_by_version(
&self,
address: AccountAddress,
version: u64,
) -> Result<(
Option<AccountStateBlob>,
SparseMerkleProof<AccountStateBlob>,
)> {
Ok(Self::get_account_state_with_proof_by_version(
self, address, version,
)?)
}
fn get_startup_info(&self) -> Result<Option<StartupInfo>> {
Ok(Self::get_startup_info(self)?)
}
}
impl DbWriter for StorageClient {
fn save_transactions(
&self,
txns_to_commit: &[TransactionToCommit],
first_version: Version,
ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
) -> Result<()> {
Ok(Self::save_transactions(
self,
txns_to_commit.to_vec(),
first_version,
ledger_info_with_sigs.cloned(),
)?)
}
}