Skip to content

Commit

Permalink
chore(bridge-withdrawer): Add instrumentation (#1324)
Browse files Browse the repository at this point in the history
## Summary
Added instrumentation to `bridge-withdrawer`

## Background
Adding instrumentation to all async calls will aid in tracing since
spans will be emitted even if no events happen under them.

## Changes
- Added instrumentation to all async function calls that are not
long-lived.
- Minor refactor of `BridgeWithdrawer::run()` to avoid logging outside
of spans.
- Removed extraneous error logging from functions with
`#[instrument(..., err)]`.

## Related Issues
Part of #1321

---------

Co-authored-by: Fraser Hutchison <[email protected]>
  • Loading branch information
2 people authored and jbowen93 committed Sep 6, 2024
1 parent 319c8e5 commit 1d5a11f
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 56 deletions.
4 changes: 4 additions & 0 deletions crates/astria-bridge-withdrawer/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use http::status::StatusCode;
use hyper::server::conn::AddrIncoming;
use serde::Serialize;
use tokio::sync::watch;
use tracing::instrument;

use crate::bridge_withdrawer::StateSnapshot;

Expand Down Expand Up @@ -51,6 +52,7 @@ pub(crate) fn start(socket_addr: SocketAddr, withdrawer_state: WithdrawerState)
}

