Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[teraslice-messaging, teraslice] Add abortController for use with sendSliceComplete() #3838

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/teraslice-messaging/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@terascope/teraslice-messaging",
"displayName": "Teraslice Messaging",
"version": "1.7.0",
"version": "1.7.1",
"description": "An internal teraslice messaging library using socket.io",
"homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-messaging#readme",
"bugs": {
Expand Down
10 changes: 10 additions & 0 deletions packages/teraslice-messaging/src/execution-controller/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const ONE_MIN = 60 * 1000;

export class Client extends core.Client {
public workerId: string;
private abortController: AbortController;

constructor(opts: i.ClientOptions) {
const {
Expand Down Expand Up @@ -47,6 +48,7 @@ export class Client extends core.Client {

this.workerId = workerId;
this.available = false;
this.abortController = new AbortController();
}

async start() {
Expand Down Expand Up @@ -75,6 +77,13 @@ export class Client extends core.Client {
payload: msg.payload,
});
});

this.on('server:shutdown', () => {
// this will send an abort signal to the Core.onceWithTimeout pEvent
// allowing the worker to shutdown without receiving a response to
// a sendSliceComplete() message
this.abortController.abort();
});
}

onExecutionFinished(fn: () => void) {
Expand All @@ -85,6 +94,7 @@ export class Client extends core.Client {
return this.send('worker:slice:complete', withoutNil(payload), {
response: true,
volatile: false,
signal: this.abortController.signal
});
}

Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice-messaging/src/messenger/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ export class Client extends Core {
respondBy,
};

const responseMsg = this.handleSendResponse(message);
const responseMsg = this.handleSendResponse(message, options.signal);
this.socket.emit(eventName, message);
return responseMsg;
}
Expand Down
25 changes: 21 additions & 4 deletions packages/teraslice-messaging/src/messenger/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import ms from 'ms';
import { pEvent } from 'p-event';
import { EventEmitter } from 'node:events';
import {
toString, isInteger, debugLogger, Logger
toString, isInteger, debugLogger,
Logger, TSError
} from '@terascope/utils';
import * as i from './interfaces.js';

Expand Down Expand Up @@ -40,11 +41,22 @@ export class Core extends EventEmitter {
this.removeAllListeners();
}

protected async handleSendResponse(sent: i.Message): Promise<i.Message | null> {
protected async handleSendResponse(
sent: i.Message,
signal?: AbortSignal
): Promise<i.Message | null> {
if (!sent.response) return null;

const remaining = sent.respondBy - Date.now();
const response = await this.onceWithTimeout(sent.id, remaining);
const response = await this.onceWithTimeout(sent.id, remaining, signal);

// server shutdown
if (signal?.aborted) {
const msg = sent.eventName === 'worker:slice:complete'
? `Execution controller shutdown before receiving worker slice analytics. Event: "${sent.eventName}"`
: `Execution controller shutdown before receiving "${sent.eventName}" event`;
throw new TSError(msg, { retryable: false });
}

// it is a timeout
if (response == null) {
Expand Down Expand Up @@ -136,12 +148,17 @@ export class Core extends EventEmitter {
}
}

async onceWithTimeout(eventName: string, timeout?: number): Promise<any> {
async onceWithTimeout(
eventName: string,
timeout?: number,
abortSignal?: AbortSignal
): Promise<any> {
const timeoutMs: number = this.getTimeout(timeout);
try {
const { payload } = (await pEvent(this, eventName, {
rejectionEvents: [],
timeout: timeoutMs,
signal: abortSignal
})) as i.EventMessage;
return payload;
} catch (err) {
Expand Down
1 change: 1 addition & 0 deletions packages/teraslice-messaging/src/messenger/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface SendOptions {
volatile?: boolean;
response?: boolean;
timeout?: number;
signal?: AbortSignal;
}

export interface ConnectedClient {
Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"@kubernetes/client-node": "~0.22.0",
"@terascope/elasticsearch-api": "~4.4.0",
"@terascope/job-components": "~1.6.0",
"@terascope/teraslice-messaging": "~1.7.0",
"@terascope/teraslice-messaging": "~1.7.1",
"@terascope/types": "~1.3.0",
"@terascope/utils": "~1.4.0",
"async-mutex": "~0.5.0",
Expand Down
3 changes: 3 additions & 0 deletions packages/teraslice/src/lib/workers/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ export class Worker {
} catch (err) {
if (this.isShuttingDown) {
throw err;
} else if (err.retryable === false) {
this.logger.warn(`${err}, will not retry.`);
return true;
} else {
this.logger.warn(err);
}
Expand Down