refactor: clean up channel creation #407

Merged · 2 commits · Sep 26, 2023
4 changes: 3 additions & 1 deletion bin/rundler/src/cli/builder.rs
@@ -17,6 +17,8 @@ use tokio::sync::broadcast;

use super::{json::get_json_config, CommonArgs};

const REQUEST_CHANNEL_CAPACITY: usize = 1024;

/// CLI options for the builder
#[derive(Args, Debug)]
#[command(next_help_heading = "BUILDER")]
@@ -262,7 +264,7 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho
[BuilderTask::new(
task_args,
event_sender,
LocalBuilderBuilder::new(1024),
LocalBuilderBuilder::new(REQUEST_CHANNEL_CAPACITY),
pool,
)
.boxed()],
7 changes: 5 additions & 2 deletions bin/rundler/src/cli/node/mod.rs
@@ -15,6 +15,9 @@ use crate::cli::{
};
mod events;

const REQUEST_CHANNEL_CAPACITY: usize = 1024;
const BLOCK_CHANNEL_CAPACITY: usize = 1024;

#[derive(Debug, Args)]
pub struct NodeCliArgs {
#[command(flatten)]
@@ -66,10 +69,10 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::
}
});

let pool_builder = LocalPoolBuilder::new(1024, 1024);
let pool_builder = LocalPoolBuilder::new(REQUEST_CHANNEL_CAPACITY, BLOCK_CHANNEL_CAPACITY);
let pool_handle = pool_builder.get_handle();

let builder_builder = LocalBuilderBuilder::new(1024);
let builder_builder = LocalBuilderBuilder::new(REQUEST_CHANNEL_CAPACITY);
let builder_handle = builder_builder.get_handle();

spawn_tasks_with_shutdown(
19 changes: 18 additions & 1 deletion bin/rundler/src/cli/pool.rs
@@ -11,6 +11,10 @@ use tokio::sync::broadcast;

use super::CommonArgs;
use crate::cli::json::get_json_config;

const REQUEST_CHANNEL_CAPACITY: usize = 1024;
const BLOCK_CHANNEL_CAPACITY: usize = 1024;

/// CLI options for the OP Pool
#[derive(Args, Debug)]
#[command(next_help_heading = "POOL")]
@@ -77,6 +81,13 @@ pub struct PoolArgs {
env = "POOL_CHAIN_HISTORY_SIZE"
)]
pub chain_history_size: Option<u64>,

#[arg(
long = "pool.chain_update_channel_capacity",
name = "pool.chain_update_channel_capacity",
env = "POOL_CHAIN_UPDATE_CHANNEL_CAPACITY"
)]
pub chain_update_channel_capacity: Option<usize>,
}

impl PoolArgs {
@@ -141,6 +152,7 @@ impl PoolArgs {
http_poll_interval: Duration::from_millis(common.eth_poll_interval_millis),
pool_configs,
remote_address,
chain_update_channel_capacity: self.chain_update_channel_capacity.unwrap_or(1024),
})
}
}
@@ -184,7 +196,12 @@ pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Res
emit::receive_and_log_events_with_filter(event_rx, |_| true);

spawn_tasks_with_shutdown(
[PoolTask::new(task_args, event_sender, LocalPoolBuilder::new(1024, 1024)).boxed()],
[PoolTask::new(
task_args,
event_sender,
LocalPoolBuilder::new(REQUEST_CHANNEL_CAPACITY, BLOCK_CHANNEL_CAPACITY),
)
.boxed()],
tokio::signal::ctrl_c(),
)
.await;
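The new flag is optional: when `--pool.chain_update_channel_capacity` (or `POOL_CHAIN_UPDATE_CHANNEL_CAPACITY`) is not set, `to_args` falls back to the previously hard-coded value of 1024. Below is a minimal, self-contained sketch of the same clap pattern; `PoolCli` is a hypothetical stand-in for rundler's actual CLI, and it assumes clap with the "derive" and "env" features enabled.

```rust
// Illustrative sketch only: an optional flag with an env-var fallback and an
// in-code default, mirroring the pattern above.
use clap::Parser;

#[derive(Parser, Debug)]
struct PoolCli {
    /// Capacity of the chain update broadcast channel.
    #[arg(
        long = "pool.chain_update_channel_capacity",
        env = "POOL_CHAIN_UPDATE_CHANNEL_CAPACITY"
    )]
    chain_update_channel_capacity: Option<usize>,
}

fn main() {
    let cli = PoolCli::parse();
    // Fall back to the previously hard-coded value when the flag is not set.
    let capacity = cli.chain_update_channel_capacity.unwrap_or(1024);
    println!("chain update channel capacity: {capacity}");
}
```

Passing `--pool.chain_update_channel_capacity 2048` (or setting the env var) overrides the default; omitting both keeps the old behavior of 1024.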
2 changes: 1 addition & 1 deletion crates/builder/src/bundle_sender.rs
@@ -106,7 +106,7 @@ where
};

// The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one.
// This task is used to consume the new heads and place them onto a channel that can be syncronously
// This task is used to consume the new heads and place them onto a channel that can be synchronously
// consumed until the latest block is reached.
let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
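As context for the comment whose spelling is fixed above, here is a minimal sketch of the drain-to-latest pattern it describes. The names are illustrative, not the actual bundle sender code: new heads are forwarded into an unbounded channel, and the consumer synchronously drains any backlog so only the most recent block is processed.

```rust
// Minimal sketch of the drain-to-latest pattern described in the comment.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    // Producer: stands in for the new_heads subscription.
    tokio::spawn(async move {
        for block_number in 0u64..5 {
            let _ = tx.send(block_number);
        }
    });

    // Consumer: wait for at least one block, then drain whatever has already
    // buffered so only the latest block is acted on.
    if let Some(mut latest) = rx.recv().await {
        while let Ok(newer) = rx.try_recv() {
            latest = newer;
        }
        println!("processing latest block: {latest}");
    }
}
```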
4 changes: 3 additions & 1 deletion crates/pool/src/task.rs
@@ -39,6 +39,8 @@ pub struct Args {
/// Address to bind the remote mempool server to, if any.
/// If not provided, a server will not be started.
pub remote_address: Option<SocketAddr>,
/// Channel capacity for the chain update channel.
pub chain_update_channel_capacity: usize,
}

/// Mempool task.
@@ -69,7 +71,7 @@ impl Task for PoolTask {
};
let provider = eth::new_provider(&self.args.http_url, self.args.http_poll_interval)?;
let chain = Chain::new(provider, chain_settings);
let (update_sender, _) = broadcast::channel(1000);
let (update_sender, _) = broadcast::channel(self.args.chain_update_channel_capacity);
let chain_handle = chain.spawn_watcher(update_sender.clone(), shutdown_token.clone());

let parsed_url = Url::parse(&self.args.http_url).context("Invalid RPC URL")?;
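For context on what this capacity controls: a tokio broadcast channel retains at most `capacity` messages, and a subscriber that falls further behind observes a `Lagged` error and skips ahead, so the capacity bounds memory use at the cost of dropping updates for slow subscribers. A small standalone sketch follows; the values are illustrative, not rundler's.

```rust
// Standalone sketch of tokio broadcast capacity semantics.
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let capacity = 4; // stand-in for chain_update_channel_capacity
    let (update_sender, mut update_rx) = broadcast::channel::<u64>(capacity);

    // Publish more updates than the channel retains before the receiver reads.
    for update in 0u64..10 {
        let _ = update_sender.send(update);
    }
    drop(update_sender); // close the channel so the receive loop terminates

    loop {
        match update_rx.recv().await {
            Ok(update) => println!("got update {update}"),
            Err(broadcast::error::RecvError::Lagged(skipped)) => {
                println!("receiver lagged; {skipped} older updates were dropped");
            }
            Err(broadcast::error::RecvError::Closed) => break,
        }
    }
}
```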