Skip to content

Commit

Permalink
chore(conductor): Add instrumentation (#1330)
Browse files Browse the repository at this point in the history
## Summary
Added instrumentation to `astria-conductor`

## Background
Adding instrumentation to all async calls will aid in tracing since
spans will be emitted even if no events happen under them.

## Changes
- Added instrumentation to all async function calls that are not
long-lived.
- Added instrumentation on some non-async functions that utilize logging
such that logging occurs within a span.
- Removed instrumentation on `run_until_stopped()` functions.
- Minor refactoring of `run_until_stopped()` functions to ensure logging
occurs within spans.

## Related Issues
Part of #1321

---------

Co-authored-by: Fraser Hutchison <[email protected]>
  • Loading branch information
ethanoroshiba and Fraser999 authored Aug 30, 2024
1 parent 3e24c50 commit 293bc5c
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 86 deletions.
2 changes: 2 additions & 0 deletions crates/astria-conductor/src/celestia/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl RawBlobs {
celestia_height,
sequencer_namespace = %base64(sequencer_namespace.as_ref()),
rollup_namespace = %base64(rollup_namespace.as_ref()),
err,
))]
pub(super) async fn fetch_new_blobs(
client: CelestiaClient,
Expand Down Expand Up @@ -85,6 +86,7 @@ pub(super) async fn fetch_new_blobs(
})
}

