From 8553677c9065867861f54eabb6cd8be62ae49d3b Mon Sep 17 00:00:00 2001 From: Aykut Bozkurt Date: Wed, 23 Oct 2024 20:46:29 +0300 Subject: [PATCH] Adds support for COPY TO/FROM Azure Blob Storage Supports following Azure Blob uri forms: - `az://{container}/key` - `azure://{container}/key` - `https://{account}.blob.core.windows.net/{container}` **Configuration** The simplest way to configure object storage is by creating the standard [`~/.azure/config`](https://learn.microsoft.com/en-us/cli/azure/azure-cli-configuration?view=azure-cli-latest) file: ```bash $ cat ~/.azure/config [storage] account = devstoreaccount1 key = Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== ``` Alternatively, you can use the following environment variables when starting postgres to configure the Azure Blob Storage client: - `AZURE_STORAGE_ACCOUNT`: the storage account name of the Azure Blob - `AZURE_STORAGE_KEY`: the storage key of the Azure Blob - `AZURE_STORAGE_SAS_TOKEN`: the storage SAS token for the Azure Blob - `AZURE_CONFIG_FILE`: an alternative location for the config file **Bonus** Additionally, PR supports following S3 uri forms: - `s3://{bucket}/key` - `s3a://{bucket}/key` - `https://s3.amazonaws.com/{bucket}/key` - `https://{bucket}.s3.amazonaws.com/key` Closes #50 --- .devcontainer/.env | 8 + .devcontainer/Dockerfile | 5 + .devcontainer/create-test-buckets.sh | 2 + .devcontainer/docker-compose.yml | 17 +- .github/workflows/ci.yml | 16 ++ Cargo.lock | 50 ++++- Cargo.toml | 4 +- README.md | 38 +++- src/arrow_parquet/uri_utils.rs | 214 +++++++++++++++++---- src/pgrx_tests/object_store.rs | 271 +++++++++++++++++++++++++-- 10 files changed, 566 insertions(+), 59 deletions(-) diff --git a/.devcontainer/.env b/.devcontainer/.env index 14f05d0..d94153e 100644 --- a/.devcontainer/.env +++ b/.devcontainer/.env @@ -6,6 +6,14 @@ AWS_S3_TEST_BUCKET=testbucket MINIO_ROOT_USER=minioadmin MINIO_ROOT_PASSWORD=minioadmin +# Azure Blob tests +AZURE_STORAGE_ACCOUNT=devstoreaccount1 +AZURE_STORAGE_KEY="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" +AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;" +AZURE_TEST_CONTAINER_NAME=testcontainer +AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D" +AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D" + # Others RUST_TEST_THREADS=1 PG_PARQUET_TEST=true diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 522d00a..dfad9b6 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -12,6 +12,11 @@ RUN apt-get update && apt-get -y install build-essential libreadline-dev zlib1g- curl lsb-release ca-certificates gnupg sudo git \ nano net-tools awscli +# install azure-cli +RUN curl -sL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor | tee /etc/apt/keyrings/microsoft.gpg > /dev/null +RUN echo "deb [arch=`dpkg --print-architecture` signed-by=/etc/apt/keyrings/microsoft.gpg] https://packages.microsoft.com/repos/azure-cli/ `lsb_release -cs` main" | tee /etc/apt/sources.list.d/azure-cli.list +RUN apt-get update && apt-get install -y azure-cli + # install Postgres RUN sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' RUN wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - diff --git a/.devcontainer/create-test-buckets.sh b/.devcontainer/create-test-buckets.sh index 65dfef0..9ad1360 100644 --- a/.devcontainer/create-test-buckets.sh +++ b/.devcontainer/create-test-buckets.sh @@ -1,3 +1,5 @@ #!/bin/bash aws --endpoint-url http://localhost:9000 s3 mb s3://$AWS_S3_TEST_BUCKET + +az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index 259cfc8..5147410 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -10,13 +10,16 @@ services: - ${USERPROFILE}${HOME}/.ssh:/home/rust/.ssh:ro - ${USERPROFILE}${HOME}/.ssh/known_hosts:/home/rust/.ssh/known_hosts:rw - ${USERPROFILE}${HOME}/.gitconfig:/home/rust/.gitconfig:ro - - ${USERPROFILE}${HOME}/.aws:/home/rust/.aws:ro + - ${USERPROFILE}${HOME}/.aws:/home/rust/.aws:rw + - ${USERPROFILE}${HOME}/.azure:/home/rust/.azure:rw + env_file: - .env cap_add: - SYS_PTRACE depends_on: - minio + - azurite minio: image: minio/minio @@ -30,3 +33,15 @@ services: interval: 6s timeout: 2s retries: 3 + + azurite: + image: mcr.microsoft.com/azure-storage/azurite + env_file: + - .env + network_mode: host + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "http://localhost:10000"] + interval: 6s + timeout: 2s + retries: 3 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7d4ce9d..723c037 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -85,6 +85,11 @@ jobs: postgresql-client-${{ env.PG_MAJOR }} \ libpq-dev + - name: Install azure-cli + run: | + curl -sL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor | sudo tee /etc/apt/keyrings/microsoft.gpg > /dev/null + echo "deb [arch=`dpkg --print-architecture` signed-by=/etc/apt/keyrings/microsoft.gpg] https://packages.microsoft.com/repos/azure-cli/ `lsb_release -cs` main" | sudo tee /etc/apt/sources.list.d/azure-cli.list + sudo apt-get update && sudo apt-get install -y azure-cli - name: Install and configure pgrx run: | @@ -116,6 +121,17 @@ jobs: aws --endpoint-url http://localhost:9000 s3 mb s3://$AWS_S3_TEST_BUCKET + - name: Start Azurite for Azure Blob Storage emulator tests + run: | + docker run -d --env-file .devcontainer/.env -p 10000:10000 mcr.microsoft.com/azure-storage/azurite + + while ! nc -z localhost 10000; do + echo "Waiting for localhost:10000..." + sleep 1 + done + + az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING + - name: Run tests run: | # Run tests with coverage tool diff --git a/Cargo.lock b/Cargo.lock index a6702d3..b547a37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,7 +137,7 @@ dependencies = [ "arrow-schema", "chrono", "half 2.4.1", - "hashbrown", + "hashbrown 0.15.2", "num", ] @@ -1009,6 +1009,15 @@ dependencies = [ "syn", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "either" version = "1.13.0" @@ -1308,6 +1317,12 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -1706,7 +1721,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -2108,6 +2123,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown 0.14.5", +] + [[package]] name = "outref" version = "0.5.1" @@ -2168,7 +2193,7 @@ dependencies = [ "flate2", "futures", "half 2.4.1", - "hashbrown", + "hashbrown 0.15.2", "lz4_flex", "num", "num-bigint", @@ -2236,11 +2261,13 @@ dependencies = [ "aws-config", "aws-credential-types", "futures", + "home", "object_store", "once_cell", "parquet", "pgrx", "pgrx-tests", + "rust-ini", "tokio", "url", ] @@ -2741,6 +2768,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rust-ini" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e310ef0e1b6eeb79169a1171daf9abcb87a2e17c03bee2c4bb100b55c75409f" +dependencies = [ + "cfg-if", + "ordered-multimap", + "trim-in-place", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -3551,6 +3589,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "trim-in-place" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" + [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index e59a625..d71a000 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,8 @@ arrow-schema = {version = "53", default-features = false} aws-config = { version = "1.5", default-features = false, features = ["rustls"]} aws-credential-types = {version = "1.2", default-features = false} futures = "0.3" -object_store = {version = "0.11", default-features = false, features = ["aws"]} +home = "0.5" +object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]} once_cell = "1" parquet = {version = "53", default-features = false, features = [ "arrow", @@ -38,6 +39,7 @@ parquet = {version = "53", default-features = false, features = [ "object_store", ]} pgrx = "=0.12.9" +rust-ini = "0.21" tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]} url = "2" diff --git a/README.md b/README.md index 353b01f..2060589 100644 --- a/README.md +++ b/README.md @@ -156,7 +156,13 @@ SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM ``` ## Object Store Support -`pg_parquet` supports reading and writing Parquet files from/to `S3` object store. Only the uris with `s3://` scheme is supported. +`pg_parquet` supports reading and writing Parquet files from/to `S3` and `Azure Blob Storage` object stores. + +> [!NOTE] +> To be able to write into a object store location, you need to grant `parquet_object_store_write` role to your current postgres user. +> Similarly, to read from an object store location, you need to grant `parquet_object_store_read` role to your current postgres user. + +#### S3 Storage The simplest way to configure object storage is by creating the standard `~/.aws/credentials` and `~/.aws/config` files: @@ -179,9 +185,33 @@ Alternatively, you can use the following environment variables when starting pos - `AWS_CONFIG_FILE`: an alternative location for the config file - `AWS_PROFILE`: the name of the profile from the credentials and config file (default profile name is `default`) -> [!NOTE] -> To be able to write into a object store location, you need to grant `parquet_object_store_write` role to your current postgres user. -> Similarly, to read from an object store location, you need to grant `parquet_object_store_read` role to your current postgres user. +Supported S3 uri formats are shown below: +- s3:// \ / \ +- s3a:// \ / \ +- https:// \.s3.amazonaws.com / \ +- https:// s3.amazonaws.com / \ / \ + +#### Azure Blob Storage + +The simplest way to configure object storage is by creating the standard [`~/.azure/config`](https://learn.microsoft.com/en-us/cli/azure/azure-cli-configuration?view=azure-cli-latest) file: + +```bash +$ cat ~/.azure/config +[storage] +account = devstoreaccount1 +key = Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== +``` + +Alternatively, you can use the following environment variables when starting postgres to configure the Azure Blob Storage client: +- `AZURE_STORAGE_ACCOUNT`: the storage account name of the Azure Blob +- `AZURE_STORAGE_KEY`: the storage key of the Azure Blob +- `AZURE_STORAGE_SAS_TOKEN`: the storage SAS token for the Azure Blob +- `AZURE_CONFIG_FILE`: an alternative location for the config file + +Supported Azure Blob Storage uri formats are shown below: +- az:// \ / \ +- azure:// \ / \ +- https:// \.blob.core.windows.net / \ ## Copy Options `pg_parquet` supports the following options in the `COPY TO` command: diff --git a/src/arrow_parquet/uri_utils.rs b/src/arrow_parquet/uri_utils.rs index 3ff97af..438bc35 100644 --- a/src/arrow_parquet/uri_utils.rs +++ b/src/arrow_parquet/uri_utils.rs @@ -1,4 +1,7 @@ -use std::{sync::Arc, sync::LazyLock}; +use std::{ + panic, + sync::{Arc, LazyLock}, +}; use arrow::datatypes::SchemaRef; use aws_config::{ @@ -7,11 +10,14 @@ use aws_config::{ profile::{ProfileFileCredentialsProvider, ProfileFileRegionProvider}, }; use aws_credential_types::provider::ProvideCredentials; +use home::home_dir; +use ini::Ini; use object_store::{ aws::{AmazonS3, AmazonS3Builder}, + azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder}, local::LocalFileSystem, path::Path, - ObjectStore, + ObjectStore, ObjectStoreScheme, }; use parquet::{ arrow::{ @@ -44,57 +50,106 @@ pub(crate) static PG_BACKEND_TOKIO_RUNTIME: LazyLock = LazyLock::new(|| .unwrap_or_else(|e| panic!("failed to create tokio runtime: {}", e)) }); -fn parse_bucket_and_key(uri: &Url) -> (String, String) { - debug_assert!(uri.scheme() == "s3"); +fn parse_azure_blob_container(uri: &Url) -> Option { + let host = uri.host_str()?; - let bucket = uri - .host_str() - .unwrap_or_else(|| panic!("bucket not found in uri: {}", uri)); + // az(ure)://{container}/key + if uri.scheme() == "az" || uri.scheme() == "azure" { + return Some(host.to_string()); + } + // https://{account}.blob.core.windows.net/{container} + else if host.ends_with("blob.core.windows.net") { + let path_segments: Vec<&str> = uri.path_segments()?.collect(); - let key = uri.path(); + if !path_segments.is_empty() { + return Some(path_segments[0].to_string()); + } else { + return None; + } + } - (bucket.to_string(), key.to_string()) + None +} + +fn parse_s3_bucket(uri: &Url) -> Option { + let host = uri.host_str()?; + + // s3(a)://{bucket}/key + if uri.scheme() == "s3" || uri.scheme() == "s3a" { + return Some(host.to_string()); + } + // https://s3.amazonaws.com/{bucket}/key + else if host == "s3.amazonaws.com" { + let path_segments: Vec<&str> = uri.path_segments()?.collect(); + if !path_segments.is_empty() { + return Some(path_segments[0].to_string()); // Bucket name is the first part of the path + } else { + return None; + } + } + // https://{bucket}.s3.amazonaws.com/key + else if host.ends_with("s3.amazonaws.com") { + let bucket_name = host.split('.').next()?; + return Some(bucket_name.to_string()); + } + + None } fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc, Path) { - if uri.scheme() == "s3" { - let (bucket_name, key) = parse_bucket_and_key(uri); + let (scheme, path) = + ObjectStoreScheme::parse(uri).unwrap_or_else(|_| panic!("unsupported uri {}", uri)); - let storage_container = PG_BACKEND_TOKIO_RUNTIME - .block_on(async { Arc::new(get_s3_object_store(&bucket_name).await) }); + match scheme { + ObjectStoreScheme::AmazonS3 => { + let bucket_name = parse_s3_bucket(uri).unwrap_or_else(|| { + panic!("failed to parse bucket name from uri: {}", uri); + }); - let location = Path::from(key); + let storage_container = PG_BACKEND_TOKIO_RUNTIME + .block_on(async { Arc::new(get_s3_object_store(&bucket_name).await) }); - (storage_container, location) - } else { - debug_assert!(uri.scheme() == "file"); - - let uri = uri_as_string(uri); - - if !copy_from { - // create or overwrite the local file - std::fs::OpenOptions::new() - .write(true) - .truncate(true) - .create(true) - .open(&uri) - .unwrap_or_else(|e| panic!("{}", e)); + (storage_container, path) + } + ObjectStoreScheme::MicrosoftAzure => { + let container_name = parse_azure_blob_container(uri).unwrap_or_else(|| { + panic!("failed to parse container name from uri: {}", uri); + }); + + let storage_container = PG_BACKEND_TOKIO_RUNTIME + .block_on(async { Arc::new(get_azure_object_store(&container_name).await) }); + + (storage_container, path) } + ObjectStoreScheme::Local => { + let uri = uri_as_string(uri); + + if !copy_from { + // create or overwrite the local file + std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(&uri) + .unwrap_or_else(|e| panic!("{}", e)); + } - let storage_container = Arc::new(LocalFileSystem::new()); + let storage_container = Arc::new(LocalFileSystem::new()); - let location = Path::from_filesystem_path(&uri).unwrap_or_else(|e| panic!("{}", e)); + let path = Path::from_filesystem_path(&uri).unwrap_or_else(|e| panic!("{}", e)); - (storage_container, location) + (storage_container, path) + } + _ => { + panic!("unsupported uri {}", uri); + } } } async fn get_s3_object_store(bucket_name: &str) -> AmazonS3 { let mut aws_s3_builder = AmazonS3Builder::new().with_bucket_name(bucket_name); - let is_test_running = std::env::var("PG_PARQUET_TEST").is_ok(); - - if is_test_running { + if is_testing() { // use minio for testing aws_s3_builder = aws_s3_builder.with_endpoint("http://localhost:9000"); aws_s3_builder = aws_s3_builder.with_allow_http(true); @@ -139,6 +194,78 @@ async fn get_s3_object_store(bucket_name: &str) -> AmazonS3 { aws_s3_builder.build().unwrap_or_else(|e| panic!("{}", e)) } +async fn get_azure_object_store(container_name: &str) -> MicrosoftAzure { + let mut azure_builder = MicrosoftAzureBuilder::new().with_container_name(container_name); + + if is_testing() { + // use azurite for testing + azure_builder = + azure_builder.with_endpoint("http://localhost:10000/devstoreaccount1".into()); + azure_builder = azure_builder.with_allow_http(true); + } + + // ~/.azure/config + let azure_config_file_path = std::env::var("AZURE_CONFIG_FILE").unwrap_or( + home_dir() + .expect("failed to get home directory") + .join(".azure") + .join("config") + .to_str() + .expect("failed to convert path to string") + .to_string(), + ); + + let azure_config_content = Ini::load_from_file(&azure_config_file_path).ok(); + + // storage account + let azure_blob_account = match std::env::var("AZURE_STORAGE_ACCOUNT") { + Ok(account) => Some(account), + Err(_) => azure_config_content + .as_ref() + .and_then(|ini| ini.section(Some("storage"))) + .and_then(|section| section.get("account")) + .map(|account| account.to_string()), + }; + + if let Some(azure_blob_account) = azure_blob_account { + azure_builder = azure_builder.with_account(azure_blob_account); + } + + // storage key + let azure_blob_key = match std::env::var("AZURE_STORAGE_KEY") { + Ok(key) => Some(key), + Err(_) => azure_config_content + .as_ref() + .and_then(|ini| ini.section(Some("storage"))) + .and_then(|section| section.get("key")) + .map(|key| key.to_string()), + }; + + if let Some(azure_blob_key) = azure_blob_key { + azure_builder = azure_builder.with_access_key(azure_blob_key); + } + + // sas token + let azure_blob_sas_token = match std::env::var("AZURE_STORAGE_SAS_TOKEN") { + Ok(token) => Some(token), + Err(_) => azure_config_content + .as_ref() + .and_then(|ini| ini.section(Some("storage"))) + .and_then(|section| section.get("sas_token")) + .map(|token| token.to_string()), + }; + + if let Some(azure_blob_sas_token) = azure_blob_sas_token { + azure_builder = azure_builder.with_config(AzureConfigKey::SasKey, azure_blob_sas_token); + } + + azure_builder.build().unwrap_or_else(|e| panic!("{}", e)) +} + +fn is_testing() -> bool { + std::env::var("PG_PARQUET_TEST").is_ok() +} + pub(crate) fn parse_uri(uri: &str) -> Url { if !uri.contains("://") { // local file @@ -148,12 +275,25 @@ pub(crate) fn parse_uri(uri: &str) -> Url { let uri = Url::parse(uri).unwrap_or_else(|e| panic!("{}", e)); - if uri.scheme() != "s3" { + let (scheme, _) = + ObjectStoreScheme::parse(&uri).unwrap_or_else(|_| panic!("unsupported uri {}", uri)); + + if scheme == ObjectStoreScheme::AmazonS3 { + parse_s3_bucket(&uri) + .unwrap_or_else(|| panic!("failed to parse bucket name from s3 uri {}", uri)); + } else if scheme == ObjectStoreScheme::MicrosoftAzure { + parse_azure_blob_container(&uri).unwrap_or_else(|| { + panic!( + "failed to parse container name from azure blob storage uri {}", + uri + ) + }); + } else { panic!( - "unsupported uri {}. Only local files and URIs with s3:// prefix are supported.", + "unsupported uri {}. Only Azure and S3 uris are supported.", uri ); - } + }; uri } diff --git a/src/pgrx_tests/object_store.rs b/src/pgrx_tests/object_store.rs index 4272027..89edffe 100644 --- a/src/pgrx_tests/object_store.rs +++ b/src/pgrx_tests/object_store.rs @@ -7,20 +7,33 @@ mod tests { use crate::pgrx_tests::common::TestTable; #[pg_test] - fn test_s3_object_store_from_env() { + fn test_s3_from_env() { let test_bucket_name: String = std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); - let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); - - let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); - - test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); - test_table.assert_expected_and_result_rows(); + let s3_uris = [ + format!("s3://{}/pg_parquet_test.parquet", test_bucket_name), + format!("s3a://{}/pg_parquet_test.parquet", test_bucket_name), + format!( + "https://s3.amazonaws.com/{}/pg_parquet_test.parquet", + test_bucket_name + ), + format!( + "https://{}.s3.amazonaws.com/pg_parquet_test.parquet", + test_bucket_name + ), + ]; + + for s3_uri in s3_uris { + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } } #[pg_test] - fn test_s3_object_store_from_config_file() { + fn test_s3_from_config_file() { let test_bucket_name: String = std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); @@ -32,14 +45,16 @@ mod tests { let region = std::env::var("AWS_REGION").unwrap(); std::env::remove_var("AWS_REGION"); + let profile = "pg_parquet_test"; + // create a config file let aws_config_file_content = format!( - "[profile pg_parquet_test]\nregion = {}\naws_access_key_id = {}\naws_secret_access_key = {}\n", + "[profile {profile}]\nregion = {}\naws_access_key_id = {}\naws_secret_access_key = {}\n", region, access_key_id, secret_access_key ); - std::env::set_var("AWS_PROFILE", "pg_parquet_test"); + std::env::set_var("AWS_PROFILE", profile); - let aws_config_file = "/tmp/aws_config"; + let aws_config_file = "/tmp/pg_parquet_aws_config"; std::env::set_var("AWS_CONFIG_FILE", aws_config_file); let mut aws_config_file = std::fs::OpenOptions::new() @@ -61,6 +76,38 @@ mod tests { test_table.assert_expected_and_result_rows(); } + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_s3_with_wrong_access_key_id() { + std::env::set_var("AWS_ACCESS_KEY_ID", "wrong_access_key_id"); + + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + + let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); + + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_s3_with_wrong_secret_access_key() { + std::env::set_var("AWS_SECRET_ACCESS_KEY", "wrong_secret_access_key"); + + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + + let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); + + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + #[pg_test] #[should_panic(expected = "permission denied to COPY from a remote uri")] fn test_s3_no_read_access() { @@ -143,7 +190,7 @@ mod tests { #[pg_test] #[should_panic(expected = "404 Not Found")] - fn test_s3_object_store_write_invalid_uri() { + fn test_s3_write_wrong_bucket() { let s3_uri = "s3://randombucketwhichdoesnotexist/pg_parquet_test.parquet"; let copy_to_command = format!( @@ -155,7 +202,7 @@ mod tests { #[pg_test] #[should_panic(expected = "404 Not Found")] - fn test_s3_object_store_read_invalid_uri() { + fn test_s3_read_wrong_bucket() { let s3_uri = "s3://randombucketwhichdoesnotexist/pg_parquet_test.parquet"; let create_table_command = "CREATE TABLE test_table (a int);"; @@ -165,6 +212,204 @@ mod tests { Spi::run(copy_from_command.as_str()).unwrap(); } + #[pg_test] + #[should_panic(expected = "failed to parse bucket name")] + fn test_s3_unsupported_uri() { + let cloudflare_s3_uri = "https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket".into(); + + let test_table = TestTable::::new("int4".into()).with_uri(cloudflare_s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + + #[pg_test] + fn test_azure_blob_from_env() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let azure_blob_uris = [ + format!("az://{}/pg_parquet_test.parquet", test_container_name), + format!("azure://{}/pg_parquet_test.parquet", test_container_name), + format!( + "https://{}.blob.core.windows.net/{}", + test_account_name, test_container_name + ), + ]; + + for azure_blob_uri in azure_blob_uris { + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + } + + #[pg_test] + fn test_azure_from_config_file() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + // remove these to make sure the config file is used + let account_name = std::env::var("AZURE_STORAGE_ACCOUNT").unwrap(); + std::env::remove_var("AZURE_STORAGE_ACCOUNT"); + let account_key = std::env::var("AZURE_STORAGE_KEY").unwrap(); + std::env::remove_var("AZURE_STORAGE_KEY"); + + // create a config file + let azure_config_file_content = format!( + "[storage]\naccount = {}\nkey = {}\n", + account_name, account_key + ); + + let azure_config_file = "/tmp/pg_parquet_azure_config"; + std::env::set_var("AZURE_CONFIG_FILE", azure_config_file); + + let mut azure_config_file = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(azure_config_file) + .unwrap(); + + azure_config_file + .write_all(azure_config_file_content.as_bytes()) + .unwrap(); + + let azure_blob_uri = format!("az://{}/pg_parquet_test.parquet", test_container_name); + + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + + #[pg_test] + #[should_panic(expected = "Account must be specified")] + fn test_azure_with_no_storage_account() { + std::env::remove_var("AZURE_STORAGE_ACCOUNT"); + + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let azure_blob_uri = format!("az://{}/pg_parquet_test.parquet", test_container_name); + + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_azure_blob_with_wrong_storage_key() { + let wrong_account_key = String::from("FFy8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="); + std::env::set_var("AZURE_STORAGE_KEY", wrong_account_key); + + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/{}", + test_account_name, test_container_name + ); + + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + + #[pg_test] + #[should_panic(expected = "404 Not Found")] + fn test_azure_blob_write_wrong_container() { + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/nonexistentcontainer", + test_account_name + ); + + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } + + #[pg_test] + fn test_azure_blob_read_write_sas() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let read_write_sas_token = std::env::var("AZURE_TEST_READ_WRITE_SAS") + .expect("AZURE_TEST_READ_WRITE_SAS not found"); + + // remove account key to make sure the sas token is used + std::env::remove_var("AZURE_STORAGE_KEY"); + std::env::set_var("AZURE_STORAGE_SAS_TOKEN", read_write_sas_token); + + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/{}", + test_account_name, test_container_name + ); + + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_azure_blob_read_only_sas() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let read_only_sas_token: String = + std::env::var("AZURE_TEST_READ_ONLY_SAS").expect("AZURE_TEST_READ_ONLY_SAS not found"); + + // remove account key to make sure the sas token is used + std::env::remove_var("AZURE_STORAGE_KEY"); + std::env::set_var("AZURE_STORAGE_SAS_TOKEN", read_only_sas_token); + + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/{}", + test_account_name, test_container_name + ); + + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } + + #[pg_test] + #[should_panic(expected = "failed to parse container name")] + fn test_azure_blob_unsupported_uri() { + let fabric_azure_blob_uri = "https://ACCOUNT.dfs.fabric.microsoft.com".into(); + + let test_table = TestTable::::new("int4".into()).with_uri(fabric_azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + #[pg_test] #[should_panic(expected = "unsupported uri gs://testbucket")] fn test_unsupported_uri() {