Skip to content

Commit

Permalink
Update Operation APIs and Examples (#834)
Browse files Browse the repository at this point in the history
* Use correct naming for JobConfig

* Rename data-processor to processor

* create slicer-core base class and other fixes

* restructure operations to make more sense

* add parallel slicer and more power to the slicer core class

* add examples and simplify construction of the new processors

* improvements to job-components

* documentation and example improvements to job-components

* bump job-components to 0.5.0

* fix tests

* better docs, improved op apis and reorganized a few things

* export default and doc improvements

* Fix job-component tests and worker-allocation tests

* Make DataEntity factory methods static methods, and remove them from OperationCore

* add base class for Schema

* Expose a shim for legacy operations from within job-components

* make operation apis only initialized when called to create

* fix running tests inside typescript pkg

* add slice events shim

* improvements to operation apis

* better operation api shimming and examples

* remove unused lib

* Rename makeDataEntity to make

* Rename makeDataEntity to make in tests
  • Loading branch information
peterdemartini authored and kstaken committed Oct 2, 2018
1 parent 182242c commit a176efe
Show file tree
Hide file tree
Showing 80 changed files with 4,403 additions and 610 deletions.
3 changes: 2 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"no-path-concat": "error",
"no-debugger": "error",
"handle-callback-err": ["error", "error"],
"import/no-extraneous-dependencies": "off"
"import/no-extraneous-dependencies": "off",
"class-methods-use-this": "off"
}
}
2 changes: 1 addition & 1 deletion e2e/test/cases/cluster/worker-allocation-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ function workersTest(workers, workersExpected, records, done) {
}

