Skip to content

Commit

Permalink
added in basic delta lake processsing
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Oct 12, 2024
1 parent 1d5f05e commit b426027
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 31 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ serde_json = "1.0.124"
indicatif = "0.17.8"
bytes = "1.7.2"
csv = "1.3.0"
aws-config = "1.5.5"
aws-config = "1.5.8"
aws-credential-types = "1.2.0"
aws-sdk-sts = "1.39.0"
aws-types = "1.3.3"
deltalake = "0.19.0"
deltalake-aws = "0.1.4"
deltalake = "0.20.1"
deltalake-aws = "0.3.0"
tokio = { version = "1.39.3", features = ["full"] }

[lib]
name = "nebby"
Expand Down
53 changes: 30 additions & 23 deletions src/delta_lake/mod.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
use aws_config;
use aws_config::BehaviorVersion;
use aws_sdk_sts::config::ProvideCredentials;
use deltalake::{open_table_with_storage_options, DeltaTable, DeltaTableError};
use deltalake::{open_table_with_storage_options, DeltaTableError};
use std::collections::HashMap;
use std::time::Duration;

// Load AWS Creds into a hashmap for use with delta lake reader
pub fn get_aws_config() -> Result<HashMap<String, String>, Box<dyn std::error::Error>> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let config = runtime.block_on(async {
aws_config::defaults(BehaviorVersion::latest())
.retry_config(aws_config::retry::RetryConfig::standard().with_max_attempts(5))
.timeout_config(
aws_config::timeout::TimeoutConfig::builder()
.operation_timeout(Duration::from_secs(30))
.build(),
)
.load()
.await
});
pub async fn get_aws_config() -> Result<HashMap<String, String>, Box<dyn std::error::Error>> {
// Locate aws creds
let config = aws_config::defaults(BehaviorVersion::latest())
.retry_config(aws_config::retry::RetryConfig::standard().with_max_attempts(5))
.timeout_config(
aws_config::timeout::TimeoutConfig::builder()
.operation_timeout(Duration::from_secs(30))
.build(),
)
.load()
.await;

// Create new hashmap to store creds
let mut aws_info = HashMap::new();

// Add credentials to HashMap if available
if let Some(creds_provider) = config.credentials_provider() {
match runtime.block_on(creds_provider.provide_credentials()) {
match creds_provider.provide_credentials().await {
Ok(creds) => {
aws_info.insert(
"AWS_ACCESS_KEY_ID".to_string(),
Expand All @@ -42,23 +42,30 @@ pub fn get_aws_config() -> Result<HashMap<String, String>, Box<dyn std::error::E
} else {
return Err("No credentials provider found in the configuration".into());
}
// Add success message
println!("AWS configuration loaded successfully and added to HashMap.");

Ok(aws_info)
}

// Read basic info about delta lake stored in S3
pub fn load_remote_delta_lake_table_info(
pub async fn load_remote_delta_lake_table_info(
s3_uri: &str,
credential_hash_map: HashMap<String, String>,
) -> Result<DeltaTable, DeltaTableError> {
) -> Result<(), DeltaTableError> {
// load credentials
let storage_options: HashMap<String, String> = credential_hash_map;

// register aws backend
deltalake_aws::register_handlers(None);

let remote_delta_lake_table = open_table(s3_uri, Some(storage_options))?;
// open delta lake table
let remote_delta_lake_table = open_table_with_storage_options(s3_uri, storage_options).await?;

// Get and print the latest version
let version = remote_delta_lake_table.version();
println!("Current version: {:?}", version);

println!("version: {}", remote_delta_lake_table.version());
println!("metadata: {:?}", remote_delta_lake_table.metadata());
Ok(remote_delta_lake_table)
// Get and print the table URI
let uri = remote_delta_lake_table.table_uri();
println!("Table URI: {}", uri);
Ok(())
}
12 changes: 7 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use excel::{
analyze_excel_formatting, display_remote_basic_info,
display_remote_basic_info_specify_header_idx, excel_quick_view, fetch_remote_file,
};
use tokio;
use utils::create_progress_bar;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -69,7 +70,8 @@ enum Commands {
}

// Call commands and file logic
fn main() -> Result<(), Box<dyn std::error::Error>> {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = Cli::parse();
match &cli.command {
Commands::BasicXl { url } => process_excel(url, "basic info"),
Expand All @@ -79,7 +81,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
Commands::BasicJson { url } => process_json(url),
Commands::Nibble { url } => process_view_bytes(url),
Commands::BasicCsv { url } => process_csv(url),
Commands::DeltaLake { s3_uri } => process_delta_lake(s3_uri),
Commands::DeltaLake { s3_uri } => process_delta_lake(s3_uri).await,
}
}

Expand Down Expand Up @@ -157,11 +159,11 @@ fn process_csv(url: &str) -> Result<(), Box<dyn std::error::Error>> {
result
}

fn process_delta_lake(s3_uri: &str) -> Result<(), Box<dyn std::error::Error>> {
async fn process_delta_lake(s3_uri: &str) -> Result<(), Box<dyn std::error::Error>> {
let pb = create_progress_bar("Processing Delta Lake table...");

match get_aws_config() {
Ok(config) => match load_remote_delta_lake_table_info(s3_uri, config) {
match get_aws_config().await {
Ok(config) => match load_remote_delta_lake_table_info(s3_uri, config).await {
Ok(_table) => {
pb.finish_with_message("Successfully loaded the Delta table");
Ok(())
Expand Down

0 comments on commit b426027

Please sign in to comment.