Skip to content

Commit

Permalink
feat: Create initial LakeS3Client
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Apr 15, 2024
1 parent ad8f20d commit bab4627
Showing 1 changed file with 73 additions and 0 deletions.
73 changes: 73 additions & 0 deletions block-streamer/src/lake_s3_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use async_trait::async_trait;
use near_lake_framework::s3_client::{GetObjectBytesError, ListCommonPrefixesError};

#[derive(Clone, Debug)]
pub struct LakeS3Client {
s3_client: aws_sdk_s3::Client,
}

impl LakeS3Client {
pub fn new(s3_client: aws_sdk_s3::Client) -> Self {
Self { s3_client }
}
}

impl LakeS3Client {
pub fn from_conf(config: aws_sdk_s3::config::Config) -> Self {
let s3_client = aws_sdk_s3::Client::from_conf(config);

Self { s3_client }
}
}

#[async_trait]
impl near_lake_framework::s3_client::S3Client for LakeS3Client {
async fn get_object_bytes(
&self,
bucket: &str,
prefix: &str,
) -> Result<Vec<u8>, GetObjectBytesError> {
let object = self
.s3_client
.get_object()
.bucket(bucket)
.key(prefix)
.request_payer(aws_sdk_s3::types::RequestPayer::Requester)
.send()
.await?;

let bytes = object.body.collect().await?.into_bytes().to_vec();

Ok(bytes)
}

async fn list_common_prefixes(
&self,
bucket: &str,
start_after_prefix: &str,
) -> Result<Vec<String>, ListCommonPrefixesError> {
let response = self
.s3_client
.list_objects_v2()
.max_keys(1000)
.delimiter("/".to_string())
.start_after(start_after_prefix)
.request_payer(aws_sdk_s3::types::RequestPayer::Requester)
.bucket(bucket)
.send()
.await?;

let prefixes = match response.common_prefixes {
None => vec![],
Some(common_prefixes) => common_prefixes
.into_iter()
.filter_map(|common_prefix| common_prefix.prefix)
.collect::<Vec<String>>()
.into_iter()
.filter_map(|prefix_string| prefix_string.split('/').next().map(String::from))
.collect(),
};

Ok(prefixes)
}
}

0 comments on commit bab4627

Please sign in to comment.