fix: Reduce requests made to Near Lake S3 #665
Conversation
Force-pushed from 49551e6 to 2ad6c4c
near-lake-framework
Force-pushed from 05628d3 to 9c436e3
```rust
Self::new(s3_client)
}

fn get_object_bytes_shared(&self, bucket: &str, prefix: &str) -> SharedGetObjectBytesFuture {
```
This is essentially an `async fn`, and can be `await`ed as such, but this `Future` can be cloned. To achieve this, the values it references must be cloned to ensure they live long enough.
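A minimal sketch of the pattern, assuming the `futures` crate's `boxed`/`shared` combinators (the stub `S3Client` and the error type here are illustrative, not the PR's real types):

```rust
use futures::future::{BoxFuture, FutureExt, Shared};

type GetObjectBytesResult = Result<Vec<u8>, String>;
type SharedGetObjectBytesFuture = Shared<BoxFuture<'static, GetObjectBytesResult>>;

// Stand-in for the real S3 client, just to make the sketch compile.
#[derive(Clone, Default)]
struct S3Client;

impl S3Client {
    async fn get_object_bytes(&self, _bucket: &str, _prefix: &str) -> GetObjectBytesResult {
        Ok(Vec::new())
    }
}

struct LakeS3Client {
    s3_client: S3Client,
}

impl LakeS3Client {
    fn get_object_bytes_shared(&self, bucket: &str, prefix: &str) -> SharedGetObjectBytesFuture {
        // Clone everything the future captures so it is 'static and no
        // longer borrows from `&self`; that is what makes it cloneable.
        let s3_client = self.s3_client.clone();
        let bucket = bucket.to_owned();
        let prefix = prefix.to_owned();

        async move { s3_client.get_object_bytes(&bucket, &prefix).await }
            .boxed()
            .shared()
    }
}
```

Every clone of the returned `Shared` future polls the same underlying computation, so `await`ing it from many tasks still performs the work once.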
```rust
async fn get_object_bytes_cached(&self, bucket: &str, prefix: &str) -> GetObjectBytesResult {
    let get_object_bytes_future = self
        .futures_cache
        .get_or_set_with(prefix.to_string(), || {
```
Get/set must be done in one operation at the cache level. Doing them in sequence leaves room for multiple cache writes and therefore multiple requests.
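A sketch of why the single operation matters, with a minimal hand-rolled cache standing in for whatever `get_or_set_with` the real cache crate provides:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Illustrative cache; the PR uses an existing crate with similar semantics.
struct FuturesCache<V: Clone> {
    inner: Mutex<HashMap<String, V>>,
}

impl<V: Clone> FuturesCache<V> {
    fn get_or_set_with(&self, key: String, init: impl FnOnce() -> V) -> V {
        // The miss check and the insert happen under a single lock
        // acquisition, so only one caller's `init` ever runs per key.
        let mut map = self.inner.lock().unwrap();
        map.entry(key).or_insert_with(init).clone()
    }
}
```

A separate `get` followed by an `insert` would let two tasks both observe a miss between the two calls, build two futures, and fire two S3 requests.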
```rust
let call_count_clone = s3_get_call_count.clone();

let mut mock_s3_client = crate::s3_client::S3Client::default();
mock_s3_client.expect_clone().returning(move || {
```
We can't truly clone this mock instance, and therefore can't simply assert that the handler was called `once()`. Each clone gets a completely new instance.
To work around this, I use an atomic counter to count the actual number of requests made.
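A sketch of that counter, assuming the manual mock exposes `expect_clone` and an `expect_get_object_bytes`-style handler (names, signatures, and return values here are illustrative):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

let s3_get_call_count = Arc::new(AtomicUsize::new(0));
let call_count_clone = s3_get_call_count.clone();

let mut mock_s3_client = crate::s3_client::S3Client::default();
mock_s3_client.expect_clone().returning(move || {
    // Each clone is a brand-new mock instance, so install a handler
    // on every clone that bumps the shared counter.
    let call_count = call_count_clone.clone();
    let mut clone = crate::s3_client::S3Client::default();
    clone.expect_get_object_bytes().returning(move |_, _| {
        call_count.fetch_add(1, Ordering::SeqCst);
        Ok(Vec::new())
    });
    clone
});

// ... issue the parallel requests, then:
assert_eq!(s3_get_call_count.load(Ordering::SeqCst), 1);
```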
block-streamer/src/lake_s3_client.rs
```rust
let shared_lake_s3_client = SharedLakeS3ClientImpl::new(LakeS3Client::new(mock_s3_client));

let barrier = Arc::new(Barrier::new(10));
```
Blocks execution until it has been `wait`ed the specified number of times (`10`), making the requests fire in parallel.
Yeah we covered this in the meeting. We can probably try bumping the parallel threads/requests to something like 50 to more consistently test this behavior.
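For reference, the shape of that test with `tokio::sync::Barrier` (a sketch; the bucket/key values and the client method name are placeholders):

```rust
use std::sync::Arc;
use tokio::sync::Barrier;

let barrier = Arc::new(Barrier::new(10));
let mut handles = Vec::new();

for _ in 0..10 {
    let barrier = barrier.clone();
    let client = shared_lake_s3_client.clone();
    handles.push(tokio::spawn(async move {
        // No task proceeds past this point until all 10 have arrived,
        // so the fetches below genuinely race each other.
        barrier.wait().await;
        client.get_object_bytes("some-bucket", "some/key").await
    }));
}

for handle in handles {
    handle.await.unwrap().unwrap();
}
```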
block-streamer/src/s3_client.rs
```diff
@@ -52,7 +51,7 @@ impl S3ClientImpl {
    .list_objects_v2()
    .delimiter("/")
    .bucket(bucket)
    .prefix(prefix);
```
This limits the keys to only those which begin with `prefix`, so for listing block heights, only one value is ever returned.
This may have been intentional for `DeltaLakeClient`; I still need to confirm this doesn't break anything there.
This was actually a breaking change - I reverted so existing functionality uses the original implementation, and this PR uses a new method.
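For context, the ListObjectsV2 semantics at play here (a sketch against the AWS Rust SDK; the block-height value is an example, and that the original code paged with `start_after` is my assumption, not something this diff confirms):

```rust
// `prefix` keeps only keys that *begin with* the given string; for a
// full block-height key there is at most one match.
let filtered = s3_client
    .list_objects_v2()
    .bucket(bucket)
    .delimiter("/")
    .prefix("000091940840")
    .send()
    .await?;

// `start_after` instead returns keys lexicographically *after* the given
// key, which is what paging through successive block heights needs.
let paged = s3_client
    .list_objects_v2()
    .bucket(bucket)
    .delimiter("/")
    .start_after("000091940840")
    .send()
    .await?;
```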
```diff
@@ -120,3 +119,42 @@ impl S3ClientImpl {
    Ok(results)
}
}

#[cfg(test)]
mockall::mock! {
```
Changed to a "manual" mock implementation so that `.clone()` can also be mocked. The default mock doesn't implement `Clone`.
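The shape of such a manual mock with `mockall::mock!` (the method signature here is illustrative):

```rust
#[cfg(test)]
mockall::mock! {
    pub S3Client {
        pub async fn get_object_bytes(&self, bucket: &str, prefix: &str) -> Result<Vec<u8>, String>;
    }

    // Mocking the trait impl is what the default `#[automock]` can't do
    // here: it lets tests set expectations on `.clone()` itself.
    impl Clone for S3Client {
        fn clone(&self) -> Self;
    }
}
```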
```rust
#[cfg(not(test))]
pub use LakeS3ClientImpl as LakeS3Client;

/// Number of files added to Near Lake S3 per hour
const CACHE_SIZE: usize = 18_000;
```
With 1 block produced per second: 60 seconds × 60 minutes × 5 files per block = 18,000 files. So roughly caching one hour's worth of blocks?
I'm trying to think how backfills would impact what constitutes the cache size. Backfill block requests might evict some of the cached entries that catch-ups rely on, but I think a lot would need to occur simultaneously for that to happen. I think this should work in the main scenarios where I'd expect us to gain the most benefit:
- When Block Streamer starts up after a pause.
- When many streams are up to date.
Backfills do use Delta Lake rather than Near Lake, so they won't actually use this cache. But after that, you're right, they will start to flood it.
We can definitely adjust this value, or the whole caching strategy, as we go :)
Force-pushed from 9c436e3 to b05e807
Super cool change!
Force-pushed from 4b1d2a1 to 0f4fea5
Each `BlockStream` uses its own dedicated `near-lake-framework` instance, and hence manages its own connection with S3. This leads to many duplicate S3 requests, particularly across the large majority of Indexers which follow the network tip, as these request the same block data at the same time. This PR introduces a shared S3 client to be used across all `near-lake-framework` instances. `SharedLakeS3Client` ensures that duplicate requests made within a short time-frame, including those made in parallel, result in only a single request to S3.

Cache Strategy

This implementation will mostly impact `BlockStream`s following the network tip, i.e. `From Latest`. These streams wait for new data in Near Lake S3, and all request it as soon as it is available, at the same time. Therefore, it would not be enough to cache the result alone: by the time we actually prime the cache, all other requests would have missed it and fired a request of their own. Locking while the request is in-flight is also not feasible, as this would force every request to execute in sequence.

Instead of caching the result of the request, we cache its computation. The first request initiates the request and stores its `Future`; all subsequent requests then retrieve that `Future` from the cache and `await` its result, ensuring at most one underlying request.
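Putting the two pieces together, the cached path looks roughly like this (a sketch mirroring the diff snippets above):

```rust
async fn get_object_bytes_cached(&self, bucket: &str, prefix: &str) -> GetObjectBytesResult {
    // The first caller stores the shared future; every other caller gets
    // a clone of the same in-flight computation and awaits its result.
    let get_object_bytes_future = self
        .futures_cache
        .get_or_set_with(prefix.to_string(), || {
            self.get_object_bytes_shared(bucket, prefix)
        });

    get_object_bytes_future.await
}
```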
Performance Impact

My main concern with this implementation is its impact on performance. Every request must block to check the cache, introducing contention and delays. The lock is only held while checking the cache, not while the request is being made, so my hope is that the impact is small. This may be something that needs to be iterated on over time.

From local testing the impact seemed negligible, but that was with 5 Indexers; it may be worse with many. I've added a metric to measure lock wait time, to determine whether this contention is becoming a problem.
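One way such a metric can be recorded (a sketch using the `prometheus` crate; the metric name and helper are hypothetical, not necessarily what this PR ships):

```rust
use prometheus::{register_histogram, Histogram};
use std::sync::{Mutex, MutexGuard};
use std::time::Instant;

lazy_static::lazy_static! {
    // Hypothetical metric name.
    static ref LAKE_CACHE_LOCK_WAIT_SECONDS: Histogram = register_histogram!(
        "lake_cache_lock_wait_seconds",
        "Time spent waiting to acquire the futures-cache lock"
    )
    .unwrap();
}

fn timed_lock<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    let start = Instant::now();
    let guard = mutex.lock().unwrap();
    // Record only the wait, not the time the lock is subsequently held.
    LAKE_CACHE_LOCK_WAIT_SECONDS.observe(start.elapsed().as_secs_f64());
    guard
}
```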