Skip to content

Commit

Permalink
refactor: clean up channel creation
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-miao committed Sep 22, 2023
1 parent c385f93 commit 7889192
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 17 deletions.
7 changes: 5 additions & 2 deletions bin/rundler/src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ use rundler_task::{
spawn_tasks_with_shutdown,
};
use rundler_utils::emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};

use super::{json::get_json_config, CommonArgs};

const REQUEST_CAPACITY: usize = 1024;

/// CLI options for the builder
#[derive(Args, Debug)]
#[command(next_help_heading = "BUILDER")]
Expand Down Expand Up @@ -258,11 +260,12 @@ pub async fn run(builder_args: BuilderCliArgs, common_args: CommonArgs) -> anyho
)
.await?;

let (req_sender, req_receiver) = mpsc::channel(REQUEST_CAPACITY);
spawn_tasks_with_shutdown(
[BuilderTask::new(
task_args,
event_sender,
LocalBuilderBuilder::new(1024),
LocalBuilderBuilder::new(req_sender, req_receiver),
pool,
)
.boxed()],
Expand Down
13 changes: 10 additions & 3 deletions bin/rundler/src/cli/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use rundler_pool::{LocalPoolBuilder, PoolEvent, PoolTask};
use rundler_rpc::RpcTask;
use rundler_task::spawn_tasks_with_shutdown;
use rundler_utils::emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};

use self::events::Event;
use crate::cli::{
Expand All @@ -27,6 +27,9 @@ pub struct NodeCliArgs {
rpc: RpcArgs,
}

const REQUEST_CAPACITY: usize = 1024;
const BLOCK_CAPACITY: usize = 1024;

pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::Result<()> {
let NodeCliArgs {
pool: pool_args,
Expand Down Expand Up @@ -66,10 +69,14 @@ pub async fn run(bundler_args: NodeCliArgs, common_args: CommonArgs) -> anyhow::
}
});

let pool_builder = LocalPoolBuilder::new(1024, 1024);
let (req_sender, req_receiver) = mpsc::channel(REQUEST_CAPACITY);
let (block_sender, _) = broadcast::channel(BLOCK_CAPACITY);

let pool_builder = LocalPoolBuilder::new(req_sender, req_receiver, block_sender);
let pool_handle = pool_builder.get_handle();

let builder_builder = LocalBuilderBuilder::new(1024);
let (req_sender, req_receiver) = mpsc::channel(REQUEST_CAPACITY);
let builder_builder = LocalBuilderBuilder::new(req_sender, req_receiver);
let builder_handle = builder_builder.get_handle();

spawn_tasks_with_shutdown(
Expand Down
15 changes: 13 additions & 2 deletions bin/rundler/src/cli/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rundler_pool::{LocalPoolBuilder, PoolConfig, PoolTask, PoolTaskArgs};
use rundler_sim::MempoolConfig;
use rundler_task::spawn_tasks_with_shutdown;
use rundler_utils::emit::{self, EVENT_CHANNEL_CAPACITY};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};

use super::CommonArgs;
use crate::cli::json::get_json_config;
Expand Down Expand Up @@ -171,6 +171,9 @@ pub struct PoolCliArgs {
pool: PoolArgs,
}

const REQUEST_CAPACITY: usize = 1024;
const BLOCK_CAPACITY: usize = 1024;

pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Result<()> {
let PoolCliArgs { pool: pool_args } = pool_args;
let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
Expand All @@ -183,8 +186,16 @@ pub async fn run(pool_args: PoolCliArgs, common_args: CommonArgs) -> anyhow::Res

emit::receive_and_log_events_with_filter(event_rx, |_| true);

let (req_sender, req_receiver) = mpsc::channel(REQUEST_CAPACITY);
let (block_sender, _) = broadcast::channel(BLOCK_CAPACITY);

spawn_tasks_with_shutdown(
[PoolTask::new(task_args, event_sender, LocalPoolBuilder::new(1024, 1024)).boxed()],
[PoolTask::new(
task_args,
event_sender,
LocalPoolBuilder::new(req_sender, req_receiver, block_sender),
)
.boxed()],
tokio::signal::ctrl_c(),
)
.await;
Expand Down
2 changes: 1 addition & 1 deletion crates/builder/src/bundle_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ where
};

