Skip to content

Commit

Permalink
Merge pull request #833 from terascope/execution-controller-memory-leak
Browse files Browse the repository at this point in the history
Execution Controller Performance Fixes
  • Loading branch information
godber authored Sep 21, 2018
2 parents 072618a + 587f2aa commit f9696ec
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 239 deletions.
4 changes: 1 addition & 3 deletions packages/teraslice-messaging/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@terascope/teraslice-messaging",
"version": "0.1.0",
"version": "0.1.1",
"publishConfig": {
"access": "public"
},
Expand Down Expand Up @@ -43,13 +43,11 @@
"@types/lodash": "^4.14.116",
"@types/nanoid": "^1.2.0",
"@types/node": "^10.9.4",
"@types/node-cache": "^4.1.1",
"@types/socket.io": "^1.4.38",
"@types/socket.io-client": "^1.4.32",
"bluebird": "^3.5.2",
"debug": "^4.0.1",
"nanoid": "^1.2.3",
"node-cache": "^4.2.0",
"porty": "^3.1.1",
"rimraf": "^2.0.0",
"socket.io": "^1.7.4",
Expand Down
30 changes: 15 additions & 15 deletions packages/teraslice-messaging/src/execution-controller/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,19 @@ export class Server extends core.Server {
}

onSliceSuccess(fn: core.ClientEventFn) {
this.on('slice:success', fn);
this.on('slice:success', (workerId, payload) => {
_.defer(() => {
fn(workerId, payload);
});
});
}

onSliceFailure(fn: core.ClientEventFn) {
this.on('slice:failure', fn);
this.on('slice:failure', (workerId, payload) => {
_.defer(() => {
fn(workerId, payload);
});
});
}

