1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
mod read_write_set_analyzer;
mod storage_wrapper;
mod vm_wrapper;
use crate::{
adapter_common::{preprocess_transaction, PreprocessedTransaction},
data_cache::RemoteStorage,
diem_vm::DiemVM,
parallel_executor::{
read_write_set_analyzer::ReadWriteSetAnalysisWrapper, vm_wrapper::DiemVMWrapper,
},
};
use diem_parallel_executor::{
errors::Error,
executor::ParallelTransactionExecutor,
task::{Transaction as PTransaction, TransactionOutput as PTransactionOutput},
};
use diem_state_view::StateView;
use diem_types::{
access_path::AccessPath,
transaction::{Transaction, TransactionOutput, TransactionStatus},
write_set::{WriteOp, WriteSet},
};
use move_core_types::vm_status::{StatusCode, VMStatus};
use rayon::prelude::*;
use read_write_set_dynamic::NormalizedReadWriteSetAnalysis;
impl PTransaction for PreprocessedTransaction {
type Key = AccessPath;
type Value = WriteOp;
}
pub(crate) struct DiemTransactionOutput(TransactionOutput);
impl DiemTransactionOutput {
pub fn new(output: TransactionOutput) -> Self {
Self(output)
}
pub fn into(self) -> TransactionOutput {
self.0
}
}
impl PTransactionOutput for DiemTransactionOutput {
type T = PreprocessedTransaction;
fn get_writes(&self) -> Vec<(AccessPath, WriteOp)> {
self.0.write_set().iter().cloned().collect()
}
fn skip_output() -> Self {
Self(TransactionOutput::new(
WriteSet::default(),
vec![],
0,
TransactionStatus::Retry,
))
}
}
pub struct ParallelDiemVM();
impl ParallelDiemVM {
pub fn execute_block<S: StateView>(
analysis_result: &NormalizedReadWriteSetAnalysis,
transactions: Vec<Transaction>,
state_view: &S,
) -> Result<(Vec<TransactionOutput>, Option<Error<VMStatus>>), VMStatus> {
let blockchain_view = RemoteStorage::new(state_view);
let analyzer = ReadWriteSetAnalysisWrapper::new(analysis_result, &blockchain_view);
let signature_verified_block: Vec<PreprocessedTransaction> = transactions
.par_iter()
.map(|txn| preprocess_transaction::<DiemVM>(txn.clone()))
.collect();
match ParallelTransactionExecutor::<
PreprocessedTransaction,
DiemVMWrapper<S>,
ReadWriteSetAnalysisWrapper<RemoteStorage<S>>,
>::new(analyzer)
.execute_transactions_parallel(state_view, signature_verified_block)
{
Ok(results) => Ok((
results
.into_iter()
.map(DiemTransactionOutput::into)
.collect(),
None,
)),
Err(err @ Error::InferencerError) | Err(err @ Error::UnestimatedWrite) => {
let output = DiemVM::execute_block_and_keep_vm_status(transactions, state_view)?;
Ok((
output
.into_iter()
.map(|(_vm_status, txn_output)| txn_output)
.collect(),
Some(err),
))
}
Err(Error::InvariantViolation) => Err(VMStatus::Error(
StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR,
)),
Err(Error::UserError(err)) => Err(err),
}
}
}