Skip to content

Commit

Permalink
test: Test starting of block streams
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Dec 18, 2023
1 parent 8ad0d9c commit cba8500
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 42 deletions.
203 changes: 163 additions & 40 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,53 +62,176 @@ async fn synchronise_registry_config(
mod tests {
use super::*;

use mockall::predicate;
use std::collections::HashMap;

use registry_types::{IndexerRule, IndexerRuleKind, MatchingRule, Status};

use crate::registry::IndexerConfig;

#[tokio::test]
async fn something() {
let mut registry = Registry::default();
registry.expect_fetch().returning(|| {
Ok(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([(
"test".to_string(),
IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
code: String::new(),
schema: Some(String::new()),
filter: IndexerRule {
id: None,
name: None,
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
mod block_stream {
use super::*;

#[tokio::test]
async fn uses_start_block_height_when_set() {
let mut registry = Registry::default();
registry.expect_fetch().returning(|| {
Ok(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([(
"test".to_string(),
IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
code: String::new(),
schema: Some(String::new()),
filter: IndexerRule {
id: None,
name: None,
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
},
},
created_at_block_height: 1,
updated_at_block_height: None,
start_block_height: Some(100),
},
created_at_block_height: 1,
updated_at_block_height: None,
start_block_height: None,
},
)]),
)]))
});

let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| Ok(1));

let mut block_stream_handler = BlockStreamHandler::default();
block_stream_handler
.expect_start()
.returning(|_, _, _, _| Ok(()));

let _ =
synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler).await;
)]),
)]))
});

let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| anyhow::bail!("none"));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler
.expect_start()
.with(
predicate::eq(100),
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _| Ok(()));

synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler)
.await
.unwrap();
}

#[tokio::test]
async fn uses_updated_at_when_no_start_block_height() {
let mut registry = Registry::default();
registry.expect_fetch().returning(|| {
Ok(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([(
"test".to_string(),
IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
code: String::new(),
schema: Some(String::new()),
filter: IndexerRule {
id: None,
name: None,
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
},
},
created_at_block_height: 101,
updated_at_block_height: Some(200),
start_block_height: None,
},
)]),
)]))
});

let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| anyhow::bail!("none"));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler
.expect_start()
.with(
predicate::eq(200),
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _| Ok(()));

synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler)
.await
.unwrap();
}

#[tokio::test]
async fn uses_created_at_when_no_updated_at_block_height() {
let mut registry = Registry::default();
registry.expect_fetch().returning(|| {
Ok(HashMap::from([(
"morgs.near".parse().unwrap(),
HashMap::from([(
"test".to_string(),
IndexerConfig {
account_id: "morgs.near".parse().unwrap(),
function_name: "test".to_string(),
code: String::new(),
schema: Some(String::new()),
filter: IndexerRule {
id: None,
name: None,
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
},
},
created_at_block_height: 101,
updated_at_block_height: None,
start_block_height: None,
},
)]),
)]))
});

let mut redis_client = RedisClient::default();
redis_client
.expect_get::<String, u64>()
.returning(|_| anyhow::bail!("none"));

let mut block_stream_handler = BlockStreamsHandler::default();
block_stream_handler
.expect_start()
.with(
predicate::eq(101),
predicate::eq("morgs.near".to_string()),
predicate::eq("test".to_string()),
predicate::eq(MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: Status::Any,
}),
)
.returning(|_, _, _, _| Ok(()));

synchronise_registry_config(&registry, &redis_client, &mut block_stream_handler)
.await
.unwrap();
}
}
}
6 changes: 4 additions & 2 deletions coordinator/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt::Debug;

use redis::{aio::ConnectionManager, FromRedisValue, RedisError, ToRedisArgs};
pub use redis::RedisError;
use redis::{aio::ConnectionManager, FromRedisValue, ToRedisArgs};

#[cfg(test)]
pub use MockRedisClientImpl as RedisClient;
Expand All @@ -21,7 +22,7 @@ impl RedisClientImpl {
Ok(Self { connection })
}

pub async fn get<T, U>(&self, key: T) -> Result<U, RedisError>
pub async fn get<T, U>(&self, key: T) -> anyhow::Result<U>
where
T: ToRedisArgs + Debug + Send + Sync + 'static,
U: FromRedisValue + Send + Sync + 'static,
Expand All @@ -32,5 +33,6 @@ impl RedisClientImpl {
.arg(key)
.query_async(&mut self.connection.clone())
.await
.map_err(|e| e.into())
}
}

0 comments on commit cba8500

Please sign in to comment.