Skip to content

Commit

Permalink
Iterable interface for IOperation (#252)
Browse files Browse the repository at this point in the history
* Iterable interface for IOperation

Signed-off-by: Levko Kravets <[email protected]>

* Chore: split `utils` unit tests into few files

Signed-off-by: Levko Kravets <[email protected]>

* Add tests

Signed-off-by: Levko Kravets <[email protected]>

* Add visibility modifiers

Signed-off-by: Levko Kravets <[email protected]>

* Fixes after merge

Signed-off-by: Levko Kravets <[email protected]>

* Fix import

Signed-off-by: Levko Kravets <[email protected]>

---------

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko authored May 9, 2024
1 parent c239fca commit fb817b5
Show file tree
Hide file tree
Showing 7 changed files with 484 additions and 93 deletions.
12 changes: 12 additions & 0 deletions lib/DBSQLOperation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import IOperation, {
FinishedOptions,
GetSchemaOptions,
WaitUntilReadyOptions,
IteratorOptions,
IOperationChunksIterator,
IOperationRowsIterator,
} from './contracts/IOperation';
import {
TGetOperationStatusResp,
Expand All @@ -26,6 +29,7 @@ import CloudFetchResultHandler from './result/CloudFetchResultHandler';
import ArrowResultConverter from './result/ArrowResultConverter';
import ResultSlicer from './result/ResultSlicer';
import { definedOrError } from './utils';
import { OperationChunksIterator, OperationRowsIterator } from './utils/OperationIterator';
import HiveDriverError from './errors/HiveDriverError';
import IClientContext from './contracts/IClientContext';

Expand Down Expand Up @@ -89,6 +93,14 @@ export default class DBSQLOperation implements IOperation {
this.context.getLogger().log(LogLevel.debug, `Operation created with id: ${this.id}`);
}

public iterateChunks(options?: IteratorOptions): IOperationChunksIterator {
return new OperationChunksIterator(this, options);
}

public iterateRows(options?: IteratorOptions): IOperationRowsIterator {
return new OperationRowsIterator(this, options);
}

public get id() {
const operationId = this.operationHandle?.operationId?.guid;
return operationId ? stringify(operationId) : NIL;
Expand Down
16 changes: 16 additions & 0 deletions lib/contracts/IOperation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ export interface GetSchemaOptions extends WaitUntilReadyOptions {
// no other options
}

export interface IteratorOptions extends FetchOptions {
autoClose?: boolean; // defaults to `false`
}

export interface IOperationChunksIterator extends AsyncIterableIterator<Array<object>> {
readonly operation: IOperation;
}

export interface IOperationRowsIterator extends AsyncIterableIterator<object> {
readonly operation: IOperation;
}

export default interface IOperation {
/**
* Operation identifier
Expand Down Expand Up @@ -70,4 +82,8 @@ export default interface IOperation {
* Fetch schema
*/
getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null>;

iterateChunks(options?: IteratorOptions): IOperationChunksIterator;

iterateRows(options?: IteratorOptions): IOperationRowsIterator;
}
85 changes: 85 additions & 0 deletions lib/utils/OperationIterator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import IOperation, { IOperationChunksIterator, IOperationRowsIterator, IteratorOptions } from '../contracts/IOperation';

abstract class OperationIterator<R> implements AsyncIterableIterator<R> {
public readonly operation: IOperation;

protected readonly options?: IteratorOptions;

constructor(operation: IOperation, options?: IteratorOptions) {
this.operation = operation;
this.options = options;
}

protected abstract getNext(): Promise<IteratorResult<R>>;

public [Symbol.asyncIterator]() {
return this;
}

public async next() {
const result = await this.getNext();

if (result.done && this.options?.autoClose) {
await this.operation.close();
}

return result;
}

// This method is intended for a cleanup when the caller does not intend to make any more
// reads from iterator (e.g. when using `break` in a `for ... of` loop)
public async return(value?: any) {
if (this.options?.autoClose) {
await this.operation.close();
}

return { done: true, value };
}
}

export class OperationChunksIterator extends OperationIterator<Array<object>> implements IOperationChunksIterator {
protected async getNext(): Promise<IteratorResult<Array<object>>> {
const hasMoreRows = await this.operation.hasMoreRows();
if (hasMoreRows) {
const value = await this.operation.fetchChunk(this.options);
return { done: false, value };
}

return { done: true, value: undefined };
}
}

export class OperationRowsIterator extends OperationIterator<object> implements IOperationRowsIterator {
private chunk: Array<object> = [];

private index: number = 0;

constructor(operation: IOperation, options?: IteratorOptions) {
super(operation, {
...options,
// Tell slicer to return raw chunks. We're going to process rows one by one anyway,
// so no need to additionally buffer and slice chunks returned by server
disableBuffering: true,
});
}

protected async getNext(): Promise<IteratorResult<object>> {
if (this.index < this.chunk.length) {
const value = this.chunk[this.index];
this.index += 1;
return { done: false, value };
}

const hasMoreRows = await this.operation.hasMoreRows();
if (hasMoreRows) {
this.chunk = await this.operation.fetchChunk(this.options);
this.index = 0;
// Note: this call is not really a recursion. Since this method is
// async - the call will be actually scheduled for processing on
// the next event loop cycle
return this.getNext();
}

return { done: true, value: undefined };
}
}
87 changes: 87 additions & 0 deletions tests/e2e/iterators.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
const { expect } = require('chai');
const sinon = require('sinon');
const config = require('./utils/config');
const { DBSQLClient } = require('../../lib');

async function openSession(customConfig) {
const client = new DBSQLClient();

const clientConfig = client.getConfig();
sinon.stub(client, 'getConfig').returns({
...clientConfig,
...customConfig,
});

const connection = await client.connect({
host: config.host,
path: config.path,
token: config.token,
});

return connection.openSession({
initialCatalog: config.database[0],
initialSchema: config.database[1],
});
}

function arrayChunks(arr, chunkSize) {
const result = [];

while (arr.length > 0) {
const chunk = arr.splice(0, chunkSize);
result.push(chunk);
}

return result;
}

describe('Iterators', () => {
it('should iterate over all chunks', async () => {
const session = await openSession({ arrowEnabled: false });
sinon.spy(session.context.driver, 'fetchResults');
try {
const expectedRowsCount = 10;

// set `maxRows` to null to disable direct results so all the data are fetched through `driver.fetchResults`
const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`, {
maxRows: null,
});

const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id }));
const chunkSize = 4;
const expectedChunks = arrayChunks(expectedRows, chunkSize);

let index = 0;
for await (const chunk of operation.iterateChunks({ maxRows: chunkSize })) {
expect(chunk).to.deep.equal(expectedChunks[index]);
index += 1;
}

expect(index).to.equal(expectedChunks.length);
} finally {
await session.close();
}
});

it('should iterate over all rows', async () => {
const session = await openSession({ arrowEnabled: false });
sinon.spy(session.context.driver, 'fetchResults');
try {
const expectedRowsCount = 10;

const operation = await session.executeStatement(`SELECT * FROM range(0, ${expectedRowsCount})`);

const expectedRows = Array.from({ length: expectedRowsCount }, (_, id) => ({ id }));

let index = 0;
for await (const row of operation.iterateRows()) {
expect(row).to.deep.equal(expectedRows[index]);
index += 1;
}

expect(index).to.equal(expectedRows.length);
} finally {
await session.close();
}
});
});
Original file line number Diff line number Diff line change
@@ -1,97 +1,5 @@
const { expect, AssertionError } = require('chai');

const { buildUserAgentString, definedOrError, formatProgress, ProgressUpdateTransformer } = require('../../lib/utils');
const CloseableCollection = require('../../lib/utils/CloseableCollection').default;

describe('buildUserAgentString', () => {
// It should follow https://www.rfc-editor.org/rfc/rfc7231#section-5.5.3 and
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent
//
// UserAgent ::= <ProductName> '/' <ProductVersion> '(' <Comment> ')'
// ProductName ::= 'NodejsDatabricksSqlConnector'
// <Comment> ::= [ <ClientId> ';' ] 'Node.js' <NodeJsVersion> ';' <OSPlatform> <OSVersion>
//
// Examples:
// - with <ClientId> provided: NodejsDatabricksSqlConnector/0.1.8-beta.1 (Client ID; Node.js 16.13.1; Darwin 21.5.0)
// - without <ClientId> provided: NodejsDatabricksSqlConnector/0.1.8-beta.1 (Node.js 16.13.1; Darwin 21.5.0)

function checkUserAgentString(ua, clientId) {
// Prefix: 'NodejsDatabricksSqlConnector/'
// Version: three period-separated digits and optional suffix
const re =
/^(?<productName>NodejsDatabricksSqlConnector)\/(?<productVersion>\d+\.\d+\.\d+(-[^(]+)?)\s*\((?<comment>[^)]+)\)$/i;
const match = re.exec(ua);
expect(match).to.not.be.eq(null);

const { comment } = match.groups;

expect(comment.split(';').length).to.be.gte(2); // at least Node and OS version should be there

if (clientId) {
expect(comment.trim()).to.satisfy((s) => s.startsWith(`${clientId};`));
}
}

it('matches pattern with clientId', () => {
const clientId = 'Some Client ID';
const ua = buildUserAgentString(clientId);
checkUserAgentString(ua, clientId);
});

it('matches pattern without clientId', () => {
const ua = buildUserAgentString();
checkUserAgentString(ua);
});
});

describe('formatProgress', () => {
it('formats progress', () => {
const result = formatProgress({
headerNames: [],
rows: [],
});
expect(result).to.be.eq('\n');
});
});

describe('ProgressUpdateTransformer', () => {
it('should have equal columns', () => {
const t = new ProgressUpdateTransformer();

expect(t.formatRow(['Column 1', 'Column 2'])).to.be.eq('Column 1 |Column 2 ');
});

it('should format response as table', () => {
const t = new ProgressUpdateTransformer({
headerNames: ['Column 1', 'Column 2'],
rows: [
['value 1.1', 'value 1.2'],
['value 2.1', 'value 2.2'],
],
footerSummary: 'footer',
});

expect(String(t)).to.be.eq(
'Column 1 |Column 2 \n' + 'value 1.1 |value 1.2 \n' + 'value 2.1 |value 2.2 \n' + 'footer',
);
});
});

describe('definedOrError', () => {
it('should return value if it is defined', () => {
const values = [null, 0, 3.14, false, true, '', 'Hello, World!', [], {}];
for (const value of values) {
const result = definedOrError(value);
expect(result).to.be.equal(value);
}
});

it('should throw error if value is undefined', () => {
expect(() => {
definedOrError(undefined);
}).to.throw();
});
});
const CloseableCollection = require('../../../lib/utils/CloseableCollection').default;

describe('CloseableCollection', () => {
it('should add item if not already added', () => {
Expand Down
Loading

0 comments on commit fb817b5

Please sign in to comment.