1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::generate_traffic;
use forge::{NetworkContext, NetworkTest, Result, Test};
use rand::{
    rngs::{OsRng, StdRng},
    seq::IteratorRandom,
    Rng, SeedableRng,
};
use std::{thread, time::Instant};
use tokio::{runtime::Runtime, time::Duration};

/// Name of the state sync counter read from a node's exposed metrics; its value is treated
/// as the version that node has synced to.
const STATE_SYNC_COMMITTED_COUNTER_NAME: &str = "diem_state_sync_version.synced";

/// Forge network test that wipes a fullnode's storage while validators process traffic, then
/// measures how quickly the restarted fullnode state syncs back to the validators' version.
pub struct StateSyncPerformance;

impl Test for StateSyncPerformance {
    /// Identifier under which this test (and the metrics it reports) is registered.
    fn name(&self) -> &'static str {
        stringify!(StateSyncPerformance)
    }
}

impl NetworkTest for StateSyncPerformance {
    /// Measures how fast a fullnode with wiped storage state syncs back to the validators.
    ///
    /// Steps:
    /// 1. Stop a randomly chosen fullnode.
    /// 2. Submit transactions to all validators for 30 seconds.
    /// 3. Read the synced version of a randomly chosen validator (the target version).
    /// 4. Clear the fullnode's storage, restart it, and poll its synced version once per
    ///    second until it reaches the target version.
    ///
    /// The throughput (target version divided by the elapsed catch-up time) is printed and
    /// reported through `ctx.report` as text and as the `state_sync_throughput` metric.
    ///
    /// # Errors
    ///
    /// Returns an error if the swarm has no fullnodes or no validators, if the validator
    /// reports a synced version of zero, if the fullnode does not catch up within
    /// `STATE_SYNC_TIMEOUT`, or if any swarm/node/runtime operation fails.
    fn run<'t>(&self, ctx: &mut NetworkContext<'t>) -> Result<()> {
        // Upper bound on the catch-up wait: without it, a fullnode that never reaches the
        // target version would hang the test forever.
        const STATE_SYNC_TIMEOUT: Duration = Duration::from_secs(600);

        let mut rng = StdRng::from_seed(OsRng.gen());
        let duration = Duration::from_secs(30);
        let all_validators = ctx
            .swarm()
            .validators()
            .map(|v| v.peer_id())
            .collect::<Vec<_>>();
        let all_fullnodes = ctx
            .swarm()
            .full_nodes()
            .map(|v| v.peer_id())
            .collect::<Vec<_>>();

        // 1. pick one fullnode to stop
        let fullnode_id = *all_fullnodes.iter().choose(&mut rng).ok_or_else(|| {
            anyhow::format_err!("The swarm has no fullnodes to test state sync with!")
        })?;
        ctx.swarm()
            .full_node_mut(fullnode_id)
            .expect("fullnode id was taken from the swarm")
            .stop()?;

        // 2. emit txn to validators
        generate_traffic(ctx, &all_validators, duration, 0, None)?;

        // 3. read the validator synced version
        let validator_id = *all_validators.iter().choose(&mut rng).ok_or_else(|| {
            anyhow::format_err!("The swarm has no validators to read the synced version from!")
        })?;
        let validator = ctx
            .swarm()
            .validator(validator_id)
            .expect("validator id was taken from the swarm");
        let validator_metric_port = validator.expose_metric()?;
        let validator_synced_version = validator
            .counter(STATE_SYNC_COMMITTED_COUNTER_NAME, validator_metric_port)
            .unwrap_or(0.0);
        if validator_synced_version == 0.0 {
            return Err(anyhow::format_err!(
                "Validator synced zero transactions! Something has gone wrong!"
            ));
        }
        println!(
            "The validator is now synced at version: {}",
            validator_synced_version
        );

        // 4. restart the fullnode so that it starts state syncing to catch up
        let fullnode = ctx
            .swarm()
            .full_node_mut(fullnode_id)
            .expect("fullnode id was taken from the swarm");
        // do data cleanup
        fullnode.clear_storage()?;
        println!("The fullnode is going to restart");
        let runtime = Runtime::new()?;
        runtime.block_on(fullnode.start())?;
        println!(
            "The fullnode is now up. Waiting for it to state sync to the expected version: {}",
            validator_synced_version
        );
        let start_instant = Instant::now();
        let fullnode_metric_port = fullnode.expose_metric()?;
        loop {
            // The metric may be unavailable right after startup; treat that as version 0.
            let fullnode_synced_version = fullnode
                .counter(STATE_SYNC_COMMITTED_COUNTER_NAME, fullnode_metric_port)
                .unwrap_or(0.0);
            if fullnode_synced_version >= validator_synced_version {
                break;
            }
            if start_instant.elapsed() > STATE_SYNC_TIMEOUT {
                return Err(anyhow::format_err!(
                    "The fullnode failed to state sync to version {} within {:?}; last observed version: {}",
                    validator_synced_version,
                    STATE_SYNC_TIMEOUT,
                    fullnode_synced_version
                ));
            }
            thread::sleep(Duration::from_secs(1));
        }
        println!(
            "The fullnode has caught up to version: {}",
            validator_synced_version
        );

        // Calculate the state sync throughput. Use fractional seconds: whole-second
        // truncation would inflate the throughput (e.g. 1.9s counted as 1s) and reject
        // sub-second syncs as "0 seconds".
        let time_to_state_sync = start_instant.elapsed().as_secs_f64();
        if time_to_state_sync <= 0.0 {
            return Err(anyhow::format_err!(
                "The time taken to state sync was 0 seconds! Something has gone wrong!"
            ));
        }
        let state_sync_throughput = validator_synced_version / time_to_state_sync;
        let state_sync_throughput_message =
            format!("State sync throughput : {:.0} txn/sec", state_sync_throughput);
        println!("Time to state sync: {:.2} secs", time_to_state_sync);
        // Display the state sync throughput and report the results
        println!("{}", state_sync_throughput_message);
        ctx.report.report_text(state_sync_throughput_message);
        ctx.report
            .report_metric(self.name(), "state_sync_throughput", state_sync_throughput);

        Ok(())
    }
}