diff --git a/crates/astria-conductor/src/celestia/fetch.rs b/crates/astria-conductor/src/celestia/fetch.rs index 4787cf1e43..3ba28e4513 100644 --- a/crates/astria-conductor/src/celestia/fetch.rs +++ b/crates/astria-conductor/src/celestia/fetch.rs @@ -52,6 +52,7 @@ impl RawBlobs { celestia_height, sequencer_namespace = %base64(sequencer_namespace.as_ref()), rollup_namespace = %base64(rollup_namespace.as_ref()), + err, ))] pub(super) async fn fetch_new_blobs( client: CelestiaClient, @@ -85,6 +86,7 @@ pub(super) async fn fetch_new_blobs( }) } +#[instrument(skip_all, err)] async fn fetch_blobs_with_retry( client: CelestiaClient, height: u64, diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index 7c209802e6..5de1d5b093 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -51,6 +51,7 @@ use tracing::{ info_span, instrument, trace, + trace_span, warn, }; @@ -148,7 +149,9 @@ impl Reader { pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { let (executor, sequencer_chain_id) = select!( () = self.shutdown.clone().cancelled_owned() => { - info!("received shutdown signal while waiting for Celestia reader task to initialize"); + info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(|| + info!("received shutdown signal while waiting for Celestia reader task to initialize") + ); return Ok(()); } @@ -163,6 +166,7 @@ impl Reader { .await } + #[instrument(skip_all, err)] async fn initialize( &mut self, ) -> eyre::Result<(executor::Handle, tendermint::chain::Id)> { @@ -303,18 +307,19 @@ impl RunningReader { }) } - #[instrument(skip(self))] async fn run_until_stopped(mut self) -> eyre::Result<()> { - info!( - initial_celestia_height = self.celestia_next_height, - initial_max_celestia_height = self.max_permitted_celestia_height(), - celestia_variance = self.celestia_variance, - rollup_namespace = %base64(&self.rollup_namespace.as_bytes()), - rollup_id = %self.rollup_id, - sequencer_chain_id = %self.sequencer_chain_id, - sequencer_namespace = %base64(&self.sequencer_namespace.as_bytes()), - "starting firm block read loop", - ); + info_span!("conductor::celestia::RunningReader::run_until_stopped").in_scope(|| { + info!( + initial_celestia_height = self.celestia_next_height, + initial_max_celestia_height = self.max_permitted_celestia_height(), + celestia_variance = self.celestia_variance, + rollup_namespace = %base64(&self.rollup_namespace.as_bytes()), + rollup_id = %self.rollup_id, + sequencer_chain_id = %self.sequencer_chain_id, + sequencer_namespace = %base64(&self.sequencer_namespace.as_bytes()), + "starting firm block read loop", + ); + }); let reason = loop { self.schedule_new_blobs(); @@ -329,7 +334,9 @@ impl RunningReader { res = &mut self.enqueued_block, if self.waiting_for_executor_capacity() => { match res { Ok(celestia_height_of_forwarded_block) => { - trace!("submitted enqueued block to executor, resuming normal operation"); + trace_span!("conductor::celestia::RunningReader::run_until_stopped") + .in_scope(|| + trace!("submitted enqueued block to executor, resuming normal operation")); self.advance_reference_celestia_height(celestia_height_of_forwarded_block); } Err(err) => break Err(err).wrap_err("failed sending enqueued block to executor"), @@ -353,18 +360,7 @@ impl RunningReader { } Some(res) = self.latest_heights.next() => { - match res { - Ok(height) => { - info!(height, "observed latest height from Celestia"); - self.record_latest_celestia_height(height); - } - Err(error) => { - warn!( - %error, - "failed fetching latest height from sequencer; waiting until next tick", - ); - } - } + self.handle_latest_height(res); } ); @@ -372,18 +368,26 @@ impl RunningReader { // XXX: explicitly setting the event message (usually implicitly set by tracing) let message = "shutting down"; - match reason { - Ok(reason) => { - info!(reason, message); - Ok(()) + report_exit(reason, message) + } + + #[instrument(skip_all)] + fn handle_latest_height(&mut self, res: eyre::Result) { + match res { + Ok(height) => { + info!(height, "observed latest height from Celestia"); + self.record_latest_celestia_height(height); } - Err(reason) => { - error!(%reason, message); - Err(reason) + Err(error) => { + warn!( + %error, + "failed fetching latest height from sequencer; waiting until next tick", + ); } } } + #[instrument(skip_all)] fn cache_reconstructed_blocks(&mut self, reconstructed: ReconstructedBlocks) { for block in reconstructed.blocks { let block_hash = block.block_hash; @@ -413,6 +417,7 @@ impl RunningReader { is_next_below_head && is_next_in_window && is_capacity_in_task_set } + #[instrument(skip_all)] fn schedule_new_blobs(&mut self) { let mut scheduled = vec![]; while self.can_schedule_blobs() { @@ -444,6 +449,7 @@ impl RunningReader { *reference_height = max(*reference_height, candidate); } + #[instrument(skip_all)] fn forward_block_to_executor(&mut self, block: ReconstructedBlock) -> eyre::Result<()> { let celestia_height = block.celestia_height; match self.executor.try_send_firm_block(block) { @@ -507,6 +513,7 @@ impl FetchConvertVerifyAndReconstruct { celestia_height = self.celestia_height, rollup_namespace = %base64(self.rollup_namespace.as_bytes()), sequencer_namespace = %base64(self.sequencer_namespace.as_bytes()), + err, ))] async fn execute(self) -> eyre::Result { let Self { @@ -592,6 +599,7 @@ impl FetchConvertVerifyAndReconstruct { } } +#[instrument(skip_all, err)] async fn enqueue_block( executor: executor::Handle, block: ReconstructedBlock, @@ -601,6 +609,7 @@ async fn enqueue_block( Ok(celestia_height) } +#[instrument(skip_all, err)] async fn get_sequencer_chain_id(client: SequencerClient) -> eyre::Result { use sequencer_client::Client as _; @@ -633,3 +642,17 @@ async fn get_sequencer_chain_id(client: SequencerClient) -> eyre::Result u64 { reference.saturating_add(variance.saturating_mul(6)) } + +#[instrument(skip_all)] +fn report_exit(exit_reason: eyre::Result<&str>, message: &str) -> eyre::Result<()> { + match exit_reason { + Ok(reason) => { + info!(%reason, message); + Ok(()) + } + Err(reason) => { + error!(%reason, message); + Err(reason) + } + } +} diff --git a/crates/astria-conductor/src/celestia/verify.rs b/crates/astria-conductor/src/celestia/verify.rs index 1f44662da5..67b964ed9c 100644 --- a/crates/astria-conductor/src/celestia/verify.rs +++ b/crates/astria-conductor/src/celestia/verify.rs @@ -38,6 +38,7 @@ use tracing::{ instrument, warn, Instrument, + Level, }; use tryhard::{ backoff_strategies::BackoffStrategy, @@ -206,6 +207,7 @@ struct VerificationMeta { } impl VerificationMeta { + #[instrument(skip_all, err(level = Level::WARN))] async fn fetch( client: RateLimitedVerificationClient, height: SequencerHeight, @@ -298,6 +300,7 @@ impl BlobVerifier { } } +#[instrument(skip_all, err)] async fn fetch_commit_with_retry( client: SequencerClient, height: SequencerHeight, @@ -332,6 +335,7 @@ async fn fetch_commit_with_retry( }) } +#[instrument(skip_all, err)] async fn fetch_validators_with_retry( client: SequencerClient, prev_height: SequencerHeight, @@ -447,6 +451,7 @@ struct RateLimitedVerificationClient { } impl RateLimitedVerificationClient { + #[instrument(skip_all, err)] async fn get_commit( mut self, height: SequencerHeight, @@ -468,6 +473,7 @@ impl RateLimitedVerificationClient { } } + #[instrument(skip_all, err)] async fn get_validators( mut self, prev_height: SequencerHeight, diff --git a/crates/astria-conductor/src/conductor.rs b/crates/astria-conductor/src/conductor.rs index b924294db2..7dffb08a09 100644 --- a/crates/astria-conductor/src/conductor.rs +++ b/crates/astria-conductor/src/conductor.rs @@ -23,6 +23,7 @@ use tokio_util::{ use tracing::{ error, info, + info_span, instrument, warn, }; @@ -52,6 +53,7 @@ impl Handle { /// /// # Panics /// Panics if called twice. + #[instrument(skip_all, err)] pub async fn shutdown(&mut self) -> Result<(), tokio::task::JoinError> { self.shutdown_token.cancel(); let task = self.task.take().expect("shutdown must not be called twice"); @@ -167,9 +169,8 @@ impl Conductor { /// /// # Panics /// Panics if it could not install a signal handler. - #[instrument(skip_all)] async fn run_until_stopped(mut self) { - info!("conductor is running"); + info_span!("Conductor::run_until_stopped").in_scope(|| info!("conductor is running")); let exit_reason = select! { biased; @@ -187,10 +188,7 @@ impl Conductor { }; let message = "initiating shutdown"; - match exit_reason { - Ok(reason) => info!(reason, message), - Err(reason) => error!(%reason, message), - } + report_exit(exit_reason, message); self.shutdown().await; } @@ -214,6 +212,7 @@ impl Conductor { /// Waits 25 seconds for all tasks to shut down before aborting them. 25 seconds /// because kubernetes issues SIGKILL 30 seconds after SIGTERM, giving 5 seconds /// to abort the remaining tasks. + #[instrument(skip_all)] async fn shutdown(mut self) { self.shutdown.cancel(); @@ -245,3 +244,11 @@ impl Conductor { info!("shutting down"); } } + +#[instrument(skip_all)] +fn report_exit(exit_reason: eyre::Result<&str>, message: &str) { + match exit_reason { + Ok(reason) => info!(%reason, message), + Err(reason) => error!(%reason, message), + } +} diff --git a/crates/astria-conductor/src/executor/channel.rs b/crates/astria-conductor/src/executor/channel.rs index c4348bd080..450dfb23d9 100644 --- a/crates/astria-conductor/src/executor/channel.rs +++ b/crates/astria-conductor/src/executor/channel.rs @@ -20,6 +20,7 @@ use tokio::sync::{ Semaphore, TryAcquireError, }; +use tracing::instrument; /// Creates an mpsc channel for sending soft blocks between asynchronous task. /// @@ -92,6 +93,7 @@ impl Sender { /// Sends a block, waiting until the channel has permits. /// /// Returns an error if the channel is closed. + #[instrument(skip_all, err)] pub(super) async fn send(&self, block: T) -> Result<(), SendError> { let sem = self.sem.upgrade().ok_or(SendError)?; let permit = sem.acquire().await?; @@ -151,6 +153,7 @@ impl Receiver { } /// Receives a block over the channel. + #[instrument(skip_all)] pub(super) async fn recv(&mut self) -> Option { self.chan.recv().await } diff --git a/crates/astria-conductor/src/executor/client.rs b/crates/astria-conductor/src/executor/client.rs index 46d784b4a2..6f9f9fcdb9 100644 --- a/crates/astria-conductor/src/executor/client.rs +++ b/crates/astria-conductor/src/executor/client.rs @@ -86,7 +86,7 @@ impl Client { } /// Calls remote procedure `astria.execution.v1alpha2.GetGenesisInfo` - #[instrument(skip_all, fields(uri = %self.uri))] + #[instrument(skip_all, fields(uri = %self.uri), err)] pub(crate) async fn get_genesis_info_with_retry(&mut self) -> eyre::Result { let response = tryhard::retry_fn(|| { let mut client = self.inner.clone(); @@ -182,7 +182,7 @@ impl Client { /// /// * `firm` - The firm block /// * `soft` - The soft block - #[instrument(skip_all, fields(uri = %self.uri))] + #[instrument(skip_all, fields(uri = %self.uri), err)] pub(super) async fn update_commitment_state_with_retry( &mut self, commitment_state: CommitmentState, diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index 2ef56213d9..916a951e28 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -32,6 +32,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; use tracing::{ debug, + debug_span, error, info, instrument, @@ -142,6 +143,7 @@ impl Handle { } impl Handle { + #[instrument(skip_all, err)] pub(crate) async fn send_firm_block( self, block: ReconstructedBlock, @@ -162,6 +164,7 @@ impl Handle { Ok(()) } + #[instrument(skip_all, err)] pub(crate) async fn send_soft_block_owned( self, block: FilteredSequencerBlock, @@ -197,6 +200,7 @@ impl Handle { self.state.next_expected_soft_sequencer_height() } + #[instrument(skip_all)] pub(crate) async fn next_expected_soft_height_if_changed( &mut self, ) -> Result { @@ -252,15 +256,13 @@ pub(crate) struct Executor { } impl Executor { - #[instrument(skip_all, err)] pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { select!( () = self.shutdown.clone().cancelled_owned() => { - info!( + return report_exit(Ok( "received shutdown signal while initializing task; \ aborting intialization and exiting" - ); - return Ok(()); + ), ""); } res = self.init() => { res.wrap_err("initialization failed")?; @@ -285,11 +287,11 @@ impl Executor { Some(block) = async { self.firm_blocks.as_mut().unwrap().recv().await }, if self.firm_blocks.is_some() => { - debug!( + debug_span!("conductor::Executor::run_until_stopped").in_scope(||debug!( block.height = %block.sequencer_height(), block.hash = %telemetry::display::base64(&block.block_hash), "received block from celestia reader", - ); + )); if let Err(error) = self.execute_firm(block).await { break Err(error).wrap_err("failed executing firm block"); } @@ -298,11 +300,11 @@ impl Executor { Some(block) = async { self.soft_blocks.as_mut().unwrap().recv().await }, if self.soft_blocks.is_some() && spread_not_too_large => { - debug!( + debug_span!("conductor::Executor::run_until_stopped").in_scope(||debug!( block.height = %block.height(), block.hash = %telemetry::display::base64(&block.block_hash()), "received block from sequencer reader", - ); + )); if let Err(error) = self.execute_soft(block).await { break Err(error).wrap_err("failed executing soft block"); } @@ -312,19 +314,11 @@ impl Executor { // XXX: explicitly setting the message (usually implicitly set by tracing) let message = "shutting down"; - match reason { - Ok(reason) => { - info!(reason, message); - Ok(()) - } - Err(reason) => { - error!(%reason, message); - Err(reason) - } - } + report_exit(reason, message) } /// Runs the init logic that needs to happen before [`Executor`] can enter its main loop. + #[instrument(skip_all, err)] async fn init(&mut self) -> eyre::Result<()> { self.set_initial_node_state() .await @@ -400,6 +394,7 @@ impl Executor { #[instrument(skip_all, fields( block.hash = %telemetry::display::base64(&block.block_hash()), block.height = block.height().value(), + err, ))] async fn execute_soft(&mut self, block: FilteredSequencerBlock) -> eyre::Result<()> { // TODO(https://github.com/astriaorg/astria/issues/624): add retry logic before failing hard. @@ -463,6 +458,7 @@ impl Executor { #[instrument(skip_all, fields( block.hash = %telemetry::display::base64(&block.block_hash), block.height = block.sequencer_height().value(), + err, ))] async fn execute_firm(&mut self, block: ReconstructedBlock) -> eyre::Result<()> { let celestia_height = block.celestia_height; @@ -544,6 +540,7 @@ impl Executor { block.height = block.height.value(), block.num_of_transactions = block.transactions.len(), rollup.parent_hash = %telemetry::display::base64(&parent_hash), + err ))] async fn execute_block( &mut self, @@ -576,7 +573,7 @@ impl Executor { Ok(executed_block) } - #[instrument(skip_all)] + #[instrument(skip_all, err)] async fn set_initial_node_state(&mut self) -> eyre::Result<()> { let genesis_info = { async { @@ -613,7 +610,7 @@ impl Executor { Ok(()) } - #[instrument(skip_all)] + #[instrument(skip_all, err)] async fn update_commitment_state(&mut self, update: Update) -> eyre::Result<()> { use Update::{ OnlyFirm, @@ -675,6 +672,20 @@ impl Executor { } } +#[instrument(skip_all)] +fn report_exit(reason: eyre::Result<&str>, message: &str) -> eyre::Result<()> { + match reason { + Ok(reason) => { + info!(%reason, message); + Ok(()) + } + Err(error) => { + error!(%error, message); + Err(error) + } + } +} + enum Update { OnlyFirm(Block, CelestiaHeight), OnlySoft(Block), diff --git a/crates/astria-conductor/src/executor/state.rs b/crates/astria-conductor/src/executor/state.rs index bf63098341..ccaa49a6b6 100644 --- a/crates/astria-conductor/src/executor/state.rs +++ b/crates/astria-conductor/src/executor/state.rs @@ -20,6 +20,7 @@ use tokio::sync::watch::{ self, error::RecvError, }; +use tracing::instrument; pub(super) fn channel() -> (StateSender, StateReceiver) { let (tx, rx) = watch::channel(None); @@ -50,6 +51,7 @@ pub(super) struct StateReceiver { } impl StateReceiver { + #[instrument(skip_all, err)] pub(super) async fn wait_for_init(&mut self) -> eyre::Result<()> { self.inner .wait_for(Option::is_some) @@ -82,6 +84,7 @@ impl StateReceiver { ) } + #[instrument(skip_all)] pub(crate) async fn next_expected_soft_height_if_changed( &mut self, ) -> Result { diff --git a/crates/astria-conductor/src/sequencer/mod.rs b/crates/astria-conductor/src/sequencer/mod.rs index 535c61d4fd..719fb72050 100644 --- a/crates/astria-conductor/src/sequencer/mod.rs +++ b/crates/astria-conductor/src/sequencer/mod.rs @@ -28,10 +28,13 @@ use tokio::select; use tokio_util::sync::CancellationToken; use tracing::{ debug, + debug_span, error, info, + instrument, trace, warn, + warn_span, }; use crate::{ @@ -82,8 +85,7 @@ impl Reader { pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { let executor = select!( () = self.shutdown.clone().cancelled_owned() => { - info!("received shutdown signal while waiting for Sequencer reader task to initialize"); - return Ok(()); + return report_exit(Ok("received shutdown signal while waiting for Sequencer reader task to initialize"), ""); } res = self.initialize() => { res? @@ -95,6 +97,7 @@ impl Reader { .await } + #[instrument(skip_all, err)] async fn initialize(&mut self) -> eyre::Result> { self.executor .wait_for_init() @@ -174,16 +177,7 @@ impl RunningReader { // XXX: explicitly setting the message (usually implicitly set by tracing) let message = "shutting down"; - match stop_reason { - Ok(stop_reason) => { - info!(stop_reason, message); - Ok(()) - } - Err(stop_reason) => { - error!(%stop_reason, message); - Err(stop_reason) - } - } + report_exit(stop_reason, message) } async fn run_loop(&mut self) -> eyre::Result<&'static str> { @@ -200,7 +194,9 @@ impl RunningReader { // Process block execution which was enqueued due to executor channel being full. res = &mut self.enqueued_block, if !self.enqueued_block.is_terminated() => { res.wrap_err("failed sending enqueued block to executor")?; - debug!("submitted enqueued block to executor, resuming normal operation"); + debug_span!("conductor::sequencer::RunningReader::run_loop").in_scope(|| + debug!("submitted enqueued block to executor, resuming normal operation") + ); } // Skip heights that executor has already executed (e.g. firm blocks from Celestia) @@ -220,29 +216,37 @@ impl RunningReader { // otherwise recover from a failed block fetch. let block = block.wrap_err("the stream of new blocks returned a catastrophic error")?; if let Err(error) = self.block_cache.insert(block) { - warn!(%error, "failed pushing block into sequential cache, dropping it"); + warn_span!("conductor::sequencer::RunningReader::run_loop").in_scope(|| + warn!(%error, "failed pushing block into sequential cache, dropping it") + ); } } // Record the latest height of the Sequencer network, allowing `blocks_from_heights` to progress. Some(res) = self.latest_height_stream.next() => { - match res { - Ok(height) => { - debug!(%height, "received latest height from sequencer"); - self.blocks_from_heights.set_latest_observed_height_if_greater(height); - } - Err(error) => { - warn!( - error = %Report::new(error), - "failed fetching latest height from sequencer; waiting until next tick", - ); - } - } + self.handle_latest_height(res); } } } } + #[instrument(skip_all)] + fn handle_latest_height(&mut self, res: Result) { + match res { + Ok(height) => { + debug!(%height, "received latest height from sequencer"); + self.blocks_from_heights + .set_latest_observed_height_if_greater(height); + } + Err(error) => { + warn!( + error = %Report::new(error), + "failed fetching latest height from sequencer; waiting until next tick", + ); + } + } + } + /// Sends `block` to the executor task. /// /// Enqueues the block is the channel to the executor is full, sending it once @@ -295,3 +299,17 @@ impl RunningReader { self.block_cache.drop_obsolete(next_height); } } + +#[instrument(skip_all)] +fn report_exit(reason: eyre::Result<&str>, message: &str) -> eyre::Result<()> { + match reason { + Ok(reason) => { + info!(%reason, message); + Ok(()) + } + Err(reason) => { + error!(%reason, message); + Err(reason) + } + } +}