Skip to content

Commit

Permalink
chore: Restructure/tidy logging
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Feb 10, 2024
1 parent 488d468 commit 3f8bac2
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 41 deletions.
62 changes: 32 additions & 30 deletions coordinator/src/block_streams/synchronise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,12 @@ pub async fn synchronise_block_streams(
})
.map(|index| active_block_streams.swap_remove(index));

let span = tracing::info_span!(
"Synchronising block stream",
account_id = account_id.as_str(),
function_name = function_name.as_str(),
current_version = indexer_config.get_registry_version()
);

synchronise_block_stream(
active_block_stream,
indexer_config,
redis_client,
block_streams_handler,
)
.instrument(span)
.await?;
}
}
Expand All @@ -59,6 +51,14 @@ pub async fn synchronise_block_streams(
Ok(())
}

#[tracing::instrument(
skip_all,
fields(
account_id = %indexer_config.account_id,
function_name = indexer_config.function_name,
version = indexer_config.get_registry_version()
)
)]
async fn synchronise_block_stream(
active_block_stream: Option<StreamInfo>,
indexer_config: &IndexerConfig,
Expand All @@ -82,12 +82,10 @@ async fn synchronise_block_stream(

let stream_version = redis_client.get_stream_version(indexer_config).await?;

let start_block_height =
determine_start_block_height(stream_version, indexer_config, redis_client).await?;

clear_block_stream_if_needed(stream_version, indexer_config, redis_client).await?;

tracing::info!("Starting block stream");
let start_block_height =
determine_start_block_height(stream_version, indexer_config, redis_client).await?;

block_streams_handler
.start(start_block_height, indexer_config)
Expand Down Expand Up @@ -131,32 +129,36 @@ async fn determine_start_block_height(
let just_migrated_or_unmodified =
stream_version.map_or(true, |version| version == registry_version);

if just_migrated_or_unmodified || indexer_config.start_block == StartBlock::Continue {
return Ok(redis_client
.get_last_published_block(indexer_config)
.await?
.unwrap_or_else(|| {
tracing::warn!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
version = registry_version,
"Indexer has no `last_published_block`, using registry version"
);

// TODO Probably throw error rather than use this
registry_version
}));
if just_migrated_or_unmodified {
tracing::info!("Resuming block stream");

return get_continuation_block_height(indexer_config, redis_client).await;
}

tracing::info!("Stating new block stream");

match indexer_config.start_block {
StartBlock::Latest => Ok(registry_version),
StartBlock::Height(height) => Ok(height),
_ => {
unreachable!("StartBlock::Continue already handled")
}
StartBlock::Continue => get_continuation_block_height(indexer_config, redis_client).await,
}
}

async fn get_continuation_block_height(
indexer_config: &IndexerConfig,
redis_client: &RedisClient,
) -> anyhow::Result<u64> {
Ok(redis_client
.get_last_published_block(indexer_config)
.await?
.unwrap_or_else(|| {
tracing::warn!("Indexer has no `last_published_block`, using registry version");

// TODO Probably throw error rather than use this
indexer_config.get_registry_version()
}))
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
21 changes: 10 additions & 11 deletions coordinator/src/executors/synchronise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,8 @@ pub async fn synchronise_executors(
})
.map(|index| active_executors.swap_remove(index));

let span = tracing::info_span!(
"Synchronising executor",
account_id = account_id.as_str(),
function_name = function_name.as_str(),
current_version = indexer_config.get_registry_version()
);

// TODO capture errors thrown in here
synchronise_executor(active_executor, indexer_config, executors_handler)
.instrument(span)
.await?;
synchronise_executor(active_executor, indexer_config, executors_handler).await?;
}
}

Expand All @@ -60,6 +51,14 @@ pub async fn synchronise_executors(
Ok(())
}

#[tracing::instrument(
skip_all,
fields(
account_id = %indexer_config.account_id,
function_name = indexer_config.function_name,
version = indexer_config.get_registry_version()
)
)]
async fn synchronise_executor(
active_executor: Option<ExecutorInfo>,
indexer_config: &IndexerConfig,
Expand All @@ -72,7 +71,7 @@ async fn synchronise_executor(
return Ok(());
}

tracing::info!("Stopping executor");
tracing::info!("Stopping outdated executor");

executors_handler.stop(active_executor.executor_id).await?;
}
Expand Down

0 comments on commit 3f8bac2

Please sign in to comment.