Skip to content

Commit

Permalink
Merge pull request #844 from terascope/execution-controller-memory-leak
Browse files Browse the repository at this point in the history
Execution Controller Performance Improvements
  • Loading branch information
peterdemartini authored Oct 3, 2018
2 parents a176efe + 4ee9cdd commit ab6732c
Show file tree
Hide file tree
Showing 47 changed files with 1,447 additions and 721 deletions.
6 changes: 3 additions & 3 deletions e2e/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"version": "0.1.0",
"description": "Teraslice integration test suite",
"scripts": {
"test": "jest --runInBand $JEST_ARGS; echo '\n\n[WARN] Make sure to remember to run yarn clean or yarn test:e2e:clean to remove the docker containers'",
"test": "echo '[WARN] Make sure to remember to run yarn clean or yarn test:e2e:clean to remove the docker containers'; jest --runInBand",
"test:ci": "jest --runInBand $JEST_ARGS || (yarn logs; yarn clean; exit 1) && (yarn clean; exit 0)",
"logs": "docker-compose logs --no-color teraslice-worker teraslice-master | awk -F' [|] ' '{print $2}' | bunyan -o short",
"logs-follow": "docker-compose logs --follow --no-color teraslice-worker teraslice-master | awk -F' [|] ' '{print $2}' | bunyan -o short",
Expand All @@ -31,10 +31,10 @@
"bunyan": "^1.8.12",
"elasticsearch": "^15.1.1",
"jest": "^23.6.0",
"jest-extended": "^0.10.0",
"jest-extended": "^0.11.0",
"lodash": "^4.17.11",
"signale": "^1.3.0",
"teraslice-client-js": "^0.2.0",
"teraslice-client-js": "^0.3.0",
"uuid": "^3.3.2"
}
}
4 changes: 4 additions & 0 deletions e2e/test/wait.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ function waitForJobStatus(job, status) {
}

