Skip to content

Commit

Permalink
Merge pull request #850 from terascope/execution-controller-memory-leak
Browse files Browse the repository at this point in the history
v0.42.3 execution controller and messaging performance improvements
  • Loading branch information
godber authored Oct 15, 2018
2 parents edd20c0 + 3b1d79a commit 2793da2
Show file tree
Hide file tree
Showing 43 changed files with 892 additions and 853 deletions.
10 changes: 5 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
"publish:changed": "./scripts/publish.sh"
},
"dependencies": {
"lerna": "^3.4.1",
"typescript": "^3.1.1"
"lerna": "^3.4.3",
"typescript": "^3.1.2"
},
"devDependencies": {
"@types/jest": "^23.3.3",
"@types/jest": "^23.3.5",
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
"eslint": "^5.6.1",
Expand All @@ -35,8 +35,8 @@
"jest": "^23.6.0",
"jest-extended": "^0.11.0",
"lerna-alias": "^3.0.2",
"semver": "^5.5.1",
"ts-jest": "^23.10.3",
"semver": "^5.6.0",
"ts-jest": "^23.10.4",
"tslint": "^5.0.0"
},
"workspaces": [
Expand Down
2 changes: 1 addition & 1 deletion packages/docker-compose-js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
},
"dependencies": {
"bluebird": "^3.5.2",
"debug": "^4.0.1"
"debug": "^4.1.0"
},
"devDependencies": {
"eslint": "^5.6.1",
Expand Down
2 changes: 1 addition & 1 deletion packages/elasticsearch-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"uuid": "^3.3.2"
},
"devDependencies": {
"debug": "^4.0.1",
"debug": "^4.1.0",
"eslint": "^5.6.1",
"eslint-config-airbnb-base": "^13.1.0",
"eslint-plugin-import": "^2.14.0",
Expand Down
10 changes: 5 additions & 5 deletions packages/job-components/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
"@terascope/queue": "^1.1.4",
"@terascope/teraslice-types": "^0.2.1",
"@types/fs-extra": "^5.0.4",
"@types/lodash": "^4.14.116",
"@types/node": "^10.11.4",
"@types/lodash": "^4.14.117",
"@types/node": "^10.11.7",
"@types/uuid": "^3.4.4",
"datemath-parser": "^1.0.6",
"fs-extra": "^7.0.0",
Expand All @@ -50,16 +50,16 @@
"uuid": "^3.3.2"
},
"devDependencies": {
"@types/jest": "^23.3.3",
"@types/jest": "^23.3.5",
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
"jest": "^23.6.0",
"jest-extended": "^0.11.0",
"rimraf": "^2.0.0",
"ts-jest": "^23.10.3",
"ts-jest": "^23.10.4",
"tslint": "^5.0.0",
"tslint-config-airbnb": "^5.11.0",
"typescript": "^3.1.1"
"typescript": "^3.1.2"
},
"engines": {
"node": ">=8.0.0"
Expand Down
1 change: 1 addition & 0 deletions packages/job-components/tsconfig.build.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"extends": "./tsconfig",
"compilerOptions": {
"rootDir": "src",
"paths": {
"@terascope/*": [
"../*",
Expand Down
3 changes: 1 addition & 2 deletions packages/job-components/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
"extends": "../../tsconfig",
"compilerOptions": {
"baseUrl": ".",
"rootDir": "src",
"outDir": "dist"
},
"references": [
{
"path": "../teraslice-types/src"
"path": "../teraslice-types"
}
]
}
2 changes: 1 addition & 1 deletion packages/teraslice-client-js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@
"eslint-plugin-import": "^2.14.0",
"jest": "^23.6.0",
"jest-extended": "^0.11.0",
"nock": "^10.0.0"
"nock": "^10.0.1"
}
}
19 changes: 10 additions & 9 deletions packages/teraslice-messaging/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/teraslice-messaging",
"version": "0.2.2",
"version": "0.2.3",
"publishConfig": {
"access": "public"
},
Expand Down Expand Up @@ -39,32 +39,33 @@
"@terascope/queue": "^1.1.4",
"@terascope/teraslice-types": "^0.2.1",
"@types/bluebird": "^3.5.24",
"@types/debug": "^0.0.30",
"@types/debug": "^0.0.31",
"@types/fs-extra": "^5.0.4",
"@types/lodash": "^4.14.116",
"@types/lodash": "^4.14.117",
"@types/nanoid": "^1.2.0",
"@types/node": "^10.11.4",
"@types/node": "^10.11.7",
"@types/p-event": "^1.3.0",
"@types/socket.io": "^1.4.38",
"@types/socket.io-client": "^1.4.32",
"bluebird": "^3.5.2",
"debug": "^4.0.1",
"emittery": "^0.4.1",
"debug": "^4.1.0",
"nanoid": "^1.2.6",
"p-event": "^2.1.0",
"porty": "^3.1.1",
"socket.io": "^1.7.4",
"socket.io-client": "^1.7.4"
},
"devDependencies": {
"@types/jest": "^23.3.3",
"@types/jest": "^23.3.5",
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
"jest": "^23.6.0",
"jest-extended": "^0.11.0",
"rimraf": "^2.0.0",
"ts-jest": "^23.10.3",
"ts-jest": "^23.10.4",
"tslint": "^5.0.0",
"tslint-config-airbnb": "^5.11.0",
"typescript": "^3.1.1"
"typescript": "^3.1.2"
},
"engines": {
"node": ">=8.0.0"
Expand Down
12 changes: 6 additions & 6 deletions packages/teraslice-messaging/src/cluster-master/client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import _ from 'lodash';
import { isString } from 'lodash';
import * as i from './interfaces';
import * as core from '../messenger';

Expand All @@ -15,11 +15,11 @@ export class Client extends core.Client {
connectTimeout
} = opts;

if (!clusterMasterUrl || !_.isString(clusterMasterUrl)) {
if (!clusterMasterUrl || !isString(clusterMasterUrl)) {
throw new Error('ClusterMaster.Client requires a valid clusterMasterUrl');
}

if (!exId || !_.isString(exId)) {
if (!exId || !isString(exId)) {
throw new Error('ClusterMaster.Client requires a valid exId');
}

Expand Down Expand Up @@ -65,14 +65,14 @@ export class Client extends core.Client {
}

onExecutionAnalytics(fn: core.MessageHandler) {
this.socket.on('execution:analytics', this.handleResponse('execution:analytics', fn));
this.handleResponse(this.socket, 'execution:analytics', fn);
}

onExecutionPause(fn: core.MessageHandler) {
this.socket.on('execution:pause', this.handleResponse('execution:pause', fn));
this.handleResponse(this.socket, 'execution:pause', fn);
}

onExecutionResume(fn: core.MessageHandler) {
this.socket.on('execution:resume', this.handleResponse('execution:resume', fn));
this.handleResponse(this.socket, 'execution:resume', fn);
}
}
24 changes: 12 additions & 12 deletions packages/teraslice-messaging/src/cluster-master/server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import _ from 'lodash';
import { isNumber, cloneDeep, forOwn } from 'lodash';
import * as i from './interfaces';
import * as core from '../messenger';

Expand All @@ -17,7 +17,7 @@ export class Server extends core.Server {
pingTimeout,
} = opts;

if (!_.isNumber(nodeDisconnectTimeout)) {
if (!isNumber(nodeDisconnectTimeout)) {
throw new Error('ClusterMaster.Server requires a valid nodeDisconnectTimeout');
}

Expand Down Expand Up @@ -49,7 +49,7 @@ export class Server extends core.Server {

async start() {
this.on('connection', (msg) => {
this.onConnection(msg.clientId, msg.payload as SocketIO.Socket);
this.onConnection(msg.scope, msg.payload as SocketIO.Socket);
});

await this.listen();
Expand All @@ -68,38 +68,38 @@ export class Server extends core.Server {
}

getClusterAnalytics() {
return _.cloneDeep(this.clusterAnalytics);
return cloneDeep(this.clusterAnalytics);
}

onExecutionFinished(fn: (clientId: string, error?: core.ResponseError) => {}) {
this.on('execution:finished', (msg) => {
fn(msg.clientId, msg.error);
fn(msg.scope, msg.error);
});
}

private onConnection(exId: string, socket: SocketIO.Socket) {
socket.on('execution:finished', this.handleResponse('execution:finished', (msg: core.Message) => {
this.handleResponse(socket, 'execution:finished', (msg: core.Message) => {
this.emit('execution:finished', {
clientId: exId,
scope: exId,
payload: {},
error: msg.payload.error
});
}));
});

socket.on('cluster:analytics', this.handleResponse('cluster:analytics', (msg: core.Message) => {
this.handleResponse(socket, 'cluster:analytics', (msg: core.Message) => {
const data = msg.payload as i.ExecutionAnalyticsMessage;
if (!this.clusterAnalytics[data.kind]) {
return;
}

_.forOwn(data.stats, (value, field) => {
forOwn(data.stats, (value, field) => {
if (this.clusterAnalytics[data.kind][field] != null) {
this.clusterAnalytics[data.kind][field] += value;
}
});

this.emit('cluster:analytics', {
clientId: exId,
scope: exId,
payload: {
diff: data.stats,
current: this.clusterAnalytics[data.kind],
Expand All @@ -109,6 +109,6 @@ export class Server extends core.Server {
return {
recorded: true
};
}));
});
}
}
12 changes: 6 additions & 6 deletions packages/teraslice-messaging/src/execution-controller/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export class Client extends core.Client {
throw new Error(`Unable to connect to execution controller, caused by error: ${err.message}`);
}

this.socket.on('execution:slice:new', this.handleResponse('execution:slice:new', (msg: core.Message) => {
this.handleResponse(this.socket, 'execution:slice:new', (msg: core.Message) => {
if (this.listenerCount('execution:slice:new') === 0) {
return { willProcess: false };
}
Expand All @@ -62,13 +62,13 @@ export class Client extends core.Client {
return {
willProcess,
};
}));
});

this.socket.on('execution:finished', this.handleResponse('execution:finished', (msg: core.Message) => {
this.handleResponse(this.socket, 'execution:finished', (msg: core.Message) => {
this.emit('execution:finished', {
payload: msg.payload
});
}));
});
}

onExecutionFinished(fn: () => void) {
Expand All @@ -86,10 +86,11 @@ export class Client extends core.Client {
this.sendAvailable();

const slice = await new Promise((resolve) => {
const unsubscribe = this.on('execution:slice:new', onMessage);
this.once('execution:slice:new', onMessage);

const intervalId = setInterval(() => {
if (this.serverShutdown || !this.ready || fn()) {
this.removeListener('execution:slice:new', onMessage);
finish();
}
}, interval);
Expand All @@ -100,7 +101,6 @@ export class Client extends core.Client {

function finish(slice?: Slice) {
clearInterval(intervalId);
unsubscribe();
resolve(slice);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export interface Worker {
}

export interface ActiveWorkers {
[workerId: string]: string;
[workerId: string]: boolean;
}

export interface SliceResponseMessage {
Expand Down
Loading

0 comments on commit 2793da2

Please sign in to comment.