From 6ada0dba6e41b9d56eebe2b9ed51bc310285d165 Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Mon, 30 Oct 2023 12:32:09 +0200 Subject: [PATCH] Refine the code and update tests Signed-off-by: Levko Kravets --- lib/result/ArrowResultHandler.ts | 25 +++---- lib/result/CloudFetchResultHandler.ts | 10 ++- lib/result/JsonResultHandler.ts | 22 +++--- tests/e2e/arrow.test.js | 9 ++- tests/unit/result/ArrowResultHandler.test.js | 46 +++++++++---- .../result/CloudFetchResultHandler.test.js | 65 ++++++++++-------- tests/unit/result/JsonResultHandler.test.js | 67 +++++-------------- tests/unit/result/compatibility.test.js | 12 ++-- .../result/fixtures/RowSetProviderMock.js | 8 ++- 9 files changed, 129 insertions(+), 135 deletions(-) diff --git a/lib/result/ArrowResultHandler.ts b/lib/result/ArrowResultHandler.ts index 0e5058f7..a1076293 100644 --- a/lib/result/ArrowResultHandler.ts +++ b/lib/result/ArrowResultHandler.ts @@ -42,20 +42,17 @@ export default class ArrowResultHandler implements IResultsProvider> this.arrowSchema = arrowSchema; } - async hasMore() { + public async hasMore() { return this.source.hasMore(); } - async fetchNext(options: ResultsProviderFetchNextOptions) { - const data = await this.source.fetchNext(options); - return this.getValue(data ? [data] : []); - } - - async getValue(data?: Array) { - if (this.schema.length === 0 || !this.arrowSchema || !data) { + public async fetchNext(options: ResultsProviderFetchNextOptions) { + if (this.schema.length === 0 || !this.arrowSchema) { return []; } + const data = await this.source.fetchNext(options); + const batches = await this.getBatches(data); if (batches.length === 0) { return []; @@ -65,15 +62,13 @@ export default class ArrowResultHandler implements IResultsProvider> return this.getRows(table.schema, table.toArray()); } - protected async getBatches(data: Array): Promise> { + protected async getBatches(rowSet?: TRowSet): Promise> { const result: Array = []; - data.forEach((rowSet) => { - rowSet.arrowBatches?.forEach((arrowBatch) => { - if (arrowBatch.batch) { - result.push(arrowBatch.batch); - } - }); + rowSet?.arrowBatches?.forEach((arrowBatch) => { + if (arrowBatch.batch) { + result.push(arrowBatch.batch); + } }); return result; diff --git a/lib/result/CloudFetchResultHandler.ts b/lib/result/CloudFetchResultHandler.ts index 33c6eecd..a49e8714 100644 --- a/lib/result/CloudFetchResultHandler.ts +++ b/lib/result/CloudFetchResultHandler.ts @@ -17,18 +17,16 @@ export default class CloudFetchResultHandler extends ArrowResultHandler { super(context, source, schema, Buffer.alloc(0)); } - async hasMore() { + public async hasMore() { if (this.pendingLinks.length > 0 || this.downloadedBatches.length > 0) { return true; } return super.hasMore(); } - protected async getBatches(data: Array): Promise> { - data.forEach((item) => { - item.resultLinks?.forEach((link) => { - this.pendingLinks.push(link); - }); + protected async getBatches(data?: TRowSet): Promise> { + data?.resultLinks?.forEach((link) => { + this.pendingLinks.push(link); }); if (this.downloadedBatches.length === 0) { diff --git a/lib/result/JsonResultHandler.ts b/lib/result/JsonResultHandler.ts index 692438d5..bcc07e77 100644 --- a/lib/result/JsonResultHandler.ts +++ b/lib/result/JsonResultHandler.ts @@ -17,26 +17,22 @@ export default class JsonResultHandler implements IResultsProvider> { this.schema = getSchemaColumns(schema); } - async hasMore() { + public async hasMore() { return this.source.hasMore(); } - async fetchNext(options: ResultsProviderFetchNextOptions) { - const data = await this.source.fetchNext(options); - return this.getValue(data ? [data] : []); - } - - async getValue(data?: Array): Promise> { - if (this.schema.length === 0 || !data) { + public async fetchNext(options: ResultsProviderFetchNextOptions) { + if (this.schema.length === 0) { return []; } - return data.reduce((result: Array, rowSet: TRowSet) => { - const columns = rowSet.columns || []; - const rows = this.getRows(columns, this.schema); + const data = await this.source.fetchNext(options); + if (!data) { + return []; + } - return result.concat(rows); - }, []); + const columns = data.columns || []; + return this.getRows(columns, this.schema); } private getRows(columns: Array, descriptors: Array): Array { diff --git a/tests/e2e/arrow.test.js b/tests/e2e/arrow.test.js index a2612c6b..4118a116 100644 --- a/tests/e2e/arrow.test.js +++ b/tests/e2e/arrow.test.js @@ -1,4 +1,5 @@ const { expect } = require('chai'); +const sinon = require('sinon'); const config = require('./utils/config'); const logger = require('./utils/logger')(config.logger); const { DBSQLClient } = require('../..'); @@ -132,12 +133,16 @@ describe('Arrow support', () => { const resultHandler = await operation.getResultHandler(); expect(resultHandler).to.be.instanceof(ArrowResultHandler); - const rawData = await operation._data.fetchNext({ limit: rowsCount }); + sinon.spy(operation._data, 'fetchNext'); + + const result = await resultHandler.fetchNext({ limit: rowsCount }); + + expect(operation._data.fetchNext.callCount).to.be.eq(1); + const rawData = await operation._data.fetchNext.firstCall.returnValue; // We don't know exact count of batches returned, it depends on server's configuration, // but with much enough rows there should be more than one result batch expect(rawData.arrowBatches?.length).to.be.gt(1); - const result = await resultHandler.getValue([rawData]); expect(result.length).to.be.eq(rowsCount); }); }); diff --git a/tests/unit/result/ArrowResultHandler.test.js b/tests/unit/result/ArrowResultHandler.test.js index 5a59c3e6..03cdb5e1 100644 --- a/tests/unit/result/ArrowResultHandler.test.js +++ b/tests/unit/result/ArrowResultHandler.test.js @@ -88,42 +88,60 @@ const rowSetAllNulls = { describe('ArrowResultHandler', () => { it('should not buffer any data', async () => { const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock([sampleRowSet1]); const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); - await result.getValue([sampleRowSet1]); + expect(await rowSetProvider.hasMore()).to.be.true; + expect(await result.hasMore()).to.be.true; + + await result.fetchNext({ limit: 10000 }); + expect(await rowSetProvider.hasMore()).to.be.false; expect(await result.hasMore()).to.be.false; }); it('should convert data', async () => { const context = {}; - const rowSetProvider = new RowSetProviderMock(); - const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); - expect(await result.getValue([sampleRowSet1])).to.be.deep.eq([]); - expect(await result.getValue([sampleRowSet2])).to.be.deep.eq([]); - expect(await result.getValue([sampleRowSet3])).to.be.deep.eq([]); - expect(await result.getValue([sampleRowSet4])).to.be.deep.eq([{ 1: 1 }]); + + case1: { + const rowSetProvider = new RowSetProviderMock([sampleRowSet1]); + const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); + } + case2: { + const rowSetProvider = new RowSetProviderMock([sampleRowSet2]); + const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); + } + case3: { + const rowSetProvider = new RowSetProviderMock([sampleRowSet3]); + const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); + } + case4: { + const rowSetProvider = new RowSetProviderMock([sampleRowSet4]); + const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([{ 1: 1 }]); + } }); it('should return empty array if no data to process', async () => { const context = {}; const rowSetProvider = new RowSetProviderMock(); const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); - expect(await result.getValue()).to.be.deep.eq([]); - expect(await result.getValue([])).to.be.deep.eq([]); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); }); it('should return empty array if no schema available', async () => { const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock([sampleRowSet4]); const result = new ArrowResultHandler(context, rowSetProvider); - expect(await result.getValue([sampleRowSet4])).to.be.deep.eq([]); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); }); it('should detect nulls', async () => { const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock([rowSetAllNulls]); const result = new ArrowResultHandler(context, rowSetProvider, thriftSchemaAllNulls, arrowSchemaAllNulls); - expect(await result.getValue([rowSetAllNulls])).to.be.deep.eq([ + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([ { boolean_field: null, diff --git a/tests/unit/result/CloudFetchResultHandler.test.js b/tests/unit/result/CloudFetchResultHandler.test.js index aa517864..e0a48151 100644 --- a/tests/unit/result/CloudFetchResultHandler.test.js +++ b/tests/unit/result/CloudFetchResultHandler.test.js @@ -109,7 +109,7 @@ describe('CloudFetchResultHandler', () => { it('should report pending data if there are any', async () => { const context = {}; const rowSetProvider = new RowSetProviderMock(); - const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); + const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema); case1: { result.pendingLinks = []; @@ -136,21 +136,26 @@ describe('CloudFetchResultHandler', () => { const context = {}; const rowSetProvider = new RowSetProviderMock(); - const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); + const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema); sinon.stub(result, 'fetch').returns( Promise.resolve({ ok: true, status: 200, statusText: 'OK', - arrayBuffer: async () => sampleArrowBatch, + arrayBuffer: async () => Buffer.concat([sampleArrowSchema, sampleArrowBatch]), }), ); const rowSets = [sampleRowSet1, sampleEmptyRowSet, sampleRowSet2]; const expectedLinksCount = rowSets.reduce((prev, item) => prev + (item.resultLinks?.length ?? 0), 0); - const batches = await result.getBatches(rowSets); + const batches = []; + for (const rowSet of rowSets) { + const items = await result.getBatches(rowSet); + batches.push(...items); + } + expect(batches.length).to.be.equal(0); expect(result.fetch.called).to.be.false; expect(result.pendingLinks.length).to.be.equal(expectedLinksCount); @@ -160,25 +165,31 @@ describe('CloudFetchResultHandler', () => { globalConfig.cloudFetchConcurrentDownloads = 2; const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSet = { + startRowOffset: 0, + resultLinks: [...sampleRowSet1.resultLinks, ...sampleRowSet2.resultLinks], + }; + const expectedLinksCount = rowSet.resultLinks.length; + const rowSetProvider = new RowSetProviderMock([rowSet]); - const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); + const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema); sinon.stub(result, 'fetch').returns( Promise.resolve({ ok: true, status: 200, statusText: 'OK', - arrayBuffer: async () => sampleArrowBatch, + arrayBuffer: async () => Buffer.concat([sampleArrowSchema, sampleArrowBatch]), }), ); - const rowSets = [sampleRowSet1, sampleRowSet2]; - const expectedLinksCount = rowSets.reduce((prev, item) => prev + (item.resultLinks?.length ?? 0), 0); + expect(await rowSetProvider.hasMore()).to.be.true; initialFetch: { - const batches = await result.getBatches(rowSets); - expect(batches.length).to.be.equal(1); + const items = await result.fetchNext({ limit: 10000 }); + expect(items.length).to.be.gt(0); + expect(await rowSetProvider.hasMore()).to.be.false; + expect(result.fetch.callCount).to.be.equal(globalConfig.cloudFetchConcurrentDownloads); expect(result.pendingLinks.length).to.be.equal(expectedLinksCount - globalConfig.cloudFetchConcurrentDownloads); expect(result.downloadedBatches.length).to.be.equal(globalConfig.cloudFetchConcurrentDownloads - 1); @@ -186,8 +197,10 @@ describe('CloudFetchResultHandler', () => { secondFetch: { // It should return previously fetched batch, not performing additional network requests - const batches = await result.getBatches([]); - expect(batches.length).to.be.equal(1); + const items = await result.fetchNext({ limit: 10000 }); + expect(items.length).to.be.gt(0); + expect(await rowSetProvider.hasMore()).to.be.false; + expect(result.fetch.callCount).to.be.equal(globalConfig.cloudFetchConcurrentDownloads); // no new fetches expect(result.pendingLinks.length).to.be.equal(expectedLinksCount - globalConfig.cloudFetchConcurrentDownloads); expect(result.downloadedBatches.length).to.be.equal(globalConfig.cloudFetchConcurrentDownloads - 2); @@ -195,8 +208,10 @@ describe('CloudFetchResultHandler', () => { thirdFetch: { // Now buffer should be empty, and it should fetch next batches - const batches = await result.getBatches([]); - expect(batches.length).to.be.equal(1); + const items = await result.fetchNext({ limit: 10000 }); + expect(items.length).to.be.gt(0); + expect(await rowSetProvider.hasMore()).to.be.false; + expect(result.fetch.callCount).to.be.equal(globalConfig.cloudFetchConcurrentDownloads * 2); expect(result.pendingLinks.length).to.be.equal( expectedLinksCount - globalConfig.cloudFetchConcurrentDownloads * 2, @@ -209,23 +224,21 @@ describe('CloudFetchResultHandler', () => { globalConfig.cloudFetchConcurrentDownloads = 1; const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock([sampleRowSet1]); - const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); + const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema); sinon.stub(result, 'fetch').returns( Promise.resolve({ ok: false, status: 500, statusText: 'Internal Server Error', - arrayBuffer: async () => sampleArrowBatch, + arrayBuffer: async () => Buffer.concat([sampleArrowSchema, sampleArrowBatch]), }), ); - const rowSets = [sampleRowSet1]; - try { - await result.getBatches(rowSets); + await result.fetchNext({ limit: 10000 }); expect.fail('It should throw an error'); } catch (error) { if (error instanceof AssertionError) { @@ -238,23 +251,21 @@ describe('CloudFetchResultHandler', () => { it('should handle expired links', async () => { const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock([sampleExpiredRowSet]); - const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema); + const result = new CloudFetchResultHandler(context, rowSetProvider, sampleThriftSchema); sinon.stub(result, 'fetch').returns( Promise.resolve({ ok: true, status: 200, statusText: 'OK', - arrayBuffer: async () => sampleArrowBatch, + arrayBuffer: async () => Buffer.concat([sampleArrowSchema, sampleArrowBatch]), }), ); - const rowSets = [sampleExpiredRowSet]; - try { - await result.getBatches(rowSets); + await result.fetchNext({ limit: 10000 }); expect.fail('It should throw an error'); } catch (error) { if (error instanceof AssertionError) { diff --git a/tests/unit/result/JsonResultHandler.test.js b/tests/unit/result/JsonResultHandler.test.js index 0d819a4f..db6ff01d 100644 --- a/tests/unit/result/JsonResultHandler.test.js +++ b/tests/unit/result/JsonResultHandler.test.js @@ -40,10 +40,14 @@ describe('JsonResultHandler', () => { ]; const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock(data); const result = new JsonResultHandler(context, rowSetProvider, schema); - await result.getValue(data); + expect(await rowSetProvider.hasMore()).to.be.true; + expect(await result.hasMore()).to.be.true; + + await result.fetchNext({ limit: 10000 }); + expect(await rowSetProvider.hasMore()).to.be.false; expect(await result.hasMore()).to.be.false; }); @@ -129,11 +133,11 @@ describe('JsonResultHandler', () => { ]; const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock(data); const result = new JsonResultHandler(context, rowSetProvider, schema); - expect(await result.getValue(data)).to.be.deep.eq([ + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([ { 'table.str': 'a', 'table.int64': 282578800148737, @@ -202,11 +206,11 @@ describe('JsonResultHandler', () => { ]; const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock(data); const result = new JsonResultHandler(context, rowSetProvider, schema); - expect(await result.getValue(data)).to.be.deep.eq([ + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([ { 'table.array': ['a', 'b'], 'table.map': { key: 12 }, @@ -222,41 +226,6 @@ describe('JsonResultHandler', () => { ]); }); - it('should merge data items', async () => { - const schema = { - columns: [getColumnSchema('table.id', TCLIService_types.TTypeId.STRING_TYPE, 1)], - }; - const data = [ - { - columns: [ - { - stringVal: { values: ['0', '1'] }, - }, - ], - }, - {}, // it should also handle empty sets - { - columns: [ - { - stringVal: { values: ['2', '3'] }, - }, - ], - }, - ]; - - const context = {}; - const rowSetProvider = new RowSetProviderMock(); - - const result = new JsonResultHandler(context, rowSetProvider, schema); - - expect(await result.getValue(data)).to.be.deep.eq([ - { 'table.id': '0' }, - { 'table.id': '1' }, - { 'table.id': '2' }, - { 'table.id': '3' }, - ]); - }); - it('should detect nulls', () => { const context = {}; const rowSetProvider = new RowSetProviderMock(); @@ -374,11 +343,11 @@ describe('JsonResultHandler', () => { ]; const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock(data); const result = new JsonResultHandler(context, rowSetProvider, schema); - expect(await result.getValue(data)).to.be.deep.eq([ + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([ { 'table.str': null, 'table.int64': null, @@ -409,9 +378,7 @@ describe('JsonResultHandler', () => { const rowSetProvider = new RowSetProviderMock(); const result = new JsonResultHandler(context, rowSetProvider, schema); - - expect(await result.getValue()).to.be.deep.eq([]); - expect(await result.getValue([])).to.be.deep.eq([]); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); }); it('should return empty array if no schema available', async () => { @@ -426,11 +393,11 @@ describe('JsonResultHandler', () => { ]; const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock(data); const result = new JsonResultHandler(context, rowSetProvider); - expect(await result.getValue(data)).to.be.deep.eq([]); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); }); it('should return raw data if types are not specified', async () => { @@ -462,11 +429,11 @@ describe('JsonResultHandler', () => { ]; const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock(data); const result = new JsonResultHandler(context, rowSetProvider, schema); - expect(await result.getValue(data)).to.be.deep.eq([ + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([ { 'table.array': '["a", "b"]', 'table.map': '{ "key": 12 }', diff --git a/tests/unit/result/compatibility.test.js b/tests/unit/result/compatibility.test.js index b39efe12..6d047d57 100644 --- a/tests/unit/result/compatibility.test.js +++ b/tests/unit/result/compatibility.test.js @@ -12,25 +12,25 @@ const RowSetProviderMock = require('./fixtures/RowSetProviderMock'); describe('Result handlers compatibility tests', () => { it('colum-based data', async () => { const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock(fixtureColumn.rowSets); const result = new JsonResultHandler(context, rowSetProvider, fixtureColumn.schema); - const rows = await result.getValue(fixtureColumn.rowSets); + const rows = await result.fetchNext({ limit: 10000 }); expect(rows).to.deep.equal(fixtureColumn.expected); }); it('arrow-based data without native types', async () => { const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock(fixtureArrow.rowSets); const result = new ArrowResultHandler(context, rowSetProvider, fixtureArrow.schema, fixtureArrow.arrowSchema); - const rows = await result.getValue(fixtureArrow.rowSets); + const rows = await result.fetchNext({ limit: 10000 }); expect(fixArrowResult(rows)).to.deep.equal(fixtureArrow.expected); }); it('arrow-based data with native types', async () => { const context = {}; - const rowSetProvider = new RowSetProviderMock(); + const rowSetProvider = new RowSetProviderMock(fixtureArrowNT.rowSets); const result = new ArrowResultHandler(context, rowSetProvider, fixtureArrowNT.schema, fixtureArrowNT.arrowSchema); - const rows = await result.getValue(fixtureArrowNT.rowSets); + const rows = await result.fetchNext({ limit: 10000 }); expect(fixArrowResult(rows)).to.deep.equal(fixtureArrowNT.expected); }); }); diff --git a/tests/unit/result/fixtures/RowSetProviderMock.js b/tests/unit/result/fixtures/RowSetProviderMock.js index 96a54b5d..1e878a01 100644 --- a/tests/unit/result/fixtures/RowSetProviderMock.js +++ b/tests/unit/result/fixtures/RowSetProviderMock.js @@ -1,10 +1,14 @@ class RowSetProviderMock { + constructor(rowSets) { + this.rowSets = Array.isArray(rowSets) ? [...rowSets] : []; + } + async hasMore() { - return false; + return this.rowSets.length > 0; } async fetchNext() { - return undefined; + return this.rowSets.shift(); } }