Skip to content

Commit

Permalink
refactor: Simplify nested registry iteration (#730)
Browse files Browse the repository at this point in the history
In Coordinator, to iterate over all Indexers you need two loops: one for
the accounts, and another for its functions. As iteration is quite
common, I've added a custom `Iterator` implementation which achieves the
same with a single loop.
  • Loading branch information
morgsmccauley authored May 15, 2024
1 parent f70284d commit 8d8a020
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 155 deletions.
67 changes: 33 additions & 34 deletions coordinator/src/block_streams/synchronise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,31 @@ pub async fn synchronise_block_streams(
) -> anyhow::Result<()> {
let mut active_block_streams = block_streams_handler.list().await?;

for (account_id, indexers) in indexer_registry.iter() {
for (function_name, indexer_config) in indexers.iter() {
let active_block_stream = active_block_streams
.iter()
.position(|stream| {
stream.account_id == *account_id && &stream.function_name == function_name
})
.map(|index| active_block_streams.swap_remove(index));

let _ = synchronise_block_stream(
active_block_stream,
indexer_config,
indexer_manager,
redis_client,
block_streams_handler,
for indexer_config in indexer_registry.iter() {
let active_block_stream = active_block_streams
.iter()
.position(|stream| {
stream.account_id == *indexer_config.account_id
&& stream.function_name == indexer_config.function_name
})
.map(|index| active_block_streams.swap_remove(index));

let _ = synchronise_block_stream(
active_block_stream,
indexer_config,
indexer_manager,
redis_client,
block_streams_handler,
)
.await
.map_err(|err| {
tracing::error!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
version = indexer_config.get_registry_version(),
"failed to sync block stream: {err:?}"
)
.await
.map_err(|err| {
tracing::error!(
account_id = account_id.as_str(),
function_name,
version = indexer_config.get_registry_version(),
"failed to sync block stream: {err:?}"
)
});
}
});
}

for unregistered_block_stream in active_block_streams {
Expand Down Expand Up @@ -185,7 +184,7 @@ mod tests {
start_block: StartBlock::Height(100),
};

let indexer_registry = HashMap::from([(
let indexer_registry = IndexerRegistry::from(&[(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);
Expand Down Expand Up @@ -243,7 +242,7 @@ mod tests {
start_block: StartBlock::Latest,
};

let indexer_registry = HashMap::from([(
let indexer_registry = IndexerRegistry::from(&[(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);
Expand Down Expand Up @@ -300,7 +299,7 @@ mod tests {
updated_at_block_height: Some(200),
start_block: StartBlock::Height(100),
};
let indexer_registry = HashMap::from([(
let indexer_registry = IndexerRegistry::from(&[(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);
Expand Down Expand Up @@ -357,7 +356,7 @@ mod tests {
updated_at_block_height: Some(200),
start_block: StartBlock::Continue,
};
let indexer_registry = HashMap::from([(
let indexer_registry = IndexerRegistry::from(&[(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);
Expand Down Expand Up @@ -401,7 +400,7 @@ mod tests {

#[tokio::test]
async fn stops_stream_not_in_registry() {
let indexer_registry = HashMap::from([]);
let indexer_registry = IndexerRegistry::from(&[]);

let redis_client = RedisClient::default();

Expand Down Expand Up @@ -447,7 +446,7 @@ mod tests {
updated_at_block_height: None,
start_block: StartBlock::Latest,
};
let indexer_registry = HashMap::from([(
let indexer_registry = IndexerRegistry::from(&[(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);
Expand Down Expand Up @@ -497,7 +496,7 @@ mod tests {
updated_at_block_height: Some(199),
start_block: StartBlock::Height(1000),
};
let indexer_registry = HashMap::from([(
let indexer_registry = IndexerRegistry::from(&[(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);
Expand Down Expand Up @@ -565,7 +564,7 @@ mod tests {
updated_at_block_height: Some(200),
start_block: StartBlock::Continue,
};
let indexer_registry = HashMap::from([(
let indexer_registry = IndexerRegistry::from(&[(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);
Expand Down Expand Up @@ -613,7 +612,7 @@ mod tests {
updated_at_block_height: None,
start_block: StartBlock::Height(50),
};
let indexer_registry = HashMap::from([(
let indexer_registry = IndexerRegistry::from(&[(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);
Expand Down
65 changes: 28 additions & 37 deletions coordinator/src/executors/synchronise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,31 @@ use crate::registry::IndexerRegistry;

use super::handler::{ExecutorInfo, ExecutorsHandler};

const V1_EXECUTOR_VERSION: u64 = 0;

pub async fn synchronise_executors(
indexer_registry: &IndexerRegistry,
executors_handler: &ExecutorsHandler,
) -> anyhow::Result<()> {
let active_executors = executors_handler.list().await?;

// Ignore V1 executors
let mut active_executors: Vec<_> = active_executors
.into_iter()
.filter(|executor| executor.version != V1_EXECUTOR_VERSION)
.collect();

for (account_id, indexers) in indexer_registry.iter() {
for (function_name, indexer_config) in indexers.iter() {
let active_executor = active_executors
.iter()
.position(|stream| {
stream.account_id == *account_id && &stream.function_name == function_name
})
.map(|index| active_executors.swap_remove(index));

let _ = synchronise_executor(active_executor, indexer_config, executors_handler)
.await
.map_err(|err| {
tracing::error!(
account_id = account_id.as_str(),
function_name,
version = indexer_config.get_registry_version(),
"failed to sync executor: {err:?}"
)
});
}
let mut active_executors = executors_handler.list().await?;

for indexer_config in indexer_registry.iter() {
let active_executor = active_executors
.iter()
.position(|stream| {
stream.account_id == *indexer_config.account_id
&& stream.function_name == indexer_config.function_name
})
.map(|index| active_executors.swap_remove(index));

let _ = synchronise_executor(active_executor, indexer_config, executors_handler)
.await
.map_err(|err| {
tracing::error!(
account_id = indexer_config.account_id.as_str(),
function_name = indexer_config.function_name,
version = indexer_config.get_registry_version(),
"failed to sync executor: {err:?}"
)
});
}

for unregistered_executor in active_executors {
Expand Down Expand Up @@ -113,10 +104,10 @@ mod tests {
updated_at_block_height: None,
start_block: StartBlock::Height(100),
};
let indexer_registry = HashMap::from([(
let indexer_registry = IndexerRegistry(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);
)]));

let mut executors_handler = ExecutorsHandler::default();
executors_handler.expect_list().returning(|| Ok(vec![]));
Expand Down Expand Up @@ -146,10 +137,10 @@ mod tests {
updated_at_block_height: Some(2),
start_block: StartBlock::Height(100),
};
let indexer_registry = HashMap::from([(
let indexer_registry = IndexerRegistry(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), indexer_config.clone())]),
)]);
)]));

let mut executors_handler = ExecutorsHandler::default();
executors_handler.expect_list().returning(|| {
Expand Down Expand Up @@ -180,7 +171,7 @@ mod tests {

#[tokio::test]
async fn ignores_executor_with_matching_registry_version() {
let indexer_registry = HashMap::from([(
let indexer_registry = IndexerRegistry(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([(
"test".to_string(),
Expand All @@ -198,7 +189,7 @@ mod tests {
start_block: StartBlock::Height(100),
},
)]),
)]);
)]));

let mut executors_handler = ExecutorsHandler::default();
executors_handler.expect_list().returning(|| {
Expand All @@ -221,7 +212,7 @@ mod tests {

#[tokio::test]
async fn stops_executor_not_in_registry() {
let indexer_registry = HashMap::from([]);
let indexer_registry = IndexerRegistry::from(&[]);

let mut executors_handler = ExecutorsHandler::default();
executors_handler.expect_list().returning(|| {
Expand Down
80 changes: 36 additions & 44 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,17 @@ impl IndexerStateManagerImpl {
&self,
indexer_registry: &IndexerRegistry,
) -> anyhow::Result<IndexerRegistry> {
let mut filtered_registry: IndexerRegistry = std::collections::HashMap::new();
let mut filtered_registry = IndexerRegistry::new();

for (account_id, indexers) in indexer_registry.iter() {
for (function_name, indexer_config) in indexers.iter() {
let indexer_state = self.get_state(indexer_config).await?;
for indexer_config in indexer_registry.iter() {
let indexer_state = self.get_state(indexer_config).await?;

if indexer_state.enabled {
filtered_registry
.entry(account_id.clone())
.or_default()
.insert(function_name.clone(), indexer_config.clone());
}
if indexer_state.enabled {
filtered_registry
.0
.entry(indexer_config.account_id.clone())
.or_default()
.insert(indexer_config.function_name.clone(), indexer_config.clone());
}
}

Expand All @@ -112,20 +111,16 @@ impl IndexerStateManagerImpl {
if self.redis_client.is_migration_complete().await?.is_none() {
tracing::info!("Migrating indexer state");

for (_, indexers) in indexer_registry.iter() {
for (_, indexer_config) in indexers.iter() {
if let Some(version) =
self.redis_client.get_stream_version(indexer_config).await?
{
self.redis_client
.set_indexer_state(
indexer_config,
serde_json::to_string(&OldIndexerState {
block_stream_synced_at: Some(version),
})?,
)
.await?;
}
for indexer_config in indexer_registry.iter() {
if let Some(version) = self.redis_client.get_stream_version(indexer_config).await? {
self.redis_client
.set_indexer_state(
indexer_config,
serde_json::to_string(&OldIndexerState {
block_stream_synced_at: Some(version),
})?,
)
.await?;
}
}

Expand All @@ -142,24 +137,21 @@ impl IndexerStateManagerImpl {
{
tracing::info!("Migrating enabled flag");

for (_, indexers) in indexer_registry.iter() {
for (_, indexer_config) in indexers.iter() {
let existing_state =
self.redis_client.get_indexer_state(indexer_config).await?;

let state = match existing_state {
Some(state) => {
let old_state: OldIndexerState = serde_json::from_str(&state)?;
IndexerState {
block_stream_synced_at: old_state.block_stream_synced_at,
enabled: true,
}
for indexer_config in indexer_registry.iter() {
let existing_state = self.redis_client.get_indexer_state(indexer_config).await?;

let state = match existing_state {
Some(state) => {
let old_state: OldIndexerState = serde_json::from_str(&state)?;
IndexerState {
block_stream_synced_at: old_state.block_stream_synced_at,
enabled: true,
}
None => IndexerState::default(),
};
}
None => IndexerState::default(),
};

self.set_state(indexer_config, state).await?;
}
self.set_state(indexer_config, state).await?;
}

self.redis_client
Expand Down Expand Up @@ -250,7 +242,7 @@ mod tests {
start_block: StartBlock::Height(100),
};

let indexer_registry = HashMap::from([
let indexer_registry = IndexerRegistry::from(&[
(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), morgs_config.clone())]),
Expand Down Expand Up @@ -322,7 +314,7 @@ mod tests {
start_block: StartBlock::Height(100),
};

let indexer_registry = HashMap::from([
let indexer_registry = IndexerRegistry::from(&[
(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), morgs_config.clone())]),
Expand Down Expand Up @@ -439,7 +431,7 @@ mod tests {
start_block: StartBlock::Height(100),
};

let indexer_registry = HashMap::from([
let indexer_registry = IndexerRegistry(HashMap::from([
(
"morgs.near".parse().unwrap(),
HashMap::from([("test".to_string(), morgs_config.clone())]),
Expand All @@ -448,7 +440,7 @@ mod tests {
"darunrs.near".parse().unwrap(),
HashMap::from([("test".to_string(), darunrs_config.clone())]),
),
]);
]));

let mut mock_redis_client = RedisClient::default();
mock_redis_client
Expand Down
Loading

0 comments on commit 8d8a020

Please sign in to comment.