diff --git a/.cargo/config.toml b/.cargo/config.toml index cf929f1cc..cc4946299 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -4,3 +4,7 @@ xclippy = [ "-Wclippy::all", "-Wclippy::disallowed-methods", ] + +# https://nnethercote.github.io/perf-book/build-configuration.html#cpu-specific-instructions +[build] +rustflags = ["-C", "target-cpu=native"] diff --git a/Cargo.lock b/Cargo.lock index 14fbf4735..2b3b1b650 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2835,7 +2835,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35bd3cf68c183738046838e300353e4716c674dc5e56890de4826801a6622a28" dependencies = [ "futures-io", - "rustls 0.21.10", + "rustls 0.21.11", ] [[package]] @@ -3019,9 +3019,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.24" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ "bytes", "fnv", @@ -3038,9 +3038,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.2" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31d030e59af851932b72ceebadf4a2b5986dba4c3b99dd2493f8273a0f151943" +checksum = "816ec7294445779408f36fe57bc5b7fc1cf59664059096c65f905c1c61f58069" dependencies = [ "bytes", "fnv", @@ -3374,7 +3374,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2 0.3.24", + "h2 0.3.26", "http 0.2.11", "http-body 0.4.6", "httparse", @@ -3397,7 +3397,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.2", + "h2 0.4.4", "http 1.0.0", "http-body 1.0.0", "httparse", @@ -3416,7 +3416,7 @@ dependencies = [ "futures-util", "http 0.2.11", "hyper 0.14.28", - "rustls 0.21.10", + "rustls 0.21.11", "tokio", "tokio-rustls 0.24.1", ] @@ -4233,7 +4233,7 @@ dependencies = [ "quinn", "rand", "ring 0.16.20", - "rustls 0.21.10", + "rustls 0.21.11", "socket2 0.5.5", "thiserror", "tokio", @@ -4345,7 +4345,7 @@ dependencies = [ "libp2p-identity", "rcgen", "ring 0.16.20", - "rustls 0.21.10", + "rustls 0.21.11", "rustls-webpki 0.101.7", "thiserror", "x509-parser", @@ -4508,9 +4508,9 @@ dependencies = [ [[package]] name = "lru" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" dependencies = [ "hashbrown 0.14.3", ] @@ -6004,7 +6004,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.21.10", + "rustls 0.21.11", "thiserror", "tokio", "tracing", @@ -6020,7 +6020,7 @@ dependencies = [ "rand", "ring 0.16.20", "rustc-hash", - "rustls 0.21.10", + "rustls 0.21.11", "slab", "thiserror", "tinyvec", @@ -6226,7 +6226,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2 0.3.24", + "h2 0.3.26", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -6240,7 +6240,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.10", + "rustls 0.21.11", "rustls-pemfile 1.0.4", "serde", "serde_json", @@ -6456,9 +6456,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.21.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4" dependencies = [ "log", "ring 0.17.7", @@ -6468,9 +6468,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.2" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" dependencies = [ "log", "ring 0.17.7", @@ -7521,7 +7521,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.10", + "rustls 0.21.11", "tokio", ] @@ -7531,7 +7531,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "rustls 0.22.2", + "rustls 0.22.4", "rustls-pki-types", "tokio", ] @@ -7556,7 +7556,7 @@ checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" dependencies = [ "futures-util", "log", - "rustls 0.21.10", + "rustls 0.21.11", "tokio", "tokio-rustls 0.24.1", "tungstenite", @@ -7679,7 +7679,7 @@ dependencies = [ "axum 0.6.20", "base64 0.21.7", "bytes", - "h2 0.3.24", + "h2 0.3.26", "http 0.2.11", "http-body 0.4.6", "hyper 0.14.28", @@ -8220,6 +8220,7 @@ dependencies = [ "futures", "hex", "lazy_static", + "lru", "rand", "rand_core", "rstest", @@ -8608,7 +8609,7 @@ dependencies = [ "httparse", "log", "rand", - "rustls 0.21.10", + "rustls 0.21.11", "sha1", "thiserror", "url", diff --git a/Cargo.toml b/Cargo.toml index 22e17525a..f68ce2359 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,18 @@ missing_docs = "allow" # Warn deprecated-in-future = "warn" +# Use this for performance +#[profile.release] +#strip = true +#codegen-units = 1 # https://nnethercote.github.io/perf-book/build-configuration.html#codegen-units +#lto = "fat" # https://nnethercote.github.io/perf-book/build-configuration.html#link-time-optimization +#debug = false + +# Use this for perf analysis [profile.release] -strip = true +strip = false +debug = true +lto = false [workspace.dependencies] topos-core = { path = "./crates/topos-core", default-features = false } @@ -29,6 +39,7 @@ lazy_static = "1" rand = { version = "0.8", default-features = false } rand_core = { version = "0.6", default-features = false } rand_distr = { version = "0.4", default-features = false } +lru = "0.12.3" # Async & Tokio related async-stream = { version = "0.3", default-features = false } @@ -40,10 +51,10 @@ tokio-stream = { version = "0.1", default-features = false } tower = "0.4" # Blockchain -ethereum-types = { version = "0.13.1"} -secp256k1 = {version = "0.27", features = ["recovery"]} -tiny-keccak = {version = "1.5"} -ethers = {version = "2.0.9", features = ["legacy", "abigen-online"]} +ethereum-types = { version = "0.13.1" } +secp256k1 = { version = "0.27", features = ["recovery"] } +tiny-keccak = { version = "1.5" } +ethers = { version = "2.0.9", features = ["legacy", "abigen-online"] } # Log, Tracing & telemetry opentelemetry = { version = "0.22", features = ["metrics"] } @@ -58,7 +69,7 @@ tracing-opentelemetry = "0.23" tracing-subscriber = { version = "0.3", default-features = false } # gRPC -prost = {version = "0.12"} +prost = { version = "0.12" } tonic = { version = "0.11", default-features = false } tonic-build = { version = "0.11", default-features = false, features = [ "prost", "transport" @@ -72,7 +83,7 @@ http = "0.2.9" tower-http = { version = "0.4", features = ["cors"] } # P2P related -libp2p = { version = "0.53", default-features = false, features = ["noise"]} +libp2p = { version = "0.53", default-features = false, features = ["noise"] } # Serialization & Deserialization bincode = { version = "1.3", default-features = false } @@ -93,5 +104,5 @@ reqwest = { version = "0.11", features = ["json"] } # Tests rstest = { version = "0.17.0", default-features = false } test-log = { version = "0.2", features = ["trace"] } -env_logger = { version = "0.10.0"} # Needed by test-log to print traces in tests -serial_test = {version = "0.9.0"} +env_logger = { version = "0.10.0" } # Needed by test-log to print traces in tests +serial_test = { version = "0.9.0" } diff --git a/crates/topos-config/src/tce/synchronization.rs b/crates/topos-config/src/tce/synchronization.rs index 222cd1030..b6ff4050d 100644 --- a/crates/topos-config/src/tce/synchronization.rs +++ b/crates/topos-config/src/tce/synchronization.rs @@ -23,7 +23,7 @@ impl Default for SynchronizationConfig { } impl SynchronizationConfig { - pub const INTERVAL_SECONDS: u64 = 10; + pub const INTERVAL_SECONDS: u64 = 60; pub const LIMIT_PER_SUBNET: usize = 100; const fn default_interval_seconds() -> u64 { diff --git a/crates/topos-p2p/src/behaviour/gossip.rs b/crates/topos-p2p/src/behaviour/gossip.rs index eaf6991ef..5651c9788 100644 --- a/crates/topos-p2p/src/behaviour/gossip.rs +++ b/crates/topos-p2p/src/behaviour/gossip.rs @@ -25,7 +25,7 @@ use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_REA use super::HealthStatus; -const MAX_BATCH_SIZE: usize = 10; +const MAX_BATCH_SIZE: usize = 1024 * 20; pub struct Behaviour { batch_size: usize, @@ -76,7 +76,7 @@ impl Behaviour { .unwrap_or(Ok(MAX_BATCH_SIZE)) .unwrap(); let gossipsub = gossipsub::ConfigBuilder::default() - .max_transmit_size(2 * 1024 * 1024) + .max_transmit_size(20 * 2048 * 2048) .validation_mode(gossipsub::ValidationMode::Strict) .message_id_fn(|msg_id| { // Content based id diff --git a/crates/topos-sequencer-subnet-runtime/src/proxy.rs b/crates/topos-sequencer-subnet-runtime/src/proxy.rs index 5039698bb..e0e93f48d 100644 --- a/crates/topos-sequencer-subnet-runtime/src/proxy.rs +++ b/crates/topos-sequencer-subnet-runtime/src/proxy.rs @@ -68,7 +68,8 @@ impl SubnetRuntimeProxy { address: {}, ", &config.http_endpoint, &config.ws_endpoint, &config.subnet_contract_address ); - let (command_sender, mut command_rcv) = mpsc::channel::(256); + let (command_sender, mut command_rcv) = + mpsc::channel::(1024 * 20); let ws_runtime_endpoint = config.ws_endpoint.clone(); let http_runtime_endpoint = config.http_endpoint.clone(); let subnet_contract_address = Arc::new(config.subnet_contract_address.clone()); diff --git a/crates/topos-tce-api/src/grpc/mod.rs b/crates/topos-tce-api/src/grpc/mod.rs index ae6f132e4..2463d3ced 100644 --- a/crates/topos-tce-api/src/grpc/mod.rs +++ b/crates/topos-tce-api/src/grpc/mod.rs @@ -29,7 +29,7 @@ pub(crate) mod console; #[cfg(test)] mod tests; -const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 100; +const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 1024 * 20; pub(crate) mod builder; pub(crate) mod messaging; @@ -272,7 +272,7 @@ impl ApiService for TceGrpcService { .map(move |message| Self::parse_stream(message, stream_id)) .boxed(); - let (command_sender, command_receiver) = mpsc::channel(2048); + let (command_sender, command_receiver) = mpsc::channel(1024 * 20); let (outbound_stream, rx) = mpsc::channel::, OutboundMessage), Status>>( DEFAULT_CHANNEL_STREAM_CAPACITY, diff --git a/crates/topos-tce-api/src/lib.rs b/crates/topos-tce-api/src/lib.rs index 41808c13a..7d1158d52 100644 --- a/crates/topos-tce-api/src/lib.rs +++ b/crates/topos-tce-api/src/lib.rs @@ -9,10 +9,10 @@ mod tests; pub(crate) mod constants { /// Constant size of every channel in the crate - pub(crate) const CHANNEL_SIZE: usize = 2048; + pub(crate) const CHANNEL_SIZE: usize = 1024 * 20; /// Constant size of every transient stream channel in the crate - pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024; + pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024 * 20; } pub use runtime::{ error::RuntimeError, Runtime, RuntimeClient, RuntimeCommand, RuntimeContext, RuntimeEvent, diff --git a/crates/topos-tce-api/src/runtime/mod.rs b/crates/topos-tce-api/src/runtime/mod.rs index e0e3b2b89..78b3b9235 100644 --- a/crates/topos-tce-api/src/runtime/mod.rs +++ b/crates/topos-tce-api/src/runtime/mod.rs @@ -88,7 +88,7 @@ impl Runtime { } pub async fn launch(mut self) { - let mut health_update = tokio::time::interval(Duration::from_secs(1)); + let mut health_update = tokio::time::interval(Duration::from_secs(10)); let shutdowned: Option> = loop { tokio::select! { shutdown = self.shutdown.recv() => { diff --git a/crates/topos-tce-broadcast/Cargo.toml b/crates/topos-tce-broadcast/Cargo.toml index 616cfc1f7..97f8a6f65 100644 --- a/crates/topos-tce-broadcast/Cargo.toml +++ b/crates/topos-tce-broadcast/Cargo.toml @@ -24,6 +24,7 @@ topos-config = { path = "../topos-config/" } topos-metrics = { path = "../topos-metrics/" } topos-tce-storage = { path = "../topos-tce-storage/" } topos-crypto = { path = "../topos-crypto" } +lru.workspace = true [dev-dependencies] criterion = { version = "0.5.1", features = ["async_futures", "async_tokio"] } diff --git a/crates/topos-tce-broadcast/src/constant.rs b/crates/topos-tce-broadcast/src/constant.rs index 30e46f24d..74799481a 100644 --- a/crates/topos-tce-broadcast/src/constant.rs +++ b/crates/topos-tce-broadcast/src/constant.rs @@ -6,19 +6,19 @@ lazy_static! { std::env::var("TOPOS_DOUBLE_ECHO_COMMAND_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(2048); + .unwrap_or(1024 * 20); /// Size of the channel between double echo and the task manager pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize = std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(20_480); + .unwrap_or(1024 * 20); /// Size of the channel to send protocol events from the double echo pub static ref PROTOCOL_CHANNEL_SIZE: usize = std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE") .ok() .and_then(|s| s.parse().ok()) - .unwrap_or(2048); + .unwrap_or(1024 * 20); /// Capacity alert threshold for the double echo command channel pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE .checked_mul(10) diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 3b9e0274f..d8237c96d 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -22,10 +22,9 @@ use tokio_util::sync::CancellationToken; use topos_config::tce::broadcast::ReliableBroadcastParams; use topos_core::{types::ValidatorId, uci::CertificateId}; use topos_crypto::messages::{MessageSigner, Signature}; -use topos_tce_storage::store::ReadStore; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; -use tracing::{debug, error, info, warn}; +use tracing::{debug, info, warn}; pub mod broadcast_state; @@ -49,13 +48,14 @@ pub struct DoubleEcho { /// List of approved validators through smart contract and/or genesis pub validators: HashSet, pub validator_store: Arc, + pub known_signatures: HashSet, pub broadcast_sender: broadcast::Sender, pub task_manager_cancellation: CancellationToken, } impl DoubleEcho { - pub const MAX_BUFFER_SIZE: usize = 2048; + pub const MAX_BUFFER_SIZE: usize = 1024 * 20; #[allow(clippy::too_many_arguments)] pub fn new( @@ -85,6 +85,7 @@ impl DoubleEcho { }, shutdown, validator_store, + known_signatures: HashSet::new(), broadcast_sender, task_manager_cancellation: CancellationToken::new(), } @@ -150,6 +151,12 @@ impl DoubleEcho { continue; } + if self.known_signatures.contains(&signature) { + debug!("ECHO message signature already known: {}", signature); + self.handle_echo(certificate_id, validator_id, signature).await; + continue; + } + let mut payload = Vec::new(); payload.extend_from_slice(certificate_id.as_array()); payload.extend_from_slice(validator_id.as_bytes()); @@ -159,6 +166,8 @@ impl DoubleEcho { continue; } + self.known_signatures.insert(signature); + self.handle_echo(certificate_id, validator_id, signature).await }, DoubleEchoCommand::Ready { certificate_id, validator_id, signature } => { @@ -168,6 +177,12 @@ impl DoubleEcho { continue; } + if self.known_signatures.contains(&signature) { + debug!("READY message signature already known: {}", signature); + self.handle_ready(certificate_id, validator_id, signature).await; + continue; + } + let mut payload = Vec::new(); payload.extend_from_slice(certificate_id.as_array()); payload.extend_from_slice(validator_id.as_bytes()); @@ -177,6 +192,8 @@ impl DoubleEcho { continue; } + self.known_signatures.insert(signature); + self.handle_ready(certificate_id, validator_id, signature).await }, } @@ -211,26 +228,14 @@ impl DoubleEcho { validator_id: ValidatorId, signature: Signature, ) { - match self.validator_store.get_certificate(&certificate_id) { - Err(storage_error) => error!( - "Unable to get the Certificate {} due to {:?}", - &certificate_id, storage_error - ), - Ok(Some(_)) => debug!( - "Certificate {} already delivered, ignoring echo", - &certificate_id - ), - Ok(None) => { - let _ = self - .task_manager_message_sender - .send(DoubleEchoCommand::Echo { - validator_id, - certificate_id, - signature, - }) - .await; - } - } + let _ = self + .task_manager_message_sender + .send(DoubleEchoCommand::Echo { + validator_id, + certificate_id, + signature, + }) + .await; } pub async fn handle_ready( @@ -239,25 +244,13 @@ impl DoubleEcho { validator_id: ValidatorId, signature: Signature, ) { - match self.validator_store.get_certificate(&certificate_id) { - Err(storage_error) => error!( - "Unable to get the Certificate {} due to {:?}", - &certificate_id, storage_error - ), - Ok(Some(_)) => debug!( - "Certificate {} already delivered, ignoring echo", - &certificate_id - ), - Ok(None) => { - let _ = self - .task_manager_message_sender - .send(DoubleEchoCommand::Ready { - validator_id, - certificate_id, - signature, - }) - .await; - } - } + let _ = self + .task_manager_message_sender + .send(DoubleEchoCommand::Ready { + validator_id, + certificate_id, + signature, + }) + .await; } } diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index d25e3525d..33e5b98cd 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -2,8 +2,10 @@ use crate::event::ProtocolEvents; use futures::stream::FuturesUnordered; use futures::Future; use futures::StreamExt; +use lru::LruCache; use std::collections::HashMap; use std::future::IntoFuture; +use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -52,11 +54,14 @@ pub struct TaskManager { pub thresholds: ReliableBroadcastParams, pub validator_id: ValidatorId, pub validator_store: Arc, + pub delivered_certificates: LruCache, pub broadcast_sender: broadcast::Sender, pub latest_pending_id: PendingCertificateId, } impl TaskManager { + pub const DELIVERED_CERTIFICATES_CACHE_SIZE: usize = 20_000; + #[allow(clippy::too_many_arguments)] pub fn new( message_receiver: mpsc::Receiver, @@ -79,6 +84,9 @@ impl TaskManager { message_signer, thresholds, validator_store, + delivered_certificates: LruCache::new( + NonZeroUsize::new(Self::DELIVERED_CERTIFICATES_CACHE_SIZE).unwrap(), + ), broadcast_sender, latest_pending_id: 0, } @@ -107,18 +115,24 @@ impl TaskManager { } pub async fn run(mut self, shutdown_receiver: CancellationToken) { - let mut interval = tokio::time::interval(Duration::from_secs(1)); + let mut pending_certificate_interval = tokio::time::interval(Duration::from_millis(10)); loop { tokio::select! { biased; - _ = interval.tick() => { + _ = pending_certificate_interval.tick() => { self.next_pending_certificate(); } + Some(msg) = self.message_receiver.recv() => { match msg { DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { + if self.delivered_certificates.contains(&certificate_id) { + trace!("Received message for certificate {} that has already been delivered", certificate_id); + continue; + } + if let Some(task_context) = self.tasks.get(&certificate_id) { _ = task_context.sink.send(msg).await; } else { @@ -129,6 +143,11 @@ impl TaskManager { }; } DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { + if self.delivered_certificates.contains(&cert.id) { + trace!("Received message for certificate {} that has already been delivered", cert.id); + continue; + } + trace!("Received broadcast message for certificate {} ", cert.id); self.create_task(cert, need_gossip, pending_id) @@ -136,13 +155,12 @@ impl TaskManager { } } - Some((certificate_id, status)) = self.running_tasks.next() => { if let TaskStatus::Success = status { + self.delivered_certificates.put(certificate_id, ()); trace!("Task for certificate {} finished successfully", certificate_id); self.tasks.remove(&certificate_id); DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec(); - } else { error!("Task for certificate {} finished unsuccessfully", certificate_id); } diff --git a/crates/topos-tce-proxy/src/client.rs b/crates/topos-tce-proxy/src/client.rs index a6682abb5..f16e31068 100644 --- a/crates/topos-tce-proxy/src/client.rs +++ b/crates/topos-tce-proxy/src/client.rs @@ -22,9 +22,9 @@ use topos_core::{ use tracing::{debug, error, info, info_span, warn, Instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; -const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 100; -const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 100; -const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 100; +const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 1024 * 5; +const CERTIFICATE_INBOUND_CHANNEL_SIZE: usize = 1024 * 5; +const TCE_PROXY_COMMAND_CHANNEL_SIZE: usize = 1024 * 5; // Maximum backoff retry timeout in seconds (1 hour) const TCE_SUBMIT_CERTIFICATE_BACKOFF_TIMEOUT: Duration = Duration::from_secs(3600); diff --git a/crates/topos-tce-proxy/src/worker.rs b/crates/topos-tce-proxy/src/worker.rs index 0869893af..e8f78234f 100644 --- a/crates/topos-tce-proxy/src/worker.rs +++ b/crates/topos-tce-proxy/src/worker.rs @@ -19,11 +19,11 @@ pub struct TceProxyWorker { } impl TceProxyWorker { - /// Construct a new [`TceProxyWorker`] with a 128 items deep channel to send commands to and receive events from a TCE node on the given subnet. + /// Construct a new [`TceProxyWorker`] with a 1024 * 20 items deep channel to send commands to and receive events from a TCE node on the given subnet. /// The worker holds a [`crate::client::TceClient`] pub async fn new(config: TceProxyConfig) -> Result<(Self, Option<(Certificate, u64)>), Error> { - let (command_sender, mut command_rcv) = mpsc::channel::(128); - let (evt_sender, evt_rcv) = mpsc::channel::(128); + let (command_sender, mut command_rcv) = mpsc::channel::(1024 * 20); + let (evt_sender, evt_rcv) = mpsc::channel::(1024 * 20); let (tce_client_shutdown_channel, shutdown_receiver) = mpsc::channel::>(1); diff --git a/crates/topos-tce-synchronizer/src/builder.rs b/crates/topos-tce-synchronizer/src/builder.rs index a7b70e08a..182b6bf37 100644 --- a/crates/topos-tce-synchronizer/src/builder.rs +++ b/crates/topos-tce-synchronizer/src/builder.rs @@ -29,7 +29,7 @@ impl Default for SynchronizerBuilder { network_client: None, store: None, config: SynchronizationConfig::default(), - event_channel_size: 100, + event_channel_size: 1024, shutdown: None, } } diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index fedc50b66..f7478c508 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -69,7 +69,7 @@ impl AppContext { validator_store: Arc, api_context: RuntimeContext, ) -> (Self, mpsc::Receiver) { - let (events, receiver) = mpsc::channel(100); + let (events, receiver) = mpsc::channel(1024 * 20); ( Self { is_validator,