sendExecutionFinishedToAll(exId: string) {
Expand All @@ -135,18 +143,11 @@ export class Server extends core.Server {
socket.on('worker:slice:complete', this.handleResponse('worker:slice:complete', (msg) => {
const workerResponse = msg.payload;
const sliceId = _.get(workerResponse, 'slice.slice_id');
const alreadyCompleted = this.cache.get(`${sliceId}:complete`);

if (!alreadyCompleted) {
this.cache.set(`${sliceId}:complete`, true);

_.defer(() => {
if (workerResponse.error) {
this.emit('slice:failure', workerId, workerResponse);
} else {
this.emit('slice:success', workerId, workerResponse);
}
});

if (workerResponse.error) {
this.emit('slice:failure', workerId, workerResponse);
} else {
this.emit('slice:success', workerId, workerResponse);
}

_.pull(this._activeWorkers, workerId);
Expand All @@ -155,7 +156,6 @@ export class Server extends core.Server {
});

return _.pickBy({
duplicate: alreadyCompleted,
recorded: true,
slice_id: sliceId,
});
Expand Down
15 changes: 1 addition & 14 deletions packages/teraslice-messaging/src/messenger/core.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import debugFn from 'debug';
import _ from 'lodash';
import NodeCache from 'node-cache';
import { EventEmitter } from 'events';
import * as i from './interfaces';
import { newMsgId } from '../utils';
Expand All @@ -9,7 +8,6 @@ const debug = debugFn('teraslice-messaging:core');

export class Core extends EventEmitter {
public closed: boolean = false;
protected cache: NodeCache;

protected networkLatencyBuffer: number;
protected actionTimeout: number;
Expand All @@ -27,21 +25,10 @@ export class Core extends EventEmitter {
if (!_.isSafeInteger(this.networkLatencyBuffer)) {
throw new Error('Messenger requires a valid networkLatencyBuffer');
}

this.cache = new NodeCache({
stdTTL: 30 * 60 * 1000, // 30 minutes
checkperiod: 10 * 60 * 1000, // 10 minutes
useClones: false,
});

}

close() {
this.closed = true;

this.cache.flushAll();
this.cache.close();

this.removeAllListeners();
}

Expand Down Expand Up @@ -105,7 +92,7 @@ export class Core extends EventEmitter {
return;
}

if (!msg.volatile) {
if (!msg.volatile && !this.isClientReady(message.to)) {
const remaining = msg.respondBy - Date.now();
await this.waitForClientReady(message.to, remaining);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice-messaging/src/messenger/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export interface ConnectedClients {
}

export interface ClientSendFns {
[clientId: string]: (eventName: string, payload?: Payload, options?: SendOptions) => Promise<Message|null>;
[clientId: string]: (eventName: string, message: Message) => Promise<Message|null>;
}

export interface ClientEventFn {
Expand Down
129 changes: 66 additions & 63 deletions packages/teraslice-messaging/src/messenger/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,63 +177,51 @@ export class Server extends Core {
}

get connectedClients(): i.ConnectedClient[] {
const clients = this.filterClientsByState(connectedStates);
return _.cloneDeep(clients);
return _.clone(this.filterClientsByState(connectedStates));
}

get connectedClientCount(): number {
const clients = this.filterClientsByState(connectedStates);
return _.size(clients);
return this.countClientsByState(connectedStates);
}

get onlineClients(): i.ConnectedClient[] {
const clients = this.filterClientsByState(onlineStates);
return _.cloneDeep(clients);
return _.clone(this.filterClientsByState(onlineStates));
}

get onlineClientCount(): number {
const clients = this.filterClientsByState(onlineStates);
return _.size(clients);
return this.countClientsByState(onlineStates);
}

get disconnectedClients(): i.ConnectedClient[] {
const clients = this.filterClientsByState(disconnectedStates);
return _.cloneDeep(clients);
return _.clone(this.filterClientsByState(disconnectedStates));
}

get disconectedClientCount(): number {
const clients = this.filterClientsByState(disconnectedStates);
return _.size(clients);
return this.countClientsByState(disconnectedStates);
}

get offlineClients(): i.ConnectedClient[] {
const clients = this.filterClientsByState([i.ClientState.Offline]);
return _.cloneDeep(clients);
return _.clone(this.filterClientsByState([i.ClientState.Offline]));
}

get offlineClientCount(): number {
const clients = this.filterClientsByState([i.ClientState.Offline]);
return _.size(clients);
return this.countClientsByState([i.ClientState.Offline]);
}

get availableClients(): i.ConnectedClient[] {
const clients = this.filterClientsByState([i.ClientState.Available]);
return _.cloneDeep(clients);
return _.clone(this.filterClientsByState([i.ClientState.Available]));
}

get availableClientCount(): number {
const clients = this.filterClientsByState([i.ClientState.Available]);
return _.size(clients);
return this.countClientsByState([i.ClientState.Available]);
}

get unavailableClients(): i.ConnectedClient[] {
const clients = this.filterClientsByState(unavailableStates);
return _.cloneDeep(clients);
return _.clone(this.filterClientsByState(unavailableStates));
}

get unavailableClientCount(): number {
const clients = this.filterClientsByState(unavailableStates);
return _.size(clients);
return this.countClientsByState(unavailableStates);
}

onClientOnline(fn: i.ClientEventFn) {
Expand Down Expand Up @@ -289,12 +277,38 @@ export class Server extends Core {
return Promise.all(promises);
}

protected async send(clientId: string, eventName: string, payload: i.Payload = {}, options?: i.SendOptions): Promise<i.Message|null> {
protected async send(clientId: string, eventName: string, payload: i.Payload = {}, options: i.SendOptions = { response: true }): Promise<i.Message|null> {
if (!_.has(this._clientSendFns, clientId)) {
throw new Error(`No client found by that id "${clientId}"`);
}

return this._clientSendFns[clientId](eventName, payload, options);
if (this.closed) return null;

if (this.isShuttingDown) {
options.volatile = true;
}

if (!options.volatile && !this.isClientReady(clientId)) {
await this.waitForClientReady(clientId);
}

const response = options.response != null ? options.response : true;

const message: i.Message = {
id: newMsgId(),
respondBy: Date.now() + this.getTimeout(options.timeout),
eventName,
payload,
to: clientId,
from: this.serverName,
volatile: options.volatile,
response,
};

const responseMsg = await this._clientSendFns[clientId](eventName, message);

if (!responseMsg) return null;
return responseMsg as i.Message;
}

protected getClientMetadataFromSocket(socket: SocketIO.Socket): i.ClientSocketMetadata {
Expand All @@ -307,6 +321,16 @@ export class Server extends Core {
});
}

private countClientsByState(states: i.ClientState[]): number {
let count = 0;
for (const client of Object.values(this._clients)) {
if (states.includes(client.state)) {
count += 1;
}
}
return count;
}

protected updateClientState(clientId: string, update: i.UpdateClientState): boolean {
const client = this._clients[clientId];
if (!client) {
Expand All @@ -326,15 +350,6 @@ export class Server extends Core {
}

const updatedAt = new Date();
const debugObj = _.pickBy({
payload: update.payload,
error: update.payload,
socketId: update.socketId,
updatedAt,
});

debug(`${clientId} is being updated from ${currentState} to ${update.state}`, debugObj);

this._clients[clientId].state = update.state;
this._clients[clientId].updatedAt = updatedAt;

Expand Down Expand Up @@ -384,6 +399,20 @@ export class Server extends Core {
}

this.emit(`client:${update.state}`, clientId, update.error);

// cleanup socket and such
const { socketId } = this._clients[clientId];

if (this.server.sockets.sockets[socketId]) {
try {
this.server.sockets.sockets[socketId].removeAllListeners();
this.server.sockets.sockets[socketId].disconnect(true);
} catch (err) {
debug('error cleaning up socket when going offline', err);
}
delete this.server.sockets.sockets[socketId];
}

delete this._clientSendFns[clientId];
return true;

Expand Down Expand Up @@ -424,36 +453,10 @@ export class Server extends Core {
const client = this.ensureClient(socket);
const { clientId } = client;

this._clientSendFns[clientId] = async (eventName, payload = {}, options: i.SendOptions = { response: true }) => {
if (this.closed) return null;

if (this.isShuttingDown) {
options.volatile = true;
}

if (!options.volatile) {
await this.waitForClientReady(clientId);
}

const response = options.response != null ? options.response : true;

const message: i.Message = {
id: newMsgId(),
respondBy: Date.now() + this.getTimeout(options.timeout),
eventName,
payload,
to: clientId,
from: this.serverName,
volatile: options.volatile,
response,
};

const responseMsg = await new Promise((resolve, reject) => {
this._clientSendFns[clientId] = (eventName, message: i.Message) => {
return new Promise((resolve, reject) => {
socket.emit(eventName, message, this.handleSendResponse(message, resolve, reject));
});

if (!responseMsg) return null;
return responseMsg as i.Message;
};

socket.on('error', (err: Error|string) => {
Expand Down
11 changes: 3 additions & 8 deletions packages/teraslice-messaging/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import os from 'os';
import _ from 'lodash';
import url from 'url';
import nanoid from 'nanoid/generate';
import nanoid from 'nanoid';

export function newMsgId(lowerCase: boolean = false, length: number = 15): string {
let characters = '-0123456789abcdefghijklmnopqrstuvwxyz';
if (!lowerCase) {
characters += 'ABCDEFGHIJKLMNOPQRSTUVWXYZ';
}
const id = _.trim(nanoid(characters, length), '-');
return _.padEnd(id, length, 'abcdefghijklmnopqrstuvwxyz');
export function newMsgId(): string {
return nanoid();
}

export function formatURL(hostname = os.hostname(), port: number): string {
Expand Down
Loading

0 comments on commit f9696ec

Please sign in to comment.