Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(conductor): Add instrumentation #1330

Merged
merged 5 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/astria-conductor/src/celestia/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl RawBlobs {
celestia_height,
sequencer_namespace = %base64(sequencer_namespace.as_ref()),
rollup_namespace = %base64(rollup_namespace.as_ref()),
err,
))]
pub(super) async fn fetch_new_blobs(
client: CelestiaClient,
Expand Down Expand Up @@ -85,6 +86,7 @@ pub(super) async fn fetch_new_blobs(
})
}

#[instrument(skip_all, err)]
async fn fetch_blobs_with_retry(
client: CelestiaClient,
height: u64,
Expand Down
87 changes: 55 additions & 32 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use tracing::{
info_span,
instrument,
trace,
trace_span,
warn,
};

Expand Down Expand Up @@ -148,7 +149,9 @@ impl Reader {
pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> {
let (executor, sequencer_chain_id) = select!(
() = self.shutdown.clone().cancelled_owned() => {
info!("received shutdown signal while waiting for Celestia reader task to initialize");
info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(||
info!("received shutdown signal while waiting for Celestia reader task to initialize")
);
return Ok(());
}

Expand All @@ -163,6 +166,7 @@ impl Reader {
.await
}

#[instrument(skip_all, err)]
async fn initialize(
&mut self,
) -> eyre::Result<(executor::Handle<StateIsInit>, tendermint::chain::Id)> {
Expand Down Expand Up @@ -303,18 +307,19 @@ impl RunningReader {
})
}

#[instrument(skip(self))]
async fn run_until_stopped(mut self) -> eyre::Result<()> {
info!(
initial_celestia_height = self.celestia_next_height,
initial_max_celestia_height = self.max_permitted_celestia_height(),
celestia_variance = self.celestia_variance,
rollup_namespace = %base64(&self.rollup_namespace.as_bytes()),
rollup_id = %self.rollup_id,
sequencer_chain_id = %self.sequencer_chain_id,
sequencer_namespace = %base64(&self.sequencer_namespace.as_bytes()),
"starting firm block read loop",
);
info_span!("conductor::celestia::RunningReader::run_until_stopped").in_scope(|| {
info!(
initial_celestia_height = self.celestia_next_height,
initial_max_celestia_height = self.max_permitted_celestia_height(),
celestia_variance = self.celestia_variance,
rollup_namespace = %base64(&self.rollup_namespace.as_bytes()),
rollup_id = %self.rollup_id,
sequencer_chain_id = %self.sequencer_chain_id,
sequencer_namespace = %base64(&self.sequencer_namespace.as_bytes()),
"starting firm block read loop",
);
});

let reason = loop {
self.schedule_new_blobs();
Expand All @@ -329,7 +334,9 @@ impl RunningReader {
res = &mut self.enqueued_block, if self.waiting_for_executor_capacity() => {
match res {
Ok(celestia_height_of_forwarded_block) => {
trace!("submitted enqueued block to executor, resuming normal operation");
trace_span!("conductor::celestia::RunningReader::run_until_stopped")
.in_scope(||
trace!("submitted enqueued block to executor, resuming normal operation"));
self.advance_reference_celestia_height(celestia_height_of_forwarded_block);
}
Err(err) => break Err(err).wrap_err("failed sending enqueued block to executor"),
Expand All @@ -353,37 +360,34 @@ impl RunningReader {
}

Some(res) = self.latest_heights.next() => {
match res {
Ok(height) => {
info!(height, "observed latest height from Celestia");
self.record_latest_celestia_height(height);
}
Err(error) => {
warn!(
%error,
"failed fetching latest height from sequencer; waiting until next tick",
);
}
}
self.handle_latest_height(res);
}

);
};

// XXX: explicitly setting the event message (usually implicitly set by tracing)
let message = "shutting down";
match reason {
Ok(reason) => {
info!(reason, message);
Ok(())
report_exit(reason, message)
}

#[instrument(skip_all)]
fn handle_latest_height(&mut self, res: eyre::Result<u64>) {
match res {
Ok(height) => {
info!(height, "observed latest height from Celestia");
self.record_latest_celestia_height(height);
}
Err(reason) => {
error!(%reason, message);
Err(reason)
Err(error) => {
warn!(
%error,
"failed fetching latest height from sequencer; waiting until next tick",
);
}
}
}

#[instrument(skip_all)]
fn cache_reconstructed_blocks(&mut self, reconstructed: ReconstructedBlocks) {
for block in reconstructed.blocks {
let block_hash = block.block_hash;
Expand Down Expand Up @@ -413,6 +417,7 @@ impl RunningReader {
is_next_below_head && is_next_in_window && is_capacity_in_task_set
}

#[instrument(skip_all)]
fn schedule_new_blobs(&mut self) {
let mut scheduled = vec![];
while self.can_schedule_blobs() {
Expand Down Expand Up @@ -444,6 +449,7 @@ impl RunningReader {
*reference_height = max(*reference_height, candidate);
}

#[instrument(skip_all)]
fn forward_block_to_executor(&mut self, block: ReconstructedBlock) -> eyre::Result<()> {
let celestia_height = block.celestia_height;
match self.executor.try_send_firm_block(block) {
Expand Down Expand Up @@ -507,6 +513,7 @@ impl FetchConvertVerifyAndReconstruct {
celestia_height = self.celestia_height,
rollup_namespace = %base64(self.rollup_namespace.as_bytes()),
sequencer_namespace = %base64(self.sequencer_namespace.as_bytes()),
err,
))]
async fn execute(self) -> eyre::Result<ReconstructedBlocks> {
let Self {
Expand Down Expand Up @@ -592,6 +599,7 @@ impl FetchConvertVerifyAndReconstruct {
}
}

#[instrument(skip_all, err)]
async fn enqueue_block(
executor: executor::Handle<StateIsInit>,
block: ReconstructedBlock,
Expand All @@ -601,6 +609,7 @@ async fn enqueue_block(
Ok(celestia_height)
}

#[instrument(skip_all, err)]
async fn get_sequencer_chain_id(client: SequencerClient) -> eyre::Result<tendermint::chain::Id> {
use sequencer_client::Client as _;

Expand Down Expand Up @@ -633,3 +642,17 @@ async fn get_sequencer_chain_id(client: SequencerClient) -> eyre::Result<tenderm
/// Computes the upper bound `reference + 6 * variance` for Celestia heights.
///
/// Both the multiplication and the addition saturate, so the result is clamped
/// to `u64::MAX` instead of overflowing.
fn max_permitted_celestia_height(reference: u64, variance: u64) -> u64 {
    let permitted_window = variance.saturating_mul(6);
    reference.saturating_add(permitted_window)
}

#[instrument(skip_all)]
fn report_exit(exit_reason: eyre::Result<&str>, message: &str) -> eyre::Result<()> {
match exit_reason {
Ok(reason) => {
info!(%reason, message);
Ok(())
}
Err(reason) => {
error!(%reason, message);
Err(reason)
}
}
}
6 changes: 6 additions & 0 deletions crates/astria-conductor/src/celestia/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use tracing::{
instrument,
warn,
Instrument,
Level,
};
use tryhard::{
backoff_strategies::BackoffStrategy,
Expand Down Expand Up @@ -206,6 +207,7 @@ struct VerificationMeta {
}

impl VerificationMeta {
#[instrument(skip_all, err(level = Level::WARN))]
async fn fetch(
client: RateLimitedVerificationClient,
height: SequencerHeight,
Expand Down Expand Up @@ -298,6 +300,7 @@ impl BlobVerifier {
}
}

#[instrument(skip_all, err)]
async fn fetch_commit_with_retry(
client: SequencerClient,
height: SequencerHeight,
Expand Down Expand Up @@ -332,6 +335,7 @@ async fn fetch_commit_with_retry(
})
}

#[instrument(skip_all, err)]
async fn fetch_validators_with_retry(
client: SequencerClient,
prev_height: SequencerHeight,
Expand Down Expand Up @@ -447,6 +451,7 @@ struct RateLimitedVerificationClient {
}

impl RateLimitedVerificationClient {
#[instrument(skip_all, err)]
async fn get_commit(
mut self,
height: SequencerHeight,
Expand All @@ -468,6 +473,7 @@ impl RateLimitedVerificationClient {
}
}

#[instrument(skip_all, err)]
async fn get_validators(
mut self,
prev_height: SequencerHeight,
Expand Down
19 changes: 13 additions & 6 deletions crates/astria-conductor/src/conductor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio_util::{
use tracing::{
error,
info,
info_span,
instrument,
warn,
};
Expand Down Expand Up @@ -52,6 +53,7 @@ impl Handle {
///
/// # Panics
/// Panics if called twice.
#[instrument(skip_all, err)]
pub async fn shutdown(&mut self) -> Result<(), tokio::task::JoinError> {
self.shutdown_token.cancel();
let task = self.task.take().expect("shutdown must not be called twice");
Expand Down Expand Up @@ -167,9 +169,8 @@ impl Conductor {
///
/// # Panics
/// Panics if it could not install a signal handler.
#[instrument(skip_all)]
async fn run_until_stopped(mut self) {
info!("conductor is running");
info_span!("Conductor::run_until_stopped").in_scope(|| info!("conductor is running"));

let exit_reason = select! {
biased;
Expand All @@ -187,10 +188,7 @@ impl Conductor {
};

let message = "initiating shutdown";
match exit_reason {
Ok(reason) => info!(reason, message),
Err(reason) => error!(%reason, message),
}
report_exit(exit_reason, message);
self.shutdown().await;
}

Expand All @@ -214,6 +212,7 @@ impl Conductor {
/// Waits 25 seconds for all tasks to shut down before aborting them. 25 seconds
/// because kubernetes issues SIGKILL 30 seconds after SIGTERM, giving 5 seconds
/// to abort the remaining tasks.
#[instrument(skip_all)]
async fn shutdown(mut self) {
self.shutdown.cancel();

Expand Down Expand Up @@ -245,3 +244,11 @@ impl Conductor {
info!("shutting down");
}
}

#[instrument(skip_all)]
fn report_exit(exit_reason: eyre::Result<&str>, message: &str) {
match exit_reason {
Ok(reason) => info!(%reason, message),
Err(reason) => error!(%reason, message),
}
}
3 changes: 3 additions & 0 deletions crates/astria-conductor/src/executor/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tokio::sync::{
Semaphore,
TryAcquireError,
};
use tracing::instrument;

/// Creates an mpsc channel for sending soft blocks between asynchronous tasks.
///
Expand Down Expand Up @@ -92,6 +93,7 @@ impl<T> Sender<T> {
/// Sends a block, waiting until the channel has permits.
///
/// Returns an error if the channel is closed.
#[instrument(skip_all, err)]
pub(super) async fn send(&self, block: T) -> Result<(), SendError> {
let sem = self.sem.upgrade().ok_or(SendError)?;
let permit = sem.acquire().await?;
Expand Down Expand Up @@ -151,6 +153,7 @@ impl<T> Receiver<T> {
}

/// Receives a block over the channel.
///
/// Awaits `recv` on the wrapped `chan` and returns its result unchanged.
///
/// NOTE(review): `None` presumably means the channel is closed and has no
/// buffered blocks left — confirm against the type of `chan`.
#[instrument(skip_all)]
pub(super) async fn recv(&mut self) -> Option<T> {
self.chan.recv().await
}
Expand Down
4 changes: 2 additions & 2 deletions crates/astria-conductor/src/executor/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Client {
}

/// Calls remote procedure `astria.execution.v1alpha2.GetGenesisInfo`
#[instrument(skip_all, fields(uri = %self.uri))]
#[instrument(skip_all, fields(uri = %self.uri), err)]
pub(crate) async fn get_genesis_info_with_retry(&mut self) -> eyre::Result<GenesisInfo> {
let response = tryhard::retry_fn(|| {
let mut client = self.inner.clone();
Expand Down Expand Up @@ -182,7 +182,7 @@ impl Client {
///
/// * `firm` - The firm block
/// * `soft` - The soft block
#[instrument(skip_all, fields(uri = %self.uri))]
#[instrument(skip_all, fields(uri = %self.uri), err)]
pub(super) async fn update_commitment_state_with_retry(
&mut self,
commitment_state: CommitmentState,
Expand Down
Loading
Loading