Skip to content

Commit

Permalink
Add execution stats
Browse files Browse the repository at this point in the history
  • Loading branch information
Arman A authored and saumehta9 committed Dec 22, 2020
1 parent b14e32e commit e5b4efa
Show file tree
Hide file tree
Showing 14 changed files with 671 additions and 72 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
# 2.1.0
Add support for obtaining basic server-side statistics on individual statement executions.

## :tada: Enhancements
* Added `IOUsage` and `TimingInformation` interface to provide server-side execution statistics
* IOUsage provides `getReadIOs(): number`
* TimingInformation provides `getProcessingTimeMilliseconds(): number`
* Added `getConsumedIOs(): IOUsage` and `getTimingInformation(): TimingInformation` to the `Result` and `ResultStream`
* `getConsumedIOs(): IOUsage` and `getTimingInformation(): TimingInformation` methods are stateful, meaning the statistics returned by them reflect the state at the time of method execution

# 2.0.0 (2020-08-27)

The release candidate 1 (v2.0.0-rc.1) has been selected as a final release of v2.0.0. No new changes are introduced between v2.0.0-rc.1 and v2.0.0.
Expand Down
2 changes: 2 additions & 0 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ export { Result } from "./src/Result";
export { Transaction } from "./src/Transaction";
export { TransactionExecutor } from "./src/TransactionExecutor";
export { RetryConfig } from "./src/retry/RetryConfig";
export { IOUsage } from "./src/stats/IOUsage";
export { TimingInformation } from "./src/stats/TimingInformation";
export { BackoffFunction } from "./src/retry/BackoffFunction";
export { defaultRetryConfig } from "./src/retry/DefaultRetryConfig"
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
},
"name": "amazon-qldb-driver-nodejs",
"description": "The Node.js driver for working with Amazon Quantum Ledger Database",
"version": "2.0.0",
"version": "2.1.0",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"engines": {
Expand All @@ -19,7 +19,7 @@
"@types/sinon": "^7.0.13",
"@typescript-eslint/eslint-plugin": "^2.5.0",
"@typescript-eslint/parser": "^2.5.0",
"aws-sdk": "^2.546.0",
"aws-sdk": "^2.815.0",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"cross-env": "^6.0.3",
Expand All @@ -37,7 +37,7 @@
"typescript": "^3.5.3"
},
"peerDependencies": {
"aws-sdk": "^2.546.0",
"aws-sdk": "^2.815.0",
"ion-js": "~4.0.0",
"jsbi": "~3.1.1"
},
Expand Down
12 changes: 6 additions & 6 deletions src/QldbDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import Semaphore from "semaphore-async-await";
import { version } from "../package.json";
import { Communicator } from "./Communicator";
import { defaultRetryConfig } from "./retry/DefaultRetryConfig";
import {
DriverClosedError,
import {
DriverClosedError,
isInvalidSessionException,
isTransactionExpiredException,
SessionPoolEmptyError,
Expand Down Expand Up @@ -72,7 +72,7 @@ export class QldbDriver {
* The maxConcurrentTransactions parameter specifies the number of sessions that the driver can hold in the pool.
* The default is set to maximum number of sockets specified in the globalAgent.
* See {@link https://docs.aws.amazon.com/qldb/latest/developerguide/driver.best-practices.html#driver.best-practices.configuring} for more details.
* @param RetryConfig Config to specify max number of retries, base and custom backoff strategy for retries. Will be overridden if a different retry_config
* @param retryConfig Config to specify max number of retries, base and custom backoff strategy for retries. Will be overridden if a different retryConfig
* is passed to {@linkcode executeLambda}.
*
* @throws RangeError if `maxConcurrentTransactions` is less than 0.
Expand Down Expand Up @@ -146,8 +146,8 @@ export class QldbDriver {
* The function passed via retryIndicator parameter is invoked whenever there is a failure and the driver is about to retry the transaction.
* The retryIndicator will be called with the current attempt number.
*
* @param transactionFunction The function representing a transaction to be executed. Please see the method docs to understand the usage of this parameter.
* @param retryConfig Config to specify max number of retries, base and custom backoff strategy for retries. This config
* @param transactionLambda The function representing a transaction to be executed. Please see the method docs to understand the usage of this parameter.
* @param retryConfig Config to specify max number of retries, base and custom backoff strategy for retries. This config
* overrides the retry config set at driver level for a particular lambda execution.
* Note that all the values of the driver level retry config will be overridden by the new config passed here.
* @throws {@linkcode DriverClosedError} When a transaction is attempted on a closed driver instance. {@linkcode close}
Expand All @@ -171,7 +171,7 @@ export class QldbDriver {
return await session.executeLambda(transactionLambda, retryConfig, transactionExecutionContext);
} catch(err) {
/* This is a guard condition to prevent the driver from entering an infinite loop
if all the sessions start resulting in InvalidSessionException
if all the sessions start resulting in InvalidSessionException
*/
if (transactionExecutionAttempt >= this._maxConcurrentTransactions + 3) {
throw err;
Expand Down
81 changes: 67 additions & 14 deletions src/Result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,55 @@
* and limitations under the License.
*/

import { IonBinary, FetchPageResult, Page, ValueHolder } from "aws-sdk/clients/qldbsession";
import {
ExecuteStatementResult,
FetchPageResult,
IonBinary,
Page,
ValueHolder
} from "aws-sdk/clients/qldbsession";
import { dom } from "ion-js";

import { Communicator } from "./Communicator";
import { ClientException } from "./errors/Errors"
import { ResultStream } from "./ResultStream";
import { IOUsage } from "./stats/IOUsage";
import { TimingInformation } from "./stats/TimingInformation";

/**
* A class representing a fully buffered set of results returned from QLDB.
*/
export class Result {
private _resultList: dom.Value[];
private _ioUsage: IOUsage;
private _timingInformation: TimingInformation;

/**
* Creates a Result.
* @param resultList A list of Ion values containing the statement execution's result returned from QLDB.
* @param ioUsage Contains the number of consumed IO requests for the executed statement.
* @param timingInformation Holds server side processing time for the executed statement.
*/
private constructor(resultList: dom.Value[]) {
private constructor(resultList: dom.Value[], ioUsage: IOUsage, timingInformation: TimingInformation) {
this._resultList = resultList;
this._ioUsage = ioUsage;
this._timingInformation = timingInformation;
}

/**
* Static factory method that creates a Result object, containing the results of a statement execution from QLDB.
* @param txnId The ID of the transaction the statement was executed in.
* @param page The initial page returned from the statement execution.
* @param executeResult The returned result from the statement execution.
* @param communicator The Communicator used for the statement execution.
* @returns Promise which fulfills with a Result.
*/
static async create(txnId: string, page: Page, communicator: Communicator): Promise<Result> {
const resultList: dom.Value[] = await Result._fetchResultPages(txnId, page, communicator);
return new Result(resultList);
static async create(
txnId: string,
executeResult: ExecuteStatementResult,
communicator: Communicator
): Promise<Result> {
const result: Result = await Result._fetchResultPages(txnId, executeResult, communicator);
return result;
}

/**
Expand All @@ -51,7 +69,7 @@ export class Result {
*/
static async bufferResultStream(resultStream: ResultStream): Promise<Result> {
const resultList: dom.Value[] = await Result._readResultStream(resultStream);
return new Result(resultList);
return new Result(resultList, resultStream.getConsumedIOs(), resultStream.getTimingInformation());
}

/**
Expand All @@ -62,9 +80,25 @@ export class Result {
return this._resultList.slice();
}

/**
* Returns the number of read IO request for the executed statement.
* @returns IOUsage, containing number of read IOs.
*/
getConsumedIOs(): IOUsage {
return this._ioUsage;
}

/**
* Returns server-side processing time for the executed statement.
* @returns TimingInformation, containing processing time.
*/
getTimingInformation(): TimingInformation {
return this._timingInformation;
}

/**
* Handle the unexpected Blob return type from QLDB.
* @param IonBinary The IonBinary value returned from QLDB.
* @param ionBinary The IonBinary value returned from QLDB.
* @returns The IonBinary value cast explicitly to one of the types that make up the IonBinary type. This will be
* either Buffer, Uint8Array, or string.
* @throws {@linkcode ClientException} when the specific type of the IonBinary value is Blob.
Expand All @@ -85,31 +119,50 @@ export class Result {
/**
* Fetches all subsequent Pages given an initial Page, places each value of each Page in an Ion value.
* @param txnId The ID of the transaction the statement was executed in.
* @param page The initial page returned from the statement execution.
* @param executeResult The returned result from the statement execution.
* @param communicator The Communicator used for the statement execution.
* @returns Promise which fulfills with a list of Ion values, representing all the returned values of the result set.
* @returns Promise which fulfills with a Result, containing a list of Ion values, representing all the returned
* values of the result set, number of IOs for the request, and the time spent processing the request.
*/
private static async _fetchResultPages(txnId: string, page: Page, communicator: Communicator): Promise<dom.Value[]> {
let currentPage: Page = page;
private static async _fetchResultPages(
txnId: string,
executeResult: ExecuteStatementResult,
communicator: Communicator
): Promise<Result> {
let currentPage: Page = executeResult.FirstPage;
let readIO: number = executeResult.ConsumedIOs != null ? executeResult.ConsumedIOs.ReadIOs : null;
let processingTime: number =
executeResult.TimingInformation != null ? executeResult.TimingInformation.ProcessingTimeMilliseconds : null;

const pageValuesArray: ValueHolder[][] = [];
if (currentPage.Values && currentPage.Values.length > 0) {
pageValuesArray.push(currentPage.Values);
}
while (currentPage.NextPageToken) {
const fetchPageResult: FetchPageResult =
const fetchPageResult: FetchPageResult =
await communicator.fetchPage(txnId, currentPage.NextPageToken);
currentPage = fetchPageResult.Page;
if (currentPage.Values && currentPage.Values.length > 0) {
pageValuesArray.push(currentPage.Values);
}

if (fetchPageResult.ConsumedIOs != null) {
readIO += fetchPageResult.ConsumedIOs.ReadIOs;
}

if (fetchPageResult.TimingInformation != null) {
processingTime += fetchPageResult.TimingInformation.ProcessingTimeMilliseconds;
}
}
const ionValues: dom.Value[] = [];
pageValuesArray.forEach((valueHolders: ValueHolder[]) => {
valueHolders.forEach((valueHolder: ValueHolder) => {
ionValues.push(dom.load(Result._handleBlob(valueHolder.IonBinary)));
});
});
return ionValues;
const ioUsage: IOUsage = readIO != null ? new IOUsage(readIO) : null;
const timingInformation = processingTime != null ? new TimingInformation(processingTime) : null;
return new Result(ionValues, ioUsage, timingInformation);
}

/**
Expand Down
50 changes: 45 additions & 5 deletions src/ResultStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@
* and limitations under the License.
*/

import { FetchPageResult, Page } from "aws-sdk/clients/qldbsession";
import {
ExecuteStatementResult,
FetchPageResult,
Page,
} from "aws-sdk/clients/qldbsession";
import { dom } from "ion-js";
import { Readable } from "stream";

import { Communicator } from "./Communicator";
import { Result } from "./Result";
import { IOUsage } from "./stats/IOUsage";
import { TimingInformation } from "./stats/TimingInformation";

/**
* A class representing the result of a statement returned from QLDB as a stream.
Expand All @@ -30,21 +36,46 @@ export class ResultStream extends Readable {
private _shouldPushCachedPage: boolean;
private _retrieveIndex: number;
private _isPushingData: boolean;
private _readIOs: number;
private _processingTime: number;

/**
* Create a ResultStream.
* @param txnId The ID of the transaction the statement was executed in.
* @param firstPage The initial page returned from the statement execution.
* @param executeResult The returned result from the statement execution.
* @param communicator The Communicator used for the statement execution.
*/
constructor(txnId: string, firstPage: Page, communicator: Communicator) {
constructor(txnId: string, executeResult: ExecuteStatementResult, communicator: Communicator) {
super({ objectMode: true });
this._communicator = communicator;
this._cachedPage = firstPage;
this._cachedPage = executeResult.FirstPage;
this._txnId = txnId;
this._shouldPushCachedPage = true;
this._retrieveIndex = 0;
this._isPushingData = false;
this._readIOs = executeResult.ConsumedIOs == null ? null : executeResult.ConsumedIOs.ReadIOs;
this._processingTime =
executeResult.TimingInformation == null ? null : executeResult.TimingInformation.ProcessingTimeMilliseconds;
}

/**
* Returns the number of read IO request for the executed statement. The statistics are stateful.
* @returns IOUsage, containing number of read IOs.
*/
getConsumedIOs(): IOUsage {
return this._readIOs == null
? null
: new IOUsage(this._readIOs);
}

/**
* Returns server-side processing time for the executed statement. The statistics are stateful
* @returns TimingInformation, containing processing time.
*/
getTimingInformation(): TimingInformation {
return this._processingTime == null
? null
: new TimingInformation(this._processingTime);
}

/**
Expand Down Expand Up @@ -72,9 +103,18 @@ export class ResultStream extends Readable {
this._shouldPushCachedPage = false;
} else if (this._cachedPage.NextPageToken) {
try {
const fetchPageResult: FetchPageResult =
const fetchPageResult: FetchPageResult =
await this._communicator.fetchPage(this._txnId, this._cachedPage.NextPageToken);
this._cachedPage = fetchPageResult.Page;

if (fetchPageResult.ConsumedIOs != null) {
this._readIOs += fetchPageResult.ConsumedIOs.ReadIOs;
}

if (fetchPageResult.TimingInformation != null) {
this._processingTime += fetchPageResult.TimingInformation.ProcessingTimeMilliseconds;
}

this._retrieveIndex = 0;
} catch (e) {
this.destroy(e);
Expand Down
5 changes: 2 additions & 3 deletions src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ export class Transaction implements TransactionExecutable {
*/
async execute(statement: string, ...parameters: any[]): Promise<Result> {
const result: ExecuteStatementResult = await this._sendExecute(statement, parameters);
const inlineResult = Result.create(this._txnId, result.FirstPage, this._communicator);
return inlineResult;
return Result.create(this._txnId, result, this._communicator);
}

/**
Expand All @@ -146,7 +145,7 @@ export class Transaction implements TransactionExecutable {
*/
async executeAndStreamResults(statement: string, ...parameters: any[]): Promise<Readable> {
const result: ExecuteStatementResult = await this._sendExecute(statement, parameters);
return new ResultStream(this._txnId, result.FirstPage, this._communicator);
return new ResultStream(this._txnId, result, this._communicator);
}

/**
Expand Down
23 changes: 23 additions & 0 deletions src/integrationtest/StatementExecution.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { isOccConflictException } from "../errors/Errors";
import { QldbDriver } from "../QldbDriver";
import { Result } from "../Result";
import { RetryConfig } from "../retry/RetryConfig";
import { IOUsage } from "../stats/IOUsage";
import { TransactionExecutor } from "../TransactionExecutor";
import * as constants from "./TestConstants";
import { TestUtils } from "./TestUtils";
Expand Down Expand Up @@ -223,6 +224,28 @@ describe("StatementExecution", function() {
chai.assert.equal(searchCount, 0);
});

it("Can return metrics for consumed IOs", async () => {
const struct1: Record<string, string> = {
[constants.COLUMN_NAME]: constants.MULTI_DOC_VALUE_1
};
const struct2: Record<string, string> = {
[constants.COLUMN_NAME]: constants.MULTI_DOC_VALUE_2
};
const insertStatement: string = `INSERT INTO ${constants.TABLE_NAME} <<?,?>>`;
let ioUsage: IOUsage = await driver.executeLambda(async (txn: TransactionExecutor) => {
return (await txn.execute(insertStatement, struct1, struct2)).getConsumedIOs();
});
chai.assert.equal(ioUsage.getReadIOs(), 0);

const searchQuery: string = `SELECT VALUE ${constants.COLUMN_NAME} FROM ${constants.TABLE_NAME}` +
` WHERE ${constants.COLUMN_NAME} IS NULL`;
ioUsage = await driver.executeLambda(async (txn: TransactionExecutor) => {
return (await txn.execute(searchQuery)).getConsumedIOs();
});
chai.assert.equal(ioUsage.getReadIOs(), 2);

});

it("Can delete all documents", async () => {
const struct1: Record<string, string> = {
[constants.COLUMN_NAME]: constants.MULTI_DOC_VALUE_1
Expand Down
Loading

0 comments on commit e5b4efa

Please sign in to comment.