#[allow(clippy::unused_async)] // Permit because axum handlers must be async
#[instrument(skip_all)]
async fn get_healthz(State(withdrawer_state): State<WithdrawerState>) -> Healthz {
if withdrawer_state.borrow().is_healthy() {
Healthz::Ok
Expand All @@ -66,6 +68,7 @@ async fn get_healthz(State(withdrawer_state): State<WithdrawerState>) -> Healthz
/// + there is a current sequencer height (implying a block from sequencer was received)
/// + there is a current data availability height (implying a height was received from the DA)
#[allow(clippy::unused_async)] // Permit because axum handlers must be async
#[instrument(skip_all)]
async fn get_readyz(State(withdrawer_state): State<WithdrawerState>) -> Readyz {
let is_withdrawer_online = withdrawer_state.borrow().is_ready();
if is_withdrawer_online {
Expand All @@ -76,6 +79,7 @@ async fn get_readyz(State(withdrawer_state): State<WithdrawerState>) -> Readyz {
}

#[allow(clippy::unused_async)] // Permit because axum handlers must be async
#[instrument(skip_all)]
async fn get_status(State(withdrawer_state): State<WithdrawerState>) -> Json<StateSnapshot> {
Json(withdrawer_state.borrow().clone())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{
debug,
info,
info_span,
instrument,
warn,
};
Expand Down Expand Up @@ -296,11 +297,13 @@ async fn watch_for_blocks(
bail!("current rollup block missing block number")
};

info!(
block.height = current_rollup_block_height.as_u64(),
block.hash = current_rollup_block.hash.map(tracing::field::display),
"got current block"
);
info_span!("watch_for_blocks").in_scope(|| {
info!(
block.height = current_rollup_block_height.as_u64(),
block.hash = current_rollup_block.hash.map(tracing::field::display),
"got current block"
);
});

// sync any blocks missing between `next_rollup_block_height` and the current latest
// (inclusive).
Expand All @@ -317,7 +320,7 @@ async fn watch_for_blocks(
loop {
select! {
() = shutdown_token.cancelled() => {
info!("block watcher shutting down");
info_span!("watch_for_blocks").in_scope(|| info!("block watcher shutting down"));
return Ok(());
}
block = block_rx.next() => {
Expand Down
92 changes: 72 additions & 20 deletions crates/astria-bridge-withdrawer/src/bridge_withdrawer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,20 @@ use astria_eyre::eyre::{
self,
WrapErr as _,
};
use axum::{
routing::IntoMakeService,
Router,
Server,
};
use ethereum::watcher::Watcher;
use hyper::server::conn::AddrIncoming;
use startup::Startup;
use tokio::{
select,
sync::oneshot,
sync::oneshot::{
self,
Receiver,
},
task::{
JoinError,
JoinHandle,
Expand All @@ -24,6 +35,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{
error,
info,
instrument,
};

pub(crate) use self::state::StateSnapshot;
Expand Down Expand Up @@ -167,34 +179,29 @@ impl BridgeWithdrawer {
// Separate the API shutdown signal from the cancellation token because we want it to live
// until the very end.
let (api_shutdown_signal, api_shutdown_signal_rx) = oneshot::channel::<()>();
let mut api_task = tokio::spawn(async move {
api_server
.with_graceful_shutdown(async move {
let _ = api_shutdown_signal_rx.await;
})
.await
.wrap_err("api server ended unexpectedly")
});
info!("spawned API server");

let mut startup_task = Some(tokio::spawn(startup.run()));
info!("spawned startup task");

let mut submitter_task = tokio::spawn(submitter.run());
info!("spawned submitter task");
let mut ethereum_watcher_task = tokio::spawn(ethereum_watcher.run());
info!("spawned ethereum watcher task");
let TaskHandles {
mut api_task,
mut startup_task,
mut submitter_task,
mut ethereum_watcher_task,
} = spawn_tasks(
api_server,
api_shutdown_signal_rx,
startup,
submitter,
ethereum_watcher,
);

let shutdown = loop {
select!(
o = async { startup_task.as_mut().unwrap().await }, if startup_task.is_none() => {
match o {
Ok(_) => {
info!(task = "startup", "task has exited");
report_exit("startup", Ok(Ok(())));
startup_task = None;
},
Err(error) => {
error!(task = "startup", %error, "task returned with error");
report_exit("startup", Err(error));
break Shutdown {
api_task: Some(api_task),
submitter_task: Some(submitter_task),
Expand Down Expand Up @@ -245,6 +252,48 @@ impl BridgeWithdrawer {
}
}

#[allow(clippy::struct_field_names)] // allow: for parity with the `Shutdown` struct.
struct TaskHandles {
api_task: JoinHandle<eyre::Result<()>>,
startup_task: Option<JoinHandle<eyre::Result<()>>>,
submitter_task: JoinHandle<eyre::Result<()>>,
ethereum_watcher_task: JoinHandle<eyre::Result<()>>,
}

#[instrument(skip_all)]
fn spawn_tasks(
api_server: Server<AddrIncoming, IntoMakeService<Router>>,
api_shutdown_signal_rx: Receiver<()>,
startup: Startup,
submitter: Submitter,
ethereum_watcher: Watcher,
) -> TaskHandles {
let api_task = tokio::spawn(async move {
api_server
.with_graceful_shutdown(async move {
let _ = api_shutdown_signal_rx.await;
})
.await
.wrap_err("api server ended unexpectedly")
});
info!("spawned API server");

let startup_task = Some(tokio::spawn(startup.run()));
info!("spawned startup task");

let submitter_task = tokio::spawn(submitter.run());
info!("spawned submitter task");
let ethereum_watcher_task = tokio::spawn(ethereum_watcher.run());
info!("spawned ethereum watcher task");

TaskHandles {
api_task,
startup_task,
submitter_task,
ethereum_watcher_task,
}
}

/// A handle for instructing the [`Service`] to shut down.
///
/// It is returned along with its related `Service` from [`Service::new`]. The
Expand Down Expand Up @@ -275,6 +324,7 @@ impl ShutdownHandle {
}

impl Drop for ShutdownHandle {
#[instrument(skip_all)]
fn drop(&mut self) {
if !self.token.is_cancelled() {
info!("shutdown handle dropped, issuing shutdown to all services");
Expand All @@ -283,6 +333,7 @@ impl Drop for ShutdownHandle {
}
}

#[instrument(skip_all)]
fn report_exit(task_name: &str, outcome: Result<eyre::Result<()>, JoinError>) {
match outcome {
Ok(Ok(())) => info!(task = task_name, "task has exited"),
Expand Down Expand Up @@ -314,6 +365,7 @@ impl Shutdown {
const STARTUP_SHUTDOWN_TIMEOUT_SECONDS: u64 = 1;
const SUBMITTER_SHUTDOWN_TIMEOUT_SECONDS: u64 = 19;

#[instrument(skip_all)]
async fn run(self) {
let Self {
api_task,
Expand Down
17 changes: 12 additions & 5 deletions crates/astria-bridge-withdrawer/src/bridge_withdrawer/startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use tracing::{
instrument,
warn,
Instrument as _,
Level,
Span,
};
use tryhard::backoff_strategies::ExponentialBackoff;
Expand Down Expand Up @@ -120,6 +121,7 @@ impl InfoHandle {
}
}

#[instrument(skip_all, err)]
pub(super) async fn get_info(&mut self) -> eyre::Result<Info> {
let state = self
.rx
Expand Down Expand Up @@ -202,6 +204,7 @@ impl Startup {
/// - `self.chain_id` does not match the value returned from the sequencer node
/// - `self.fee_asset` is not a valid fee asset on the sequencer node
/// - `self.sequencer_bridge_address` does not have a sufficient balance of `self.fee_asset`.
#[instrument(skip_all, err)]
async fn confirm_sequencer_config(&self) -> eyre::Result<()> {
// confirm the sequencer chain id
let actual_chain_id =
Expand Down Expand Up @@ -250,6 +253,7 @@ impl Startup {
/// in the sequencer logic).
/// 5. Failing to convert the transaction data from bytes to proto.
/// 6. Failing to convert the transaction data from proto to `SignedTransaction`.
#[instrument(skip_all, err)]
async fn get_last_transaction(&self) -> eyre::Result<Option<SignedTransaction>> {
// get last transaction hash by the bridge account, if it exists
let last_transaction_hash_resp = get_bridge_account_last_transaction_hash(
Expand Down Expand Up @@ -323,6 +327,7 @@ impl Startup {
/// the sequencer logic)
/// 3. The last transaction by the bridge account did not contain a withdrawal action
/// 4. The memo of the last transaction by the bridge account could not be parsed
#[instrument(skip_all, err)]
async fn get_starting_rollup_height(&mut self) -> eyre::Result<u64> {
let signed_transaction = self
.get_last_transaction()
Expand All @@ -347,6 +352,7 @@ impl Startup {
}
}

#[instrument(skip_all, err(level = Level::WARN))]
async fn ensure_mempool_empty(
cometbft_client: sequencer_client::HttpClient,
sequencer_client: sequencer_service_client::SequencerServiceClient<Channel>,
Expand Down Expand Up @@ -391,6 +397,7 @@ async fn ensure_mempool_empty(
/// 2. Failing to get the latest nonce from cometBFT's mempool.
/// 3. The pending nonce from the Sequencer's app-side mempool does not match the latest nonce from
/// cometBFT's mempool after the exponential backoff times out.
#[instrument(skip_all, err)]
async fn wait_for_empty_mempool(
cometbft_client: sequencer_client::HttpClient,
sequencer_grpc_endpoint: String,
Expand Down Expand Up @@ -481,7 +488,7 @@ fn rollup_height_from_signed_transaction(
Ok(last_batch_rollup_height)
}

#[instrument(skip_all)]
#[instrument(skip_all, err)]
async fn get_bridge_account_last_transaction_hash(
client: sequencer_client::HttpClient,
state: Arc<State>,
Expand All @@ -503,7 +510,7 @@ async fn get_bridge_account_last_transaction_hash(
res
}

#[instrument(skip_all)]
#[instrument(skip_all, err)]
async fn get_sequencer_transaction_at_hash(
client: sequencer_client::HttpClient,
state: Arc<State>,
Expand All @@ -521,7 +528,7 @@ async fn get_sequencer_transaction_at_hash(
res
}

#[instrument(skip_all)]
#[instrument(skip_all, err)]
async fn get_sequencer_chain_id(
client: sequencer_client::HttpClient,
state: Arc<State>,
Expand All @@ -538,7 +545,7 @@ async fn get_sequencer_chain_id(
Ok(genesis.chain_id)
}

#[instrument(skip_all)]
#[instrument(skip_all, err)]
async fn get_allowed_fee_assets(
client: sequencer_client::HttpClient,
state: Arc<State>,
Expand All @@ -555,7 +562,7 @@ async fn get_allowed_fee_assets(
res
}

#[instrument(skip_all)]
#[instrument(skip_all, err)]
async fn get_latest_nonce(
client: sequencer_client::HttpClient,
state: Arc<State>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use astria_eyre::eyre::{
};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::{
info,
instrument,
};

use super::state::State;
use crate::{
Expand All @@ -30,6 +33,7 @@ impl Handle {
}
}

#[instrument(skip_all, err)]
pub(crate) async fn send_batch(&self, batch: Batch) -> eyre::Result<()> {
self.batches_tx
.send(batch)
Expand Down
Loading

0 comments on commit 1d5a11f

Please sign in to comment.