// The new_heads stream can buffer up multiple blocks, but we only want to consume the latest one.
// This task is used to consume the new heads and place them onto a channel that can be syncronously
// This task is used to consume the new heads and place them onto a channel that can be synchronously
// consumed until the latest block is reached.
let (tx, mut rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
Expand Down
8 changes: 5 additions & 3 deletions crates/builder/src/server/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ pub struct LocalBuilderBuilder {

impl LocalBuilderBuilder {
/// Create a new local builder server builder
pub fn new(request_capcity: usize) -> Self {
let (req_sender, req_receiver) = mpsc::channel(request_capcity);
pub fn new(
req_sender: mpsc::Sender<ServerRequest>,
req_receiver: mpsc::Receiver<ServerRequest>,
) -> Self {
Self {
req_sender,
req_receiver,
Expand Down Expand Up @@ -221,7 +223,7 @@ enum ServerRequestKind {
}

#[derive(Debug)]
struct ServerRequest {
pub struct ServerRequest {
request: ServerRequestKind,
response: oneshot::Sender<BuilderResult<ServerResponse>>,
}
Expand Down
14 changes: 9 additions & 5 deletions crates/pool/src/server/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ pub struct LocalPoolBuilder {

impl LocalPoolBuilder {
/// Create a new local pool server builder
pub fn new(request_capacity: usize, block_capacity: usize) -> Self {
let (req_sender, req_receiver) = mpsc::channel(request_capacity);
let (block_sender, _) = broadcast::channel(block_capacity);
pub fn new(
req_sender: mpsc::Sender<ServerRequest>,
req_receiver: mpsc::Receiver<ServerRequest>,
block_sender: broadcast::Sender<NewHead>,
) -> Self {
Self {
req_sender,
req_receiver,
Expand Down Expand Up @@ -434,7 +436,7 @@ where
}

#[derive(Debug)]
struct ServerRequest {
pub struct ServerRequest {
request: ServerRequestKind,
response: oneshot::Sender<PoolResult<ServerResponse>>,
}
Expand Down Expand Up @@ -614,7 +616,9 @@ mod tests {
}

fn setup(pools: HashMap<Address, Arc<MockMempool>>) -> State {
let builder = LocalPoolBuilder::new(10, 10);
let (req_sender, req_receiver) = mpsc::channel(1024);
let (block_sender, _) = broadcast::channel(1024);
let builder = LocalPoolBuilder::new(req_sender, req_receiver, block_sender);
let handle = builder.get_handle();
let (tx, rx) = broadcast::channel(10);
let run_handle = builder.run(pools, rx, CancellationToken::new());
Expand Down
4 changes: 3 additions & 1 deletion crates/pool/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use crate::{
server::{spawn_remote_mempool_server, LocalPoolBuilder},
};

const CHAIN_UPDATE_CHANNEL_CAPACITY: usize = 1000;

/// Arguments for the pool task.
#[derive(Debug)]
pub struct Args {
Expand Down Expand Up @@ -69,7 +71,7 @@ impl Task for PoolTask {
};
let provider = eth::new_provider(&self.args.http_url, self.args.http_poll_interval)?;
let chain = Chain::new(provider, chain_settings);
let (update_sender, _) = broadcast::channel(1000);
let (update_sender, _) = broadcast::channel(CHAIN_UPDATE_CHANNEL_CAPACITY);
let chain_handle = chain.spawn_watcher(update_sender.clone(), shutdown_token.clone());

let parsed_url = Url::parse(&self.args.http_url).context("Invalid RPC URL")?;
Expand Down

0 comments on commit 7889192

Please sign in to comment.