1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{context::Context, index};

use diem_config::config::{ApiConfig, JsonRpcConfig, NodeConfig};
use diem_mempool::MempoolClientSender;
use diem_types::chain_id::ChainId;
use futures::join;
use storage_interface::MoveDbReader;
use warp::{Filter, Reply};

use std::{convert::Infallible, net::SocketAddr, sync::Arc};
use tokio::runtime::{Builder, Runtime};

/// Creates HTTP server (warp-based) serves for both REST and JSON-RPC API.
/// When api and json-rpc are configured with same port, both API will be served for the port.
/// When api and json-rpc are configured with different port, both API will be served for
/// both ports.
/// Returns corresponding Tokio runtime
pub fn bootstrap(
    config: &NodeConfig,
    chain_id: ChainId,
    db: Arc<dyn MoveDbReader>,
    mp_sender: MempoolClientSender,
) -> anyhow::Result<Runtime> {
    let runtime = Builder::new_multi_thread()
        .thread_name("api")
        .enable_all()
        .build()
        .expect("[api] failed to create runtime");

    let role = config.base.role;
    let json_rpc_config = config.json_rpc.clone();
    let api_config = config.api.clone();
    let api = WebServer::from(api_config.clone());
    let jsonrpc = WebServer::from(json_rpc_config.clone());

    runtime.spawn(async move {
        let context = Context::new(chain_id, db, mp_sender, role, json_rpc_config, api_config);
        let routes = index::routes(context);
        if api.address.port() == jsonrpc.address.port() {
            // when we rollout api, it's likely there is no api configuration for diem
            // node, we will load default api configurations (127.0.0.1:8080).
            // if ip is unspecified(0.0.0.0), we assume it is properly configured and
            // prefer the api web server configuration over jsonrpc configuration.
            if api.address.ip() == jsonrpc.address.ip() || api.address.ip().is_unspecified() {
                api.serve(routes).await;
            } else {
                jsonrpc.serve(routes).await;
            }
        } else {
            join!(api.serve(routes.clone()), jsonrpc.serve(routes));
        }
    });
    Ok(runtime)
}

#[derive(Clone, Debug, PartialEq)]
struct WebServer {
    pub address: SocketAddr,
    pub tls_cert_path: Option<String>,
    pub tls_key_path: Option<String>,
}

impl From<ApiConfig> for WebServer {
    fn from(cfg: ApiConfig) -> Self {
        Self::new(cfg.address, cfg.tls_cert_path, cfg.tls_key_path)
    }
}

impl From<JsonRpcConfig> for WebServer {
    fn from(cfg: JsonRpcConfig) -> Self {
        Self::new(cfg.address, cfg.tls_cert_path, cfg.tls_key_path)
    }
}

impl WebServer {
    pub fn new(
        address: SocketAddr,
        tls_cert_path: Option<String>,
        tls_key_path: Option<String>,
    ) -> Self {
        Self {
            address,
            tls_cert_path,
            tls_key_path,
        }
    }

    pub async fn serve<F>(&self, routes: F)
    where
        F: Filter<Error = Infallible> + Clone + Sync + Send + 'static,
        F::Extract: Reply,
    {
        match &self.tls_cert_path {
            None => warp::serve(routes).bind(self.address).await,
            Some(cert_path) => {
                warp::serve(routes)
                    .tls()
                    .cert_path(cert_path)
                    .key_path(self.tls_key_path.as_ref().unwrap())
                    .bind(self.address)
                    .await
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use diem_config::config::NodeConfig;
    use diem_types::chain_id::ChainId;
    use serde_json::json;

    use crate::{
        runtime::bootstrap,
        tests::{new_test_context, TestContext},
    };

    #[test]
    fn test_bootstrap_jsonprc_and_api_configured_same_address() {
        let mut cfg = NodeConfig::default();
        cfg.randomize_ports();
        // same port and ip
        cfg.api.address = cfg.json_rpc.address;
        bootstrap_with_config(cfg);
    }

    #[test]
    fn test_bootstrap_jsonprc_and_api_configured_at_different_port() {
        let mut cfg = NodeConfig::default();
        cfg.randomize_ports();
        bootstrap_with_config(cfg);
    }

    #[test]
    fn test_bootstrap_same_port_and_jsonprc_with_unspecified_ip() {
        let mut cfg = NodeConfig::default();
        cfg.randomize_ports();
        cfg.json_rpc.address = format!("0.0.0.0:{}", cfg.api.address.port())
            .parse()
            .unwrap();
        bootstrap_with_config(cfg);
    }

    #[test]
    fn test_bootstrap_same_port_and_api_with_unspecified_ip() {
        let mut cfg = NodeConfig::default();
        cfg.randomize_ports();
        cfg.api.address = format!("0.0.0.0:{}", cfg.json_rpc.address.port())
            .parse()
            .unwrap();

        bootstrap_with_config(cfg);
    }

    pub fn bootstrap_with_config(cfg: NodeConfig) {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let context = runtime.block_on(new_test_context_async());
        let ret = bootstrap(
            &cfg,
            ChainId::test(),
            context.db.clone(),
            context.mempool.ac_client.clone(),
        );
        assert!(ret.is_ok());

        assert_web_server(cfg.api.address.port());
        assert_web_server(cfg.json_rpc.address.port());
    }

    pub fn assert_web_server(port: u16) {
        let base_url = format!("http://localhost:{}", port);
        let client = reqwest::blocking::Client::new();
        // first call have retry to ensure the server is ready to serve
        let api_resp = with_retry(|| Ok(client.get(&base_url).send()?)).unwrap();
        assert_eq!(api_resp.status(), 200);
        let healthy_check_resp = client
            .get(format!("{}/-/healthy", base_url))
            .send()
            .unwrap();
        assert_eq!(healthy_check_resp.status(), 200);
        let jsonrpc_resp = client
            .post(&base_url)
            .json(&json!({"jsonrpc": "2.0", "method": "get_metadata", "id": 1}))
            .send()
            .unwrap();
        assert_eq!(jsonrpc_resp.status(), 200);
    }

    fn with_retry<F>(f: F) -> anyhow::Result<reqwest::blocking::Response>
    where
        F: Fn() -> anyhow::Result<reqwest::blocking::Response>,
    {
        let mut remaining_attempts = 60;
        loop {
            match f() {
                Ok(r) => return Ok(r),
                Err(_) if remaining_attempts > 0 => {
                    remaining_attempts -= 1;
                    std::thread::sleep(Duration::from_millis(100));
                }
                Err(error) => return Err(error),
            }
        }
    }

    pub async fn new_test_context_async() -> TestContext {
        new_test_context()
    }
}