From b4260275dcf0277ce906d358a19790c8a5494c7c Mon Sep 17 00:00:00 2001 From: Chris Carlon Date: Sat, 12 Oct 2024 23:02:49 +0100 Subject: [PATCH] added in basic delta lake processsing --- Cargo.toml | 7 +++--- src/delta_lake/mod.rs | 53 ++++++++++++++++++++++++------------------- src/main.rs | 12 ++++++---- 3 files changed, 41 insertions(+), 31 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 67edf54..b9a9812 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,12 +25,13 @@ serde_json = "1.0.124" indicatif = "0.17.8" bytes = "1.7.2" csv = "1.3.0" -aws-config = "1.5.5" +aws-config = "1.5.8" aws-credential-types = "1.2.0" aws-sdk-sts = "1.39.0" aws-types = "1.3.3" -deltalake = "0.19.0" -deltalake-aws = "0.1.4" +deltalake = "0.20.1" +deltalake-aws = "0.3.0" +tokio = { version = "1.39.3", features = ["full"] } [lib] name = "nebby" diff --git a/src/delta_lake/mod.rs b/src/delta_lake/mod.rs index 9855a7a..a96b334 100644 --- a/src/delta_lake/mod.rs +++ b/src/delta_lake/mod.rs @@ -1,29 +1,29 @@ use aws_config; use aws_config::BehaviorVersion; use aws_sdk_sts::config::ProvideCredentials; -use deltalake::{open_table_with_storage_options, DeltaTable, DeltaTableError}; +use deltalake::{open_table_with_storage_options, DeltaTableError}; use std::collections::HashMap; use std::time::Duration; // Load AWS Creds into a hashmap for use with delta lake reader -pub fn get_aws_config() -> Result, Box> { - let runtime = tokio::runtime::Runtime::new().unwrap(); - let config = runtime.block_on(async { - aws_config::defaults(BehaviorVersion::latest()) - .retry_config(aws_config::retry::RetryConfig::standard().with_max_attempts(5)) - .timeout_config( - aws_config::timeout::TimeoutConfig::builder() - .operation_timeout(Duration::from_secs(30)) - .build(), - ) - .load() - .await - }); +pub async fn get_aws_config() -> Result, Box> { + // Locate aws creds + let config = aws_config::defaults(BehaviorVersion::latest()) + .retry_config(aws_config::retry::RetryConfig::standard().with_max_attempts(5)) + .timeout_config( + aws_config::timeout::TimeoutConfig::builder() + .operation_timeout(Duration::from_secs(30)) + .build(), + ) + .load() + .await; + // Create new hashmap to store creds let mut aws_info = HashMap::new(); + // Add credentials to HashMap if available if let Some(creds_provider) = config.credentials_provider() { - match runtime.block_on(creds_provider.provide_credentials()) { + match creds_provider.provide_credentials().await { Ok(creds) => { aws_info.insert( "AWS_ACCESS_KEY_ID".to_string(), @@ -42,23 +42,30 @@ pub fn get_aws_config() -> Result, Box, -) -> Result { +) -> Result<(), DeltaTableError> { + // load credentials let storage_options: HashMap = credential_hash_map; + // register aws backend deltalake_aws::register_handlers(None); - let remote_delta_lake_table = open_table(s3_uri, Some(storage_options))?; + // open delta lake table + let remote_delta_lake_table = open_table_with_storage_options(s3_uri, storage_options).await?; + + // Get and print the latest version + let version = remote_delta_lake_table.version(); + println!("Current version: {:?}", version); - println!("version: {}", remote_delta_lake_table.version()); - println!("metadata: {:?}", remote_delta_lake_table.metadata()); - Ok(remote_delta_lake_table) + // Get and print the table URI + let uri = remote_delta_lake_table.table_uri(); + println!("Table URI: {}", uri); + Ok(()) } diff --git a/src/main.rs b/src/main.rs index e0fa0a4..9bdbc13 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use excel::{ analyze_excel_formatting, display_remote_basic_info, display_remote_basic_info_specify_header_idx, excel_quick_view, fetch_remote_file, }; +use tokio; use utils::create_progress_bar; #[derive(Parser, Debug)] @@ -69,7 +70,8 @@ enum Commands { } // Call commands and file logic -fn main() -> Result<(), Box> { +#[tokio::main] +async fn main() -> Result<(), Box> { let cli = Cli::parse(); match &cli.command { Commands::BasicXl { url } => process_excel(url, "basic info"), @@ -79,7 +81,7 @@ fn main() -> Result<(), Box> { Commands::BasicJson { url } => process_json(url), Commands::Nibble { url } => process_view_bytes(url), Commands::BasicCsv { url } => process_csv(url), - Commands::DeltaLake { s3_uri } => process_delta_lake(s3_uri), + Commands::DeltaLake { s3_uri } => process_delta_lake(s3_uri).await, } } @@ -157,11 +159,11 @@ fn process_csv(url: &str) -> Result<(), Box> { result } -fn process_delta_lake(s3_uri: &str) -> Result<(), Box> { +async fn process_delta_lake(s3_uri: &str) -> Result<(), Box> { let pb = create_progress_bar("Processing Delta Lake table..."); - match get_aws_config() { - Ok(config) => match load_remote_delta_lake_table_info(s3_uri, config) { + match get_aws_config().await { + Ok(config) => match load_remote_delta_lake_table_info(s3_uri, config).await { Ok(_table) => { pb.finish_with_message("Successfully loaded the Delta table"); Ok(())