Skip to content

Commit

Permalink
[PECO-983] Support streaming query results via Node.js streams (#262)
Browse files Browse the repository at this point in the history
* [PECO-983] Support streaming query results via Node.js streams

Signed-off-by: Levko Kravets <[email protected]>

* Add tests

Signed-off-by: Levko Kravets <[email protected]>

* CR1

Signed-off-by: Levko Kravets <[email protected]>

---------

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko authored Oct 22, 2024
1 parent 3c29fe2 commit 1adcb94
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 0 deletions.
19 changes: 19 additions & 0 deletions lib/DBSQLOperation.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { stringify, NIL } from 'uuid';
import { Readable } from 'node:stream';
import IOperation, {
FetchOptions,
FinishedOptions,
Expand All @@ -7,6 +8,7 @@ import IOperation, {
IteratorOptions,
IOperationChunksIterator,
IOperationRowsIterator,
NodeStreamOptions,
} from './contracts/IOperation';
import {
TGetOperationStatusResp,
Expand Down Expand Up @@ -101,6 +103,23 @@ export default class DBSQLOperation implements IOperation {
return new OperationRowsIterator(this, options);
}

/**
 * Exposes the operation's results as a Node.js `Readable` stream.
 *
 * The stream is built with `Readable.from` on top of one of this operation's
 * async iterators, selected by `options.mode`:
 *  - `'chunks'` (used when `mode` is omitted): the iterator returned by `iterateChunks`;
 *  - `'rows'`: the iterator returned by `iterateRows`.
 *
 * @param options - optional settings; `iteratorOptions` is forwarded to the selected
 *   iterator factory and `streamOptions` is forwarded to `Readable.from`
 * @returns a `Readable` that emits whatever the selected iterator yields
 * @throws Error when `options.mode` is neither `'chunks'` nor `'rows'`
 */
public toNodeStream(options?: NodeStreamOptions): Readable {
  const selectedMode = options?.mode ?? 'chunks';

  if (selectedMode === 'chunks') {
    const chunksIterator: IOperationChunksIterator = this.iterateChunks(options?.iteratorOptions);
    return Readable.from(chunksIterator, options?.streamOptions);
  }

  if (selectedMode === 'rows') {
    const rowsIterator: IOperationRowsIterator = this.iterateRows(options?.iteratorOptions);
    return Readable.from(rowsIterator, options?.streamOptions);
  }

  // Only reachable when a caller bypasses the type system and passes an unknown mode
  throw new Error(`IOperation.toNodeStream: unsupported mode ${options?.mode}`);
}

public get id() {
const operationId = this.operationHandle?.operationId?.guid;
return operationId ? stringify(operationId) : NIL;
Expand Down
9 changes: 9 additions & 0 deletions lib/contracts/IOperation.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Readable, ReadableOptions } from 'node:stream';
import { TGetOperationStatusResp, TTableSchema } from '../../thrift/TCLIService_types';
import Status from '../dto/Status';

Expand Down Expand Up @@ -35,6 +36,12 @@ export interface IOperationRowsIterator extends AsyncIterableIterator<object> {
readonly operation: IOperation;
}

/**
 * Options accepted by `IOperation.toNodeStream`.
 */
export interface NodeStreamOptions {
/**
 * Selects what the stream is built from: `'chunks'` uses the chunks iterator
 * (`iterateChunks`), `'rows'` uses the rows iterator (`iterateRows`).
 */
mode?: 'chunks' | 'rows'; // defaults to 'chunks'
/** Options forwarded to the iterator factory (`iterateChunks` / `iterateRows`) selected by `mode`. */
iteratorOptions?: IteratorOptions;
/** Options forwarded as the second argument of `Readable.from` when the stream is created. */
streamOptions?: ReadableOptions;
}

export default interface IOperation {
/**
* Operation identifier
Expand Down Expand Up @@ -86,4 +93,6 @@ export default interface IOperation {
iterateChunks(options?: IteratorOptions): IOperationChunksIterator;

iterateRows(options?: IteratorOptions): IOperationRowsIterator;

toNodeStream(options?: NodeStreamOptions): Readable;
}
60 changes: 60 additions & 0 deletions tests/e2e/iterators.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,64 @@ describe('Iterators', () => {
await session.close();
}
});

it('should get all chunks via Nodejs stream', async () => {
  const session = await openSession({ arrowEnabled: false });
  // @ts-expect-error TS2339: Property context does not exist on type IDBSQLSession
  sinon.spy(session.context.driver, 'fetchResults');
  try {
    const totalRows = 10;
    const rowsPerChunk = 4;

    // Passing `maxRows: null` turns direct results off, so every row has to come through `driver.fetchResults`
    const operation = await session.executeStatement(`SELECT * FROM range(0, ${totalRows})`, {
      maxRows: null,
    });

    // `range(0, N)` is expected to produce rows `{ id: 0 }` … `{ id: N - 1 }`, split into chunks of `rowsPerChunk`
    const allRows = Array.from({ length: totalRows }, (_, id) => ({ id }));
    const wantedChunks = arrayChunks(allRows, rowsPerChunk);

    const stream = operation.toNodeStream({
      mode: 'chunks',
      iteratorOptions: { maxRows: rowsPerChunk },
    });

    // Compare each emitted chunk with the expected one at the same position
    let received = 0;
    for await (const chunk of stream) {
      expect(chunk).to.deep.equal(wantedChunks[received]);
      received += 1;
    }

    // The stream must emit exactly as many chunks as expected
    expect(received).to.equal(wantedChunks.length);
  } finally {
    await session.close();
  }
});

it('should get all rows via Nodejs stream', async () => {
  const session = await openSession({ arrowEnabled: false });
  // @ts-expect-error TS2339: Property context does not exist on type IDBSQLSession
  sinon.spy(session.context.driver, 'fetchResults');
  try {
    const totalRows = 10;

    const operation = await session.executeStatement(`SELECT * FROM range(0, ${totalRows})`);

    // `range(0, N)` is expected to produce rows `{ id: 0 }` … `{ id: N - 1 }`
    const wantedRows = Array.from({ length: totalRows }, (_, id) => ({ id }));

    const stream = operation.toNodeStream({
      mode: 'rows',
    });

    // Compare each emitted row with the expected one at the same position
    let received = 0;
    for await (const row of stream) {
      expect(row).to.deep.equal(wantedRows[received]);
      received += 1;
    }

    // The stream must emit exactly as many rows as expected
    expect(received).to.equal(wantedRows.length);
  } finally {
    await session.close();
  }
});
});
6 changes: 6 additions & 0 deletions tests/unit/.stubs/OperationStub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import IOperation, {
IOperationChunksIterator,
IOperationRowsIterator,
IteratorOptions,
NodeStreamOptions,
} from '../../../lib/contracts/IOperation';
import Status from '../../../lib/dto/Status';
import { OperationChunksIterator, OperationRowsIterator } from '../../../lib/utils/OperationIterator';
import { Readable } from 'node:stream';

export default class OperationStub implements IOperation {
public readonly id: string = '';
Expand Down Expand Up @@ -59,4 +61,8 @@ export default class OperationStub implements IOperation {
public iterateRows(options?: IteratorOptions): IOperationRowsIterator {
return new OperationRowsIterator(this, options);
}

/**
 * Stub for `IOperation.toNodeStream`: this implementation does not build a stream
 * and always throws.
 *
 * @param options - accepted to satisfy the interface; not read by this stub
 * @throws Error with the message 'Not implemented' on every call
 */
public toNodeStream(options?: NodeStreamOptions): Readable {
throw new Error('Not implemented');
}
}

0 comments on commit 1adcb94

Please sign in to comment.