diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index c5114bdb3..815c750f5 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -113,6 +113,15 @@ impl BlockStream { } } +#[tracing::instrument( + skip_all, + fields( + account_id = indexer.account_id.as_str(), + function_name = indexer.function_name, + start_block_height = start_block_height, + redis_stream = redis_stream + ) +)] pub(crate) async fn start_block_stream( start_block_height: near_indexer_primitives::types::BlockHeight, indexer: &IndexerConfig, @@ -123,17 +132,52 @@ pub(crate) async fn start_block_stream( lake_prefetch_size: usize, redis_stream: String, ) -> anyhow::Result<()> { - tracing::info!( - account_id = indexer.account_id.as_str(), - function_name = indexer.function_name, + tracing::info!("Starting block stream",); + + let last_indexed_delta_lake_block = process_delta_lake_blocks( start_block_height, - "Starting block stream", + delta_lake_client, + redis_client.clone(), + indexer, + redis_stream.clone(), + ) + .await?; + + let last_indexed_near_lake_block = process_near_lake_blocks( + last_indexed_delta_lake_block, + lake_s3_config, + lake_prefetch_size, + redis_client, + indexer, + redis_stream, + chain_id, + ) + .await?; + + tracing::debug!( + last_indexed_block = last_indexed_near_lake_block, + "Stopped block stream", ); + Ok(()) +} + +async fn process_delta_lake_blocks( + start_block_height: near_indexer_primitives::types::BlockHeight, + delta_lake_client: std::sync::Arc, + redis_client: std::sync::Arc, + indexer: &IndexerConfig, + redis_stream: String, +) -> anyhow::Result { let latest_block_metadata = delta_lake_client.get_latest_block_metadata().await?; - let last_indexed_block = latest_block_metadata + let last_indexed_block_from_metadata = latest_block_metadata .last_indexed_block - .parse::()?; + .parse::() + .context("Failed to parse Delta Lake metadata")?; + + if start_block_height >= last_indexed_block_from_metadata { + return Ok(start_block_height); + } let blocks_from_index = match &indexer.rule { Rule::ActionAny { @@ -141,8 +185,6 @@ pub(crate) async fn start_block_stream( .. } => { tracing::debug!( - account_id = indexer.account_id.as_str(), - function_name = indexer.function_name, "Fetching block heights starting from {} from delta lake", start_block_height, ); @@ -162,8 +204,6 @@ pub(crate) async fn start_block_stream( }?; tracing::debug!( - account_id = indexer.account_id.as_str(), - function_name = indexer.function_name, "Flushing {} block heights from index files to Redis Stream", blocks_from_index.len(), ); @@ -183,30 +223,40 @@ pub(crate) async fn start_block_stream( .context("Failed to set last_published_block")?; } - let mut last_indexed_block = + let last_indexed_block = blocks_from_index .last() - .map_or(last_indexed_block, |&last_block_in_index| { + .map_or(last_indexed_block_from_metadata, |&last_block_in_index| { // Check for the case where index files are written right after we fetch the last_indexed_block metadata - std::cmp::max(last_block_in_index, last_indexed_block) + std::cmp::max(last_block_in_index, last_indexed_block_from_metadata) }); - tracing::debug!( - account_id = indexer.account_id.as_str(), - function_name = indexer.function_name, - "Starting near-lake-framework from {last_indexed_block} for indexer", - ); + Ok(last_indexed_block) +} + +async fn process_near_lake_blocks( + start_block_height: near_indexer_primitives::types::BlockHeight, + lake_s3_config: aws_sdk_s3::Config, + lake_prefetch_size: usize, + redis_client: std::sync::Arc, + indexer: &IndexerConfig, + redis_stream: String, + chain_id: &ChainId, +) -> anyhow::Result { + tracing::debug!(start_block_height, "Starting near-lake-framework",); let lake_config = match &chain_id { ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(), ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(), } .s3_config(lake_s3_config) - .start_block_height(last_indexed_block) + .start_block_height(start_block_height) .blocks_preload_pool_size(lake_prefetch_size) .build() .context("Failed to build lake config")?; + let mut last_indexed_block = start_block_height; + let (sender, mut stream) = near_lake_framework::streamer(lake_config); while let Some(streamer_message) = stream.recv().await { @@ -240,14 +290,7 @@ pub(crate) async fn start_block_stream( drop(sender); - tracing::debug!( - account_id = indexer.account_id.as_str(), - function_name = indexer.function_name, - "Stopped block stream at {}", - last_indexed_block, - ); - - Ok(()) + Ok(last_indexed_block) } #[cfg(test)] diff --git a/block-streamer/src/delta_lake_client.rs b/block-streamer/src/delta_lake_client.rs index ae02d6c20..ed0a0d542 100644 --- a/block-streamer/src/delta_lake_client.rs +++ b/block-streamer/src/delta_lake_client.rs @@ -246,6 +246,7 @@ impl DeltaLakeClientImpl { } }) .flat_map(|index_file| index_file.heights) + .filter(|block_height| *block_height >= start_block_height) .collect(); let pattern_has_multiple_contracts = contract_pattern.chars().any(|c| c == ',' || c == '*'); @@ -260,7 +261,6 @@ impl DeltaLakeClientImpl { contract_pattern, ); - // TODO Remove all block heights after start_block_height Ok(block_heights) } } @@ -637,10 +637,42 @@ mod tests { assert_eq!( block_heights, - vec![45894617, 45894627, 45894628, 45894712, 45898413, 45898423, 45898424] + vec![45894628, 45894712, 45898413, 45898423, 45898424] ) } + #[tokio::test] + async fn filters_heights_less_than_start_block() { + let mut mock_s3_client = crate::s3_client::S3Client::default(); + + mock_s3_client + .expect_get_text_file() + .with( + predicate::eq("near-lake-data-mainnet"), + predicate::eq("000045898423/block.json"), + ) + .returning(|_bucket, _prefix| Ok(generate_block_with_timestamp("2021-05-26"))); + mock_s3_client.expect_list_all_objects().returning(|_, _| { + Ok(vec![ + "silver/accounts/action_receipt_actions/metadata/near/keypom/2023-10-31.json" + .to_string(), + ]) + }); + mock_s3_client + .expect_get_text_file() + .with(predicate::eq(DELTA_LAKE_BUCKET.to_string()), predicate::eq("silver/accounts/action_receipt_actions/metadata/near/keypom/2023-10-31.json".to_string())) + .returning(|_bucket, _prefix| Ok("{\"heights\":[45898424,45898423,45898413,45894712],\"actions\":[{\"action_kind\":\"ADD_KEY\",\"block_heights\":[104616819]}]}".to_string())); + + let delta_lake_client = DeltaLakeClientImpl::new(mock_s3_client); + + let block_heights = delta_lake_client + .list_matching_block_heights(45898423, "keypom.near, hackathon.agency.near") + .await + .unwrap(); + + assert_eq!(block_heights, vec![45898423, 45898424]) + } + #[tokio::test] async fn gets_the_date_of_the_closest_block() { let mut mock_s3_client = crate::s3_client::S3Client::default();