#[instrument(skip_all, err)]
async fn fetch_blobs_with_retry(
client: CelestiaClient,
height: u64,
Expand Down
87 changes: 55 additions & 32 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use tracing::{
info_span,
instrument,
trace,
trace_span,
warn,
};

Expand Down Expand Up @@ -148,7 +149,9 @@ impl Reader {
pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> {
let (executor, sequencer_chain_id) = select!(
() = self.shutdown.clone().cancelled_owned() => {
info!("received shutdown signal while waiting for Celestia reader task to initialize");
info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(||
info!("received shutdown signal while waiting for Celestia reader task to initialize")
);
return Ok(());
}

Expand All @@ -163,6 +166,7 @@ impl Reader {
.await
}

#[instrument(skip_all, err)]
async fn initialize(
&mut self,
) -> eyre::Result<(executor::Handle<StateIsInit>, tendermint::chain::Id)> {
Expand Down Expand Up @@ -303,18 +307,19 @@ impl RunningReader {
})
}

#[instrument(skip(self))]
async fn run_until_stopped(mut self) -> eyre::Result<()> {
info!(
initial_celestia_height = self.celestia_next_height,
initial_max_celestia_height = self.max_permitted_celestia_height(),
celestia_variance = self.celestia_variance,
rollup_namespace = %base64(&self.rollup_namespace.as_bytes()),
rollup_id = %self.rollup_id,
sequencer_chain_id = %self.sequencer_chain_id,
sequencer_namespace = %base64(&self.sequencer_namespace.as_bytes()),
"starting firm block read loop",
);
info_span!("conductor::celestia::RunningReader::run_until_stopped").in_scope(|| {
info!(
initial_celestia_height = self.celestia_next_height,
initial_max_celestia_height = self.max_permitted_celestia_height(),
celestia_variance = self.celestia_variance,
rollup_namespace = %base64(&self.rollup_namespace.as_bytes()),
rollup_id = %self.rollup_id,
sequencer_chain_id = %self.sequencer_chain_id,
sequencer_namespace = %base64(&self.sequencer_namespace.as_bytes()),
"starting firm block read loop",
);
});

let reason = loop {
self.schedule_new_blobs();
Expand All @@ -329,7 +334,9 @@ impl RunningReader {
res = &mut self.enqueued_block, if self.waiting_for_executor_capacity() => {
match res {
Ok(celestia_height_of_forwarded_block) => {
trace!("submitted enqueued block to executor, resuming normal operation");
trace_span!("conductor::celestia::RunningReader::run_until_stopped")
.in_scope(||
trace!("submitted enqueued block to executor, resuming normal operation"));
self.advance_reference_celestia_height(celestia_height_of_forwarded_block);
}
Err(err) => break Err(err).wrap_err("failed sending enqueued block to executor"),
Expand All @@ -353,37 +360,34 @@ impl RunningReader {
}

Some(res) = self.latest_heights.next() => {
match res {
Ok(height) => {
info!(height, "observed latest height from Celestia");
self.record_latest_celestia_height(height);
}
Err(error) => {
warn!(
%error,
"failed fetching latest height from sequencer; waiting until next tick",
);
}
}
self.handle_latest_height(res);
}

);
};

// XXX: explicitly setting the event message (usually implicitly set by tracing)
let message = "shutting down";
match reason {
Ok(reason) => {
info!(reason, message);
Ok(())
report_exit(reason, message)
}

#[instrument(skip_all)]
fn handle_latest_height(&mut self, res: eyre::Result<u64>) {
match res {
Ok(height) => {
info!(height, "observed latest height from Celestia");
self.record_latest_celestia_height(height);
}
Err(reason) => {
error!(%reason, message);
Err(reason)
Err(error) => {
warn!(
%error,
"failed fetching latest height from sequencer; waiting until next tick",
);
}
}
}

#[instrument(skip_all)]
fn cache_reconstructed_blocks(&mut self, reconstructed: ReconstructedBlocks) {
for block in reconstructed.blocks {
let block_hash = block.block_hash;
Expand Down Expand Up @@ -413,6 +417,7 @@ impl RunningReader {
is_next_below_head && is_next_in_window && is_capacity_in_task_set
}

#[instrument(skip_all)]
fn schedule_new_blobs(&mut self) {
let mut scheduled = vec![];
while self.can_schedule_blobs() {
Expand Down Expand Up @@ -444,6 +449,7 @@ impl RunningReader {
*reference_height = max(*reference_height, candidate);
}

#[instrument(skip_all)]
fn forward_block_to_executor(&mut self, block: ReconstructedBlock) -> eyre::Result<()> {
let celestia_height = block.celestia_height;
match self.executor.try_send_firm_block(block) {
Expand Down Expand Up @@ -507,6 +513,7 @@ impl FetchConvertVerifyAndReconstruct {
celestia_height = self.celestia_height,
rollup_namespace = %base64(self.rollup_namespace.as_bytes()),
sequencer_namespace = %base64(self.sequencer_namespace.as_bytes()),
err,
))]
async fn execute(self) -> eyre::Result<ReconstructedBlocks> {
let Self {
Expand Down Expand Up @@ -592,6 +599,7 @@ impl FetchConvertVerifyAndReconstruct {
}
}

#[instrument(skip_all, err)]
async fn enqueue_block(
executor: executor::Handle<StateIsInit>,
block: ReconstructedBlock,
Expand All @@ -601,6 +609,7 @@ async fn enqueue_block(
Ok(celestia_height)
}

#[instrument(skip_all, err)]
async fn get_sequencer_chain_id(client: SequencerClient) -> eyre::Result<tendermint::chain::Id> {
use sequencer_client::Client as _;

Expand Down Expand Up @@ -633,3 +642,17 @@ async fn get_sequencer_chain_id(client: SequencerClient) -> eyre::Result<tenderm
fn max_permitted_celestia_height(reference: u64, variance: u64) -> u64 {
reference.saturating_add(variance.saturating_mul(6))
}

#[instrument(skip_all)]
fn report_exit(exit_reason: eyre::Result<&str>, message: &str) -> eyre::Result<()> {
match exit_reason {
Ok(reason) => {
info!(%reason, message);
Ok(())
}
Err(reason) => {
error!(%reason, message);
Err(reason)
}
}
}
6 changes: 6 additions & 0 deletions crates/astria-conductor/src/celestia/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use tracing::{
instrument,
warn,
Instrument,
Level,
};
use tryhard::{
backoff_strategies::BackoffStrategy,
Expand Down Expand Up @@ -206,6 +207,7 @@ struct VerificationMeta {
}

impl VerificationMeta {
#[instrument(skip_all, err(level = Level::WARN))]
async fn fetch(
client: RateLimitedVerificationClient,
height: SequencerHeight,
Expand Down Expand Up @@ -298,6 +300,7 @@ impl BlobVerifier {
}
}

#[instrument(skip_all, err)]
async fn fetch_commit_with_retry(
client: SequencerClient,
height: SequencerHeight,
Expand Down Expand Up @@ -332,6 +335,7 @@ async fn fetch_commit_with_retry(
})
}

#[instrument(skip_all, err)]
async fn fetch_validators_with_retry(
client: SequencerClient,
prev_height: SequencerHeight,
Expand Down Expand Up @@ -447,6 +451,7 @@ struct RateLimitedVerificationClient {
}

impl RateLimitedVerificationClient {
#[instrument(skip_all, err)]
async fn get_commit(
mut self,
height: SequencerHeight,
Expand All @@ -468,6 +473,7 @@ impl RateLimitedVerificationClient {
}
}

#[instrument(skip_all, err)]
async fn get_validators(
mut self,
prev_height: SequencerHeight,
Expand Down
19 changes: 13 additions & 6 deletions crates/astria-conductor/src/conductor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio_util::{
use tracing::{
error,
info,
info_span,
instrument,
warn,
};
Expand Down Expand Up @@ -52,6 +53,7 @@ impl Handle {
///
/// # Panics
/// Panics if called twice.
#[instrument(skip_all, err)]
pub async fn shutdown(&mut self) -> Result<(), tokio::task::JoinError> {
self.shutdown_token.cancel();
let task = self.task.take().expect("shutdown must not be called twice");
Expand Down Expand Up @@ -173,9 +175,8 @@ impl Conductor {
///
/// # Panics
/// Panics if it could not install a signal handler.
#[instrument(skip_all)]
async fn run_until_stopped(mut self) {
info!("conductor is running");
info_span!("Conductor::run_until_stopped").in_scope(|| info!("conductor is running"));

let exit_reason = select! {
biased;
Expand All @@ -193,10 +194,7 @@ impl Conductor {
};

let message = "initiating shutdown";
match exit_reason {
Ok(reason) => info!(reason, message),
Err(reason) => error!(%reason, message),
}
report_exit(exit_reason, message);
self.shutdown().await;
}

Expand All @@ -220,6 +218,7 @@ impl Conductor {
/// Waits 25 seconds for all tasks to shut down before aborting them. 25 seconds
/// because kubernetes issues SIGKILL 30 seconds after SIGTERM, giving 5 seconds
/// to abort the remaining tasks.
#[instrument(skip_all)]
async fn shutdown(mut self) {
self.shutdown.cancel();

Expand Down Expand Up @@ -251,3 +250,11 @@ impl Conductor {
info!("shutting down");
}
}

#[instrument(skip_all)]
fn report_exit(exit_reason: eyre::Result<&str>, message: &str) {
match exit_reason {
Ok(reason) => info!(%reason, message),
Err(reason) => error!(%reason, message),
}
}
3 changes: 3 additions & 0 deletions crates/astria-conductor/src/executor/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tokio::sync::{
Semaphore,
TryAcquireError,
};
use tracing::instrument;

/// Creates an mpsc channel for sending soft blocks between asynchronous task.
///
Expand Down Expand Up @@ -92,6 +93,7 @@ impl<T> Sender<T> {
/// Sends a block, waiting until the channel has permits.
///
/// Returns an error if the channel is closed.
#[instrument(skip_all, err)]
pub(super) async fn send(&self, block: T) -> Result<(), SendError> {
let sem = self.sem.upgrade().ok_or(SendError)?;
let permit = sem.acquire().await?;
Expand Down Expand Up @@ -151,6 +153,7 @@ impl<T> Receiver<T> {
}

/// Receives a block over the channel.
#[instrument(skip_all)]
pub(super) async fn recv(&mut self) -> Option<T> {
self.chan.recv().await
}
Expand Down
4 changes: 2 additions & 2 deletions crates/astria-conductor/src/executor/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Client {
}

/// Calls remote procedure `astria.execution.v1alpha2.GetGenesisInfo`
#[instrument(skip_all, fields(uri = %self.uri))]
#[instrument(skip_all, fields(uri = %self.uri), err)]
pub(crate) async fn get_genesis_info_with_retry(&mut self) -> eyre::Result<GenesisInfo> {
let response = tryhard::retry_fn(|| {
let mut client = self.inner.clone();
Expand Down Expand Up @@ -182,7 +182,7 @@ impl Client {
///
/// * `firm` - The firm block
/// * `soft` - The soft block
#[instrument(skip_all, fields(uri = %self.uri))]
#[instrument(skip_all, fields(uri = %self.uri), err)]
pub(super) async fn update_commitment_state_with_retry(
&mut self,
commitment_state: CommitmentState,
Expand Down
Loading

0 comments on commit 293bc5c

Please sign in to comment.