From 2286a63312bf221610de1d7689ebc245020446a0 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Tue, 19 Dec 2023 09:44:49 +1300 Subject: [PATCH] feat: Write block stream version from coordinator --- coordinator/src/block_streams_handler.rs | 2 ++ coordinator/src/main.rs | 42 ++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/coordinator/src/block_streams_handler.rs b/coordinator/src/block_streams_handler.rs index eccc7e841..206a2d27c 100644 --- a/coordinator/src/block_streams_handler.rs +++ b/coordinator/src/block_streams_handler.rs @@ -26,6 +26,7 @@ impl BlockStreamsHandlerImpl { start_block_height: u64, account_id: String, function_name: String, + version: u64, rule: registry_types::MatchingRule, ) -> anyhow::Result<()> { let rule = match &rule { @@ -49,6 +50,7 @@ impl BlockStreamsHandlerImpl { start_block_height, account_id, function_name, + version, rule: Some(rule), })) .await?; diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 73a75138c..4a7a85c50 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -49,6 +49,9 @@ async fn synchronise_registry_config( start_block_height, indexer_config.account_id.to_string(), indexer_config.function_name.clone(), + indexer_config + .updated_at_block_height + .unwrap_or(indexer_config.created_at_block_height), indexer_config.filter.matching_rule.clone(), ) .await?; @@ -114,12 +117,13 @@ mod tests { predicate::eq(100), predicate::eq("morgs.near".to_string()), predicate::eq("test".to_string()), + predicate::eq(1), predicate::eq(MatchingRule::ActionAny { affected_account_id: "queryapi.dataplatform.near".to_string(), status: Status::Any, }), ) - .returning(|_, _, _, _| Ok(())); + .returning(|_, _, _, _, _| Ok(())); synchronise_registry_config(®istry, &redis_client, &mut block_stream_handler) .await @@ -168,12 +172,13 @@ mod tests { predicate::eq(200), predicate::eq("morgs.near".to_string()), predicate::eq("test".to_string()), + predicate::eq(200), predicate::eq(MatchingRule::ActionAny { affected_account_id: "queryapi.dataplatform.near".to_string(), status: Status::Any, }), ) - .returning(|_, _, _, _| Ok(())); + .returning(|_, _, _, _, _| Ok(())); synchronise_registry_config(®istry, &redis_client, &mut block_stream_handler) .await @@ -222,12 +227,43 @@ mod tests { predicate::eq(101), predicate::eq("morgs.near".to_string()), predicate::eq("test".to_string()), + predicate::eq(101), + predicate::eq(MatchingRule::ActionAny { + affected_account_id: "queryapi.dataplatform.near".to_string(), + status: Status::Any, + }), + ) + .returning(|_, _, _, _, _| Ok(())); + + synchronise_registry_config(®istry, &redis_client, &mut block_stream_handler) + .await + .unwrap(); + } + + #[tokio::test] + async fn stops_streams_which_arent_in_registry() { + let mut registry = Registry::default(); + registry.expect_fetch().returning(|| Ok(HashMap::from([]))); + + let mut redis_client = RedisClient::default(); + redis_client + .expect_get::() + .returning(|_| anyhow::bail!("none")); + + let mut block_stream_handler = BlockStreamsHandler::default(); + block_stream_handler + .expect_start() + .with( + predicate::eq(101), + predicate::eq("morgs.near".to_string()), + predicate::eq("test".to_string()), + predicate::eq(101), predicate::eq(MatchingRule::ActionAny { affected_account_id: "queryapi.dataplatform.near".to_string(), status: Status::Any, }), ) - .returning(|_, _, _, _| Ok(())); + .returning(|_, _, _, _, _| Ok(())); synchronise_registry_config(®istry, &redis_client, &mut block_stream_handler) .await