diff --git a/package.json b/package.json index 099e702e5d8..ad4780bd48b 100644 --- a/package.json +++ b/package.json @@ -21,11 +21,11 @@ "publish:changed": "./scripts/publish.sh" }, "dependencies": { - "lerna": "^3.4.1", - "typescript": "^3.1.1" + "lerna": "^3.4.3", + "typescript": "^3.1.2" }, "devDependencies": { - "@types/jest": "^23.3.3", + "@types/jest": "^23.3.5", "babel-core": "^6.0.0", "babel-jest": "^23.6.0", "eslint": "^5.6.1", @@ -35,8 +35,8 @@ "jest": "^23.6.0", "jest-extended": "^0.11.0", "lerna-alias": "^3.0.2", - "semver": "^5.5.1", - "ts-jest": "^23.10.3", + "semver": "^5.6.0", + "ts-jest": "^23.10.4", "tslint": "^5.0.0" }, "workspaces": [ diff --git a/packages/docker-compose-js/package.json b/packages/docker-compose-js/package.json index 0b6a23f68ab..eacb93779f6 100644 --- a/packages/docker-compose-js/package.json +++ b/packages/docker-compose-js/package.json @@ -27,7 +27,7 @@ }, "dependencies": { "bluebird": "^3.5.2", - "debug": "^4.0.1" + "debug": "^4.1.0" }, "devDependencies": { "eslint": "^5.6.1", diff --git a/packages/elasticsearch-api/package.json b/packages/elasticsearch-api/package.json index 63822ceccc1..96f3651ad5c 100644 --- a/packages/elasticsearch-api/package.json +++ b/packages/elasticsearch-api/package.json @@ -27,7 +27,7 @@ "uuid": "^3.3.2" }, "devDependencies": { - "debug": "^4.0.1", + "debug": "^4.1.0", "eslint": "^5.6.1", "eslint-config-airbnb-base": "^13.1.0", "eslint-plugin-import": "^2.14.0", diff --git a/packages/job-components/package.json b/packages/job-components/package.json index 8afef0ae201..f7c40d0fc53 100644 --- a/packages/job-components/package.json +++ b/packages/job-components/package.json @@ -39,8 +39,8 @@ "@terascope/queue": "^1.1.4", "@terascope/teraslice-types": "^0.2.1", "@types/fs-extra": "^5.0.4", - "@types/lodash": "^4.14.116", - "@types/node": "^10.11.4", + "@types/lodash": "^4.14.117", + "@types/node": "^10.11.7", "@types/uuid": "^3.4.4", "datemath-parser": "^1.0.6", "fs-extra": "^7.0.0", @@ -50,16 +50,16 @@ "uuid": "^3.3.2" }, "devDependencies": { - "@types/jest": "^23.3.3", + "@types/jest": "^23.3.5", "babel-core": "^6.0.0", "babel-jest": "^23.6.0", "jest": "^23.6.0", "jest-extended": "^0.11.0", "rimraf": "^2.0.0", - "ts-jest": "^23.10.3", + "ts-jest": "^23.10.4", "tslint": "^5.0.0", "tslint-config-airbnb": "^5.11.0", - "typescript": "^3.1.1" + "typescript": "^3.1.2" }, "engines": { "node": ">=8.0.0" diff --git a/packages/job-components/tsconfig.build.json b/packages/job-components/tsconfig.build.json index 97b0c4f0a78..90d9fc7988a 100644 --- a/packages/job-components/tsconfig.build.json +++ b/packages/job-components/tsconfig.build.json @@ -1,6 +1,7 @@ { "extends": "./tsconfig", "compilerOptions": { + "rootDir": "src", "paths": { "@terascope/*": [ "../*", diff --git a/packages/job-components/tsconfig.json b/packages/job-components/tsconfig.json index cef70012c75..fd1a35f48ec 100644 --- a/packages/job-components/tsconfig.json +++ b/packages/job-components/tsconfig.json @@ -2,12 +2,11 @@ "extends": "../../tsconfig", "compilerOptions": { "baseUrl": ".", - "rootDir": "src", "outDir": "dist" }, "references": [ { - "path": "../teraslice-types/src" + "path": "../teraslice-types" } ] } diff --git a/packages/teraslice-client-js/package.json b/packages/teraslice-client-js/package.json index 731177250a9..fea46fc2ed1 100644 --- a/packages/teraslice-client-js/package.json +++ b/packages/teraslice-client-js/package.json @@ -39,6 +39,6 @@ "eslint-plugin-import": "^2.14.0", "jest": "^23.6.0", "jest-extended": "^0.11.0", - "nock": "^10.0.0" + "nock": "^10.0.1" } } diff --git a/packages/teraslice-messaging/package.json b/packages/teraslice-messaging/package.json index 50e0f52e57d..4855d927f40 100644 --- a/packages/teraslice-messaging/package.json +++ b/packages/teraslice-messaging/package.json @@ -1,6 +1,6 @@ { "name": "@terascope/teraslice-messaging", - "version": "0.2.2", + "version": "0.2.3", "publishConfig": { "access": "public" }, @@ -39,32 +39,33 @@ "@terascope/queue": "^1.1.4", "@terascope/teraslice-types": "^0.2.1", "@types/bluebird": "^3.5.24", - "@types/debug": "^0.0.30", + "@types/debug": "^0.0.31", "@types/fs-extra": "^5.0.4", - "@types/lodash": "^4.14.116", + "@types/lodash": "^4.14.117", "@types/nanoid": "^1.2.0", - "@types/node": "^10.11.4", + "@types/node": "^10.11.7", + "@types/p-event": "^1.3.0", "@types/socket.io": "^1.4.38", "@types/socket.io-client": "^1.4.32", "bluebird": "^3.5.2", - "debug": "^4.0.1", - "emittery": "^0.4.1", + "debug": "^4.1.0", "nanoid": "^1.2.6", + "p-event": "^2.1.0", "porty": "^3.1.1", "socket.io": "^1.7.4", "socket.io-client": "^1.7.4" }, "devDependencies": { - "@types/jest": "^23.3.3", + "@types/jest": "^23.3.5", "babel-core": "^6.0.0", "babel-jest": "^23.6.0", "jest": "^23.6.0", "jest-extended": "^0.11.0", "rimraf": "^2.0.0", - "ts-jest": "^23.10.3", + "ts-jest": "^23.10.4", "tslint": "^5.0.0", "tslint-config-airbnb": "^5.11.0", - "typescript": "^3.1.1" + "typescript": "^3.1.2" }, "engines": { "node": ">=8.0.0" diff --git a/packages/teraslice-messaging/src/cluster-master/client.ts b/packages/teraslice-messaging/src/cluster-master/client.ts index f028ac1fe04..eadea55b0c3 100644 --- a/packages/teraslice-messaging/src/cluster-master/client.ts +++ b/packages/teraslice-messaging/src/cluster-master/client.ts @@ -1,4 +1,4 @@ -import _ from 'lodash'; +import { isString } from 'lodash'; import * as i from './interfaces'; import * as core from '../messenger'; @@ -15,11 +15,11 @@ export class Client extends core.Client { connectTimeout } = opts; - if (!clusterMasterUrl || !_.isString(clusterMasterUrl)) { + if (!clusterMasterUrl || !isString(clusterMasterUrl)) { throw new Error('ClusterMaster.Client requires a valid clusterMasterUrl'); } - if (!exId || !_.isString(exId)) { + if (!exId || !isString(exId)) { throw new Error('ClusterMaster.Client requires a valid exId'); } @@ -65,14 +65,14 @@ export class Client extends core.Client { } onExecutionAnalytics(fn: core.MessageHandler) { - this.socket.on('execution:analytics', this.handleResponse('execution:analytics', fn)); + this.handleResponse(this.socket, 'execution:analytics', fn); } onExecutionPause(fn: core.MessageHandler) { - this.socket.on('execution:pause', this.handleResponse('execution:pause', fn)); + this.handleResponse(this.socket, 'execution:pause', fn); } onExecutionResume(fn: core.MessageHandler) { - this.socket.on('execution:resume', this.handleResponse('execution:resume', fn)); + this.handleResponse(this.socket, 'execution:resume', fn); } } diff --git a/packages/teraslice-messaging/src/cluster-master/server.ts b/packages/teraslice-messaging/src/cluster-master/server.ts index 55b377e14f6..570056f444e 100644 --- a/packages/teraslice-messaging/src/cluster-master/server.ts +++ b/packages/teraslice-messaging/src/cluster-master/server.ts @@ -1,4 +1,4 @@ -import _ from 'lodash'; +import { isNumber, cloneDeep, forOwn } from 'lodash'; import * as i from './interfaces'; import * as core from '../messenger'; @@ -17,7 +17,7 @@ export class Server extends core.Server { pingTimeout, } = opts; - if (!_.isNumber(nodeDisconnectTimeout)) { + if (!isNumber(nodeDisconnectTimeout)) { throw new Error('ClusterMaster.Server requires a valid nodeDisconnectTimeout'); } @@ -49,7 +49,7 @@ export class Server extends core.Server { async start() { this.on('connection', (msg) => { - this.onConnection(msg.clientId, msg.payload as SocketIO.Socket); + this.onConnection(msg.scope, msg.payload as SocketIO.Socket); }); await this.listen(); @@ -68,38 +68,38 @@ export class Server extends core.Server { } getClusterAnalytics() { - return _.cloneDeep(this.clusterAnalytics); + return cloneDeep(this.clusterAnalytics); } onExecutionFinished(fn: (clientId: string, error?: core.ResponseError) => {}) { this.on('execution:finished', (msg) => { - fn(msg.clientId, msg.error); + fn(msg.scope, msg.error); }); } private onConnection(exId: string, socket: SocketIO.Socket) { - socket.on('execution:finished', this.handleResponse('execution:finished', (msg: core.Message) => { + this.handleResponse(socket, 'execution:finished', (msg: core.Message) => { this.emit('execution:finished', { - clientId: exId, + scope: exId, payload: {}, error: msg.payload.error }); - })); + }); - socket.on('cluster:analytics', this.handleResponse('cluster:analytics', (msg: core.Message) => { + this.handleResponse(socket, 'cluster:analytics', (msg: core.Message) => { const data = msg.payload as i.ExecutionAnalyticsMessage; if (!this.clusterAnalytics[data.kind]) { return; } - _.forOwn(data.stats, (value, field) => { + forOwn(data.stats, (value, field) => { if (this.clusterAnalytics[data.kind][field] != null) { this.clusterAnalytics[data.kind][field] += value; } }); this.emit('cluster:analytics', { - clientId: exId, + scope: exId, payload: { diff: data.stats, current: this.clusterAnalytics[data.kind], @@ -109,6 +109,6 @@ export class Server extends core.Server { return { recorded: true }; - })); + }); } } diff --git a/packages/teraslice-messaging/src/execution-controller/client.ts b/packages/teraslice-messaging/src/execution-controller/client.ts index 2f4f5e09700..451255e2b8d 100644 --- a/packages/teraslice-messaging/src/execution-controller/client.ts +++ b/packages/teraslice-messaging/src/execution-controller/client.ts @@ -46,7 +46,7 @@ export class Client extends core.Client { throw new Error(`Unable to connect to execution controller, caused by error: ${err.message}`); } - this.socket.on('execution:slice:new', this.handleResponse('execution:slice:new', (msg: core.Message) => { + this.handleResponse(this.socket, 'execution:slice:new', (msg: core.Message) => { if (this.listenerCount('execution:slice:new') === 0) { return { willProcess: false }; } @@ -62,13 +62,13 @@ export class Client extends core.Client { return { willProcess, }; - })); + }); - this.socket.on('execution:finished', this.handleResponse('execution:finished', (msg: core.Message) => { + this.handleResponse(this.socket, 'execution:finished', (msg: core.Message) => { this.emit('execution:finished', { payload: msg.payload }); - })); + }); } onExecutionFinished(fn: () => void) { @@ -86,10 +86,11 @@ export class Client extends core.Client { this.sendAvailable(); const slice = await new Promise((resolve) => { - const unsubscribe = this.on('execution:slice:new', onMessage); + this.once('execution:slice:new', onMessage); const intervalId = setInterval(() => { if (this.serverShutdown || !this.ready || fn()) { + this.removeListener('execution:slice:new', onMessage); finish(); } }, interval); @@ -100,7 +101,6 @@ export class Client extends core.Client { function finish(slice?: Slice) { clearInterval(intervalId); - unsubscribe(); resolve(slice); } }); diff --git a/packages/teraslice-messaging/src/execution-controller/interfaces.ts b/packages/teraslice-messaging/src/execution-controller/interfaces.ts index 07f836e3818..088dd2c0c01 100644 --- a/packages/teraslice-messaging/src/execution-controller/interfaces.ts +++ b/packages/teraslice-messaging/src/execution-controller/interfaces.ts @@ -23,7 +23,7 @@ export interface Worker { } export interface ActiveWorkers { - [workerId: string]: string; + [workerId: string]: boolean; } export interface SliceResponseMessage { diff --git a/packages/teraslice-messaging/src/execution-controller/server.ts b/packages/teraslice-messaging/src/execution-controller/server.ts index 66252596828..352bf02cd91 100644 --- a/packages/teraslice-messaging/src/execution-controller/server.ts +++ b/packages/teraslice-messaging/src/execution-controller/server.ts @@ -1,4 +1,4 @@ -import _ from 'lodash'; +import { isNumber, get } from 'lodash'; import debugFn from 'debug'; import Queue from '@terascope/queue'; import { Slice } from '@terascope/teraslice-types'; @@ -7,8 +7,10 @@ import * as i from './interfaces'; const debug = debugFn('teraslice-messaging:execution-controller:server'); +const { Available, Unavailable } = core.ClientState; + export class Server extends core.Server { - private _activeWorkers: string[]; + private _activeWorkers: i.ActiveWorkers; queue: Queue; constructor(opts: i.ServerOptions) { @@ -21,7 +23,7 @@ export class Server extends core.Server { workerDisconnectTimeout, } = opts; - if (!_.isNumber(workerDisconnectTimeout)) { + if (!isNumber(workerDisconnectTimeout)) { throw new Error('ExecutionController.Server requires a valid workerDisconnectTimeout'); } @@ -36,12 +38,12 @@ export class Server extends core.Server { }); this.queue = new Queue(); - this._activeWorkers = []; + this._activeWorkers = {}; } async start() { this.on('connection', (msg) => { - this.onConnection(msg.clientId, msg.payload as SocketIO.Socket); + this.onConnection(msg.scope, msg.payload as SocketIO.Socket); }); this.onClientUnavailable((workerId) => { @@ -49,11 +51,12 @@ export class Server extends core.Server { }); this.onClientDisconnect((workerId) => { - this._activeWorkers = _.without(this._activeWorkers, workerId); + delete this._activeWorkers[workerId]; this._workerRemove(workerId); }); this.onClientAvailable((workerId) => { + this._activeWorkers[workerId] = false; this._workerEnqueue(workerId); }); @@ -65,7 +68,7 @@ export class Server extends core.Server { this.queue.remove(worker.workerId, 'workerId'); }); - this._activeWorkers = []; + this._activeWorkers = {}; await super.shutdown(); } @@ -76,30 +79,32 @@ export class Server extends core.Server { } async dispatchSlice(slice: Slice, workerId: string): Promise { - const isAvailable = _.get(this._clients, [workerId, 'state']) === core.ClientState.Available; + const isAvailable = this._clients[workerId] && this._clients[workerId].state === Available; if (!isAvailable) { debug(`worker ${workerId} is not available`); return false; } // first assume the slice is dispatched - this._activeWorkers = _.union(this._activeWorkers, [workerId]); + this._activeWorkers[workerId] = true; let dispatched = false; try { const response = await this.send(workerId, 'execution:slice:new', slice); - dispatched = _.get(response, 'payload.willProcess', false); + if (response && response.payload) { + dispatched = response.payload.willProcess; + } } catch (error) { debug(`got error when dispatching slice ${slice.slice_id}`, error); } if (!dispatched) { debug(`failure to dispatch slice ${slice.slice_id} to worker ${workerId}`); - this._activeWorkers = _.without(this._activeWorkers, workerId); + this._activeWorkers[workerId] = false; } else { - this.updateClientState(workerId, { - state: core.ClientState.Unavailable, + process.nextTick(() => { + this.updateClientState(workerId, Unavailable); }); } @@ -108,17 +113,13 @@ export class Server extends core.Server { onSliceSuccess(fn: (workerId: string, payload: i.SliceCompletePayload) => {}) { this.on('slice:success', (msg) => { - _.defer(() => { - fn(msg.clientId, msg.payload); - }); + fn(msg.scope, msg.payload); }); } onSliceFailure(fn: (workerId: string, payload: i.SliceCompletePayload) => {}) { this.on('slice:failure', (msg) => { - _.defer(() => { - fn(msg.clientId, msg.payload); - }); + fn(msg.scope, msg.payload); }); } @@ -129,8 +130,8 @@ export class Server extends core.Server { }); } - get activeWorkers(): string[] { - return _.clone(this._activeWorkers); + get activeWorkerCount(): number { + return Object.values(this._activeWorkers).filter((v) => v).length; } get workerQueueSize(): number { @@ -138,23 +139,23 @@ export class Server extends core.Server { } private onConnection(workerId: string, socket: SocketIO.Socket) { - socket.on('worker:slice:complete', this.handleResponse('worker:slice:complete', async (msg) => { + this.handleResponse(socket, 'worker:slice:complete', async (msg) => { const { payload } = msg; - const sliceId = _.get(payload, 'slice.slice_id'); + const sliceId = get(payload, 'slice.slice_id'); if (payload.error) { - this.emit('slice:failure', { clientId: workerId, payload }); + this.emit('slice:failure', { scope: workerId, payload }); } else { - this.emit('slice:success', { clientId: workerId, payload }); + this.emit('slice:success', { scope: workerId, payload }); } - this._activeWorkers = _.without(this._activeWorkers, workerId); + this._activeWorkers[workerId] = false; - return _.pickBy({ + return { recorded: true, slice_id: sliceId, - }); - })); + }; + }); } private _workerEnqueue(workerId: string): boolean { @@ -167,7 +168,7 @@ export class Server extends core.Server { this.queue.enqueue({ workerId }); } - this.emit('worker:enqueue', { clientId: workerId, payload: {} }); + this.emit('worker:enqueue', { scope: '', payload: {} }); return exists; } @@ -183,8 +184,7 @@ export class Server extends core.Server { } if (workerId != null) { - this._activeWorkers = _.without(this._activeWorkers, workerId); - this.emit('worker:dequeue', { clientId: workerId, payload: {} }); + this._activeWorkers[workerId] = false; } return workerId; @@ -195,7 +195,6 @@ export class Server extends core.Server { this.queue.remove(workerId, 'workerId'); - this.emit('worker:dequeue', { clientId: workerId, payload: {} }); return true; } } diff --git a/packages/teraslice-messaging/src/messenger/client.ts b/packages/teraslice-messaging/src/messenger/client.ts index 7a636233080..9e0168ee2b5 100644 --- a/packages/teraslice-messaging/src/messenger/client.ts +++ b/packages/teraslice-messaging/src/messenger/client.ts @@ -52,12 +52,13 @@ export class Client extends Core { const options: SocketIOClient.ConnectOpts = Object.assign({}, socketOptions, { autoConnect: false, forceNew: true, + perMessageDeflate: false, query: { clientId, clientType }, - // transports: ['websocket'], timeout: connectTimeout }); - this.socket = SocketIOClient(hostUrl, options); + // @ts-ignore + this.socket = new SocketIOClient(hostUrl, options); this.hostUrl = hostUrl; this.connectTimeout = connectTimeout; @@ -88,8 +89,9 @@ export class Client extends Core { let connectTimeout: NodeJS.Timer | undefined; const cleanup = () => { - if (connectTimeout) { + if (connectTimeout != null) { clearTimeout(connectTimeout); + connectTimeout = undefined; } this.socket.removeListener('connect', connect); }; @@ -120,7 +122,7 @@ export class Client extends Core { if (!this.available) return; - setImmediate(async () => { + process.nextTick(async () => { try { await this.sendAvailable(); } catch (err) { @@ -141,6 +143,13 @@ export class Client extends Core { this.emit('server:shutdown'); }); + this.socket.on('message:response', (msg: i.Message) => { + this.emit(msg.id, { + scope: msg.from, + payload: msg, + }); + }); + this.socket.on('connect', () => { debug(`client ${this.clientId} connected`); this.ready = true; @@ -154,6 +163,8 @@ export class Client extends Core { } async sendAvailable(payload?: i.Payload) { + if (this.available) return; + this.available = true; return this.send(`client:${i.ClientState.Available}`, payload, { volatile: true, @@ -161,6 +172,8 @@ export class Client extends Core { } async sendUnavailable(payload?: i.Payload) { + if (!this.available) return; + this.available = false; return this.send(`client:${i.ClientState.Unavailable}`, payload, { volatile: true, @@ -173,7 +186,7 @@ export class Client extends Core { if (!this.ready && !options.volatile) { const connected = this.socket.connected ? 'connected' : 'not-connected'; debug(`server is not ready and ${connected}, waiting for the ready event`); - await this.onceWithTimeout('ready'); + await this.onceWithTimeout(`ready:${this.serverName}`); } const response = options.response != null ? options.response : true; @@ -191,19 +204,13 @@ export class Client extends Core { }; const responseMsg = this.handleSendResponse(message); - this.socket.emit(eventName, message, this._sendCallbackFn); + this.socket.emit(eventName, message); return responseMsg; } - async emit(eventName: string, msg: i.EventMessage = { payload: {} }) { - await Promise.all([ - super.emit(`${eventName}`, msg), - super.emit(`${eventName}:${this.serverName}`, msg), - ]); - } - - on(eventName: string, fn: (msg: i.EventMessage) => void) { - return super.on(eventName, fn); + emit(eventName: string, msg: i.ClientEventMessage = { payload: {} }) { + msg.scope = this.serverName; + super.emit(`${eventName}`, msg as i.EventMessage); } isClientReady() { diff --git a/packages/teraslice-messaging/src/messenger/core.ts b/packages/teraslice-messaging/src/messenger/core.ts index 84ef13a7dc0..554a67e885a 100644 --- a/packages/teraslice-messaging/src/messenger/core.ts +++ b/packages/teraslice-messaging/src/messenger/core.ts @@ -1,11 +1,12 @@ import debugFn from 'debug'; -import _ from 'lodash'; -import Emittery from 'emittery'; +import { toString, isSafeInteger } from 'lodash'; +import { EventEmitter } from 'events'; +import pEvent from 'p-event'; import * as i from './interfaces'; const debug = debugFn('teraslice-messaging:core'); -export class Core extends Emittery { +export class Core extends EventEmitter { public closed: boolean = false; protected networkLatencyBuffer: number; @@ -17,20 +18,18 @@ export class Core extends Emittery { this.networkLatencyBuffer = opts.networkLatencyBuffer || 0; this.actionTimeout = opts.actionTimeout; - if (!_.isSafeInteger(this.actionTimeout) || !this.actionTimeout) { + if (!isSafeInteger(this.actionTimeout) || !this.actionTimeout) { throw new Error('Messenger requires a valid actionTimeout'); } - if (!_.isSafeInteger(this.networkLatencyBuffer)) { + if (!isSafeInteger(this.networkLatencyBuffer)) { throw new Error('Messenger requires a valid networkLatencyBuffer'); } - - this._sendCallbackFn = this._sendCallbackFn.bind(this); } close() { this.closed = true; - this.clearListeners(); + this.removeAllListeners(); } protected async handleSendResponse(sent: i.Message): Promise { @@ -38,9 +37,6 @@ export class Core extends Emittery { debug('waiting for response from message', sent); const remaining = sent.respondBy - Date.now(); - const timeoutError = new Error(`Timed out after ${remaining}ms, waiting for message "${sent.eventName}"`); - - const responseError = new Error(`${sent.eventName} Message Response Failure`); const response = await this.onceWithTimeout(sent.id, remaining); // it is a timeout @@ -48,31 +44,20 @@ export class Core extends Emittery { if (sent.volatile || this.closed) { return null; } - throw timeoutError; + throw new Error(`Timed out after ${remaining}ms, waiting for message "${sent.eventName}"`); } if (response.error) { - responseError.message += `: ${response.error}`; - // @ts-ignore - responseError.response = response; - debug('message send response error', responseError); - throw responseError; + throw new Error(`${sent.eventName} Message Response Failure: ${response.error}`); } return response; } - protected _sendCallbackFn(response: i.Message) { - this.emit(response.id, { - clientId: response.to, - payload: response - }); - } - - protected handleResponse(eventName: string, fn: i.MessageHandler) { + protected handleResponse(socket: i.SocketEmitter, eventName: string, fn: i.MessageHandler) { debug(`registering response handler for ${eventName}`); - return async (msg: i.Message, callback: (msg?: i.Message) => void) => { + socket.on(eventName, async (msg: i.Message) => { const message: i.Message = Object.assign({}, msg, { from: msg.to, to: msg.from, @@ -85,7 +70,7 @@ export class Core extends Emittery { message.payload = payload; } } catch (err) { - message.error = _.toString(err); + message.error = toString(err); } if (!msg.response) { @@ -97,9 +82,9 @@ export class Core extends Emittery { await this.waitForClientReady(message.to, remaining); } - debug(`responding to ${eventName} with message`, message); - callback(message); - }; + // @ts-ignore + socket.emit('message:response', message); + }); } isClientReady(clientId?: string): boolean { @@ -113,7 +98,7 @@ export class Core extends Emittery { } debug(`waiting for ${clientId} to be ready`); - await this.onceWithTimeout('ready', clientId, timeout); + await this.onceWithTimeout(`ready:${clientId}`, timeout); const isReady = this.isClientReady(clientId); if (!isReady) { throw new Error(`Client ${clientId} is not ready`); @@ -130,46 +115,24 @@ export class Core extends Emittery { return (timeout || this.actionTimeout) + this.networkLatencyBuffer; } - async onceWithTimeout(eventName: string, timeout?: number): Promise; - async onceWithTimeout(eventName: string, forClientId: string, timeout?: number): Promise; - async onceWithTimeout(_eventName: string, ...params: any[]): Promise { - let timeoutMs: number = this.getTimeout(); - let forClientId: string|undefined; - - if (_.isNumber(params[0])) { - timeoutMs = this.getTimeout(params[0]); - } else if (_.isString(params[0])) { - forClientId = params[0]; - if (_.isNumber(params[1])) { - timeoutMs = this.getTimeout(params[1]); - } + // @ts-ignore + emit(eventName: string, msg: i.EventMessage) { + super.emit(`${eventName}`, msg); + if (msg.scope) { + super.emit(`${eventName}:${msg.scope}`, msg); } + } - const eventName = forClientId != null ? `${_eventName}:${forClientId}` : _eventName; - - const startTime = Date.now(); - debug(`onceWithTimeout(${eventName}, ${timeoutMs}) - started`); - - const result = await new Promise((resolve) => { - let unsubscribe: Emittery.UnsubscribeFn|undefined; - let timer: NodeJS.Timer|undefined; - - const finish = _.once((result?: any) => { - if (unsubscribe != null) unsubscribe(); - if (timer != null) clearTimeout(timer); - - resolve(result); - }); - - unsubscribe = this.on(eventName, (msg: i.ClientEventMessage|i.EventMessage) => { - finish(msg.payload); - }); - - timer = setTimeout(() => { finish(); }, timeoutMs); - }); - - const elapsed = Date.now() - startTime; - debug(`onceWithTimeout(${eventName}, ${timeoutMs}) - finished, took ${elapsed}ms`); - return result; + async onceWithTimeout(eventName: string, timeout?: number): Promise { + const timeoutMs: number = this.getTimeout(timeout); + try { + const { payload } = await pEvent(this, eventName, { + rejectionEvents: [], + timeout: timeoutMs + }) as i.EventMessage; + return payload; + } catch (err) { + return undefined; + } } } diff --git a/packages/teraslice-messaging/src/messenger/interfaces.ts b/packages/teraslice-messaging/src/messenger/interfaces.ts index 50e3cf33b2f..60cbe6187ca 100644 --- a/packages/teraslice-messaging/src/messenger/interfaces.ts +++ b/packages/teraslice-messaging/src/messenger/interfaces.ts @@ -60,18 +60,12 @@ export interface SendOptions { export interface ConnectedClient { readonly clientId: string; - readonly clientType: string; - socketId: string; state: ClientState; - createdAt: Date; - updatedAt: Date; - offlineAt: Date|null; + offlineAt: number|null; } export interface UpdateClientState { state: ClientState; - socketId?: string; - metadata?: object; error?: Error|string; payload?: Payload; } @@ -94,25 +88,35 @@ export interface ConnectedClients { [clientId: string]: ConnectedClient; } +export interface ClientSendFn { + (message: Message): void; +} + export interface ClientSendFns { - [clientId: string]: (message: Message) => void; + [clientId: string]: ClientSendFn|null; +} + +export interface MessageHandler { + (msg: Message): Promise|Payload|void; } export interface EventMessage { + scope: string; payload: any; error?: Error|ResponseError; } export interface ClientEventMessage { - clientId: string; + scope?: string; payload: any; error?: Error|ResponseError; } -export interface ClientEventFn { - (msg: ClientEventMessage): void; +export interface EventListener { + (msg: EventMessage): void; } -export interface MessageHandler { - (msg: Message): Promise|Payload|void; +export interface SocketEmitter { + on(eventName: string, fn: (msg: Message) => void): void; + emit(eventName: string, msg: Message): void; } diff --git a/packages/teraslice-messaging/src/messenger/server.ts b/packages/teraslice-messaging/src/messenger/server.ts index d5e87b2bb40..fbc0e24209f 100644 --- a/packages/teraslice-messaging/src/messenger/server.ts +++ b/packages/teraslice-messaging/src/messenger/server.ts @@ -1,7 +1,7 @@ 'use strict'; import debugFn from 'debug'; -import _ from 'lodash'; +import { isString, isNumber, clone, get } from 'lodash'; import SocketIOServer from 'socket.io'; import http from 'http'; import porty from 'porty'; @@ -33,6 +33,8 @@ const connectedStates = [ ...onlineStates ]; +const isTesting = process.env.NODE_ENV === 'test'; + export class Server extends Core { isShuttingDown: boolean; readonly port: number; @@ -42,7 +44,6 @@ export class Server extends Core { readonly clientDisconnectTimeout: number; private _cleanupClients: NodeJS.Timer|undefined; protected _clients: i.ConnectedClients; - protected _clientSendFns: i.ClientSendFns; constructor(opts: i.ServerOptions) { const { @@ -56,15 +57,15 @@ export class Server extends Core { } = opts; super(opts); - if (!_.isNumber(port)) { + if (!isNumber(port)) { throw new Error('Messenger.Server requires a valid port'); } - if (!_.isString(serverName)) { + if (!isString(serverName)) { throw new Error('Messenger.Server requires a valid serverName'); } - if (!_.isNumber(clientDisconnectTimeout)) { + if (!isNumber(clientDisconnectTimeout)) { throw new Error('Messenger.Server requires a valid clientDisconnectTimeout'); } @@ -72,11 +73,11 @@ export class Server extends Core { this.serverName = serverName; this.clientDisconnectTimeout = clientDisconnectTimeout; - this.server = SocketIOServer({ + // @ts-ignore + this.server = new SocketIOServer({ pingTimeout, pingInterval, - // transports: ['websocket'], - // allowUpgrades: false, + perMessageDeflate: false, serveClient: false, }); @@ -89,7 +90,6 @@ export class Server extends Core { this.isShuttingDown = false; this._clients = {}; - this._clientSendFns = {}; this._onConnection = this._onConnection.bind(this); } @@ -112,35 +112,31 @@ export class Server extends Core { this.server.attach(this.httpServer); this.server.use((socket, next) => { - socket.join(socket.handshake.query.clientType, next); + socket.join(socket.handshake.query.clientId, next); }); this.server.on('connection', this._onConnection); this.onClientReconnect((clientId) => { - this.emit('ready', { clientId, payload: {} }); + this.emit('ready', { scope: clientId, payload: {} }); }); this.onClientOnline((clientId) => { - this.emit('ready', { clientId, payload: {} }); + this.emit('ready', { scope: clientId, payload: {} }); }); this._cleanupClients = setInterval(() => { - _.forEach(this._clients, (client: i.ConnectedClient) => { - if (client.state === i.ClientState.Shutdown) { - this.updateClientState(client.clientId, { - state: i.ClientState.Offline, - }); + for (const { clientId, state, offlineAt } of Object.values(this._clients)) { + if (state === i.ClientState.Shutdown) { + this.updateClientState(clientId, i.ClientState.Offline); } - if (client.state === i.ClientState.Disconnected && client.offlineAt) { - if (client.offlineAt.getTime() > Date.now()) { - this.updateClientState(client.clientId, { - state: i.ClientState.Offline, - }); + if (state === i.ClientState.Disconnected && offlineAt) { + if (offlineAt > Date.now()) { + this.updateClientState(clientId, i.ClientState.Offline); } } - }); - }, 1000); + } + }, isTesting ? 100 : 5000); } async shutdown() { @@ -148,11 +144,11 @@ export class Server extends Core { if (this._cleanupClients != null) { clearInterval(this._cleanupClients); + this._cleanupClients = undefined; } if (this.closed) { this._clients = {}; - this._clientSendFns = {}; return; } @@ -171,13 +167,12 @@ export class Server extends Core { }); this._clients = {}; - this._clientSendFns = {}; super.close(); } get connectedClients(): i.ConnectedClient[] { - return _.clone(this.filterClientsByState(connectedStates)); + return clone(this.filterClientsByState(connectedStates)); } get connectedClientCount(): number { @@ -185,7 +180,7 @@ export class Server extends Core { } get onlineClients(): i.ConnectedClient[] { - return _.clone(this.filterClientsByState(onlineStates)); + return clone(this.filterClientsByState(onlineStates)); } get onlineClientCount(): number { @@ -193,7 +188,7 @@ export class Server extends Core { } get disconnectedClients(): i.ConnectedClient[] { - return _.clone(this.filterClientsByState(disconnectedStates)); + return clone(this.filterClientsByState(disconnectedStates)); } get disconectedClientCount(): number { @@ -201,7 +196,7 @@ export class Server extends Core { } get offlineClients(): i.ConnectedClient[] { - return _.clone(this.filterClientsByState([i.ClientState.Offline])); + return clone(this.filterClientsByState([i.ClientState.Offline])); } get offlineClientCount(): number { @@ -209,7 +204,7 @@ export class Server extends Core { } get availableClients(): i.ConnectedClient[] { - return _.clone(this.filterClientsByState([i.ClientState.Available])); + return clone(this.filterClientsByState([i.ClientState.Available])); } get availableClientCount(): number { @@ -217,7 +212,7 @@ export class Server extends Core { } get unavailableClients(): i.ConnectedClient[] { - return _.clone(this.filterClientsByState(unavailableStates)); + return clone(this.filterClientsByState(unavailableStates)); } get unavailableClientCount(): number { @@ -225,79 +220,68 @@ export class Server extends Core { } onClientOnline(fn: (clientId: string) => void) { - this.on(`client:${i.ClientState.Online}`, (msg) => { - fn(msg.clientId); + return this.on(`client:${i.ClientState.Online}`, (msg) => { + fn(msg.scope); }); } onClientAvailable(fn: (clientId: string) => void) { - this.on(`client:${i.ClientState.Available}`, (msg) => { - fn(msg.clientId); + return this.on(`client:${i.ClientState.Available}`, (msg) => { + fn(msg.scope); }); } onClientUnavailable(fn: (clientId: string) => void) { - this.on(`client:${i.ClientState.Unavailable}`, (msg) => { - fn(msg.clientId); + return this.on(`client:${i.ClientState.Unavailable}`, (msg) => { + fn(msg.scope); }); } onClientOffline(fn: (clientId: string) => void) { - this.on(`client:${i.ClientState.Offline}`, (msg) => { - fn(msg.clientId); + return this.on(`client:${i.ClientState.Offline}`, (msg) => { + fn(msg.scope); }); } onClientDisconnect(fn: (clientId: string) => void) { - this.on(`client:${i.ClientState.Disconnected}`, (msg) => { - fn(msg.clientId); + return this.on(`client:${i.ClientState.Disconnected}`, (msg) => { + fn(msg.scope); }); } onClientShutdown(fn: (clientId: string) => void) { - this.on(`client:${i.ClientState.Shutdown}`, (msg) => { - fn(msg.clientId); + return this.on(`client:${i.ClientState.Shutdown}`, (msg) => { + fn(msg.scope); }); } onClientReconnect(fn: (clientId: string) => void) { - this.on('client:reconnect', (msg) => { - fn(msg.clientId); + return this.on('client:reconnect', (msg) => { + fn(msg.scope); }); } onClientError(fn: (clientId: string) => void) { - this.on('client:error', (msg) => { - fn(msg.clientId); + return this.on('client:error', (msg) => { + fn(msg.scope); }); } - async emit(eventName: string, msg: i.ClientEventMessage) { - await Promise.all([ - super.emit(`${eventName}`, msg), - super.emit(`${eventName}:${msg.clientId}`, msg), - ]); - } - - on(eventName: string, fn: (msg: i.ClientEventMessage) => void) { - return super.on(eventName, fn); - } - isClientReady(clientId: string) { - const clientState = _.get(this._clients, [clientId, 'state']); + const clientState = get(this._clients, [clientId, 'state']); return onlineStates.includes(clientState); } protected sendToAll(eventName: string, payload?: i.Payload, options: i.SendOptions = { volatile: true, response: true }) { const clients = this.filterClientsByState(onlineStates); - const promises = _.map(clients, (client) => { + const promises = Object.values(clients).map((client) => { return this.send(client.clientId, eventName, payload, options); }); return Promise.all(promises); } protected async send(clientId: string, eventName: string, payload: i.Payload = {}, options: i.SendOptions = { response: true }): Promise { - if (!_.has(this._clientSendFns, clientId)) { + if (!this.isClientConnected(clientId)) { throw new Error(`No client found by that id "${clientId}"`); } @@ -326,16 +310,24 @@ export class Server extends Core { }; const responseMsg = this.handleSendResponse(message); - this._clientSendFns[clientId](message); + + this.server.to(clientId).emit(message.eventName, message); + return responseMsg; } + isClientConnected(clientId: string): boolean { + if (this._clients[clientId] == null) return false; + const { state } = this._clients[clientId]; + return connectedStates.includes(state); + } + protected getClientMetadataFromSocket(socket: SocketIO.Socket): i.ClientSocketMetadata { return socket.handshake.query; } private filterClientsByState(states: i.ClientState[]): i.ConnectedClient[] { - return _.filter(this._clients, (client) => { + return Object.values(this._clients).filter((client) => { return states.includes(client.state); }); } @@ -350,125 +342,76 @@ export class Server extends Core { return count; } - protected updateClientState(clientId: string, update: i.UpdateClientState): boolean { - const client = this._clients[clientId]; - if (!client) { + protected updateClientState(clientId: string, state: i.ClientState): boolean { + if (this._clients[clientId] == null) { debug(`${clientId} does not exist and cannot be updated`); return false; } - const currentState = client.state; - if (currentState === update.state) { + const currentState = this._clients[clientId].state; + if (currentState === state) { debug(`${clientId} state of ${currentState} is the same, skipping update`); return false; } - if (currentState === i.ClientState.Shutdown && update.state !== i.ClientState.Offline) { + if (currentState === i.ClientState.Shutdown && state !== i.ClientState.Offline) { debug(`${clientId} state of ${currentState} can only be updated to offline`); return false; } - const updatedAt = new Date(); - this._clients[clientId].state = update.state; - this._clients[clientId].updatedAt = updatedAt; - - if (update.socketId) { - this._clients[clientId].socketId = update.socketId; - } + this._clients[clientId].state = state; const eventMsg = { - clientId, - payload: update.payload, - error: update.error, + scope: clientId, + payload: {} }; - switch (update.state) { - case i.ClientState.Online: - this.emit('client:reconnect', eventMsg); - return true; - - case i.ClientState.Available: - this.emit(`client:${update.state}`, eventMsg); - return true; - - case i.ClientState.Unavailable: - this.emit(`client:${update.state}`, eventMsg); - return true; + if (state === i.ClientState.Disconnected) { + if (currentState === i.ClientState.Available) { + debug(`${clientId} is unavailable because it was marked as disconnected`); + this.emit(`client:${i.ClientState.Unavailable}`, eventMsg); + } - case i.ClientState.Shutdown: - this.emit(`client:${update.state}`, eventMsg); - return true; + this._clients[clientId].offlineAt = Date.now() + this.clientDisconnectTimeout; - case i.ClientState.Disconnected: + debug(`${clientId} is disconnected will be considered offline in ${this.clientDisconnectTimeout}`); + } else if (state === i.ClientState.Offline) { + if (!disconnectedStates.includes(currentState)) { if (currentState === i.ClientState.Available) { - debug(`${clientId} is unavailable because it was marked as disconnected`); + debug(`${clientId} is unavailable because it was marked as offline`); this.emit(`client:${i.ClientState.Unavailable}`, eventMsg); } - const offlineAtMs = Date.now() + this.clientDisconnectTimeout; - this._clients[clientId].offlineAt = new Date(offlineAtMs); - - debug(`${clientId} is disconnected will be considered offline in ${this.clientDisconnectTimeout}`); - this.emit(`client:${update.state}`, eventMsg); - return true; - - case i.ClientState.Offline: - if (!disconnectedStates.includes(currentState)) { - if (currentState === i.ClientState.Available) { - debug(`${clientId} is unavailable because it was marked as offline`); - this.emit(`client:${i.ClientState.Unavailable}`, eventMsg); - } - - debug(`${clientId} is disconnected because it was marked as offline`); - this.emit(`client:${i.ClientState.Disconnected}`, eventMsg); - } - - this.emit(`client:${update.state}`, eventMsg); - - // cleanup socket and such - const { socketId } = this._clients[clientId]; - - if (this.server.sockets.sockets[socketId]) { - try { - this.server.sockets.sockets[socketId].removeAllListeners(); - this.server.sockets.sockets[socketId].disconnect(true); - } catch (err) { - debug('error cleaning up socket when going offline', err); - } - delete this.server.sockets.sockets[socketId]; - } - - delete this._clientSendFns[clientId]; - return true; + debug(`${clientId} is disconnected because it was marked as offline`); + this.emit(`client:${i.ClientState.Disconnected}`, eventMsg); + } + } - default: - return false; + if (state === i.ClientState.Online) { + this.emit('client:reconnect', eventMsg); + } else { + this.emit(`client:${state}`, eventMsg); } + + return true; } protected ensureClient(socket: SocketIO.Socket) : i.ConnectedClient { - const { clientId, clientType } = this.getClientMetadataFromSocket(socket); + const { clientId } = this.getClientMetadataFromSocket(socket); const client = this._clients[clientId]; if (client) { - this.updateClientState(clientId, { - state: i.ClientState.Online, - socketId: socket.id, - }); + this.updateClientState(clientId, i.ClientState.Online); return client; } const newClient: i.ConnectedClient = { clientId, - clientType, state: i.ClientState.Online, - createdAt: new Date(), - updatedAt: new Date(), - offlineAt: null, - socketId: socket.id + offlineAt: null }; - this.emit(`client:${i.ClientState.Online}`, { clientId, payload: {} }); + this.emit(`client:${i.ClientState.Online}`, { scope: clientId, payload: {} }); this._clients[clientId] = newClient; return newClient; @@ -478,55 +421,47 @@ export class Server extends Core { const client = this.ensureClient(socket); const { clientId } = client; - this._clientSendFns[clientId] = (message: i.Message) => { - socket.emit(message.eventName, message, this._sendCallbackFn); - }; - socket.on('error', (error: Error|string) => { this.emit('client:error', { - clientId, + scope: clientId, payload: {}, error }); }); socket.on('disconnect', (error: Error|string) => { + debug(`client ${clientId} disconnected with error`, error); + socket.removeAllListeners(); + socket.disconnect(true); + if (this.isShuttingDown) { - this.updateClientState(clientId, { - state: i.ClientState.Shutdown, - error, - }); + this.updateClientState(clientId, i.ClientState.Shutdown); } else { - this.updateClientState(clientId, { - state: i.ClientState.Disconnected, - error, - }); + this.updateClientState(clientId, i.ClientState.Disconnected); } }); - socket.on(`client:${i.ClientState.Available}`, this.handleResponse(`client:${i.ClientState.Available}`, (msg: i.Message) => { - this.updateClientState(clientId, { - state: i.ClientState.Available, - payload: msg.payload, - }); - })); + this.handleResponse(socket, `client:${i.ClientState.Available}`, () => { + this.updateClientState(clientId, i.ClientState.Available); + }); - socket.on(`client:${i.ClientState.Unavailable}`, this.handleResponse(`client:${i.ClientState.Unavailable}`, (msg: i.Message) => { - this.updateClientState(clientId, { - state: i.ClientState.Unavailable, - payload: msg.payload, - }); - })); + this.handleResponse(socket, `client:${i.ClientState.Unavailable}`, () => { + this.updateClientState(clientId, i.ClientState.Unavailable); + }); + + this.handleResponse(socket, `client:${i.ClientState.Shutdown}`, () => { + this.updateClientState(clientId, i.ClientState.Shutdown); + }); - socket.on(`client:${i.ClientState.Shutdown}`, this.handleResponse(`client:${i.ClientState.Shutdown}`, (msg: i.Message) => { - this.updateClientState(clientId, { - state: i.ClientState.Shutdown, - payload: msg.payload, + socket.on('message:response', (msg: i.Message) => { + this.emit(msg.id, { + scope: msg.from, + payload: msg, }); - })); + }); this.emit('connection', { - clientId, + scope: clientId, payload: socket }); } diff --git a/packages/teraslice-messaging/test/execution-controller-spec.ts b/packages/teraslice-messaging/test/execution-controller-spec.ts index 97eeb6ca5f5..bcef517d4ab 100644 --- a/packages/teraslice-messaging/test/execution-controller-spec.ts +++ b/packages/teraslice-messaging/test/execution-controller-spec.ts @@ -112,7 +112,11 @@ describe('ExecutionController', () => { }); it('should have no active workers', () => { - expect(server.activeWorkers).toBeArrayOfSize(0); + expect(server.activeWorkerCount).toBe(0); + }); + + it('should have a worker queue size of 0', () => { + expect(server.workerQueueSize).toBe(0); }); it('should not call client.onExecutionFinished', () => { @@ -228,9 +232,7 @@ describe('ExecutionController', () => { describe('when receiving execution:slice:new', () => { describe('when the client is set as available', () => { it('should resolve with correct messages', async () => { - if (!client.available) { - await client.sendAvailable(); - } + await client.sendAvailable(); const newSlice = { slicer_order: 0, @@ -260,7 +262,7 @@ describe('ExecutionController', () => { expect(dispatched).toBeTrue(); - expect(server.activeWorkers).toBeArrayOfSize(1); + expect(server.activeWorkerCount).toBe(1); await client.sendSliceComplete({ slice: newSlice, @@ -273,15 +275,13 @@ describe('ExecutionController', () => { await bluebird.delay(100); - expect(server.activeWorkers).toBeArrayOfSize(0); + expect(server.activeWorkerCount).toBe(0); }); }); describe('when the client is set as unavailable', () => { beforeAll(async () => { - if (client.available) { - await client.sendUnavailable(); - } + await client.sendUnavailable(); }); it('should reject with the correct error messages', () => { diff --git a/packages/teraslice-messaging/test/messenger-spec.ts b/packages/teraslice-messaging/test/messenger-spec.ts index 66f297fe921..24f05ec1143 100644 --- a/packages/teraslice-messaging/test/messenger-spec.ts +++ b/packages/teraslice-messaging/test/messenger-spec.ts @@ -209,7 +209,8 @@ describe('Messenger', () => { actionTimeout: 1000, pingTimeout: 3000, pingInterval: 1000, - clientDisconnectTimeout: 4000, + serverTimeout: 2000, + clientDisconnectTimeout: 50, serverName: 'example' }); @@ -235,8 +236,8 @@ describe('Messenger', () => { socketOptions: { reconnection: true, reconnectionAttempts: 10, - reconnectionDelay: 500, - reconnectionDelayMax: 500 + reconnectionDelay: 10, + reconnectionDelayMax: 50 }, }); @@ -250,7 +251,10 @@ describe('Messenger', () => { afterAll((done) => { server.onClientShutdown(() => { server.shutdown() - .then(() => { done(); }) + .then(() => { + server.shutdown(); + done(); + }) .catch(fail); }); client.shutdown().catch(fail); @@ -303,34 +307,69 @@ describe('Messenger', () => { expect(clientErrorFn).not.toHaveBeenCalled(); }); + it('should be able to handle getTimeoutWithMax', () => { + expect(server.getTimeoutWithMax(100)).toBe(100); + expect(server.getTimeoutWithMax(20000)).toBe(1000); + }); + + it('should be able to handle waitForClientReady', async () => { + try { + await server.waitForClientReady('hello', 100); + } catch (err) { + expect(err.message).toEqual('Client hello is not ready'); + } + + const clientReady = await server.waitForClientReady(client.clientId, 100); + expect(clientReady).toBeTrue(); + + const serverReady = await client.waitForClientReady(client.serverName, 100); + expect(serverReady).toBeTrue(); + }); + describe('when testing onceWithTimeout', () => { - it('should be able to handle timeouts', () => { + it('should be able to handle timeouts', async () => { + expect(server.listenerCount('timeout:event')).toBe(0); + const once = server.onceWithTimeout('timeout:event', 500); - return expect(once).resolves.toBeUndefined(); + expect(server.listenerCount('timeout:event')).toBe(1); + const msg = await once; + + expect(msg).toBeUndefined(); + + expect(server.listenerCount('timeout:event')).toBe(0); }); - it('should be able to handle timeouts when given a specific clientId', () => { - const once = server.onceWithTimeout('timeout:event', clientId, 500); + it('should be able to handle timeouts when given a specific scope', () => { + const once = server.onceWithTimeout(`timeout:event:${clientId}`, 500); return expect(once).resolves.toBeUndefined(); }); it('should be able to resolve the message', async () => { + expect(server.listenerCount('success:event')).toBe(0); + const once = server.onceWithTimeout('success:event', 500); + expect(server.listenerCount('success:event')).toBe(1); + await server.emit('success:event', { - clientId, + scope: clientId, payload: { hello: true } }); - return expect(once).resolves.toEqual({ + + const msg = await once; + + expect(server.listenerCount('success:event')).toBe(0); + + expect(msg).toEqual({ hello: true }); }); - it('should be able to resolve the message when given a specific clientId', () => { - const once = server.onceWithTimeout('success:event', clientId, 500); + it('should be able to resolve the message when given a specific scope', () => { + const once = server.onceWithTimeout(`success:event:${clientId}`, 500); server.emit('success:event', { - clientId, + scope: clientId, payload: { hello: true } @@ -344,10 +383,12 @@ describe('Messenger', () => { describe('when waiting for message that will never come', () => { it('should throw a timeout error', async () => { expect.hasAssertions(); + // @ts-ignore - server.server.to(clientId).on('hello', server.handleResponse(async () => { + server.handleResponse(server.server.to(clientId), 'hello', async () => { await bluebird.delay(1000); - })); + }); + try { // @ts-ignore await client.send('hello', {}, { @@ -382,9 +423,9 @@ describe('Messenger', () => { beforeAll(async () => { // @ts-ignore - client.socket.once('failure:message', (msg: Message, cb: Function) => { + client.socket.once('failure:message', (msg: Message) => { msg.error = 'this should fail'; - cb(msg); + client.socket.emit('message:response', msg); }); try { // @ts-ignore @@ -401,7 +442,7 @@ describe('Messenger', () => { }); }); - xdescribe('when testing reconnect', () => { + describe('when testing reconnect', () => { it('should call server.onClientReconnect', (done) => { server.onClientReconnect(() => { done(); diff --git a/packages/teraslice-messaging/tsconfig.build.json b/packages/teraslice-messaging/tsconfig.build.json index 97b0c4f0a78..90d9fc7988a 100644 --- a/packages/teraslice-messaging/tsconfig.build.json +++ b/packages/teraslice-messaging/tsconfig.build.json @@ -1,6 +1,7 @@ { "extends": "./tsconfig", "compilerOptions": { + "rootDir": "src", "paths": { "@terascope/*": [ "../*", diff --git a/packages/teraslice-messaging/tsconfig.json b/packages/teraslice-messaging/tsconfig.json index cef70012c75..fd1a35f48ec 100644 --- a/packages/teraslice-messaging/tsconfig.json +++ b/packages/teraslice-messaging/tsconfig.json @@ -2,12 +2,11 @@ "extends": "../../tsconfig", "compilerOptions": { "baseUrl": ".", - "rootDir": "src", "outDir": "dist" }, "references": [ { - "path": "../teraslice-types/src" + "path": "../teraslice-types" } ] } diff --git a/packages/teraslice-types/package.json b/packages/teraslice-types/package.json index c7111407394..37364ed33f4 100644 --- a/packages/teraslice-types/package.json +++ b/packages/teraslice-types/package.json @@ -36,26 +36,26 @@ "test:debug": "env DEBUG='*teraslice*' jest --detectOpenHandles --coverage=false --runInBand" }, "dependencies": { - "@types/bunyan": "^1.8.4", + "@types/bunyan": "^1.8.5", "@types/convict": "^4.2.0", - "@types/lodash": "^4.14.116", - "@types/node": "^10.11.4", + "@types/lodash": "^4.14.117", + "@types/node": "^10.11.7", "bunyan": "^1.8.12", "convict": "^4.4.0", "debugnyan": "^2.0.2", "lodash": "^4.17.11" }, "devDependencies": { - "@types/jest": "^23.3.3", + "@types/jest": "^23.3.5", "babel-core": "^6.0.0", "babel-jest": "^23.6.0", "jest": "^23.6.0", "jest-extended": "^0.11.0", "rimraf": "^2.0.0", - "ts-jest": "^23.10.3", + "ts-jest": "^23.10.4", "tslint": "^5.0.0", "tslint-config-airbnb": "^5.11.0", - "typescript": "^3.1.1" + "typescript": "^3.1.2" }, "engines": { "node": ">=8.0.0" diff --git a/packages/teraslice/cluster-service.js b/packages/teraslice/cluster-service.js index 730ab10ea84..c9890c96229 100644 --- a/packages/teraslice/cluster-service.js +++ b/packages/teraslice/cluster-service.js @@ -6,16 +6,12 @@ const Promise = require('bluebird'); const get = require('lodash/get'); const { shutdownHandler } = require('./lib/workers/helpers/worker-shutdown'); const makeTerafoundationContext = require('./lib/workers/context/terafoundation-context'); +const makeClusterMaster = require('./lib/cluster/cluster_master'); +const makeAssetService = require('./lib/cluster/services/assets'); class Service { - constructor() { - this.context = makeTerafoundationContext(); - - this.shutdownHandler = shutdownHandler(this.context, () => { - if (!this.instance) return Promise.resolve(); - return this.instance.shutdown(); - }); - + constructor(context) { + this.context = context; this.logger = this.context.logger; this.shutdownTimeout = get(this.context, 'sysconfig.teraslice.shutdown_timeout', 60 * 1000); } @@ -26,10 +22,10 @@ class Service { if (assignment === 'cluster_master') { // require this here so node doesn't have load extra code into memory - this.instance = require('./lib/cluster/cluster_master')(this.context); + this.instance = makeClusterMaster(this.context); } else if (assignment === 'assets_service') { // require this here so node doesn't have load extra code into memory - this.instance = require('./lib/cluster/services/assets')(this.context); + this.instance = makeAssetService(this.context); } await this.instance.initialize(); @@ -49,7 +45,14 @@ class Service { } } -const cmd = new Service(); +const context = makeTerafoundationContext(); +const cmd = new Service(context); + +cmd.shutdownHandler = shutdownHandler(context, () => { + if (!cmd.instance) return Promise.resolve(); + return cmd.instance.shutdown(); +}); + Promise.resolve() .then(() => cmd.initialize()) .then(() => cmd.run()) diff --git a/packages/teraslice/lib/cluster/storage/backends/elasticsearch_store.js b/packages/teraslice/lib/cluster/storage/backends/elasticsearch_store.js index 98e6527ef26..f10f9dcd601 100644 --- a/packages/teraslice/lib/cluster/storage/backends/elasticsearch_store.js +++ b/packages/teraslice/lib/cluster/storage/backends/elasticsearch_store.js @@ -2,7 +2,6 @@ const fs = require('fs'); const _ = require('lodash'); -const shortid = require('shortid'); const parseError = require('@terascope/error-parser'); const elasticsearchApi = require('@terascope/elasticsearch-api'); const { getClient } = require('@terascope/job-components'); @@ -14,7 +13,7 @@ module.exports = function module(context, indexName, recordType, idField, _bulkS let elasticsearch; let client; let flushInterval; - let destroyFns = {}; + let isShutdown = false; // Buffer to build up bulk requests. let bulkQueue = []; @@ -186,14 +185,15 @@ module.exports = function module(context, indexName, recordType, idField, _bulkS if (forceShutdown !== true) { return _flush(); } + return new Promise((resolve) => { logger.trace(`attempting to shutdown, will destroy in ${config.shutdown_timeout}`); const _destroy = () => { logger.trace(`shutdown store, took ${Date.now() - startTime}ms`); - bulkQueue.length = 0; - Object.values(destroyFns).forEach((fn) => { fn(); }); - destroyFns = {}; + bulkQueue.length = []; + isShutdown = true; + resolve(); }; const timeout = setTimeout(_destroy, config.shutdown_timeout).unref(); @@ -241,18 +241,21 @@ module.exports = function module(context, indexName, recordType, idField, _bulkS function isAvailable(indexArg) { const query = { index: indexArg || indexName, q: '*' }; - return new Promise(((resolve, reject) => { + return new Promise(((resolve) => { elasticsearch.search(query) .then((results) => { logger.trace(`index ${indexName} is now available`); resolve(results); }) .catch(() => { - const id = shortid.generate(); const isReady = setInterval(() => { + if (isShutdown) { + clearInterval(isReady); + return; + } + elasticsearch.search(query) .then((results) => { - destroyFns = _.omit(destroyFns, id); clearInterval(isReady); resolve(results); }) @@ -260,11 +263,6 @@ module.exports = function module(context, indexName, recordType, idField, _bulkS logger.warn('verifying job index is open'); }); }, 200); - - destroyFns[id] = () => { - clearInterval(isReady); - reject(new Error('Elastic search force shutting down')); - }; }); })); } @@ -359,7 +357,7 @@ module.exports = function module(context, indexName, recordType, idField, _bulkS newIndex = timeseriesIndex(timeseriesFormat, indexName.slice(0, nameSize)).index; } - return new Promise(((resolve, reject) => { + return new Promise(((resolve) => { const clientName = JSON.stringify(config.state); client = getClient(context, config.state, 'elasticsearch'); let options = null; @@ -377,8 +375,12 @@ module.exports = function module(context, indexName, recordType, idField, _bulkS logger.error(errMsg); logger.error(`Error created job index: ${errMsg}`); logger.info(`Attempting to connect to elasticsearch: ${clientName}`); - const id = shortid.generate(); const checking = setInterval(() => { + if (isShutdown) { + clearInterval(checking); + return; + } + _createIndex(newIndex) .then(() => { const query = { index: newIndex }; @@ -396,7 +398,6 @@ module.exports = function module(context, indexName, recordType, idField, _bulkS } if (bool) { - destroyFns = _.omit(destroyFns, id); clearInterval(checking); logger.info('connection to elasticsearch has been established'); return isAvailable(newIndex) @@ -411,11 +412,6 @@ module.exports = function module(context, indexName, recordType, idField, _bulkS logger.info(`Attempting to connect to elasticsearch: ${clientName}, error: ${checkingErrMsg}`); }); }, 3000); - - destroyFns[id] = () => { - clearInterval(checking); - reject(new Error('Elastic search force shutting down')); - }; }); })); }; diff --git a/packages/teraslice/lib/processors/delay.js b/packages/teraslice/lib/processors/delay.js index 277dd88e44e..154e000b731 100644 --- a/packages/teraslice/lib/processors/delay.js +++ b/packages/teraslice/lib/processors/delay.js @@ -5,7 +5,7 @@ const Promise = require('bluebird'); function newProcessor(context, opConfig, executionConfig) { - return data => Promise.delay(opConfig.delay).then(() => data); + return data => Promise.delay(opConfig.ms).then(() => data); } function schema() { diff --git a/packages/teraslice/lib/readers/elasticsearch_data_generator.js b/packages/teraslice/lib/readers/elasticsearch_data_generator.js index 8f6d962d9ee..1bd557e4081 100644 --- a/packages/teraslice/lib/readers/elasticsearch_data_generator.js +++ b/packages/teraslice/lib/readers/elasticsearch_data_generator.js @@ -3,9 +3,7 @@ const Promise = require('bluebird'); const parseError = require('@terascope/error-parser'); const { getOpConfig } = require('@terascope/job-components'); -const mocker = require('mocker-data-generator').default; -const defaultSchema = require('../utils/data_utils'); -const { existsSync } = require('../utils/file_utils'); +const { pathExistsSync } = require('fs-extra'); function parsedSchema(opConfig) { let dataSchema = false; @@ -15,7 +13,7 @@ function parsedSchema(opConfig) { const nextPath = `${process.cwd()}/${opConfig.json_schema}`; try { - if (existsSync(firstPath)) { + if (pathExistsSync(firstPath)) { dataSchema = require(firstPath); } else { dataSchema = require(nextPath); @@ -25,11 +23,13 @@ function parsedSchema(opConfig) { throw new Error(`Could not retrieve code for: ${opConfig}\n${e}`); } } else { - return defaultSchema(opConfig, dataSchema); + return require('../utils/data_utils')(opConfig, dataSchema); } } function newReader(context, opConfig) { + const mocker = require('mocker-data-generator').default; + const dataSchema = parsedSchema(opConfig); return function _newReader(msg) { if (opConfig.stress_test) { diff --git a/packages/teraslice/lib/workers/context/execution-context.js b/packages/teraslice/lib/workers/context/execution-context.js index 8685a415ec1..79e1368bae9 100644 --- a/packages/teraslice/lib/workers/context/execution-context.js +++ b/packages/teraslice/lib/workers/context/execution-context.js @@ -4,12 +4,12 @@ const Promise = require('bluebird'); const _ = require('lodash'); const { OperationLoader, registerApis } = require('@terascope/job-components'); const { terasliceOpPath } = require('../../config'); -const loadAssets = require('../assets/loader'); +const spawnAssetLoader = require('../assets/spawn'); const { makeLogger } = require('../helpers/terafoundation'); const { analyzeOp } = require('../helpers/op-analytics'); -class ExectionContext { - constructor(context, executionConfig) { +class ExecutionContext { + constructor(context, _executionConfig) { if (_.get(context, 'sysconfig.teraslice.reporter')) { throw new Error('reporters are not functional at this time, please do not set one in the configuration'); } @@ -19,65 +19,64 @@ class ExectionContext { assetPath: _.get(context, 'sysconfig.teraslice.assets_directory'), }); - registerApis(context, executionConfig.job); + const executionConfig = _.cloneDeep(_executionConfig); + + if (executionConfig.config == null) { + executionConfig.config = Object.assign({}, executionConfig.job, { + ex_id: executionConfig.ex_id, + job_id: executionConfig.job_id, + }); + delete executionConfig.job; + } + + registerApis(context, executionConfig.config); this._logger = makeLogger(context, executionConfig, 'execution_context'); this._context = context; - Object.assign(this, executionConfig); - - this.config = executionConfig.job; - this.config.ex_id = executionConfig.ex_id; - this.config.job_id = executionConfig.job_id; - this.queue = []; - this.reader = null; - this.slicer = null; - this.reporter = null; - this.queueLength = 10000; - this.dynamicQueueLength = false; + this.executionContext = Object.assign({}, executionConfig, { + assetIds: [], + queue: [], + reader: null, + slicer: null, + reporter: null, + queueLength: 10000, + dynamicQueueLength: false, + }); } async initialize() { - const assets = _.get(this.config, 'assets', []); - this.assetIds = await loadAssets(this._context, assets); + const assets = _.get(this.executionContext.config, 'assets', []); + this.executionContext.assetIds = await spawnAssetLoader(assets); - if (this.assignment === 'worker') { + if (this.executionContext.assignment === 'worker') { await this._initializeOperations(); } - if (this.assignment === 'execution_controller') { + if (this.executionContext.assignment === 'execution_controller') { await this._initializeSlicer(); } - // cleanup private stuff to keep memory footprint small - delete this._context; - delete this._initializeSlicer; - delete this._initializeOperations; - delete this._initializeOperations; - delete this._loadOperation; - delete this._logger; - delete this._opLoader; - delete this._setQueueLength; - return this; + return this.executionContext; } async _initializeSlicer() { - const opConfig = _.get(this.config, 'operations[0]'); + const opConfig = _.get(this.executionContext.config, 'operations[0]'); if (!opConfig) { throw new Error('Invalid configuration for operation'); } - this.slicer = await this._loadOperation(opConfig._op); + this.executionContext.slicer = await this._loadOperation(opConfig._op); await this._setQueueLength(); } async _initializeOperations() { const context = this._context; - const { config } = this; + const { config } = this.executionContext; - const operations = _.get(this.config, 'operations', []); - this.queue = await Promise.map(operations, async (opConfig, index) => { + const operations = _.get(this.executionContext.config, 'operations', []); + this.executionContext.queue = await Promise.map(operations, async (opConfig, index) => { const op = await this._loadOperation(opConfig._op); const args = [context, opConfig, config]; const opFn = !index ? await op.newReader(...args) : await op.newProcessor(...args); @@ -87,35 +86,35 @@ class ExectionContext { return analyzeOp(opFn, index); }); - this.reader = _.first(this.queue); + this.executionContext.reader = _.first(this.executionContext.queue); } async _loadOperation(opName) { - return this._opLoader.load(opName, this.assetIds); + return this._opLoader.load(opName, this.executionContext.assetIds); } async _setQueueLength() { - const { slicer } = this; + const { slicer } = this.executionContext; if (!slicer.slicerQueueLength) return; if (!_.isFunction(slicer.slicerQueueLength)) { - this._logger.error(`slicerQueueLength on the reader must be a function, defaulting to ${this.queueLength}`); + this._logger.error(`slicerQueueLength on the reader must be a function, defaulting to ${this.executionContext.queueLength}`); return; } - const results = await slicer.slicerQueueLength(this); + const results = await slicer.slicerQueueLength(this.executionContext); if (results === 'QUEUE_MINIMUM_SIZE') { - this.dynamicQueueLength = true; - this.queueLength = this.config.workers; + this.executionContext.dynamicQueueLength = true; + this.executionContext.queueLength = this.executionContext.config.workers; } else if (_.isNumber(results) && results >= 1) { - this.queueLength = results; + this.executionContext.queueLength = results; } - const isDyanmic = this.dynamicQueueLength ? ' and is dynamic' : ''; + const isDyanmic = this.executionContext.dynamicQueueLength ? ' and is dynamic' : ''; - this._logger.info(`Setting slicer queue length to ${this.queueLength}${isDyanmic}`); + this._logger.info(`Setting slicer queue length to ${this.executionContext.queueLength}${isDyanmic}`); } } -module.exports = ExectionContext; +module.exports = ExecutionContext; diff --git a/packages/teraslice/lib/workers/context/terafoundation-context.js b/packages/teraslice/lib/workers/context/terafoundation-context.js index bb1b3550bbd..fc27e25513b 100644 --- a/packages/teraslice/lib/workers/context/terafoundation-context.js +++ b/packages/teraslice/lib/workers/context/terafoundation-context.js @@ -37,8 +37,8 @@ function makeContext(cluster, config, sysconfig) { // Initialize the API registerApis(context); - delete context.apis.foundation.startWorkers; - delete context.foundation.startWorkers; + context.apis.foundation.startWorkers = () => {}; + context.foundation.startWorkers = () => {}; const events = new EventEmitter(); context.apis.foundation.getSystemEvents = () => events; @@ -66,6 +66,7 @@ function getSysConfig() { .alias('v', 'version') .help() .alias('h', 'help') + .detectLocale(false) .option('c', { alias: 'configfile', describe: `Terafoundation configuration file to load. diff --git a/packages/teraslice/lib/workers/execution-controller/analytics.js b/packages/teraslice/lib/workers/execution-controller/execution-analytics.js similarity index 84% rename from packages/teraslice/lib/workers/execution-controller/analytics.js rename to packages/teraslice/lib/workers/execution-controller/execution-analytics.js index 51d25503f46..e0a2788bc62 100644 --- a/packages/teraslice/lib/workers/execution-controller/analytics.js +++ b/packages/teraslice/lib/workers/execution-controller/execution-analytics.js @@ -12,16 +12,7 @@ class ExecutionAnalytics { this.client = client; this.analyticsRate = _.get(context, 'sysconfig.teraslice.analytics_rate'); this._handlers = {}; - - this._registerHandlers(); - } - - start() { - const { - ex_id: exId, - job_id: jobId, - } = this.executionContext; - const { name } = this.executionContext.job; + this._pushing = false; this.executionAnalytics = { workers_available: 0, @@ -37,7 +28,8 @@ class ExecutionAnalytics { processed: 0, slicers: 0, subslice_by_key: 0, - started: newFormattedDate(), + started: '', + queuing_complete: '' }; this.pushedAnalytics = { @@ -50,20 +42,32 @@ class ExecutionAnalytics { workers_reconnected: 0 }; + this._registerHandlers(); + + this.isRunning = false; + this.isShutdown = false; + } + + start() { + const { + ex_id: exId, + job_id: jobId, + } = this.executionContext; + const { name } = this.executionContext.config; + + this.set('started', newFormattedDate()); + this.client.onExecutionAnalytics(() => ({ name, ex_id: exId, job_id: jobId, - stats: this.executionAnalytics + stats: this.getAnalytics() })); - this.sendingAnalytics = true; + this.isRunning = true; - this.intervalId = setInterval(() => { - if (!this.sendingAnalytics) { - clearInterval(this.intervalId); - return; - } + this.analyticsInterval = setInterval(() => { + if (!this.isRunning) return; this._pushAnalytics(); }, this.analyticsRate); @@ -87,15 +91,18 @@ class ExecutionAnalytics { } getAnalytics() { - return _.cloneDeep(this.executionAnalytics); + return _.clone(this.executionAnalytics); } async shutdown(timeout) { - this.sendingAnalytics = false; - clearInterval(this.intervalId); + this.isRunning = false; + this.isShutdown = true; + + clearInterval(this.analyticsInterval); _.forEach(this._handlers, (handler, event) => { this.events.removeListener(event, handler); + this._handlers[event] = null; }); await this._pushAnalytics(timeout); @@ -105,13 +112,15 @@ class ExecutionAnalytics { if (this._pushing) return; this._pushing = true; + const analytics = this.getAnalytics(); + // save a copy of what we push so we can emit diffs const diffs = {}; const copy = {}; _.forOwn(this.pushedAnalytics, (value, field) => { - diffs[field] = this.executionAnalytics[field] - value; - copy[field] = this.executionAnalytics[field]; + diffs[field] = analytics[field] - value; + copy[field] = analytics[field]; }); const response = await this.client.sendClusterAnalytics(diffs, timeout); @@ -119,7 +128,7 @@ class ExecutionAnalytics { this._pushing = false; - if (!recorded && this.sendingAnalytics) { + if (!recorded && this.isRunning) { this.logger.warn('cluster master did not record the cluster analytics'); return; } diff --git a/packages/teraslice/lib/workers/execution-controller/index.js b/packages/teraslice/lib/workers/execution-controller/index.js index c5672af970c..870965c896e 100644 --- a/packages/teraslice/lib/workers/execution-controller/index.js +++ b/packages/teraslice/lib/workers/execution-controller/index.js @@ -8,7 +8,7 @@ const parseError = require('@terascope/error-parser'); const Messaging = require('@terascope/teraslice-messaging'); const Scheduler = require('./scheduler'); -const ExecutionAnalytics = require('./analytics'); +const ExecutionAnalytics = require('./execution-analytics'); const makeExecutionRecovery = require('./recovery'); const makeSliceAnalytics = require('./slice-analytics'); const { waitForWorkerShutdown } = require('../helpers/worker-shutdown'); @@ -84,7 +84,6 @@ class ExecutionController { this.workersHaveConnected = false; this._handlers = {}; - this.setFailingStatus = this.setFailingStatus.bind(this); this.terminalError = this.terminalError.bind(this); } @@ -113,36 +112,41 @@ class ExecutionController { return; } - const stateStore = makeStateStore(context); - this.stores.stateStore = await stateStore; + const stateStore = await makeStateStore(context); + this.stores.stateStore = stateStore; + // attach to scheduler + this.scheduler.stateStore = stateStore; await this.server.start(); this.isInitialized = true; this.server.onClientOnline((workerId) => { + clearTimeout(this.workerConnectTimeoutId); + this.workerConnectTimeoutId = null; + this.logger.trace(`worker ${workerId} is online`); - this._adjustSlicerQueueLength(); this.workersHaveConnected = true; - clearTimeout(this.workerConnectTimeoutId); this.executionAnalytics.increment('workers_joined'); + + this._adjustSlicerQueueLength(); }); this.server.onClientAvailable((workerId) => { this.logger.trace(`worker ${workerId} is available`); + this.executionAnalytics.set('workers_active', this.server.activeWorkerCount); this.executionAnalytics.set('workers_available', this.server.availableClientCount); - this.executionAnalytics.set('workers_active', this.server.activeWorkers.length); }); this.server.onClientUnavailable(() => { - this.executionAnalytics.set('workers_active', this.server.activeWorkers.length); + this.executionAnalytics.set('workers_active', this.server.activeWorkerCount); this.executionAnalytics.set('workers_available', this.server.availableClientCount); }); this.server.onClientDisconnect((workerId) => { this.logger.trace(`worker ${workerId} is disconnected but it may reconnect`); this.executionAnalytics.increment('workers_disconnected'); - this.executionAnalytics.set('workers_active', this.server.activeWorkers.length); + this.executionAnalytics.set('workers_active', this.server.activeWorkerCount); this._startWorkerDisconnectWatchDog(); }); @@ -154,8 +158,9 @@ class ExecutionController { this.server.onClientReconnect((workerId) => { clearTimeout(this.workerDisconnectTimeoutId); - this.logger.trace(`worker ${workerId} is reconnected`); + this.workerConnectTimeoutId = null; + this.logger.trace(`worker ${workerId} is reconnected`); this.executionAnalytics.increment('workers_reconnected'); }); @@ -168,26 +173,29 @@ class ExecutionController { }); this.server.onSliceSuccess((workerId, response) => { - this.logger.info(`worker ${workerId} has completed its slice`, response); - this.events.emit('slice:success', response); - this.pendingSlices -= 1; + process.nextTick(() => { + this.logger.info(`worker ${workerId} has completed its slice ${response.slice_id}`); + this.events.emit('slice:success', response); + this.pendingSlices -= 1; + }); }); this.server.onSliceFailure((workerId, response) => { - this.logger.error(`worker: ${workerId} has failure completing its slice`, response); - this.events.emit('slice:failure', response); - - if (this.scheduler.canComplete()) { - this.setFailingStatus(); - } else if (this.scheduler.isRecovering()) { - this.terminalError(new Error('Slice failed while recovering')); - } else { - // in persistent mode we set watchdogs to monitor - // when failing can be set back to running - this._checkAndUpdateExecutionState(); - } - - this.pendingSlices -= 1; + process.nextTick(() => { + this.logger.error(`worker: ${workerId} has failure completing its slice`, response); + this.events.emit('slice:failure', response); + + if (this.scheduler.canComplete()) { + this.setFailingStatus(); + } else if (this.scheduler.isRecovering()) { + this.terminalError(new Error('Slice failed while recovering')); + } else { + // in persistent mode we set watchdogs to monitor + // when failing can be set back to running + this._checkAndUpdateExecutionState(); + } + this.pendingSlices -= 1; + }); }); this._handlers['slicer:execution:update'] = ({ update }) => { @@ -209,21 +217,19 @@ class ExecutionController { this.events.on(event, handler); }); + if (this.collectAnalytics) { + this.slicerAnalytics = makeSliceAnalytics(this.context, this.executionContext); + } + this.logger.debug(`execution ${this.exId} is initialized`); this.isInitialized = true; } async run() { - if (this.isShuttingDown) { - this.logger.error('Cannot run execution while shutting down'); - return; - } + this._startWorkConnectWatchDog(); - if (!this.isInitialized) { - this.logger.error('Cannot run execution is not initialized'); - return; - } + this.executionAnalytics.start(); try { await this._runExecution(); @@ -327,6 +333,7 @@ class ExecutionController { // remove any listeners _.forEach(this._handlers, (handler, event) => { this.events.removeListener(event, handler); + this._handlers[event] = null; }); this.isShuttingDown = true; @@ -338,10 +345,18 @@ class ExecutionController { await this._waitForExecutionFinished(); + if (this.collectAnalytics) { + try { + await this.slicerAnalytics.shutdown(); + } catch (err) { + shutdownErrs.push(err); + } + } + try { await this.executionAnalytics.shutdown(); } catch (err) { - this.logger.error('execution analytics error'); + shutdownErrs.push(err); } this.scheduler.cleanup(); @@ -397,16 +412,9 @@ class ExecutionController { } async _runExecution() { - this._startWorkConnectWatchDog(); - - this.slicerAnalytics = makeSliceAnalytics(this.context, this.executionContext); - - this.executionAnalytics.start(); - this.logger.info(`starting execution ${this.exId}...`); this.startTime = Date.now(); - this.isStarted = true; if (this.scheduler.recoverExecution) { @@ -440,100 +448,115 @@ class ExecutionController { } } + // dispatching should be pushed out into its own module async _runDispatch() { this.isDoneDispatching = false; - this.logger.debug('dispatching slices...'); + let dispatchInterval; + // returns a boolean to indicate whether + // dispatching should continue const isRunning = () => { if (this.isShuttingDown) return false; if (this.isExecutionDone) return false; - if (this.scheduler.isFinished && !this.pendingDispatches) return false; + if (this.scheduler.isFinished + && !this.pendingDispatches) return false; return true; }; + const isPaused = () => this.isPaused; const canDispatch = () => { - if (this.isPaused) return false; - const workers = this.server.workerQueueSize; const slices = this.scheduler.queueLength; + return workers > 0 && slices > 0; }; - const dispatch = _.throttle(() => { - if (!isRunning()) return; - if (!canDispatch()) return; - - this._dispatchSlices(); - }, 100); - - const unsubscribe = this.server.on('client:available', dispatch); - - await pWhilst(isRunning, () => { - dispatch(); - return Promise.delay(100); - }); - - unsubscribe(); - - this.isDoneDispatching = true; - } - - _dispatchSlices() { - if (this.isPaused) return; - - const slices = this.scheduler.getSlices(this.server.workerQueueSize); + const dequeueAndDispatch = () => { + const reenqueue = []; + const dispatch = []; + + const slices = this.scheduler.getSlices(this.server.workerQueueSize); + slices.forEach((slice) => { + const workerId = this.server.dequeueWorker(slice); + if (!workerId) { + reenqueue.push(slice); + } else { + this.pendingDispatches += 1; + this.pendingSlices += 1; + dispatch.push({ slice, workerId }); + } + }); + slices.length = 0; + + if (dispatch.length > 0) { + process.nextTick(() => { + const promises = dispatch + .map((input) => { + const { slice, workerId } = input; + return this._dispatchSlice(slice, workerId); + }); + dispatch.length = 0; + + Promise.all(promises) + .catch(err => this.logger.error('failure to dispatch slices', err)); + }); + } - slices.forEach((slice) => { - const workerId = this.server.dequeueWorker(slice); - if (!workerId) { - this.scheduler.enqueueSlice(slice, true); - return; + if (reenqueue.length > 0) { + // this isn't really ideal since we adding + // to the beginning of the queue and + // it may end up in a recursive loop trying + // to process that slice + this.scheduler.enqueueSlices(reenqueue, true); + reenqueue.length = 0; } + }; - this.logger.trace(`dispatching slice ${slice.slice_id} for worker ${workerId}`); + await Promise.delay(0); - this._dispatchSlice(slice, workerId); - }); - } + await new Promise((resolve) => { + this.logger.debug('dispatching slices...'); - // if the _created is missing property is missing - // create the slice state. - // By not mutating the slice record we can improve the v8 - // performance - async _createSliceState(_slice) { - if (_slice._created) return _slice; + dispatchInterval = setInterval(() => { + if (!isRunning()) { + resolve(); + return; + } - const { createState } = this.stores.stateStore; + if (isPaused()) return; - const slice = Object.assign({ - _created: new Date().toISOString(), - }, _slice); + if (canDispatch()) { + dequeueAndDispatch(); + } + }, 5); + }); - await createState(this.exId, slice, 'start'); + clearInterval(dispatchInterval); - return slice; + this.isDoneDispatching = true; } - async _dispatchSlice(_slice, workerId) { - if (this.isShuttingDown) return; - - this.pendingDispatches += 1; - - const slice = await this._createSliceState(_slice); - const sliceId = slice.slice_id; - - const dispatched = await this.server.dispatchSlice(slice, workerId); - - if (dispatched) { - this.logger.debug(`dispatched slice ${sliceId} to worker ${workerId}`); - this.pendingSlices += 1; - } else { - this.logger.warn(`worker "${workerId}" is not available to process slice ${sliceId}`); - this.scheduler.enqueueSlice(slice, true); - } + _dispatchSlice(slice, workerId) { + this.logger.trace(`dispatching slice ${slice.slice_id} for worker ${workerId}`); + + return this.server.dispatchSlice(slice, workerId) + .then((dispatched) => { + if (dispatched) { + this.logger.debug(`dispatched slice ${slice.slice_id} to worker ${workerId}`); + } else { + this.logger.warn(`worker "${workerId}" is not available to process slice ${slice.slice_id}`); + this.scheduler.enqueueSlice(slice, true); + this.pendingSlices -= 1; + } - this.pendingDispatches -= 1; + this.pendingDispatches -= 1; + }) + .catch((err) => { + this.logger.error('error dispatching slice', err); + this.pendingDispatches -= 1; + this.pendingSlices -= 1; + }); } async _slicerInit() { diff --git a/packages/teraslice/lib/workers/execution-controller/recovery.js b/packages/teraslice/lib/workers/execution-controller/recovery.js index b08a6a73e5d..2cd79651af7 100644 --- a/packages/teraslice/lib/workers/execution-controller/recovery.js +++ b/packages/teraslice/lib/workers/execution-controller/recovery.js @@ -36,7 +36,7 @@ function recovery(context, executionFailed, stateStore, executionContext) { } function _sliceComplete(sliceData) { - delete retryState[sliceData.slice.slice_id]; + retryState[sliceData.slice.slice_id] = false; } function getSlicerStartingPosition() { @@ -66,7 +66,7 @@ function recovery(context, executionFailed, stateStore, executionContext) { } function _recoveryBatchCompleted() { - return Object.keys(retryState).length === 0; + return _.every(retryState, v => v === false); } function _retryState() { diff --git a/packages/teraslice/lib/workers/execution-controller/scheduler.js b/packages/teraslice/lib/workers/execution-controller/scheduler.js index b8ae113b98c..4ee96392995 100644 --- a/packages/teraslice/lib/workers/execution-controller/scheduler.js +++ b/packages/teraslice/lib/workers/execution-controller/scheduler.js @@ -3,7 +3,7 @@ const _ = require('lodash'); const uuidv4 = require('uuid/v4'); const Promise = require('bluebird'); -const autoBind = require('auto-bind'); +const pWhilst = require('p-whilst'); const Queue = require('@terascope/queue'); const { makeLogger } = require('../helpers/terafoundation'); @@ -17,17 +17,16 @@ class Scheduler { this.recovering = this.recoverExecution; this.slicers = []; + this._creating = 0; this.ready = false; this.paused = true; this.stopped = false; this.slicersDone = false; this.slicersFailed = false; - this._resolveRun = () => {}; - this._processCleanup = () => {}; - this.queue = new Queue(); - autoBind(this); + this._resolveRun = _.noop; + this._processCleanup = _.noop; this._processSlicers(); } @@ -45,6 +44,7 @@ class Scheduler { this.slicersDone = true; } this._resolveRun(); + this._resolveRun = _.noop; }; this._resolveRun = () => { @@ -61,7 +61,14 @@ class Scheduler { await promise; - this.logger.debug(`execution ${this.exId} is finished scheduling, ${this.queueLength} remaining slices in the queue`); + this.logger.debug(`execution ${this.exId} is finished scheduling, ${this.queueLength + this._creating} remaining slices in the queue`); + + const waitForCreating = () => { + const is = () => this._creating; + return pWhilst(is, () => Promise.delay(100)); + }; + + await waitForCreating(); } start() { @@ -77,7 +84,9 @@ class Scheduler { this.stopped = true; this._processCleanup(); + this._processCleanup = _.noop; this._resolveRun(); + this._resolveRun = _.noop; } pause() { @@ -89,11 +98,13 @@ class Scheduler { } get isQueueFull() { - return this.queueLength > this.executionContext.queueLength; + const maxLength = this.executionContext.queueLength; + return (this._creating + this.queueLength) > maxLength; } get isFinished() { - return (this.slicersDone || this.slicersFailed || this.stopped) && this.queueLength === 0; + const isDone = this.slicersDone || this.slicersFailed || this.stopped; + return isDone && !this.queueLength && !this._creating; } canAllocateSlice() { @@ -121,7 +132,9 @@ class Scheduler { this.logger.warn('execution recovery has been marked as completed'); this.slicersDone = true; this._processCleanup(); + this._processCleanup = _.noop; this._resolveRun(); + this._resolveRun = _.noop; return; } @@ -164,16 +177,14 @@ class Scheduler { this.queue.remove(slice.slice_id, 'slice_id'); }); - this.slicers.length = 0; - } + this.stateStore = null; - getSlice() { - const [slice] = this.getSlices(1); - return slice; + this.slicers.length = 0; } getSlices(limit = 1) { if (this.queue.size() === 0) return []; + if (limit < 1) return []; const slices = []; @@ -198,7 +209,7 @@ class Scheduler { enqueueSlices(slices, addToStart = false) { if (this.stopped) return; - _.forEach(slices, (slice) => { + slices.forEach((slice) => { if (this.queue.exists('slice_id', slice.slice_id)) { this.logger.warn(`slice ${slice.slice_id} has already been enqueued`); return; @@ -221,69 +232,45 @@ class Scheduler { events, logger, exId, - canAllocateSlice, - _runSlicer, } = this; const resetCleanup = () => { - this._processCleanup = () => {}; + this._processCleanup = _.noop; }; let slicersDone = 0; - let backupTimer; + let interval; const slicerCount = () => this.slicers.length; - const getSlicerIds = () => _.map(_.filter(this.slicers, { finished: false }), 'id'); + const getSlicers = () => this.slicers; - let pendingSlicers = []; + const pendingSlicers = new Set(); const getAllocationCount = () => { - if (!canAllocateSlice()) return 0; + if (!this.canAllocateSlice()) return 0; - const count = this.executionContext.queueLength - this.queueLength; - if (count > pendingSlicers.length) { - return pendingSlicers.length; + const count = this.executionContext.queueLength - this.queueLength - this._creating; + if (count > pendingSlicers.size) { + return pendingSlicers.size; } + if (count < 0) return 0; return count; }; - function runPendingSlicers() { - // make sure never a miss anything - clearTimeout(backupTimer); - backupTimer = setTimeout(() => { - runPendingSlicers(); - }, 100); - - if (!pendingSlicers.length) return; - - const count = getAllocationCount(); - if (!count) return; - - const slicersToRun = _.take(pendingSlicers, count); - pendingSlicers = _.without(pendingSlicers, ...slicersToRun); - - slicersToRun.forEach((id) => { - _runSlicer(id); - }); - } - - function onSlicersStart() { - runPendingSlicers(); - } - function onSlicersDone(slicerId) { - pendingSlicers = _.union(pendingSlicers, [slicerId]); - runPendingSlicers(); + if (pendingSlicers.has(slicerId)) return; + + pendingSlicers.add(slicerId); } function onSlicerFinished(slicerId) { slicersDone += 1; - logger.info(`a slicer ${slicerId} for execution: ${exId} has completed its range`); + logger.info(`slicer ${slicerId} for execution: ${exId} has completed its range`); if (slicersDone === slicerCount()) { - clearTimeout(backupTimer); + clearInterval(interval); logger.info(`all slicers for execution: ${exId} have been completed`); // before removing listeners make sure we've received all of the events @@ -295,7 +282,7 @@ class Scheduler { } function onSlicerFailure(err, slicerId) { - clearTimeout(backupTimer); + clearInterval(interval); logger.warn(`slicer ${slicerId} failed`, _.toString(err)); // before removing listeners make sure we've received all of the events @@ -308,37 +295,62 @@ class Scheduler { function onRegisteredSlicers(count) { logger.debug(`registered ${count} slicers`); - pendingSlicers = getSlicerIds(); + getSlicers() + .forEach(({ finished, id }) => { + if (finished) return; + if (pendingSlicers.has(id)) return; + + pendingSlicers.add(id); + }); } events.on('slicer:done', onSlicersDone); events.on('slicer:finished', onSlicerFinished); events.on('slicer:failure', onSlicerFailure); - events.on('slicers:start', onSlicersStart); - events.on('slicers:queued', runPendingSlicers); events.on('slicers:registered', onRegisteredSlicers); function cleanup() { - clearTimeout(backupTimer); + clearInterval(interval); - pendingSlicers = []; + pendingSlicers.clear(); events.removeListener('slicer:done', onSlicersDone); events.removeListener('slicer:finished', onSlicerFinished); events.removeListener('slicer:failure', onSlicerFailure); - events.removeListener('slicers:start', onSlicersStart); - events.removeListener('slicers:queued', runPendingSlicers); events.removeListener('slicers:registered', onRegisteredSlicers); resetCleanup(); } + // make sure never a miss anything + interval = setInterval(() => { + if (!pendingSlicers.size) return; + + const count = getAllocationCount(); + if (!count) return; + + let processed = 0; + const promises = []; + + // eslint-disable-next-line no-restricted-syntax + for (const id of pendingSlicers) { + processed += 1; + if (processed > count) break; + + pendingSlicers.delete(id); + promises.push(this._runSlicer(id)); + } + + Promise.all(promises) + .catch(err => this.logger.error('failure to run slicers', err)); + }, 5); + this._processCleanup = cleanup; } async _runSlicer(id) { - const slicer = _.find(this.slicers, { id }); + const slicer = this.slicers[id]; if (slicer.finished) return; this.logger.trace(`slicer ${slicer.id} is being called`); @@ -353,7 +365,7 @@ class Scheduler { this.events.emit('slicer:subslice'); } - const slices = _.map(_.castArray(result), (request) => { + const slices = _.castArray(result).map((request) => { slicer.order += 1; // recovery slices already have correct meta data @@ -369,7 +381,7 @@ class Scheduler { }; }); - this.enqueueSlices(slices); + this._createSlices(slices); } else if (this.canComplete() && !slicer.finished) { slicer.finished = true; this.logger.trace(`slicer ${slicer.id} finished`); @@ -383,6 +395,33 @@ class Scheduler { this.events.emit('slicer:done', slicer.id); } + + // In the case of recovery slices have already been + // created, so its important to skip this step + _ensureSliceState(slice) { + if (slice._created) return Promise.resolve(slice); + + slice._created = new Date().toISOString(); + + // this.stateStore is attached from the execution_controller + return this.stateStore.createState(this.exId, slice, 'start') + .then(() => slice); + } + + _createSlices(slices) { + this._creating += slices.length; + + process.nextTick(() => { + const promises = slices.map(slice => this._ensureSliceState(slice) + .then(_slice => this.enqueueSlice(_slice)) + .finally(() => { + this._creating -= 1; + })); + + Promise.all(promises) + .catch(err => this.logger.error('failure to enqueue slice', err)); + }); + } } module.exports = Scheduler; diff --git a/packages/teraslice/lib/workers/execution-controller/slice-analytics.js b/packages/teraslice/lib/workers/execution-controller/slice-analytics.js index c1502080687..33724a44480 100644 --- a/packages/teraslice/lib/workers/execution-controller/slice-analytics.js +++ b/packages/teraslice/lib/workers/execution-controller/slice-analytics.js @@ -102,15 +102,22 @@ average memory: ${memory.average}, min: ${memory.min}, and max: ${memory.max} return sliceAnalytics; } - events.on('slice:success', (response) => { - if (response.analytics) { - addStats(response.analytics); + function onSliceSuccess({ analytics }) { + if (analytics) { + addStats(analytics); } - }); + } + + events.on('slice:success', onSliceSuccess); + + function shutdown() { + events.removeListener('slice:success', onSliceSuccess); + } return { addStats, analyzeStats, getStats, + shutdown, }; }; diff --git a/packages/teraslice/package.json b/packages/teraslice/package.json index 7131ad833b9..c1a304e1dfc 100644 --- a/packages/teraslice/package.json +++ b/packages/teraslice/package.json @@ -1,6 +1,6 @@ { "name": "teraslice", - "version": "0.42.2", + "version": "0.42.3", "description": "Slice and dice your Elasticsearch data", "bin": "service.js", "main": "index.js", @@ -38,9 +38,8 @@ "@terascope/error-parser": "^1.0.1", "@terascope/job-components": "^0.5.1", "@terascope/queue": "^1.1.4", - "@terascope/teraslice-messaging": "^0.2.2", + "@terascope/teraslice-messaging": "^0.2.3", "async-mutex": "^0.1.3", - "auto-bind": "^1.2.1", "barbe": "^3.0.14", "bluebird": "^3.5.2", "bluebird-retry": "^0.11.0", @@ -49,7 +48,7 @@ "datemath-parser": "^1.0.6", "decompress": "^4.2.0", "easy-table": "^1.1.1", - "express": "^4.16.3", + "express": "^4.16.4", "fs-extra": "^7.0.0", "kubernetes-client": "^5.4.0", "lodash": "^4.17.11", @@ -79,6 +78,6 @@ "jest-extended": "^0.11.0", "jest-fixtures": "^0.6.0", "json-schema-faker": "^0.5.0-rc16", - "nock": "^10.0.0" + "nock": "^10.0.1" } } diff --git a/packages/teraslice/test/workers/context/terafoundation-context-spec.js b/packages/teraslice/test/workers/context/terafoundation-context-spec.js index 392a0fd8864..cce645214cc 100644 --- a/packages/teraslice/test/workers/context/terafoundation-context-spec.js +++ b/packages/teraslice/test/workers/context/terafoundation-context-spec.js @@ -21,8 +21,8 @@ describe('Terafoundation Context', () => { expect(context.foundation).toHaveProperty('getEventEmitter'); expect(context.apis.foundation).toHaveProperty('getConnection'); expect(context.foundation).toHaveProperty('getConnection'); - expect(context.apis.foundation).not.toHaveProperty('startWorkers'); - expect(context.foundation).not.toHaveProperty('startWorkers'); + expect(context.apis.foundation).toHaveProperty('startWorkers'); + expect(context.foundation).toHaveProperty('startWorkers'); expect(context.apis).toHaveProperty('registerAPI'); }); diff --git a/packages/teraslice/test/workers/execution-controller/execution-controller-spec.js b/packages/teraslice/test/workers/execution-controller/execution-controller-spec.js index 255db7a343f..710d0a6d46a 100644 --- a/packages/teraslice/test/workers/execution-controller/execution-controller-spec.js +++ b/packages/teraslice/test/workers/execution-controller/execution-controller-spec.js @@ -143,6 +143,13 @@ describe('ExecutionController', () => { shutdown: () => Promise.reject(new Error('Store Error')) }; + exController.executionAnalytics = {}; + exController.executionAnalytics.shutdown = () => Promise.reject(new Error('Execution Analytics Error')); + + exController.collectAnalytics = true; + exController.slicerAnalytics = {}; + exController.slicerAnalytics.shutdown = () => Promise.reject(new Error('Slicer Analytics Error')); + exController.recover = {}; exController.recover.shutdown = () => Promise.reject(new Error('Recover Error')); @@ -161,6 +168,8 @@ describe('ExecutionController', () => { const errMsg = err.toString(); expect(errMsg).toStartWith('Error: Failed to shutdown correctly'); expect(errMsg).toInclude('Store Error'); + expect(errMsg).toInclude('Execution Analytics Error'); + expect(errMsg).toInclude('Slicer Analytics Error'); expect(errMsg).toInclude('Recover Error'); expect(errMsg).toInclude('Execution Controller Server Error'); expect(errMsg).toInclude('Cluster Master Client Error'); diff --git a/packages/teraslice/test/workers/execution-controller/recovery-spec.js b/packages/teraslice/test/workers/execution-controller/recovery-spec.js index a1785c6f179..4a7b2c02b76 100644 --- a/packages/teraslice/test/workers/execution-controller/recovery-spec.js +++ b/packages/teraslice/test/workers/execution-controller/recovery-spec.js @@ -106,7 +106,7 @@ describe('execution recovery', () => { recovery._sliceComplete({ slice: { slice_id: 1 } }); - expect(recovery._retryState()).toEqual({ }); + expect(recovery._retryState()).toEqual({ 1: false }); expect(recovery._recoveryBatchCompleted()).toEqual(true); }); @@ -137,7 +137,10 @@ describe('execution recovery', () => { waitFor(sendSucess2, 250) ]) .then(() => { - expect(recovery._retryState()).toEqual({}); + expect(recovery._retryState()).toEqual({ + 1: false, + 2: false, + }); expect(recovery._recoveryBatchCompleted()).toEqual(true); return recovery._setId({ slice_id: 2 }); }) diff --git a/packages/teraslice/test/workers/execution-controller/scheduler-spec.js b/packages/teraslice/test/workers/execution-controller/scheduler-spec.js index fd744f4a6c9..a1748cb4db8 100644 --- a/packages/teraslice/test/workers/execution-controller/scheduler-spec.js +++ b/packages/teraslice/test/workers/execution-controller/scheduler-spec.js @@ -6,8 +6,8 @@ const TestContext = require('../helpers/test-context'); const Scheduler = require('../../../lib/workers/execution-controller/scheduler'); describe('Scheduler', () => { - let slicers; - let countPerSlicer; + const slicers = 3; + const countPerSlicer = 200; let expectedCount; let testContext; let scheduler; @@ -21,7 +21,7 @@ describe('Scheduler', () => { clearInterval(intervalId); resolve(slices); } else { - const result = scheduler.getSlices(10); + const result = scheduler.getSlices(100); if (result.length > 0) { slices.push(...result); } @@ -43,8 +43,6 @@ describe('Scheduler', () => { } beforeEach(async () => { - slicers = 3; - countPerSlicer = 200; expectedCount = slicers * countPerSlicer; testContext = new TestContext({ @@ -59,6 +57,10 @@ describe('Scheduler', () => { testContext.executionContext ); + scheduler.stateStore = { + createState: () => Promise.delay() + }; + registerSlicers(); testContext.attachCleanup(() => scheduler.cleanup()); @@ -77,6 +79,47 @@ describe('Scheduler', () => { expect(scheduler.isFinished).toBeFalse(); }); + it('should throw an error when run is called before slicers are registered', () => { + scheduler.ready = false; + return expect(scheduler.run()).rejects.toThrowError('Scheduler needs to have registered slicers first'); + }); + + it('should throw an error when registering a non-array of slicers', () => { + expect(() => { + scheduler.registerSlicers({}); + }).toThrowError(`newSlicer from module ${testContext.config.job.operations[0]._op} needs to return an array of slicers`); + }); + + it('should be able to reenqueue a slice', () => { + scheduler.enqueueSlices([ + { + slice_id: 1, + }, + { + slice_id: 2, + } + ]); + + scheduler.enqueueSlice({ slice_id: 1 }); + + scheduler.enqueueSlice({ + slice_id: 3 + }, true); + + const slices = scheduler.getSlices(100); + expect(slices).toEqual([ + { + slice_id: 3, + }, + { + slice_id: 1, + }, + { + slice_id: 2, + } + ]); + }); + it(`should be able to schedule ${expectedCount} slices`, async () => { let slices = []; @@ -98,7 +141,7 @@ describe('Scheduler', () => { const pause = _.once(() => { scheduler.pause(); - _.delay(scheduler.start, 10); + _.delay(() => scheduler.start(), 10); }); const pauseAfter = _.after(Math.round(countPerSlicer / 3), pause); @@ -116,9 +159,9 @@ describe('Scheduler', () => { }); it('should handle stop correctly', async () => { - let slices = []; + let slices = []; // eslint-disable-line - const stop = _.once(scheduler.stop); + const stop = _.once(() => scheduler.stop()); expectedCount = Math.round(countPerSlicer / 3); const stopAfter = _.after(expectedCount, stop); @@ -130,7 +173,10 @@ describe('Scheduler', () => { getSlices().then((_slices) => { slices = _slices; }), ]); - expect(slices).toBeArrayOfSize(expectedCount); + const min = expectedCount - slicers; + const max = expectedCount + slicers; + expect(slices.length).toBeWithin(min, max); + expect(scheduler.isFinished).toBeTrue(); expect(scheduler.stopped).toBeTrue(); expect(scheduler.slicersDone).toBeFalse(); diff --git a/packages/teraslice/test/workers/helpers/test-context.js b/packages/teraslice/test/workers/helpers/test-context.js index a97293ddf57..b88a0cbb9e7 100644 --- a/packages/teraslice/test/workers/helpers/test-context.js +++ b/packages/teraslice/test/workers/helpers/test-context.js @@ -79,8 +79,8 @@ class TestContext { this.config.ex_id = ex.ex_id; } - this.executionContext = new ExecutionContext(this.context, this.config); - await this.executionContext.initialize(); + const exContext = new ExecutionContext(this.context, this.config); + this.executionContext = await exContext.initialize(); this.nodeId = this.executionContext.node_id; this.exId = this.executionContext.ex_id; diff --git a/packages/teraslice/test/workers/worker/worker-spec.js b/packages/teraslice/test/workers/worker/worker-spec.js index 05090dcfe5b..3e0c3d0a357 100644 --- a/packages/teraslice/test/workers/worker/worker-spec.js +++ b/packages/teraslice/test/workers/worker/worker-spec.js @@ -106,12 +106,16 @@ describe('Worker', () => { server.onSliceSuccess((workerId, _msg) => { sliceSuccess = _msg; - shutdownPromise = server.shutdown(); + setImmediate(() => { + shutdownPromise = server.shutdown(); + }); }); server.onSliceFailure((workerId, _msg) => { sliceFailure = _msg; - shutdownPromise = server.shutdown(); + setImmediate(() => { + shutdownPromise = server.shutdown(); + }); }); await worker.run(); @@ -199,7 +203,7 @@ describe('Worker', () => { sliceFailure = _msg; }); - const msgPromise = server.once('worker:enqueue'); + const msgPromise = server.onceWithTimeout('worker:enqueue'); const workerStart = worker.runOnce(); @@ -220,7 +224,7 @@ describe('Worker', () => { }); it('should re-enqueue the worker after receiving the slice complete message', () => { - expect(msg.clientId).toEqual(worker.workerId); + expect(msg).not.toBeNil(); expect(sliceSuccess).toMatchObject({ slice: sliceConfig, }); @@ -251,7 +255,7 @@ describe('Worker', () => { sliceFailure = _msg; }); - const msgPromise = server.once('worker:enqueue'); + const msgPromise = server.onceWithTimeout('worker:enqueue'); const sliceConfig = await testContext.newSlice(); diff --git a/packages/teraslice/worker-service.js b/packages/teraslice/worker-service.js index 8f0e45ab025..25cb8c136b3 100644 --- a/packages/teraslice/worker-service.js +++ b/packages/teraslice/worker-service.js @@ -2,24 +2,20 @@ /* eslint-disable class-methods-use-this, no-console */ -const Promise = require('bluebird'); const _ = require('lodash'); -const get = require('lodash/get'); +const Promise = require('bluebird'); const { shutdownHandler } = require('./lib/workers/helpers/worker-shutdown'); const { safeDecode } = require('./lib/utils/encoding_utils'); const ExecutionContext = require('./lib/workers/context/execution-context'); const makeTerafoundationContext = require('./lib/workers/context/terafoundation-context'); +const ExecutionController = require('./lib/workers/execution-controller'); +const Worker = require('./lib/workers/worker'); class Service { - constructor() { + constructor(context) { const ex = this._getExecutionConfigFromEnv(); - this.context = makeTerafoundationContext(); - - this.shutdownHandler = shutdownHandler(this.context, () => { - if (!this.instance) return Promise.resolve(); - return this.instance.shutdown(); - }); + this.context = context; this.executionConfig = { assignment: this.context.assignment, @@ -37,26 +33,20 @@ class Service { }; this.logger = this.context.logger; - this.shutdownTimeout = get(this.context, 'sysconfig.teraslice.shutdown_timeout', 60 * 1000); + this.shutdownTimeout = _.get(this.context, 'sysconfig.teraslice.shutdown_timeout', 60 * 1000); } async initialize() { const { assignment, ex_id: exId } = this.executionConfig; this.logger.trace(`Initializing ${assignment} for execution ${exId}...`, this.executionConfig); - this.executionContext = new ExecutionContext(this.context, this.executionConfig); - await this.executionContext.initialize(); + const exContext = new ExecutionContext(this.context, this.executionConfig); + const executionContext = await exContext.initialize(); if (assignment === 'worker') { - // require this here so node doesn't have load extra code into memory - const Worker = require('./lib/workers/worker'); - - this.instance = new Worker(this.context, this.executionContext); + this.instance = new Worker(this.context, executionContext); } else if (assignment === 'execution_controller') { - // require this here so node doesn't have load extra code into memory - const ExecutionController = require('./lib/workers/execution-controller'); - - this.instance = new ExecutionController(this.context, this.executionContext); + this.instance = new ExecutionController(this.context, executionContext); } await this.instance.initialize(); @@ -89,7 +79,14 @@ class Service { } } -const cmd = new Service(); +const context = makeTerafoundationContext(); +const cmd = new Service(context); + +cmd.shutdownHandler = shutdownHandler(context, () => { + if (!cmd.instance) return Promise.resolve(); + return cmd.instance.shutdown(); +}); + Promise.resolve() .then(() => cmd.initialize()) .then(() => cmd.run()) diff --git a/tsconfig.json b/tsconfig.json index 93e5e828e26..f5d263f1615 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,10 +1,12 @@ { "compileOnSave": true, "compilerOptions": { + "baseUrl": ".", "module": "commonjs", "moduleResolution": "node", "target": "es2017", "experimentalDecorators": true, + "composite": true, "declaration": true, "sourceMap": true, "inlineSources": true, diff --git a/yarn.lock b/yarn.lock index c53261500df..380eef61564 100644 --- a/yarn.lock +++ b/yarn.lock @@ -464,10 +464,10 @@ inquirer "^6.2.0" npmlog "^4.1.2" -"@lerna/publish@^3.4.1": - version "3.4.1" - resolved "https://registry.yarnpkg.com/@lerna/publish/-/publish-3.4.1.tgz#abbbc656b3bfafc2289399a46da060b90f6baf32" - integrity sha512-Nd5PT2Ksngo1GHqT6ccmGVfMvoA9MplBvqyzPFIjFIJOgODrJ9IGJAMobxgBQ6xXXShflQ/1dHWk8faTOYKLoQ== +"@lerna/publish@^3.4.3": + version "3.4.3" + resolved "https://registry.yarnpkg.com/@lerna/publish/-/publish-3.4.3.tgz#fb956ca2a871729982022889f90d0e8eb8528340" + integrity sha512-baeRL8xmOR25p86cAaS9mL0jdRzdv4dUo04PlK2Wes+YlL705F55cSXeC9npNie+9rGwFyLzCTQe18WdbZyLuw== dependencies: "@lerna/batch-packages" "^3.1.2" "@lerna/check-working-tree" "^3.3.0" @@ -635,12 +635,11 @@ resolved "https://registry.yarnpkg.com/@types/bluebird/-/bluebird-3.5.24.tgz#11f76812531c14f793b8ecbf1de96f672905de8a" integrity sha512-YeQoDpq4Lm8ppSBqAnAeF/xy1cYp/dMTif2JFcvmAbETMRlvKHT2iLcWu+WyYiJO3b3Ivokwo7EQca/xfLVJmg== -"@types/bunyan@^1.8.4": - version "1.8.4" - resolved "https://registry.yarnpkg.com/@types/bunyan/-/bunyan-1.8.4.tgz#69c11adc7b50538d45fb68d9ae39d062b9432f38" - integrity sha512-bxOF3fsm69ezKxdcJ7Oo/PsZMOJ+JIV/QJO2IADfScmR3sLulR88dpSnz6+q+9JJ1kD7dXFFgUrGRSKHLkOX7w== +"@types/bunyan@^1.8.5": + version "1.8.5" + resolved "https://registry.yarnpkg.com/@types/bunyan/-/bunyan-1.8.5.tgz#d992adbce8ed20cde634764bd8f269f29f703647" + integrity sha512-7n8ANtxh2c5A/NfCuv8cVtWcgSLdq76MQbtmbInpzXuPw4TSAReUJ+MGHK4m67I4zI3ynCJoABfaeHYJaYSeRg== dependencies: - "@types/events" "*" "@types/node" "*" "@types/convict@^4.2.0": @@ -648,15 +647,10 @@ resolved "https://registry.yarnpkg.com/@types/convict/-/convict-4.2.0.tgz#87ec9c10bfae801f79bf9dfa0f603c064e2fa5fa" integrity sha512-p+gNRe4RPjpl1lTBUomFJ42P8ymArH/P93DFJ0iY873BJ4ZmogcKc6TbHgZQmtQMsy3jxcAo0HcTjidXwo8uKg== -"@types/debug@^0.0.30": - version "0.0.30" - resolved "https://registry.yarnpkg.com/@types/debug/-/debug-0.0.30.tgz#dc1e40f7af3b9c815013a7860e6252f6352a84df" - integrity sha512-orGL5LXERPYsLov6CWs3Fh6203+dXzJkR7OnddIr2514Hsecwc8xRpzCapshBbKFImCsvS/mk6+FWiN5LyZJAQ== - -"@types/events@*": - version "1.2.0" - resolved "https://registry.yarnpkg.com/@types/events/-/events-1.2.0.tgz#81a6731ce4df43619e5c8c945383b3e62a89ea86" - integrity sha512-KEIlhXnIutzKwRbQkGWb/I4HFqBuUykAdHgDED6xqwXJfONCjF5VoE0cXEiurh3XauygxzeDzgtXUqvLkxFzzA== +"@types/debug@^0.0.31": + version "0.0.31" + resolved "https://registry.yarnpkg.com/@types/debug/-/debug-0.0.31.tgz#bac8d8aab6a823e91deb7f79083b2a35fa638f33" + integrity sha512-LS1MCPaQKqspg7FvexuhmDbWUhE2yIJ+4AgVIyObfc06/UKZ8REgxGNjZc82wPLWmbeOm7S+gSsLgo75TanG4A== "@types/fs-extra@^5.0.4": version "5.0.4" @@ -665,15 +659,15 @@ dependencies: "@types/node" "*" -"@types/jest@^23.3.3": - version "23.3.3" - resolved "https://registry.yarnpkg.com/@types/jest/-/jest-23.3.3.tgz#246ebcc52771d2327bb8e37aa971b412d9dc4237" - integrity sha512-G6EBrbjWDfmIpYu8UcRBOhwtDiYaLj5N5jUR5rx0YvbKxRBhXPZVLUmtfShewSUNKiQwpHavpML69a2WMbIlEQ== +"@types/jest@^23.3.5": + version "23.3.5" + resolved "https://registry.yarnpkg.com/@types/jest/-/jest-23.3.5.tgz#870a1434208b60603745bfd214fc3fc675142364" + integrity sha512-3LI+vUC3Wju28vbjIjsTKakhMB8HC4l+tMz+Z8WRzVK+kmvezE5jcOvKtBpznWSI5KDLFo+FouUhpTKoekadCA== -"@types/lodash@^4.14.116": - version "4.14.116" - resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.116.tgz#5ccf215653e3e8c786a58390751033a9adca0eb9" - integrity sha512-lRnAtKnxMXcYYXqOiotTmJd74uawNWuPnsnPrrO7HiFuE3npE2iQhfABatbYDyxTNqZNuXzcKGhw37R7RjBFLg== +"@types/lodash@^4.14.117": + version "4.14.117" + resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.117.tgz#695a7f514182771a1e0f4345d189052ee33c8778" + integrity sha512-xyf2m6tRbz8qQKcxYZa7PA4SllYcay+eh25DN3jmNYY6gSTL7Htc/bttVdkqj2wfJGbeWlQiX8pIyJpKU+tubw== "@types/nanoid@^1.2.0": version "1.2.0" @@ -687,10 +681,15 @@ resolved "https://registry.yarnpkg.com/@types/node/-/node-10.5.8.tgz#6f14ccecad1d19332f063a6a764f8907801fece0" integrity sha512-sWSjw+bYW/2W+1V3m8tVsm9PKJcxk3NHN7oRqNUfEdofKg0Imbdu1dQbFvLKjZQXEDXRN6IfSMACjJ7Wv4NGCQ== -"@types/node@^10.11.4": - version "10.11.4" - resolved "https://registry.yarnpkg.com/@types/node/-/node-10.11.4.tgz#e8bd933c3f78795d580ae41d86590bfc1f4f389d" - integrity sha512-ojnbBiKkZFYRfQpmtnnWTMw+rzGp/JiystjluW9jgN3VzRwilXddJ6aGQ9V/7iuDG06SBgn7ozW9k3zcAnYjYQ== +"@types/node@^10.11.7": + version "10.11.7" + resolved "https://registry.yarnpkg.com/@types/node/-/node-10.11.7.tgz#0e75ca9357d646ca754016ca1d68a127ad7e7300" + integrity sha512-yOxFfkN9xUFLyvWaeYj90mlqTJ41CsQzWKS3gXdOMOyPVacUsymejKxJ4/pMW7exouubuEeZLJawGgcNGYlTeg== + +"@types/p-event@^1.3.0": + version "1.3.0" + resolved "https://registry.yarnpkg.com/@types/p-event/-/p-event-1.3.0.tgz#4af0869918751f75a51d78533d73e225bd1b13ba" + integrity sha512-Pp6w4bQdrAiIzi5JnO6AVh6Vq51RjD27DxyeKHqCgPrlfqYu3xPnCs3pdqCVIokEIVX7EbJMIGG0ctzFk5u9lA== "@types/socket.io-client@^1.4.32": version "1.4.32" @@ -1393,23 +1392,7 @@ bluebird@^3.5.2: resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.5.2.tgz#1be0908e054a751754549c270489c1505d4ab15a" integrity sha512-dhHTWMI7kMx5whMQntl7Vr9C6BvV10lFXDAasnqnrMYhXVCzzk6IO9Fo2L75jXHT07WrOngL1WDXOp+yYS91Yg== -body-parser@1.18.2: - version "1.18.2" - resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.18.2.tgz#87678a19d84b47d859b83199bd59bce222b10454" - integrity sha1-h2eKGdhLR9hZuDGZvVm84iKxBFQ= - dependencies: - bytes "3.0.0" - content-type "~1.0.4" - debug "2.6.9" - depd "~1.1.1" - http-errors "~1.6.2" - iconv-lite "0.4.19" - on-finished "~2.3.0" - qs "6.5.1" - raw-body "2.3.2" - type-is "~1.6.15" - -body-parser@^1.18.2: +body-parser@1.18.3, body-parser@^1.18.2: version "1.18.3" resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.18.3.tgz#5b292198ffdd553b3a0f20ded0592b956955c8b4" integrity sha1-WykhmP/dVTs6DyDe0FkrlWlVyLQ= @@ -2277,7 +2260,7 @@ debug@2.2.0: debug@2.3.3: version "2.3.3" - resolved "http://registry.npmjs.org/debug/-/debug-2.3.3.tgz#40c453e67e6e13c901ddec317af8986cda9eff8c" + resolved "https://registry.yarnpkg.com/debug/-/debug-2.3.3.tgz#40c453e67e6e13c901ddec317af8986cda9eff8c" integrity sha1-QMRT5n5uE8kB3ewxeviYbNqe/4w= dependencies: ms "0.7.2" @@ -2310,6 +2293,13 @@ debug@^4.0.1: dependencies: ms "^2.1.1" +debug@^4.1.0: + version "4.1.0" + resolved "https://registry.yarnpkg.com/debug/-/debug-4.1.0.tgz#373687bffa678b38b1cd91f861b63850035ddc87" + integrity sha512-heNPJUJIqC+xB6ayLAMHaIrmN9HKa7aQO8MGqKpvCA+uJYVcvR6l5kgdrhRuwPFHU7P5/A1w0BjByPHwpfTDKg== + dependencies: + ms "^2.1.1" + debuglog@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/debuglog/-/debuglog-1.0.1.tgz#aa24ffb9ac3df9a2351837cfb2d279360cd78492" @@ -2501,12 +2491,7 @@ delegates@^1.0.0: resolved "https://registry.yarnpkg.com/delegates/-/delegates-1.0.0.tgz#84c6e159b81904fdca59a0ef44cd870d31250f9a" integrity sha1-hMbhWbgZBP3KWaDvRM2HDTElD5o= -depd@1.1.1: - version "1.1.1" - resolved "https://registry.yarnpkg.com/depd/-/depd-1.1.1.tgz#5783b4e1c459f06fa5ca27f991f3d06e7a310359" - integrity sha1-V4O04cRZ8G+lyif5kfPQbnoxA1k= - -depd@1.1.2, depd@~1.1.1, depd@~1.1.2: +depd@1.1.2, depd@~1.1.2: version "1.1.2" resolved "https://registry.yarnpkg.com/depd/-/depd-1.1.2.tgz#9bcd52e14c097763e749b274c4346ed2e560b5a9" integrity sha1-m81S4UwJd2PnSbJ0xDRu0uVgtak= @@ -2698,11 +2683,6 @@ elasticsearch@^15.1.1: chalk "^1.0.0" lodash "^4.17.10" -emittery@^0.4.1: - version "0.4.1" - resolved "https://registry.yarnpkg.com/emittery/-/emittery-0.4.1.tgz#abe9d3297389ba424ac87e53d1c701962ce7433d" - integrity sha512-r4eRSeStEGf6M5SKdrQhhLK5bOwOBxQhIE3YSTnZE3GpKiLfnnhE+tPtrJE79+eDJgm39BM6LSoI8SCx4HbwlQ== - encodeurl@~1.0.2: version "1.0.2" resolved "https://registry.yarnpkg.com/encodeurl/-/encodeurl-1.0.2.tgz#ad3ff4c86ec2d029322f5a02c3a9a606c95b3f59" @@ -3110,14 +3090,14 @@ expect@^23.6.0: jest-message-util "^23.4.0" jest-regex-util "^23.3.0" -express@^4.16.3: - version "4.16.3" - resolved "https://registry.yarnpkg.com/express/-/express-4.16.3.tgz#6af8a502350db3246ecc4becf6b5a34d22f7ed53" - integrity sha1-avilAjUNsyRuzEvs9rWjTSL37VM= +express@^4.16.4: + version "4.16.4" + resolved "https://registry.yarnpkg.com/express/-/express-4.16.4.tgz#fddef61926109e24c515ea97fd2f1bdbf62df12e" + integrity sha512-j12Uuyb4FMrd/qQAm6uCHAkPtO8FDTRJZBDd5D2KOL2eLaz1yUNdUB/NOIyq0iU4q4cFarsUCrnFDPBcnksuOg== dependencies: accepts "~1.3.5" array-flatten "1.1.1" - body-parser "1.18.2" + body-parser "1.18.3" content-disposition "0.5.2" content-type "~1.0.4" cookie "0.3.1" @@ -3134,10 +3114,10 @@ express@^4.16.3: on-finished "~2.3.0" parseurl "~1.3.2" path-to-regexp "0.1.7" - proxy-addr "~2.0.3" - qs "6.5.1" + proxy-addr "~2.0.4" + qs "6.5.2" range-parser "~1.2.0" - safe-buffer "5.1.1" + safe-buffer "5.1.2" send "0.16.2" serve-static "1.13.2" setprototypeof "1.1.0" @@ -3918,16 +3898,6 @@ http-cache-semantics@3.8.1, http-cache-semantics@^3.8.1: resolved "https://registry.yarnpkg.com/http-cache-semantics/-/http-cache-semantics-3.8.1.tgz#39b0e16add9b605bf0a9ef3d9daaf4843b4cacd2" integrity sha512-5ai2iksyV8ZXmnZhHH4rWPoxxistEexSi5936zIQ1bnNTW5VnA85B6P/VpXiRM017IgRvb2kKo1a//y+0wSp3w== -http-errors@1.6.2: - version "1.6.2" - resolved "https://registry.yarnpkg.com/http-errors/-/http-errors-1.6.2.tgz#0a002cc85707192a7e7946ceedc11155f60ec736" - integrity sha1-CgAsyFcHGSp+eUbO7cERVfYOxzY= - dependencies: - depd "1.1.1" - inherits "2.0.3" - setprototypeof "1.0.3" - statuses ">= 1.3.1 < 2" - http-errors@1.6.3, http-errors@~1.6.2, http-errors@~1.6.3: version "1.6.3" resolved "https://registry.yarnpkg.com/http-errors/-/http-errors-1.6.3.tgz#8b55680bb4be283a0b5bf4ea2e38580be1d9320d" @@ -3970,11 +3940,6 @@ humanize-ms@^1.2.1: dependencies: ms "^2.0.0" -iconv-lite@0.4.19: - version "0.4.19" - resolved "https://registry.yarnpkg.com/iconv-lite/-/iconv-lite-0.4.19.tgz#f7468f60135f5e5dad3399c0a81be9a1603a082b" - integrity sha512-oTZqweIP51xaGPI4uPa56/Pri/480R+mo7SeU+YETByQNhDG55ycFyNLIgta9vXhILrxXDmF7ZGhqZIcuN0gJQ== - iconv-lite@0.4.23, iconv-lite@^0.4.4, iconv-lite@~0.4.13: version "0.4.23" resolved "https://registry.yarnpkg.com/iconv-lite/-/iconv-lite-0.4.23.tgz#297871f63be507adcfbfca715d0cd0eed84e9a63" @@ -5212,10 +5177,10 @@ lerna-alias@^3.0.2: dependencies: get-lerna-packages "^0.1.0" -lerna@^3.4.1: - version "3.4.1" - resolved "https://registry.yarnpkg.com/lerna/-/lerna-3.4.1.tgz#4acc5a6b9d843993db7a7bb1350274bcaf20ca80" - integrity sha512-00X2mYuwJk/bvxdjJceUxTjUgUg7MIMWllo2zGfDVGPijLadrg8QCtJASZqVE7HDQbBLDxLGPjswk29HF5JS2Q== +lerna@^3.4.3: + version "3.4.3" + resolved "https://registry.yarnpkg.com/lerna/-/lerna-3.4.3.tgz#501454efb453c65c305802d370ee337f7298787e" + integrity sha512-tWq1LvpHqkyB+FaJCmkEweivr88yShDMmauofPVdh0M5gU1cVucszYnIgWafulKYu2LMQ3IfUMUU5Pp3+MvADQ== dependencies: "@lerna/add" "^3.4.1" "@lerna/bootstrap" "^3.4.1" @@ -5229,7 +5194,7 @@ lerna@^3.4.1: "@lerna/init" "^3.3.0" "@lerna/link" "^3.3.0" "@lerna/list" "^3.3.2" - "@lerna/publish" "^3.4.1" + "@lerna/publish" "^3.4.3" "@lerna/run" "^3.3.2" "@lerna/version" "^3.4.1" import-local "^1.0.0" @@ -6004,13 +5969,13 @@ nice-try@^1.0.4: resolved "https://registry.yarnpkg.com/nice-try/-/nice-try-1.0.4.tgz#d93962f6c52f2c1558c0fbda6d512819f1efe1c4" integrity sha512-2NpiFHqC87y/zFke0fC0spBXL3bBsoh/p5H1EFhshxjCR5+0g2d6BiXbUFz9v1sAcxsk2htp2eQnNIci2dIYcA== -nock@^10.0.0: - version "10.0.0" - resolved "https://registry.yarnpkg.com/nock/-/nock-10.0.0.tgz#bcbd2c003961baffcae2ecd28e074574233d3869" - integrity sha512-hE0O9Uhrg7uOpAqnA6ZfnvCS/TZy0HJgMslJ829E7ZuRytcS86/LllupHDD6Tl8fFKQ24kWe1ikX3MCrKkwaaQ== +nock@^10.0.1: + version "10.0.1" + resolved "https://registry.yarnpkg.com/nock/-/nock-10.0.1.tgz#71eeb580c2995878e582b3e32420daead9eb44f7" + integrity sha512-M0aL9IDbUFURmokoXqejZQybZk8EtlYjUBjaoICVbW62uOlyPRsnEsceyOlUik4spCOt50ptwM4BTPt20ITtcQ== dependencies: chai "^4.1.2" - debug "^3.1.0" + debug "^4.1.0" deep-equal "^1.0.0" json-stringify-safe "^5.0.1" lodash "^4.17.5" @@ -6494,6 +6459,13 @@ p-defer@^1.0.0: resolved "https://registry.yarnpkg.com/p-defer/-/p-defer-1.0.0.tgz#9f6eb182f6c9aa8cd743004a7d4f96b196b0fb0c" integrity sha1-n26xgvbJqozXQwBKfU+WsZaw+ww= +p-event@^2.1.0: + version "2.1.0" + resolved "https://registry.yarnpkg.com/p-event/-/p-event-2.1.0.tgz#74de477a4e6b3aa8267240c7099e78ac52cb4db4" + integrity sha512-sDEpDVnzLGlJj3k590uUdpfEUySP5yAYlvfTCu5hTDvSTXQVecYWKcEwdO49PrZlnJ5wkfAvtawnno/jyXeqvA== + dependencies: + p-timeout "^2.0.1" + p-finally@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/p-finally/-/p-finally-1.0.0.tgz#3fbcfb15b899a44123b34b6dcc18b724336a2cae" @@ -6958,7 +6930,7 @@ protoduck@^5.0.0: dependencies: genfun "^4.0.1" -proxy-addr@~2.0.3: +proxy-addr@~2.0.4: version "2.0.4" resolved "https://registry.yarnpkg.com/proxy-addr/-/proxy-addr-2.0.4.tgz#ecfc733bf22ff8c6f407fa275327b9ab67e48b93" integrity sha512-5erio2h9jp5CHGwcybmxmVqHmnCBZeewlfJ0pex+UW7Qny7OOZXTtH56TGNyBizkgiOwhJtMKrVzDTeKcySZwA== @@ -7016,11 +6988,6 @@ q@^1.5.1: resolved "https://registry.yarnpkg.com/q/-/q-1.5.1.tgz#7e32f75b41381291d04611f1bf14109ac00651d7" integrity sha1-fjL3W0E4EpHQRhHxvxQQmsAGUdc= -qs@6.5.1: - version "6.5.1" - resolved "https://registry.yarnpkg.com/qs/-/qs-6.5.1.tgz#349cdf6eef89ec45c12d7d5eb3fc0c870343a6d8" - integrity sha512-eRzhrN1WSINYCDCbrz796z37LOe3m5tmW7RQf6oBntukAG1nmovJvhnwHHRMAfeoItc1m2Hk02WER2aQ/iqs+A== - qs@6.5.2, qs@^6.5.1, qs@~6.5.1, qs@~6.5.2: version "6.5.2" resolved "https://registry.yarnpkg.com/qs/-/qs-6.5.2.tgz#cb3ae806e8740444584ef154ce8ee98d403f3e36" @@ -7062,16 +7029,6 @@ range-parser@~1.2.0: resolved "https://registry.yarnpkg.com/range-parser/-/range-parser-1.2.0.tgz#f49be6b487894ddc40dcc94a322f611092e00d5e" integrity sha1-9JvmtIeJTdxA3MlKMi9hEJLgDV4= -raw-body@2.3.2: - version "2.3.2" - resolved "https://registry.yarnpkg.com/raw-body/-/raw-body-2.3.2.tgz#bcd60c77d3eb93cde0050295c3f379389bc88f89" - integrity sha1-vNYMd9Prk83gBQKVw/N5OJvIj4k= - dependencies: - bytes "3.0.0" - http-errors "1.6.2" - iconv-lite "0.4.19" - unpipe "1.0.0" - raw-body@2.3.3: version "2.3.3" resolved "https://registry.yarnpkg.com/raw-body/-/raw-body-2.3.3.tgz#1b324ece6b5706e153855bc1148c65bb7f6ea0c3" @@ -7552,12 +7509,7 @@ rxjs@^6.1.0: dependencies: tslib "^1.9.0" -safe-buffer@5.1.1: - version "5.1.1" - resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.1.tgz#893312af69b2123def71f57889001671eeb2c853" - integrity sha512-kKvNJn6Mm93gAczWVJg7wH+wGYWNrDHdWvpUmHyEsgCtIwwo3bqPtV4tR5tuPaUhTOo/kvhVwd8XwwOllGYkbg== - -safe-buffer@^5.0.1, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1: +safe-buffer@5.1.2, safe-buffer@^5.0.1, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1: version "5.1.2" resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d" integrity sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g== @@ -7617,6 +7569,11 @@ semver@^5.5, semver@^5.5.1: resolved "https://registry.yarnpkg.com/semver/-/semver-5.5.1.tgz#7dfdd8814bdb7cabc7be0fb1d734cfb66c940477" integrity sha512-PqpAxfrEhlSUWge8dwIp4tZnQ25DIOthpiaHNIthsjEFQD6EvqUKUDM7L8O2rShkFccYo1VjJR0coWfNkCubRw== +semver@^5.6.0: + version "5.6.0" + resolved "https://registry.yarnpkg.com/semver/-/semver-5.6.0.tgz#7e74256fbaa49c75aa7c7a205cc22799cac80004" + integrity sha512-RS9R6R35NYgQn++fkDWaOmqGoj4Ek9gGs+DPxNUZKuwE183xjJroKvyo1IzVFeXvUrvmALy6FWD5xrdJT25gMg== + semver@~5.3.0: version "5.3.0" resolved "https://registry.yarnpkg.com/semver/-/semver-5.3.0.tgz#9b2ce5d3de02d17c6012ad326aa6b4d0cf54f94f" @@ -7676,11 +7633,6 @@ set-value@^2.0.0: is-plain-object "^2.0.3" split-string "^3.0.1" -setprototypeof@1.0.3: - version "1.0.3" - resolved "https://registry.yarnpkg.com/setprototypeof/-/setprototypeof-1.0.3.tgz#66567e37043eeb4f04d91bd658c0cbefb55b8e04" - integrity sha1-ZlZ+NwQ+608E2RvWWMDL77VbjgQ= - setprototypeof@1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/setprototypeof/-/setprototypeof-1.1.0.tgz#d0bd85536887b6fe7c0d818cb962d9d91c54e656" @@ -8013,7 +7965,7 @@ static-extend@^0.1.1: define-property "^0.2.5" object-copy "^0.1.0" -"statuses@>= 1.3.1 < 2", "statuses@>= 1.4.0 < 2": +"statuses@>= 1.4.0 < 2": version "1.5.0" resolved "https://registry.yarnpkg.com/statuses/-/statuses-1.5.0.tgz#161c7dac177659fd9811f43771fa99381478628c" integrity sha1-Fhx9rBd2Wf2YEfQ3cfqZOBR4Yow= @@ -8402,10 +8354,10 @@ trim-right@^1.0.1: resolved "https://registry.yarnpkg.com/trim-right/-/trim-right-1.0.1.tgz#cb2e1203067e0c8de1f614094b9fe45704ea6003" integrity sha1-yy4SAwZ+DI3h9hQJS5/kVwTqYAM= -ts-jest@^23.10.3: - version "23.10.3" - resolved "https://registry.yarnpkg.com/ts-jest/-/ts-jest-23.10.3.tgz#f42de669888dfd2795b1491016b1813230d553fa" - integrity sha512-Lgyfw1MYPfqAs1qrFBmqXu8LRrde8ItH70pmp1iQuRbkVXaap7QcaEpN+yiSxuppfvO8rqezVv8wOYZkKhR5wA== +ts-jest@^23.10.4: + version "23.10.4" + resolved "https://registry.yarnpkg.com/ts-jest/-/ts-jest-23.10.4.tgz#a7a953f55c9165bcaa90ff91014a178e87fe0df8" + integrity sha512-oV/wBwGUS7olSk/9yWMiSIJWbz5xO4zhftnY3gwv6s4SMg6WHF1m8XZNBvQOKQRiTAexZ9754Z13dxBq3Zgssw== dependencies: bs-logger "0.x" buffer-from "1.x" @@ -8523,7 +8475,7 @@ type-detect@^4.0.0: resolved "https://registry.yarnpkg.com/type-detect/-/type-detect-4.0.8.tgz#7646fb5f18871cfbb7749e69bd39a6388eb7450c" integrity sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g== -type-is@~1.6.15, type-is@~1.6.16: +type-is@~1.6.16: version "1.6.16" resolved "https://registry.yarnpkg.com/type-is/-/type-is-1.6.16.tgz#f89ce341541c672b25ee7ae3c73dee3b2be50194" integrity sha512-HRkVv/5qY2G6I8iab9cI7v1bOIdhm94dVjQCPFElW9W+3GeDOSHmy2EBYe4VTApuzolPcmgFTN3ftVJRKR2J9Q== @@ -8543,10 +8495,10 @@ typedarray@^0.0.6: resolved "https://registry.yarnpkg.com/typedarray/-/typedarray-0.0.6.tgz#867ac74e3864187b1d3d47d996a78ec5c8830777" integrity sha1-hnrHTjhkGHsdPUfZlqeOxciDB3c= -typescript@^3.1.1: - version "3.1.1" - resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.1.1.tgz#3362ba9dd1e482ebb2355b02dfe8bcd19a2c7c96" - integrity sha512-Veu0w4dTc/9wlWNf2jeRInNodKlcdLgemvPsrNpfu5Pq39sgfFjvIIgTsvUHCoLBnMhPoUA+tFxsXjU6VexVRQ== +typescript@^3.1.2: + version "3.1.2" + resolved "https://registry.yarnpkg.com/typescript/-/typescript-3.1.2.tgz#c03a5d16f30bb60ad8bb6fe8e7cb212eedeec950" + integrity sha512-gOoGJWbNnFAfP9FlrSV63LYD5DJqYJHG5ky1kOXSl3pCImn4rqWy/flyq1BRd4iChQsoCqjbQaqtmXO4yCVPCA== typpy@^2.3.1: version "2.3.10"