diff --git a/Cargo.lock b/Cargo.lock index f97284f97..f3cf2bcaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2015,6 +2015,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "dyn-clone" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125" + [[package]] name = "ecdsa" version = "0.16.9" @@ -4193,6 +4199,42 @@ dependencies = [ "windows-registry", ] +[[package]] +name = "reth-metrics" +version = "1.0.7" +source = "git+https://github.com/paradigmxyz/reth.git?tag=v1.0.7#75b7172cf77eb4fd65fe1a6924f75066fb09fcd1" +dependencies = [ + "metrics", + "reth-metrics-derive", +] + +[[package]] +name = "reth-metrics-derive" +version = "1.0.7" +source = "git+https://github.com/paradigmxyz/reth.git?tag=v1.0.7#75b7172cf77eb4fd65fe1a6924f75066fb09fcd1" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "syn 2.0.77", +] + +[[package]] +name = "reth-tasks" +version = "1.0.7" +source = "git+https://github.com/paradigmxyz/reth.git?tag=v1.0.7#75b7172cf77eb4fd65fe1a6924f75066fb09fcd1" +dependencies = [ + "auto_impl", + "dyn-clone", + "futures-util", + "metrics", + "reth-metrics", + "thiserror", + "tokio", + "tracing", + "tracing-futures", +] + [[package]] name = "rfc6979" version = "0.4.0" @@ -4316,6 +4358,7 @@ dependencies = [ "metrics-process", "metrics-util", "paste", + "reth-tasks", "rundler-builder", "rundler-pool", "rundler-provider", @@ -4412,6 +4455,7 @@ dependencies = [ "mockall", "parking_lot", "prost", + "reth-tasks", "rundler-contracts", "rundler-provider", "rundler-sim", @@ -4532,6 +4576,7 @@ dependencies = [ "async-trait", "futures", "metrics", + "reth-tasks", "rundler-provider", "rundler-types", "rundler-utils", @@ -5651,6 +5696,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 7d8fa5619..7325109a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,9 @@ alloy-transport-http = { version = "0.4.1", default-features = false, features = # alloy other alloy-rlp = "0.3.8" +# reth +reth-tasks = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.0.7" } + anyhow = "1.0.89" async-trait = "0.1.83" auto_impl = "1.2.0" diff --git a/bin/rundler/Cargo.toml b/bin/rundler/Cargo.toml index 30e38b163..a99106ec3 100644 --- a/bin/rundler/Cargo.toml +++ b/bin/rundler/Cargo.toml @@ -35,6 +35,7 @@ metrics-exporter-prometheus = { version = "0.15.3", default-features = false, fe metrics-process = "2.1.0" metrics-util = "0.17.0" paste = "1.0" +reth-tasks.workspace = true serde.workspace = true serde_json.workspace = true sscanf = "0.4.2" diff --git a/bin/rundler/src/cli/builder.rs b/bin/rundler/src/cli/builder.rs index 9b7f04b96..b8ebb6182 100644 --- a/bin/rundler/src/cli/builder.rs +++ b/bin/rundler/src/cli/builder.rs @@ -24,7 +24,7 @@ use rundler_pool::RemotePoolClient; use rundler_sim::{MempoolConfigs, PriorityFeeMode}; use rundler_task::{ server::{connect_with_retries_shutdown, format_socket_addr}, - spawn_tasks_with_shutdown, Task, + TaskSpawnerExt, }; use rundler_types::{chain::ChainSpec, EntryPointVersion}; use rundler_utils::emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY}; @@ -419,7 +419,8 @@ pub struct BuilderCliArgs { pool_url: String, } -pub async fn run( +pub async fn spawn_tasks( + task_spawner: T, chain_spec: ChainSpec, builder_args: BuilderCliArgs, common_args: CommonArgs, @@ -430,7 +431,13 @@ pub async fn run( } = builder_args; let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY); - 
emit::receive_and_log_events_with_filter(event_rx, is_nonspammy_event); + task_spawner.spawn_critical( + "recv and log events", + Box::pin(emit::receive_and_log_events_with_filter( + event_rx, + is_nonspammy_event, + )), + ); let task_args = builder_args .to_args( @@ -443,7 +450,7 @@ pub async fn run( let pool = connect_with_retries_shutdown( "op pool from builder", &pool_url, - |url| RemotePoolClient::connect(url, chain_spec.clone()), + |url| RemotePoolClient::connect(url, chain_spec.clone(), Box::new(task_spawner.clone())), tokio::signal::ctrl_c(), ) .await?; @@ -454,20 +461,18 @@ pub async fn run( ep_v0_7, } = super::construct_providers(&common_args, &chain_spec)?; - spawn_tasks_with_shutdown( - [BuilderTask::new( - task_args, - event_sender, - LocalBuilderBuilder::new(REQUEST_CHANNEL_CAPACITY), - pool, - provider, - ep_v0_6, - ep_v0_7, - ) - .boxed()], - tokio::signal::ctrl_c(), + BuilderTask::new( + task_args, + event_sender, + LocalBuilderBuilder::new(REQUEST_CHANNEL_CAPACITY), + pool, + provider, + ep_v0_6, + ep_v0_7, ) - .await; + .spawn(task_spawner) + .await?; + Ok(()) } diff --git a/bin/rundler/src/cli/metrics.rs b/bin/rundler/src/cli/metrics.rs index 8cb654d60..025eda778 100644 --- a/bin/rundler/src/cli/metrics.rs +++ b/bin/rundler/src/cli/metrics.rs @@ -18,8 +18,10 @@ use metrics::gauge; use metrics_exporter_prometheus::PrometheusBuilder; use metrics_process::Collector; use metrics_util::layers::{PrefixLayer, Stack}; +use rundler_task::TaskSpawner; -pub fn initialize<'a>( +pub fn initialize<'a, T: TaskSpawner>( + task_spawner: &T, sample_interval_millis: u64, listen_addr: SocketAddr, tags: impl IntoIterator, @@ -38,28 +40,41 @@ pub fn initialize<'a>( builder = builder.set_buckets(buckets)?; let (recorder, exporter) = builder.build()?; - tokio::spawn(exporter); + task_spawner.spawn_critical( + "metrics exporter", + Box::pin(async move { + if exporter.await.is_err() { + tracing::error!("metrics exporter failed"); + } + }), + ); let stack = 
Stack::new(recorder); stack.push(PrefixLayer::new("rundler")).install()?; - tokio::spawn(async move { - let collector = Collector::default(); - loop { - collector.collect(); - tokio::time::sleep(Duration::from_millis(sample_interval_millis)).await; - } - }); + task_spawner.spawn_critical( + "metrics collector", + Box::pin(async move { + let collector = Collector::default(); + loop { + collector.collect(); + tokio::time::sleep(Duration::from_millis(sample_interval_millis)).await; + } + }), + ); let handle = tokio::runtime::Handle::current(); let frequency = std::time::Duration::from_millis(sample_interval_millis); let runtime_metrics = handle.metrics(); let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle); - tokio::spawn(async move { - for metrics in runtime_monitor.intervals() { - collect_tokio(&runtime_metrics, metrics); - tokio::time::sleep(frequency).await; - } - }); + task_spawner.spawn_critical( + "tokio metrics collector", + Box::pin(async move { + for metrics in runtime_monitor.intervals() { + collect_tokio(&runtime_metrics, metrics); + tokio::time::sleep(frequency).await; + } + }), + ); Ok(()) } diff --git a/bin/rundler/src/cli/mod.rs b/bin/rundler/src/cli/mod.rs index 4c4e41973..c93559ae3 100644 --- a/bin/rundler/src/cli/mod.rs +++ b/bin/rundler/src/cli/mod.rs @@ -11,7 +11,7 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. 
-use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use alloy_primitives::U256; use anyhow::{bail, Context}; @@ -29,6 +29,7 @@ mod tracing; use builder::BuilderCliArgs; use node::NodeCliArgs; use pool::PoolCliArgs; +use reth_tasks::TaskManager; use rpc::RpcCliArgs; use rundler_provider::{ AlloyEntryPointV0_6, AlloyEntryPointV0_7, AlloyEvmProvider, EntryPointProvider, EvmProvider, @@ -51,8 +52,12 @@ pub async fn run() -> anyhow::Result<()> { let _guard = tracing::configure_logging(&opt.logs)?; tracing::info!("Parsed CLI options: {:#?}", opt); + let mut task_manager = TaskManager::current(); + let task_spawner = task_manager.executor(); + let metrics_addr = format!("{}:{}", opt.metrics.host, opt.metrics.port).parse()?; metrics::initialize( + &task_spawner, opt.metrics.sample_interval_millis, metrics_addr, &opt.metrics.tags, @@ -64,12 +69,31 @@ pub async fn run() -> anyhow::Result<()> { tracing::info!("Chain spec: {:#?}", cs); match opt.command { - Command::Node(args) => node::run(cs, *args, opt.common).await?, - Command::Pool(args) => pool::run(cs, args, opt.common).await?, - Command::Rpc(args) => rpc::run(cs, args, opt.common).await?, - Command::Builder(args) => builder::run(cs, args, opt.common).await?, + Command::Node(args) => { + node::spawn_tasks(task_spawner.clone(), cs, *args, opt.common).await? + } + Command::Pool(args) => { + pool::spawn_tasks(task_spawner.clone(), cs, args, opt.common).await? + } + Command::Rpc(args) => rpc::spawn_tasks(task_spawner.clone(), cs, args, opt.common).await?, + Command::Builder(args) => { + builder::spawn_tasks(task_spawner.clone(), cs, args, opt.common).await? + } } + // wait for ctrl-c or the task manager to panic + tokio::select! 
{ + _ = tokio::signal::ctrl_c() => { + tracing::info!("Received ctrl-c, shutting down"); + }, + e = &mut task_manager => { + tracing::error!("Task manager panicked shutting down: {e}"); + }, + } + + // wait for the task manager to shutdown + task_manager.graceful_shutdown_with_timeout(Duration::from_secs(10)); + tracing::info!("Shutdown, goodbye"); Ok(()) } diff --git a/bin/rundler/src/cli/node/mod.rs b/bin/rundler/src/cli/node/mod.rs index 7ad977075..47abf2a2d 100644 --- a/bin/rundler/src/cli/node/mod.rs +++ b/bin/rundler/src/cli/node/mod.rs @@ -15,7 +15,7 @@ use clap::Args; use rundler_builder::{BuilderEvent, BuilderTask, LocalBuilderBuilder}; use rundler_pool::{LocalPoolBuilder, PoolEvent, PoolTask}; use rundler_rpc::RpcTask; -use rundler_task::{spawn_tasks_with_shutdown, Task}; +use rundler_task::TaskSpawnerExt; use rundler_types::chain::ChainSpec; use rundler_utils::emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY}; use tokio::sync::broadcast; @@ -45,7 +45,8 @@ pub struct NodeCliArgs { rpc: RpcArgs, } -pub async fn run( +pub async fn spawn_tasks( + task_spawner: T, chain_spec: ChainSpec, bundler_args: NodeCliArgs, common_args: CommonArgs, @@ -78,21 +79,30 @@ pub async fn run( let (builder_event_sender, builder_event_rx) = broadcast::channel::>(EVENT_CHANNEL_CAPACITY); - emit::receive_and_log_events_with_filter(event_rx, |_| true); - emit::receive_events("op pool", op_pool_event_rx, { - let event_sender = event_sender.clone(); - move |event| { - let _ = event_sender.send(WithEntryPoint::of(event)); - } - }); - emit::receive_events("builder", builder_event_rx, { - let event_sender = event_sender.clone(); - move |event| { - if builder::is_nonspammy_event(&event) { + task_spawner.spawn_critical( + "recv and log events", + Box::pin(emit::receive_and_log_events_with_filter(event_rx, |_| true)), + ); + task_spawner.spawn_critical( + "recv op pool events", + Box::pin(emit::receive_events("op pool", op_pool_event_rx, { + let event_sender = event_sender.clone(); + 
move |event| { let _ = event_sender.send(WithEntryPoint::of(event)); } - } - }); + })), + ); + task_spawner.spawn_critical( + "recv builder events", + Box::pin(emit::receive_events("builder", builder_event_rx, { + let event_sender = event_sender.clone(); + move |event| { + if builder::is_nonspammy_event(&event) { + let _ = event_sender.send(WithEntryPoint::of(event)); + } + } + })), + ); let pool_builder = LocalPoolBuilder::new(REQUEST_CHANNEL_CAPACITY, BLOCK_CHANNEL_CAPACITY); let pool_handle = pool_builder.get_handle(); @@ -106,40 +116,39 @@ pub async fn run( ep_v0_7, } = super::construct_providers(&common_args, &chain_spec)?; - spawn_tasks_with_shutdown( - [ - PoolTask::new( - pool_task_args, - op_pool_event_sender, - pool_builder, - provider.clone(), - ep_v0_6.clone(), - ep_v0_7.clone(), - ) - .boxed(), - BuilderTask::new( - builder_task_args, - builder_event_sender, - builder_builder, - pool_handle.clone(), - provider.clone(), - ep_v0_6.clone(), - ep_v0_7.clone(), - ) - .boxed(), - RpcTask::new( - rpc_task_args, - pool_handle, - builder_handle, - provider, - ep_v0_6, - ep_v0_7, - ) - .boxed(), - ], - tokio::signal::ctrl_c(), + PoolTask::new( + pool_task_args, + op_pool_event_sender, + pool_builder, + provider.clone(), + ep_v0_6.clone(), + ep_v0_7.clone(), + ) + .spawn(task_spawner.clone()) + .await?; + + BuilderTask::new( + builder_task_args, + builder_event_sender, + builder_builder, + pool_handle.clone(), + provider.clone(), + ep_v0_6.clone(), + ep_v0_7.clone(), + ) + .spawn(task_spawner.clone()) + .await?; + + RpcTask::new( + rpc_task_args, + pool_handle, + builder_handle, + provider, + ep_v0_6, + ep_v0_7, ) - .await; + .spawn(task_spawner) + .await?; Ok(()) } diff --git a/bin/rundler/src/cli/pool.rs b/bin/rundler/src/cli/pool.rs index b5e2c7854..107441c6a 100644 --- a/bin/rundler/src/cli/pool.rs +++ b/bin/rundler/src/cli/pool.rs @@ -18,7 +18,7 @@ use anyhow::Context; use clap::Args; use rundler_pool::{LocalPoolBuilder, PoolConfig, PoolTask, PoolTaskArgs}; 
use rundler_sim::MempoolConfigs; -use rundler_task::{spawn_tasks_with_shutdown, Task}; +use rundler_task::TaskSpawnerExt; use rundler_types::{chain::ChainSpec, EntryPointVersion}; use rundler_utils::emit::{self, EVENT_CHANNEL_CAPACITY}; use tokio::sync::broadcast; @@ -268,7 +268,8 @@ pub struct PoolCliArgs { pool: PoolArgs, } -pub async fn run( +pub async fn spawn_tasks( + task_spawner: T, chain_spec: ChainSpec, pool_args: PoolCliArgs, common_args: CommonArgs, @@ -283,7 +284,10 @@ pub async fn run( ) .await?; - emit::receive_and_log_events_with_filter(event_rx, |_| true); + task_spawner.spawn_critical( + "recv and log events", + Box::pin(emit::receive_and_log_events_with_filter(event_rx, |_| true)), + ); let RundlerProviders { provider, @@ -291,18 +295,16 @@ pub async fn run( ep_v0_7, } = super::construct_providers(&common_args, &chain_spec)?; - spawn_tasks_with_shutdown( - [PoolTask::new( - task_args, - event_sender, - LocalPoolBuilder::new(REQUEST_CHANNEL_CAPACITY, BLOCK_CHANNEL_CAPACITY), - provider, - ep_v0_6, - ep_v0_7, - ) - .boxed()], - tokio::signal::ctrl_c(), + PoolTask::new( + task_args, + event_sender, + LocalPoolBuilder::new(REQUEST_CHANNEL_CAPACITY, BLOCK_CHANNEL_CAPACITY), + provider, + ep_v0_6, + ep_v0_7, ) - .await; + .spawn(task_spawner) + .await?; + Ok(()) } diff --git a/bin/rundler/src/cli/rpc.rs b/bin/rundler/src/cli/rpc.rs index 7d2dd609b..11e9a7d22 100644 --- a/bin/rundler/src/cli/rpc.rs +++ b/bin/rundler/src/cli/rpc.rs @@ -19,7 +19,7 @@ use rundler_builder::RemoteBuilderClient; use rundler_pool::RemotePoolClient; use rundler_rpc::{EthApiSettings, RpcTask, RpcTaskArgs, RundlerApiSettings}; use rundler_sim::{EstimationSettings, PrecheckSettings}; -use rundler_task::{server::connect_with_retries_shutdown, spawn_tasks_with_shutdown, Task}; +use rundler_task::{server::connect_with_retries_shutdown, TaskSpawnerExt}; use rundler_types::chain::ChainSpec; use super::{CommonArgs, RundlerProviders}; @@ -139,7 +139,8 @@ pub struct RpcCliArgs { 
builder_url: String, } -pub async fn run( +pub async fn spawn_tasks( + task_spawner: T, chain_spec: ChainSpec, rpc_args: RpcCliArgs, common_args: CommonArgs, @@ -162,7 +163,7 @@ pub async fn run( let pool = connect_with_retries_shutdown( "op pool from rpc", &pool_url, - |url| RemotePoolClient::connect(url, chain_spec.clone()), + |url| RemotePoolClient::connect(url, chain_spec.clone(), Box::new(task_spawner.clone())), tokio::signal::ctrl_c(), ) .await?; @@ -181,10 +182,9 @@ pub async fn run( ep_v0_7, } = super::construct_providers(&common_args, &chain_spec)?; - spawn_tasks_with_shutdown( - [RpcTask::new(task_args, pool, builder, provider, ep_v0_6, ep_v0_7).boxed()], - tokio::signal::ctrl_c(), - ) - .await; + RpcTask::new(task_args, pool, builder, provider, ep_v0_6, ep_v0_7) + .spawn(task_spawner) + .await?; + Ok(()) } diff --git a/crates/builder/src/bundle_sender.rs b/crates/builder/src/bundle_sender.rs index f2af90f49..8b0abce99 100644 --- a/crates/builder/src/bundle_sender.rs +++ b/crates/builder/src/bundle_sender.rs @@ -11,16 +11,18 @@ // You should have received a copy of the GNU General Public License along with Rundler. // If not, see https://www.gnu.org/licenses/. 
-use std::{marker::PhantomData, sync::Arc, time::Duration}; +use std::{marker::PhantomData, pin::Pin, sync::Arc, time::Duration}; use alloy_primitives::{Address, B256}; use anyhow::{bail, Context}; use async_trait::async_trait; +use futures::Stream; use futures_util::StreamExt; #[cfg(test)] use mockall::automock; use rundler_provider::{BundleHandler, EntryPoint, TransactionRequest}; use rundler_sim::ExpectedStorage; +use rundler_task::TaskSpawner; use rundler_types::{ builder::BundlingMode, chain::ChainSpec, @@ -30,7 +32,11 @@ use rundler_types::{ use rundler_utils::emit::WithEntryPoint; use tokio::{ join, - sync::{broadcast, mpsc, mpsc::UnboundedReceiver, oneshot}, + sync::{ + broadcast, mpsc, + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, + }, }; use tracing::{debug, error, info, instrument, warn}; @@ -42,7 +48,7 @@ use crate::{ #[async_trait] pub(crate) trait BundleSender: Send + Sync { - async fn send_bundles_in_loop(self) -> anyhow::Result<()>; + async fn send_bundles_in_loop(self, task_spawner: T); } #[derive(Debug)] @@ -130,14 +136,16 @@ where /// then waiting for one bundle to be mined or dropped before forming the /// next one. 
#[instrument(skip_all, fields(entry_point = self.entry_point.address().to_string(), builder_index = self.builder_index))] - async fn send_bundles_in_loop(mut self) -> anyhow::Result<()> { + async fn send_bundles_in_loop(mut self, task_spawner: TS) { // trigger for sending bundles let sender_trigger = BundleSenderTrigger::new( + &task_spawner, &self.pool, self.bundle_action_receiver.take().unwrap(), Duration::from_millis(self.chain_spec.bundle_max_send_interval_millis), ) - .await?; + .await + .expect("Failed to create bundle sender trigger"); // initial state let mut state = @@ -1022,12 +1030,22 @@ impl Trigger for BundleSenderTrigger { } impl BundleSenderTrigger { - async fn new( + async fn new( + task_spawner: &T, pool_client: &P, bundle_action_receiver: mpsc::Receiver, timer_interval: Duration, ) -> anyhow::Result { - let block_rx = Self::start_block_stream(pool_client).await?; + let Ok(new_heads) = pool_client.subscribe_new_heads().await else { + error!("Failed to subscribe to new blocks"); + bail!("failed to subscribe to new blocks"); + }; + let (block_tx, block_rx) = mpsc::unbounded_channel(); + + task_spawner.spawn_critical( + "block stream", + Box::pin(Self::block_stream_task(new_heads, block_tx)), + ); Ok(Self { bundling_mode: BundlingMode::Auto, @@ -1041,33 +1059,24 @@ impl BundleSenderTrigger { }) } - async fn start_block_stream( - pool_client: &P, - ) -> anyhow::Result> { - let Ok(mut new_heads) = pool_client.subscribe_new_heads().await else { - error!("Failed to subscribe to new blocks"); - bail!("failed to subscribe to new blocks"); - }; - - let (tx, rx) = mpsc::unbounded_channel(); - tokio::spawn(async move { - loop { - match new_heads.next().await { - Some(b) => { - if tx.send(b).is_err() { - error!("Failed to buffer new block for bundle sender"); - return; - } - } - None => { - error!("Block stream ended"); + async fn block_stream_task( + mut new_heads: Pin + Send>>, + block_tx: UnboundedSender, + ) { + loop { + match new_heads.next().await { + 
Some(b) => { + if block_tx.send(b).is_err() { + error!("Failed to buffer new block for bundle sender"); return; } } + None => { + error!("Block stream ended"); + return; + } } - }); - - Ok(rx) + } } fn consume_blocks(&mut self) -> anyhow::Result<()> { diff --git a/crates/builder/src/server/local.rs b/crates/builder/src/server/local.rs index c15bb5d2c..d58cf5af4 100644 --- a/crates/builder/src/server/local.rs +++ b/crates/builder/src/server/local.rs @@ -13,13 +13,13 @@ use alloy_primitives::{Address, B256}; use async_trait::async_trait; -use rundler_task::server::{HealthCheck, ServerStatus}; -use rundler_types::builder::{Builder, BuilderError, BuilderResult, BundlingMode}; -use tokio::{ - sync::{mpsc, oneshot}, - task::JoinHandle, +use futures::future::BoxFuture; +use rundler_task::{ + server::{HealthCheck, ServerStatus}, + GracefulShutdown, }; -use tokio_util::sync::CancellationToken; +use rundler_types::builder::{Builder, BuilderError, BuilderResult, BundlingMode}; +use tokio::sync::{mpsc, oneshot}; use crate::bundle_sender::{BundleSenderAction, SendBundleRequest, SendBundleResult}; @@ -52,11 +52,11 @@ impl LocalBuilderBuilder { self, bundle_sender_actions: Vec>, entry_points: Vec
, - shutdown_token: CancellationToken, - ) -> JoinHandle> { - let mut runner = + shutdown: GracefulShutdown, + ) -> BoxFuture<'static, ()> { + let runner = LocalBuilderServerRunner::new(self.req_receiver, bundle_sender_actions, entry_points); - tokio::spawn(async move { runner.run(shutdown_token).await }) + Box::pin(runner.run(shutdown)) } } @@ -147,11 +147,11 @@ impl LocalBuilderServerRunner { } } - async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + async fn run(mut self, shutdown: GracefulShutdown) { loop { tokio::select! { - _ = shutdown_token.cancelled() => { - return Ok(()) + _ = shutdown.clone() => { + return; } Some(req) = self.req_receiver.recv() => { let resp: BuilderResult = 'a: { diff --git a/crates/builder/src/server/mod.rs b/crates/builder/src/server/mod.rs index adf8664be..aa9016ce5 100644 --- a/crates/builder/src/server/mod.rs +++ b/crates/builder/src/server/mod.rs @@ -15,5 +15,5 @@ mod local; pub use local::{LocalBuilderBuilder, LocalBuilderHandle}; mod remote; -pub(crate) use remote::spawn_remote_builder_server; +pub(crate) use remote::remote_builder_server_task; pub use remote::RemoteBuilderClient; diff --git a/crates/builder/src/server/remote/mod.rs b/crates/builder/src/server/remote/mod.rs index eca6adeb3..922b125f0 100644 --- a/crates/builder/src/server/remote/mod.rs +++ b/crates/builder/src/server/remote/mod.rs @@ -19,4 +19,4 @@ mod error; pub mod protos; mod server; -pub(crate) use server::spawn_remote_builder_server; +pub(crate) use server::remote_builder_server_task; diff --git a/crates/builder/src/server/remote/server.rs b/crates/builder/src/server/remote/server.rs index 791809fc0..ff1f3bcd4 100644 --- a/crates/builder/src/server/remote/server.rs +++ b/crates/builder/src/server/remote/server.rs @@ -13,9 +13,8 @@ use std::net::SocketAddr; +use rundler_task::GracefulShutdown; use rundler_types::builder::Builder; -use tokio::task::JoinHandle; -use tokio_util::sync::CancellationToken; use tonic::{async_trait, 
transport::Server, Request, Response, Status}; use super::protos::{ @@ -28,19 +27,20 @@ use crate::server::{local::LocalBuilderHandle, remote::protos::DebugSendBundleNowSuccess}; /// Spawn a remote builder server -pub(crate) async fn spawn_remote_builder_server( +pub(crate) async fn remote_builder_server_task( addr: SocketAddr, chain_id: u64, local_builder: LocalBuilderHandle, - shutdown_token: CancellationToken, -) -> anyhow::Result>> { + shutdown: GracefulShutdown, +) { // gRPC server let builder_server = GrpcBuilderServerImpl::new(chain_id, local_builder); let builder_server = GrpcBuilderServer::new(builder_server); let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(BUILDER_FILE_DESCRIPTOR_SET) - .build_v1()?; + .build_v1() + .expect("should build builder reflection service"); // health service let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); @@ -48,15 +48,17 @@ pub(crate) async fn spawn_remote_builder_server( .set_serving::>() .await; - Ok(tokio::spawn(async move { - Server::builder() - .add_service(builder_server) - .add_service(reflection_service) - .add_service(health_service) - .serve_with_shutdown(addr, async move { shutdown_token.cancelled().await }) - .await - .map_err(|e| anyhow::anyhow!(format!("builder server failed: {e:?}"))) - })) + if let Err(e) = Server::builder() + .add_service(builder_server) + .add_service(reflection_service) + .add_service(health_service) + .serve_with_shutdown(addr, async move { + let _ = shutdown.await; + }) + .await + { + tracing::error!("builder server failed: {e:?}"); + } } #[derive(Debug)] diff --git a/crates/builder/src/signer/aws.rs b/crates/builder/src/signer/aws.rs index 11c5f9f73..d0ef4bba8 100644 --- a/crates/builder/src/signer/aws.rs +++ b/crates/builder/src/signer/aws.rs @@ -19,7 +19,7 @@ use anyhow::Context; use aws_config::BehaviorVersion; use rslock::{Lock, LockGuard, LockManager}; use 
rundler_provider::EvmProvider; -use rundler_utils::handle::SpawnGuard; +use rundler_task::TaskSpawner; use tokio::{sync::oneshot, time::sleep}; use super::monitor_account_balance; @@ -28,12 +28,11 @@ use super::monitor_account_balance; #[derive(Debug)] pub(crate) struct KmsSigner { pub(crate) signer: AwsSigner, - _kms_guard: Option, - _monitor_guard: SpawnGuard, } impl KmsSigner { - pub(crate) async fn connect( + pub(crate) async fn connect( + task_spawner: &T, provider: P, chain_id: u64, key_ids: Vec, @@ -43,36 +42,32 @@ impl KmsSigner { let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await; let client = aws_sdk_kms::Client::new(&config); - let mut kms_guard = None; - let key_id; - - if key_ids.len() > 1 { + let key_id = if key_ids.len() > 1 { let (tx, rx) = oneshot::channel::(); - kms_guard = Some(SpawnGuard::spawn_with_guard(Self::lock_manager_loop( - redis_uri, key_ids, chain_id, ttl_millis, tx, - ))); - key_id = rx.await.context("should lock key_id")?; + task_spawner.spawn_critical( + "kms lock manager loop", + Box::pin(Self::lock_manager_loop( + redis_uri, key_ids, chain_id, ttl_millis, tx, + )), + ); + rx.await.context("should lock key_id")? 
} else { - key_id = key_ids + key_ids .first() .expect("There should be at least one kms key") - .to_owned(); + .to_owned() }; let signer = AwsSigner::new(client, key_id, Some(chain_id)) .await .context("should create signer")?; - let monitor_guard = SpawnGuard::spawn_with_guard(monitor_account_balance( + task_spawner.spawn(Box::pin(monitor_account_balance( signer.address(), provider.clone(), - )); + ))); - Ok(Self { - signer, - _kms_guard: kms_guard, - _monitor_guard: monitor_guard, - }) + Ok(Self { signer }) } async fn lock_manager_loop( diff --git a/crates/builder/src/signer/local.rs b/crates/builder/src/signer/local.rs index 2743504fb..f28796dde 100644 --- a/crates/builder/src/signer/local.rs +++ b/crates/builder/src/signer/local.rs @@ -15,17 +15,16 @@ use alloy_signer::Signer as _; use alloy_signer_local::PrivateKeySigner; use anyhow::Context; use rundler_provider::EvmProvider; -use rundler_utils::handle::SpawnGuard; - +use rundler_task::TaskSpawner; /// A local signer handle #[derive(Debug)] pub(crate) struct LocalSigner { pub(crate) signer: PrivateKeySigner, - _monitor_abort_handle: SpawnGuard, } impl LocalSigner { - pub(crate) async fn connect( + pub(crate) async fn connect( + task_spawner: &T, provider: P, chain_id: u64, private_key: String, @@ -33,14 +32,14 @@ impl LocalSigner { let signer = private_key .parse::() .context("should create signer")?; - let _monitor_abort_handle = SpawnGuard::spawn_with_guard(super::monitor_account_balance( + + task_spawner.spawn(Box::pin(super::monitor_account_balance( signer.address(), provider, - )); + ))); Ok(Self { signer: signer.with_chain_id(Some(chain_id)), - _monitor_abort_handle, }) } } diff --git a/crates/builder/src/task.rs b/crates/builder/src/task.rs index 14385eb28..58f49ed36 100644 --- a/crates/builder/src/task.rs +++ b/crates/builder/src/task.rs @@ -14,29 +14,24 @@ use std::{collections::HashMap, net::SocketAddr, time::Duration}; use alloy_primitives::{Address, B256}; -use anyhow::{bail, Context}; -use 
async_trait::async_trait; -use futures::future; -use futures_util::TryFutureExt; +use anyhow::Context; use rundler_provider::{EntryPointProvider, EvmProvider}; use rundler_sim::{ gas::{self, FeeEstimatorImpl}, simulation::{self, UnsafeSimulator}, MempoolConfig, PriorityFeeMode, SimulationSettings, Simulator, }; -use rundler_task::Task; +use rundler_task::TaskSpawnerExt; use rundler_types::{ chain::ChainSpec, pool::Pool, v0_6::UserOperation as UserOperationV0_6, v0_7::UserOperation as UserOperationV0_7, EntryPointVersion, UserOperation, UserOperationVariant, }; -use rundler_utils::{emit::WithEntryPoint, handle}; +use rundler_utils::emit::WithEntryPoint; use tokio::{ sync::{broadcast, mpsc}, - task::JoinHandle, - time, try_join, + time, }; -use tokio_util::sync::CancellationToken; use tracing::info; use crate::{ @@ -44,7 +39,7 @@ use crate::{ bundle_sender::{self, BundleSender, BundleSenderAction, BundleSenderImpl}, emit::BuilderEvent, sender::TransactionSenderArgs, - server::{spawn_remote_builder_server, LocalBuilderBuilder}, + server::{self, LocalBuilderBuilder}, signer::{BundlerSigner, KmsSigner, LocalSigner, Signer}, transaction_tracker::{self, TransactionTrackerImpl}, }; @@ -121,33 +116,53 @@ pub struct BuilderTask { ep_07: Option, } -#[async_trait] -impl Task for BuilderTask +impl BuilderTask { + /// Create a new builder task + pub fn new( + args: Args, + event_sender: broadcast::Sender>, + builder_builder: LocalBuilderBuilder, + pool: P, + provider: PR, + ep_06: Option, + ep_07: Option, + ) -> Self { + Self { + args, + event_sender, + builder_builder, + pool, + provider, + ep_06, + ep_07, + } + } +} + +impl BuilderTask where P: Pool + Clone + 'static, PR: EvmProvider + Clone + 'static, E06: EntryPointProvider + Clone + 'static, E07: EntryPointProvider + Clone + 'static, { - fn boxed(self) -> Box { - Box::new(self) - } - - async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> { - let mut sender_handles = vec![]; + /// Spawn the 
builder task on the given task spawner + pub async fn spawn(self, task_spawner: T) -> anyhow::Result<()> { let mut bundle_sender_actions = vec![]; let mut pk_iter = self.args.private_keys.clone().into_iter(); for ep in &self.args.entry_points { match ep.version { EntryPointVersion::V0_6 => { - let (handles, actions) = self.create_builders_v0_6(ep, &mut pk_iter).await?; - sender_handles.extend(handles); + let actions = self + .create_builders_v0_6(&task_spawner, ep, &mut pk_iter) + .await?; bundle_sender_actions.extend(actions); } EntryPointVersion::V0_7 => { - let (handles, actions) = self.create_builders_v0_7(ep, &mut pk_iter).await?; - sender_handles.extend(handles); + let actions = self + .create_builders_v0_7(&task_spawner, ep, &mut pk_iter) + .await?; bundle_sender_actions.extend(actions); } EntryPointVersion::Unspecified => { @@ -156,91 +171,45 @@ where } } - // flatten the senders handles to one handle, short-circuit on errors - let sender_handle = tokio::spawn( - future::try_join_all(sender_handles) - .map_ok(|_| ()) - .map_err(|e| anyhow::anyhow!(e)), - ); - let builder_handle = self.builder_builder.get_handle(); - let builder_runnder_handle = self.builder_builder.run( - bundle_sender_actions, - vec![self.args.chain_spec.entry_point_address_v0_6], - shutdown_token.clone(), - ); - let remote_handle = match self.args.remote_address { - Some(addr) => { - spawn_remote_builder_server( - addr, - self.args.chain_spec.id, - builder_handle, - shutdown_token, + task_spawner.spawn_critical_with_graceful_shutdown_signal( + "local builder server", + |shutdown| { + self.builder_builder.run( + bundle_sender_actions, + vec![self.args.chain_spec.entry_point_address_v0_6], + shutdown, ) - .await? 
- } - None => tokio::spawn(async { Ok(()) }), - }; - - info!("Started bundle builder"); + }, + ); - match try_join!( - handle::flatten_handle(sender_handle), - handle::flatten_handle(builder_runnder_handle), - handle::flatten_handle(remote_handle), - ) { - Ok(_) => { - info!("Builder server shutdown"); - Ok(()) - } - Err(e) => { - tracing::error!("Builder server error: {e:?}"); - bail!("Builder server error: {e:?}") - } + if let Some(addr) = self.args.remote_address { + task_spawner.spawn_critical_with_graceful_shutdown_signal( + "remote builder server", + |shutdown| { + server::remote_builder_server_task( + addr, + self.args.chain_spec.id, + builder_handle, + shutdown, + ) + }, + ); } - } -} -impl BuilderTask { - /// Create a new builder task - pub fn new( - args: Args, - event_sender: broadcast::Sender>, - builder_builder: LocalBuilderBuilder, - pool: P, - provider: PR, - ep_06: Option, - ep_07: Option, - ) -> Self { - Self { - args, - event_sender, - builder_builder, - pool, - provider, - ep_06, - ep_07, - } + info!("Started bundle builder"); + Ok(()) } -} -impl BuilderTask -where - P: Pool + Clone + 'static, - PR: EvmProvider + Clone + 'static, - E06: EntryPointProvider + Clone + 'static, - E07: EntryPointProvider + Clone + 'static, -{ - async fn create_builders_v0_6( + async fn create_builders_v0_6( &self, + task_spawner: &T, ep: &EntryPointBuilderSettings, pk_iter: &mut I, - ) -> anyhow::Result<( - Vec>>, - Vec>, - )> + ) -> anyhow::Result>> where + T: TaskSpawnerExt, I: Iterator, { info!("Mempool config for ep v0.6: {:?}", ep.mempool_configs); @@ -248,11 +217,11 @@ where .ep_06 .clone() .context("entry point v0.6 not supplied")?; - let mut sender_handles = vec![]; let mut bundle_sender_actions = vec![]; for i in 0..ep.num_bundle_builders { - let (spawn_guard, bundle_sender_action) = if self.args.unsafe_mode { + let bundle_sender_action = if self.args.unsafe_mode { self.create_bundle_builder( + task_spawner, i + ep.bundle_builder_index_offset, 
self.provider.clone(), ep_v0_6.clone(), @@ -266,6 +235,7 @@ where .await? } else { self.create_bundle_builder( + task_spawner, i + ep.bundle_builder_index_offset, self.provider.clone(), ep_v0_6.clone(), @@ -279,21 +249,19 @@ where ) .await? }; - sender_handles.push(spawn_guard); bundle_sender_actions.push(bundle_sender_action); } - Ok((sender_handles, bundle_sender_actions)) + Ok(bundle_sender_actions) } - async fn create_builders_v0_7( + async fn create_builders_v0_7( &self, + task_spawner: &T, ep: &EntryPointBuilderSettings, pk_iter: &mut I, - ) -> anyhow::Result<( - Vec>>, - Vec>, - )> + ) -> anyhow::Result>> where + T: TaskSpawnerExt, I: Iterator, { info!("Mempool config for ep v0.7: {:?}", ep.mempool_configs); @@ -301,11 +269,11 @@ where .ep_07 .clone() .context("entry point v0.7 not supplied")?; - let mut sender_handles = vec![]; let mut bundle_sender_actions = vec![]; for i in 0..ep.num_bundle_builders { - let (spawn_guard, bundle_sender_action) = if self.args.unsafe_mode { + let bundle_sender_action = if self.args.unsafe_mode { self.create_bundle_builder( + task_spawner, i + ep.bundle_builder_index_offset, self.provider.clone(), ep_v0_7.clone(), @@ -319,6 +287,7 @@ where .await? } else { self.create_bundle_builder( + task_spawner, i + ep.bundle_builder_index_offset, self.provider.clone(), ep_v0_7.clone(), @@ -332,24 +301,22 @@ where ) .await? 
}; - sender_handles.push(spawn_guard); bundle_sender_actions.push(bundle_sender_action); } - Ok((sender_handles, bundle_sender_actions)) + Ok(bundle_sender_actions) } - async fn create_bundle_builder( + async fn create_bundle_builder( &self, + task_spawner: &T, index: u64, provider: PR, entry_point: E, simulator: S, pk_iter: &mut I, - ) -> anyhow::Result<( - JoinHandle>, - mpsc::Sender, - )> + ) -> anyhow::Result> where + T: TaskSpawnerExt, UO: UserOperation + From, UserOperationVariant: AsRef, E: EntryPointProvider + Clone + 'static, @@ -361,8 +328,13 @@ where let signer = if let Some(pk) = pk_iter.next() { info!("Using local signer"); BundlerSigner::Local( - LocalSigner::connect(provider.clone(), self.args.chain_spec.id, pk.to_owned()) - .await?, + LocalSigner::connect( + &task_spawner, + provider.clone(), + self.args.chain_spec.id, + pk.to_owned(), + ) + .await?, ) } else { info!("Using AWS KMS signer"); @@ -373,6 +345,7 @@ where // so this should give ample time for the connection to establish. 
Duration::from_millis(self.args.redis_lock_ttl_millis / 4), KmsSigner::connect( + &task_spawner, provider.clone(), self.args.chain_spec.id, self.args.aws_kms_key_ids.clone(), @@ -454,6 +427,9 @@ where ); // Spawn each sender as its own independent task - Ok((tokio::spawn(builder.send_bundles_in_loop()), send_bundle_tx)) + let ts = task_spawner.clone(); + task_spawner.spawn_critical("bundle sender", builder.send_bundles_in_loop(ts)); + + Ok(send_bundle_tx) } } diff --git a/crates/pool/Cargo.toml b/crates/pool/Cargo.toml index 129b49312..f990aeb54 100644 --- a/crates/pool/Cargo.toml +++ b/crates/pool/Cargo.toml @@ -45,6 +45,7 @@ mockall = {workspace = true, optional = true } mockall.workspace = true rundler-sim = { path = "../sim", features = ["test-utils"] } rundler-provider = { path = "../provider", features = ["test-utils"] } +reth-tasks.workspace = true [build-dependencies] tonic-build.workspace = true diff --git a/crates/pool/src/chain.rs b/crates/pool/src/chain.rs index b2fab9e8b..5185fa207 100644 --- a/crates/pool/src/chain.rs +++ b/crates/pool/src/chain.rs @@ -32,14 +32,13 @@ use rundler_contracts::{ }, }; use rundler_provider::{Block, EvmProvider, Filter, Log}; -use rundler_task::block_watcher; +use rundler_task::{block_watcher, GracefulShutdown}; use rundler_types::{EntryPointVersion, Timestamp, UserOperationId}; use tokio::{ select, sync::{broadcast, Semaphore}, time, }; -use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; const MAX_LOAD_OPS_CONCURRENCY: usize = 64; @@ -175,14 +174,14 @@ impl Chain

{ pub(crate) async fn watch( mut self, sender: broadcast::Sender>, - shutdown_token: CancellationToken, + shutdown: GracefulShutdown, ) { loop { select! { update = self.wait_for_update() => { let _ = sender.send(Arc::new(update)); } - _ = shutdown_token.cancelled() => { + _ = shutdown.clone() => { info!("Shutting down chain watcher"); break; } diff --git a/crates/pool/src/server/local.rs b/crates/pool/src/server/local.rs index b6ac9f663..5725dc51a 100644 --- a/crates/pool/src/server/local.rs +++ b/crates/pool/src/server/local.rs @@ -16,9 +16,12 @@ use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; use alloy_primitives::{Address, B256}; use async_stream::stream; use async_trait::async_trait; -use futures::future; +use futures::future::{self, BoxFuture}; use futures_util::Stream; -use rundler_task::server::{HealthCheck, ServerStatus}; +use rundler_task::{ + server::{HealthCheck, ServerStatus}, + GracefulShutdown, TaskSpawner, +}; use rundler_types::{ pool::{ MempoolError, NewHead, PaymasterMetadata, Pool, PoolError, PoolOperation, PoolResult, @@ -26,12 +29,8 @@ use rundler_types::{ }, EntityUpdate, EntryPointVersion, UserOperationId, UserOperationVariant, }; -use tokio::{ - sync::{broadcast, mpsc, oneshot}, - task::JoinHandle, -}; -use tokio_util::sync::CancellationToken; -use tracing::error; +use tokio::sync::{broadcast, mpsc, oneshot}; +use tracing::{error, info}; use crate::{ chain::ChainUpdate, @@ -68,17 +67,19 @@ impl LocalPoolBuilder { /// Run the local pool server, consumes the builder pub fn run( self, + task_spawner: Box, mempools: HashMap>, chain_updates: broadcast::Receiver>, - shutdown_token: CancellationToken, - ) -> JoinHandle> { - let mut runner = LocalPoolServerRunner::new( + shutdown: GracefulShutdown, + ) -> BoxFuture<'static, ()> { + let runner = LocalPoolServerRunner::new( self.req_receiver, self.block_sender, mempools, chain_updates, + task_spawner, ); - tokio::spawn(async move { runner.run(shutdown_token).await }) + 
Box::pin(runner.run(shutdown)) } } @@ -95,6 +96,7 @@ struct LocalPoolServerRunner { block_sender: broadcast::Sender, mempools: HashMap>, chain_updates: broadcast::Receiver>, + task_spawner: Box, } impl LocalPoolHandle { @@ -332,7 +334,7 @@ impl Pool for LocalPoolHandle { error!("new_heads_receiver lagged {c} blocks"); } Err(broadcast::error::RecvError::Closed) => { - error!("new_heads_receiver closed"); + info!("new_heads_receiver closed, ending subscription"); break; } } @@ -364,12 +366,14 @@ impl LocalPoolServerRunner { block_sender: broadcast::Sender, mempools: HashMap>, chain_updates: broadcast::Receiver>, + task_spawner: Box, ) -> Self { Self { req_receiver, block_sender, mempools, chain_updates, + task_spawner, } } @@ -507,7 +511,7 @@ impl LocalPoolServerRunner { match self.get_pool(entry_point) { Ok(mempool) => { let mempool = Arc::clone(mempool); - tokio::spawn(f(mempool, response)); + self.task_spawner.spawn(Box::pin(f(mempool, response))); } Err(e) => { if let Err(e) = response.send(Err(e)) { @@ -517,10 +521,10 @@ impl LocalPoolServerRunner { } } - async fn run(&mut self, shutdown_token: CancellationToken) -> anyhow::Result<()> { + async fn run(mut self, shutdown: GracefulShutdown) { loop { tokio::select! 
{ - _ = shutdown_token.cancelled() => { + _ = shutdown.clone() => { break; } chain_update = self.chain_updates.recv() => { @@ -537,13 +541,13 @@ impl LocalPoolServerRunner { let cu = Arc::clone(&chain_update); async move { m.on_chain_update(&cu).await } }).collect(); - tokio::spawn(async move { + self.task_spawner.spawn(Box::pin(async move { future::join_all(update_futures).await; let _ = block_sender.send(NewHead { block_hash: chain_update.latest_block_hash, block_number: chain_update.latest_block_number, }); - }); + })); } } Some(req) = self.req_receiver.recv() => { @@ -686,8 +690,6 @@ impl LocalPoolServerRunner { } } } - - Ok(()) } } @@ -806,6 +808,7 @@ mod tests { use std::{iter::zip, sync::Arc}; use futures_util::StreamExt; + use reth_tasks::TaskManager; use rundler_types::v0_6::UserOperation; use super::*; @@ -921,18 +924,25 @@ mod tests { struct State { handle: LocalPoolHandle, chain_update_tx: broadcast::Sender>, - _run_handle: JoinHandle>, + _task_manager: TaskManager, } fn setup(pools: HashMap>) -> State { let builder = LocalPoolBuilder::new(10, 10); let handle = builder.get_handle(); let (tx, rx) = broadcast::channel(10); - let run_handle = builder.run(pools, rx, CancellationToken::new()); + let tm = TaskManager::current(); + let ts = tm.executor(); + let ts_box = Box::new(ts.clone()); + + ts.spawn_critical_with_graceful_shutdown_signal("test pool", |shutdown| { + builder.run(ts_box, pools, rx, shutdown) + }); + State { handle, chain_update_tx: tx, - _run_handle: run_handle, + _task_manager: tm, } } diff --git a/crates/pool/src/server/mod.rs b/crates/pool/src/server/mod.rs index 2cf32bef2..3f48443db 100644 --- a/crates/pool/src/server/mod.rs +++ b/crates/pool/src/server/mod.rs @@ -15,5 +15,5 @@ mod local; pub use local::{LocalPoolBuilder, LocalPoolHandle}; mod remote; -pub(crate) use remote::spawn_remote_mempool_server; +pub(crate) use remote::remote_mempool_server_task; pub use remote::RemotePoolClient; diff --git 
a/crates/pool/src/server/remote/client.rs b/crates/pool/src/server/remote/client.rs index 532fa185e..5a8428711 100644 --- a/crates/pool/src/server/remote/client.rs +++ b/crates/pool/src/server/remote/client.rs @@ -19,6 +19,7 @@ use futures_util::Stream; use rundler_task::{ grpc::protos::{from_bytes, ConversionError, ToProtoBytes}, server::{HealthCheck, ServerStatus}, + TaskSpawner, }; use rundler_types::{ chain::ChainSpec, @@ -61,11 +62,16 @@ pub struct RemotePoolClient { chain_spec: ChainSpec, op_pool_client: OpPoolClient, op_pool_health: HealthClient, + task_spawner: Box, } impl RemotePoolClient { /// Connect to a remote pool server, returning a client for submitting requests. - pub async fn connect(url: String, chain_spec: ChainSpec) -> anyhow::Result { + pub async fn connect( + url: String, + chain_spec: ChainSpec, + task_spawner: Box, + ) -> anyhow::Result { let op_pool_client = OpPoolClient::connect(url.clone()).await?; let op_pool_health = HealthClient::new(Channel::builder(Uri::from_str(&url)?).connect().await?); @@ -73,6 +79,7 @@ impl RemotePoolClient { chain_spec, op_pool_client, op_pool_health, + task_spawner, }) } @@ -551,7 +558,8 @@ impl Pool for RemotePoolClient { let (tx, rx) = mpsc::unbounded_channel(); let client = self.op_pool_client.clone(); - tokio::spawn(Self::new_heads_subscription_handler(client, tx)); + self.task_spawner + .spawn(Box::pin(Self::new_heads_subscription_handler(client, tx))); Ok(Box::pin(UnboundedReceiverStream::new(rx))) } } diff --git a/crates/pool/src/server/remote/mod.rs b/crates/pool/src/server/remote/mod.rs index 718252094..488897124 100644 --- a/crates/pool/src/server/remote/mod.rs +++ b/crates/pool/src/server/remote/mod.rs @@ -18,4 +18,4 @@ mod protos; mod server; pub use client::*; -pub(crate) use server::spawn_remote_mempool_server; +pub(crate) use server::remote_mempool_server_task; diff --git a/crates/pool/src/server/remote/server.rs b/crates/pool/src/server/remote/server.rs index 5f4f4d1b8..edaf06e2d 100644 --- 
a/crates/pool/src/server/remote/server.rs +++ b/crates/pool/src/server/remote/server.rs @@ -25,15 +25,15 @@ use futures_util::StreamExt; use rundler_task::{ grpc::{grpc_metrics::HttpMethodExtractor, protos::from_bytes}, metrics::MetricsLayer, + GracefulShutdown, TaskSpawner, }; use rundler_types::{ chain::ChainSpec, pool::{Pool, Reputation}, EntityUpdate, UserOperationId, UserOperationVariant, }; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; -use tokio_util::sync::CancellationToken; use tonic::{transport::Server, Request, Response, Result, Status}; use super::protos::{ @@ -62,18 +62,20 @@ use crate::server::local::LocalPoolHandle; const MAX_REMOTE_BLOCK_SUBSCRIPTIONS: usize = 32; -pub(crate) async fn spawn_remote_mempool_server( +pub(crate) async fn remote_mempool_server_task( + task_spawner: Box, chain_spec: ChainSpec, local_pool: LocalPoolHandle, addr: SocketAddr, - shutdown_token: CancellationToken, -) -> anyhow::Result>> { + shutdown: GracefulShutdown, +) { // gRPC server - let pool_impl = OpPoolImpl::new(chain_spec, local_pool); + let pool_impl = OpPoolImpl::new(chain_spec, local_pool, task_spawner); let op_pool_server = OpPoolServer::new(pool_impl); let reflection_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(OP_POOL_FILE_DESCRIPTOR_SET) - .build_v1()?; + .build_v1() + .expect("failed to build reflection service"); // health service let (mut health_reporter, health_service) = tonic_health::server::health_reporter(); @@ -86,32 +88,38 @@ pub(crate) async fn spawn_remote_mempool_server( "http-grpc".to_string(), ); - let handle = tokio::spawn(async move { - Server::builder() - .layer(metrics_layer) - .add_service(op_pool_server) - .add_service(reflection_service) - .add_service(health_service) - .serve_with_shutdown(addr, async move { shutdown_token.cancelled().await }) - .await - .map_err(|e| anyhow::anyhow!(format!("pool server failed: 
{e:?}"))) - }); - - Ok(handle) + if let Err(e) = Server::builder() + .layer(metrics_layer) + .add_service(op_pool_server) + .add_service(reflection_service) + .add_service(health_service) + .serve_with_shutdown(addr, async move { + let _ = shutdown.await; + }) + .await + { + tracing::error!("pool server failed: {e:?}"); + } } struct OpPoolImpl { chain_spec: ChainSpec, local_pool: LocalPoolHandle, num_block_subscriptions: Arc, + task_spawner: Box, } impl OpPoolImpl { - pub(crate) fn new(chain_spec: ChainSpec, local_pool: LocalPoolHandle) -> Self { + pub(crate) fn new( + chain_spec: ChainSpec, + local_pool: LocalPoolHandle, + task_spawner: Box, + ) -> Self { Self { chain_spec, local_pool, num_block_subscriptions: Arc::new(AtomicUsize::new(0)), + task_spawner, } } @@ -551,7 +559,7 @@ impl OpPool for OpPoolImpl { } }; - tokio::spawn(async move { + self.task_spawner.spawn(Box::pin(async move { loop { match new_heads.next().await { Some(new_head) => { @@ -571,7 +579,7 @@ impl OpPool for OpPoolImpl { } } num_block_subscriptions.fetch_sub(1, Ordering::Relaxed); - }); + })); Ok(Response::new(UnboundedReceiverStream::new(rx))) } diff --git a/crates/pool/src/task.rs b/crates/pool/src/task.rs index 908b32f9e..d8867117b 100644 --- a/crates/pool/src/task.rs +++ b/crates/pool/src/task.rs @@ -14,22 +14,21 @@ use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use anyhow::{bail, Context}; -use async_trait::async_trait; +use futures::FutureExt; use rundler_provider::{EntryPointProvider, EvmProvider}; use rundler_sim::{ gas::{self, FeeEstimatorImpl}, simulation::{self, UnsafeSimulator}, PrecheckerImpl, Simulator, }; -use rundler_task::Task; +use rundler_task::TaskSpawnerExt; use rundler_types::{ chain::ChainSpec, v0_6::UserOperation as UserOperationV0_6, v0_7::UserOperation as UserOperationV0_7, EntryPointVersion, UserOperation, UserOperationVariant, }; -use rundler_utils::{emit::WithEntryPoint, handle}; -use tokio::{sync::broadcast, try_join}; -use 
tokio_util::sync::CancellationToken; +use rundler_utils::emit::WithEntryPoint; +use tokio::sync::broadcast; use super::mempool::PoolConfig; use crate::{ @@ -38,7 +37,7 @@ use crate::{ mempool::{ AddressReputation, Mempool, PaymasterConfig, PaymasterTracker, ReputationParams, UoPool, }, - server::{spawn_remote_mempool_server, LocalPoolBuilder}, + server::{self, LocalPoolBuilder}, }; /// Arguments for the pool task. @@ -74,18 +73,35 @@ pub struct PoolTask { ep_07: Option, } -#[async_trait] -impl Task for PoolTask +impl PoolTask { + /// Create a new pool task. + pub fn new( + args: Args, + event_sender: broadcast::Sender>, + pool_builder: LocalPoolBuilder, + provider: P, + ep_06: Option, + ep_07: Option, + ) -> Self { + Self { + args, + event_sender, + pool_builder, + provider, + ep_06, + ep_07, + } + } +} + +impl PoolTask where P: EvmProvider + Clone + 'static, E06: EntryPointProvider + Clone + 'static, E07: EntryPointProvider + Clone + 'static, { - fn boxed(self) -> Box { - Box::new(self) - } - - async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> { + /// Spawns the mempool task on the given task spawner. 
+ pub async fn spawn(self, task_spawner: T) -> anyhow::Result<()> { let chain_id = self.args.chain_spec.id; tracing::info!("Chain id: {chain_id}"); tracing::info!("Http url: {:?}", self.args.http_url); @@ -105,10 +121,9 @@ where let chain = Chain::new(self.provider.clone(), chain_settings); let (update_sender, _) = broadcast::channel(self.args.chain_update_channel_capacity); - let chain_handle = tokio::spawn({ - let update_sender = update_sender.clone(); - let shutdown_token = shutdown_token.clone(); - async move { chain.watch(update_sender, shutdown_token).await } + + task_spawner.spawn_critical_with_graceful_shutdown_signal("chain watcher", |shutdown| { + chain.watch(update_sender.clone(), shutdown) }); // create mempools @@ -118,6 +133,7 @@ where EntryPointVersion::V0_6 => { let pool = self .create_mempool_v0_6( + &task_spawner, self.args.chain_spec.clone(), pool_config, self.args.unsafe_mode, @@ -130,6 +146,7 @@ where EntryPointVersion::V0_7 => { let pool = self .create_mempool_v0_7( + &task_spawner, self.args.chain_spec.clone(), pool_config, self.args.unsafe_mode, @@ -146,76 +163,45 @@ where } let pool_handle = self.pool_builder.get_handle(); - let pool_runner_handle = - self.pool_builder - .run(mempools, update_sender.subscribe(), shutdown_token.clone()); - let remote_handle = match self.args.remote_address { - Some(addr) => { - spawn_remote_mempool_server( - self.args.chain_spec.clone(), - pool_handle, - addr, - shutdown_token, - ) - .await? 
- } - None => tokio::spawn(async { Ok(()) }), + let ts_box = Box::new(task_spawner.clone()); + task_spawner.spawn_critical_with_graceful_shutdown_signal( + "local pool server", + |shutdown| { + self.pool_builder + .run(ts_box, mempools, update_sender.subscribe(), shutdown) + }, + ); + + if let Some(addr) = self.args.remote_address { + let ts_box = Box::new(task_spawner.clone()); + task_spawner.spawn_critical_with_graceful_shutdown_signal( + "remote mempool server", + |shutdown| { + server::remote_mempool_server_task( + ts_box, + self.args.chain_spec.clone(), + pool_handle, + addr, + shutdown, + ) + }, + ); }; tracing::info!("Started op_pool"); - match try_join!( - handle::flatten_handle(pool_runner_handle), - handle::flatten_handle(remote_handle), - handle::as_anyhow_handle(chain_handle), - ) { - Ok(_) => { - tracing::info!("Pool server shutdown"); - Ok(()) - } - Err(e) => { - tracing::error!("Pool server error: {e:?}"); - bail!("Pool server error: {e:?}") - } - } - } -} - -impl PoolTask { - /// Create a new pool task. 
- pub fn new( - args: Args, - event_sender: broadcast::Sender>, - pool_builder: LocalPoolBuilder, - provider: P, - ep_06: Option, - ep_07: Option, - ) -> Self { - Self { - args, - event_sender, - pool_builder, - provider, - ep_06, - ep_07, - } + Ok(()) } -} -impl<'a, P, E06, E07> PoolTask -where - P: EvmProvider + Clone + 'a, - E06: EntryPointProvider + Clone + 'a, - E07: EntryPointProvider + Clone + 'a, -{ - fn create_mempool_v0_6( + fn create_mempool_v0_6( &self, + task_spawner: &T, chain_spec: ChainSpec, pool_config: &PoolConfig, unsafe_mode: bool, event_sender: broadcast::Sender>, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let ep = self .ep_06 .clone() @@ -228,6 +214,7 @@ where pool_config.sim_settings.clone(), ); Self::create_mempool( + task_spawner, chain_spec, pool_config, event_sender, @@ -243,6 +230,7 @@ where pool_config.mempool_channel_configs.clone(), ); Self::create_mempool( + task_spawner, chain_spec, pool_config, event_sender, @@ -253,13 +241,14 @@ where } } - fn create_mempool_v0_7( + fn create_mempool_v0_7( &self, + task_spawner: &T, chain_spec: ChainSpec, pool_config: &PoolConfig, unsafe_mode: bool, event_sender: broadcast::Sender>, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let ep = self .ep_07 .clone() @@ -272,6 +261,7 @@ where pool_config.sim_settings.clone(), ); Self::create_mempool( + task_spawner, chain_spec, pool_config, event_sender, @@ -287,6 +277,7 @@ where pool_config.mempool_channel_configs.clone(), ); Self::create_mempool( + task_spawner, chain_spec, pool_config, event_sender, @@ -297,19 +288,21 @@ where } } - fn create_mempool( + fn create_mempool( + task_spawner: &T, chain_spec: ChainSpec, pool_config: &PoolConfig, event_sender: broadcast::Sender>, provider: P, ep: E, simulator: S, - ) -> anyhow::Result> + ) -> anyhow::Result> where + T: TaskSpawnerExt, UO: UserOperation + From + Into, UserOperationVariant: From, - E: EntryPointProvider + Clone + 'a, - S: Simulator + 'a, + E: EntryPointProvider + Clone + 'static, + S: 
Simulator + 'static, { let fee_oracle = gas::get_fee_oracle(&chain_spec, provider.clone()); let fee_estimator = FeeEstimatorImpl::new( @@ -337,7 +330,10 @@ where // Start reputation manager let reputation_runner = Arc::clone(&reputation); - tokio::spawn(async move { reputation_runner.run().await }); + task_spawner.spawn_critical( + "reputation manager", + async move { reputation_runner.run().await }.boxed(), + ); let paymaster = PaymasterTracker::new( ep.clone(), diff --git a/crates/rpc/src/task.rs b/crates/rpc/src/task.rs index de5c2ba2c..c33bc4889 100644 --- a/crates/rpc/src/task.rs +++ b/crates/rpc/src/task.rs @@ -13,8 +13,8 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; -use anyhow::{bail, Context}; -use async_trait::async_trait; +use anyhow::Context; +use futures_util::FutureExt; use jsonrpsee::{ server::{middleware::http::ProxyGetRequestLayer, ServerBuilder}, RpcModule, @@ -26,13 +26,12 @@ use rundler_sim::{ }; use rundler_task::{ server::{format_socket_addr, HealthCheck}, - Task, + TaskSpawner, }; use rundler_types::{ builder::Builder, chain::ChainSpec, pool::Pool, v0_6::UserOperation as UserOperationV0_6, v0_7::UserOperation as UserOperationV0_7, }; -use tokio_util::sync::CancellationToken; use tracing::info; use crate::{ @@ -91,8 +90,28 @@ pub struct RpcTask { ep_07: Option, } -#[async_trait] -impl Task for RpcTask +impl RpcTask { + /// Creates a new RPC server task. 
+ pub fn new( + args: Args, + pool: P, + builder: B, + provider: PR, + ep_06: Option, + ep_07: Option, + ) -> Self { + Self { + args, + pool, + builder, + provider, + ep_06, + ep_07, + } + } +} + +impl RpcTask where P: Pool + HealthCheck + Clone + 'static, B: Builder + HealthCheck + Clone + 'static, @@ -100,11 +119,8 @@ where E06: EntryPointProvider + Clone + 'static, E07: EntryPointProvider + Clone + 'static, { - fn boxed(self) -> Box { - Box::new(self) - } - - async fn run(mut self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()> { + /// Spawns the RPC server task on the given task spawner. + pub async fn spawn(self, task_spawner: T) -> anyhow::Result<()> { let addr: SocketAddr = format_socket_addr(&self.args.host, self.args.port).parse()?; tracing::info!("Starting rpc server on {}", addr); @@ -189,12 +205,6 @@ where .layer(ProxyGetRequestLayer::new("/health", "system_health")?) .timeout(self.args.rpc_timeout); - // TODO: add metrics - // let rpc_metric_middleware = MetricsLayer::>::new( - // "rundler-eth-service".to_string(), - // "rpc".to_string(), - // ); - let server = ServerBuilder::default() .set_http_middleware(http_middleware) .max_connections(self.args.max_connections) @@ -211,50 +221,20 @@ where let handle = server.start(module); - info!("Started RPC server"); - - tokio::select! { - _ = handle.stopped() => { - tracing::error!("RPC server stopped unexpectedly"); - bail!("RPC server stopped unexpectedly") - } - _ = shutdown_token.cancelled() => { - tracing::info!("Server shutdown"); - Ok(()) + task_spawner.spawn_critical( + "rpc server", + async move { + handle.stopped().await; + tracing::error!("RPC server stopped"); } - } - } -} + .boxed(), + ); -impl RpcTask { - /// Creates a new RPC server task. 
- pub fn new( - args: Args, - pool: P, - builder: B, - provider: PR, - ep_06: Option, - ep_07: Option, - ) -> Self { - Self { - args, - pool, - builder, - provider, - ep_06, - ep_07, - } + info!("Started RPC server"); + + Ok(()) } -} -impl RpcTask -where - P: Pool + HealthCheck + Clone, - B: Builder + HealthCheck + Clone, - PR: EvmProvider, - E06: EntryPointProvider, - E07: EntryPointProvider, -{ fn attach_namespaces( &self, entry_point_router: EntryPointRouter, diff --git a/crates/task/Cargo.toml b/crates/task/Cargo.toml index 1e457548b..9fd2031f2 100644 --- a/crates/task/Cargo.toml +++ b/crates/task/Cargo.toml @@ -18,6 +18,7 @@ anyhow.workspace = true async-trait.workspace = true futures.workspace = true metrics.workspace = true +reth-tasks.workspace = true tokio.workspace = true tokio-util.workspace = true tonic.workspace = true diff --git a/crates/task/src/lib.rs b/crates/task/src/lib.rs index 5cb280f02..5fff8c992 100644 --- a/crates/task/src/lib.rs +++ b/crates/task/src/lib.rs @@ -24,5 +24,11 @@ pub mod grpc; pub mod metrics; pub mod server; -mod task; -pub use task::*; +pub use reth_tasks::{ + shutdown::GracefulShutdown, TaskSpawner, TaskSpawnerExt as RethTaskSpawnerExt, +}; + +/// A trait that extends Reth's `TaskSpawner` with additional methods. +pub trait TaskSpawnerExt: TaskSpawner + RethTaskSpawnerExt + Clone + 'static {} + +impl TaskSpawnerExt for T {} diff --git a/crates/task/src/task.rs b/crates/task/src/task.rs deleted file mode 100644 index 80b984a81..000000000 --- a/crates/task/src/task.rs +++ /dev/null @@ -1,72 +0,0 @@ -// This file is part of Rundler. -// -// Rundler is free software: you can redistribute it and/or modify it under the -// terms of the GNU Lesser General Public License as published by the Free Software -// Foundation, either version 3 of the License, or (at your option) any later version. 
-// -// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -// See the GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License along with Rundler. -// If not, see https://www.gnu.org/licenses/. - -//! Task trait and helper functions - -use async_trait::async_trait; -use futures::{future::try_join_all, Future}; -use tokio::sync::mpsc; -use tokio_util::sync::CancellationToken; -use tracing::{error, info}; - -/// Core task trait implemented by top level Rundler tasks. -#[async_trait] -pub trait Task: Sync + Send + 'static { - /// Convert into a boxed task. - fn boxed(self) -> Box; - - /// Run the task. - async fn run(self: Box, shutdown_token: CancellationToken) -> anyhow::Result<()>; -} - -/// Spawn a set of tasks and wait for a shutdown signal. -pub async fn spawn_tasks_with_shutdown( - tasks: impl IntoIterator>, - signal: T, -) where - T: Future> + Send + 'static, - E: std::fmt::Debug, -{ - let (shutdown_scope, mut shutdown_wait) = mpsc::channel::<()>(1); - let shutdown_token = CancellationToken::new(); - let mut shutdown_scope = Some(shutdown_scope); - - let handles = tasks.into_iter().map(|task| { - let st = shutdown_token.clone(); - let ss = shutdown_scope.clone(); - async move { - let ret = task.run(st).await; - drop(ss); - ret - } - }); - tokio::select! 
{ - res = try_join_all(handles) => { - error!("Task exited unexpectedly: {res:?}"); - } - res = signal => { - match res { - Ok(_) => { - info!("Received signal, shutting down"); - } - Err(err) => { - error!("Error while waiting for signal: {err:?}"); - } - } - } - } - - shutdown_token.cancel(); - shutdown_scope.take(); - shutdown_wait.recv().await; -} diff --git a/crates/utils/src/emit.rs b/crates/utils/src/emit.rs index 27a190daa..c1060250f 100644 --- a/crates/utils/src/emit.rs +++ b/crates/utils/src/emit.rs @@ -16,10 +16,7 @@ use std::fmt::Display; use alloy_primitives::Address; -use tokio::{ - sync::broadcast::{self, error::RecvError}, - task::JoinHandle, -}; +use tokio::sync::broadcast::{self, error::RecvError}; use tracing::{info, warn}; /// Capacity of the event channels. @@ -56,36 +53,32 @@ impl Display for WithEntryPoint { /// Receive events from a event broadcast channel and call /// the given handler function for each event. -pub fn receive_events( +pub async fn receive_events( description: &'static str, mut rx: broadcast::Receiver, handler: impl Fn(T) + Send + 'static, -) -> JoinHandle<()> -where +) where T: Clone + Send + 'static, { - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(event) => handler(event), - Err(RecvError::Closed) => { - info!("Event stream for {description} closed. Logging complete"); - break; - } - Err(RecvError::Lagged(count)) => { - warn!("Event stream for {description} lagged. Missed {count} messages.") - } + loop { + match rx.recv().await { + Ok(event) => handler(event), + Err(RecvError::Closed) => { + info!("Event stream for {description} closed. Logging complete"); + break; + } + Err(RecvError::Lagged(count)) => { + warn!("Event stream for {description} lagged. Missed {count} messages.") } } - }) + } } /// An event handler that simply logs the event at an INFO level. 
-pub fn receive_and_log_events_with_filter( +pub async fn receive_and_log_events_with_filter( rx: broadcast::Receiver, filter: impl (Fn(&T) -> bool) + Send + 'static, -) -> JoinHandle<()> -where +) where T: Clone + Display + Send + 'static, { receive_events("logging", rx, move |event| { @@ -93,4 +86,5 @@ where info!("{}", event); } }) + .await } diff --git a/crates/utils/src/handle.rs b/crates/utils/src/handle.rs deleted file mode 100644 index bd204258c..000000000 --- a/crates/utils/src/handle.rs +++ /dev/null @@ -1,58 +0,0 @@ -// This file is part of Rundler. -// -// Rundler is free software: you can redistribute it and/or modify it under the -// terms of the GNU Lesser General Public License as published by the Free Software -// Foundation, either version 3 of the License, or (at your option) any later version. -// -// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. -// See the GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License along with Rundler. -// If not, see https://www.gnu.org/licenses/. - -//! Utilities for working with future handles. - -use anyhow::Context; -use futures::Future; -use tokio::task::{AbortHandle, JoinHandle}; - -/// Flatten a JoinHandle result. -/// -/// Flattens the two types of errors that can occur when awaiting a handle. -/// Useful when using tokio::try_join! to await multiple handles. -pub async fn flatten_handle(handle: JoinHandle>) -> anyhow::Result { - match handle.await { - Ok(Ok(result)) => Ok(result), - Ok(Err(err)) => Err(err)?, - Err(err) => Err(err).context("handling failed")?, - } -} - -/// Converts a JoinHandle result into an `anyhow::Result`. Like -/// `flatten_handle`, useful when using `tokio::try_join!` to await multiple -/// handles. 
-pub async fn as_anyhow_handle(handle: JoinHandle) -> anyhow::Result { - handle.await.context("handling failed") -} - -/// A guard that aborts a spawned task when dropped. -#[derive(Debug)] -pub struct SpawnGuard(AbortHandle); - -impl SpawnGuard { - /// Spawn a future on Tokio and return a guard that will abort it when dropped. - pub fn spawn_with_guard(fut: T) -> Self - where - T: Future + Send + 'static, - T::Output: Send + 'static, - { - Self(tokio::spawn(fut).abort_handle()) - } -} - -impl Drop for SpawnGuard { - fn drop(&mut self) { - self.0.abort(); - } -} diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index ac1a44f03..7453c1ed3 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -23,7 +23,6 @@ pub mod cache; pub mod emit; pub mod eth; -pub mod handle; pub mod log; pub mod math; pub mod retry;