Skip to content

Commit

Permalink
chore: Add logging
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jul 16, 2024
1 parent e7039aa commit 72fbad5
Showing 1 changed file with 47 additions and 22 deletions.
69 changes: 47 additions & 22 deletions coordinator/src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@ impl<'a> LifecycleManager<'a> {
}
}

#[tracing::instrument(name = "initializing", skip_all)]
async fn handle_initializing(
&self,
config: &IndexerConfig,
_state: &IndexerState,
) -> LifecycleState {
tracing::info!("Initializing");

if self
.data_layer_handler
.ensure_provisioned(config)
Expand All @@ -68,52 +71,54 @@ impl<'a> LifecycleManager<'a> {
LifecycleState::Running
}

#[tracing::instrument(name = "running", skip_all)]
async fn handle_running(&self, config: &IndexerConfig, state: &IndexerState) -> LifecycleState {
tracing::info!("Running");

if !state.enabled {
return LifecycleState::Stopping;
}

if self
if let Err(error) = self
.block_streams_handler
.synchronise_block_stream(config, state.block_stream_synced_at)
.await
.is_err()
{
tracing::warn!(?error, "Failed to synchronise block stream");

return LifecycleState::Repairing;
}

if self
.executors_handler
.synchronise_executor(config)
.await
.is_err()
{
if let Err(error) = self.executors_handler.synchronise_executor(config).await {
tracing::warn!(?error, "Failed to synchronise executor");

return LifecycleState::Repairing;
}

LifecycleState::Running
}

#[tracing::instrument(name = "stopping", skip_all)]
async fn handle_stopping(&self, config: &IndexerConfig) -> LifecycleState {
if self
.block_streams_handler
.stop_if_needed(config)
.await
.is_err()
{
// Retry
tracing::info!("Stopping");

if let Err(error) = self.block_streams_handler.stop_if_needed(config).await {
tracing::warn!(?error, "Failed to stop block stream, retrying...");
return LifecycleState::Stopping;
}

if self.executors_handler.stop_if_needed(config).await.is_err() {
// Retry
if let Err(error) = self.executors_handler.stop_if_needed(config).await {
tracing::warn!(?error, "Failed to stop executor, retrying...");
return LifecycleState::Stopping;
}

LifecycleState::Stopped
}

#[tracing::instrument(name = "stopped", skip_all)]
async fn handle_stopped(&self, state: &IndexerState) -> LifecycleState {
tracing::info!("Stopped");

// TODO Transistion to `Running` on config update

if state.enabled {
Expand All @@ -123,16 +128,22 @@ impl<'a> LifecycleManager<'a> {
LifecycleState::Stopped
}

#[tracing::instrument(name = "repairing", skip_all)]
async fn handle_repairing(
&self,
_config: &IndexerConfig,
_state: &IndexerState,
) -> LifecycleState {
tracing::info!("Repairing");

// TODO Add more robust error handling, for now just stop
LifecycleState::Stopping
}

#[tracing::instrument(name = "deleting", skip_all)]
async fn handle_deleting(&self, state: &IndexerState) -> LifecycleState {
tracing::info!("Deleting");

if self.state_manager.delete_state(state).await.is_err() {
// Retry
return LifecycleState::Deleting;
Expand Down Expand Up @@ -160,6 +171,14 @@ impl<'a> LifecycleManager<'a> {
LifecycleState::Deleted
}

#[tracing::instrument(
name = "lifecycle_manager",
skip(self),
fields(
account_id = self.initial_config.account_id.as_str(),
function_name = self.initial_config.function_name.as_str()
)
)]
pub async fn run(&self) {
loop {
tokio::time::sleep(std::time::Duration::from_millis(LOOP_THROTTLE_MS)).await;
Expand All @@ -173,16 +192,22 @@ impl<'a> LifecycleManager<'a> {
.await
{
Ok(config) => config,
Err(_) => continue,
Err(error) => {
tracing::warn!(?error, "Failed to fetch config");
continue;
}
};

let mut state = match self.state_manager.get_state(&self.initial_config).await {
Ok(state) => state,
Err(_) => continue,
Err(error) => {
tracing::warn!(?error, "Failed to get state");
continue;
}
};

let next_lifecycle_state = if let Some(config) = config.clone() {
match state.lifecycle {
match state.lifecycle_state {
LifecycleState::Initializing => self.handle_initializing(&config, &state).await,
LifecycleState::Running => self.handle_running(&config, &state).await,
LifecycleState::Stopping => self.handle_stopping(&config).await,
Expand All @@ -195,7 +220,7 @@ impl<'a> LifecycleManager<'a> {
self.handle_deleting(&state).await
};

state.lifecycle = next_lifecycle_state;
state.lifecycle_state = next_lifecycle_state;

loop {
match self
Expand All @@ -205,7 +230,7 @@ impl<'a> LifecycleManager<'a> {
{
Ok(_) => break,
Err(e) => {
tracing::error!("Failed to set state: {:?}. Retrying...", e);
tracing::warn!("Failed to set state: {:?}. Retrying...", e);

tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
}
Expand Down

0 comments on commit 72fbad5

Please sign in to comment.