Update volume ingestion e2e tests
Signed-off-by: Levko Kravets <[email protected]>
kravets-levko committed Feb 17, 2024
1 parent 57be9bf commit 5d5d202
Showing 5 changed files with 93 additions and 45 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/main.yml
@@ -74,6 +74,9 @@ jobs:
E2E_PATH: ${{ secrets.TEST_PECO_WAREHOUSE_HTTP_PATH }}
E2E_ACCESS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
E2E_TABLE_SUFFIX: ${{github.sha}}
E2E_CATALOG: peco
E2E_SCHEMA: default
E2E_VOLUME: e2etests
cache-name: cache-node-modules
NYC_REPORT_DIR: coverage_e2e

4 changes: 4 additions & 0 deletions lib/DBSQLSession.ts
@@ -280,6 +280,10 @@ export default class DBSQLSession implements IDBSQLSession {
const agent = await connectionProvider.getAgent();

const response = await fetch(presignedUrl, { method: 'DELETE', headers, agent });
// It looks like AWS and Azure handle HTTP `DELETE` differently for non-existing files.
// AWS assumes that, since the file already doesn't exist, the goal is achieved, and returns HTTP 200.
// Azure, on the other hand, is somewhat stricter and checks whether the file exists before deleting it.
// If the file doesn't exist, Azure returns HTTP 404.
if (!response.ok) {
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
}
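Because of this difference, REMOVE is only idempotent on AWS: on Azure, removing a file that is already gone surfaces as a StagingError whose message contains the 404 status. A caller that wants "delete if exists" semantics on both clouds could absorb that case itself. A minimal sketch, assuming a connected session and the same StagingError import path the tests below use (the helper name is hypothetical):

```js
const StagingError = require('../../dist/errors/StagingError').default;

// Hypothetical helper: remove a staged file, treating "already gone" as success.
// AWS returns HTTP 200 for a missing file; Azure returns HTTP 404, which the
// driver surfaces as a StagingError containing the status code.
async function removeIfExists(session, stagingFileName, localPath) {
  try {
    await session.executeStatement(`REMOVE '${stagingFileName}'`, {
      stagingAllowedLocalPath: [localPath],
    });
  } catch (error) {
    if (error instanceof StagingError && error.message.includes('404')) {
      return; // nothing to remove
    }
    throw error;
  }
}
```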
1 change: 0 additions & 1 deletion tests/e2e/staging/.gitignore

This file was deleted.

124 changes: 82 additions & 42 deletions tests/e2e/staging_ingestion.test.js
@@ -1,33 +1,61 @@
const { expect, AssertionError } = require('chai');
const { expect } = require('chai');
const fs = require('fs');
const path = require('path');
const os = require('os');
const uuid = require('uuid');
const config = require('./utils/config');
const { DBSQLClient } = require('../..');
const fs = require('fs');
const StagingError = require('../../dist/errors/StagingError').default;

describe('Staging Test', () => {
const catalog = config.database[0];
const schema = config.database[1];
const volume = config.volume;

const localPath = fs.mkdtempSync(path.join(os.tmpdir(), 'databricks-sql-tests-'));

before(() => {
expect(catalog).to.not.be.undefined;
expect(schema).to.not.be.undefined;
expect(volume).to.not.be.undefined;
});

after(() => {
fs.rmSync(localPath, {
recursive: true,
force: true,
});
});

it('put staging data and receive it', async () => {
const client = new DBSQLClient();
await client.connect({
host: config.host,
path: config.path,
token: config.token,
});
let tempPath = 'tests/e2e/staging/data';
fs.writeFileSync(tempPath, 'Hello World!');

const session = await client.openSession({
initialCatalog: config.database[0],
initialSchema: config.database[1],
initialCatalog: catalog,
initialSchema: schema,
});

const expectedData = 'Hello World!';
const stagingFileName = `/Volumes/${catalog}/${schema}/${volume}/${uuid.v4()}.csv`;
const localFile = path.join(localPath, `${uuid.v4()}.csv`);

fs.writeFileSync(localFile, expectedData);
await session.executeStatement(`PUT '${localFile}' INTO '${stagingFileName}' OVERWRITE`, {
stagingAllowedLocalPath: [localPath],
});
fs.rmSync(localFile);

await session.executeStatement(`GET '${stagingFileName}' TO '${localFile}'`, {
stagingAllowedLocalPath: [localPath],
});
await session.executeStatement(
`PUT '${tempPath}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
);
await session.executeStatement(
`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
);
let result = fs.readFileSync('tests/e2e/staging/file');
expect(result.toString() === 'Hello World!').to.be.true;
const result = fs.readFileSync(localFile);
fs.rmSync(localFile);
expect(result.toString() === expectedData).to.be.true;
});

it('put staging data and remove it', async () => {
@@ -37,31 +65,37 @@ describe('Staging Test', () => {
path: config.path,
token: config.token,
});
let tempPath = 'tests/e2e/staging/data';
fs.writeFileSync(tempPath, (data = 'Hello World!'));

let session = await client.openSession({
initialCatalog: config.database[0],
initialSchema: config.database[1],
const session = await client.openSession({
initialCatalog: catalog,
initialSchema: schema,
});
await session.executeStatement(
`PUT '${tempPath}' INTO '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' OVERWRITE`,
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
);
await session.executeStatement(`REMOVE '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv'`, {
stagingAllowedLocalPath: ['tests/e2e/staging'],

const expectedData = 'Hello World!';
const stagingFileName = `/Volumes/${catalog}/${schema}/${volume}/${uuid.v4()}.csv`;
const localFile = path.join(localPath, `${uuid.v4()}.csv`);

fs.writeFileSync(localFile, expectedData);
await session.executeStatement(`PUT '${localFile}' INTO '${stagingFileName}' OVERWRITE`, {
stagingAllowedLocalPath: [localPath],
});
fs.rmSync(localFile);

await session.executeStatement(`REMOVE '${stagingFileName}'`, { stagingAllowedLocalPath: [localPath] });

try {
await session.executeStatement(
`GET '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/file1.csv' TO 'tests/e2e/staging/file'`,
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
);
await session.executeStatement(`GET '${stagingFileName}' TO '${localFile}'`, {
stagingAllowedLocalPath: [localPath],
});
} catch (error) {
if (error instanceof AssertionError) {
if (error instanceof StagingError) {
// File should not exist after deleting
expect(error.message).to.contain('404');
} else {
throw error;
}
expect(error.message).to.contain('404'); // File should not exist after deleting
} finally {
fs.rmSync(localFile, { force: true });
}
});

@@ -72,16 +106,22 @@
path: config.path,
token: config.token,
});
let tempPath = 'tests/e2e/staging/data';
fs.writeFileSync(tempPath, (data = 'Hello World!'));

let session = await client.openSession({
initialCatalog: config.database[0],
initialSchema: config.database[1],
const session = await client.openSession({
initialCatalog: catalog,
initialSchema: schema,
});
await session.executeStatement(
`REMOVE '/Volumes/${config.database[0]}/${config.database[1]}/e2etests/non_existing.csv'`,
{ stagingAllowedLocalPath: ['tests/e2e/staging'] },
);

const stagingFileName = `/Volumes/${catalog}/${schema}/${volume}/${uuid.v4()}.csv`;

try {
await session.executeStatement(`REMOVE '${stagingFileName}'`, { stagingAllowedLocalPath: [localPath] });
} catch (error) {
if (error instanceof StagingError) {
expect(error.message).to.contain('404');
} else {
throw error;
}
}
});
});
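Taken together, the rewritten tests exercise the full volume ingestion round trip. Condensed into a standalone sketch for reference; the package name `@databricks/sql` is assumed, and the `peco`/`default`/`e2etests` names are just the CI defaults from the workflow above, so substitute your own workspace host, warehouse path, token, and volume:

```js
const fs = require('fs');
const os = require('os');
const path = require('path');
const { DBSQLClient } = require('@databricks/sql'); // assumed published package name for this driver

async function volumeRoundTrip({ host, httpPath, token }) {
  const client = new DBSQLClient();
  await client.connect({ host, path: httpPath, token });

  const session = await client.openSession({ initialCatalog: 'peco', initialSchema: 'default' });

  const localDir = fs.mkdtempSync(path.join(os.tmpdir(), 'volume-demo-'));
  const localFile = path.join(localDir, 'hello.csv');
  const volumeFile = '/Volumes/peco/default/e2etests/hello.csv';

  fs.writeFileSync(localFile, 'Hello World!');

  // Upload the local file into the volume, download it back, then delete the staged copy.
  await session.executeStatement(`PUT '${localFile}' INTO '${volumeFile}' OVERWRITE`, {
    stagingAllowedLocalPath: [localDir],
  });
  await session.executeStatement(`GET '${volumeFile}' TO '${localFile}'`, {
    stagingAllowedLocalPath: [localDir],
  });
  await session.executeStatement(`REMOVE '${volumeFile}'`, { stagingAllowedLocalPath: [localDir] });

  await session.close();
  await client.close();

  fs.rmSync(localDir, { recursive: true, force: true });
}
```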
6 changes: 4 additions & 2 deletions tests/e2e/utils/config.js
@@ -4,7 +4,7 @@ try {
} catch (e) {}

const catalog = process.env.E2E_CATALOG || undefined;
const database = process.env.E2E_DATABASE || undefined;
const schema = process.env.E2E_SCHEMA || undefined;

// Create file named `config.local.js` in the same directory and override config there
module.exports = {
@@ -17,7 +17,9 @@ module.exports = {
// Access token: dapi********************************
token: process.env.E2E_ACCESS_TOKEN,
// Catalog and database to use for testing; specify both or leave array empty to use defaults
database: catalog || database ? [catalog, database] : [],
database: catalog || schema ? [catalog, schema] : [],
// Volume to use for testing
volume: process.env.E2E_VOLUME,
// Suffix used for tables that will be created during tests
tableSuffix: process.env.E2E_TABLE_SUFFIX,
...overrides,
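For local runs, the same values can be supplied either through the new E2E_CATALOG / E2E_SCHEMA / E2E_VOLUME environment variables or via the `config.local.js` override that `config.js` already picks up. A sketch with placeholder values:

```js
// tests/e2e/utils/config.local.js - merged into config.js via `...overrides` (all values are placeholders)
module.exports = {
  host: 'my-workspace.cloud.databricks.com',
  path: '/sql/1.0/warehouses/****************',
  token: 'dapi********************************',
  database: ['peco', 'default'], // [catalog, schema]
  volume: 'e2etests',
  tableSuffix: 'local',
};
```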
