Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Count S3 get requests made by near-lake-framework #662

Merged
merged 6 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,477 changes: 1,182 additions & 295 deletions block-streamer/Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ edition = "2021"
actix-web = "4.5.1"
anyhow = "1.0.57"
async-trait = "0.1.74"
aws-config = { version = "1.0.0", features = ["behavior-version-latest"]}
aws-sdk-s3 = "0.39.1"
aws-config = { version = "1.1.3", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.13.0"
borsh = "0.10.2"
chrono = "0.4.25"
futures = "0.3.5"
Expand All @@ -30,7 +30,7 @@ wildmatch = "2.1.1"

registry-types = { path = "../registry/types", features = ["near-primitives"] }

near-lake-framework = "=0.7.4"
near-lake-framework = { git = "https://github.com/near/near-lake-framework-rs", branch = "feat/custom-s3-client" }
morgsmccauley marked this conversation as resolved.
Show resolved Hide resolved

[build-dependencies]
tonic-build = "0.10"
Expand Down
47 changes: 35 additions & 12 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl BlockStream {
start_block_height: near_indexer_primitives::types::BlockHeight,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_config: aws_sdk_s3::Config,
lake_s3_client: crate::lake_s3_client::LakeS3Client,
) -> anyhow::Result<()> {
if self.task.is_some() {
return Err(anyhow::anyhow!("BlockStreamer has already been started",));
Expand Down Expand Up @@ -76,7 +76,7 @@ impl BlockStream {
&indexer_config,
redis_client,
delta_lake_client,
lake_s3_config,
lake_s3_client,
&chain_id,
LAKE_PREFETCH_SIZE,
redis_stream
Expand Down Expand Up @@ -116,6 +116,7 @@ impl BlockStream {
}
}

#[allow(clippy::too_many_arguments)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this call out an indication that we need to rethink the function declaration? Either encapsulate some or all of these into some BlockStreamContext, or roll some of these into each other (redis stream does into IndexerConfig, Lake Framework stream created before being passed into start_block_stream, etc)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - this suppresses the error which I was getting sick of. BlockStreamContext is a good idea.

#[tracing::instrument(
skip_all,
fields(
Expand All @@ -130,7 +131,7 @@ pub(crate) async fn start_block_stream(
indexer: &IndexerConfig,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_config: aws_sdk_s3::Config,
lake_s3_client: crate::lake_s3_client::LakeS3Client,
chain_id: &ChainId,
lake_prefetch_size: usize,
redis_stream: String,
Expand All @@ -153,7 +154,7 @@ pub(crate) async fn start_block_stream(

let last_indexed_near_lake_block = process_near_lake_blocks(
last_indexed_delta_lake_block,
lake_s3_config,
lake_s3_client,
lake_prefetch_size,
redis_client,
indexer,
Expand Down Expand Up @@ -250,7 +251,7 @@ async fn process_delta_lake_blocks(

async fn process_near_lake_blocks(
start_block_height: near_indexer_primitives::types::BlockHeight,
lake_s3_config: aws_sdk_s3::Config,
lake_s3_client: crate::lake_s3_client::LakeS3Client,
lake_prefetch_size: usize,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
indexer: &IndexerConfig,
Expand All @@ -263,7 +264,7 @@ async fn process_near_lake_blocks(
ChainId::Mainnet => near_lake_framework::LakeConfigBuilder::default().mainnet(),
ChainId::Testnet => near_lake_framework::LakeConfigBuilder::default().testnet(),
}
.s3_config(lake_s3_config)
.s3_client(lake_s3_client)
.start_block_height(start_block_height)
.blocks_preload_pool_size(lake_prefetch_size)
.build()
Expand Down Expand Up @@ -313,10 +314,31 @@ async fn process_near_lake_blocks(
mod tests {
use super::*;

use std::sync::Arc;

use mockall::predicate;
use near_lake_framework::s3_client::GetObjectBytesError;

// FIX: near lake framework now infinitely retries - we need a way to stop it to allow the test
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will try to address this in a future PR, will probably need to add a config option to Near Lake which prevents infinite retries.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What change to Lake caused infinite retries?

// to finish
#[ignore]
#[tokio::test]
async fn adds_matching_blocks_from_index_and_lake() {
let mut mock_lake_s3_client = crate::lake_s3_client::LakeS3Client::default();

mock_lake_s3_client
.expect_get_object_bytes()
.returning(|_, prefix| {
let path = format!("{}/data/{}", env!("CARGO_MANIFEST_DIR"), prefix);

std::fs::read(path).map_err(|e| GetObjectBytesError(Arc::new(e)))
});

mock_lake_s3_client
.expect_list_common_prefixes()
.with(predicate::always(), predicate::eq(107503704.to_string()))
.returning(|_, _| Ok(vec![107503704.to_string(), 107503705.to_string()]));

let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
mock_delta_lake_client
.expect_get_latest_block_metadata()
Expand Down Expand Up @@ -372,14 +394,12 @@ mod tests {
},
};

let lake_s3_config = crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

start_block_stream(
91940840,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
lake_s3_config,
mock_lake_s3_client,
&ChainId::Mainnet,
1,
"stream key".to_string(),
Expand All @@ -388,8 +408,13 @@ mod tests {
.unwrap();
}

// FIX: near lake framework now infinitely retries - we need a way to stop it to allow the test
// to finish
#[ignore]
#[tokio::test]
async fn skips_caching_of_lake_block_over_stream_size_limit() {
let mock_lake_s3_client = crate::lake_s3_client::LakeS3Client::default();

let mut mock_delta_lake_client = crate::delta_lake_client::DeltaLakeClient::default();
mock_delta_lake_client
.expect_get_latest_block_metadata()
Expand Down Expand Up @@ -442,14 +467,12 @@ mod tests {
},
};

let lake_s3_config = crate::test_utils::create_mock_lake_s3_config(&[107503704, 107503705]);

start_block_stream(
107503704,
&indexer_config,
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
lake_s3_config,
mock_lake_s3_client,
&ChainId::Mainnet,
1,
"stream key".to_string(),
Expand Down
108 changes: 108 additions & 0 deletions block-streamer/src/lake_s3_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#![cfg_attr(test, allow(dead_code))]

use async_trait::async_trait;
use near_lake_framework::s3_client::{GetObjectBytesError, ListCommonPrefixesError};

use crate::metrics;

#[cfg(not(test))]
pub use LakeS3ClientImpl as LakeS3Client;
#[cfg(test)]
pub use MockLakeS3Client as LakeS3Client;

/// Thin wrapper around [`aws_sdk_s3::Client`] implementing the
/// `near_lake_framework` S3 client interface, allowing the S3 requests made
/// by the framework to be instrumented with metrics.
#[derive(Clone, Debug)]
pub struct LakeS3ClientImpl {
    s3_client: aws_sdk_s3::Client,
}

impl LakeS3ClientImpl {
    /// Wraps an already-constructed S3 client.
    pub fn new(s3_client: aws_sdk_s3::Client) -> Self {
        Self { s3_client }
    }

    /// Builds the underlying S3 client from `config` and wraps it.
    pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self {
        Self::new(aws_sdk_s3::Client::from_conf(config))
    }
}

#[async_trait]
impl near_lake_framework::s3_client::S3Client for LakeS3ClientImpl {
/// Fetches the object at `bucket`/`prefix` and returns its raw bytes.
///
/// Increments `LAKE_S3_GET_REQUEST_COUNT` on every call so that GET
/// requests issued on behalf of near-lake-framework can be counted.
async fn get_object_bytes(
    &self,
    bucket: &str,
    prefix: &str,
) -> Result<Vec<u8>, GetObjectBytesError> {
    metrics::LAKE_S3_GET_REQUEST_COUNT.inc();

    let response = self
        .s3_client
        .get_object()
        .bucket(bucket)
        .key(prefix)
        // Bill the request to the requester rather than the bucket owner.
        .request_payer(aws_sdk_s3::types::RequestPayer::Requester)
        .send()
        .await?;

    Ok(response.body.collect().await?.into_bytes().to_vec())
}

async fn list_common_prefixes(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this function used for? Just testing or something else?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of the S3Client trait exposed by near_lake_framework. Under the hood it's used to list block heights, and was originally added to mock those heights returned. We don't do anything with it now, but still need to supply it.

We may want to add caching here too, but it's a bit more complicated because the arguments will likely vary widely across all near_lake_framework instances.

&self,
bucket: &str,
start_after_prefix: &str,
) -> Result<Vec<String>, ListCommonPrefixesError> {
let response = self
.s3_client
.list_objects_v2()
.max_keys(1000)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a value we set somewhere previously or was 1000 the default by the API?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the default from the API, essentially just a copy paste from near_lake_framework code.

.delimiter("/".to_string())
.start_after(start_after_prefix)
.request_payer(aws_sdk_s3::types::RequestPayer::Requester)
.bucket(bucket)
.send()
.await?;

let prefixes = match response.common_prefixes {
None => vec![],
Some(common_prefixes) => common_prefixes
.into_iter()
.filter_map(|common_prefix| common_prefix.prefix)
.collect::<Vec<String>>()
.into_iter()
.filter_map(|prefix_string| prefix_string.split('/').next().map(String::from))
.collect(),
};

Ok(prefixes)
}
}

#[cfg(test)]
mockall::mock! {
    // NOTE: the usual #[automock] attribute cannot be used here because the
    // real type has both inherent methods and a trait implementation, so the
    // mock is declared manually with `mock!`; the end result is equivalent.
    pub LakeS3Client {
        pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self;
    }

    #[async_trait]
    impl near_lake_framework::s3_client::S3Client for LakeS3Client {
        async fn get_object_bytes(&self, bucket: &str, prefix: &str) -> Result<Vec<u8>, GetObjectBytesError>;

        async fn list_common_prefixes(&self, bucket: &str, start_after_prefix: &str) -> Result<Vec<String>, ListCommonPrefixesError>;
    }

    impl Clone for LakeS3Client {
        fn clone(&self) -> Self;
    }
}
5 changes: 4 additions & 1 deletion block-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use tracing_subscriber::prelude::*;
mod block_stream;
mod delta_lake_client;
mod indexer_config;
mod lake_s3_client;
mod metrics;
mod redis;
mod rules;
Expand Down Expand Up @@ -50,9 +51,11 @@ async fn main() -> anyhow::Result<()> {
let delta_lake_client =
std::sync::Arc::new(crate::delta_lake_client::DeltaLakeClient::new(s3_client));

let lake_s3_client = crate::lake_s3_client::LakeS3Client::from_conf(s3_config);

tokio::spawn(metrics::init_server(metrics_port).expect("Failed to start metrics server"));

server::init(&grpc_port, redis_client, delta_lake_client, s3_config).await?;
server::init(&grpc_port, redis_client, delta_lake_client, lake_s3_client).await?;

Ok(())
}
8 changes: 7 additions & 1 deletion block-streamer/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use actix_web::{get, App, HttpServer, Responder};
use lazy_static::lazy_static;
use prometheus::{
register_int_counter_vec, register_int_gauge_vec, Encoder, IntCounterVec, IntGaugeVec,
register_int_counter, register_int_counter_vec, register_int_gauge_vec, Encoder, IntCounter,
IntCounterVec, IntGaugeVec,
};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;

lazy_static! {
pub static ref LAKE_S3_GET_REQUEST_COUNT: IntCounter = register_int_counter!(
"queryapi_block_streamer_lake_s3_get_request_count",
"Number of requests made to S3 from near lake framework",
)
.unwrap();
pub static ref LAST_PROCESSED_BLOCK: IntGaugeVec = register_int_gauge_vec!(
"queryapi_block_streamer_last_processed_block",
"Height of last block seen",
Expand Down
5 changes: 3 additions & 2 deletions block-streamer/src/rules/matcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ fn match_account(
.split(',')
.any(|sub_account_id| match_account(sub_account_id.trim(), outcome_with_receipt)),
_ => {
wildmatch::WildMatch::new(account_id).matches(&outcome_with_receipt.receipt.receiver_id)
wildmatch::WildMatch::new(account_id)
.matches(outcome_with_receipt.receipt.receiver_id.as_str())
|| wildmatch::WildMatch::new(account_id)
.matches(&outcome_with_receipt.receipt.predecessor_id)
.matches(outcome_with_receipt.receipt.predecessor_id.as_str())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Latest near-lake-framework changed these to AccountId from String

}
}
}
Expand Down
15 changes: 9 additions & 6 deletions block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use blockstreamer::*;
pub struct BlockStreamerService {
redis_client: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_config: aws_sdk_s3::Config,
lake_s3_client: crate::lake_s3_client::LakeS3Client,
chain_id: ChainId,
block_streams: Mutex<HashMap<String, block_stream::BlockStream>>,
}
Expand All @@ -24,12 +24,12 @@ impl BlockStreamerService {
pub fn new(
redis_client: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_config: aws_sdk_s3::Config,
lake_s3_client: crate::lake_s3_client::LakeS3Client,
) -> Self {
Self {
redis_client,
delta_lake_client,
lake_s3_config,
lake_s3_client,
chain_id: ChainId::Mainnet,
block_streams: Mutex::new(HashMap::new()),
}
Expand Down Expand Up @@ -115,7 +115,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
request.start_block_height,
self.redis_client.clone(),
self.delta_lake_client.clone(),
self.lake_s3_config.clone(),
self.lake_s3_client.clone(),
)
.map_err(|_| Status::internal("Failed to start block stream"))?;

Expand Down Expand Up @@ -211,12 +211,15 @@ mod tests {
.expect_xadd::<String, u64>()
.returning(|_, _| Ok(()));

let lake_s3_config = crate::test_utils::create_mock_lake_s3_config(&[107503704]);
let mut mock_lake_s3_client = crate::lake_s3_client::LakeS3Client::default();
mock_lake_s3_client
.expect_clone()
.returning(crate::lake_s3_client::LakeS3Client::default);

BlockStreamerService::new(
std::sync::Arc::new(mock_redis_client),
std::sync::Arc::new(mock_delta_lake_client),
lake_s3_config,
mock_lake_s3_client,
)
}

Expand Down
4 changes: 2 additions & 2 deletions block-streamer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub async fn init(
port: &str,
redis_client: std::sync::Arc<crate::redis::RedisClient>,
delta_lake_client: std::sync::Arc<crate::delta_lake_client::DeltaLakeClient>,
lake_s3_config: aws_sdk_s3::Config,
lake_s3_client: crate::lake_s3_client::LakeS3Client,
) -> anyhow::Result<()> {
let addr = format!("0.0.0.0:{}", port).parse()?;

Expand All @@ -17,7 +17,7 @@ pub async fn init(
let block_streamer_service = block_streamer_service::BlockStreamerService::new(
redis_client,
delta_lake_client,
lake_s3_config,
lake_s3_client,
);

let block_streamer_server =
Expand Down
Loading