Skip to content

Commit

Permalink
Use resultFormat field to properly process query results (#92)
Browse files Browse the repository at this point in the history
Signed-off-by: Levko Kravets <[email protected]>

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko authored Nov 9, 2022
1 parent e57e017 commit c99dfd1
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 75 deletions.
2 changes: 1 addition & 1 deletion lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {

private thrift = thrift;

constructor(options: ClientOptions) {
constructor(options?: ClientOptions) {
super();
this.connectionProvider = new HttpConnection();
this.authProvider = new NoSaslAuthentication();
Expand Down
31 changes: 26 additions & 5 deletions lib/DBSQLOperation/SchemaHelper.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { TOperationHandle, TGetResultSetMetadataResp } from '../../thrift/TCLIService_types';
import { TOperationHandle, TGetResultSetMetadataResp, TSparkRowSetType } from '../../thrift/TCLIService_types';
import HiveDriver from '../hive/HiveDriver';
import StatusFactory from '../factory/StatusFactory';
import IOperationResult from '../result/IOperationResult';
import JsonResult from '../result/JsonResult';
import HiveDriverError from '../errors/HiveDriverError';
import { definedOrError } from '../utils';

export default class SchemaHelper {
Expand All @@ -10,15 +13,15 @@ export default class SchemaHelper {

private statusFactory = new StatusFactory();

private metadata: TGetResultSetMetadataResp | null = null;
private metadata?: TGetResultSetMetadataResp;

constructor(driver: HiveDriver, operationHandle: TOperationHandle, metadata?: TGetResultSetMetadataResp) {
this.driver = driver;
this.operationHandle = operationHandle;
this.metadata = metadata || null;
this.metadata = metadata;
}

async fetch() {
private async fetchMetadata() {
if (!this.metadata) {
const metadata = await this.driver.getResultSetMetadata({
operationHandle: this.operationHandle,
Expand All @@ -27,6 +30,24 @@ export default class SchemaHelper {
this.metadata = metadata;
}

return definedOrError(this.metadata.schema);
return this.metadata;
}

/**
 * Resolves the schema of the operation's result set.
 *
 * The metadata comes from `fetchMetadata()`; its `schema` field is passed through
 * `definedOrError` before being returned.
 * NOTE(review): `definedOrError` is imported from '../utils' and is presumably what
 * rejects a missing schema — confirm against its implementation.
 */
async fetch() {
  const { schema } = await this.fetchMetadata();
  return definedOrError(schema);
}

/**
 * Picks the result handler that matches the format reported in the result set metadata.
 *
 * Metadata is obtained through `fetchMetadata()`; both `schema` and `resultFormat` are
 * passed through `definedOrError` before use.
 *
 * @returns a `JsonResult` built from the schema when the format is `COLUMN_BASED_SET`
 * @throws HiveDriverError for any other result format
 */
async getResultHandler(): Promise<IOperationResult> {
  const metadata = await this.fetchMetadata();
  const schema = definedOrError(metadata.schema);
  const resultFormat = definedOrError(metadata.resultFormat);

  switch (resultFormat) {
    case TSparkRowSetType.COLUMN_BASED_SET:
      return new JsonResult(schema);
    default: {
      // The reverse enum lookup yields `undefined` for a value this client's enum does not
      // contain; fall back to the raw value so the message still identifies the format.
      const formatName = TSparkRowSetType[resultFormat] ?? resultFormat;
      throw new HiveDriverError(`Unsupported result format: ${formatName}`);
    }
  }
}
}
16 changes: 0 additions & 16 deletions lib/DBSQLOperation/getResult.ts

This file was deleted.

7 changes: 3 additions & 4 deletions lib/DBSQLOperation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
} from '../../thrift/TCLIService_types';
import Status from '../dto/Status';

import getResult from './getResult';
import OperationStatusHelper from './OperationStatusHelper';
import SchemaHelper from './SchemaHelper';
import FetchResultsHelper from './FetchResultsHelper';
Expand Down Expand Up @@ -94,9 +93,9 @@ export default class DBSQLOperation implements IOperation {

await this._status.waitUntilReady(options);

return Promise.all([this._schema.fetch(), this._data.fetch(options?.maxRows || defaultMaxRows)]).then(
([schema, data]) => {
const result = getResult(schema, data ? [data] : []);
return Promise.all([this._schema.getResultHandler(), this._data.fetch(options?.maxRows || defaultMaxRows)]).then(
([resultHandler, data]) => {
const result = resultHandler.getValue(data ? [data] : []);
this.logger?.log(
LogLevel.debug,
`Fetched chunk of size: ${options?.maxRows || defaultMaxRows} from operation with id: ${this.getId()}`,
Expand Down
4 changes: 3 additions & 1 deletion lib/result/IOperationResult.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { TRowSet } from '../../thrift/TCLIService_types';

export default interface IOperationResult {
getValue(): any;
getValue(data?: Array<TRowSet>): any;
}
13 changes: 5 additions & 8 deletions lib/result/JsonResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,20 @@ import {
import IOperationResult from './IOperationResult';

export default class JsonResult implements IOperationResult {
private readonly schema: TTableSchema | null;
private readonly schema?: TTableSchema;

private readonly data: Array<TRowSet> | null;

constructor(schema: TTableSchema | null, data: Array<TRowSet>) {
constructor(schema?: TTableSchema) {
this.schema = schema;
this.data = data;
}

getValue(): Array<object> {
if (!this.data) {
getValue(data?: Array<TRowSet>): Array<object> {
if (!data) {
return [];
}

const descriptors = this.getSchemaColumns();

return this.data.reduce((result: Array<any>, rowSet: TRowSet) => {
return data.reduce((result: Array<any>, rowSet: TRowSet) => {
const columns = rowSet.columns || [];
const rows = this.getRows(columns, descriptors);

Expand Down
7 changes: 0 additions & 7 deletions lib/result/NullResult.ts

This file was deleted.

56 changes: 32 additions & 24 deletions tests/unit/DBSQLOperation.test.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
const { expect, AssertionError } = require('chai');
const sinon = require('sinon');
const { DBSQLLogger, LogLevel } = require('../../dist');
const { TStatusCode, TOperationState, TTypeId } = require('../../thrift/TCLIService_types');
const { TStatusCode, TOperationState, TTypeId, TSparkRowSetType } = require('../../thrift/TCLIService_types');
const DBSQLOperation = require('../../dist/DBSQLOperation').default;
const StatusError = require('../../dist/errors/StatusError').default;
const OperationStateError = require('../../dist/errors/OperationStateError').default;
const getResult = require('../../dist/DBSQLOperation/getResult').default;
const HiveDriverError = require('../../dist/errors/HiveDriverError').default;

// Create logger that won't emit
//
Expand All @@ -27,6 +27,7 @@ class DriverMock {

getResultSetMetadataResp = {
status: { statusCode: TStatusCode.SUCCESS_STATUS },
resultFormat: TSparkRowSetType.COLUMN_BASED_SET,
schema: {
columns: [
{
Expand Down Expand Up @@ -580,14 +581,14 @@ describe('DBSQLOperation', () => {
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
});

driver.getResultSetMetadataResp.schema = null;
driver.getResultSetMetadataResp.schema = { columns: [] };

const operation = new DBSQLOperation(driver, handle, logger);

const schema = await operation.getSchema();

expect(driver.getOperationStatus.called).to.be.true;
expect(schema).to.be.null;
expect(schema).to.deep.equal(driver.getResultSetMetadataResp.schema);
expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE);
});

Expand All @@ -604,8 +605,6 @@ describe('DBSQLOperation', () => {
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
});

driver.getResultSetMetadataResp.schema = null;

const operation = new DBSQLOperation(driver, handle, logger);
await operation.getSchema({ progress: true });

Expand All @@ -629,8 +628,6 @@ describe('DBSQLOperation', () => {
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
});

driver.getResultSetMetadataResp.schema = null;

const operation = new DBSQLOperation(driver, handle, logger);

const callback = sinon.stub();
Expand Down Expand Up @@ -753,7 +750,7 @@ describe('DBSQLOperation', () => {
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
});

driver.getResultSetMetadataResp.schema = null;
driver.getResultSetMetadataResp.schema = { columns: [] };
driver.fetchResultsResp.hasMoreRows = false;
driver.fetchResultsResp.results.columns = [];

Expand All @@ -762,7 +759,7 @@ describe('DBSQLOperation', () => {
const results = await operation.fetchChunk();

expect(driver.getOperationStatus.called).to.be.true;
expect(results).to.be.null;
expect(results).to.deep.equal([]);
expect(operation._status.state).to.equal(TOperationState.FINISHED_STATE);
});

Expand All @@ -779,7 +776,7 @@ describe('DBSQLOperation', () => {
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
});

driver.getResultSetMetadataResp.schema = null;
driver.getResultSetMetadataResp.schema = { columns: [] };
driver.fetchResultsResp.hasMoreRows = false;
driver.fetchResultsResp.results.columns = [];

Expand All @@ -806,7 +803,7 @@ describe('DBSQLOperation', () => {
return driver.getOperationStatus.wrappedMethod.apply(driver, args);
});

driver.getResultSetMetadataResp.schema = null;
driver.getResultSetMetadataResp.schema = { columns: [] };
driver.fetchResultsResp.hasMoreRows = false;
driver.fetchResultsResp.results.columns = [];

Expand Down Expand Up @@ -908,6 +905,29 @@ describe('DBSQLOperation', () => {
expect(driver.getResultSetMetadata.callCount).to.be.eq(1);
expect(driver.fetchResults.callCount).to.be.eq(1);
});

// Verifies that fetchChunk() rejects with HiveDriverError when the metadata reports a
// result format other than COLUMN_BASED_SET.
it('should fail on unsupported result format', async () => {
const handle = new OperationHandleMock();
handle.hasResultSet = true;

const driver = new DriverMock();
driver.getOperationStatusResp.operationState = TOperationState.FINISHED_STATE;

// Override the mock's default COLUMN_BASED_SET format; the schema itself stays defined
driver.getResultSetMetadataResp.resultFormat = TSparkRowSetType.ROW_BASED_SET;
driver.getResultSetMetadataResp.schema = { columns: [] };

const operation = new DBSQLOperation(driver, handle, logger);

try {
await operation.fetchChunk();
// Reaching this line means no error was thrown, which is itself a test failure
expect.fail('It should throw a HiveDriverError');
} catch (e) {
// Let the expect.fail() assertion above propagate instead of being checked as a driver error
if (e instanceof AssertionError) {
throw e;
}
expect(e).to.be.instanceOf(HiveDriverError);
}
});
});

describe('fetchAll', () => {
Expand Down Expand Up @@ -1052,16 +1072,4 @@ describe('DBSQLOperation', () => {
expect(await operation.hasMoreRows()).to.be.false;
});
});

describe('getResult', () => {
it('should return null result', () => {
const t = getResult(null, []);
expect(t).to.equal(null);
});

it('should return json result', () => {
const t = getResult({ columns: [] }, []);
expect(t).to.deep.equal([]);
});
});
});
45 changes: 36 additions & 9 deletions tests/unit/result/JsonResult.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ describe('JsonResult', () => {
},
];

const result = new JsonResult(schema, data);
const result = new JsonResult(schema);

expect(result.getValue()).to.be.deep.eq([
expect(result.getValue(data)).to.be.deep.eq([
{
'table.str': 'a',
'table.int64': 282578800148737,
Expand Down Expand Up @@ -171,9 +171,9 @@ describe('JsonResult', () => {
},
];

const result = new JsonResult(schema, data);
const result = new JsonResult(schema);

expect(result.getValue()).to.be.deep.eq([
expect(result.getValue(data)).to.be.deep.eq([
{
'table.array': ['a', 'b'],
'table.map': { key: 12 },
Expand Down Expand Up @@ -210,9 +210,9 @@ describe('JsonResult', () => {
},
];

const result = new JsonResult(schema, data);
const result = new JsonResult(schema);

expect(result.getValue()).to.be.deep.eq([
expect(result.getValue(data)).to.be.deep.eq([
{ 'table.id': '0' },
{ 'table.id': '1' },
{ 'table.id': '2' },
Expand All @@ -221,7 +221,7 @@ describe('JsonResult', () => {
});

it('should detect nulls', () => {
const result = new JsonResult(null, []);
const result = new JsonResult(null);
const buf = Buffer.from([0x55, 0xaa, 0xc3]);

[
Expand Down Expand Up @@ -333,9 +333,9 @@ describe('JsonResult', () => {
},
];

const result = new JsonResult(schema, data);
const result = new JsonResult(schema);

expect(result.getValue()).to.be.deep.eq([
expect(result.getValue(data)).to.be.deep.eq([
{
'table.str': null,
'table.int64': null,
Expand All @@ -356,4 +356,31 @@ describe('JsonResult', () => {
},
]);
});

// With a schema present, both an omitted data argument and an empty list of row sets
// are expected to produce an empty result.
it('should return empty array if no data to process', () => {
const schema = {
columns: [getColumnSchema('table.id', TCLIService_types.TTypeId.STRING_TYPE, 1)],
};

const result = new JsonResult(schema);

// No argument at all
expect(result.getValue()).to.be.deep.eq([]);
// Explicit empty list of row sets
expect(result.getValue([])).to.be.deep.eq([]);
});

// Row sets are supplied, but the handler is constructed without a schema; the expected
// result is still an empty array.
it('should return empty array if no schema available', () => {
const data = [
{
columns: [
{
stringVal: { values: ['0', '1'] },
},
],
},
];

// Constructed with no schema argument
const result = new JsonResult();

expect(result.getValue(data)).to.be.deep.eq([]);
});
});

0 comments on commit c99dfd1

Please sign in to comment.