From e602a6a27882a5473d2b30bb1698b5c61506856d Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 10:01:26 -0300 Subject: [PATCH 01/12] fix: add build settings --- .cargo/config.toml | 4 ++++ Cargo.toml | 18 ++++++++++-------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index cf929f1cc..1236d57a3 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -4,3 +4,7 @@ xclippy = [ "-Wclippy::all", "-Wclippy::disallowed-methods", ] + +# https://nnethercote.github.io/perf-book/build-configuration.html#cpu-specific-instructions +[build] +rustflags = ["-C", "target-cpu=native"] \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 22e17525a..a883abff5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,8 @@ deprecated-in-future = "warn" [profile.release] strip = true +codegen-units = 1 # https://nnethercote.github.io/perf-book/build-configuration.html#codegen-units +lto = "fat" # https://nnethercote.github.io/perf-book/build-configuration.html#link-time-optimization [workspace.dependencies] topos-core = { path = "./crates/topos-core", default-features = false } @@ -40,10 +42,10 @@ tokio-stream = { version = "0.1", default-features = false } tower = "0.4" # Blockchain -ethereum-types = { version = "0.13.1"} -secp256k1 = {version = "0.27", features = ["recovery"]} -tiny-keccak = {version = "1.5"} -ethers = {version = "2.0.9", features = ["legacy", "abigen-online"]} +ethereum-types = { version = "0.13.1" } +secp256k1 = { version = "0.27", features = ["recovery"] } +tiny-keccak = { version = "1.5" } +ethers = { version = "2.0.9", features = ["legacy", "abigen-online"] } # Log, Tracing & telemetry opentelemetry = { version = "0.22", features = ["metrics"] } @@ -58,7 +60,7 @@ tracing-opentelemetry = "0.23" tracing-subscriber = { version = "0.3", default-features = false } # gRPC -prost = {version = "0.12"} +prost = { version = "0.12" } tonic = { version = "0.11", default-features = false } tonic-build = { version = "0.11", default-features = false, features = [ "prost", "transport" @@ -72,7 +74,7 @@ http = "0.2.9" tower-http = { version = "0.4", features = ["cors"] } # P2P related -libp2p = { version = "0.53", default-features = false, features = ["noise"]} +libp2p = { version = "0.53", default-features = false, features = ["noise"] } # Serialization & Deserialization bincode = { version = "1.3", default-features = false } @@ -93,5 +95,5 @@ reqwest = { version = "0.11", features = ["json"] } # Tests rstest = { version = "0.17.0", default-features = false } test-log = { version = "0.2", features = ["trace"] } -env_logger = { version = "0.10.0"} # Needed by test-log to print traces in tests -serial_test = {version = "0.9.0"} +env_logger = { version = "0.10.0" } # Needed by test-log to print traces in tests +serial_test = { version = "0.9.0" } From 19da5d9b51fb9f9e8e96ec766ebfaa962eda58b6 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 15:28:42 -0300 Subject: [PATCH 02/12] fix: channel size, timing of parsing messages, local signature hashset --- .../topos-config/src/tce/synchronization.rs | 2 +- crates/topos-p2p/src/behaviour/gossip.rs | 4 +- crates/topos-p2p/src/constants.rs | 6 +-- .../src/proxy.rs | 3 +- crates/topos-tce-api/src/grpc/mod.rs | 4 +- crates/topos-tce-api/src/lib.rs | 4 +- .../src/double_echo/mod.rs | 21 +++++++++- .../src/task_manager/mod.rs | 38 ++++++++++--------- 8 files changed, 53 insertions(+), 29 deletions(-) diff --git a/crates/topos-config/src/tce/synchronization.rs b/crates/topos-config/src/tce/synchronization.rs index 222cd1030..b6ff4050d 100644 --- a/crates/topos-config/src/tce/synchronization.rs +++ b/crates/topos-config/src/tce/synchronization.rs @@ -23,7 +23,7 @@ impl Default for SynchronizationConfig { } impl SynchronizationConfig { - pub const INTERVAL_SECONDS: u64 = 10; + pub const INTERVAL_SECONDS: u64 = 60; pub const LIMIT_PER_SUBNET: usize = 100; const fn default_interval_seconds() -> u64 { diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index eaf6991ef..6bf3d341b 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -25,7 +25,7 @@ use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_REA use super::HealthStatus; -const MAX_BATCH_SIZE: usize = 10; +const MAX_BATCH_SIZE: usize = 1024 * 100; pub struct Behaviour { batch_size: usize, @@ -76,7 +76,7 @@ impl Behaviour { .unwrap_or(Ok(MAX_BATCH_SIZE)) .unwrap(); let gossipsub = gossipsub::ConfigBuilder::default() - .max_transmit_size(2 * 1024 * 1024) + .max_transmit_size(20 * 1024 * 1024) .validation_mode(gossipsub::ValidationMode::Strict) .message_id_fn(|msg_id| { // Content based id diff --git a/crates/topos-p2p/src/constants.rs b/crates/topos-p2p/src/constants.rs index ad5057e20..3b16983e2 100644 --- a/crates/topos-p2p/src/constants.rs +++ b/crates/topos-p2p/src/constants.rs @@ -12,9 +12,9 @@ lazy_static! { pub static ref EVENT_STREAM_BUFFER: usize = env::var("TCE_EVENT_STREAM_BUFFER") .ok() .and_then(|v| v.parse::().ok()) - .unwrap_or(2048 * 2); + .unwrap_or(1024 * 20); pub static ref CAPACITY_EVENT_STREAM_BUFFER: usize = EVENT_STREAM_BUFFER - .checked_mul(10) + .checked_mul(1_000) .map(|v| { let r: usize = v.checked_div(100).unwrap_or(*EVENT_STREAM_BUFFER); r @@ -23,7 +23,7 @@ lazy_static! { pub static ref COMMAND_STREAM_BUFFER_SIZE: usize = env::var("TCE_COMMAND_STREAM_BUFFER_SIZE") .ok() .and_then(|v| v.parse::().ok()) - .unwrap_or(2048); + .unwrap_or(1024 * 20); } pub const DISCOVERY_PROTOCOL: &str = "/tce-disco/1"; diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-sequencer-subnet-runtime/src/proxy.rs index 5039698bb..e0e93f48d 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-sequencer-subnet-runtime/src/proxy.rs @@ -68,7 +68,8 @@ impl SubnetRuntimeProxy { address: {}, ", &config.http_endpoint, &config.ws_endpoint, &config.subnet_contract_address ); - let (command_sender, mut command_rcv) = mpsc::channel::(256); + let (command_sender, mut command_rcv) = + mpsc::channel::(1024 * 20); let ws_runtime_endpoint = config.ws_endpoint.clone(); let http_runtime_endpoint = config.http_endpoint.clone(); let subnet_contract_address = Arc::new(config.subnet_contract_address.clone()); diff --git a/crates/topos-tce-api/src/grpc/mod.rs b/crates/topos-tce-api/src/grpc/mod.rs index ae6f132e4..2463d3ced 100644 --- a/crates/topos-tce-api/src/grpc/mod.rs +++ b/crates/topos-tce-api/src/grpc/mod.rs @@ -29,7 +29,7 @@ pub(crate) mod console; #[cfg(test)] mod tests; -const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 100; +const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 1024 * 20; pub(crate) mod builder; pub(crate) mod messaging; @@ -272,7 +272,7 @@ impl ApiService for TceGrpcService { .map(move |message| Self::parse_stream(message, stream_id)) .boxed(); - let (command_sender, command_receiver) = mpsc::channel(2048); + let (command_sender, command_receiver) = mpsc::channel(1024 * 20); let (outbound_stream, rx) = mpsc::channel::, OutboundMessage), Status>>( DEFAULT_CHANNEL_STREAM_CAPACITY, diff --git a/crates/topos-tce-api/src/lib.rs b/crates/topos-tce-api/src/lib.rs index 41808c13a..7d1158d52 100644 --- a/crates/topos-tce-api/src/lib.rs +++ b/crates/topos-tce-api/src/lib.rs @@ -9,10 +9,10 @@ mod tests; pub(crate) mod constants { /// Constant size of every channel in the crate - pub(crate) const CHANNEL_SIZE: usize = 2048; + pub(crate) const CHANNEL_SIZE: usize = 1024 * 20; /// Constant size of every transient stream channel in the crate - pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024; + pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024 * 20; } pub use runtime::{ error::RuntimeError, Runtime, RuntimeClient, RuntimeCommand, RuntimeContext, RuntimeEvent, diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 3b9e0274f..b998e77bc 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -49,13 +49,14 @@ pub struct DoubleEcho { /// List of approved validators through smart contract and/or genesis pub validators: HashSet, pub validator_store: Arc, + pub known_signatures: HashSet, pub broadcast_sender: broadcast::Sender, pub task_manager_cancellation: CancellationToken, } impl DoubleEcho { - pub const MAX_BUFFER_SIZE: usize = 2048; + pub const MAX_BUFFER_SIZE: usize = 1024 * 20; #[allow(clippy::too_many_arguments)] pub fn new( @@ -85,6 +86,7 @@ impl DoubleEcho { }, shutdown, validator_store, + known_signatures: HashSet::new(), broadcast_sender, task_manager_cancellation: CancellationToken::new(), } @@ -150,6 +152,12 @@ impl DoubleEcho { continue; } + if self.known_signatures.contains(&signature) { + debug!("ECHO message signature already known: {}", signature); + self.handle_echo(certificate_id, validator_id, signature).await; + continue; + } + let mut payload = Vec::new(); payload.extend_from_slice(certificate_id.as_array()); payload.extend_from_slice(validator_id.as_bytes()); @@ -159,6 +167,8 @@ impl DoubleEcho { continue; } + self.known_signatures.insert(signature); + self.handle_echo(certificate_id, validator_id, signature).await }, DoubleEchoCommand::Ready { certificate_id, validator_id, signature } => { @@ -168,6 +178,13 @@ impl DoubleEcho { continue; } + if self.known_signatures.contains(&signature) { + debug!("READY message signature already known: {}", signature); + self.handle_ready(certificate_id, validator_id, signature).await; + continue; + } + + let mut payload = Vec::new(); payload.extend_from_slice(certificate_id.as_array()); payload.extend_from_slice(validator_id.as_bytes()); @@ -177,6 +194,8 @@ impl DoubleEcho { continue; } + self.known_signatures.insert(signature); + self.handle_ready(certificate_id, validator_id, signature).await }, } diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index d25e3525d..83931e7ca 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -107,31 +107,35 @@ impl TaskManager { } pub async fn run(mut self, shutdown_receiver: CancellationToken) { - let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut pending_certificate_interval = tokio::time::interval(Duration::from_micros(500)); + let mut message_interval = tokio::time::interval(Duration::from_millis(100)); loop { tokio::select! { biased; - _ = interval.tick() => { + _ = pending_certificate_interval.tick() => { self.next_pending_certificate(); } - Some(msg) = self.message_receiver.recv() => { - match msg { - DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { - if let Some(task_context) = self.tasks.get(&certificate_id) { - _ = task_context.sink.send(msg).await; - } else { - self.buffered_messages - .entry(certificate_id) - .or_default() - .push(msg); - }; - } - DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { - trace!("Received broadcast message for certificate {} ", cert.id); - self.create_task(cert, need_gossip, pending_id) + _ = message_interval.tick() => { + if let Some(msg) = self.message_receiver.recv().await { + match msg { + DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { + if let Some(task_context) = self.tasks.get(&certificate_id) { + _ = task_context.sink.send(msg).await; + } else { + self.buffered_messages + .entry(certificate_id) + .or_default() + .push(msg); + }; + } + DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { + trace!("Received broadcast message for certificate {} ", cert.id); + + self.create_task(cert, need_gossip, pending_id) + } } } } From 7fbbb293bd0c0978fa438f966101aa23d40b8f8f Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 15:37:56 -0300 Subject: [PATCH 03/12] fix: sec updates, update select loop --- Cargo.lock | 48 +++++++++---------- .../src/task_manager/mod.rs | 34 ++++++------- 2 files changed, 39 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14fbf4735..2e56c9525 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2835,7 +2835,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35bd3cf68c183738046838e300353e4716c674dc5e56890de4826801a6622a28" dependencies = [ "futures-io", - "rustls 0.21.10", + "rustls 0.21.11", ] [[package]] @@ -3019,9 +3019,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.24" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ "bytes", "fnv", @@ -3038,9 +3038,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.2" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +checksum = "816ec7294445779408f36fe57bc5b7fc1cf59664059096c65f905c1c61f58069" dependencies = [ "bytes", "fnv", @@ -3374,14 +3374,14 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.3.24", + "h2 0.3.26", "http 0.2.11", "http-body 0.4.6", "httparse", "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.5", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -3397,7 +3397,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.2", + "h2 0.4.4", "http 1.0.0", "http-body 1.0.0", "httparse", @@ -3416,7 +3416,7 @@ dependencies = [ "futures-util", "http 0.2.11", "hyper 0.14.28", - "rustls 0.21.10", + "rustls 0.21.11", "tokio", "tokio-rustls 0.24.1", ] @@ -4233,7 +4233,7 @@ dependencies = [ "quinn", "rand", "ring 0.16.20", - "rustls 0.21.10", + "rustls 0.21.11", "socket2 0.5.5", "thiserror", "tokio", @@ -4345,7 +4345,7 @@ dependencies = [ "libp2p-identity", "rcgen", "ring 0.16.20", - "rustls 0.21.10", + "rustls 0.21.11", "rustls-webpki 0.101.7", "thiserror", "x509-parser", @@ -6004,7 +6004,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.21.10", + "rustls 0.21.11", "thiserror", "tokio", "tracing", @@ -6020,7 +6020,7 @@ dependencies = [ "rand", "ring 0.16.20", "rustc-hash", - "rustls 0.21.10", + "rustls 0.21.11", "slab", "thiserror", "tinyvec", @@ -6226,7 +6226,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2 0.3.24", + "h2 0.3.26", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -6240,7 +6240,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.10", + "rustls 0.21.11", "rustls-pemfile 1.0.4", "serde", "serde_json", @@ -6456,9 +6456,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.21.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4" dependencies = [ "log", "ring 0.17.7", @@ -6468,9 +6468,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.2" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "log", "ring 0.17.7", @@ -7521,7 +7521,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.10", + "rustls 0.21.11", "tokio", ] @@ -7531,7 +7531,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "rustls 0.22.2", + "rustls 0.22.4", "rustls-pki-types", "tokio", ] @@ -7556,7 +7556,7 @@ checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" dependencies = [ "futures-util", "log", - "rustls 0.21.10", + "rustls 0.21.11", "tokio", "tokio-rustls 0.24.1", "tungstenite", @@ -7679,7 +7679,7 @@ dependencies = [ "axum 0.6.20", "base64 0.21.7", "bytes", - "h2 0.3.24", + "h2 0.3.26", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -8608,7 +8608,7 @@ dependencies = [ "httparse", "log", "rand", - "rustls 0.21.10", + "rustls 0.21.11", "sha1", "thiserror", "url", diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 83931e7ca..10ba5c53d 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -108,7 +108,6 @@ impl TaskManager { pub async fn run(mut self, shutdown_receiver: CancellationToken) { let mut pending_certificate_interval = tokio::time::interval(Duration::from_micros(500)); - let mut message_interval = tokio::time::interval(Duration::from_millis(100)); loop { tokio::select! { @@ -118,29 +117,26 @@ impl TaskManager { self.next_pending_certificate(); } - _ = message_interval.tick() => { - if let Some(msg) = self.message_receiver.recv().await { - match msg { - DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { - if let Some(task_context) = self.tasks.get(&certificate_id) { - _ = task_context.sink.send(msg).await; - } else { - self.buffered_messages - .entry(certificate_id) - .or_default() - .push(msg); - }; - } - DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { - trace!("Received broadcast message for certificate {} ", cert.id); + let Some(msg) = self.message_receiver.recv() { + match msg { + DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { + if let Some(task_context) = self.tasks.get(&certificate_id) { + _ = task_context.sink.send(msg).await; + } else { + self.buffered_messages + .entry(certificate_id) + .or_default() + .push(msg); + }; + } + DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { + trace!("Received broadcast message for certificate {} ", cert.id); - self.create_task(cert, need_gossip, pending_id) - } + self.create_task(cert, need_gossip, pending_id) } } } - Some((certificate_id, status)) = self.running_tasks.next() => { if let TaskStatus::Success = status { trace!("Task for certificate {} finished successfully", certificate_id); From e9bd98ce1e5942390471d857f5383da41b37527a Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 15:39:13 -0300 Subject: [PATCH 04/12] fix: clippy --- crates/topos-tce-broadcast/src/task_manager/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 10ba5c53d..5e96d1b20 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -117,7 +117,7 @@ impl TaskManager { self.next_pending_certificate(); } - let Some(msg) = self.message_receiver.recv() { + Some(msg) = self.message_receiver.recv() => { match msg { DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { if let Some(task_context) = self.tasks.get(&certificate_id) { From d130daea0654e312f7ee3b70f717de664fa97a9c Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 15:49:09 -0300 Subject: [PATCH 05/12] fix: adjust channels, add debug to release profile --- Cargo.toml | 1 + crates/topos-p2p/src/behaviour/gossip.rs | 2 +- crates/topos-tce-broadcast/src/task_manager/mod.rs | 2 +- crates/topos-tce-proxy/src/client.rs | 6 +++--- crates/topos-tce-proxy/src/worker.rs | 4 ++-- crates/topos-tce-synchronizer/src/builder.rs | 2 +- crates/topos-tce/src/app_context.rs | 2 +- 7 files changed, 10 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a883abff5..f758d9b67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ deprecated-in-future = "warn" strip = true codegen-units = 1 # https://nnethercote.github.io/perf-book/build-configuration.html#codegen-units lto = "fat" # https://nnethercote.github.io/perf-book/build-configuration.html#link-time-optimization +debug = true # So we can un `perf` on the Rust binary [workspace.dependencies] topos-core = { path = "./crates/topos-core", default-features = false } diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index 6bf3d341b..ab09cda94 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -25,7 +25,7 @@ use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_REA use super::HealthStatus; -const MAX_BATCH_SIZE: usize = 1024 * 100; +const MAX_BATCH_SIZE: usize = 1024 * 20; pub struct Behaviour { batch_size: usize, diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 5e96d1b20..7b7d3302a 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -107,7 +107,7 @@ impl TaskManager { } pub async fn run(mut self, shutdown_receiver: CancellationToken) { - let mut pending_certificate_interval = tokio::time::interval(Duration::from_micros(500)); + let mut pending_certificate_interval = tokio::time::interval(Duration::from_millis(200)); loop { tokio::select! { diff --git a/crates/topos-tce-proxy/src/client.rs b/crates/topos-tce-proxy/src/client.rs index a6682abb5..63d475bfc 100644 --- a/crates/topos-tce-proxy/src/client.rs +++ b/crates/topos-tce-proxy/src/client.rs @@ -22,9 +22,9 @@ use topos_core::{ use tracing::{debug, error, info, info_span, warn, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; -const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 100; -const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 100; -const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 100; +const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 1024 * 10; +const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 1024 * 10; +const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 1024 * 10; // Maximum backoff retry timeout in seconds (1 hour) const TCE_SUBMIT_CERTIFICATE_BACKOFF_TIMEOUT: Duration = Duration::from_secs(3600); diff --git a/crates/topos-tce-proxy/src/worker.rs b/crates/topos-tce-proxy/src/worker.rs index 0869893af..0011e05dc 100644 --- a/crates/topos-tce-proxy/src/worker.rs +++ b/crates/topos-tce-proxy/src/worker.rs @@ -22,8 +22,8 @@ impl TceProxyWorker { /// Construct a new [`TceProxyWorker`] with a 128 items deep channel to send commands to and receive events from a TCE node on the given subnet. /// The worker holds a [`crate::client::TceClient`] pub async fn new(config: TceProxyConfig) -> Result<(Self, Option<(Certificate, u64)>), Error> { - let (command_sender, mut command_rcv) = mpsc::channel::(128); - let (evt_sender, evt_rcv) = mpsc::channel::(128); + let (command_sender, mut command_rcv) = mpsc::channel::(1024 * 20); + let (evt_sender, evt_rcv) = mpsc::channel::(1024 * 20); let (tce_client_shutdown_channel, shutdown_receiver) = mpsc::channel::>(1); diff --git a/crates/topos-tce-synchronizer/src/builder.rs b/crates/topos-tce-synchronizer/src/builder.rs index a7b70e08a..c85bc88d5 100644 --- a/crates/topos-tce-synchronizer/src/builder.rs +++ b/crates/topos-tce-synchronizer/src/builder.rs @@ -29,7 +29,7 @@ impl Default for SynchronizerBuilder { network_client: None, store: None, config: SynchronizationConfig::default(), - event_channel_size: 100, + event_channel_size: 1024 * 20, shutdown: None, } } diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index fedc50b66..f7478c508 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -69,7 +69,7 @@ impl AppContext { validator_store: Arc, api_context: RuntimeContext, ) -> (Self, mpsc::Receiver) { - let (events, receiver) = mpsc::channel(100); + let (events, receiver) = mpsc::channel(1024 * 20); ( Self { is_validator, From 922e26da423e07813670ce6a236064b226682d94 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 16:05:37 -0300 Subject: [PATCH 06/12] fix: update channel sizes --- crates/topos-p2p/src/behaviour/gossip.rs | 2 +- crates/topos-p2p/src/constants.rs | 6 +++--- crates/topos-tce-api/src/runtime/mod.rs | 2 +- crates/topos-tce-broadcast/src/constant.rs | 6 +++--- crates/topos-tce-broadcast/src/task_manager/mod.rs | 2 +- crates/topos-tce-proxy/src/client.rs | 6 +++--- crates/topos-tce-proxy/src/worker.rs | 2 +- crates/topos-tce-synchronizer/src/builder.rs | 2 +- 8 files changed, 14 insertions(+), 14 deletions(-) diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index ab09cda94..5651c9788 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -76,7 +76,7 @@ impl Behaviour { .unwrap_or(Ok(MAX_BATCH_SIZE)) .unwrap(); let gossipsub = gossipsub::ConfigBuilder::default() - .max_transmit_size(20 * 1024 * 1024) + .max_transmit_size(20 * 2048 * 2048) .validation_mode(gossipsub::ValidationMode::Strict) .message_id_fn(|msg_id| { // Content based id diff --git a/crates/topos-p2p/src/constants.rs b/crates/topos-p2p/src/constants.rs index 3b16983e2..ad5057e20 100644 --- a/crates/topos-p2p/src/constants.rs +++ b/crates/topos-p2p/src/constants.rs @@ -12,9 +12,9 @@ lazy_static! { pub static ref EVENT_STREAM_BUFFER: usize = env::var("TCE_EVENT_STREAM_BUFFER") .ok() .and_then(|v| v.parse::().ok()) - .unwrap_or(1024 * 20); + .unwrap_or(2048 * 2); pub static ref CAPACITY_EVENT_STREAM_BUFFER: usize = EVENT_STREAM_BUFFER - .checked_mul(1_000) + .checked_mul(10) .map(|v| { let r: usize = v.checked_div(100).unwrap_or(*EVENT_STREAM_BUFFER); r @@ -23,7 +23,7 @@ lazy_static! { pub static ref COMMAND_STREAM_BUFFER_SIZE: usize = env::var("TCE_COMMAND_STREAM_BUFFER_SIZE") .ok() .and_then(|v| v.parse::().ok()) - .unwrap_or(1024 * 20); + .unwrap_or(2048); } pub const DISCOVERY_PROTOCOL: &str = "/tce-disco/1"; diff --git a/crates/topos-tce-api/src/runtime/mod.rs b/crates/topos-tce-api/src/runtime/mod.rs index e0e3b2b89..78b3b9235 100644 --- a/crates/topos-tce-api/src/runtime/mod.rs +++ b/crates/topos-tce-api/src/runtime/mod.rs @@ -88,7 +88,7 @@ impl Runtime { } pub async fn launch(mut self) { - let mut health_update = tokio::time::interval(Duration::from_secs(1)); + let mut health_update = tokio::time::interval(Duration::from_secs(10)); let shutdowned: Option> = loop { tokio::select! { shutdown = self.shutdown.recv() => { diff --git a/crates/topos-tce-broadcast/src/constant.rs b/crates/topos-tce-broadcast/src/constant.rs index 30e46f24d..74799481a 100644 --- a/crates/topos-tce-broadcast/src/constant.rs +++ b/crates/topos-tce-broadcast/src/constant.rs @@ -6,19 +6,19 @@ lazy_static! { std::env::var("TOPOS_DOUBLE_ECHO_COMMAND_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(2048); + .unwrap_or(1024 * 20); /// Size of the channel between double echo and the task manager pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize = std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(20_480); + .unwrap_or(1024 * 20); /// Size of the channel to send protocol events from the double echo pub static ref PROTOCOL_CHANNEL_SIZE: usize = std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(2048); + .unwrap_or(1024 * 20); /// Capacity alert threshold for the double echo command channel pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE .checked_mul(10) diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 7b7d3302a..cb285b9b5 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -107,7 +107,7 @@ impl TaskManager { } pub async fn run(mut self, shutdown_receiver: CancellationToken) { - let mut pending_certificate_interval = tokio::time::interval(Duration::from_millis(200)); + let mut pending_certificate_interval = tokio::time::interval(Duration::from_millis(3)); loop { tokio::select! { diff --git a/crates/topos-tce-proxy/src/client.rs b/crates/topos-tce-proxy/src/client.rs index 63d475bfc..f16e31068 100644 --- a/crates/topos-tce-proxy/src/client.rs +++ b/crates/topos-tce-proxy/src/client.rs @@ -22,9 +22,9 @@ use topos_core::{ use tracing::{debug, error, info, info_span, warn, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; -const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 1024 * 10; -const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 1024 * 10; -const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 1024 * 10; +const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 1024 * 5; +const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 1024 * 5; +const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 1024 * 5; // Maximum backoff retry timeout in seconds (1 hour) const TCE_SUBMIT_CERTIFICATE_BACKOFF_TIMEOUT: Duration = Duration::from_secs(3600); diff --git a/crates/topos-tce-proxy/src/worker.rs b/crates/topos-tce-proxy/src/worker.rs index 0011e05dc..e8f78234f 100644 --- a/crates/topos-tce-proxy/src/worker.rs +++ b/crates/topos-tce-proxy/src/worker.rs @@ -19,7 +19,7 @@ pub struct TceProxyWorker { } impl TceProxyWorker { - /// Construct a new [`TceProxyWorker`] with a 128 items deep channel to send commands to and receive events from a TCE node on the given subnet. + /// Construct a new [`TceProxyWorker`] with a 1024 * 20 items deep channel to send commands to and receive events from a TCE node on the given subnet. /// The worker holds a [`crate::client::TceClient`] pub async fn new(config: TceProxyConfig) -> Result<(Self, Option<(Certificate, u64)>), Error> { let (command_sender, mut command_rcv) = mpsc::channel::(1024 * 20); diff --git a/crates/topos-tce-synchronizer/src/builder.rs b/crates/topos-tce-synchronizer/src/builder.rs index c85bc88d5..182b6bf37 100644 --- a/crates/topos-tce-synchronizer/src/builder.rs +++ b/crates/topos-tce-synchronizer/src/builder.rs @@ -29,7 +29,7 @@ impl Default for SynchronizerBuilder { network_client: None, store: None, config: SynchronizationConfig::default(), - event_channel_size: 1024 * 20, + event_channel_size: 1024, shutdown: None, } } From 57d260ec9d9ae508f7d428ad59625440b8025c45 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 21:07:43 -0300 Subject: [PATCH 07/12] feat: add lru cache for delivered certificates and known signatures --- Cargo.lock | 7 +- Cargo.toml | 1 + crates/topos-tce-broadcast/Cargo.toml | 1 + .../src/double_echo/mod.rs | 73 +++++++------------ .../src/task_manager/mod.rs | 20 ++++- 5 files changed, 51 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e56c9525..2b3b1b650 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3381,7 +3381,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -4508,9 +4508,9 @@ dependencies = [ [[package]] name = "lru" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" dependencies = [ "hashbrown 0.14.3", ] @@ -8220,6 +8220,7 @@ dependencies = [ "futures", "hex", "lazy_static", + "lru", "rand", "rand_core", "rstest", diff --git a/Cargo.toml b/Cargo.toml index f758d9b67..dbd8ef2ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ lazy_static = "1" rand = { version = "0.8", default-features = false } rand_core = { version = "0.6", default-features = false } rand_distr = { version = "0.4", default-features = false } +lru = "0.12.3" # Async & Tokio related async-stream = { version = "0.3", default-features = false } diff --git a/crates/topos-tce-broadcast/Cargo.toml b/crates/topos-tce-broadcast/Cargo.toml index 616cfc1f7..97f8a6f65 100644 --- a/crates/topos-tce-broadcast/Cargo.toml +++ b/crates/topos-tce-broadcast/Cargo.toml @@ -24,6 +24,7 @@ topos-config = { path = "../topos-config/" } topos-metrics = { path = "../topos-metrics/" } topos-tce-storage = { path = "../topos-tce-storage/" } topos-crypto = { path = "../topos-crypto" } +lru.workspace = true [dev-dependencies] criterion = { version = "0.5.1", features = ["async_futures", "async_tokio"] } diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index b998e77bc..fe1941e5c 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -15,17 +15,18 @@ use crate::event::ProtocolEvents; use crate::{DoubleEchoCommand, SubscriptionsView}; +use lru::LruCache; use std::collections::HashSet; +use std::num::NonZeroUsize; use std::sync::Arc; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_util::sync::CancellationToken; use topos_config::tce::broadcast::ReliableBroadcastParams; use topos_core::{types::ValidatorId, uci::CertificateId}; use topos_crypto::messages::{MessageSigner, Signature}; -use topos_tce_storage::store::ReadStore; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; pub mod broadcast_state; @@ -49,7 +50,7 @@ pub struct DoubleEcho { /// List of approved validators through smart contract and/or genesis pub validators: HashSet, pub validator_store: Arc, - pub known_signatures: HashSet, + pub known_signatures: LruCache, pub broadcast_sender: broadcast::Sender, pub task_manager_cancellation: CancellationToken, @@ -57,6 +58,7 @@ pub struct DoubleEcho { impl DoubleEcho { pub const MAX_BUFFER_SIZE: usize = 1024 * 20; + pub const KNOWN_SIGNATURES_CACHE_SIZE: usize = 15 * 10_000; #[allow(clippy::too_many_arguments)] pub fn new( @@ -86,7 +88,9 @@ impl DoubleEcho { }, shutdown, validator_store, - known_signatures: HashSet::new(), + known_signatures: LruCache::new( + NonZeroUsize::new(Self::KNOWN_SIGNATURES_CACHE_SIZE).unwrap(), + ), broadcast_sender, task_manager_cancellation: CancellationToken::new(), } @@ -167,7 +171,7 @@ impl DoubleEcho { continue; } - self.known_signatures.insert(signature); + self.known_signatures.push(signature, ()); self.handle_echo(certificate_id, validator_id, signature).await }, @@ -184,7 +188,6 @@ impl DoubleEcho { continue; } - let mut payload = Vec::new(); payload.extend_from_slice(certificate_id.as_array()); payload.extend_from_slice(validator_id.as_bytes()); @@ -194,7 +197,7 @@ impl DoubleEcho { continue; } - self.known_signatures.insert(signature); + self.known_signatures.push(signature, ()); self.handle_ready(certificate_id, validator_id, signature).await }, @@ -230,26 +233,14 @@ impl DoubleEcho { validator_id: ValidatorId, signature: Signature, ) { - match self.validator_store.get_certificate(&certificate_id) { - Err(storage_error) => error!( - "Unable to get the Certificate {} due to {:?}", - &certificate_id, storage_error - ), - Ok(Some(_)) => debug!( - "Certificate {} already delivered, ignoring echo", - &certificate_id - ), - Ok(None) => { - let _ = self - .task_manager_message_sender - .send(DoubleEchoCommand::Echo { - validator_id, - certificate_id, - signature, - }) - .await; - } - } + let _ = self + .task_manager_message_sender + .send(DoubleEchoCommand::Echo { + validator_id, + certificate_id, + signature, + }) + .await; } pub async fn handle_ready( @@ -258,25 +249,13 @@ impl DoubleEcho { validator_id: ValidatorId, signature: Signature, ) { - match self.validator_store.get_certificate(&certificate_id) { - Err(storage_error) => error!( - "Unable to get the Certificate {} due to {:?}", - &certificate_id, storage_error - ), - Ok(Some(_)) => debug!( - "Certificate {} already delivered, ignoring echo", - &certificate_id - ), - Ok(None) => { - let _ = self - .task_manager_message_sender - .send(DoubleEchoCommand::Ready { - validator_id, - certificate_id, - signature, - }) - .await; - } - } + let _ = self + .task_manager_message_sender + .send(DoubleEchoCommand::Ready { + validator_id, + certificate_id, + signature, + }) + .await; } } diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index cb285b9b5..9118ec1f0 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -2,8 +2,10 @@ use crate::event::ProtocolEvents; use futures::stream::FuturesUnordered; use futures::Future; use futures::StreamExt; +use lru::LruCache; use std::collections::HashMap; use std::future::IntoFuture; +use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -52,11 +54,14 @@ pub struct TaskManager { pub thresholds: ReliableBroadcastParams, pub validator_id: ValidatorId, pub validator_store: Arc, + pub delivered_certificates: LruCache, pub broadcast_sender: broadcast::Sender, pub latest_pending_id: PendingCertificateId, } impl TaskManager { + pub const DELIVERED_CERTIFICATES_CACHE_SIZE: usize = 20_000; + #[allow(clippy::too_many_arguments)] pub fn new( message_receiver: mpsc::Receiver, @@ -79,6 +84,9 @@ impl TaskManager { message_signer, thresholds, validator_store, + delivered_certificates: LruCache::new( + NonZeroUsize::new(Self::DELIVERED_CERTIFICATES_CACHE_SIZE).unwrap(), + ), broadcast_sender, latest_pending_id: 0, } @@ -120,6 +128,11 @@ impl TaskManager { Some(msg) = self.message_receiver.recv() => { match msg { DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { + if self.delivered_certificates.contains(&certificate_id) { + trace!("Received message for certificate {} that has already been delivered", certificate_id); + continue; + } + if let Some(task_context) = self.tasks.get(&certificate_id) { _ = task_context.sink.send(msg).await; } else { @@ -130,6 +143,11 @@ impl TaskManager { }; } DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { + if self.delivered_certificates.contains(&cert.id) { + trace!("Received message for certificate {} that has already been delivered", cert.id); + continue; + } + trace!("Received broadcast message for certificate {} ", cert.id); self.create_task(cert, need_gossip, pending_id) @@ -139,10 +157,10 @@ impl TaskManager { Some((certificate_id, status)) = self.running_tasks.next() => { if let TaskStatus::Success = status { + self.delivered_certificates.put(certificate_id, ()); trace!("Task for certificate {} finished successfully", certificate_id); self.tasks.remove(&certificate_id); DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec(); - } else { error!("Task for certificate {} finished unsuccessfully", certificate_id); } From b7cbb301100ae796b66babddf50dbc4aa6e966f4 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 21:12:02 -0300 Subject: [PATCH 08/12] fix: add interval for processing double echo messages --- .../src/task_manager/mod.rs | 49 ++++++++++--------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 9118ec1f0..07d0038b7 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -115,7 +115,8 @@ impl TaskManager { } pub async fn run(mut self, shutdown_receiver: CancellationToken) { - let mut pending_certificate_interval = tokio::time::interval(Duration::from_millis(3)); + let mut pending_certificate_interval = tokio::time::interval(Duration::from_millis(10)); + let mut double_echo_messages_interval = tokio::time::interval(Duration::from_millis(50)); loop { tokio::select! { @@ -125,32 +126,34 @@ impl TaskManager { self.next_pending_certificate(); } - Some(msg) = self.message_receiver.recv() => { - match msg { - DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { - if self.delivered_certificates.contains(&certificate_id) { - trace!("Received message for certificate {} that has already been delivered", certificate_id); - continue; - } + _ = double_echo_messages_interval.tick() => { + if let Some(msg) = self.message_receiver.recv().await { + match msg { + DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { + if self.delivered_certificates.contains(&certificate_id) { + trace!("Received message for certificate {} that has already been delivered", certificate_id); + continue; + } - if let Some(task_context) = self.tasks.get(&certificate_id) { - _ = task_context.sink.send(msg).await; - } else { - self.buffered_messages - .entry(certificate_id) - .or_default() - .push(msg); - }; - } - DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { - if self.delivered_certificates.contains(&cert.id) { - trace!("Received message for certificate {} that has already been delivered", cert.id); - continue; + if let Some(task_context) = self.tasks.get(&certificate_id) { + _ = task_context.sink.send(msg).await; + } else { + self.buffered_messages + .entry(certificate_id) + .or_default() + .push(msg); + }; } + DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { + if self.delivered_certificates.contains(&cert.id) { + trace!("Received message for certificate {} that has already been delivered", cert.id); + continue; + } - trace!("Received broadcast message for certificate {} ", cert.id); + trace!("Received broadcast message for certificate {} ", cert.id); - self.create_task(cert, need_gossip, pending_id) + self.create_task(cert, need_gossip, pending_id) + } } } } From 125b772ce0fceaa4eedc7cc5a919a7f581181622 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 21:18:51 -0300 Subject: [PATCH 09/12] fix: remove interval --- .../src/task_manager/mod.rs | 47 +++++++++---------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 07d0038b7..33e5b98cd 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -116,7 +116,6 @@ impl TaskManager { pub async fn run(mut self, shutdown_receiver: CancellationToken) { let mut pending_certificate_interval = tokio::time::interval(Duration::from_millis(10)); - let mut double_echo_messages_interval = tokio::time::interval(Duration::from_millis(50)); loop { tokio::select! { @@ -126,34 +125,32 @@ impl TaskManager { self.next_pending_certificate(); } - _ = double_echo_messages_interval.tick() => { - if let Some(msg) = self.message_receiver.recv().await { - match msg { - DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { - if self.delivered_certificates.contains(&certificate_id) { - trace!("Received message for certificate {} that has already been delivered", certificate_id); - continue; - } + Some(msg) = self.message_receiver.recv() => { + match msg { + DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { + if self.delivered_certificates.contains(&certificate_id) { + trace!("Received message for certificate {} that has already been delivered", certificate_id); + continue; + } - if let Some(task_context) = self.tasks.get(&certificate_id) { - _ = task_context.sink.send(msg).await; - } else { - self.buffered_messages - .entry(certificate_id) - .or_default() - .push(msg); - }; + if let Some(task_context) = self.tasks.get(&certificate_id) { + _ = task_context.sink.send(msg).await; + } else { + self.buffered_messages + .entry(certificate_id) + .or_default() + .push(msg); + }; + } + DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { + if self.delivered_certificates.contains(&cert.id) { + trace!("Received message for certificate {} that has already been delivered", cert.id); + continue; } - DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { - if self.delivered_certificates.contains(&cert.id) { - trace!("Received message for certificate {} that has already been delivered", cert.id); - continue; - } - trace!("Received broadcast message for certificate {} ", cert.id); + trace!("Received broadcast message for certificate {} ", cert.id); - self.create_task(cert, need_gossip, pending_id) - } + self.create_task(cert, need_gossip, pending_id) } } } From c6b35877efaf38aead8c22ab28db9b22b777b1b8 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 21:42:27 -0300 Subject: [PATCH 10/12] fix: replace LRU with a HashSet --- crates/topos-tce-broadcast/src/double_echo/mod.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index fe1941e5c..ce676a24d 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -15,7 +15,6 @@ use crate::event::ProtocolEvents; use crate::{DoubleEchoCommand, SubscriptionsView}; -use lru::LruCache; use std::collections::HashSet; use std::num::NonZeroUsize; use std::sync::Arc; @@ -50,7 +49,7 @@ pub struct DoubleEcho { /// List of approved validators through smart contract and/or genesis pub validators: HashSet, pub validator_store: Arc, - pub known_signatures: LruCache, + pub known_signatures: HashSet, pub broadcast_sender: broadcast::Sender, pub task_manager_cancellation: CancellationToken, @@ -58,7 +57,6 @@ pub struct DoubleEcho { impl DoubleEcho { pub const MAX_BUFFER_SIZE: usize = 1024 * 20; - pub const KNOWN_SIGNATURES_CACHE_SIZE: usize = 15 * 10_000; #[allow(clippy::too_many_arguments)] pub fn new( @@ -88,9 +86,7 @@ impl DoubleEcho { }, shutdown, validator_store, - known_signatures: LruCache::new( - NonZeroUsize::new(Self::KNOWN_SIGNATURES_CACHE_SIZE).unwrap(), - ), + known_signatures: HashSet::new(), broadcast_sender, task_manager_cancellation: CancellationToken::new(), } @@ -171,7 +167,7 @@ impl DoubleEcho { continue; } - self.known_signatures.push(signature, ()); + self.known_signatures.insert(signature); self.handle_echo(certificate_id, validator_id, signature).await }, @@ -197,7 +193,7 @@ impl DoubleEcho { continue; } - self.known_signatures.push(signature, ()); + self.known_signatures.insert(signature); self.handle_ready(certificate_id, validator_id, signature).await }, From 4a28799035af8d90e6c26c3f84cbecb222a4dc07 Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Wed, 24 Apr 2024 21:48:54 -0300 Subject: [PATCH 11/12] fix: clippy --- crates/topos-tce-broadcast/src/double_echo/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index ce676a24d..d8237c96d 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -16,7 +16,6 @@ use crate::event::ProtocolEvents; use crate::{DoubleEchoCommand, SubscriptionsView}; use std::collections::HashSet; -use std::num::NonZeroUsize; use std::sync::Arc; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_util::sync::CancellationToken; From 403fb4b305a48ffa8245066b2f030daae8a8cafe Mon Sep 17 00:00:00 2001 From: Bastian Gruber Date: Tue, 30 Apr 2024 11:31:21 -0300 Subject: [PATCH 12/12] fix: activate perf settings --- .cargo/config.toml | 2 +- Cargo.toml | 15 +++++++++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 1236d57a3..cc4946299 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -7,4 +7,4 @@ xclippy = [ # https://nnethercote.github.io/perf-book/build-configuration.html#cpu-specific-instructions [build] -rustflags = ["-C", "target-cpu=native"] \ No newline at end of file +rustflags = ["-C", "target-cpu=native"] diff --git a/Cargo.toml b/Cargo.toml index dbd8ef2ee..f68ce2359 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,11 +15,18 @@ missing_docs = "allow" # Warn deprecated-in-future = "warn" +# Use this for performance +#[profile.release] +#strip = true +#codegen-units = 1 # https://nnethercote.github.io/perf-book/build-configuration.html#codegen-units +#lto = "fat" # https://nnethercote.github.io/perf-book/build-configuration.html#link-time-optimization +#debug = false + +# Use this for perf analysis [profile.release] -strip = true -codegen-units = 1 # https://nnethercote.github.io/perf-book/build-configuration.html#codegen-units -lto = "fat" # https://nnethercote.github.io/perf-book/build-configuration.html#link-time-optimization -debug = true # So we can un `perf` on the Rust binary +strip = false +debug = true +lto = false [workspace.dependencies] topos-core = { path = "./crates/topos-core", default-features = false }