Skip to content

Commit

Permalink
feat: Handle StartBlock options within Coordinator & Block Streamer (
Browse files Browse the repository at this point in the history
…#553)

This PR adds support for the updated Registry types, including the new
`StartBlock` config option, across the Block Streamer/Coordinator.

With `StartBlock`, the logic within Coordinator is much more straight
forward - we no longer need to guess whether we should "continue" or
"start over", that is build in to the configuration options.

This only really affects the handling of Block Streams, the executor
flow remains the same: always restart when a new version is published.

To summarise the Block Stream synchronisation process:
- `StartBlock::Continue` - Resumes process, keeping the current data in
the Redis Stream, and start the Block Stream from `last_published_block`
- `StartBlock::Latest` - Starts a new process, clears the Redis Stream,
starts the Block Stream from the registry version, essentially being
latest
- `StartBlock::Height(u64)` - Starts a new process, clears the Redis
Stream, starts the Block Stream from the height configured

Additionally, Accounts/Indexers which have just been migrated, and also
streams which have been stopped but have unchanged versions (e.g. after
Block Streamer restart), will be treated the same as
`StartBlock::Continue`.
  • Loading branch information
morgsmccauley authored Feb 14, 2024
1 parent c8ff7d1 commit 2fc8311
Show file tree
Hide file tree
Showing 20 changed files with 1,202 additions and 1,144 deletions.
23 changes: 9 additions & 14 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::task::JoinHandle;

use crate::indexer_config::IndexerConfig;
use crate::rules::types::ChainId;
use registry_types::MatchingRule;
use registry_types::Rule;

/// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but
/// we need this configurable for testing purposes.
Expand Down Expand Up @@ -135,8 +135,8 @@ pub(crate) async fn start_block_stream(
.last_indexed_block
.parse::<near_indexer_primitives::types::BlockHeight>()?;

let blocks_from_index = match &indexer.indexer_rule.matching_rule {
MatchingRule::ActionAny {
let blocks_from_index = match &indexer.rule {
Rule::ActionAny {
affected_account_id,
..
} => {
Expand All @@ -151,11 +151,11 @@ pub(crate) async fn start_block_stream(
.list_matching_block_heights(start_block_height, affected_account_id)
.await
}
MatchingRule::ActionFunctionCall { .. } => {
Rule::ActionFunctionCall { .. } => {
tracing::error!("ActionFunctionCall matching rule not yet supported for delta lake processing, function: {:?} {:?}", indexer.account_id, indexer.function_name);
Ok(vec![])
}
MatchingRule::Event { .. } => {
Rule::Event { .. } => {
tracing::error!("Event matching rule not yet supported for delta lake processing, function {:?} {:?}", indexer.account_id, indexer.function_name);
Ok(vec![])
}
Expand Down Expand Up @@ -222,7 +222,7 @@ pub(crate) async fn start_block_stream(
.context("Failed to set last_published_block")?;

let matches = crate::rules::reduce_indexer_rule_matches(
&indexer.indexer_rule,
&indexer.rule,
&streamer_message,
chain_id.clone(),
);
Expand Down Expand Up @@ -294,14 +294,9 @@ mod tests {
)
.unwrap(),
function_name: "test".to_string(),
indexer_rule: registry_types::OldIndexerRule {
indexer_rule_kind: registry_types::IndexerRuleKind::Action,
matching_rule: registry_types::MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: registry_types::Status::Success,
},
name: None,
id: None,
rule: registry_types::Rule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: registry_types::Status::Success,
},
};

Expand Down
8 changes: 2 additions & 6 deletions block-streamer/src/indexer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ use near_lake_framework::near_indexer_primitives::types::AccountId;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use registry_types::OldIndexerRule as IndexerRule;
use registry_types::Rule;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct IndexerConfig {
pub account_id: AccountId,
pub function_name: String,
// pub code: String,
// pub start_block_height: Option<u64>,
// pub schema: Option<String>,
// pub provisioned: bool,
pub indexer_rule: IndexerRule,
pub rule: Rule,
}

impl IndexerConfig {
Expand Down
2 changes: 2 additions & 0 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg_attr(test, allow(dead_code))]

use std::fmt::Debug;

use redis::{aio::ConnectionManager, RedisError, ToRedisArgs};
Expand Down
12 changes: 6 additions & 6 deletions block-streamer/src/rules/matcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ use near_lake_framework::near_indexer_primitives::{
views::{ActionView, ExecutionStatusView, ReceiptEnumView},
IndexerExecutionOutcomeWithReceipt,
};
use registry_types::{MatchingRule, Status};
use registry_types::{Rule, Status};

use crate::rules::types::Event;

pub fn matches(
matching_rule: &MatchingRule,
indexer_rule: &Rule,
receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt,
) -> bool {
match matching_rule {
MatchingRule::ActionAny {
match indexer_rule {
Rule::ActionAny {
affected_account_id,
status,
} => match_action_any(affected_account_id, status, receipt_execution_outcome),
MatchingRule::ActionFunctionCall {
Rule::ActionFunctionCall {
affected_account_id,
status,
function,
Expand All @@ -25,7 +25,7 @@ pub fn matches(
function,
receipt_execution_outcome,
),
MatchingRule::Event {
Rule::Event {
contract_account_id,
event,
standard,
Expand Down
10 changes: 4 additions & 6 deletions block-streamer/src/rules/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@ pub mod outcomes_reducer;
pub mod types;

use near_lake_framework::near_indexer_primitives::StreamerMessage;
use registry_types::{MatchingRule, OldIndexerRule as IndexerRule};
use registry_types::Rule;

use types::{ChainId, IndexerRuleMatch};

pub fn reduce_indexer_rule_matches(
indexer_rule: &IndexerRule,
indexer_rule: &Rule,
streamer_message: &StreamerMessage,
chain_id: ChainId,
) -> Vec<IndexerRuleMatch> {
match &indexer_rule.matching_rule {
MatchingRule::ActionAny { .. }
| MatchingRule::ActionFunctionCall { .. }
| MatchingRule::Event { .. } => {
match &indexer_rule {
Rule::ActionAny { .. } | Rule::ActionFunctionCall { .. } | Rule::Event { .. } => {
outcomes_reducer::reduce_indexer_rule_matches_from_outcomes(
indexer_rule,
streamer_message,
Expand Down
86 changes: 27 additions & 59 deletions block-streamer/src/rules/outcomes_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::rules::matcher;
use crate::rules::types::Event;
use crate::rules::types::{ChainId, IndexerRuleMatch, IndexerRuleMatchPayload};
use crate::rules::{IndexerRule, MatchingRule};
use crate::rules::Rule;
use near_lake_framework::near_indexer_primitives::{
IndexerExecutionOutcomeWithReceipt, StreamerMessage,
};

pub fn reduce_indexer_rule_matches_from_outcomes(
indexer_rule: &IndexerRule,
indexer_rule: &Rule,
streamer_message: &StreamerMessage,
chain_id: ChainId,
) -> Vec<IndexerRuleMatch> {
Expand All @@ -20,7 +20,7 @@ pub fn reduce_indexer_rule_matches_from_outcomes(
.iter()
// future: when extracting Actions, Events, etc this will be a filter operation
.find(|receipt_execution_outcome| {
matcher::matches(&indexer_rule.matching_rule, receipt_execution_outcome)
matcher::matches(indexer_rule, receipt_execution_outcome)
})
})
.map(|receipt_execution_outcome| {
Expand All @@ -36,16 +36,14 @@ pub fn reduce_indexer_rule_matches_from_outcomes(
}

fn build_indexer_rule_match(
indexer_rule: &IndexerRule,
indexer_rule: &Rule,
receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt,
block_header_hash: String,
block_height: u64,
chain_id: ChainId,
) -> IndexerRuleMatch {
IndexerRuleMatch {
chain_id,
indexer_rule_id: indexer_rule.id,
indexer_rule_name: indexer_rule.name.clone(),
payload: build_indexer_rule_match_payload(
indexer_rule,
receipt_execution_outcome,
Expand All @@ -56,23 +54,23 @@ fn build_indexer_rule_match(
}

fn build_indexer_rule_match_payload(
indexer_rule: &IndexerRule,
indexer_rule: &Rule,
receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt,
block_header_hash: String,
) -> IndexerRuleMatchPayload {
// future enhancement will extract and enrich fields from block & context as
// specified in the indexer function config.
let transaction_hash = None;

match &indexer_rule.matching_rule {
MatchingRule::ActionAny { .. } | MatchingRule::ActionFunctionCall { .. } => {
match &indexer_rule {
Rule::ActionAny { .. } | Rule::ActionFunctionCall { .. } => {
IndexerRuleMatchPayload::Actions {
block_hash: block_header_hash,
receipt_id: receipt_execution_outcome.receipt.receipt_id.to_string(),
transaction_hash,
}
}
MatchingRule::Event {
Rule::Event {
event,
standard,
version,
Expand Down Expand Up @@ -115,21 +113,16 @@ fn build_indexer_rule_match_payload(

#[cfg(test)]
mod tests {
use registry_types::{IndexerRuleKind, MatchingRule, OldIndexerRule as IndexerRule, Status};
use registry_types::{Rule, Status};

use crate::rules::outcomes_reducer::reduce_indexer_rule_matches_from_outcomes;
use crate::rules::types::{ChainId, IndexerRuleMatch};

#[tokio::test]
async fn match_wildcard_no_match() {
let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "*.nearcrow.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "*.nearcrow.near".to_string(),
status: Status::Success,
};

let streamer_message = crate::test_utils::get_streamer_message(93085141);
Expand All @@ -144,14 +137,9 @@ mod tests {

#[tokio::test]
async fn match_wildcard_contract_subaccount_name() {
let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "*.nearcrowd.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "*.nearcrowd.near".to_string(),
status: Status::Success,
};

let streamer_message = crate::test_utils::get_streamer_message(93085141);
Expand All @@ -166,14 +154,9 @@ mod tests {

#[tokio::test]
async fn match_wildcard_mid_contract_name() {
let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "*crowd.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "*crowd.near".to_string(),
status: Status::Success,
};

let streamer_message = crate::test_utils::get_streamer_message(93085141);
Expand All @@ -185,14 +168,9 @@ mod tests {

assert_eq!(result.len(), 1); // see Extraction note in previous test

let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "app.nea*owd.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "app.nea*owd.near".to_string(),
status: Status::Success,
};

let result: Vec<IndexerRuleMatch> = reduce_indexer_rule_matches_from_outcomes(
Expand All @@ -206,14 +184,9 @@ mod tests {

#[tokio::test]
async fn match_csv_account() {
let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "notintheblockaccount.near, app.nearcrowd.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "notintheblockaccount.near, app.nearcrowd.near".to_string(),
status: Status::Success,
};

let streamer_message = crate::test_utils::get_streamer_message(93085141);
Expand All @@ -228,14 +201,9 @@ mod tests {

#[tokio::test]
async fn match_csv_wildcard_account() {
let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "notintheblockaccount.near, *.nearcrowd.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "notintheblockaccount.near, *.nearcrowd.near".to_string(),
status: Status::Success,
};

let streamer_message = crate::test_utils::get_streamer_message(93085141);
Expand Down
2 changes: 0 additions & 2 deletions block-streamer/src/rules/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ pub type BlockHashString = String;
)]
pub struct IndexerRuleMatch {
pub chain_id: ChainId,
pub indexer_rule_id: Option<u32>,
pub indexer_rule_name: Option<String>,
pub payload: IndexerRuleMatchPayload,
pub block_height: u64,
}
Expand Down
2 changes: 2 additions & 0 deletions block-streamer/src/s3_client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg_attr(test, allow(dead_code))]

const MAX_S3_LIST_REQUESTS: usize = 1000;

#[cfg(test)]
Expand Down
Loading

0 comments on commit 2fc8311

Please sign in to comment.