Refine the code and update tests
Signed-off-by: Levko Kravets <[email protected]>
kravets-levko committed Oct 30, 2023
1 parent 3da3e4a commit 6ada0db
Showing 9 changed files with 129 additions and 135 deletions.
lib/result/ArrowResultHandler.ts (10 additions, 15 deletions)

@@ -42,20 +42,17 @@ export default class ArrowResultHandler implements IResultsProvider<Array<any>>
     this.arrowSchema = arrowSchema;
   }

-  async hasMore() {
+  public async hasMore() {
     return this.source.hasMore();
   }

-  async fetchNext(options: ResultsProviderFetchNextOptions) {
-    const data = await this.source.fetchNext(options);
-    return this.getValue(data ? [data] : []);
-  }
-
-  async getValue(data?: Array<TRowSet>) {
-    if (this.schema.length === 0 || !this.arrowSchema || !data) {
+  public async fetchNext(options: ResultsProviderFetchNextOptions) {
+    if (this.schema.length === 0 || !this.arrowSchema) {
       return [];
     }

+    const data = await this.source.fetchNext(options);
+
     const batches = await this.getBatches(data);
     if (batches.length === 0) {
       return [];
@@ -65,15 +62,13 @@ export default class ArrowResultHandler implements IResultsProvider<Array<any>>
     return this.getRows(table.schema, table.toArray());
   }

-  protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
+  protected async getBatches(rowSet?: TRowSet): Promise<Array<Buffer>> {
     const result: Array<Buffer> = [];

-    data.forEach((rowSet) => {
-      rowSet.arrowBatches?.forEach((arrowBatch) => {
-        if (arrowBatch.batch) {
-          result.push(arrowBatch.batch);
-        }
-      });
+    rowSet?.arrowBatches?.forEach((arrowBatch) => {
+      if (arrowBatch.batch) {
+        result.push(arrowBatch.batch);
+      }
     });

     return result;
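
The net effect: ArrowResultHandler now pulls row sets from its source inside fetchNext(), instead of having callers fetch a TRowSet and feed it to a separate getValue() step. A minimal consumer sketch, assuming only the hasMore()/fetchNext() contract visible in this diff; drainAll and the limit value are illustrative, not library API:

// Illustrative consumer of the provider contract shown in the diff above.
async function drainAll(provider: {
  hasMore(): Promise<boolean>;
  fetchNext(options: { limit: number }): Promise<Array<any>>;
}): Promise<Array<any>> {
  const rows: Array<any> = [];
  while (await provider.hasMore()) {
    // Each call fetches the next TRowSet internally and converts it to rows;
    // a missing schema or an empty row set simply yields [].
    rows.push(...(await provider.fetchNext({ limit: 100000 })));
  }
  return rows;
}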

lib/result/CloudFetchResultHandler.ts (4 additions, 6 deletions)

@@ -17,18 +17,16 @@ export default class CloudFetchResultHandler extends ArrowResultHandler {
     super(context, source, schema, Buffer.alloc(0));
   }

-  async hasMore() {
+  public async hasMore() {
     if (this.pendingLinks.length > 0 || this.downloadedBatches.length > 0) {
       return true;
     }
     return super.hasMore();
   }

-  protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
-    data.forEach((item) => {
-      item.resultLinks?.forEach((link) => {
-        this.pendingLinks.push(link);
-      });
+  protected async getBatches(data?: TRowSet): Promise<Array<Buffer>> {
+    data?.resultLinks?.forEach((link) => {
+      this.pendingLinks.push(link);
     });

     if (this.downloadedBatches.length === 0) {
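
The getBatches() override above queues incoming resultLinks and serves previously downloaded data first. A rough sketch of the download-window behavior the unit tests below assert, under assumed names (fileLink, concurrentDownloads, Node 18+ global fetch); the library's actual code differs in detail:

// Sketch only: buffer refill + one-batch-per-call serving.
async function nextBatch(
  state: { pendingLinks: Array<{ fileLink: string }>; downloadedBatches: Array<Buffer> },
  concurrentDownloads: number,
): Promise<Array<Buffer>> {
  if (state.downloadedBatches.length === 0) {
    // Buffer is empty: download up to N links in parallel to refill it
    const links = state.pendingLinks.splice(0, concurrentDownloads);
    const batches = await Promise.all(
      links.map(async (link) => {
        const response = await fetch(link.fileLink);
        if (!response.ok) {
          throw new Error(`Failed to download batch: HTTP ${response.status}`);
        }
        return Buffer.from(await response.arrayBuffer());
      }),
    );
    state.downloadedBatches.push(...batches);
  }
  // Serve one buffered batch per call; later calls drain the buffer
  // before triggering another download round.
  const batch = state.downloadedBatches.shift();
  return batch ? [batch] : [];
}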

lib/result/JsonResultHandler.ts (9 additions, 13 deletions)

@@ -17,26 +17,22 @@ export default class JsonResultHandler implements IResultsProvider<Array<any>> {
     this.schema = getSchemaColumns(schema);
   }

-  async hasMore() {
+  public async hasMore() {
     return this.source.hasMore();
   }

-  async fetchNext(options: ResultsProviderFetchNextOptions) {
-    const data = await this.source.fetchNext(options);
-    return this.getValue(data ? [data] : []);
-  }
-
-  async getValue(data?: Array<TRowSet>): Promise<Array<object>> {
-    if (this.schema.length === 0 || !data) {
+  public async fetchNext(options: ResultsProviderFetchNextOptions) {
+    if (this.schema.length === 0) {
       return [];
     }

-    return data.reduce((result: Array<any>, rowSet: TRowSet) => {
-      const columns = rowSet.columns || [];
-      const rows = this.getRows(columns, this.schema);
+    const data = await this.source.fetchNext(options);
+    if (!data) {
+      return [];
+    }

-      return result.concat(rows);
-    }, []);
+    const columns = data.columns || [];
+    return this.getRows(columns, this.schema);
   }

   private getRows(columns: Array<TColumn>, descriptors: Array<TColumnDesc>): Array<any> {
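
With this change, all three handlers expose the same provider surface. Its shape as inferred from the diffs in this commit; the real declaration lives elsewhere in lib/result/ and may carry additional members:

// Inferred from this commit's diffs; `limit` is the only option the tests use.
interface IResultsProvider<T> {
  hasMore(): Promise<boolean>;
  fetchNext(options: { limit: number }): Promise<T>;
}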

tests/e2e/arrow.test.js (7 additions, 2 deletions)

@@ -1,4 +1,5 @@
 const { expect } = require('chai');
+const sinon = require('sinon');
 const config = require('./utils/config');
 const logger = require('./utils/logger')(config.logger);
 const { DBSQLClient } = require('../..');
@@ -132,12 +133,16 @@ describe('Arrow support', () => {
     const resultHandler = await operation.getResultHandler();
     expect(resultHandler).to.be.instanceof(ArrowResultHandler);

-    const rawData = await operation._data.fetchNext({ limit: rowsCount });
+    sinon.spy(operation._data, 'fetchNext');
+
+    const result = await resultHandler.fetchNext({ limit: rowsCount });
+
+    expect(operation._data.fetchNext.callCount).to.be.eq(1);
+    const rawData = await operation._data.fetchNext.firstCall.returnValue;
     // We don't know exact count of batches returned, it depends on server's configuration,
     // but with much enough rows there should be more than one result batch
     expect(rawData.arrowBatches?.length).to.be.gt(1);

-    const result = await resultHandler.getValue([rawData]);
     expect(result.length).to.be.eq(rowsCount);
   });
 });
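
Because fetchNext() now consumes the raw row set internally, the e2e test can no longer read it directly without stealing data from the handler; instead it spies on the internal provider and inspects the recorded return value. A compact, self-contained illustration of that sinon pattern (the source object here is made up):

import sinon from 'sinon';

async function demo() {
  const source = {
    async fetchNext(options: { limit: number }) {
      return { rows: options.limit };
    },
  };
  // spy() wraps the real method: calls pass through but get recorded
  const spy = sinon.spy(source, 'fetchNext');

  await source.fetchNext({ limit: 10 });

  console.log(spy.callCount); // 1
  // returnValue is the promise the first call produced, hence the await
  console.log(await spy.firstCall.returnValue); // { rows: 10 }
}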

tests/unit/result/ArrowResultHandler.test.js (32 additions, 14 deletions)

@@ -88,42 +88,60 @@ const rowSetAllNulls = {
 describe('ArrowResultHandler', () => {
   it('should not buffer any data', async () => {
     const context = {};
-    const rowSetProvider = new RowSetProviderMock();
+    const rowSetProvider = new RowSetProviderMock([sampleRowSet1]);
     const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
-    await result.getValue([sampleRowSet1]);
     expect(await rowSetProvider.hasMore()).to.be.true;
     expect(await result.hasMore()).to.be.true;
+
+    await result.fetchNext({ limit: 10000 });
+    expect(await rowSetProvider.hasMore()).to.be.false;
+    expect(await result.hasMore()).to.be.false;
   });

   it('should convert data', async () => {
     const context = {};
-    const rowSetProvider = new RowSetProviderMock();
-    const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
-    expect(await result.getValue([sampleRowSet1])).to.be.deep.eq([]);
-    expect(await result.getValue([sampleRowSet2])).to.be.deep.eq([]);
-    expect(await result.getValue([sampleRowSet3])).to.be.deep.eq([]);
-    expect(await result.getValue([sampleRowSet4])).to.be.deep.eq([{ 1: 1 }]);
+
+    case1: {
+      const rowSetProvider = new RowSetProviderMock([sampleRowSet1]);
+      const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
+      expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]);
+    }
+    case2: {
+      const rowSetProvider = new RowSetProviderMock([sampleRowSet2]);
+      const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
+      expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]);
+    }
+    case3: {
+      const rowSetProvider = new RowSetProviderMock([sampleRowSet3]);
+      const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
+      expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]);
+    }
+    case4: {
+      const rowSetProvider = new RowSetProviderMock([sampleRowSet4]);
+      const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
+      expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([{ 1: 1 }]);
+    }
   });

   it('should return empty array if no data to process', async () => {
     const context = {};
     const rowSetProvider = new RowSetProviderMock();
     const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
-    expect(await result.getValue()).to.be.deep.eq([]);
-    expect(await result.getValue([])).to.be.deep.eq([]);
+    expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]);
   });

   it('should return empty array if no schema available', async () => {
     const context = {};
-    const rowSetProvider = new RowSetProviderMock();
+    const rowSetProvider = new RowSetProviderMock([sampleRowSet4]);
     const result = new ArrowResultHandler(context, rowSetProvider);
-    expect(await result.getValue([sampleRowSet4])).to.be.deep.eq([]);
+    expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]);
   });

   it('should detect nulls', async () => {
     const context = {};
-    const rowSetProvider = new RowSetProviderMock();
+    const rowSetProvider = new RowSetProviderMock([rowSetAllNulls]);
     const result = new ArrowResultHandler(context, rowSetProvider, thriftSchemaAllNulls, arrowSchemaAllNulls);
-    expect(await result.getValue([rowSetAllNulls])).to.be.deep.eq([
+    expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([
       {
         boolean_field: null,
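
These tests now seed RowSetProviderMock with the row sets it should serve, since handlers pull data themselves. The real mock lives in the test utilities; a guess at its minimal shape, just to show the seeding idea:

// Hypothetical reconstruction, not the actual test utility.
class RowSetProviderMock<T> {
  constructor(private items: Array<T> = []) {}

  public async hasMore(): Promise<boolean> {
    return this.items.length > 0;
  }

  // Hands out one seeded row set per call, undefined when exhausted
  public async fetchNext(): Promise<T | undefined> {
    return this.items.shift();
  }
}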

tests/unit/result/CloudFetchResultHandler.test.js (38 additions, 27 deletions)

@@ -109,7 +109,7 @@ describe('CloudFetchResultHandler', () => {
   it('should report pending data if there are any', async () => {
     const context = {};
     const rowSetProvider = new RowSetProviderMock();
-    const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
+    const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema);

     case1: {
       result.pendingLinks = [];
@@ -136,21 +136,26 @@
     const context = {};
     const rowSetProvider = new RowSetProviderMock();

-    const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
+    const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema);

     sinon.stub(result, 'fetch').returns(
       Promise.resolve({
         ok: true,
         status: 200,
         statusText: 'OK',
-        arrayBuffer: async () => sampleArrowBatch,
+        arrayBuffer: async () => Buffer.concat([sampleArrowSchema, sampleArrowBatch]),
       }),
     );

     const rowSets = [sampleRowSet1, sampleEmptyRowSet, sampleRowSet2];
     const expectedLinksCount = rowSets.reduce((prev, item) => prev + (item.resultLinks?.length ?? 0), 0);

-    const batches = await result.getBatches(rowSets);
+    const batches = [];
+    for (const rowSet of rowSets) {
+      const items = await result.getBatches(rowSet);
+      batches.push(...items);
+    }
+
     expect(batches.length).to.be.equal(0);
     expect(result.fetch.called).to.be.false;
     expect(result.pendingLinks.length).to.be.equal(expectedLinksCount);
@@ -160,43 +165,53 @@
     globalConfig.cloudFetchConcurrentDownloads = 2;

     const context = {};
-    const rowSetProvider = new RowSetProviderMock();
+    const rowSet = {
+      startRowOffset: 0,
+      resultLinks: [...sampleRowSet1.resultLinks, ...sampleRowSet2.resultLinks],
+    };
+    const expectedLinksCount = rowSet.resultLinks.length;
+    const rowSetProvider = new RowSetProviderMock([rowSet]);

-    const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
+    const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema);

     sinon.stub(result, 'fetch').returns(
       Promise.resolve({
         ok: true,
         status: 200,
         statusText: 'OK',
-        arrayBuffer: async () => sampleArrowBatch,
+        arrayBuffer: async () => Buffer.concat([sampleArrowSchema, sampleArrowBatch]),
       }),
     );

-    const rowSets = [sampleRowSet1, sampleRowSet2];
-    const expectedLinksCount = rowSets.reduce((prev, item) => prev + (item.resultLinks?.length ?? 0), 0);
+    expect(await rowSetProvider.hasMore()).to.be.true;

     initialFetch: {
-      const batches = await result.getBatches(rowSets);
-      expect(batches.length).to.be.equal(1);
+      const items = await result.fetchNext({ limit: 10000 });
+      expect(items.length).to.be.gt(0);
+      expect(await rowSetProvider.hasMore()).to.be.false;
+
       expect(result.fetch.callCount).to.be.equal(globalConfig.cloudFetchConcurrentDownloads);
       expect(result.pendingLinks.length).to.be.equal(expectedLinksCount - globalConfig.cloudFetchConcurrentDownloads);
       expect(result.downloadedBatches.length).to.be.equal(globalConfig.cloudFetchConcurrentDownloads - 1);
     }

     secondFetch: {
       // It should return previously fetched batch, not performing additional network requests
-      const batches = await result.getBatches([]);
-      expect(batches.length).to.be.equal(1);
+      const items = await result.fetchNext({ limit: 10000 });
+      expect(items.length).to.be.gt(0);
+      expect(await rowSetProvider.hasMore()).to.be.false;
+
       expect(result.fetch.callCount).to.be.equal(globalConfig.cloudFetchConcurrentDownloads); // no new fetches
       expect(result.pendingLinks.length).to.be.equal(expectedLinksCount - globalConfig.cloudFetchConcurrentDownloads);
       expect(result.downloadedBatches.length).to.be.equal(globalConfig.cloudFetchConcurrentDownloads - 2);
     }

     thirdFetch: {
       // Now buffer should be empty, and it should fetch next batches
-      const batches = await result.getBatches([]);
-      expect(batches.length).to.be.equal(1);
+      const items = await result.fetchNext({ limit: 10000 });
+      expect(items.length).to.be.gt(0);
+      expect(await rowSetProvider.hasMore()).to.be.false;
+
       expect(result.fetch.callCount).to.be.equal(globalConfig.cloudFetchConcurrentDownloads * 2);
       expect(result.pendingLinks.length).to.be.equal(
         expectedLinksCount - globalConfig.cloudFetchConcurrentDownloads * 2,
@@ -209,23 +224,21 @@
     globalConfig.cloudFetchConcurrentDownloads = 1;

     const context = {};
-    const rowSetProvider = new RowSetProviderMock();
+    const rowSetProvider = new RowSetProviderMock([sampleRowSet1]);

-    const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
+    const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema);

     sinon.stub(result, 'fetch').returns(
       Promise.resolve({
         ok: false,
         status: 500,
         statusText: 'Internal Server Error',
-        arrayBuffer: async () => sampleArrowBatch,
+        arrayBuffer: async () => Buffer.concat([sampleArrowSchema, sampleArrowBatch]),
       }),
     );

-    const rowSets = [sampleRowSet1];
-
     try {
-      await result.getBatches(rowSets);
+      await result.fetchNext({ limit: 10000 });
       expect.fail('It should throw an error');
     } catch (error) {
       if (error instanceof AssertionError) {

   it('should handle expired links', async () => {
     const context = {};
-    const rowSetProvider = new RowSetProviderMock();
+    const rowSetProvider = new RowSetProviderMock([sampleExpiredRowSet]);

-    const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
+    const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema);

     sinon.stub(result, 'fetch').returns(
       Promise.resolve({
         ok: true,
         status: 200,
         statusText: 'OK',
-        arrayBuffer: async () => sampleArrowBatch,
+        arrayBuffer: async () => Buffer.concat([sampleArrowSchema, sampleArrowBatch]),
       }),
     );

-    const rowSets = [sampleExpiredRowSet];
-
     try {
-      await result.getBatches(rowSets);
+      await result.fetchNext({ limit: 10000 });
       expect.fail('It should throw an error');
     } catch (error) {
       if (error instanceof AssertionError) {
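
One stylistic note on the case1:/initialFetch:/secondFetch: syntax used throughout these tests: these are ordinary JavaScript labeled blocks, used here purely to give each assertion phase its own lexical scope. A tiny standalone illustration:

phase1: {
  const items = [1, 2];
  console.log(items.length); // 2
}
phase2: {
  // `items` can be re-declared because each labeled block is its own scope
  const items = [3];
  console.log(items.length); // 1
}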
(The remaining 3 changed files did not load and are not shown.)
