From 736d1b125d69b19529c3a5fd100be2b0adef83f1 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Mon, 22 Apr 2024 18:58:04 -0500 Subject: [PATCH] feat(aot): configure tokio runtime by env on node --- Cargo.lock | 2 ++ Cargo.toml | 1 + crates/aot/Cargo.toml | 4 ++++ crates/aot/src/runner/mod.rs | 43 +++++++++++++++++++++++++++++++++++- 4 files changed, 49 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 88ba5099..c231698d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3349,8 +3349,10 @@ dependencies = [ "lazy_static", "metrics-exporter-prometheus 0.14.0", "nix", + "num_cpus", "rand", "rand_chacha", + "rayon", "reqwest 0.12.3", "rocksdb", "serde", diff --git a/Cargo.toml b/Cargo.toml index feaf4785..ee6dcfae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ lasso = { version = "0.7", features = ["multi-threaded"] } local-ip-address = "0.6" metrics-exporter-prometheus = "0.14" nix = { version = "0.28", features = ["process"] } +num_cpus = "1.16" rand = { version = "0.8", default-features = false } rand_chacha = { version = "0.3", default-features = false } rayon = "1" diff --git a/crates/aot/Cargo.toml b/crates/aot/Cargo.toml index b759c912..b286e5bc 100644 --- a/crates/aot/Cargo.toml +++ b/crates/aot/Cargo.toml @@ -13,6 +13,8 @@ node = [ "crossterm", "snarkos-node/metrics", "metrics-exporter-prometheus", + "num_cpus", + "rayon", ] [dependencies] @@ -29,6 +31,8 @@ indicatif.workspace = true lazy_static.workspace = true metrics-exporter-prometheus = { workspace = true, optional = true } nix.workspace = true +num_cpus = { optional = true, workspace = true } +rayon = { workspace = true, optional = true } rand.workspace = true rand_chacha.workspace = true reqwest = { workspace = true, features = ["blocking", "json"] } diff --git a/crates/aot/src/runner/mod.rs b/crates/aot/src/runner/mod.rs index e4e04a3a..0544c5de 100644 --- a/crates/aot/src/runner/mod.rs +++ b/crates/aot/src/runner/mod.rs @@ -95,8 +95,20 @@ pub struct Runner { } impl Runner { + pub fn parse(self) -> Result<()> { + if std::env::var("DEFAULT_RUNTIME").ok().is_some() { + self.start_without_runtime() + } else { + Self::runtime().block_on(async move { self.start().await }) + } + } + #[tokio::main] - pub async fn parse(self) -> Result<()> { + pub async fn start_without_runtime(self) -> Result<()> { + self.start().await + } + + pub async fn start(self) -> Result<()> { let bind_addr = self.bind_addr; let node_ip = SocketAddr::new(bind_addr, self.node); let rest_ip = SocketAddr::new(bind_addr, self.rest); @@ -212,4 +224,33 @@ impl Runner { Ok(()) } + + /// Returns a runtime for the node. + pub fn runtime() -> tokio::runtime::Runtime { + // Retrieve the number of cores. + let num_cores = num_cpus::get(); + + // Initialize the number of tokio worker threads, max tokio blocking threads, + // and rayon cores. Note: We intentionally set the number of tokio + // worker threads and number of rayon cores to be more than the number + // of physical cores, because the node is expected to be I/O-bound. + let (num_tokio_worker_threads, max_tokio_blocking_threads, num_rayon_cores_global) = + (2 * num_cores, 512, num_cores); + + // Initialize the parallelization parameters. + rayon::ThreadPoolBuilder::new() + .stack_size(8 * 1024 * 1024) + .num_threads(num_rayon_cores_global) + .build_global() + .unwrap(); + + // Initialize the runtime configuration. + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_stack_size(8 * 1024 * 1024) + .worker_threads(num_tokio_worker_threads) + .max_blocking_threads(max_tokio_blocking_threads) + .build() + .expect("Failed to initialize a runtime for the router") + } }