feat(snowflake-driver): support azure export buckets #8730

Merged
merged 45 commits on Sep 30, 2024
Commits
78acec5
wip: add azure export bucket options to Snowflake driver
KSDaemon Sep 19, 2024
02af37b
add env var for Azure SAS Token
KSDaemon Sep 20, 2024
d3f0593
add STORAGE_INTEGRATION variant to export bucket flows
KSDaemon Sep 20, 2024
3f28ea0
Simplify isUnloadSupported()
KSDaemon Sep 23, 2024
d34d448
add extractFilesFromGCS() to BaseDriver Class
KSDaemon Sep 23, 2024
3a630b9
implement extractFilesFromAzure() in SnowFlake Driver
KSDaemon Sep 23, 2024
2a7007d
some refactoring in extractFilesFromAzure()
KSDaemon Sep 23, 2024
748d507
debug and fix extractFilesFromAzure()
KSDaemon Sep 23, 2024
187161b
remove error if no csv files were exported (that's ok)
KSDaemon Sep 23, 2024
77e3ed5
move extractFilesFromAzure to BaseDriver from Snowflake
KSDaemon Sep 23, 2024
4d46a81
switch Athena Driver to use extractUnloadedFilesFromS3() from BaseDvi…
KSDaemon Sep 23, 2024
53e31c4
switch RedShift Driver to use extractUnloadedFilesFromS3() from BaseD…
KSDaemon Sep 23, 2024
4c79e3c
switch Databricks Driver to use extractUnloadedFilesFromS3() and extr…
KSDaemon Sep 25, 2024
5ed6ca5
Remove unused/cleanup from databricks driver
KSDaemon Sep 25, 2024
6ca19b3
update extractFilesFromGCS() in BaseDriver to receive credentials ins…
KSDaemon Sep 25, 2024
3ff0844
remove dep on @google-cloud/storage in Databricks for export bucket flow
KSDaemon Sep 25, 2024
2073213
fix Databricks driver export bucket flows after refactoring
KSDaemon Sep 25, 2024
b801306
set up CI to test clickhouse with s3 export bucket
KSDaemon Sep 26, 2024
9f92483
Rename databricks export bucket test to reflect the s3 type
KSDaemon Sep 26, 2024
94fb026
fix clickhouse-export-bucket-s3-full script in package.json
KSDaemon Sep 26, 2024
01ea8cd
set up CI to test Snowflake with s3 export bucket
KSDaemon Sep 26, 2024
99d2577
rename databricks tests snapshot after renaming test job
KSDaemon Sep 26, 2024
29d1d32
improve extractUnloadedFilesFromS3 flow to support different bucket n…
KSDaemon Sep 26, 2024
95493ce
add snowflake test snapshots for export bucket test
KSDaemon Sep 26, 2024
0a21132
add clickhouse test snapshots for export bucket test
KSDaemon Sep 26, 2024
6ffa16b
update databricks with bucket tests snapshots
KSDaemon Sep 26, 2024
4e31cf4
rename athena test to reflect that it uses export bucket
KSDaemon Sep 26, 2024
7b0de55
reformat CLOUD_DATABASES in drivers tests CI job + add clickhouse-exp…
KSDaemon Sep 26, 2024
ff50876
add snowflake export bucket to azure driver test
KSDaemon Sep 26, 2024
a30d4b3
add databricks export bucket to azure driver test
KSDaemon Sep 26, 2024
68e660a
add snowflake export bucket to azure via storage integration driver test
KSDaemon Sep 26, 2024
6431e2a
add snowflake export bucket to gcs via storage integration driver test
KSDaemon Sep 26, 2024
3c64607
improve/refactor exported files filtering in extractFilesFromAzure()
KSDaemon Sep 26, 2024
cd3507d
set env secrets for drivers tests
KSDaemon Sep 26, 2024
be8e473
Fix createExportBucket in Snowflake (making sasToken optional)
KSDaemon Sep 26, 2024
6482174
Fix CUBEJS_DB_EXPORT_BUCKET env var for tests
KSDaemon Sep 26, 2024
347cfc9
Remove includeIncrementalSchemaSuite from every test suite besides on…
KSDaemon Sep 26, 2024
27e0d8c
Fix CUBEJS_DB_EXPORT_BUCKET env var for tests in databricks
KSDaemon Sep 26, 2024
fea2f7f
fix databricks export to azure flow + align test config
KSDaemon Sep 27, 2024
1b714d5
Fix blob object filtering during unloading in azure
KSDaemon Sep 27, 2024
67ea07a
sync @google-cloud/storage package used across packages
KSDaemon Sep 27, 2024
277b996
yarn lock sync
KSDaemon Sep 27, 2024
eb2aaf4
remove not needed includeIncrementalSchemaSuite in some tests
KSDaemon Sep 27, 2024
0d02a3a
rename bigquery → bigquery-export-bucket-gcs to align with the rest t…
KSDaemon Sep 27, 2024
17ed037
Add comments for exporting urls from S3 in BigQueryDriver
KSDaemon Sep 30, 2024
32 changes: 28 additions & 4 deletions .github/workflows/drivers-tests.yml
@@ -175,7 +175,18 @@ jobs:
needs: [latest-tag-sha, build]
if: (needs['latest-tag-sha'].outputs.sha != github.sha)
env:
CLOUD_DATABASES: athena bigquery databricks-jdbc databricks-jdbc-export-bucket snowflake
CLOUD_DATABASES: >
athena-export-bucket-s3
bigquery-export-bucket-gcs
clickhouse-export-bucket-s3
databricks-jdbc
databricks-jdbc-export-bucket-s3
databricks-jdbc-export-bucket-azure
snowflake
snowflake-export-bucket-s3
snowflake-export-bucket-azure
snowflake-export-bucket-azure-via-storage-integration
snowflake-export-bucket-gcs
# As per docs:
# Secrets cannot be directly referenced in if: conditionals. Instead, consider setting
# secrets as job-level environment variables, then referencing the environment variables
@@ -186,15 +197,21 @@ jobs:
node:
- 20.x
database:
- athena
- bigquery
- athena-export-bucket-s3
- bigquery-export-bucket-gcs
- clickhouse
- clickhouse-export-bucket-s3
- databricks-jdbc
- databricks-jdbc-export-bucket
- databricks-jdbc-export-bucket-s3
- databricks-jdbc-export-bucket-azure
- mssql
- mysql
- postgres
- snowflake
- snowflake-export-bucket-s3
- snowflake-export-bucket-azure
- snowflake-export-bucket-azure-via-storage-integration
- snowflake-export-bucket-gcs
fail-fast: false

steps:
@@ -258,6 +275,13 @@ jobs:
# BigQuery
DRIVERS_TESTS_CUBEJS_DB_BQ_CREDENTIALS: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_BQ_CREDENTIALS }}

#GCS
DRIVERS_TESTS_CUBEJS_DB_EXPORT_GCS_CREDENTIALS: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_EXPORT_GCS_CREDENTIALS }}

# Azure
DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AZURE_KEY: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AZURE_KEY }}
DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AZURE_SAS_TOKEN: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_EXPORT_BUCKET_AZURE_SAS_TOKEN }}

# Databricks
DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_URL: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_URL }}
DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_TOKEN: ${{ secrets.DRIVERS_TESTS_CUBEJS_DB_DATABRICKS_TOKEN }}
2 changes: 0 additions & 2 deletions packages/cubejs-athena-driver/package.json
@@ -29,8 +29,6 @@
"types": "dist/src/index.d.ts",
"dependencies": {
"@aws-sdk/client-athena": "^3.22.0",
"@aws-sdk/client-s3": "^3.49.0",
"@aws-sdk/s3-request-presigner": "^3.49.0",
"@cubejs-backend/base-driver": "^0.36.0",
"@cubejs-backend/shared": "^0.36.0",
"sqlstring": "^2.3.1"
35 changes: 10 additions & 25 deletions packages/cubejs-athena-driver/src/AthenaDriver.ts
@@ -17,8 +17,6 @@ import {
ColumnInfo,
StartQueryExecutionCommandInput,
} from '@aws-sdk/client-athena';
import { S3, GetObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import * as stream from 'stream';
import {
BaseDriver,
@@ -126,7 +124,7 @@ export class AthenaDriver extends BaseDriver implements DriverInterface {
getEnv('athenaAwsSecret', { dataSource });

const { schema, ...restConfig } = config;

this.schema = schema ||
getEnv('dbName', { dataSource }) ||
getEnv('dbSchema', { dataSource });
@@ -438,31 +436,18 @@
* Returns an array of signed URLs of the unloaded csv files.
*/
private async getCsvFiles(tableName: string): Promise<string[]> {
const client = new S3({
credentials: this.config.credentials,
region: this.config.region,
});
const { bucket, prefix } = AthenaDriver.splitS3Path(
`${this.config.exportBucket}/${tableName}`
);
const list = await client.listObjectsV2({
Bucket: bucket,
Prefix: prefix.slice(1), // skip leading
});
if (!list.Contents) {
return [];
} else {
const files = await Promise.all(
list.Contents.map(async (file) => {
const command = new GetObjectCommand({
Bucket: bucket,
Key: file.Key,
});
return getSignedUrl(client, command, { expiresIn: 3600 });
})
);
return files;
}

return this.extractUnloadedFilesFromS3(
{
credentials: this.config.credentials,
region: this.config.region,
},
bucket,
prefix.slice(1),
);
}

public informationSchemaQuery() {
21 changes: 17 additions & 4 deletions packages/cubejs-backend-shared/src/env.ts
@@ -718,7 +718,7 @@ const variables: Record<string, (...args: any) => any> = {
),

/**
* AWS Key for the AWS based export bucket srorage.
* AWS Key for the AWS based export bucket storage.
*/
dbExportBucketAwsKey: ({
dataSource,
@@ -731,7 +731,7 @@
),

/**
* AWS Secret for the AWS based export bucket srorage.
* AWS Secret for the AWS based export bucket storage.
*/
dbExportBucketAwsSecret: ({
dataSource,
@@ -744,7 +744,7 @@
),

/**
* AWS Region for the AWS based export bucket srorage.
* AWS Region for the AWS based export bucket storage.
*/
dbExportBucketAwsRegion: ({
dataSource,
@@ -757,7 +757,7 @@
),

/**
* Azure Key for the Azure based export bucket srorage.
* Azure Key for the Azure based export bucket storage.
*/
dbExportBucketAzureKey: ({
dataSource,
@@ -769,6 +769,19 @@
]
),

/**
* Azure SAS Token for the Azure based export bucket storage.
*/
dbExportAzureSasToken: ({
dataSource,
}: {
dataSource: string,
}) => (
process.env[
keyByDataSource('CUBEJS_DB_EXPORT_BUCKET_AZURE_SAS_TOKEN', dataSource)
]
),

/**
* Export bucket options for Integration based.
*/
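For context, drivers resolve this new variable through the shared getEnv helper, the same way the other export-bucket settings above are read. A minimal sketch follows; only the getEnv keys come from this diff, while the data source name and the surrounding config object are illustrative:

import { getEnv } from '@cubejs-backend/shared';

// Sketch: resolving the Azure export-bucket credentials for one data source.
const dataSource = 'default'; // illustrative

const azureExportBucketConfig = {
  // CUBEJS_DB_EXPORT_BUCKET_AZURE_KEY (pre-existing variable)
  azureKey: getEnv('dbExportBucketAzureKey', { dataSource }),
  // CUBEJS_DB_EXPORT_BUCKET_AZURE_SAS_TOKEN (added in this PR)
  sasToken: getEnv('dbExportAzureSasToken', { dataSource }),
};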
2 changes: 2 additions & 0 deletions packages/cubejs-base-driver/package.json
@@ -31,7 +31,9 @@
"dependencies": {
"@aws-sdk/client-s3": "^3.49.0",
"@aws-sdk/s3-request-presigner": "^3.49.0",
"@azure/storage-blob": "^12.9.0",
"@cubejs-backend/shared": "^0.36.0",
"@google-cloud/storage": "^7.13.0",
"ramda": "^0.27.0"
},
"devDependencies": {
90 changes: 89 additions & 1 deletion packages/cubejs-base-driver/src/BaseDriver.ts
@@ -18,6 +18,14 @@ import { reduce } from 'ramda';
import fs from 'fs';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { S3, GetObjectCommand, S3ClientConfig } from '@aws-sdk/client-s3';
import { Storage } from '@google-cloud/storage';
import {
BlobServiceClient,
StorageSharedKeyCredential,
ContainerSASPermissions,
SASProtocol,
generateBlobSASQueryParameters,
} from '@azure/storage-blob';

import { cancelCombinator } from './utils';
import {
@@ -44,6 +52,15 @@
ForeignKeysQueryResult,
} from './driver.interface';

export type AzureStorageClientConfig = {
azureKey: string,
sasToken?: string,
};

export type GoogleStorageClientConfig = {
credentials: any,
};

const sortByKeys = (unordered: any) => {
const ordered: any = {};

@@ -467,7 +484,7 @@ export abstract class BaseDriver implements DriverInterface {
const conditionString = conditions.join(' OR ');

const query = this.getColumnsForSpecificTablesQuery(conditionString);

const [primaryKeys, foreignKeys] = await Promise.all([
this.primaryKeys(conditionString, parameters),
this.foreignKeys(conditionString, parameters)
@@ -648,6 +665,10 @@ export abstract class BaseDriver implements DriverInterface {
prefix: string
): Promise<string[]> {
const storage = new S3(clientOptions);
// It looks that different driver configurations use different formats
// for the bucket - some expect only names, some - full url-like names.
// So we unify this.
bucketName = bucketName.replace(/^[a-zA-Z]+:\/\//, '');

const list = await storage.listObjectsV2({
Bucket: bucketName,
@@ -672,4 +693,71 @@

throw new Error('Unable to retrieve list of files from S3 storage after unloading.');
}

/**
* Returns an array of signed GCS URLs of the unloaded csv files.
*/
protected async extractFilesFromGCS(
gcsConfig: GoogleStorageClientConfig,
bucketName: string,
tableName: string
): Promise<string[]> {
const storage = new Storage({
credentials: gcsConfig.credentials,
projectId: gcsConfig.credentials.project_id
});
const bucket = storage.bucket(bucketName);
const [files] = await bucket.getFiles({ prefix: `${tableName}/` });
if (files.length) {
const csvFile = await Promise.all(files.map(async (file) => {
const [url] = await file.getSignedUrl({
action: 'read',
expires: new Date(new Date().getTime() + 60 * 60 * 1000)
});
return url;
}));
return csvFile;
} else {
return [];
}
}

protected async extractFilesFromAzure(
azureConfig: AzureStorageClientConfig,
bucketName: string,
tableName: string
): Promise<string[]> {
const parts = bucketName.split('.blob.core.windows.net/');
const account = parts[0];
const container = parts[1].split('/')[0];
const credential = new StorageSharedKeyCredential(account, azureConfig.azureKey);
const url = `https://${account}.blob.core.windows.net`;
const blobServiceClient = azureConfig.sasToken ?
new BlobServiceClient(`${url}?${azureConfig.sasToken}`) :
new BlobServiceClient(url, credential);

const csvFiles: string[] = [];
const containerClient = blobServiceClient.getContainerClient(container);
const blobsList = containerClient.listBlobsFlat({ prefix: `${tableName}/` });
for await (const blob of blobsList) {
if (blob.name && (blob.name.endsWith('.csv.gz') || blob.name.endsWith('.csv'))) {
const sas = generateBlobSASQueryParameters(
{
containerName: container,
blobName: blob.name,
permissions: ContainerSASPermissions.parse('r'),
startsOn: new Date(new Date().valueOf()),
expiresOn:
new Date(new Date().valueOf() + 1000 * 60 * 60),
protocol: SASProtocol.Https,
version: '2020-08-04',
},
credential,
).toString();
csvFiles.push(`${url}/${container}/${blob.name}?${sas}`);
}
}

return csvFiles;
}
}
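For orientation, both new helpers are protected and intended to be called from a driver's own unload flow (this PR wires them into the Snowflake and Databricks drivers). A hedged sketch of a call site follows; only the helper signatures come from the diff above, while the wrapper function, the `as any` cast, and all bucket and credential values are illustrative:

import { BaseDriver } from '@cubejs-backend/base-driver';

// Illustrative only: real call sites live inside driver subclasses, where the
// protected helpers are reachable without the cast.
async function signExportedCsvUrls(driver: BaseDriver, tableName: string): Promise<string[]> {
  const d = driver as any;

  // Azure: the "bucket" is an account.blob.core.windows.net/container path;
  // each exported .csv/.csv.gz blob gets a one-hour read-only SAS URL.
  const azureUrls: string[] = await d.extractFilesFromAzure(
    { azureKey: '<storage-account-key>', sasToken: undefined }, // AzureStorageClientConfig
    'myaccount.blob.core.windows.net/my-container',
    tableName,
  );

  // GCS: takes the parsed service-account JSON and a plain bucket name and
  // returns signed read URLs that expire after an hour.
  const gcsUrls: string[] = await d.extractFilesFromGCS(
    { credentials: { project_id: 'my-project' /* ...remaining service-account fields */ } },
    'my-export-bucket',
    tableName,
  );

  return [...azureUrls, ...gcsUrls];
}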
2 changes: 1 addition & 1 deletion packages/cubejs-bigquery-driver/package.json
@@ -32,7 +32,7 @@
"@cubejs-backend/dotenv": "^9.0.2",
"@cubejs-backend/shared": "^0.36.0",
"@google-cloud/bigquery": "^7.7.0",
"@google-cloud/storage": "^7.11.1",
"@google-cloud/storage": "^7.13.0",
"ramda": "^0.27.2"
},
"devDependencies": {
5 changes: 5 additions & 0 deletions packages/cubejs-bigquery-driver/src/BigQueryDriver.ts
@@ -325,6 +325,11 @@ export class BigQueryDriver extends BaseDriver implements DriverInterface {
const bigQueryTable = this.bigquery.dataset(schema).table(tableName);
const [job] = await bigQueryTable.createExtractJob(destination, { format: 'CSV', gzip: true });
await this.waitForJobResult(job, { table }, false);
// There is an implementation for extracting and signing urls from S3
// @see BaseDriver->extractUnloadedFilesFromS3()
// Please use that if you need. Here is a different flow
// because bigquery requires storage/bucket object for other things,
// and there is no need to initiate another one (created in extractUnloadedFilesFromS3()).
const [files] = await this.bucket.getFiles({ prefix: `${table}-` });
const urls = await Promise.all(files.map(async file => {
const [url] = await file.getSignedUrl({
3 changes: 0 additions & 3 deletions packages/cubejs-databricks-jdbc-driver/package.json
@@ -28,9 +28,6 @@
"bin"
],
"dependencies": {
"@aws-sdk/client-s3": "^3.49.0",
"@aws-sdk/s3-request-presigner": "^3.49.0",
"@azure/storage-blob": "^12.9.0",
"@cubejs-backend/base-driver": "^0.36.0",
"@cubejs-backend/jdbc-driver": "^0.36.0",
"@cubejs-backend/schema-compiler": "^0.36.2",