chore(composer): Add instrumentation (#1326)
## Summary
Added `tracing` instrumentation to `astria-composer`.

## Background
Instrumenting all async calls aids tracing, since spans are emitted even when no
events occur under them.
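
To make this concrete, here is a minimal sketch of the pattern using the `tracing` crate; `forward_item` and its field names are hypothetical and do not appear in this PR:

```rust
use tracing::{info, instrument};

// Hypothetical short-lived async helper. `#[instrument(skip_all)]` opens a span
// named after the function on every call, so the call shows up in traces even
// if the body never records an event of its own.
#[instrument(skip_all)]
async fn forward_item(item: Vec<u8>) {
    // Anything emitted here is attached to the `forward_item` span.
    info!(item_len = item.len(), "forwarding item");
}
```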

## Changes
- Added instrumentation to all async functions that are not long-lived (see the
  sketch after this list).
- Removed instrumentation from long-running functions.
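
As a sketch of the split above (assumed names; `run_event_loop` and `handle_event` are illustrative, not functions from this PR), the long-running loop stays uninstrumented because its span would remain open for the lifetime of the task, while the short-lived work it awaits gets a span per call:

```rust
use tokio::sync::mpsc;
use tracing::instrument;

// Long-running: left uninstrumented; a single span covering the whole loop
// would never close while the task is alive and adds little tracing value.
async fn run_event_loop(mut rx: mpsc::Receiver<u64>) {
    while let Some(event) = rx.recv().await {
        handle_event(event).await;
    }
}

// Short-lived: instrumented, so every event produces its own span in the trace.
#[instrument]
async fn handle_event(event: u64) {
    // ... process the event ...
}
```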

## Related Issues
Part of #1321

---------

Co-authored-by: Fraser Hutchison <[email protected]>
Co-authored-by: Richard Janis Goldschmidt <[email protected]>
3 people authored Aug 28, 2024
1 parent 3a3ace7 commit b311636
Showing 7 changed files with 338 additions and 248 deletions.
6 changes: 5 additions & 1 deletion crates/astria-composer/src/api.rs
@@ -18,7 +18,10 @@ use axum::{
use hyper::server::conn::AddrIncoming;
use serde::Serialize;
use tokio::sync::watch;
use tracing::debug;
use tracing::{
debug,
instrument,
};

use crate::composer;

@@ -74,6 +77,7 @@ impl IntoResponse for Readyz {
// axum does not allow non-async handlers. This attribute can be removed
// once this method contains `await` statements.
#[allow(clippy::unused_async)]
#[instrument(skip_all)]
async fn readyz(State(composer_status): State<ComposerStatus>) -> Readyz {
debug!("received readyz request");
if composer_status.borrow().is_ready() {
238 changes: 138 additions & 100 deletions crates/astria-composer/src/collectors/geth.rs
@@ -28,10 +28,14 @@ use astria_eyre::eyre::{
Report,
WrapErr as _,
};
use ethers::providers::{
Provider,
ProviderError,
Ws,
use ethers::{
providers::{
Provider,
ProviderError,
SubscriptionStream,
Ws,
},
types::Transaction,
};
use metrics::Counter;
use tokio::{
@@ -43,7 +47,6 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use tracing::{
debug,
error,
info,
instrument,
@@ -52,8 +55,12 @@ use tracing::{

use crate::{
collectors::EXECUTOR_SEND_TIMEOUT,
executor,
executor::{
self,
Handle,
},
metrics::Metrics,
utils::report_exit_reason,
};

type StdError = dyn std::error::Error;
@@ -148,10 +155,7 @@ impl Geth {

/// Starts the collector instance and runs until failure or until
/// explicitly closed
#[instrument(skip_all, fields(chain_name = self.chain_name, rollup_id = %self.rollup_id))]
pub(crate) async fn run_until_stopped(self) -> eyre::Result<()> {
use std::time::Duration;

use ethers::providers::Middleware as _;
use futures::stream::StreamExt as _;

@@ -166,55 +170,12 @@
fee_asset,
} = self;

let txs_received_counter = metrics
.geth_txs_received(&chain_name)
.cloned()
.unwrap_or_else(|| {
error!(
rollup_chain_name = %chain_name,
"failed to get geth transactions_received counter"
);
Counter::noop()
});
let txs_dropped_counter = metrics
.geth_txs_dropped(&chain_name)
.cloned()
.unwrap_or_else(|| {
error!(
rollup_chain_name = %chain_name,
"failed to get geth transactions_dropped counter"
);
Counter::noop()
});

let retry_config = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(500))
.max_delay(Duration::from_secs(60))
.on_retry(
|attempt, next_delay: Option<Duration>, error: &ProviderError| {
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
warn!(
attempt,
wait_duration,
error = error as &StdError,
"attempt to connect to geth node failed; retrying after backoff",
);
futures::future::ready(())
},
);
let txs_received_counter = txs_received_counter(metrics, &chain_name);
let txs_dropped_counter = txs_dropped_counter(metrics, &chain_name);

let client = tryhard::retry_fn(|| {
let url = url.clone();
async move {
let websocket_client = Ws::connect_with_reconnects(url, 0).await?;
Ok(Provider::new(websocket_client))
}
})
.with_config(retry_config)
.await
.wrap_err("failed connecting to geth after several retries; giving up")?;
let client = connect_to_geth_node(url)
.await
.wrap_err("failed to connect to geth node")?;

let mut tx_stream = client
.subscribe_full_pending_txs()
@@ -232,7 +193,6 @@ impl Geth {
tx_res = tx_stream.next() => {
if let Some(tx) = tx_res {
let tx_hash = tx.hash;
debug!(transaction.hash = %tx_hash, "collected transaction from rollup");
let data = tx.rlp().to_vec();
let seq_action = SequenceAction {
rollup_id,
@@ -242,64 +202,142 @@

txs_received_counter.increment(1);

match executor_handle
.send_timeout(seq_action, EXECUTOR_SEND_TIMEOUT)
.await
{
Ok(()) => {},
Err(SendTimeoutError::Timeout(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
timeout_ms = EXECUTOR_SEND_TIMEOUT.as_millis(),
"timed out sending new transaction to executor; dropping tx",
);
txs_dropped_counter.increment(1);
}
Err(SendTimeoutError::Closed(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"executor channel closed while sending transaction; dropping transaction \
and exiting event loop"
);
txs_dropped_counter.increment(1);
break Err(eyre!("executor channel closed while sending transaction"));
}
if let Err(err) = forward_geth_tx(
&executor_handle,
seq_action,
tx_hash,
&txs_dropped_counter,
).await {
break Err(err);
}

} else {
break Err(eyre!("geth tx stream ended"));
}
}
}
};

match &reason {
Ok(reason) => {
info!(reason, "shutting down");
}
Err(reason) => {
error!(%reason, "shutting down");
}
};
report_exit_reason(reason.as_deref());

status.send_modify(|status| status.is_connected = false);

// if the loop exits with an error, we can still proceed with unsubscribing the WSS
// stream as we could have exited due to an error in sending messages via the executor
// channel.
unsubscribe_from_rollup(&tx_stream).await;

// give 2s for the websocket connection to be unsubscribed as we want to avoid having
// this hang for too long
match tokio::time::timeout(WSS_UNSUBSCRIBE_TIMEOUT, tx_stream.unsubscribe()).await {
Ok(Ok(true)) => info!("unsubscribed from geth tx stream"),
Ok(Ok(false)) => warn!("failed to unsubscribe from geth tx stream"),
Ok(Err(err)) => {
error!(error = %Report::new(err), "failed unsubscribing from the geth tx stream");
}
Err(err) => {
error!(error = %Report::new(err), "timed out while unsubscribing from the geth tx stream");
}
reason.map(|_| ())
}
}

#[instrument(skip_all)]
async fn forward_geth_tx(
executor_handle: &Handle,
seq_action: SequenceAction,
tx_hash: ethers::types::H256,
txs_dropped_counter: &Counter,
) -> eyre::Result<()> {
match executor_handle
.send_timeout(seq_action, EXECUTOR_SEND_TIMEOUT)
.await
{
Ok(()) => Ok(()),
Err(SendTimeoutError::Timeout(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
timeout_ms = EXECUTOR_SEND_TIMEOUT.as_millis(),
"timed out sending new transaction to executor; dropping tx",
);
txs_dropped_counter.increment(1);
Ok(())
}
Err(SendTimeoutError::Closed(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"executor channel closed while sending transaction; dropping transaction \
and exiting event loop"
);
txs_dropped_counter.increment(1);
Err(eyre!("executor channel closed while sending transaction"))
}
}
}

reason.map(|_| ())
#[instrument(skip_all)]
async fn unsubscribe_from_rollup(tx_stream: &SubscriptionStream<'_, Ws, Transaction>) {
// give 2s for the websocket connection to be unsubscribed as we want to avoid having
// this hang for too long
match tokio::time::timeout(WSS_UNSUBSCRIBE_TIMEOUT, tx_stream.unsubscribe()).await {
Ok(Ok(true)) => info!("unsubscribed from geth tx stream"),
Ok(Ok(false)) => warn!("geth responded to unsubscribe request but returned `false`"),
Ok(Err(err)) => {
error!(error = %Report::new(err), "failed unsubscribing from the geth tx stream");
}
Err(_) => {
error!("timed out while unsubscribing from the geth tx stream");
}
}
}

#[instrument(skip_all)]
fn txs_received_counter(metrics: &'static Metrics, chain_name: &String) -> Counter {
metrics
.geth_txs_received(chain_name)
.cloned()
.unwrap_or_else(|| {
error!(
rollup_chain_name = %chain_name,
"failed to get geth transactions_received counter"
);
Counter::noop()
})
}

#[instrument(skip_all)]
fn txs_dropped_counter(metrics: &'static Metrics, chain_name: &String) -> Counter {
metrics
.geth_txs_dropped(chain_name)
.cloned()
.unwrap_or_else(|| {
error!(
rollup_chain_name = %chain_name,
"failed to get geth transactions_dropped counter"
);
Counter::noop()
})
}

#[instrument(skip_all, err)]
async fn connect_to_geth_node(url: String) -> eyre::Result<Provider<Ws>> {
let retry_config = tryhard::RetryFutureConfig::new(1024)
.exponential_backoff(Duration::from_millis(500))
.max_delay(Duration::from_secs(60))
.on_retry(
|attempt, next_delay: Option<Duration>, error: &ProviderError| {
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
warn!(
attempt,
wait_duration,
error = error as &StdError,
"attempt to connect to geth node failed; retrying after backoff",
);
futures::future::ready(())
},
);

let client = tryhard::retry_fn(|| {
let url = url.clone();
async move {
let websocket_client = Ws::connect_with_reconnects(url, 0).await?;
Ok(Provider::new(websocket_client))
}
})
.with_config(retry_config)
.await
.wrap_err("failed connecting to geth after several retries; giving up")?;

Ok(client)
}
4 changes: 4 additions & 0 deletions crates/astria-composer/src/composer.rs
@@ -31,6 +31,7 @@ use tokio_util::{
use tracing::{
error,
info,
instrument,
warn,
};

@@ -118,6 +119,7 @@ impl Composer {
///
/// An error is returned if the composer fails to be initialized.
/// See `[from_config]` for its error scenarios.
#[instrument(skip_all, err)]
pub async fn from_config(cfg: &Config) -> eyre::Result<Self> {
static METRICS: OnceLock<Metrics> = OnceLock::new();

@@ -464,6 +466,7 @@ fn spawn_geth_collectors(
}
}

#[instrument(skip_all, err)]
async fn wait_for_executor(
mut executor_status: watch::Receiver<executor::Status>,
composer_status_sender: &mut watch::Sender<composer::Status>,
@@ -481,6 +484,7 @@ async fn wait_for_executor(
}

/// Waits for all collectors to come online.
#[instrument(skip_all, err)]
async fn wait_for_collectors(
collector_statuses: &HashMap<String, watch::Receiver<collectors::geth::Status>>,
composer_status_sender: &mut watch::Sender<composer::Status>,