Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prod Release 15/02/24 #561

Merged
merged 12 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 79 additions & 41 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::task::JoinHandle;

use crate::indexer_config::IndexerConfig;
use crate::rules::types::ChainId;
use registry_types::MatchingRule;
use registry_types::Rule;

/// The number of blocks to prefetch within `near-lake-framework`. The internal default is 100, but
/// we need this configurable for testing purposes.
Expand Down Expand Up @@ -113,6 +113,15 @@ impl BlockStream {
}
}

#[tracing::instrument(
skip_all,
fields(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
start_block_height = start_block_height,
redis_stream = redis_stream
)
)]
pub(crate) async fn start_block_stream(
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: &IndexerConfig,
Expand All @@ -123,26 +132,59 @@ pub(crate) async fn start_block_stream(
lake_prefetch_size: usize,
redis_stream: String,
) -> anyhow::Result<()> {
tracing::info!(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
tracing::info!("Starting block stream",);

let last_indexed_delta_lake_block = process_delta_lake_blocks(
start_block_height,
"Starting block stream",
delta_lake_client,
redis_client.clone(),
indexer,
redis_stream.clone(),
)
.await?;

let last_indexed_near_lake_block = process_near_lake_blocks(
last_indexed_delta_lake_block,
lake_s3_config,
lake_prefetch_size,
redis_client,
indexer,
redis_stream,
chain_id,
)
.await?;

tracing::debug!(
last_indexed_block = last_indexed_near_lake_block,
"Stopped block stream",
);

Ok(())
}

async fn process_delta_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
) -> anyhow::Result<u64> {
let latest_block_metadata = delta_lake_client.get_latest_block_metadata().await?;
let last_indexed_block = latest_block_metadata
let last_indexed_block_from_metadata = latest_block_metadata
.last_indexed_block
.parse::<near_indexer_primitives::types::BlockHeight>()?;
.parse::<near_indexer_primitives::types::BlockHeight>()
.context("Failed to parse Delta Lake metadata")?;

let blocks_from_index = match &indexer.indexer_rule.matching_rule {
MatchingRule::ActionAny {
if start_block_height >= last_indexed_block_from_metadata {
return Ok(start_block_height);
}

let blocks_from_index = match &indexer.rule {
Rule::ActionAny {
affected_account_id,
..
} => {
tracing::debug!(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
"Fetching block heights starting from {} from delta lake",
start_block_height,
);
Expand All @@ -151,19 +193,17 @@ pub(crate) async fn start_block_stream(
.list_matching_block_heights(start_block_height, affected_account_id)
.await
}
MatchingRule::ActionFunctionCall { .. } => {
Rule::ActionFunctionCall { .. } => {
tracing::error!("ActionFunctionCall matching rule not yet supported for delta lake processing, function: {:?} {:?}", indexer.account_id, indexer.function_name);
Ok(vec![])
}
MatchingRule::Event { .. } => {
Rule::Event { .. } => {
tracing::error!("Event matching rule not yet supported for delta lake processing, function {:?} {:?}", indexer.account_id, indexer.function_name);
Ok(vec![])
}
}?;

tracing::debug!(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
"Flushing {} block heights from index files to Redis Stream",
blocks_from_index.len(),
);
Expand All @@ -183,30 +223,40 @@ pub(crate) async fn start_block_stream(
.context("Failed to set last_published_block")?;
}

let mut last_indexed_block =
let last_indexed_block =
blocks_from_index
.last()
.map_or(last_indexed_block, |&last_block_in_index| {
.map_or(last_indexed_block_from_metadata, |&last_block_in_index| {
// Check for the case where index files are written right after we fetch the last_indexed_block metadata
std::cmp::max(last_block_in_index, last_indexed_block)
std::cmp::max(last_block_in_index, last_indexed_block_from_metadata)
});

tracing::debug!(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
"Starting near-lake-framework from {last_indexed_block} for indexer",
);
Ok(last_indexed_block)
}

async fn process_near_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
lake_s3_config: aws_sdk_s3::Config,
lake_prefetch_size: usize,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
chain_id: &ChainId,
) -> anyhow::Result<u64> {
tracing::debug!(start_block_height, "Starting near-lake-framework",);

let lake_config = match &chain_id {
ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
}
.s3_config(lake_s3_config)
.start_block_height(last_indexed_block)
.start_block_height(start_block_height)
.blocks_preload_pool_size(lake_prefetch_size)
.build()
.context("Failed to build lake config")?;

let mut last_indexed_block = start_block_height;

let (sender, mut stream) = near_lake_framework::streamer(lake_config);

while let Some(streamer_message) = stream.recv().await {
Expand All @@ -222,7 +272,7 @@ pub(crate) async fn start_block_stream(
.context("Failed to set last_published_block")?;

let matches = crate::rules::reduce_indexer_rule_matches(
&indexer.indexer_rule,
&indexer.rule,
&streamer_message,
chain_id.clone(),
);
Expand All @@ -240,14 +290,7 @@ pub(crate) async fn start_block_stream(

drop(sender);

tracing::debug!(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
"Stopped block stream at {}",
last_indexed_block,
);

Ok(())
Ok(last_indexed_block)
}

#[cfg(test)]
Expand Down Expand Up @@ -294,14 +337,9 @@ mod tests {
)
.unwrap(),
function_name: "test".to_string(),
indexer_rule: registry_types::IndexerRule {
indexer_rule_kind: registry_types::IndexerRuleKind::Action,
matching_rule: registry_types::MatchingRule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: registry_types::Status::Success,
},
name: None,
id: None,
rule: registry_types::Rule::ActionAny {
affected_account_id: "queryapi.dataplatform.near".to_string(),
status: registry_types::Status::Success,
},
};

Expand Down
36 changes: 34 additions & 2 deletions block-streamer/src/delta_lake_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl DeltaLakeClientImpl {
}
})
.flat_map(|index_file| index_file.heights)
.filter(|block_height| *block_height >= start_block_height)
.collect();

let pattern_has_multiple_contracts = contract_pattern.chars().any(|c| c == ',' || c == '*');
Expand All @@ -260,7 +261,6 @@ impl DeltaLakeClientImpl {
contract_pattern,
);

// TODO Remove all block heights after start_block_height
Ok(block_heights)
}
}
Expand Down Expand Up @@ -637,10 +637,42 @@ mod tests {

assert_eq!(
block_heights,
vec![45894617, 45894627, 45894628, 45894712, 45898413, 45898423, 45898424]
vec![45894628, 45894712, 45898413, 45898423, 45898424]
)
}

#[tokio::test]
async fn filters_heights_less_than_start_block() {
let mut mock_s3_client = crate::s3_client::S3Client::default();

mock_s3_client
.expect_get_text_file()
.with(
predicate::eq("near-lake-data-mainnet"),
predicate::eq("000045898423/block.json"),
)
.returning(|_bucket, _prefix| Ok(generate_block_with_timestamp("2021-05-26")));
mock_s3_client.expect_list_all_objects().returning(|_, _| {
Ok(vec![
"silver/accounts/action_receipt_actions/metadata/near/keypom/2023-10-31.json"
.to_string(),
])
});
mock_s3_client
.expect_get_text_file()
.with(predicate::eq(DELTA_LAKE_BUCKET.to_string()), predicate::eq("silver/accounts/action_receipt_actions/metadata/near/keypom/2023-10-31.json".to_string()))
.returning(|_bucket, _prefix| Ok("{\"heights\":[45898424,45898423,45898413,45894712],\"actions\":[{\"action_kind\":\"ADD_KEY\",\"block_heights\":[104616819]}]}".to_string()));

let delta_lake_client = DeltaLakeClientImpl::new(mock_s3_client);

let block_heights = delta_lake_client
.list_matching_block_heights(45898423, "keypom.near, hackathon.agency.near")
.await
.unwrap();

assert_eq!(block_heights, vec![45898423, 45898424])
}

#[tokio::test]
async fn gets_the_date_of_the_closest_block() {
let mut mock_s3_client = crate::s3_client::S3Client::default();
Expand Down
8 changes: 2 additions & 6 deletions block-streamer/src/indexer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ use near_lake_framework::near_indexer_primitives::types::AccountId;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

use registry_types::IndexerRule;
use registry_types::Rule;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct IndexerConfig {
pub account_id: AccountId,
pub function_name: String,
// pub code: String,
// pub start_block_height: Option<u64>,
// pub schema: Option<String>,
// pub provisioned: bool,
pub indexer_rule: IndexerRule,
pub rule: Rule,
}

impl IndexerConfig {
Expand Down
2 changes: 2 additions & 0 deletions block-streamer/src/redis.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![cfg_attr(test, allow(dead_code))]

use std::fmt::Debug;

use redis::{aio::ConnectionManager, RedisError, ToRedisArgs};
Expand Down
12 changes: 6 additions & 6 deletions block-streamer/src/rules/matcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ use near_lake_framework::near_indexer_primitives::{
views::{ActionView, ExecutionStatusView, ReceiptEnumView},
IndexerExecutionOutcomeWithReceipt,
};
use registry_types::{MatchingRule, Status};
use registry_types::{Rule, Status};

use crate::rules::types::Event;

pub fn matches(
matching_rule: &MatchingRule,
indexer_rule: &Rule,
receipt_execution_outcome: &IndexerExecutionOutcomeWithReceipt,
) -> bool {
match matching_rule {
MatchingRule::ActionAny {
match indexer_rule {
Rule::ActionAny {
affected_account_id,
status,
} => match_action_any(affected_account_id, status, receipt_execution_outcome),
MatchingRule::ActionFunctionCall {
Rule::ActionFunctionCall {
affected_account_id,
status,
function,
Expand All @@ -25,7 +25,7 @@ pub fn matches(
function,
receipt_execution_outcome,
),
MatchingRule::Event {
Rule::Event {
contract_account_id,
event,
standard,
Expand Down
10 changes: 4 additions & 6 deletions block-streamer/src/rules/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,17 @@ pub mod outcomes_reducer;
pub mod types;

use near_lake_framework::near_indexer_primitives::StreamerMessage;
use registry_types::{IndexerRule, MatchingRule};
use registry_types::Rule;

use types::{ChainId, IndexerRuleMatch};

pub fn reduce_indexer_rule_matches(
indexer_rule: &IndexerRule,
indexer_rule: &Rule,
streamer_message: &StreamerMessage,
chain_id: ChainId,
) -> Vec<IndexerRuleMatch> {
match &indexer_rule.matching_rule {
MatchingRule::ActionAny { .. }
| MatchingRule::ActionFunctionCall { .. }
| MatchingRule::Event { .. } => {
match &indexer_rule {
Rule::ActionAny { .. } | Rule::ActionFunctionCall { .. } | Rule::Event { .. } => {
outcomes_reducer::reduce_indexer_rule_matches_from_outcomes(
indexer_rule,
streamer_message,
Expand Down
Loading
Loading