Skip to content

Commit

Permalink
feat: Write block stream version from coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Dec 18, 2023
1 parent 89f966e commit 2286a63
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
2 changes: 2 additions & 0 deletions coordinator/src/block_streams_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ impl BlockStreamsHandlerImpl {
start_block_height: u64,
account_id: String,
function_name: String,
version: u64,
rule: registry_types::MatchingRule,
) -> anyhow::Result<()> {
let rule = match &rule {
Expand All @@ -49,6 +50,7 @@ impl BlockStreamsHandlerImpl {
start_block_height,
account_id,
function_name,
version,
rule: Some(rule),
}))
.await?;
Expand Down
42 changes: 39 additions & 3 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ async fn synchronise_registry_config(
start_block_height,
indexer_config.account_id.to_string(),
indexer_config.function_name.clone(),
indexer_config
.updated_at_block_height
.unwrap_or(indexer_config.created_at_block_height),
indexer_config.filter.matching_rule.clone(),
)
.await?;
Expand Down Expand Up @@ -114,12 +117,13 @@ mod tests {
predicate::eq(100),
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(1),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _| Ok(()));
.returning(|_, _, _, _, _| Ok(()));

synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler)
.await
Expand Down Expand Up @@ -168,12 +172,13 @@ mod tests {
predicate::eq(200),
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(200),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _| Ok(()));
.returning(|_, _, _, _, _| Ok(()));

synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler)
.await
Expand Down Expand Up @@ -222,12 +227,43 @@ mod tests {
predicate::eq(101),
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(101),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _, _| Ok(()));

synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler)
.await
.unwrap();
}

#[tokio::test]
async fn stops_streams_which_arent_in_registry() {
let mut registry = Registry::default();
registry.expect_fetch().returning(|| Ok(HashMap::from([])));

let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| anyhow::bail!("none"));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler
.expect_start()
.with(
predicate::eq(101),
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(101),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _| Ok(()));
.returning(|_, _, _, _, _| Ok(()));

synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler)
.await
Expand Down

0 comments on commit 2286a63

Please sign in to comment.