Skip to content

Commit

Permalink
Merge pull request #882 from terascope/op-test-harness
Browse files Browse the repository at this point in the history
Improvements for teraslice-test-op-harness and parallel-slicer
  • Loading branch information
peterdemartini authored Nov 5, 2018
2 parents adfb3f4 + 869e5b7 commit 9e7d8ee
Show file tree
Hide file tree
Showing 15 changed files with 192 additions and 75 deletions.
4 changes: 2 additions & 2 deletions e2e/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ services:
networks:
- cluster
volumes:
- teraslice-assets:/app/assets:delegated
- teraslice-assets:/app/assets
- ./autoload:/app/autoload:delegated
- ./config:/app/config:delegated
elasticsearch:
Expand All @@ -56,7 +56,7 @@ services:
- "discovery.type=single-node"
- "xpack.security.enabled=false"
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data:delegated
- elasticsearch-data:/usr/share/elasticsearch/data
networks:
- cluster
ulimits:
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
"@types/lodash": "^4.14.117",
"@types/lodash.clonedeep": "^4.5.4",
"@types/nanoid": "^1.2.0",
"@types/node": "^10.12.0",
"@types/node": "^10.12.2",
"@types/p-event": "^1.3.0",
"@types/socket.io": "^1.4.38",
"@types/socket.io-client": "^1.4.32",
"@types/uuid": "^3.4.4",
"lerna": "^3.4.3",
"typescript": "^3.1.3"
"typescript": "^3.1.6"
},
"devDependencies": {
"@types/jest": "^23.3.8",
Expand Down
6 changes: 3 additions & 3 deletions packages/job-components/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/job-components",
"version": "0.8.1",
"version": "0.8.2",
"publishConfig": {
"access": "public"
},
Expand Down Expand Up @@ -46,7 +46,7 @@
},
"devDependencies": {
"@types/jest": "^23.3.8",
"@types/node": "^10.12.0",
"@types/node": "^10.12.2",
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
"benchmark": "^2.1.4",
Expand All @@ -56,7 +56,7 @@
"ts-jest": "^23.10.4",
"tslint": "^5.0.0",
"tslint-config-airbnb": "^5.11.0",
"typescript": "^3.1.3"
"typescript": "^3.1.6"
},
"engines": {
"node": ">=8.0.0"
Expand Down
30 changes: 22 additions & 8 deletions packages/job-components/src/operations/parallel-slicer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { SlicerFn } from '../interfaces';
import { SlicerFn, SlicerResult } from '../interfaces';
import SlicerCore from './core/slicer-core';
import { times } from '../utils';
import { times, isFunction } from '../utils';

/**
* A variant of a "Slicer" for running a parallel stream of slicers.
Expand All @@ -16,14 +16,17 @@ export default abstract class ParallelSlicer extends SlicerCore {
*/
async initialize(recoveryData: object[]): Promise<void> {
await super.initialize(recoveryData);
const { slicers } = this.executionConfig;
const { slicers = 1 } = this.executionConfig;

const promises = times(slicers, async (id) => {
const fn = await this.newSlicer();
if (!isFunction(fn)) return;

this._slicers.push({
done: false,
fn,
id,
processing: false,
order: 0,
});
});
Expand All @@ -44,7 +47,7 @@ export default abstract class ParallelSlicer extends SlicerCore {
* Called by {@link ParallelSlicer#initialize} for every count of `slicers` in the ExecutionConfig
* @returns a function which will be called in parallel
*/
abstract async newSlicer(): Promise<SlicerFn>;
abstract async newSlicer(): Promise<SlicerFn|undefined>;

slicers() {
return this._slicers.length;
Expand All @@ -53,9 +56,11 @@ export default abstract class ParallelSlicer extends SlicerCore {
async handle(): Promise<boolean> {
if (this.isFinished) return true;

const promises = this._slicers.map((slicer) => this.processSlicer(slicer));
const promises = this._slicers
.filter((slicer) => !slicer.processing)
.map((slicer) => this.processSlicer(slicer));

await Promise.all(promises);
await Promise.race(promises);
return this.isFinished;
}

Expand All @@ -64,9 +69,17 @@ export default abstract class ParallelSlicer extends SlicerCore {
}

private async processSlicer(slicer: SlicerObj) {
if (slicer.done) return;
if (slicer.done || slicer.processing) return;

slicer.processing = true;
let result: SlicerResult;

try {
result = await slicer.fn();
} finally {
slicer.processing = false;
}

const result = await slicer.fn();
if (result == null && this.canComplete()) {
this.logger.info(`slicer ${slicer.id} has completed its range`);
slicer.done = true;
Expand All @@ -91,5 +104,6 @@ interface SlicerObj {
done: boolean;
fn: SlicerFn;
id: number;
processing: boolean;
order: number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import {
// tslint:disable-next-line:variable-name
export default function legacyProcessorShim(Processor: ProcessorConstructor, Schema: SchemaConstructor, apis?: APIs): LegacyProcessor {
return {
// @ts-ignore
Processor,
Schema,
schema: (context) => {
if (Schema.type() !== 'convict') {
throw new Error('Backwards compatibility only works for "convict" schemas');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type SchemaType = SchemaConstructor;
// tslint:disable-next-line:variable-name
export default function legacyReaderShim(Slicer: SlicerType, Fetcher: FetcherType, Schema: SchemaType, apis?: APIs): LegacyReader {
return {
// @ts-ignore
Slicer,
Fetcher,
Schema,
schema: (context) => {
if (Schema.type() !== 'convict') {
throw new Error('Backwards compatibility only works for "convict" schemas');
Expand Down
10 changes: 3 additions & 7 deletions packages/job-components/src/operations/shims/reader-shim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import FetcherCore from '../core/fetcher-core';
import ParallelSlicer from '../parallel-slicer';
import ConvictSchema from '../convict-schema';
import { ReaderModule } from '../interfaces';
import { isInteger } from '../../utils';
import { isInteger, isFunction } from '../../utils';
import { convertResult } from './shim-utils';

export default function readerShim<S = any>(legacy: LegacyReader): ReaderModule {
Expand All @@ -25,7 +25,7 @@ export default function readerShim<S = any>(legacy: LegacyReader): ReaderModule
config: this.executionConfig,
};

if (legacy.slicerQueueLength && typeof legacy.slicerQueueLength === 'function') {
if (isFunction(legacy.slicerQueueLength)) {
const result = await legacy.slicerQueueLength(executionContext);
if (result === 'QUEUE_MINIMUM_SIZE') {
this._maxQueueLength = this.executionConfig.workers;
Expand All @@ -44,11 +44,7 @@ export default function readerShim<S = any>(legacy: LegacyReader): ReaderModule
if (this.slicerFns == null) {
throw new Error('Slicer has not been initialized');
}
const fn = this.slicerFns.shift();
if (!fn) {
return async () => null;
}
return fn;
return this.slicerFns.shift();
}

maxQueueLength() {
Expand Down
4 changes: 2 additions & 2 deletions packages/teraslice-messaging/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@
},
"devDependencies": {
"@types/jest": "^23.3.8",
"@types/node": "^10.12.0",
"@types/node": "^10.12.2",
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
"jest": "^23.6.0",
"jest-extended": "^0.11.0",
"ts-jest": "^23.10.4",
"tslint": "^5.0.0",
"tslint-config-airbnb": "^5.11.0",
"typescript": "^3.1.3"
"typescript": "^3.1.6"
},
"engines": {
"node": ">=8.0.0"
Expand Down
92 changes: 76 additions & 16 deletions packages/teraslice-op-test-harness/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ Now, you can access functionality using the `harness` object.
## Processor Execution Function - `init()`

To create a new instance of your operation, use `init`.
You may pass in optional clients under the `clients` key, but they must follow the format listed in the setClients definition
You may pass in optional clients under the `clients` key, but they must follow the format listed in the setClients definition. This will work with both the new and old Job APIs.

```javascript
const { DataEntity } = require('@terascope/job-components');
const processor = require('../asset/example-op');
const reader = require('../asset/example-reader');

describe('setting up an operation', () => {
const processorOpTest = opHarness(processor);
const readerOpTest = opHarness(reader);
Expand All @@ -43,42 +47,87 @@ describe('setting up an operation', () => {
const data = { some: 'data' };
const pTest = await processorOpTest.init({ opConfig });

const results = await pTest.run(data)
expect(results).toBeDefined();
const results = await pTest.run(data);

expect(Array.isArray(results)).toBe(true);
expect(DataEntity.isDataEntity(results[0])).toBe(true);
});

it('can make a reader instance', async () => {
const executionConfig = {
lifecycle: 'once',
operations: [{ _op: 'foo', some: 'config' }
operations: [{ _op: 'foo', some: 'config' }]
};
const data = { some: 'data' };
const type = 'reader';
const rTest = await readerOpTest.init({ executionConfig, type });

const results = await rTest.run(data)
expect(results).toBeDefined();
const results = await rTest.run(data);

expect(Array.isArray(results)).toBe(true);
expect(results[0] instanceof DataEntity).toBe(true);
});

it('can make a slicer instance', async () => {
const executionConfig = {
lifecycle: 'once',
operations: [{ _op: 'foo', some: 'config' }
operations: [{ _op: 'foo', some: 'config' }]
};
// type defaults to slicer
const sTest = await readerOpTest.init({ executionConfig });

const results = await sTest.run()
expect(results).toBeDefined();
const results = await sTest.run();

expect(Array.isArray(results)).toBe(true);
expect(results[0]).toEqual({ foo: 'bar' });
});

it('can make a multiple slicer instances', async () => {
const executionConfig = {
lifecycle: 'once',
slicers: 3,
operations: [{ _op: 'foo', some: 'config' }]
};

// type defaults to slicer
const sTest = await readerOpTest.init({ executionConfig });

const results = await sTest.run();

expect(Array.isArray(results)).toBe(true);
expect(results[0]).toEqual({ foo: 'bar' });
});

it('can return the full metadata of slice', async () => {
const executionConfig = {
lifecycle: 'once',
slicers: 3,
operations: [{ _op: 'foo', some: 'config' }]
};

// type defaults to slicer
const sTest = await readerOpTest.init({ executionConfig });

const results = await sTest.run({ fullSlice: true });

expect(Array.isArray(results)).toBe(true);
expect(results[0]).toEqual({
slice_id: 'd994d423-f3a3-411d-8973-4a4ccccd1afd',
slicer_id: 0,
slicer_order: 10,
request: { foo: 'bar' }
});
});
})
```
## Processor Execution Function - `setClients()`
This takes an array of client configurations that will be used internally.
Each object must have a `client` key and a `type` key set.

```javascript
const { DataEntity } = require('@terascope/job-components');
const reader = require('../asset/example-reader');

describe('setting up an operation', () => {
const opTest = opHarness(reader);
let client;
Expand Down Expand Up @@ -107,20 +156,22 @@ describe('setting up an operation', () => {
it('can make a reader instance', async () => {
const executionConfig = {
lifecycle: 'once',
operations: [{ _op: 'foo', some: 'config' }
operations: [{ _op: 'foo', some: 'config' }]
};
const data = { some: 'data' };
const type = 'reader';
const test = await opTest.init({ executionConfig, type });

const results = await test.run(data)
expect(results).toBeDefined();

expect(Array.isArray(results)).toBe(true);
expect(DataEntity.isDataEntity(results[0])).toBe(true);
});

it('can make a slicer instance', async () => {
const executionConfig = {
lifecycle: 'once',
operations: [{ _op: 'foo', some: 'config' }
operations: [{ _op: 'foo', some: 'config' }]
};
// you can override the clients at init time
const clients = [{ client: new OtherClient(), type: 'elasticsearch', endpoint: 'default'}]
Expand All @@ -130,21 +181,30 @@ describe('setting up an operation', () => {

const [results1, results2 ] = await Promise.all([ test1.run(), test2.run()]);

expect(results1).toBeDefined();
expect(results2).toBeDefined();
expect(Array.isArray(results1)).toBe(true);
expect(results1[0]).toEqual({ foo: 'bar' });

expect(Array.isArray(results2)).toBe(true);
expect(results2[0]).toEqual({ foo: 'bar' });
});
})
```
## Processor Execution Function `processData()`
This provides a short hand for processors to instantiate a new operation, run some data with it and return the results
This provides a short hand for processors to instantiate a new operation, run some data with it and return the results.

```javascript
const processor = require('../asset/example-op');

describe('processor operation test', () => {
const opTest = opHarness(processor);

it('has a shorthand method', async () => {
const opConfig = { _op: 'foo', some: 'config' };
const data = [{ some: 'data' }];
const results = await processorOpTest.processData(opConfig, data);
expect(results).toBeDefined();

expect(Array.isArray(results)).toBe(true);
expect(DataEntity.isDataEntity(results[0])).toBe(true);
})
})

Expand Down
Loading

0 comments on commit 9e7d8ee

Please sign in to comment.