describe('worker allocation', () => {
beforeAll(() => resetState());
beforeEach(() => resetState());

it('with 1 worker', (done) => {
workersTest(1, 1, 1000, done);
Expand Down
7 changes: 6 additions & 1 deletion jest.config.base.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module.exports = (projectDir) => {
const rootDir = name === 'e2e' ? '../' : '../../';
const projectRoot = name === 'e2e' ? '<rootDir>/e2e' : `<rootDir>/${workspaceName}/${name}`;
const isTypescript = fs.pathExistsSync(path.join(projectDir, 'tsconfig.json'));
const runInPackage = projectDir === process.cwd();

const config = {
rootDir,
Expand Down Expand Up @@ -67,7 +68,11 @@ module.exports = (projectDir) => {
config.globals['ts-jest'].diagnostics = {
warnOnly: true
};
config.globals['ts-jest'].tsConfig = `./${workspaceName}/${name}/tsconfig.json`;
if (runInPackage) {
config.globals['ts-jest'].tsConfig = './tsconfig.json';
} else {
config.globals['ts-jest'].tsConfig = `./${workspaceName}/${name}/tsconfig.json`;
}
}

config.roots = [
Expand Down
10 changes: 1 addition & 9 deletions packages/job-components/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
# `job-components`

> TODO: description
## Usage

```
const { ... } = require('@terascope/job-components');
// TODO: DEMONSTRATE API
```
> A teraslice library for validating job schemas, registering APIs, and defining and running new Job APIs
4 changes: 4 additions & 0 deletions packages/job-components/examples/asset/asset.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"name": "job-components-example",
"version": "v0.1.0"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict';

const { legacyProcessorShim } = require('@terascope/job-components');
const Processor = require('./processor');
const Schema = require('./schema');

// This file exists for backwards compatibility, so functionality will be limited,
// but it should allow you to write processors the new way today.
// The export is whatever legacyProcessorShim builds from the Processor and Schema classes.
module.exports = legacyProcessorShim(Processor, Schema);
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict';

const { BatchProcessor } = require('@terascope/job-components');

/**
 * Example batch processor: stamps every record of a batch with the time
 * at which the batch was handled.
 */
class ExampleBatchProcessor extends BatchProcessor {
    /**
     * Add a `batchedAt` ISO-8601 timestamp to each record.
     *
     * @param batch the collection of records to stamp (must provide `map`)
     * @returns the result of mapping the batch; each record is mutated in place
     */
    onBatch(batch) {
        const stamp = (record) => {
            record.batchedAt = new Date().toISOString();
            return record;
        };

        return batch.map(stamp);
    }
}

module.exports = ExampleBatchProcessor;
17 changes: 17 additions & 0 deletions packages/job-components/examples/asset/example-batch-op/schema.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict';

const { ConvictSchema } = require('@terascope/job-components');

/**
 * Configuration schema for the example batch operation.
 */
class Schema extends ConvictSchema {
    /**
     * Describe the settings this operation accepts.
     *
     * @returns {object} schema definition with a single `example` string property
     */
    build() {
        const example = {
            default: 'examples are quick and easy',
            doc: 'A random example schema property',
            format: 'String',
        };

        return { example };
    }
}

module.exports = Schema;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict';

const { legacyProcessorShim } = require('@terascope/job-components');
const Processor = require('./processor');
const Schema = require('./schema');

// This file exists for backwards compatibility, so functionality will be limited,
// but it should allow you to write processors the new way today.
// The export is whatever legacyProcessorShim builds from the Processor and Schema classes.
module.exports = legacyProcessorShim(Processor, Schema);
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict';

const { Processor } = require('@terascope/job-components');

/**
 * Example filter: lets a record through unless its status code is above 400.
 */
class ExampleFilter extends Processor {
    /**
     * @param data the record to inspect; only `statusCode` is read
     * @returns the same record, or `null` when `statusCode` is greater than 400
     */
    onData(data) {
        const keep = !(data.statusCode > 400);
        return keep ? data : null;
    }
}

module.exports = ExampleFilter;
17 changes: 17 additions & 0 deletions packages/job-components/examples/asset/example-filter-op/schema.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict';

const { ConvictSchema } = require('@terascope/job-components');

/**
 * Configuration schema for the example filter operation.
 */
class Schema extends ConvictSchema {
    /**
     * Describe the settings this operation accepts.
     *
     * @returns {object} schema definition with a single `example` string property
     */
    build() {
        const example = {
            default: 'examples are quick and easy',
            doc: 'A random example schema property',
            format: 'String',
        };

        return { example };
    }
}

module.exports = Schema;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict';

const { legacyProcessorShim } = require('@terascope/job-components');
const Processor = require('./processor');
const Schema = require('./schema');

// This file exists for backwards compatibility, so functionality will be limited,
// but it should allow you to write processors the new way today.
// The export is whatever legacyProcessorShim builds from the Processor and Schema classes.
module.exports = legacyProcessorShim(Processor, Schema);
12 changes: 12 additions & 0 deletions packages/job-components/examples/asset/example-map-op/processor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict';

const { Processor } = require('@terascope/job-components');

/**
 * Example map operation: marks each record with the time it was processed.
 */
class MapProcessor extends Processor {
    /**
     * @param data the record to mark; it is mutated in place
     * @returns the same record with `touchedAt` set to the current ISO-8601 time
     */
    onData(data) {
        const touchedAt = new Date().toISOString();
        return Object.assign(data, { touchedAt });
    }
}

module.exports = MapProcessor;
17 changes: 17 additions & 0 deletions packages/job-components/examples/asset/example-map-op/schema.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict';

const { ConvictSchema } = require('@terascope/job-components');

/**
 * Configuration schema for the example map operation.
 */
class Schema extends ConvictSchema {
    /**
     * Describe the settings this operation accepts.
     *
     * @returns {object} schema definition with a single `example` string property
     */
    build() {
        const example = {
            default: 'examples are quick and easy',
            doc: 'A random example schema property',
            format: 'String',
        };

        return { example };
    }
}

module.exports = Schema;
20 changes: 20 additions & 0 deletions packages/job-components/examples/asset/example-reader/api.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
'use strict';

const { OperationAPI } = require('@terascope/job-components');

/**
 * Example operation API that echoes its configuration and says hello.
 */
class ExampleAPI extends OperationAPI {
    /**
     * @returns {string} the fixed name of this API, `ExampleAPI`
     */
    name() {
        const apiName = 'ExampleAPI';
        return apiName;
    }

    /**
     * Build the API object.
     *
     * @param config value stored unchanged on the returned object
     * @returns {Promise<object>} an object holding `config` and a `say()` method returning `'hello'`
     */
    async handle(config) {
        const api = {
            config,
            say() {
                return 'hello';
            }
        };

        return api;
    }
}

module.exports = ExampleAPI;
34 changes: 34 additions & 0 deletions packages/job-components/examples/asset/example-reader/fetcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
'use strict';

const _ = require('lodash');
const request = require('request');
const { Fetcher } = require('@terascope/job-components');

/**
 * Example fetcher: requests the URL carried by a slice and turns the HTTP
 * status code of the response into a batch of generated records.
 */
class ExampleFetcher extends Fetcher {
    /**
     * Produce `statusCode` records for the given slice.
     *
     * @param {object} startingData slice data; the URL is read from `fetchFrom`
     *   (falling back to `fromUrl`)
     * @returns {Promise<object[]>} records shaped `{ id, statusCode, data }`, where
     *   `id` is the record index and `data` holds three `_.random()` values
     */
    async fetch(startingData) {
        // The example slicer in this asset publishes the URL under `fetchFrom`.
        // `fromUrl` is still honoured so slices built with that key keep working.
        const url = startingData.fetchFrom != null
            ? startingData.fetchFrom
            : startingData.fromUrl;

        const statusCode = await this.getStatusCode(url);
        return _.times(statusCode, n => ({
            id: n,
            statusCode,
            data: [
                _.random(),
                _.random(),
                _.random(),
            ]
        }));
    }

    /**
     * Issue a GET request and report the HTTP status code of the response.
     *
     * @param {string} url address to request
     * @returns {Promise<number>} resolves with `response.statusCode`;
     *   rejects with the error passed to the request callback
     */
    async getStatusCode(url) {
        // `request` is callback based, so wrap it in a promise.
        return new Promise((resolve, reject) => {
            request.get(url, (err, response) => {
                if (err) {
                    reject(err);
                    return;
                }
                resolve(response.statusCode);
            });
        });
    }
}

module.exports = ExampleFetcher;
13 changes: 13 additions & 0 deletions packages/job-components/examples/asset/example-reader/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
'use strict';

const { legacyReaderShim } = require('@terascope/job-components');
const Fetcher = require('./fetcher');
const Slicer = require('./slicer');
const Schema = require('./schema');
const ExampleAPI = require('./api');

// This file exists for backwards compatibility, so functionality will be limited,
// but it should allow you to write readers the new way today.
// The last argument maps an API name to the class that implements it.
module.exports = legacyReaderShim(Slicer, Fetcher, Schema, {
'example-reader': ExampleAPI
});
17 changes: 17 additions & 0 deletions packages/job-components/examples/asset/example-reader/schema.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict';

const { ConvictSchema } = require('@terascope/job-components');

/**
 * Configuration schema for the example reader.
 */
class Schema extends ConvictSchema {
    /**
     * Describe the settings this operation accepts.
     *
     * @returns {object} schema definition with a single `example` string property
     */
    build() {
        const example = {
            default: 'examples are quick and easy',
            doc: 'A random example schema property',
            format: 'String',
        };

        return { example };
    }
}

module.exports = Schema;
15 changes: 15 additions & 0 deletions packages/job-components/examples/asset/example-reader/slicer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
'use strict';

const uuidv4 = require('uuid/v4');
const { Slicer } = require('@terascope/job-components');

/**
 * Example slicer: every slice points at the same URL and carries a fresh id.
 */
class ExampleSlicer extends Slicer {
    /**
     * @returns {Promise<object>} a slice of the form `{ id, fetchFrom }`, where `id`
     *   is a newly generated v4 UUID
     */
    async slice() {
        const id = uuidv4();
        const fetchFrom = 'https://httpstat.us/200';

        return { id, fetchFrom };
    }
}

module.exports = ExampleSlicer;
8 changes: 6 additions & 2 deletions packages/job-components/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/job-components",
"version": "0.4.2",
"version": "0.5.0",
"publishConfig": {
"access": "public"
},
Expand Down Expand Up @@ -36,14 +36,18 @@
"test:debug": "env DEBUG='*teraslice*' jest --detectOpenHandles --coverage=false --runInBand"
},
"dependencies": {
"@terascope/queue": "^1.1.3",
"@terascope/teraslice-types": "^0.2.0",
"@types/fs-extra": "^5.0.4",
"@types/lodash": "^4.14.116",
"@types/node": "^10.11.0",
"@types/uuid": "^3.4.4",
"datemath-parser": "^1.0.6",
"fs-extra": "^7.0.0",
"list": "^2.0.15",
"lodash": "^4.17.11",
"moment": "^2.22.2"
"moment": "^2.22.2",
"uuid": "^3.3.2"
},
"devDependencies": {
"@types/jest": "^23.3.2",
Expand Down
4 changes: 2 additions & 2 deletions packages/job-components/src/config-validators.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { JobConfig, OpConfig, K8sJobConfig } from '@terascope/teraslice-types';
import { ValidatedJobConfig, OpConfig } from '@terascope/teraslice-types';
import convict from 'convict';
import { opSchema } from './job-schemas';

Expand Down Expand Up @@ -27,7 +27,7 @@ export function validateOpConfig(inputSchema: convict.Schema<any>, inputConfig:
* Merges the provided inputSchema with commonSchema and then validates the
* provided jobConfig against the resulting schema.
*/
export function validateJobConfig(inputSchema: convict.Schema<any>, inputConfig: any): JobConfig | K8sJobConfig {
export function validateJobConfig(inputSchema: convict.Schema<any>, inputConfig: any): ValidatedJobConfig {
const config = convict(inputSchema);

try {
Expand Down
53 changes: 53 additions & 0 deletions packages/job-components/src/execution-context-apis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { OpAPI, OperationAPIConstructor } from './operations/operation-api';
import legacySliceEventsShim from './operations/shims/legacy-slice-events-shim';
import { Context, ExecutionConfig } from '@terascope/teraslice-types';

/**
 * Keeps track of the operation APIs available to an execution.
 *
 * API constructors are registered by name with `addToRegistry`. Nothing is
 * instantiated until `initAPI` is called for that name, and the created API
 * can afterwards be looked up with `getAPI`.
 */
export class ExecutionContextAPI {
    /** API constructors keyed by registration name. */
    private readonly _registry: APIRegistry = {};
    /** APIs that have already been created through `initAPI`, keyed by name. */
    private readonly _apis: APIS = {};
    private readonly _context: Context;
    private readonly _executionConfig: ExecutionConfig;

    constructor(context: Context, executionConfig: ExecutionConfig) {
        this._context = context;
        this._executionConfig = executionConfig;
    }

    /**
     * Register an API constructor under `name`. A later registration under
     * the same name replaces the earlier one.
     */
    addToRegistry(name: string, api: OperationAPIConstructor): void {
        this._registry[name] = api;
    }

    /**
     * Look up an API that was previously created with `initAPI`.
     *
     * @throws {Error} if no API has been initialized under `name`
     */
    getAPI(name: string): OpAPI {
        if (this._apis[name] == null) {
            throw new Error(`Unable to find API by name "${name}"`);
        }
        return this._apis[name];
    }

    /**
     * Instantiate the API registered under `name`, initialize it, and cache
     * the result of its `createAPI(...params)` call.
     *
     * @param name registration name of the API
     * @param params forwarded unchanged to the API's `createAPI`
     * @returns the created API, which is also available through `getAPI`
     * @throws {Error} if `name` was never registered, or was already initialized
     */
    async initAPI(name: string, ...params: any[]): Promise<OpAPI> {
        if (this._registry[name] == null) {
            throw new Error(`Unable to find API by name "${name}"`);
        }

        if (this._apis[name] != null) {
            throw new Error(`API "${name}" can only be initialized once`);
        }

        const API = this._registry[name];
        const api = new API(this._context, this._executionConfig);
        await api.initialize();

        // The shim is applied after initialize() and before the API is created.
        legacySliceEventsShim(api);

        this._apis[name] = await api.createAPI(...params);
        return this._apis[name];
    }
}

/** Created operation APIs, keyed by the name they were initialized under. */
interface APIS {
[name: string]: OpAPI;
}

/** Registered operation API constructors, keyed by registration name. */
interface APIRegistry {
[name: string]: OperationAPIConstructor;
}
7 changes: 4 additions & 3 deletions packages/job-components/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
export * from './config-validators';
export * from './execution-context-apis';
export * from './formats';
export * from './operation-loader';
export * from './register-apis';
export * from './operations';
export * from './job-validator';
export * from './job-schemas';
export * from './formats';
export * from './config-validators';
export * from './register-apis';
Loading

0 comments on commit a176efe

Please sign in to comment.