diff --git a/block-streamer/Cargo.lock b/block-streamer/Cargo.lock index c3115a7c1..2c01070ef 100644 --- a/block-streamer/Cargo.lock +++ b/block-streamer/Cargo.lock @@ -967,6 +967,7 @@ dependencies = [ "aws-smithy-runtime", "aws-smithy-types", "borsh 0.10.3", + "cached", "chrono", "futures", "http 0.2.12", @@ -1142,6 +1143,39 @@ dependencies = [ "ppv-lite86", ] +[[package]] +name = "cached" +version = "0.49.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8e463fceca5674287f32d252fb1d94083758b8709c160efae66d263e5f4eba" +dependencies = [ + "ahash", + "cached_proc_macro", + "cached_proc_macro_types", + "hashbrown 0.14.3", + "instant", + "once_cell", + "thiserror", +] + +[[package]] +name = "cached_proc_macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad9f16c0d84de31a2ab7fdf5f7783c14631f7075cf464eb3bb43119f61c9cb2a" +dependencies = [ + "darling 0.14.4", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "cached_proc_macro_types" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade8366b8bd5ba243f0a58f036cc0ca8a2f069cff1a2351ef1cac6b083e16fc0" + [[package]] name = "cc" version = "1.0.94" @@ -2252,6 +2286,15 @@ dependencies = [ "serde", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + [[package]] name = "itertools" version = "0.10.5" diff --git a/block-streamer/Cargo.toml b/block-streamer/Cargo.toml index 5b627e69d..d7f61d772 100644 --- a/block-streamer/Cargo.toml +++ b/block-streamer/Cargo.toml @@ -10,6 +10,7 @@ async-trait = "0.1.74" aws-config = { version = "1.1.3", features = ["behavior-version-latest"] } aws-sdk-s3 = "1.13.0" borsh = "0.10.2" +cached = "0.49.3" chrono = "0.4.25" futures = "0.3.5" lazy_static = "1.4.0" diff --git a/block-streamer/examples/start_stream.rs b/block-streamer/examples/start_stream.rs index 922219ac3..0adf9d8b4 100644 --- a/block-streamer/examples/start_stream.rs +++ b/block-streamer/examples/start_stream.rs @@ -20,6 +20,61 @@ async fn main() -> Result<(), Box> { })), })) .await?; + let response = client + .start_stream(Request::new(StartStreamRequest { + start_block_height: 106700000, + account_id: "morgs.near".to_string(), + function_name: "test1".to_string(), + version: 0, + redis_stream: "morgs.near/test1:block_stream".to_string(), + rule: Some(Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "social.near".to_string(), + status: Status::Success.into(), + })), + })) + .await?; + + let response = client + .start_stream(Request::new(StartStreamRequest { + start_block_height: 106700000, + account_id: "morgs.near".to_string(), + function_name: "test2".to_string(), + version: 0, + redis_stream: "morgs.near/test2:block_stream".to_string(), + rule: Some(Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "social.near".to_string(), + status: Status::Success.into(), + })), + })) + .await?; + + let response = client + .start_stream(Request::new(StartStreamRequest { + start_block_height: 106700000, + account_id: "morgs.near".to_string(), + function_name: "test3".to_string(), + version: 0, + redis_stream: "morgs.near/test3:block_stream".to_string(), + rule: Some(Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "social.near".to_string(), + status: Status::Success.into(), + })), + })) + .await?; + + let response = client + .start_stream(Request::new(StartStreamRequest { + start_block_height: 106700000, + account_id: "morgs.near".to_string(), + function_name: "test4".to_string(), + version: 0, + redis_stream: "morgs.near/test4:block_stream".to_string(), + rule: Some(Rule::ActionAnyRule(ActionAnyRule { + affected_account_id: "social.near".to_string(), + status: Status::Success.into(), + })), + })) + .await?; println!("{:#?}", response.into_inner()); diff --git a/block-streamer/src/block_stream.rs b/block-streamer/src/block_stream.rs index ee403a235..42f0c4ea6 100644 --- a/block-streamer/src/block_stream.rs +++ b/block-streamer/src/block_stream.rs @@ -142,18 +142,18 @@ pub(crate) async fn start_block_stream( .with_label_values(&[&indexer.get_full_name()]) .reset(); - let last_indexed_delta_lake_block = process_delta_lake_blocks( - start_block_height, - delta_lake_client, - redis_client.clone(), - indexer, - redis_stream.clone(), - ) - .await - .context("Failed during Delta Lake processing")?; + // let last_indexed_delta_lake_block = process_delta_lake_blocks( + // start_block_height, + // delta_lake_client, + // redis_client.clone(), + // indexer, + // redis_stream.clone(), + // ) + // .await + // .context("Failed during Delta Lake processing")?; let last_indexed_near_lake_block = process_near_lake_blocks( - last_indexed_delta_lake_block, + start_block_height, lake_s3_client, lake_prefetch_size, redis_client, diff --git a/block-streamer/src/lake_s3_client.rs b/block-streamer/src/lake_s3_client.rs index df1251d3f..fc3152f2c 100644 --- a/block-streamer/src/lake_s3_client.rs +++ b/block-streamer/src/lake_s3_client.rs @@ -1,17 +1,20 @@ #![cfg_attr(test, allow(dead_code))] -use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; +use cached::{Cached, SizedCache}; use futures::future::Shared; use futures::{Future, FutureExt}; use near_lake_framework::s3_client::{GetObjectBytesError, ListCommonPrefixesError}; -use tokio::sync::RwLock; +use tokio::sync::Mutex; use crate::metrics; +/// Number of files added to Near Lake S3 per hour +const CACHE_SIZE: usize = 18_000; + #[cfg(test)] pub use MockSharedLakeS3ClientImpl as SharedLakeS3Client; #[cfg(not(test))] @@ -61,18 +64,50 @@ impl near_lake_framework::s3_client::S3Client for SharedLakeS3ClientImpl { } } +#[derive(Debug)] +struct FuturesCache { + cache: Mutex>, +} + +impl FuturesCache { + pub fn with_size(size: usize) -> Self { + Self { + cache: Mutex::new(SizedCache::with_size(size)), + } + } + + #[cfg(test)] + pub async fn get(&self, key: &str) -> Option { + let mut cache = self.cache.lock().await; + cache.cache_get(key).cloned() + } + + pub async fn get_or_set_with( + &self, + key: String, + f: impl FnOnce() -> SharedGetObjectBytesFuture, + ) -> SharedGetObjectBytesFuture { + let mut cache = self.cache.lock().await; + cache.cache_get_or_set_with(key, f).clone() + } + + pub async fn remove(&self, key: &str) { + let mut cache = self.cache.lock().await; + cache.cache_remove(key); + } +} + #[derive(Debug)] pub struct LakeS3Client { s3_client: crate::s3_client::S3Client, - // TODO use a more efficient cache - futures_cache: RwLock>, + futures_cache: FuturesCache, } impl LakeS3Client { pub fn new(s3_client: crate::s3_client::S3Client) -> Self { Self { s3_client, - futures_cache: RwLock::new(HashMap::new()), + futures_cache: FuturesCache::with_size(CACHE_SIZE), } } @@ -82,7 +117,7 @@ impl LakeS3Client { Self::new(s3_client) } - fn get_object_bytes(&self, bucket: &str, prefix: &str) -> GetObjectBytesFuture { + fn get_object_bytes_shared(&self, bucket: &str, prefix: &str) -> SharedGetObjectBytesFuture { let s3_client = self.s3_client.clone(); let bucket = bucket.to_owned(); let prefix = prefix.to_owned(); @@ -97,32 +132,21 @@ impl LakeS3Client { Ok(bytes) } .boxed() + .shared() } async fn get_object_bytes_cached(&self, bucket: &str, prefix: &str) -> GetObjectBytesResult { - let existing_future = { - let futures_cache = self.futures_cache.read().await; - - futures_cache.get(prefix).cloned() - }; - - let get_object_bytes_future = if let Some(future) = existing_future { - future - } else { - let mut futures_cache = self.futures_cache.write().await; - - futures_cache - .entry(prefix.to_string()) - .or_insert_with(|| self.get_object_bytes(bucket, prefix).shared()) - .clone() - }; + let get_object_bytes_future = self + .futures_cache + .get_or_set_with(prefix.to_string(), || { + self.get_object_bytes_shared(bucket, prefix) + }) + .await; let get_object_bytes_result = get_object_bytes_future.await; if get_object_bytes_result.is_err() { - let mut futures_cache = self.futures_cache.write().await; - - futures_cache.remove(prefix); + self.futures_cache.remove(prefix).await; } get_object_bytes_result @@ -260,8 +284,12 @@ mod tests { .get_object_bytes("bucket", "prefix") .await; - let futures_cache = shared_lake_s3_client.inner.futures_cache.read().await; - assert!(futures_cache.get("prefix").is_some()); + assert!(shared_lake_s3_client + .inner + .futures_cache + .get("prefix") + .await + .is_some()); } #[tokio::test] @@ -286,8 +314,11 @@ mod tests { .get_object_bytes("bucket", "prefix") .await; - let futures_cache = shared_lake_s3_client.inner.futures_cache.read().await; - - assert!(futures_cache.get("prefix").is_none()); + assert!(shared_lake_s3_client + .inner + .futures_cache + .get("prefix") + .await + .is_none()); } }