Skip to content

Commit

Permalink
fix: Various block streamer issues (#556)
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Feb 14, 2024
1 parent 2fc8311 commit abcc48e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 29 deletions.
97 changes: 70 additions & 27 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ impl BlockStream {
}
}

#[tracing::instrument(
skip_all,
fields(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
start_block_height = start_block_height,
redis_stream = redis_stream
)
)]
pub(crate) async fn start_block_stream(
start_block_height: near_indexer_primitives::types::BlockHeight,
indexer: &IndexerConfig,
Expand All @@ -123,26 +132,59 @@ pub(crate) async fn start_block_stream(
lake_prefetch_size: usize,
redis_stream: String,
) -> anyhow::Result<()> {
tracing::info!(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
tracing::info!("Starting block stream",);

let last_indexed_delta_lake_block = process_delta_lake_blocks(
start_block_height,
"Starting block stream",
delta_lake_client,
redis_client.clone(),
indexer,
redis_stream.clone(),
)
.await?;

let last_indexed_near_lake_block = process_near_lake_blocks(
last_indexed_delta_lake_block,
lake_s3_config,
lake_prefetch_size,
redis_client,
indexer,
redis_stream,
chain_id,
)
.await?;

tracing::debug!(
last_indexed_block = last_indexed_near_lake_block,
"Stopped block stream",
);

Ok(())
}

async fn process_delta_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
) -> anyhow::Result<u64> {
let latest_block_metadata = delta_lake_client.get_latest_block_metadata().await?;
let last_indexed_block = latest_block_metadata
let last_indexed_block_from_metadata = latest_block_metadata
.last_indexed_block
.parse::<near_indexer_primitives::types::BlockHeight>()?;
.parse::<near_indexer_primitives::types::BlockHeight>()
.context("Failed to parse Delta Lake metadata")?;

if start_block_height >= last_indexed_block_from_metadata {
return Ok(start_block_height);
}

let blocks_from_index = match &indexer.rule {
Rule::ActionAny {
affected_account_id,
..
} => {
tracing::debug!(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
"Fetching block heights starting from {} from delta lake",
start_block_height,
);
Expand All @@ -162,8 +204,6 @@ pub(crate) async fn start_block_stream(
}?;

tracing::debug!(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
"Flushing {} block heights from index files to Redis Stream",
blocks_from_index.len(),
);
Expand All @@ -183,30 +223,40 @@ pub(crate) async fn start_block_stream(
.context("Failed to set last_published_block")?;
}

let mut last_indexed_block =
let last_indexed_block =
blocks_from_index
.last()
.map_or(last_indexed_block, |&last_block_in_index| {
.map_or(last_indexed_block_from_metadata, |&last_block_in_index| {
// Check for the case where index files are written right after we fetch the last_indexed_block metadata
std::cmp::max(last_block_in_index, last_indexed_block)
std::cmp::max(last_block_in_index, last_indexed_block_from_metadata)
});

tracing::debug!(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
"Starting near-lake-framework from {last_indexed_block} for indexer",
);
Ok(last_indexed_block)
}

async fn process_near_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
lake_s3_config: aws_sdk_s3::Config,
lake_prefetch_size: usize,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
redis_stream: String,
chain_id: &ChainId,
) -> anyhow::Result<u64> {
tracing::debug!(start_block_height, "Starting near-lake-framework",);

let lake_config = match &chain_id {
ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
}
.s3_config(lake_s3_config)
.start_block_height(last_indexed_block)
.start_block_height(start_block_height)
.blocks_preload_pool_size(lake_prefetch_size)
.build()
.context("Failed to build lake config")?;

let mut last_indexed_block = start_block_height;

let (sender, mut stream) = near_lake_framework::streamer(lake_config);

while let Some(streamer_message) = stream.recv().await {
Expand Down Expand Up @@ -240,14 +290,7 @@ pub(crate) async fn start_block_stream(

drop(sender);

tracing::debug!(
account_id = indexer.account_id.as_str(),
function_name = indexer.function_name,
"Stopped block stream at {}",
last_indexed_block,
);

Ok(())
Ok(last_indexed_block)
}

#[cfg(test)]
Expand Down
36 changes: 34 additions & 2 deletions block-streamer/src/delta_lake_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl DeltaLakeClientImpl {
}
})
.flat_map(|index_file| index_file.heights)
.filter(|block_height| *block_height >= start_block_height)
.collect();

let pattern_has_multiple_contracts = contract_pattern.chars().any(|c| c == ',' || c == '*');
Expand All @@ -260,7 +261,6 @@ impl DeltaLakeClientImpl {
contract_pattern,
);

// TODO Remove all block heights after start_block_height
Ok(block_heights)
}
}
Expand Down Expand Up @@ -637,10 +637,42 @@ mod tests {

assert_eq!(
block_heights,
vec![45894617, 45894627, 45894628, 45894712, 45898413, 45898423, 45898424]
vec![45894628, 45894712, 45898413, 45898423, 45898424]
)
}

#[tokio::test]
async fn filters_heights_less_than_start_block() {
let mut mock_s3_client = crate::s3_client::S3Client::default();

mock_s3_client
.expect_get_text_file()
.with(
predicate::eq("near-lake-data-mainnet"),
predicate::eq("000045898423/block.json"),
)
.returning(|_bucket, _prefix| Ok(generate_block_with_timestamp("2021-05-26")));
mock_s3_client.expect_list_all_objects().returning(|_, _| {
Ok(vec![
"silver/accounts/action_receipt_actions/metadata/near/keypom/2023-10-31.json"
.to_string(),
])
});
mock_s3_client
.expect_get_text_file()
.with(predicate::eq(DELTA_LAKE_BUCKET.to_string()), predicate::eq("silver/accounts/action_receipt_actions/metadata/near/keypom/2023-10-31.json".to_string()))
.returning(|_bucket, _prefix| Ok("{\"heights\":[45898424,45898423,45898413,45894712],\"actions\":[{\"action_kind\":\"ADD_KEY\",\"block_heights\":[104616819]}]}".to_string()));

let delta_lake_client = DeltaLakeClientImpl::new(mock_s3_client);

let block_heights = delta_lake_client
.list_matching_block_heights(45898423, "keypom.near, hackathon.agency.near")
.await
.unwrap();

assert_eq!(block_heights, vec![45898423, 45898424])
}

#[tokio::test]
async fn gets_the_date_of_the_closest_block() {
let mut mock_s3_client = crate::s3_client::S3Client::default();
Expand Down

0 comments on commit abcc48e

Please sign in to comment.