From 72fbad54455e3cdc65c480be12eb71698ddf83c6 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 16 Jul 2024 16:57:02 +1200 Subject: [PATCH] chore: Add logging --- coordinator/src/lifecycle.rs | 69 ++++++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 22 deletions(-) diff --git a/coordinator/src/lifecycle.rs b/coordinator/src/lifecycle.rs index 65b5053b..cc7877bb 100644 --- a/coordinator/src/lifecycle.rs +++ b/coordinator/src/lifecycle.rs @@ -51,11 +51,14 @@ impl<'a> LifecycleManager<'a> { } } + #[tracing::instrument(name = "initializing", skip_all)] async fn handle_initializing( &self, config: &IndexerConfig, _state: &IndexerState, ) -> LifecycleState { + tracing::info!("Initializing"); + if self .data_layer_handler .ensure_provisioned(config) @@ -68,52 +71,54 @@ impl<'a> LifecycleManager<'a> { LifecycleState::Running } + #[tracing::instrument(name = "running", skip_all)] async fn handle_running(&self, config: &IndexerConfig, state: &IndexerState) -> LifecycleState { + tracing::info!("Running"); + if !state.enabled { return LifecycleState::Stopping; } - if self + if let Err(error) = self .block_streams_handler .synchronise_block_stream(config, state.block_stream_synced_at) .await - .is_err() { + tracing::warn!(?error, "Failed to synchronise block stream"); + return LifecycleState::Repairing; } - if self - .executors_handler - .synchronise_executor(config) - .await - .is_err() - { + if let Err(error) = self.executors_handler.synchronise_executor(config).await { + tracing::warn!(?error, "Failed to synchronise executor"); + return LifecycleState::Repairing; } LifecycleState::Running } + #[tracing::instrument(name = "stopping", skip_all)] async fn handle_stopping(&self, config: &IndexerConfig) -> LifecycleState { - if self - .block_streams_handler - .stop_if_needed(config) - .await - .is_err() - { - // Retry + tracing::info!("Stopping"); + + if let Err(error) = self.block_streams_handler.stop_if_needed(config).await { + tracing::warn!(?error, "Failed to stop block stream, retrying..."); return LifecycleState::Stopping; } - if self.executors_handler.stop_if_needed(config).await.is_err() { - // Retry + if let Err(error) = self.executors_handler.stop_if_needed(config).await { + tracing::warn!(?error, "Failed to stop executor, retrying..."); return LifecycleState::Stopping; } LifecycleState::Stopped } + #[tracing::instrument(name = "stopped", skip_all)] async fn handle_stopped(&self, state: &IndexerState) -> LifecycleState { + tracing::info!("Stopped"); + // TODO Transistion to `Running` on config update if state.enabled { @@ -123,16 +128,22 @@ impl<'a> LifecycleManager<'a> { LifecycleState::Stopped } + #[tracing::instrument(name = "repairing", skip_all)] async fn handle_repairing( &self, _config: &IndexerConfig, _state: &IndexerState, ) -> LifecycleState { + tracing::info!("Repairing"); + // TODO Add more robust error handling, for now just stop LifecycleState::Stopping } + #[tracing::instrument(name = "deleting", skip_all)] async fn handle_deleting(&self, state: &IndexerState) -> LifecycleState { + tracing::info!("Deleting"); + if self.state_manager.delete_state(state).await.is_err() { // Retry return LifecycleState::Deleting; @@ -160,6 +171,14 @@ impl<'a> LifecycleManager<'a> { LifecycleState::Deleted } + #[tracing::instrument( + name = "lifecycle_manager", + skip(self), + fields( + account_id = self.initial_config.account_id.as_str(), + function_name = self.initial_config.function_name.as_str() + ) + )] pub async fn run(&self) { loop { tokio::time::sleep(std::time::Duration::from_millis(LOOP_THROTTLE_MS)).await; @@ -173,16 +192,22 @@ impl<'a> LifecycleManager<'a> { .await { Ok(config) => config, - Err(_) => continue, + Err(error) => { + tracing::warn!(?error, "Failed to fetch config"); + continue; + } }; let mut state = match self.state_manager.get_state(&self.initial_config).await { Ok(state) => state, - Err(_) => continue, + Err(error) => { + tracing::warn!(?error, "Failed to get state"); + continue; + } }; let next_lifecycle_state = if let Some(config) = config.clone() { - match state.lifecycle { + match state.lifecycle_state { LifecycleState::Initializing => self.handle_initializing(&config, &state).await, LifecycleState::Running => self.handle_running(&config, &state).await, LifecycleState::Stopping => self.handle_stopping(&config).await, @@ -195,7 +220,7 @@ impl<'a> LifecycleManager<'a> { self.handle_deleting(&state).await }; - state.lifecycle = next_lifecycle_state; + state.lifecycle_state = next_lifecycle_state; loop { match self @@ -205,7 +230,7 @@ impl<'a> LifecycleManager<'a> { { Ok(_) => break, Err(e) => { - tracing::error!("Failed to set state: {:?}. Retrying...", e); + tracing::warn!("Failed to set state: {:?}. Retrying...", e); tokio::time::sleep(std::time::Duration::from_millis(1000)).await; }