From c876d2b4c3073af52542a865d948512be1eb4063 Mon Sep 17 00:00:00 2001 From: Ethan Oroshiba Date: Wed, 28 Aug 2024 08:23:56 -0500 Subject: [PATCH] refactor(composer): streamline geth and executor `run_until_stopped` (#1361) ## Summary Shortened and streamlined geth and executor `run_until_stopped()` functions to get rid of clippy exceptions and ensure events are emitted within spans. ## Background #1326 removed instrumentation on these `run_until_stopped()` functions, which revealed a clippy warning for too many lines. Additionally, this made it so that logging inside these functions would not be emitted within any span. ## Changes - Delegated many tasks to helper functions with instrumentation. - Created new `utils` module to house a shared `report_exit_reason()` function. - Moved `ensure_chain_id_is_correct()` and `get_latest_nonce()` to `init()` (previously `pre_run_checks()`) ## Testing Passing all tests. ## Related Issues Part of #1321 --------- Co-authored-by: Fraser Hutchison <190532+Fraser999@users.noreply.github.com> Co-authored-by: Richard Janis Goldschmidt --- crates/astria-composer/src/collectors/geth.rs | 239 ++++++++------ crates/astria-composer/src/executor/mod.rs | 312 +++++++++--------- crates/astria-composer/src/lib.rs | 1 + crates/astria-composer/src/utils.rs | 16 + 4 files changed, 321 insertions(+), 247 deletions(-) create mode 100644 crates/astria-composer/src/utils.rs diff --git a/crates/astria-composer/src/collectors/geth.rs b/crates/astria-composer/src/collectors/geth.rs index 66ad311cee..16c9c7bc8b 100644 --- a/crates/astria-composer/src/collectors/geth.rs +++ b/crates/astria-composer/src/collectors/geth.rs @@ -28,10 +28,14 @@ use astria_eyre::eyre::{ Report, WrapErr as _, }; -use ethers::providers::{ - Provider, - ProviderError, - Ws, +use ethers::{ + providers::{ + Provider, + ProviderError, + SubscriptionStream, + Ws, + }, + types::Transaction, }; use metrics::Counter; use tokio::{ @@ -43,16 +47,20 @@ use tokio::{ }; use 
tokio_util::sync::CancellationToken; use tracing::{ - debug, error, info, + instrument, warn, }; use crate::{ collectors::EXECUTOR_SEND_TIMEOUT, - executor, + executor::{ + self, + Handle, + }, metrics::Metrics, + utils::report_exit_reason, }; type StdError = dyn std::error::Error; @@ -147,9 +155,6 @@ impl Geth { /// Starts the collector instance and runs until failure or until /// explicitly closed - // Allow: Refactored in https://github.com/astriaorg/astria/pull/1361. Current logging events will show as errors in otel, - // refactor addresses this as well. - #[allow(clippy::too_many_lines)] pub(crate) async fn run_until_stopped(self) -> eyre::Result<()> { use ethers::providers::Middleware as _; use futures::stream::StreamExt as _; @@ -165,55 +170,12 @@ impl Geth { fee_asset, } = self; - let txs_received_counter = metrics - .geth_txs_received(&chain_name) - .cloned() - .unwrap_or_else(|| { - error!( - rollup_chain_name = %chain_name, - "failed to get geth transactions_received counter" - ); - Counter::noop() - }); - let txs_dropped_counter = metrics - .geth_txs_dropped(&chain_name) - .cloned() - .unwrap_or_else(|| { - error!( - rollup_chain_name = %chain_name, - "failed to get geth transactions_dropped counter" - ); - Counter::noop() - }); - - let retry_config = tryhard::RetryFutureConfig::new(1024) - .exponential_backoff(Duration::from_millis(500)) - .max_delay(Duration::from_secs(60)) - .on_retry( - |attempt, next_delay: Option, error: &ProviderError| { - let wait_duration = next_delay - .map(humantime::format_duration) - .map(tracing::field::display); - warn!( - attempt, - wait_duration, - error = error as &StdError, - "attempt to connect to geth node failed; retrying after backoff", - ); - futures::future::ready(()) - }, - ); + let txs_received_counter = txs_received_counter(metrics, &chain_name); + let txs_dropped_counter = txs_dropped_counter(metrics, &chain_name); - let client = tryhard::retry_fn(|| { - let url = url.clone(); - async move { - let 
websocket_client = Ws::connect_with_reconnects(url, 0).await?; - Ok(Provider::new(websocket_client)) - } - }) - .with_config(retry_config) - .await - .wrap_err("failed connecting to geth after several retries; giving up")?; + let client = connect_to_geth_node(url) + .await + .wrap_err("failed to connect to geth node")?; let mut tx_stream = client .subscribe_full_pending_txs() @@ -231,7 +193,6 @@ impl Geth { tx_res = tx_stream.next() => { if let Some(tx) = tx_res { let tx_hash = tx.hash; - debug!(transaction.hash = %tx_hash, "collected transaction from rollup"); let data = tx.rlp().to_vec(); let seq_action = SequenceAction { rollup_id, @@ -241,29 +202,15 @@ impl Geth { txs_received_counter.increment(1); - match executor_handle - .send_timeout(seq_action, EXECUTOR_SEND_TIMEOUT) - .await - { - Ok(()) => {}, - Err(SendTimeoutError::Timeout(_seq_action)) => { - warn!( - transaction.hash = %tx_hash, - timeout_ms = EXECUTOR_SEND_TIMEOUT.as_millis(), - "timed out sending new transaction to executor; dropping tx", - ); - txs_dropped_counter.increment(1); - } - Err(SendTimeoutError::Closed(_seq_action)) => { - warn!( - transaction.hash = %tx_hash, - "executor channel closed while sending transaction; dropping transaction \ - and exiting event loop" - ); - txs_dropped_counter.increment(1); - break Err(eyre!("executor channel closed while sending transaction")); - } + if let Err(err) = forward_geth_tx( + &executor_handle, + seq_action, + tx_hash, + &txs_dropped_counter, + ).await { + break Err(err); } + } else { break Err(eyre!("geth tx stream ended")); } @@ -271,34 +218,126 @@ impl Geth { } }; - match &reason { - Ok(reason) => { - info!(reason, "shutting down"); - } - Err(reason) => { - error!(%reason, "shutting down"); - } - }; + report_exit_reason(reason.as_deref()); status.send_modify(|status| status.is_connected = false); // if the loop exits with an error, we can still proceed with unsubscribing the WSS // stream as we could have exited due to an error in sending 
messages via the executor // channel. + unsubscribe_from_rollup(&tx_stream).await; - // give 2s for the websocket connection to be unsubscribed as we want to avoid having - // this hang for too long - match tokio::time::timeout(WSS_UNSUBSCRIBE_TIMEOUT, tx_stream.unsubscribe()).await { - Ok(Ok(true)) => info!("unsubscribed from geth tx stream"), - Ok(Ok(false)) => warn!("failed to unsubscribe from geth tx stream"), - Ok(Err(err)) => { - error!(error = %Report::new(err), "failed unsubscribing from the geth tx stream"); - } - Err(err) => { - error!(error = %Report::new(err), "timed out while unsubscribing from the geth tx stream"); - } + reason.map(|_| ()) + } +} + +#[instrument(skip_all)] +async fn forward_geth_tx( + executor_handle: &Handle, + seq_action: SequenceAction, + tx_hash: ethers::types::H256, + txs_dropped_counter: &Counter, +) -> eyre::Result<()> { + match executor_handle + .send_timeout(seq_action, EXECUTOR_SEND_TIMEOUT) + .await + { + Ok(()) => Ok(()), + Err(SendTimeoutError::Timeout(_seq_action)) => { + warn!( + transaction.hash = %tx_hash, + timeout_ms = EXECUTOR_SEND_TIMEOUT.as_millis(), + "timed out sending new transaction to executor; dropping tx", + ); + txs_dropped_counter.increment(1); + Ok(()) } + Err(SendTimeoutError::Closed(_seq_action)) => { + warn!( + transaction.hash = %tx_hash, + "executor channel closed while sending transaction; dropping transaction \ + and exiting event loop" + ); + txs_dropped_counter.increment(1); + Err(eyre!("executor channel closed while sending transaction")) + } + } +} - reason.map(|_| ()) +#[instrument(skip_all)] +async fn unsubscribe_from_rollup(tx_stream: &SubscriptionStream<'_, Ws, Transaction>) { + // give 2s for the websocket connection to be unsubscribed as we want to avoid having + // this hang for too long + match tokio::time::timeout(WSS_UNSUBSCRIBE_TIMEOUT, tx_stream.unsubscribe()).await { + Ok(Ok(true)) => info!("unsubscribed from geth tx stream"), + Ok(Ok(false)) => warn!("geth responded to 
unsubscribe request but returned `false`"), + Ok(Err(err)) => { + error!(error = %Report::new(err), "failed unsubscribing from the geth tx stream"); + } + Err(_) => { + error!("timed out while unsubscribing from the geth tx stream"); + } } } + +#[instrument(skip_all)] +fn txs_received_counter(metrics: &'static Metrics, chain_name: &String) -> Counter { + metrics + .geth_txs_received(chain_name) + .cloned() + .unwrap_or_else(|| { + error!( + rollup_chain_name = %chain_name, + "failed to get geth transactions_received counter" + ); + Counter::noop() + }) +} + +#[instrument(skip_all)] +fn txs_dropped_counter(metrics: &'static Metrics, chain_name: &String) -> Counter { + metrics + .geth_txs_dropped(chain_name) + .cloned() + .unwrap_or_else(|| { + error!( + rollup_chain_name = %chain_name, + "failed to get geth transactions_dropped counter" + ); + Counter::noop() + }) +} + +#[instrument(skip_all, err)] +async fn connect_to_geth_node(url: String) -> eyre::Result> { + let retry_config = tryhard::RetryFutureConfig::new(1024) + .exponential_backoff(Duration::from_millis(500)) + .max_delay(Duration::from_secs(60)) + .on_retry( + |attempt, next_delay: Option, error: &ProviderError| { + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + attempt, + wait_duration, + error = error as &StdError, + "attempt to connect to geth node failed; retrying after backoff", + ); + futures::future::ready(()) + }, + ); + + let client = tryhard::retry_fn(|| { + let url = url.clone(); + async move { + let websocket_client = Ws::connect_with_reconnects(url, 0).await?; + Ok(Provider::new(websocket_client)) + } + }) + .with_config(retry_config) + .await + .wrap_err("failed connecting to geth after several retries; giving up")?; + + Ok(client) +} diff --git a/crates/astria-composer/src/executor/mod.rs b/crates/astria-composer/src/executor/mod.rs index a8fa8be5b1..8d87f5b569 100644 --- a/crates/astria-composer/src/executor/mod.rs +++ 
b/crates/astria-composer/src/executor/mod.rs @@ -24,7 +24,6 @@ use astria_core::{ }; use astria_eyre::eyre::{ self, - eyre, WrapErr as _, }; use futures::{ @@ -60,6 +59,7 @@ use tokio::{ time::{ self, Instant, + Sleep, }, }; use tokio_util::sync::CancellationToken; @@ -82,6 +82,7 @@ use crate::{ SizedBundleReport, }, metrics::Metrics, + utils::report_exit_reason, }; mod bundle_factory; @@ -212,25 +213,19 @@ impl Executor { /// /// # Errors /// An error is returned if connecting to the sequencer fails. - // Allow: Refactored in https://github.com/astriaorg/astria/pull/1361. Current logging events will show as errors in otel, - // refactor addresses this as well. - #[allow(clippy::too_many_lines)] pub(super) async fn run_until_stopped(mut self) -> eyre::Result<()> { - select!( + let mut nonce = select!( biased; () = self.shutdown_token.cancelled() => { - info!("received shutdown signal while running initialization routines; exiting"); - return Ok(()); + report_exit_reason(Ok("received shutdown signal while running initialization routines; exiting")); + return Ok(()) } - res = self.pre_run_checks() => { - res.wrap_err("required pre-run checks failed")?; + nonce = self.init() => { + nonce.wrap_err("initialization failed").inspect_err(|err| report_exit_reason(Err(err)))? 
} ); let mut submission_fut: Fuse> = Fuse::terminated(); - let mut nonce = get_latest_nonce(self.sequencer_client.clone(), self.address, self.metrics) - .await - .wrap_err("failed getting initial nonce from sequencer")?; self.metrics.set_current_nonce(nonce); @@ -256,14 +251,9 @@ impl Executor { } // process submission result and update nonce rsp = &mut submission_fut, if !submission_fut.is_terminated() => { - match rsp { - Ok(new_nonce) => nonce = new_nonce, - Err(error) => { - error!(%error, "failed submitting bundle to sequencer; aborting executor"); - break Err(error).wrap_err("failed submitting bundle to sequencer"); - } - } - block_timer.as_mut().reset(reset_time()); + if let Err(err) = process_result_update_nonce(&mut nonce, rsp, &mut block_timer, reset_time) { + break Err(err).wrap_err("failed submitting bundle to sequencer"); + }; } Some(next_bundle) = future::ready(bundle_factory.next_finished()), if submission_fut.is_terminated() => { @@ -275,28 +265,15 @@ impl Executor { // receive new seq_action and bundle it. will not pull from the channel if `bundle_factory` is full Some(seq_action) = self.serialized_rollup_transactions.recv(), if !bundle_factory.is_full() => { - let rollup_id = seq_action.rollup_id; - - if let Err(e) = bundle_factory.try_push(seq_action) { - self.metrics.increment_txs_dropped_too_large(&rollup_id); - warn!( - rollup_id = %rollup_id, - error = &e as &StdError, - "failed to bundle transaction, dropping it." 
- ); - } + self.bundle_seq_action(seq_action, &mut bundle_factory); } // try to preempt current bundle if the timer has ticked without submitting the next bundle () = &mut block_timer, if submission_fut.is_terminated() => { let bundle = bundle_factory.pop_now(); if bundle.is_empty() { - debug!("block timer ticked, but no bundle to submit to sequencer"); block_timer.as_mut().reset(reset_time()); } else { - debug!( - "forcing bundle submission to sequencer due to block timer" - ); submission_fut = self.submit_bundle(nonce, bundle, self.metrics); } } @@ -310,35 +287,17 @@ impl Executor { // sequence actions self.serialized_rollup_transactions.close(); - match &reason { - Ok(reason) => { - info!(reason, "starting shutdown process"); - } - Err(reason) => { - error!(%reason, "executor exited with error"); - // we error out because of a failure to submit a bundle to the sequencer - // we do not want to proceed with the shutdown process in this case - return Err(eyre!(reason.to_string())); - } - }; + report_exit_reason(reason.as_deref()); + if let Err(err) = reason { + return Err(err).wrap_err("failed to submit bundle to sequencer, aborting"); + } let mut bundles_to_drain: VecDeque = VecDeque::new(); let mut bundles_drained: Option = Some(0); - info!("draining already received transactions"); - // drain the receiver channel while let Ok(seq_action) = self.serialized_rollup_transactions.try_recv() { - let rollup_id = seq_action.rollup_id; - - if let Err(e) = bundle_factory.try_push(seq_action) { - self.metrics.increment_txs_dropped_too_large(&rollup_id); - warn!( - rollup_id = %rollup_id, - error = &e as &StdError, - "failed to bundle transaction, dropping it." 
- ); - } + self.bundle_seq_action(seq_action, &mut bundle_factory); } // when shutting down, drain all the remaining bundles and submit to the sequencer @@ -352,105 +311,30 @@ impl Executor { bundles_to_drain.push_back(bundle); } - info!( - no_of_bundles_to_drain = bundles_to_drain.len(), - "submitting remaining transaction bundles to sequencer" + let shutdown_logic = self.run_shutdown_logic( + submission_fut, + nonce, + &mut bundles_to_drain, + &mut bundles_drained, ); - let shutdown_logic = async { - // wait for the last bundle to be submitted - if !submission_fut.is_terminated() { - info!( - "waiting for the last bundle of transactions to be submitted to the sequencer" - ); - match submission_fut.await { - Ok(new_nonce) => { - debug!( - new_nonce = new_nonce, - "successfully submitted bundle of transactions" - ); + bundle_drain_timeout_handler(shutdown_logic).await; - nonce = new_nonce; - } - Err(error) => { - error!(%error, "failed submitting bundle to sequencer during shutdown; \ - aborting shutdown"); - - return Err(error); - } - } - } - - while let Some(bundle) = bundles_to_drain.pop_front() { - match self - .submit_bundle(nonce, bundle.clone(), self.metrics) - .await - { - Ok(new_nonce) => { - debug!( - bundle = %telemetry::display::json(&SizedBundleReport(&bundle)), - new_nonce = new_nonce, - "successfully submitted transaction bundle" - ); - - nonce = new_nonce; - bundles_drained = bundles_drained.and_then(|value| value.checked_add(1)); - } - Err(error) => { - error!( - bundle = %telemetry::display::json(&SizedBundleReport(&bundle)), - %error, - "failed submitting bundle to sequencer during shutdown; \ - aborting shutdown" - ); - // if we can't submit a bundle after multiple retries, we can abort - // the shutdown process - - return Err(error); - } - } - } - - Ok(()) - }; - - match tokio::time::timeout(BUNDLE_DRAINING_DURATION, shutdown_logic).await { - Ok(Ok(())) => info!("executor shutdown tasks completed successfully"), - Ok(Err(error)) => 
error!(%error, "executor shutdown tasks failed"), - Err(error) => error!(%error, "executor shutdown tasks failed to complete in time"), - } - - let number_of_submitted_bundles = if let Some(value) = bundles_drained { - value.to_string() - } else { - format!("more than {}", u64::MAX) - }; - if bundles_to_drain.is_empty() { - info!( - %number_of_submitted_bundles, - "submitted all outstanding bundles to sequencer during shutdown" - ); - } else { - // log all the bundles that have not been drained - let report: Vec = - bundles_to_drain.iter().map(SizedBundleReport).collect(); - - warn!( - %number_of_submitted_bundles, - number_of_missing_bundles = report.len(), - missing_bundles = %telemetry::display::json(&report), - "unable to drain all bundles within the allocated time" - ); - } + report_if_bundles_drained(&mut bundles_to_drain, &mut bundles_drained); reason.map(|_| ()) } /// Performs initialization checks prior to running the executor #[instrument(skip_all, err)] - async fn pre_run_checks(&self) -> eyre::Result<()> { - self.ensure_chain_id_is_correct().await?; - Ok(()) + async fn init(&self) -> eyre::Result { + self.ensure_chain_id_is_correct() + .await + .wrap_err("failed to validate chain id")?; + let nonce = get_latest_nonce(self.sequencer_client.clone(), self.address, self.metrics) + .await + .wrap_err("failed getting initial nonce from sequencer")?; + Ok(nonce) } /// Performs check to ensure the configured chain ID matches the remote chain ID @@ -499,6 +383,80 @@ impl Executor { .await?; Ok(client_genesis.chain_id) } + + /// Creates shutdown logic for submitting remaining bundles to the sequencer. 
+ #[instrument(skip_all, err)] + async fn run_shutdown_logic( + &self, + submission_fut: Fuse>, + mut nonce: u32, + bundles_to_drain: &mut VecDeque, + bundles_drained: &mut Option, + ) -> eyre::Result<()> { + info!( + no_of_bundles_to_drain = bundles_to_drain.len(), + "submitting remaining transaction bundles to sequencer" + ); + + // wait for the last bundle to be submitted + if !submission_fut.is_terminated() { + info!("waiting for the last bundle of transactions to be submitted to the sequencer"); + match submission_fut.await { + Ok(new_nonce) => { + debug!(new_nonce, "successfully submitted bundle of transactions"); + + nonce = new_nonce; + } + Err(error) => { + return Err(error.wrap_err( + "failed submitting bundle to sequencer during shutdown; aborting shutdown", + )); + } + } + } + while let Some(bundle) = bundles_to_drain.pop_front() { + match self + .submit_bundle(nonce, bundle.clone(), self.metrics) + .await + { + Ok(new_nonce) => { + debug!( + bundle = %telemetry::display::json(&SizedBundleReport(&bundle)), + new_nonce, + "successfully submitted transaction bundle" + ); + + nonce = new_nonce; + *bundles_drained = bundles_drained.and_then(|value| value.checked_add(1)); + } + Err(error) => { + // if we can't submit a bundle after multiple retries, we can abort + // the shutdown process + return Err(error.wrap_err(format!( + "failed submitting bundle to sequencer during shutdown; aborting \ + shutdown. bundle = {}", + telemetry::display::json(&SizedBundleReport(&bundle)) + ))); + } + } + } + Ok(()) + } + + /// Pushes sequence action into current bundle, warning and dropping on failure. 
+ #[instrument(skip_all)] + fn bundle_seq_action(&self, seq_action: SequenceAction, bundle_factory: &mut BundleFactory) { + let rollup_id = seq_action.rollup_id; + + if let Err(e) = bundle_factory.try_push(seq_action) { + self.metrics.increment_txs_dropped_too_large(&rollup_id); + warn!( + rollup_id = %rollup_id, + error = &e as &StdError, + "failed to bundle transaction, dropping it." + ); + } + } } /// Queries the sequencer for the latest nonce with an exponential backoff @@ -607,6 +565,66 @@ async fn submit_tx( res } +/// Handles timeout of shutdown process +#[instrument(skip_all)] +async fn bundle_drain_timeout_handler(shutdown_logic: impl Future>) { + match tokio::time::timeout(BUNDLE_DRAINING_DURATION, shutdown_logic).await { + Ok(Ok(())) => info!("executor shutdown tasks completed successfully"), + Ok(Err(error)) => error!(%error, "executor shutdown tasks failed"), + Err(_) => error!("executor shutdown tasks failed to complete in time"), + } +} + +/// Processes the result of bundle submission and updates nonce +#[instrument(skip_all, err)] +fn process_result_update_nonce( + nonce: &mut u32, + rsp: eyre::Result, + block_timer: &mut Pin<&mut Sleep>, + reset_time: impl Fn() -> Instant, +) -> eyre::Result<()> { + block_timer.as_mut().reset(reset_time()); + match rsp { + Ok(new_nonce) => { + *nonce = new_nonce; + Ok(()) + } + Err(error) => { + Err(error).wrap_err("failed submitting bundle to sequencer; aborting executor") + } + } +} + +/// Checks if all bundles have been drained, warning if not +#[instrument(skip_all)] +fn report_if_bundles_drained( + bundles_to_drain: &mut VecDeque, + bundles_drained: &mut Option, +) { + let number_of_submitted_bundles = if let Some(value) = bundles_drained { + value.to_string() + } else { + format!("more than {}", u64::MAX) + }; + if bundles_to_drain.is_empty() { + info!( + %number_of_submitted_bundles, + "submitted all outstanding bundles to sequencer during shutdown" + ); + } else { + // log all the bundles that have not been 
drained + let report: Vec = + bundles_to_drain.iter().map(SizedBundleReport).collect(); + + warn!( + %number_of_submitted_bundles, + number_of_missing_bundles = report.len(), + missing_bundles = %telemetry::display::json(&report), + "unable to drain all bundles within the allocated time" + ); + } +} + pin_project! { /// A future to submit a bundle to the sequencer, returning the next nonce that should be used for the next submission. /// diff --git a/crates/astria-composer/src/lib.rs b/crates/astria-composer/src/lib.rs index f61f464965..4341513c1b 100644 --- a/crates/astria-composer/src/lib.rs +++ b/crates/astria-composer/src/lib.rs @@ -49,6 +49,7 @@ pub(crate) mod metrics; mod rollup; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod utils; pub use build_info::BUILD_INFO; pub use composer::Composer; diff --git a/crates/astria-composer/src/utils.rs b/crates/astria-composer/src/utils.rs new file mode 100644 index 0000000000..ba157749cf --- /dev/null +++ b/crates/astria-composer/src/utils.rs @@ -0,0 +1,16 @@ +use astria_eyre::eyre::Report; +use tracing::{ + error, + info, +}; + +pub(crate) fn report_exit_reason(reason: Result<&str, &Report>) { + match &reason { + Ok(reason) => { + info!(reason, "shutting down"); + } + Err(reason) => { + error!(%reason, "shutting down"); + } + } +}