refactor(composer): streamline geth and executor run_until_stopped (#1361)

## Summary
Shortened and streamlined the geth and executor `run_until_stopped()`
functions to remove the clippy exceptions and ensure events are emitted
within spans.

## Background
#1326 removed instrumentation from these `run_until_stopped()` functions,
which surfaced a clippy `too_many_lines` warning. It also meant that events
logged inside these functions were no longer emitted within any span.
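
For illustration only, a minimal sketch of the effect: with `tracing`'s
`#[instrument]` attribute, events emitted in the function body are recorded
inside that function's span (the function name here is hypothetical, not
taken from the codebase):

```rust
use tracing::{info, instrument};

// With `#[instrument]`, the event below carries the `fetch_block` span
// context; without the attribute it would be emitted outside any span
// unless a caller had opened one.
#[instrument(skip_all)]
async fn fetch_block(height: u64) {
    info!(height, "fetching block");
}
```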

## Changes
- Delegated many tasks to helper functions with instrumentation.
- Created a new `utils` module to house a shared `report_exit_reason()`
function (sketched after this list).
- Moved `ensure_chain_id_is_correct()` and `get_latest_nonce()` to
`init()` (previously `pre_run_checks()`).
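
A minimal sketch of what the shared helper might look like, inferred from the
old `match &reason` block and the new `report_exit_reason(reason.as_deref())`
call site in the diff below; the exact signature in `utils` may differ:

```rust
use astria_eyre::eyre;
use tracing::{error, info, instrument};

/// Sketch: logs why a component's event loop exited. `#[instrument]` gives
/// the events their own span so they are not emitted outside of any span.
#[instrument(skip_all)]
pub(crate) fn report_exit_reason(reason: Result<&str, &eyre::Report>) {
    match reason {
        Ok(reason) => info!(reason, "shutting down"),
        Err(reason) => error!(%reason, "shutting down"),
    }
}
```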

## Testing
Passing all tests.


## Related Issues
Part of #1321

---------

Co-authored-by: Fraser Hutchison <[email protected]>
Co-authored-by: Richard Janis Goldschmidt <[email protected]>
3 people authored Aug 28, 2024
1 parent acfa1b7 commit c876d2b
Showing 4 changed files with 321 additions and 247 deletions.
crates/astria-composer/src/collectors/geth.rs: 239 changes (139 additions, 100 deletions)
@@ -28,10 +28,14 @@ use astria_eyre::eyre::{
Report,
WrapErr as _,
};
use ethers::providers::{
Provider,
ProviderError,
Ws,
use ethers::{
providers::{
Provider,
ProviderError,
SubscriptionStream,
Ws,
},
types::Transaction,
};
use metrics::Counter;
use tokio::{
@@ -43,16 +47,20 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use tracing::{
debug,
error,
info,
instrument,
warn,
};

use crate::{
collectors::EXECUTOR_SEND_TIMEOUT,
executor,
executor::{
self,
Handle,
},
metrics::Metrics,
utils::report_exit_reason,
};

type StdError = dyn std::error::Error;
@@ -147,9 +155,6 @@ impl Geth {

/// Starts the collector instance and runs until failure or until
/// explicitly closed
// Allow: Refactored in https://github.com/astriaorg/astria/pull/1361. Current logging events will show as errors in otel,
// refactor addresses this as well.
#[allow(clippy::too_many_lines)]
pub(crate) async fn run_until_stopped(self) -> eyre::Result<()> {
use ethers::providers::Middleware as _;
use futures::stream::StreamExt as _;
@@ -165,55 +170,12 @@ impl Geth {
fee_asset,
} = self;

let txs_received_counter = metrics
.geth_txs_received(&chain_name)
.cloned()
.unwrap_or_else(|| {
error!(
rollup_chain_name = %chain_name,
"failed to get geth transactions_received counter"
);
Counter::noop()
});
let txs_dropped_counter = metrics
.geth_txs_dropped(&chain_name)
.cloned()
.unwrap_or_else(|| {
error!(
rollup_chain_name = %chain_name,
"failed to get geth transactions_dropped counter"
);
Counter::noop()
});

let retry_config = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(500))
.max_delay(Duration::from_secs(60))
.on_retry(
|attempt, next_delay: Option<Duration>, error: &ProviderError| {
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
warn!(
attempt,
wait_duration,
error = error as &StdError,
"attempt to connect to geth node failed; retrying after backoff",
);
futures::future::ready(())
},
);
let txs_received_counter = txs_received_counter(metrics, &chain_name);
let txs_dropped_counter = txs_dropped_counter(metrics, &chain_name);

let client = tryhard::retry_fn(|| {
let url = url.clone();
async move {
let websocket_client = Ws::connect_with_reconnects(url, 0).await?;
Ok(Provider::new(websocket_client))
}
})
.with_config(retry_config)
.await
.wrap_err("failed connecting to geth after several retries; giving up")?;
let client = connect_to_geth_node(url)
.await
.wrap_err("failed to connect to geth node")?;

let mut tx_stream = client
.subscribe_full_pending_txs()
@@ -231,7 +193,6 @@ impl Geth {
tx_res = tx_stream.next() => {
if let Some(tx) = tx_res {
let tx_hash = tx.hash;
debug!(transaction.hash = %tx_hash, "collected transaction from rollup");
let data = tx.rlp().to_vec();
let seq_action = SequenceAction {
rollup_id,
@@ -241,64 +202,142 @@ impl Geth {

txs_received_counter.increment(1);

match executor_handle
.send_timeout(seq_action, EXECUTOR_SEND_TIMEOUT)
.await
{
Ok(()) => {},
Err(SendTimeoutError::Timeout(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
timeout_ms = EXECUTOR_SEND_TIMEOUT.as_millis(),
"timed out sending new transaction to executor; dropping tx",
);
txs_dropped_counter.increment(1);
}
Err(SendTimeoutError::Closed(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"executor channel closed while sending transaction; dropping transaction \
and exiting event loop"
);
txs_dropped_counter.increment(1);
break Err(eyre!("executor channel closed while sending transaction"));
}
if let Err(err) = forward_geth_tx(
&executor_handle,
seq_action,
tx_hash,
&txs_dropped_counter,
).await {
break Err(err);
}

} else {
break Err(eyre!("geth tx stream ended"));
}
}
}
};

match &reason {
Ok(reason) => {
info!(reason, "shutting down");
}
Err(reason) => {
error!(%reason, "shutting down");
}
};
report_exit_reason(reason.as_deref());

status.send_modify(|status| status.is_connected = false);

// if the loop exits with an error, we can still proceed with unsubscribing the WSS
// stream as we could have exited due to an error in sending messages via the executor
// channel.
unsubscribe_from_rollup(&tx_stream).await;

// give 2s for the websocket connection to be unsubscribed as we want to avoid having
// this hang for too long
match tokio::time::timeout(WSS_UNSUBSCRIBE_TIMEOUT, tx_stream.unsubscribe()).await {
Ok(Ok(true)) => info!("unsubscribed from geth tx stream"),
Ok(Ok(false)) => warn!("failed to unsubscribe from geth tx stream"),
Ok(Err(err)) => {
error!(error = %Report::new(err), "failed unsubscribing from the geth tx stream");
}
Err(err) => {
error!(error = %Report::new(err), "timed out while unsubscribing from the geth tx stream");
}
reason.map(|_| ())
}
}

#[instrument(skip_all)]
async fn forward_geth_tx(
executor_handle: &Handle,
seq_action: SequenceAction,
tx_hash: ethers::types::H256,
txs_dropped_counter: &Counter,
) -> eyre::Result<()> {
match executor_handle
.send_timeout(seq_action, EXECUTOR_SEND_TIMEOUT)
.await
{
Ok(()) => Ok(()),
Err(SendTimeoutError::Timeout(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
timeout_ms = EXECUTOR_SEND_TIMEOUT.as_millis(),
"timed out sending new transaction to executor; dropping tx",
);
txs_dropped_counter.increment(1);
Ok(())
}
Err(SendTimeoutError::Closed(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"executor channel closed while sending transaction; dropping transaction \
and exiting event loop"
);
txs_dropped_counter.increment(1);
Err(eyre!("executor channel closed while sending transaction"))
}
}
}

reason.map(|_| ())
#[instrument(skip_all)]
async fn unsubscribe_from_rollup(tx_stream: &SubscriptionStream<'_, Ws, Transaction>) {
// give 2s for the websocket connection to be unsubscribed as we want to avoid having
// this hang for too long
match tokio::time::timeout(WSS_UNSUBSCRIBE_TIMEOUT, tx_stream.unsubscribe()).await {
Ok(Ok(true)) => info!("unsubscribed from geth tx stream"),
Ok(Ok(false)) => warn!("geth responded to unsubscribe request but returned `false`"),
Ok(Err(err)) => {
error!(error = %Report::new(err), "failed unsubscribing from the geth tx stream");
}
Err(_) => {
error!("timed out while unsubscribing from the geth tx stream");
}
}
}

#[instrument(skip_all)]
fn txs_received_counter(metrics: &'static Metrics, chain_name: &String) -> Counter {
metrics
.geth_txs_received(chain_name)
.cloned()
.unwrap_or_else(|| {
error!(
rollup_chain_name = %chain_name,
"failed to get geth transactions_received counter"
);
Counter::noop()
})
}

#[instrument(skip_all)]
fn txs_dropped_counter(metrics: &'static Metrics, chain_name: &String) -> Counter {
metrics
.geth_txs_dropped(chain_name)
.cloned()
.unwrap_or_else(|| {
error!(
rollup_chain_name = %chain_name,
"failed to get geth transactions_dropped counter"
);
Counter::noop()
})
}

#[instrument(skip_all, err)]
async fn connect_to_geth_node(url: String) -> eyre::Result<Provider<Ws>> {
let retry_config = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(500))
.max_delay(Duration::from_secs(60))
.on_retry(
|attempt, next_delay: Option<Duration>, error: &ProviderError| {
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
warn!(
attempt,
wait_duration,
error = error as &StdError,
"attempt to connect to geth node failed; retrying after backoff",
);
futures::future::ready(())
},
);

let client = tryhard::retry_fn(|| {
let url = url.clone();
async move {
let websocket_client = Ws::connect_with_reconnects(url, 0).await?;
Ok(Provider::new(websocket_client))
}
})
.with_config(retry_config)
.await
.wrap_err("failed connecting to geth after several retries; giving up")?;

Ok(client)
}