Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Handle StartBlock options within Coordinator & Block Streamer #553

Merged
merged 28 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2af8245
refactor: Support new registry types in block streamer
morgsmccauley Feb 6, 2024
44a8e55
refactor: Support new registry types in coordinator
morgsmccauley Feb 6, 2024
eda5264
chore: Remove unnecessary `mut`
morgsmccauley Feb 6, 2024
b994a8c
feat: Start block streams using `StartBlock`
morgsmccauley Feb 8, 2024
242128a
feat: Handle block streams post migration
morgsmccauley Feb 8, 2024
378b5c8
refactor: Remove unneeded version suffix from redis stream
morgsmccauley Feb 8, 2024
f1c1e12
refactor: Block stream handling
morgsmccauley Feb 8, 2024
3e7235a
chore: Add logging
morgsmccauley Feb 8, 2024
806547b
chore: Rename executor tests for clarity
morgsmccauley Feb 8, 2024
ad5a52f
fix: Always update redis stream version
morgsmccauley Feb 8, 2024
ece28d0
refactor: Update sync block stream logic
morgsmccauley Feb 8, 2024
38b85b3
test: Ensure assertions are called
morgsmccauley Feb 8, 2024
60e3160
refactor: extract `IndexerConfig` to own mod
morgsmccauley Feb 8, 2024
4e47c79
refactor: Abstract high-level redis methods
morgsmccauley Feb 8, 2024
1d1fbf4
refactor: Break sync stream logic to smaller functions
morgsmccauley Feb 8, 2024
caf3698
refactor: Break executor handling to smaller functions
morgsmccauley Feb 8, 2024
a0943b3
refactor: Extract sync logic to own mod
morgsmccauley Feb 8, 2024
e7fa041
feat: Tidy logs with tracing spans
morgsmccauley Feb 9, 2024
47277a8
refactor: Pass round `IndexerConfig` rather than individual values
morgsmccauley Feb 9, 2024
d1696bc
refactor: Encapsulate handler/sync logic
morgsmccauley Feb 9, 2024
fd31b24
chore: Restructure/tidy logging
morgsmccauley Feb 10, 2024
f0bb4e7
feat: Dont start stream without `last_published_block`
morgsmccauley Feb 11, 2024
28d48fe
feat: Capture errors thrown in sync process
morgsmccauley Feb 11, 2024
aaa8114
fix: Distinguish between new/migrated indexers
morgsmccauley Feb 12, 2024
8eefa86
feat: Print `StartBlock` when starting stream
morgsmccauley Feb 12, 2024
e8fa5c8
feat: Treat stream > registry version as outdated
morgsmccauley Feb 12, 2024
16083a0
fix: Actually delete redis keys
morgsmccauley Feb 13, 2024
d7e5091
fix: Continue block stream from correct height
morgsmccauley Feb 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::task::JoinHandle;

use crate::indexer_config::IndexerConfig;
use crate::rules::types::ChainId;
use registry_types::MatchingRule;
use registry_types::Rule;

/// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but
/// we need this configurable for testing purposes.
Expand Down Expand Up @@ -135,8 +135,8 @@ pub(crate) async fn start_block_stream(
.last_indexed_block
.parse::<near_indexer_primitives::types::BlockHeight>()?;

let blocks_from_index = match &indexer.indexer_rule.matching_rule {
MatchingRule::ActionAny {
let blocks_from_index = match &indexer.rule {
Rule::ActionAny {
affected_account_id,
..
} => {
Expand All @@ -151,11 +151,11 @@ pub(crate) async fn start_block_stream(
.list_matching_block_heights(start_block_height, affected_account_id)
.await
}
MatchingRule::ActionFunctionCall { .. } => {
Rule::ActionFunctionCall { .. } => {
tracing::error!("ActionFunctionCall matching rule not yet supported for delta lake processing, function: {:?} {:?}", indexer.account_id, indexer.function_name);
Ok(vec![])
}
MatchingRule::Event { .. } => {
Rule::Event { .. } => {
tracing::error!("Event matching rule not yet supported for delta lake processing, function {:?} {:?}", indexer.account_id, indexer.function_name);
Ok(vec![])
}
Expand Down Expand Up @@ -222,7 +222,7 @@ pub(crate) async fn start_block_stream(
.context("Failed to set last_published_block")?;

let matches = crate::rules::reduce_indexer_rule_matches(
&indexer.indexer_rule,
&indexer.rule,
&streamer_message,
chain_id.clone(),
);
Expand Down Expand Up @@ -294,14 +294,9 @@ mod tests {
)
.unwrap(),
function_name: "test".to_string(),
indexer_rule: registry_types::OldIndexerRule {
indexer_rule_kind: registry_types::IndexerRuleKind::Action,
matching_rule: registry_types::MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: registry_types::Status::Success,
},
name: None,
id: None,
rule: registry_types::Rule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: registry_types::Status::Success,
},
};

Expand Down
8 changes: 2 additions & 6 deletions block-streamer/src/indexer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ use near_lake_framework::near_indexer_primitives::types::AccountId;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use registry_types::OldIndexerRule as IndexerRule;
use registry_types::Rule;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct IndexerConfig {
pub account_id: AccountId,
pub function_name: String,
// pub code: String,
// pub start_block_height: Option<u64>,
// pub schema: Option<String>,
// pub provisioned: bool,
pub indexer_rule: IndexerRule,
pub rule: Rule,
}

impl IndexerConfig {
Expand Down
2 changes: 2 additions & 0 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg_attr(test, allow(dead_code))]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cargo thinks that the real implementation is "unused", and the mock version is used instead. This suppresses those warnings.


use std::fmt::Debug;

use redis::{aio::ConnectionManager, RedisError, ToRedisArgs};
Expand Down
12 changes: 6 additions & 6 deletions block-streamer/src/rules/matcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ use near_lake_framework::near_indexer_primitives::{
views::{ActionView, ExecutionStatusView, ReceiptEnumView},
IndexerExecutionOutcomeWithReceipt,
};
use registry_types::{MatchingRule, Status};
use registry_types::{Rule, Status};

use crate::rules::types::Event;

pub fn matches(
matching_rule: &MatchingRule,
indexer_rule: &Rule,
receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt,
) -> bool {
match matching_rule {
MatchingRule::ActionAny {
match indexer_rule {
Rule::ActionAny {
affected_account_id,
status,
} => match_action_any(affected_account_id, status, receipt_execution_outcome),
MatchingRule::ActionFunctionCall {
Rule::ActionFunctionCall {
affected_account_id,
status,
function,
Expand All @@ -25,7 +25,7 @@ pub fn matches(
function,
receipt_execution_outcome,
),
MatchingRule::Event {
Rule::Event {
contract_account_id,
event,
standard,
Expand Down
10 changes: 4 additions & 6 deletions block-streamer/src/rules/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@ pub mod outcomes_reducer;
pub mod types;

use near_lake_framework::near_indexer_primitives::StreamerMessage;
use registry_types::{MatchingRule, OldIndexerRule as IndexerRule};
use registry_types::Rule;

use types::{ChainId, IndexerRuleMatch};

pub fn reduce_indexer_rule_matches(
indexer_rule: &IndexerRule,
indexer_rule: &Rule,
streamer_message: &StreamerMessage,
chain_id: ChainId,
) -> Vec<IndexerRuleMatch> {
match &indexer_rule.matching_rule {
MatchingRule::ActionAny { .. }
| MatchingRule::ActionFunctionCall { .. }
| MatchingRule::Event { .. } => {
match &indexer_rule {
Rule::ActionAny { .. } | Rule::ActionFunctionCall { .. } | Rule::Event { .. } => {
outcomes_reducer::reduce_indexer_rule_matches_from_outcomes(
indexer_rule,
streamer_message,
Expand Down
86 changes: 27 additions & 59 deletions block-streamer/src/rules/outcomes_reducer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::rules::matcher;
use crate::rules::types::Event;
use crate::rules::types::{ChainId, IndexerRuleMatch, IndexerRuleMatchPayload};
use crate::rules::{IndexerRule, MatchingRule};
use crate::rules::Rule;
use near_lake_framework::near_indexer_primitives::{
IndexerExecutionOutcomeWithReceipt, StreamerMessage,
};

pub fn reduce_indexer_rule_matches_from_outcomes(
indexer_rule: &IndexerRule,
indexer_rule: &Rule,
streamer_message: &StreamerMessage,
chain_id: ChainId,
) -> Vec<IndexerRuleMatch> {
Expand All @@ -20,7 +20,7 @@ pub fn reduce_indexer_rule_matches_from_outcomes(
.iter()
// future: when extracting Actions, Events, etc this will be a filter operation
.find(|receipt_execution_outcome| {
matcher::matches(&indexer_rule.matching_rule, receipt_execution_outcome)
matcher::matches(indexer_rule, receipt_execution_outcome)
})
})
.map(|receipt_execution_outcome| {
Expand All @@ -36,16 +36,14 @@ pub fn reduce_indexer_rule_matches_from_outcomes(
}

fn build_indexer_rule_match(
indexer_rule: &IndexerRule,
indexer_rule: &Rule,
receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt,
block_header_hash: String,
block_height: u64,
chain_id: ChainId,
) -> IndexerRuleMatch {
IndexerRuleMatch {
chain_id,
indexer_rule_id: indexer_rule.id,
indexer_rule_name: indexer_rule.name.clone(),
payload: build_indexer_rule_match_payload(
indexer_rule,
receipt_execution_outcome,
Expand All @@ -56,23 +54,23 @@ fn build_indexer_rule_match(
}

fn build_indexer_rule_match_payload(
indexer_rule: &IndexerRule,
indexer_rule: &Rule,
receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt,
block_header_hash: String,
) -> IndexerRuleMatchPayload {
// future enhancement will extract and enrich fields from block & context as
// specified in the indexer function config.
let transaction_hash = None;

match &indexer_rule.matching_rule {
MatchingRule::ActionAny { .. } | MatchingRule::ActionFunctionCall { .. } => {
match &indexer_rule {
Rule::ActionAny { .. } | Rule::ActionFunctionCall { .. } => {
IndexerRuleMatchPayload::Actions {
block_hash: block_header_hash,
receipt_id: receipt_execution_outcome.receipt.receipt_id.to_string(),
transaction_hash,
}
}
MatchingRule::Event {
Rule::Event {
event,
standard,
version,
Expand Down Expand Up @@ -115,21 +113,16 @@ fn build_indexer_rule_match_payload(

#[cfg(test)]
mod tests {
use registry_types::{IndexerRuleKind, MatchingRule, OldIndexerRule as IndexerRule, Status};
use registry_types::{Rule, Status};

use crate::rules::outcomes_reducer::reduce_indexer_rule_matches_from_outcomes;
use crate::rules::types::{ChainId, IndexerRuleMatch};

#[tokio::test]
async fn match_wildcard_no_match() {
let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "*.nearcrow.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "*.nearcrow.near".to_string(),
status: Status::Success,
};

let streamer_message = crate::test_utils::get_streamer_message(93085141);
Expand All @@ -144,14 +137,9 @@ mod tests {

#[tokio::test]
async fn match_wildcard_contract_subaccount_name() {
let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "*.nearcrowd.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "*.nearcrowd.near".to_string(),
status: Status::Success,
};

let streamer_message = crate::test_utils::get_streamer_message(93085141);
Expand All @@ -166,14 +154,9 @@ mod tests {

#[tokio::test]
async fn match_wildcard_mid_contract_name() {
let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "*crowd.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "*crowd.near".to_string(),
status: Status::Success,
};

let streamer_message = crate::test_utils::get_streamer_message(93085141);
Expand All @@ -185,14 +168,9 @@ mod tests {

assert_eq!(result.len(), 1); // see Extraction note in previous test

let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "app.nea*owd.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "app.nea*owd.near".to_string(),
status: Status::Success,
};

let result: Vec<IndexerRuleMatch> = reduce_indexer_rule_matches_from_outcomes(
Expand All @@ -206,14 +184,9 @@ mod tests {

#[tokio::test]
async fn match_csv_account() {
let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "notintheblockaccount.near, app.nearcrowd.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "notintheblockaccount.near, app.nearcrowd.near".to_string(),
status: Status::Success,
};

let streamer_message = crate::test_utils::get_streamer_message(93085141);
Expand All @@ -228,14 +201,9 @@ mod tests {

#[tokio::test]
async fn match_csv_wildcard_account() {
let wildcard_rule = IndexerRule {
indexer_rule_kind: IndexerRuleKind::Action,
matching_rule: MatchingRule::ActionAny {
affected_account_id: "notintheblockaccount.near, *.nearcrowd.near".to_string(),
status: Status::Success,
},
id: None,
name: None,
let wildcard_rule = Rule::ActionAny {
affected_account_id: "notintheblockaccount.near, *.nearcrowd.near".to_string(),
status: Status::Success,
};

let streamer_message = crate::test_utils::get_streamer_message(93085141);
Expand Down
2 changes: 0 additions & 2 deletions block-streamer/src/rules/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ pub type BlockHashString = String;
)]
pub struct IndexerRuleMatch {
pub chain_id: ChainId,
pub indexer_rule_id: Option<u32>,
pub indexer_rule_name: Option<String>,
pub payload: IndexerRuleMatchPayload,
pub block_height: u64,
}
Expand Down
2 changes: 2 additions & 0 deletions block-streamer/src/s3_client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg_attr(test, allow(dead_code))]

const MAX_S3_LIST_REQUESTS: usize = 1000;

#[cfg(test)]
Expand Down
Loading
Loading