Skip to content

Commit

Permalink
Merge pull request #910 from terascope/a-few-fixes
Browse files Browse the repository at this point in the history
v0.44.2 a few fixes
  • Loading branch information
peterdemartini authored Nov 29, 2018
2 parents d1e7160 + b59cb27 commit 9f49251
Show file tree
Hide file tree
Showing 14 changed files with 106 additions and 115 deletions.
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
"@types/socket.io": "^1.4.38",
"@types/socket.io-client": "^1.4.32",
"@types/uuid": "^3.4.4",
"lerna": "^3.4.3",
"lerna": "^3.5.1",
"typescript": "^3.1.6"
},
"devDependencies": {
"@types/jest": "^23.3.8",
"@types/jest": "^23.3.10",
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
"eslint": "^5.9.0",
Expand Down
4 changes: 2 additions & 2 deletions packages/job-components/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/job-components",
"version": "0.11.1",
"version": "0.11.2",
"publishConfig": {
"access": "public"
},
Expand Down Expand Up @@ -47,7 +47,7 @@
"uuid": "^3.3.2"
},
"devDependencies": {
"@types/jest": "^23.3.8",
"@types/jest": "^23.3.10",
"@types/node": "^10.12.10",
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/job-components/src/test-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ export class TestContext implements i.Context {
throw new Error(`Registered Client for connection "${key}" is not a function, got ${actual}`);
}

const config = setConnectorConfig(sysconfig, options, {});
const config = setConnectorConfig(sysconfig, options, {}, false);

const client = create(config, logger, options);

Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice-messaging/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"socket.io-client": "^1.7.4"
},
"devDependencies": {
"@types/jest": "^23.3.8",
"@types/jest": "^23.3.10",
"@types/node": "^10.12.10",
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
Expand Down
4 changes: 2 additions & 2 deletions packages/teraslice-op-test-harness/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/teraslice-op-test-harness",
"version": "1.4.1",
"version": "1.4.2",
"publishConfig": {
"access": "public"
},
Expand All @@ -23,7 +23,7 @@
"url": "https://github.com/terascope/teraslice/issues"
},
"dependencies": {
"@terascope/job-components": "^0.11.1",
"@terascope/job-components": "^0.11.2",
"bluebird": "^3.5.3",
"lodash": "^4.17.11"
},
Expand Down
8 changes: 4 additions & 4 deletions packages/teraslice-test-harness/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "teraslice-test-harness",
"version": "0.2.1",
"version": "0.3.0",
"publishConfig": {
"access": "public"
},
Expand Down Expand Up @@ -36,13 +36,13 @@
"test:debug": "env DEBUG='*teraslice*' jest --detectOpenHandles --coverage=false --runInBand"
},
"dependencies": {
"@terascope/job-components": "^0.11.1",
"@terascope/teraslice-op-test-harness": "^1.4.1",
"@terascope/job-components": "^0.11.2",
"@terascope/teraslice-op-test-harness": "^1.4.2",
"@types/lodash": "^4.14.118",
"lodash": "^4.17.11"
},
"devDependencies": {
"@types/jest": "^23.3.8",
"@types/jest": "^23.3.10",
"@types/node": "^10.12.10",
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
Expand Down
26 changes: 20 additions & 6 deletions packages/teraslice-test-harness/src/worker-test-harness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import { JobHarnessOptions } from './interfaces';
* This is useful for testing Fetcher and Processors together or individually.
*
* @todo Add support for attaching APIs and Observers
* @todo Add support for slice retries
*/
export default class WorkerTestHarness extends BaseTestHarness<WorkerExecutionContext> {
constructor(job: JobConfig, options: JobHarnessOptions) {
Expand Down Expand Up @@ -57,6 +56,8 @@ export default class WorkerTestHarness extends BaseTestHarness<WorkerExecutionCo
async runSlice(input: Slice|SliceRequest, options: { fullResponse: false }): Promise<DataEntity[]>;
async runSlice(input: Slice|SliceRequest, options: { fullResponse: true }): Promise<RunSliceResult>;
async runSlice(input: Slice|SliceRequest, { fullResponse = false } = {}): Promise<DataEntity[]|RunSliceResult> {
const maxRetries = this.executionContext.config.max_retries;
const remainingTries = Number.isInteger(maxRetries) ? maxRetries + 1 : 0;
const slice: Slice = isSlice(input) ? input : newTestSlice(input);

this.events.emit('slice:initialize', slice);
Expand All @@ -65,11 +66,7 @@ export default class WorkerTestHarness extends BaseTestHarness<WorkerExecutionCo
let result: RunSliceResult;

try {
result = await this.executionContext.runSlice(slice);
} catch (err) {
this.events.emit('slice:failure', slice);
await this.executionContext.onSliceFailed(slice.slice_id);
throw err;
result = await this._trySlice(slice, remainingTries);
} finally {
this.events.emit('slice:finalize', slice);
await this.executionContext.onSliceFinalizing(slice.slice_id);
Expand All @@ -94,6 +91,23 @@ export default class WorkerTestHarness extends BaseTestHarness<WorkerExecutionCo
await super.shutdown();
await this.executionContext.shutdown();
}

private async _trySlice(slice: Slice, remainingTries: number): Promise<RunSliceResult> {
try {
return await this.executionContext.runSlice(slice);
} catch (err) {
this.context.logger.error('Slice Failure', err, slice);
if (remainingTries > 0) {
this.events.emit('slice:retry', slice);
await this.executionContext.onSliceRetry(slice.slice_id);
return this._trySlice(slice, remainingTries - 1);
}

this.events.emit('slice:failure', slice);
await this.executionContext.onSliceFailed(slice.slice_id);
throw err;
}
}
}

function isSlice(input: Slice|SliceRequest): input is Slice {
Expand Down
25 changes: 25 additions & 0 deletions packages/teraslice-test-harness/test/worker-test-harness-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ describe('WorkerTestHarness', () => {

describe('when given a valid job config', () => {
const job = newTestJobConfig();
job.max_retries = 2;
job.analytics = true;
job.operations = [
{
Expand All @@ -40,6 +41,10 @@ describe('WorkerTestHarness', () => {
clients
});

workerHarness.processors[0].handle = jest.fn(async (data: DataEntity[]) => {
return data;
});

it('should be able to call initialize', () => {
return expect(workerHarness.initialize()).resolves.toBeNil();
});
Expand All @@ -65,6 +70,26 @@ describe('WorkerTestHarness', () => {
expect(DataEntity.isDataEntityArray(result)).toBeTrue();
});

it('should call slice retry', async () => {
const onSliceRetryEvent = jest.fn();
workerHarness.events.on('slice:retry', onSliceRetryEvent);
const err = new Error('oh no');

workerHarness.processors[0].handle
// @ts-ignore
.mockClear()
// @ts-ignore
.mockRejectedValueOnce(err)
// @ts-ignore
.mockRejectedValueOnce(err);

const results = await workerHarness.runSlice({ });
expect(results).toBeArray();

expect(onSliceRetryEvent).toHaveBeenCalledTimes(2);
expect(workerHarness.processors[0].handle).toHaveBeenCalledTimes(3);
});

it('should be able to call runSlice with fullResponse', async () => {
const result = await workerHarness.runSlice(newTestSlice(), { fullResponse: true });
expect(result.analytics).not.toBeNil();
Expand Down
30 changes: 12 additions & 18 deletions packages/teraslice/lib/workers/helpers/op-analytics.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,21 @@

const _ = require('lodash');

function logOpStats(logger, slice, analyticsData) {
const str = 'analytics for slice: ';
let dataStr = '';
function formatVal(value) {
if (_.isString(value)) return `"${value}"`;
if (_.isArray(value)) return `[${value.join(', ')}]`;

if (_.isString(slice)) {
dataStr = `${slice}, `;
} else {
_.forOwn(slice, (value, key) => {
if (_.isString(value)) {
dataStr += `${key} : ${value} `;
} else {
dataStr += `${key} : ${JSON.stringify(value)} `;
}
});
}
return _.truncate(JSON.stringify(value));
}

_.forOwn(analyticsData, (value, key) => {
dataStr += `${key} : ${value} `;
});
function format(input) {
return _.map(input, (value, key) => `${key}: ${formatVal(value)}`).join(', ');
}

function logOpStats(logger, slice, analyticsData) {
const obj = Object.assign({}, _.omit(slice, 'request'), analyticsData);

logger.info(str + dataStr);
logger.info(`analytics for slice: ${format(obj)}`);
}

module.exports = { logOpStats };
19 changes: 10 additions & 9 deletions packages/teraslice/lib/workers/helpers/worker-shutdown.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,22 @@ function shutdownHandler(context, shutdownFn) {
.then(() => Promise.delay(1000));
}

const exitingIn = `exiting in ${shutdownTimeout}ms...`;

function exit(event, err) {
if (api.exiting) {
logger.debug(`${assignment} already shutting down`);
return;
}

api.exiting = true;
logger.warn(`${assignment} exiting in ${shutdownTimeout}ms...`);

const startTime = Date.now();
Promise.race([
shutdownFn(event, err),
Promise.delay(shutdownTimeout - 2000)
]).then(() => {
logger.debug(`${assignment} shutdown took ${Date.now() - startTime}ms`);
logger.info(`${assignment} shutdown took ${Date.now() - startTime}ms`);
}).catch((error) => {
logger.error(`${assignment} while shutting down`, error);
}).then(() => {
Expand All @@ -80,31 +81,31 @@ function shutdownHandler(context, shutdownFn) {
}

process.on('SIGINT', () => {
logger.info('Received process:SIGINT');
logger.info(`${assignment} received process:SIGINT, ${exitingIn}`);
if (!api.exiting) {
process.exitCode = 0;
}
exit('SIGINT');
});

process.on('SIGTERM', () => {
logger.info(`${assignment} received process:SIGTERM`);
logger.info(`${assignment} received process:SIGTERM, ${exitingIn}`);
if (!api.exiting) {
process.exitCode = 0;
}
exit('SIGTERM');
});

process.on('uncaughtException', (err) => {
logger.fatal(`${assignment} received an uncaughtException`, err);
logger.fatal(`${assignment} received an uncaughtException, ${exitingIn}`, err);
if (!api.exiting) {
process.exitCode = restartOnFailure ? 1 : 0;
}
exit('uncaughtException', err);
});

process.once('unhandledRejection', (err) => {
logger.fatal(`${assignment} received an unhandledRejection`, err);
logger.fatal(`${assignment} received an unhandledRejection, ${exitingIn}`, err);
if (!api.exiting) {
process.exitCode = restartOnFailure ? 1 : 0;
}
Expand All @@ -113,7 +114,7 @@ function shutdownHandler(context, shutdownFn) {

// event is fired from terafoundation when an error occurs during instantiation of a client
events.once('client:initialization:error', (err) => {
logger.fatal(`${assignment} received a client initialization error`, err);
logger.fatal(`${assignment} received a client initialization error, ${exitingIn}`, err);
if (!api.exiting) {
process.exitCode = restartOnFailure ? 1 : 0;
}
Expand All @@ -125,9 +126,9 @@ function shutdownHandler(context, shutdownFn) {
process.exitCode = 0;
}
if (err) {
logger.fatal(`${assignment} shutdown error`, err);
logger.fatal(`${assignment} shutdown error, ${exitingIn}`, err);
} else {
logger.info(`${assignment} shutdown`);
logger.info(`${assignment} shutdown, ${exitingIn}`);
}
exit('worker:shutdown:complete', err);
});
Expand Down
12 changes: 8 additions & 4 deletions packages/teraslice/lib/workers/worker/slice.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,21 @@ class Slice {

const { slice } = this;
const maxRetries = get(this.executionContext, 'config.max_retries', 3);
const maxTries = maxRetries > 0 ? maxRetries + 1 : 0;
const retryOptions = {
max_tries: maxRetries,
max_tries: maxTries,
throw_original: true,
interval: 100,
backoff: 2
};

let result;
let remaining = maxTries;

try {
result = await retry(() => {
const shouldRetry = maxRetries > 0;
return this._runOnce(shouldRetry);
remaining -= 1;
return this._runOnce(remaining > 0);
}, retryOptions);
await this._markCompleted();
} catch (err) {
Expand Down Expand Up @@ -121,7 +124,7 @@ class Slice {
return Promise.reject(sliceError);
}

_runOnce(shouldRetry = true) {
_runOnce(shouldRetry) {
if (this._isShutdown) {
throw new retry.StopError('Slice shutdown during slice execution');
}
Expand All @@ -131,6 +134,7 @@ class Slice {
return this.executionContext.runSlice(slice)
.catch((err) => {
this.logger.error(`An error has occurred: ${toString(err)}, slice:`, slice);

if (!shouldRetry) {
return Promise.reject(err);
}
Expand Down
6 changes: 3 additions & 3 deletions packages/teraslice/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "teraslice",
"version": "0.44.1",
"version": "0.44.2",
"description": "Slice and dice your Elasticsearch data",
"bin": "service.js",
"main": "index.js",
Expand Down Expand Up @@ -36,7 +36,7 @@
"dependencies": {
"@terascope/elasticsearch-api": "^1.1.3",
"@terascope/error-parser": "^1.0.1",
"@terascope/job-components": "^0.11.1",
"@terascope/job-components": "^0.11.2",
"@terascope/queue": "^1.1.4",
"@terascope/teraslice-messaging": "^0.2.5",
"async-mutex": "^0.1.3",
Expand Down Expand Up @@ -66,7 +66,7 @@
"yargs": "^12.0.2"
},
"devDependencies": {
"@terascope/teraslice-op-test-harness": "^1.4.1",
"@terascope/teraslice-op-test-harness": "^1.4.2",
"archiver": "^3.0.0",
"bufferstreams": "^2.0.1",
"chance": "^1.0.16",
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice/test/workers/worker/slice-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ describe('Slice', () => {
testContext = new TestContext({
maxRetries: 5,
analytics: false,
opErrorAt: times(5)
opErrorAt: times(6)
});

slice = await setupSlice(testContext, eventMocks);
Expand Down
Loading

0 comments on commit 9f49251

Please sign in to comment.