Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add build optimizations, increase channel size, add local cache #493

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ xclippy = [
"-Wclippy::all",
"-Wclippy::disallowed-methods",
]

# https://nnethercote.github.io/perf-book/build-configuration.html#cpu-specific-instructions
[build]
rustflags = ["-C", "target-cpu=native"]
51 changes: 26 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 20 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,18 @@ missing_docs = "allow"
# Warn
deprecated-in-future = "warn"

# Use this for performance
#[profile.release]
#strip = true
#codegen-units = 1 # https://nnethercote.github.io/perf-book/build-configuration.html#codegen-units
#lto = "fat" # https://nnethercote.github.io/perf-book/build-configuration.html#link-time-optimization
#debug = false

# Use this for perf analysis
[profile.release]
strip = true
strip = false
debug = true
lto = false

[workspace.dependencies]
topos-core = { path = "./crates/topos-core", default-features = false }
Expand All @@ -29,6 +39,7 @@ lazy_static = "1"
rand = { version = "0.8", default-features = false }
rand_core = { version = "0.6", default-features = false }
rand_distr = { version = "0.4", default-features = false }
lru = "0.12.3"

# Async & Tokio related
async-stream = { version = "0.3", default-features = false }
Expand All @@ -40,10 +51,10 @@ tokio-stream = { version = "0.1", default-features = false }
tower = "0.4"

# Blockchain
ethereum-types = { version = "0.13.1"}
secp256k1 = {version = "0.27", features = ["recovery"]}
tiny-keccak = {version = "1.5"}
ethers = {version = "2.0.9", features = ["legacy", "abigen-online"]}
ethereum-types = { version = "0.13.1" }
secp256k1 = { version = "0.27", features = ["recovery"] }
tiny-keccak = { version = "1.5" }
ethers = { version = "2.0.9", features = ["legacy", "abigen-online"] }

# Log, Tracing & telemetry
opentelemetry = { version = "0.22", features = ["metrics"] }
Expand All @@ -58,7 +69,7 @@ tracing-opentelemetry = "0.23"
tracing-subscriber = { version = "0.3", default-features = false }

# gRPC
prost = {version = "0.12"}
prost = { version = "0.12" }
tonic = { version = "0.11", default-features = false }
tonic-build = { version = "0.11", default-features = false, features = [
"prost", "transport"
Expand All @@ -72,7 +83,7 @@ http = "0.2.9"
tower-http = { version = "0.4", features = ["cors"] }

# P2P related
libp2p = { version = "0.53", default-features = false, features = ["noise"]}
libp2p = { version = "0.53", default-features = false, features = ["noise"] }

# Serialization & Deserialization
bincode = { version = "1.3", default-features = false }
Expand All @@ -93,5 +104,5 @@ reqwest = { version = "0.11", features = ["json"] }
# Tests
rstest = { version = "0.17.0", default-features = false }
test-log = { version = "0.2", features = ["trace"] }
env_logger = { version = "0.10.0"} # Needed by test-log to print traces in tests
serial_test = {version = "0.9.0"}
env_logger = { version = "0.10.0" } # Needed by test-log to print traces in tests
serial_test = { version = "0.9.0" }
2 changes: 1 addition & 1 deletion crates/topos-config/src/tce/synchronization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl Default for SynchronizationConfig {
}

impl SynchronizationConfig {
pub const INTERVAL_SECONDS: u64 = 10;
pub const INTERVAL_SECONDS: u64 = 60;
pub const LIMIT_PER_SUBNET: usize = 100;

const fn default_interval_seconds() -> u64 {
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-p2p/src/behaviour/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_REA

use super::HealthStatus;

const MAX_BATCH_SIZE: usize = 10;
const MAX_BATCH_SIZE: usize = 1024 * 20;

pub struct Behaviour {
batch_size: usize,
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Behaviour {
.unwrap_or(Ok(MAX_BATCH_SIZE))
.unwrap();
let gossipsub = gossipsub::ConfigBuilder::default()
.max_transmit_size(2 * 1024 * 1024)
.max_transmit_size(20 * 2048 * 2048)
.validation_mode(gossipsub::ValidationMode::Strict)
.message_id_fn(|msg_id| {
// Content based id
Expand Down
3 changes: 2 additions & 1 deletion crates/topos-sequencer-subnet-runtime/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl SubnetRuntimeProxy {
address: {}, ",
&config.http_endpoint, &config.ws_endpoint, &config.subnet_contract_address
);
let (command_sender, mut command_rcv) = mpsc::channel::<SubnetRuntimeProxyCommand>(256);
let (command_sender, mut command_rcv) =
mpsc::channel::<SubnetRuntimeProxyCommand>(1024 * 20);
let ws_runtime_endpoint = config.ws_endpoint.clone();
let http_runtime_endpoint = config.http_endpoint.clone();
let subnet_contract_address = Arc::new(config.subnet_contract_address.clone());
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-api/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(crate) mod console;
#[cfg(test)]
mod tests;

const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 100;
const DEFAULT_CHANNEL_STREAM_CAPACITY: usize = 1024 * 20;

pub(crate) mod builder;
pub(crate) mod messaging;
Expand Down Expand Up @@ -272,7 +272,7 @@ impl ApiService for TceGrpcService {
.map(move |message| Self::parse_stream(message, stream_id))
.boxed();

let (command_sender, command_receiver) = mpsc::channel(2048);
let (command_sender, command_receiver) = mpsc::channel(1024 * 20);

let (outbound_stream, rx) = mpsc::channel::<Result<(Option<Uuid>, OutboundMessage), Status>>(
DEFAULT_CHANNEL_STREAM_CAPACITY,
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ mod tests;

pub(crate) mod constants {
/// Constant size of every channel in the crate
pub(crate) const CHANNEL_SIZE: usize = 2048;
pub(crate) const CHANNEL_SIZE: usize = 1024 * 20;

/// Constant size of every transient stream channel in the crate
pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024;
pub(crate) const TRANSIENT_STREAM_CHANNEL_SIZE: usize = 1024 * 20;
}
pub use runtime::{
error::RuntimeError, Runtime, RuntimeClient, RuntimeCommand, RuntimeContext, RuntimeEvent,
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-api/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Runtime {
}

pub async fn launch(mut self) {
let mut health_update = tokio::time::interval(Duration::from_secs(1));
let mut health_update = tokio::time::interval(Duration::from_secs(10));
let shutdowned: Option<oneshot::Sender<()>> = loop {
tokio::select! {
shutdown = self.shutdown.recv() => {
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-broadcast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ topos-config = { path = "../topos-config/" }
topos-metrics = { path = "../topos-metrics/" }
topos-tce-storage = { path = "../topos-tce-storage/" }
topos-crypto = { path = "../topos-crypto" }
lru.workspace = true

[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_futures", "async_tokio"] }
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-tce-broadcast/src/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ lazy_static! {
std::env::var("TOPOS_DOUBLE_ECHO_COMMAND_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
.unwrap_or(1024 * 20);
/// Size of the channel between double echo and the task manager
pub static ref BROADCAST_TASK_MANAGER_CHANNEL_SIZE: usize =
std::env::var("TOPOS_BROADCAST_TASK_MANAGER_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(20_480);
.unwrap_or(1024 * 20);
/// Size of the channel to send protocol events from the double echo
pub static ref PROTOCOL_CHANNEL_SIZE: usize =
std::env::var("TOPOS_PROTOCOL_CHANNEL_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2048);
.unwrap_or(1024 * 20);
/// Capacity alert threshold for the double echo command channel
pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE
.checked_mul(10)
Expand Down
Loading
Loading