return job.waitForStatus(status, 100, 2 * 60 * 1000)
// since most of the time we are chaining this with other actions
// make sure we avoid unrealistic test conditions by giving the
// it a little bit of time
.then(result => Promise.delay(500).then(() => result))
.catch(async (err) => {
err.message = `Job: ${jobId}: ${err.message}`;
await logExErrors();
Expand Down
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,21 @@
},
"dependencies": {
"lerna": "^3.4.0",
"typescript": "^3.0.3"
"typescript": "^3.1.1"
},
"devDependencies": {
"@types/jest": "^23.3.2",
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
"eslint": "^5.6.0",
"eslint": "^5.6.1",
"eslint-config-airbnb-base": "^13.1.0",
"eslint-plugin-import": "^2.14.0",
"fs-extra": "^7.0.0",
"jest": "^23.6.0",
"jest-extended": "^0.10.0",
"jest-extended": "^0.11.0",
"lerna-alias": "^3.0.2",
"semver": "^5.5.1",
"ts-jest": "^23.10.1",
"ts-jest": "^23.10.3",
"tslint": "^5.0.0"
},
"workspaces": [
Expand Down
4 changes: 2 additions & 2 deletions packages/docker-compose-js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
"debug": "^4.0.1"
},
"devDependencies": {
"eslint": "^5.6.0",
"eslint": "^5.6.1",
"eslint-config-airbnb-base": "^13.1.0",
"eslint-plugin-import": "^2.14.0",
"jest": "^23.6.0",
"jest-extended": "^0.10.0"
"jest-extended": "^0.11.0"
}
}
4 changes: 2 additions & 2 deletions packages/elasticsearch-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
},
"devDependencies": {
"debug": "^4.0.1",
"eslint": "^5.6.0",
"eslint": "^5.6.1",
"eslint-config-airbnb-base": "^13.1.0",
"eslint-plugin-import": "^2.14.0",
"jest": "^23.6.0",
"jest-extended": "^0.10.0"
"jest-extended": "^0.11.0"
}
}
4 changes: 2 additions & 2 deletions packages/error-parser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
"lodash": "^4.17.11"
},
"devDependencies": {
"eslint": "^5.6.0",
"eslint": "^5.6.1",
"eslint-config-airbnb-base": "^13.1.0",
"eslint-plugin-import": "^2.14.0",
"jest": "^23.6.0",
"jest-extended": "^0.10.0"
"jest-extended": "^0.11.0"
}
}
8 changes: 4 additions & 4 deletions packages/job-components/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"@terascope/teraslice-types": "^0.2.0",
"@types/fs-extra": "^5.0.4",
"@types/lodash": "^4.14.116",
"@types/node": "^10.11.0",
"@types/node": "^10.11.3",
"@types/uuid": "^3.4.4",
"datemath-parser": "^1.0.6",
"fs-extra": "^7.0.0",
Expand All @@ -54,12 +54,12 @@
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
"jest": "^23.6.0",
"jest-extended": "^0.10.0",
"jest-extended": "^0.11.0",
"rimraf": "^2.0.0",
"ts-jest": "^23.10.1",
"ts-jest": "^23.10.3",
"tslint": "^5.0.0",
"tslint-config-airbnb": "^5.11.0",
"typescript": "^3.0.3"
"typescript": "^3.1.1"
},
"engines": {
"node": ">=8.0.0"
Expand Down
4 changes: 2 additions & 2 deletions packages/queue/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
},
"dependencies": {},
"devDependencies": {
"eslint": "^5.6.0",
"eslint": "^5.6.1",
"eslint-config-airbnb-base": "^13.1.0",
"eslint-plugin-import": "^2.14.0",
"jest": "^23.6.0",
"jest-extended": "^0.10.0"
"jest-extended": "^0.11.0"
}
}
4 changes: 2 additions & 2 deletions packages/teraslice-cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
"yargs": "^12.0.2"
},
"devDependencies": {
"eslint": "^5.6.0",
"eslint": "^5.6.1",
"eslint-config-airbnb-base": "^13.1.0",
"eslint-plugin-import": "^2.14.0",
"jest": "^23.6.0",
"jest-extended": "^0.10.0",
"jest-extended": "^0.11.0",
"jest-fixtures": "^0.6.0"
}
}
4 changes: 2 additions & 2 deletions packages/teraslice-client-js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
"request-promise": "^4.2.2"
},
"devDependencies": {
"eslint": "^5.6.0",
"eslint": "^5.6.1",
"eslint-config-airbnb-base": "^13.1.0",
"eslint-plugin-import": "^2.14.0",
"jest": "^23.6.0",
"jest-extended": "^0.10.0",
"jest-extended": "^0.11.0",
"nock": "^10.0.0"
}
}
13 changes: 7 additions & 6 deletions packages/teraslice-messaging/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/teraslice-messaging",
"version": "0.2.0",
"version": "0.2.1",
"publishConfig": {
"access": "public"
},
Expand Down Expand Up @@ -43,12 +43,13 @@
"@types/fs-extra": "^5.0.4",
"@types/lodash": "^4.14.116",
"@types/nanoid": "^1.2.0",
"@types/node": "^10.11.0",
"@types/node": "^10.11.3",
"@types/socket.io": "^1.4.38",
"@types/socket.io-client": "^1.4.32",
"bluebird": "^3.5.2",
"debug": "^4.0.1",
"nanoid": "^1.2.3",
"emittery": "^0.4.1",
"nanoid": "^1.2.6",
"porty": "^3.1.1",
"socket.io": "^1.7.4",
"socket.io-client": "^1.7.4"
Expand All @@ -58,12 +59,12 @@
"babel-core": "^6.0.0",
"babel-jest": "^23.6.0",
"jest": "^23.6.0",
"jest-extended": "^0.10.0",
"jest-extended": "^0.11.0",
"rimraf": "^2.0.0",
"ts-jest": "^23.10.1",
"ts-jest": "^23.10.3",
"tslint": "^5.0.0",
"tslint-config-airbnb": "^5.11.0",
"typescript": "^3.0.3"
"typescript": "^3.1.1"
},
"engines": {
"node": ">=8.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export interface ExecutionAnalytics {
workers_joined: number;
workers_reconnected: number;
workers_disconnected: number;
job_duration: number;
failed: number;
subslices: number;
queued: number;
Expand Down
29 changes: 24 additions & 5 deletions packages/teraslice-messaging/src/cluster-master/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ export class Server extends core.Server {
}

async start() {
this.on('connection', (clientId: string, socket: SocketIO.Socket) => {
this.onConnection(clientId, socket);
this.on('connection', (msg) => {
this.onConnection(msg.clientId, msg.payload as SocketIO.Socket);
});

await this.listen();
Expand All @@ -71,25 +71,44 @@ export class Server extends core.Server {
return _.cloneDeep(this.clusterAnalytics);
}

onExecutionFinished(fn: core.ClientEventFn) {
this.on('execution:finished', fn);
onExecutionFinished(fn: (clientId: string, error?: core.ResponseError) => {}) {
this.on('execution:finished', (msg) => {
fn(msg.clientId, msg.error);
});
}

private onConnection(exId: string, socket: SocketIO.Socket) {
socket.on('execution:finished', this.handleResponse('execution:finished', (msg: core.Message) => {
this.emit('execution:finished', exId, msg.payload.error);
this.emit('execution:finished', {
clientId: exId,
payload: {},
error: msg.payload.error
});
}));

socket.on('cluster:analytics', this.handleResponse('cluster:analytics', (msg: core.Message) => {
const data = msg.payload as i.ExecutionAnalyticsMessage;
if (!this.clusterAnalytics[data.kind]) {
return;
}

_.forOwn(data.stats, (value, field) => {
if (this.clusterAnalytics[data.kind][field] != null) {
this.clusterAnalytics[data.kind][field] += value;
}
});

this.emit('cluster:analytics', {
clientId: exId,
payload: {
diff: data.stats,
current: this.clusterAnalytics[data.kind],
}
});

return {
recorded: true
};
}));
}
}
34 changes: 24 additions & 10 deletions packages/teraslice-messaging/src/execution-controller/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,16 @@ export class Client extends core.Client {
}

this.socket.on('execution:slice:new', this.handleResponse('execution:slice:new', (msg: core.Message) => {
if (this.listenerCount('execution:slice:new') === 0) {
return { willProcess: false };
}

const willProcess = this.available;
if (willProcess) {
this.available = false;
this.emit('execution:slice:new', msg.payload);
this.emit('execution:slice:new', {
payload: msg.payload
});
}

return {
Expand All @@ -59,12 +65,14 @@ export class Client extends core.Client {
}));

this.socket.on('execution:finished', this.handleResponse('execution:finished', (msg: core.Message) => {
this.emit('execution:finished', msg.payload);
this.emit('execution:finished', {
payload: msg.payload
});
}));
}

onExecutionFinished(fn: core.ClientEventFn) {
this.once('execution:finished', fn);
onExecutionFinished(fn: () => void) {
this.on('execution:finished', fn);
}

sendSliceComplete(payload: i.SliceCompletePayload) {
Expand All @@ -78,17 +86,23 @@ export class Client extends core.Client {
this.sendAvailable();

const slice = await new Promise((resolve) => {
const unsubscribe = this.on('execution:slice:new', onMessage);

const intervalId = setInterval(() => {
if (this.serverShutdown || !this.ready || fn()) {
this.removeListener('execution:slice:new', onMessage);
resolve();
finish();
}
}, interval);
const onMessage = (msg: Slice) => {

function onMessage(msg: core.EventMessage) {
finish(msg.payload as Slice);
}

function finish(slice?: Slice) {
clearInterval(intervalId);
resolve(msg);
};
this.once('execution:slice:new', onMessage);
unsubscribe();
resolve(slice);
}
});

if (!slice) return;
Expand Down
Loading

0 comments on commit ab6732c

Please sign in to comment.