Skip to content

Commit

Permalink
[teraslice-messaging, teraslice] Add abortController for use with sen…
Browse files Browse the repository at this point in the history
…dSliceComplete() (#3838)

This PR makes the following changes:

- The execution-controller `Client` now creates an `abortController`.
Every call to `Client.sendSliceComplete()` now includes a reference to
`abortController.signal` allowing for the `pEvent` in the messenger
`Core.onceWithTimeout()` to be aborted.
- A new listener is added for the `server:shutdown` event that calls
`abortController.abort()`. This prevents a worker from still waiting for
a response to `sendSliceComplete()` after the server is shutdown.
- bump teraslice-messaging from 1.7.0 to 1.7.1

ref: #2106
  • Loading branch information
busma13 authored Nov 22, 2024
1 parent 85bbb94 commit 7690ba1
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 7 deletions.
2 changes: 1 addition & 1 deletion packages/teraslice-messaging/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/teraslice-messaging",
"displayName": "Teraslice Messaging",
"version": "1.7.0",
"version": "1.7.1",
"description": "An internal teraslice messaging library using socket.io",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-messaging#readme",
"bugs": {
Expand Down
10 changes: 10 additions & 0 deletions packages/teraslice-messaging/src/execution-controller/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const ONE_MIN = 60 * 1000;

export class Client extends core.Client {
public workerId: string;
private abortController: AbortController;

constructor(opts: i.ClientOptions) {
const {
Expand Down Expand Up @@ -47,6 +48,7 @@ export class Client extends core.Client {

this.workerId = workerId;
this.available = false;
this.abortController = new AbortController();
}

async start() {
Expand Down Expand Up @@ -75,6 +77,13 @@ export class Client extends core.Client {
payload: msg.payload,
});
});

this.on('server:shutdown', () => {
// this will send an abort signal to the Core.onceWithTimeout pEvent
// allowing the worker to shutdown without receiving a response to
// a sendSliceComplete() message
this.abortController.abort();
});
}

onExecutionFinished(fn: () => void) {
Expand All @@ -85,6 +94,7 @@ export class Client extends core.Client {
return this.send('worker:slice:complete', withoutNil(payload), {
response: true,
volatile: false,
signal: this.abortController.signal
});
}

Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice-messaging/src/messenger/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ export class Client extends Core {
respondBy,
};

const responseMsg = this.handleSendResponse(message);
const responseMsg = this.handleSendResponse(message, options.signal);
this.socket.emit(eventName, message);
return responseMsg;
}
Expand Down
25 changes: 21 additions & 4 deletions packages/teraslice-messaging/src/messenger/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import ms from 'ms';
import { pEvent } from 'p-event';
import { EventEmitter } from 'node:events';
import {
toString, isInteger, debugLogger, Logger
toString, isInteger, debugLogger,
Logger, TSError
} from '@terascope/utils';
import * as i from './interfaces.js';

Expand Down Expand Up @@ -40,11 +41,22 @@ export class Core extends EventEmitter {
this.removeAllListeners();
}

protected async handleSendResponse(sent: i.Message): Promise<i.Message | null> {
protected async handleSendResponse(
sent: i.Message,
signal?: AbortSignal
): Promise<i.Message | null> {
if (!sent.response) return null;

const remaining = sent.respondBy - Date.now();
const response = await this.onceWithTimeout(sent.id, remaining);
const response = await this.onceWithTimeout(sent.id, remaining, signal);

// server shutdown
if (signal?.aborted) {
const msg = sent.eventName === 'worker:slice:complete'
? `Execution controller shutdown before receiving worker slice analytics. Event: "${sent.eventName}"`
: `Execution controller shutdown before receiving "${sent.eventName}" event`;
throw new TSError(msg, { retryable: false });
}

// it is a timeout
if (response == null) {
Expand Down Expand Up @@ -136,12 +148,17 @@ export class Core extends EventEmitter {
}
}

async onceWithTimeout(eventName: string, timeout?: number): Promise<any> {
async onceWithTimeout(
eventName: string,
timeout?: number,
abortSignal?: AbortSignal
): Promise<any> {
const timeoutMs: number = this.getTimeout(timeout);
try {
const { payload } = (await pEvent(this, eventName, {
rejectionEvents: [],
timeout: timeoutMs,
signal: abortSignal
})) as i.EventMessage;
return payload;
} catch (err) {
Expand Down
1 change: 1 addition & 0 deletions packages/teraslice-messaging/src/messenger/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface SendOptions {
volatile?: boolean;
response?: boolean;
timeout?: number;
signal?: AbortSignal;
}

export interface ConnectedClient {
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"@kubernetes/client-node": "~0.22.0",
"@terascope/elasticsearch-api": "~4.4.0",
"@terascope/job-components": "~1.6.0",
"@terascope/teraslice-messaging": "~1.7.0",
"@terascope/teraslice-messaging": "~1.7.1",
"@terascope/types": "~1.3.0",
"@terascope/utils": "~1.4.0",
"async-mutex": "~0.5.0",
Expand Down
3 changes: 3 additions & 0 deletions packages/teraslice/src/lib/workers/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ export class Worker {
} catch (err) {
if (this.isShuttingDown) {
throw err;
} else if (err.retryable === false) {
this.logger.warn(`${err}, will not retry.`);
return true;
} else {
this.logger.warn(err);
}
Expand Down

0 comments on commit 7690ba1

Please sign in to comment.