Skip to content

Commit

Permalink
feat: replace existing tasks with reth tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Oct 3, 2024
1 parent 0a43dff commit fe4f3d3
Show file tree
Hide file tree
Showing 32 changed files with 626 additions and 659 deletions.
55 changes: 55 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ alloy-transport-http = { version = "0.4.1", default-features = false, features =
# alloy other
alloy-rlp = "0.3.8"

# reth
reth-tasks = { git = "https://github.com/paradigmxyz/reth.git", tag = "v1.0.7" }

anyhow = "1.0.89"
async-trait = "0.1.83"
auto_impl = "1.2.0"
Expand Down
1 change: 1 addition & 0 deletions bin/rundler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ metrics-exporter-prometheus = { version = "0.15.3", default-features = false, fe
metrics-process = "2.1.0"
metrics-util = "0.17.0"
paste = "1.0"
reth-tasks.workspace = true
serde.workspace = true
serde_json.workspace = true
sscanf = "0.4.2"
Expand Down
39 changes: 22 additions & 17 deletions bin/rundler/src/cli/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use rundler_pool::RemotePoolClient;
use rundler_sim::{MempoolConfigs, PriorityFeeMode};
use rundler_task::{
server::{connect_with_retries_shutdown, format_socket_addr},
spawn_tasks_with_shutdown, Task,
TaskSpawnerExt,
};
use rundler_types::{chain::ChainSpec, EntryPointVersion};
use rundler_utils::emit::{self, WithEntryPoint, EVENT_CHANNEL_CAPACITY};
Expand Down Expand Up @@ -419,7 +419,8 @@ pub struct BuilderCliArgs {
pool_url: String,
}

pub async fn run(
pub async fn spawn_tasks<T: TaskSpawnerExt + 'static>(
task_spawner: T,
chain_spec: ChainSpec,
builder_args: BuilderCliArgs,
common_args: CommonArgs,
Expand All @@ -430,7 +431,13 @@ pub async fn run(
} = builder_args;

let (event_sender, event_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
emit::receive_and_log_events_with_filter(event_rx, is_nonspammy_event);
task_spawner.spawn_critical(
"recv and log events",
Box::pin(emit::receive_and_log_events_with_filter(
event_rx,
is_nonspammy_event,
)),
);

let task_args = builder_args
.to_args(
Expand All @@ -443,7 +450,7 @@ pub async fn run(
let pool = connect_with_retries_shutdown(
"op pool from builder",
&pool_url,
|url| RemotePoolClient::connect(url, chain_spec.clone()),
|url| RemotePoolClient::connect(url, chain_spec.clone(), Box::new(task_spawner.clone())),
tokio::signal::ctrl_c(),
)
.await?;
Expand All @@ -454,20 +461,18 @@ pub async fn run(
ep_v0_7,
} = super::construct_providers(&common_args, &chain_spec)?;

spawn_tasks_with_shutdown(
[BuilderTask::new(
task_args,
event_sender,
LocalBuilderBuilder::new(REQUEST_CHANNEL_CAPACITY),
pool,
provider,
ep_v0_6,
ep_v0_7,
)
.boxed()],
tokio::signal::ctrl_c(),
BuilderTask::new(
task_args,
event_sender,
LocalBuilderBuilder::new(REQUEST_CHANNEL_CAPACITY),
pool,
provider,
ep_v0_6,
ep_v0_7,
)
.await;
.spawn(task_spawner)
.await?;

Ok(())
}

Expand Down
45 changes: 30 additions & 15 deletions bin/rundler/src/cli/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ use metrics::gauge;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_process::Collector;
use metrics_util::layers::{PrefixLayer, Stack};
use rundler_task::TaskSpawner;

pub fn initialize<'a>(
pub fn initialize<'a, T: TaskSpawner>(
task_spawner: &T,
sample_interval_millis: u64,
listen_addr: SocketAddr,
tags: impl IntoIterator<Item = &'a String>,
Expand All @@ -38,28 +40,41 @@ pub fn initialize<'a>(
builder = builder.set_buckets(buckets)?;

let (recorder, exporter) = builder.build()?;
tokio::spawn(exporter);
task_spawner.spawn_critical(
"metrics exporter",
Box::pin(async move {
if exporter.await.is_err() {
tracing::error!("metrics exporter failed");
}
}),
);
let stack = Stack::new(recorder);
stack.push(PrefixLayer::new("rundler")).install()?;

tokio::spawn(async move {
let collector = Collector::default();
loop {
collector.collect();
tokio::time::sleep(Duration::from_millis(sample_interval_millis)).await;
}
});
task_spawner.spawn_critical(
"metrics collector",
Box::pin(async move {
let collector = Collector::default();
loop {
collector.collect();
tokio::time::sleep(Duration::from_millis(sample_interval_millis)).await;
}
}),
);

let handle = tokio::runtime::Handle::current();
let frequency = std::time::Duration::from_millis(sample_interval_millis);
let runtime_metrics = handle.metrics();
let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);
tokio::spawn(async move {
for metrics in runtime_monitor.intervals() {
collect_tokio(&runtime_metrics, metrics);
tokio::time::sleep(frequency).await;
}
});
task_spawner.spawn_critical(
"tokio metrics collector",
Box::pin(async move {
for metrics in runtime_monitor.intervals() {
collect_tokio(&runtime_metrics, metrics);
tokio::time::sleep(frequency).await;
}
}),
);

Ok(())
}
Expand Down
34 changes: 29 additions & 5 deletions bin/rundler/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use alloy_primitives::U256;
use anyhow::{bail, Context};
Expand All @@ -29,6 +29,7 @@ mod tracing;
use builder::BuilderCliArgs;
use node::NodeCliArgs;
use pool::PoolCliArgs;
use reth_tasks::TaskManager;
use rpc::RpcCliArgs;
use rundler_provider::{
AlloyEntryPointV0_6, AlloyEntryPointV0_7, AlloyEvmProvider, EntryPointProvider, EvmProvider,
Expand All @@ -51,8 +52,12 @@ pub async fn run() -> anyhow::Result<()> {
let _guard = tracing::configure_logging(&opt.logs)?;
tracing::info!("Parsed CLI options: {:#?}", opt);

let mut task_manager = TaskManager::current();
let task_spawner = task_manager.executor();

let metrics_addr = format!("{}:{}", opt.metrics.host, opt.metrics.port).parse()?;
metrics::initialize(
&task_spawner,
opt.metrics.sample_interval_millis,
metrics_addr,
&opt.metrics.tags,
Expand All @@ -64,12 +69,31 @@ pub async fn run() -> anyhow::Result<()> {
tracing::info!("Chain spec: {:#?}", cs);

match opt.command {
Command::Node(args) => node::run(cs, *args, opt.common).await?,
Command::Pool(args) => pool::run(cs, args, opt.common).await?,
Command::Rpc(args) => rpc::run(cs, args, opt.common).await?,
Command::Builder(args) => builder::run(cs, args, opt.common).await?,
Command::Node(args) => {
node::spawn_tasks(task_spawner.clone(), cs, *args, opt.common).await?
}
Command::Pool(args) => {
pool::spawn_tasks(task_spawner.clone(), cs, args, opt.common).await?
}
Command::Rpc(args) => rpc::spawn_tasks(task_spawner.clone(), cs, args, opt.common).await?,
Command::Builder(args) => {
builder::spawn_tasks(task_spawner.clone(), cs, args, opt.common).await?
}
}

// wait for ctrl-c or the task manager to panic
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::info!("Received ctrl-c, shutting down");
},
e = &mut task_manager => {
tracing::error!("Task manager panicked shutting down: {e}");
},
}

// wait for the task manager to shutdown
task_manager.graceful_shutdown_with_timeout(Duration::from_secs(10));

tracing::info!("Shutdown, goodbye");
Ok(())
}
Expand Down
Loading

0 comments on commit fe4f3d3

Please sign in to comment.