refactor: Cache fixed amount of s3 futures
morgsmccauley committed Apr 16, 2024
1 parent 2ad6c4c commit 913cf06
Showing 5 changed files with 170 additions and 40 deletions.
43 changes: 43 additions & 0 deletions block-streamer/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions block-streamer/Cargo.toml
@@ -10,6 +10,7 @@ async-trait = "0.1.74"
aws-config = { version = "1.1.3", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.13.0"
borsh = "0.10.2"
cached = "0.49.3"
chrono = "0.4.25"
futures = "0.3.5"
lazy_static = "1.4.0"
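The new dependency here is the cached crate, whose SizedCache is a fixed-capacity map with least-recently-used eviction. A minimal sketch of that behavior, assuming cached 0.49 as pinned above (the capacity of 2 is illustrative; the commit uses 18_000):

use cached::{Cached, SizedCache};

fn main() {
    // Fixed capacity of 2: inserting a third entry evicts the least recently used.
    let mut cache: SizedCache<String, u32> = SizedCache::with_size(2);
    cache.cache_set("a".to_string(), 1);
    cache.cache_set("b".to_string(), 2);
    cache.cache_set("c".to_string(), 3); // capacity exceeded: "a" is evicted
    assert!(cache.cache_get("a").is_none());
    assert_eq!(cache.cache_get("c"), Some(&3));
}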
55 changes: 55 additions & 0 deletions block-streamer/examples/start_stream.rs
@@ -20,6 +20,61 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
})),
}))
.await?;
let response = client
.start_stream(Request::new(StartStreamRequest {
start_block_height: 106700000,
account_id: "morgs.near".to_string(),
function_name: "test1".to_string(),
version: 0,
redis_stream: "morgs.near/test1:block_stream".to_string(),
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "social.near".to_string(),
status: Status::Success.into(),
})),
}))
.await?;

let response = client
.start_stream(Request::new(StartStreamRequest {
start_block_height: 106700000,
account_id: "morgs.near".to_string(),
function_name: "test2".to_string(),
version: 0,
redis_stream: "morgs.near/test2:block_stream".to_string(),
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "social.near".to_string(),
status: Status::Success.into(),
})),
}))
.await?;

let response = client
.start_stream(Request::new(StartStreamRequest {
start_block_height: 106700000,
account_id: "morgs.near".to_string(),
function_name: "test3".to_string(),
version: 0,
redis_stream: "morgs.near/test3:block_stream".to_string(),
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "social.near".to_string(),
status: Status::Success.into(),
})),
}))
.await?;

let response = client
.start_stream(Request::new(StartStreamRequest {
start_block_height: 106700000,
account_id: "morgs.near".to_string(),
function_name: "test4".to_string(),
version: 0,
redis_stream: "morgs.near/test4:block_stream".to_string(),
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "social.near".to_string(),
status: Status::Success.into(),
})),
}))
.await?;

println!("{:#?}", response.into_inner());

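The four added calls differ only in the function name, so the example could equally be written as a loop; a compact sketch reusing the same request fields from the diff above:

    for i in 1..=4 {
        let response = client
            .start_stream(Request::new(StartStreamRequest {
                start_block_height: 106700000,
                account_id: "morgs.near".to_string(),
                function_name: format!("test{i}"),
                version: 0,
                redis_stream: format!("morgs.near/test{i}:block_stream"),
                rule: Some(Rule::ActionAnyRule(ActionAnyRule {
                    affected_account_id: "social.near".to_string(),
                    status: Status::Success.into(),
                })),
            }))
            .await?;

        println!("{:#?}", response.into_inner());
    }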
20 changes: 10 additions & 10 deletions block-streamer/src/block_stream.rs
@@ -142,18 +142,18 @@ pub(crate) async fn start_block_stream(
.with_label_values(&[&indexer.get_full_name()])
.reset();

let last_indexed_delta_lake_block = process_delta_lake_blocks(
start_block_height,
delta_lake_client,
redis_client.clone(),
indexer,
redis_stream.clone(),
)
.await
.context("Failed during Delta Lake processing")?;
// let last_indexed_delta_lake_block = process_delta_lake_blocks(
// start_block_height,
// delta_lake_client,
// redis_client.clone(),
// indexer,
// redis_stream.clone(),
// )
// .await
// .context("Failed during Delta Lake processing")?;

let last_indexed_near_lake_block = process_near_lake_blocks(
last_indexed_delta_lake_block,
start_block_height,
lake_s3_client,
lake_prefetch_size,
redis_client,
91 changes: 61 additions & 30 deletions block-streamer/src/lake_s3_client.rs
@@ -1,17 +1,20 @@
#![cfg_attr(test, allow(dead_code))]

use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use cached::{Cached, SizedCache};
use futures::future::Shared;
use futures::{Future, FutureExt};
use near_lake_framework::s3_client::{GetObjectBytesError, ListCommonPrefixesError};
use tokio::sync::RwLock;
use tokio::sync::Mutex;

use crate::metrics;

/// Number of files added to Near Lake S3 per hour
const CACHE_SIZE: usize = 18_000;

#[cfg(test)]
pub use MockSharedLakeS3ClientImpl as SharedLakeS3Client;
#[cfg(not(test))]
@@ -61,18 +64,50 @@ impl near_lake_framework::s3_client::S3Client for SharedLakeS3ClientImpl {
}
}

#[derive(Debug)]
struct FuturesCache {
cache: Mutex<SizedCache<String, SharedGetObjectBytesFuture>>,
}

impl FuturesCache {
pub fn with_size(size: usize) -> Self {
Self {
cache: Mutex::new(SizedCache::with_size(size)),
}
}

#[cfg(test)]
pub async fn get(&self, key: &str) -> Option<SharedGetObjectBytesFuture> {
let mut cache = self.cache.lock().await;
cache.cache_get(key).cloned()
}

pub async fn get_or_set_with(
&self,
key: String,
f: impl FnOnce() -> SharedGetObjectBytesFuture,
) -> SharedGetObjectBytesFuture {
let mut cache = self.cache.lock().await;
cache.cache_get_or_set_with(key, f).clone()
}

pub async fn remove(&self, key: &str) {
let mut cache = self.cache.lock().await;
cache.cache_remove(key);
}
}

#[derive(Debug)]
pub struct LakeS3Client {
s3_client: crate::s3_client::S3Client,
// TODO use a more efficient cache
futures_cache: RwLock<HashMap<String, SharedGetObjectBytesFuture>>,
futures_cache: FuturesCache,
}

impl LakeS3Client {
pub fn new(s3_client: crate::s3_client::S3Client) -> Self {
Self {
s3_client,
futures_cache: RwLock::new(HashMap::new()),
futures_cache: FuturesCache::with_size(CACHE_SIZE),
}
}

@@ -82,7 +117,7 @@ impl LakeS3Client {
Self::new(s3_client)
}

fn get_object_bytes(&self, bucket: &str, prefix: &str) -> GetObjectBytesFuture {
fn get_object_bytes_shared(&self, bucket: &str, prefix: &str) -> SharedGetObjectBytesFuture {
let s3_client = self.s3_client.clone();
let bucket = bucket.to_owned();
let prefix = prefix.to_owned();
@@ -97,32 +132,21 @@ impl LakeS3Client {
Ok(bytes)
}
.boxed()
.shared()
}

async fn get_object_bytes_cached(&self, bucket: &str, prefix: &str) -> GetObjectBytesResult {
let existing_future = {
let futures_cache = self.futures_cache.read().await;

futures_cache.get(prefix).cloned()
};

let get_object_bytes_future = if let Some(future) = existing_future {
future
} else {
let mut futures_cache = self.futures_cache.write().await;

futures_cache
.entry(prefix.to_string())
.or_insert_with(|| self.get_object_bytes(bucket, prefix).shared())
.clone()
};
let get_object_bytes_future = self
.futures_cache
.get_or_set_with(prefix.to_string(), || {
self.get_object_bytes_shared(bucket, prefix)
})
.await;

let get_object_bytes_result = get_object_bytes_future.await;

if get_object_bytes_result.is_err() {
let mut futures_cache = self.futures_cache.write().await;

futures_cache.remove(prefix);
self.futures_cache.remove(prefix).await;
}

get_object_bytes_result
@@ -260,8 +284,12 @@ mod tests {
.get_object_bytes("bucket", "prefix")
.await;

let futures_cache = shared_lake_s3_client.inner.futures_cache.read().await;
assert!(futures_cache.get("prefix").is_some());
assert!(shared_lake_s3_client
.inner
.futures_cache
.get("prefix")
.await
.is_some());
}

#[tokio::test]
@@ -286,8 +314,11 @@
.get_object_bytes("bucket", "prefix")
.await;

let futures_cache = shared_lake_s3_client.inner.futures_cache.read().await;

assert!(futures_cache.get("prefix").is_none());
assert!(shared_lake_s3_client
.inner
.futures_cache
.get("prefix")
.await
.is_none());
}
}
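Taken together, the pattern this file adopts is: box and share the fetch future, store it in a bounded LRU behind a single Mutex so concurrent callers for the same prefix await one in-flight request, and evict on failure so the cached error is not replayed forever. A self-contained sketch of that pattern under assumed dependencies (cached, futures, tokio); FetchCache, SharedFetch, and the simulated fetch are illustrative stand-ins for the commit's FuturesCache, SharedGetObjectBytesFuture, and S3 call:

use cached::{Cached, SizedCache};
use futures::future::{BoxFuture, FutureExt, Shared};
use tokio::sync::Mutex;

// Result and future types must be Clone for Shared; a String error stands in
// for the real GetObjectBytesError.
type FetchResult = Result<String, String>;
type SharedFetch = Shared<BoxFuture<'static, FetchResult>>;

struct FetchCache {
    cache: Mutex<SizedCache<String, SharedFetch>>,
}

impl FetchCache {
    fn with_size(size: usize) -> Self {
        Self {
            cache: Mutex::new(SizedCache::with_size(size)),
        }
    }

    // Concurrent callers requesting the same key await a single in-flight future.
    async fn get_or_fetch(&self, key: String) -> FetchResult {
        let future = {
            let mut cache = self.cache.lock().await;
            cache
                .cache_get_or_set_with(key.clone(), || {
                    let key = key.clone();
                    async move {
                        // Simulated slow fetch; the real code calls S3 here.
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                        Ok(format!("bytes for {key}"))
                    }
                    .boxed()
                    .shared()
                })
                .clone()
        };

        let result = future.await;

        // Evict failed fetches so later callers retry instead of re-awaiting the error.
        if result.is_err() {
            self.cache.lock().await.cache_remove(&key);
        }

        result
    }
}

#[tokio::main]
async fn main() {
    let cache = FetchCache::with_size(10);

    // Both requests resolve from the same underlying fetch.
    let (a, b) = tokio::join!(
        cache.get_or_fetch("key".to_string()),
        cache.get_or_fetch("key".to_string()),
    );
    assert_eq!(a.unwrap(), b.unwrap());
}

Note that the lock is held only while looking up or inserting the shared future; the fetch itself runs outside the critical section, which is what lets the bounded Mutex replace the previous RwLock<HashMap> without serializing the downloads.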
