Commit
switch Databricks Driver to use extractUnloadedFilesFromS3() and extractFilesFromAzure() from BaseDriver impl
KSDaemon committed Sep 25, 2024
1 parent f09ae8e commit b64ce0a
Showing 1 changed file with 18 additions and 104 deletions.
122 changes: 18 additions & 104 deletions packages/cubejs-databricks-jdbc-driver/src/DatabricksDriver.ts
@@ -631,8 +631,7 @@ export class DatabricksDriver extends JDBCDriver {
         options.query.params,
       )
       : await this.unloadWithTable(tableFullName);
-    const pathname = `${this.config.exportBucket}/${tableFullName}.csv`;
-    const csvFile = await this.getCsvFiles(pathname);
+    const csvFile = await this.getCsvFiles(tableFullName);
     return {
       exportBucketCsvEscapeSymbol: this.config.exportBucketCsvEscapeSymbol,
       csvFile,
@@ -667,117 +666,32 @@ export class DatabricksDriver extends JDBCDriver {
    * Returns an array of signed URLs of unloaded csv files.
    */
   private async getCsvFiles(
-    pathname: string,
+    tableName: string,
   ): Promise<string[]> {
-    let res;
     switch (this.config.bucketType) {
       case 'azure':
-        res = await this.getSignedAzureUrls(pathname);
-        break;
+        return this.extractFilesFromAzure(
+          { azureKey: this.config.azureKey || '' },
+          this.config.exportBucket || '',
+          tableName,
+        );
       case 's3':
-        res = await this.getSignedS3Urls(pathname);
-        break;
+        return this.extractUnloadedFilesFromS3(
+          {
+            credentials: {
+              accessKeyId: this.config.awsKey || '',
+              secretAccessKey: this.config.awsSecret || '',
+            },
+            region: this.config.awsRegion || '',
+          },
+          this.config.exportBucket || '',
+          tableName,
+        );
       default:
         throw new Error(`Unsupported export bucket type: ${
           this.config.bucketType
         }`);
     }
-    return res;
   }
 
-  /**
-   * Returns Azure signed URLs of unloaded scv files.
-   */
-  private async getSignedAzureUrls(
-    pathname: string,
-  ): Promise<string[]> {
-    const csvFile: string[] = [];
-    const [container, account] =
-      pathname.split('wasbs://')[1].split('.blob')[0].split('@');
-    const foldername =
-      pathname.split(`${this.config.exportBucket}/`)[1];
-    const expr = new RegExp(`${foldername}\\/.*\\.csv$`, 'i');
-
-    const credential = new StorageSharedKeyCredential(
-      account,
-      this.config.azureKey as string,
-    );
-    const blobClient = new BlobServiceClient(
-      `https://${account}.blob.core.windows.net`,
-      credential,
-    );
-    const containerClient = blobClient.getContainerClient(container);
-    const blobsList = containerClient.listBlobsFlat({ prefix: foldername });
-    for await (const blob of blobsList) {
-      if (blob.name && expr.test(blob.name)) {
-        const sas = generateBlobSASQueryParameters(
-          {
-            containerName: container,
-            blobName: blob.name,
-            permissions: ContainerSASPermissions.parse('r'),
-            startsOn: new Date(new Date().valueOf()),
-            expiresOn:
-              new Date(new Date().valueOf() + 1000 * 60 * 60),
-            protocol: SASProtocol.Https,
-            version: '2020-08-04',
-          },
-          credential,
-        ).toString();
-        csvFile.push(`https://${
-          account
-        }.blob.core.windows.net/${
-          container
-        }/${blob.name}?${sas}`);
-      }
-    }
-    if (csvFile.length === 0) {
-      throw new Error('No CSV files were exported to the specified bucket. ' +
-        'Please check your export bucket configuration.');
-    }
-    return csvFile;
-  }
-
-  /**
-   * Returns S3 signed URLs of unloaded scv files.
-   */
-  private async getSignedS3Urls(
-    pathname: string,
-  ): Promise<string[]> {
-    const client = new S3({
-      credentials: {
-        accessKeyId: this.config.awsKey as string,
-        secretAccessKey: this.config.awsSecret as string,
-      },
-      region: this.config.awsRegion,
-    });
-
-    const url = new URL(pathname);
-    const list = await client.listObjectsV2({
-      Bucket: url.host,
-      Prefix: url.pathname.slice(1),
-    });
-
-    if (list.Contents === undefined) {
-      throw new Error(`No content in specified path: ${pathname}`);
-    }
-
-    const csvFile = await Promise.all(
-      list.Contents
-        .filter(file => file.Key && /.csv$/i.test(file.Key))
-        .map(async (file) => {
-          const command = new GetObjectCommand({
-            Bucket: url.host,
-            Key: file.Key,
-          });
-          return getSignedUrl(client, command, { expiresIn: 3600 });
-        })
-    );
-    if (csvFile.length === 0) {
-      throw new Error('No CSV files were exported to the specified bucket. ' +
-        'Please check your export bucket configuration.');
-    }
-
-    return csvFile;
-  }
-
   protected generateTableColumnsForExport(columns: ColumnInfo[]): string {

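For reference, the BaseDriver helpers that getCsvFiles() now delegates to are expected to cover the same listing-and-signing work that the deleted getSignedS3Urls() and getSignedAzureUrls() methods performed locally. The code below is only a rough sketch, under the assumption that extractUnloadedFilesFromS3() lists the export bucket under the table prefix, keeps the CSV parts, and returns presigned GET URLs; it is not the actual @cubejs-backend/base-driver implementation, and the parameter names are illustrative.

// Hypothetical sketch of what BaseDriver.extractUnloadedFilesFromS3() is assumed to do.
// The real implementation lives in @cubejs-backend/base-driver and may differ.
import { S3, GetObjectCommand, S3ClientConfig } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';

async function extractUnloadedFilesFromS3(
  clientOptions: S3ClientConfig, // credentials + region, as passed by DatabricksDriver
  bucketName: string,            // this.config.exportBucket
  tableName: string,             // prefix the CSV parts were unloaded under
): Promise<string[]> {
  const client = new S3(clientOptions);

  // List every object unloaded under the table prefix.
  const list = await client.listObjectsV2({
    Bucket: bucketName,
    Prefix: tableName,
  });

  const csvKeys = (list.Contents || [])
    .map(file => file.Key)
    .filter((key): key is string => !!key && /\.csv$/i.test(key));

  if (csvKeys.length === 0) {
    throw new Error('No CSV files were exported to the specified bucket. ' +
      'Please check your export bucket configuration.');
  }

  // Sign each CSV part so it can be downloaded without bucket credentials.
  return Promise.all(
    csvKeys.map(key => getSignedUrl(
      client,
      new GetObjectCommand({ Bucket: bucketName, Key: key }),
      { expiresIn: 3600 },
    )),
  );
}

The 'azure' branch presumably works the same way through extractFilesFromAzure(), producing SAS-signed blob URLs much as the removed getSignedAzureUrls() did.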