From d01f36a488512908916d6f688d90a428e5ff0e5e Mon Sep 17 00:00:00 2001 From: busma13 Date: Wed, 20 Nov 2024 12:12:17 -0700 Subject: [PATCH 1/4] Add abort controller for use with sendSliceComplete --- .../src/execution-controller/client.ts | 10 ++++++++++ .../src/messenger/client.ts | 2 +- .../teraslice-messaging/src/messenger/core.ts | 19 ++++++++++++++++--- .../src/messenger/interfaces.ts | 1 + 4 files changed, 28 insertions(+), 4 deletions(-) diff --git a/packages/teraslice-messaging/src/execution-controller/client.ts b/packages/teraslice-messaging/src/execution-controller/client.ts index 1e3db2c97a5..a3080a47bdb 100644 --- a/packages/teraslice-messaging/src/execution-controller/client.ts +++ b/packages/teraslice-messaging/src/execution-controller/client.ts @@ -7,6 +7,7 @@ const ONE_MIN = 60 * 1000; export class Client extends core.Client { public workerId: string; + private abortController: AbortController; constructor(opts: i.ClientOptions) { const { @@ -47,6 +48,7 @@ export class Client extends core.Client { this.workerId = workerId; this.available = false; + this.abortController = new AbortController(); } async start() { @@ -75,6 +77,13 @@ export class Client extends core.Client { payload: msg.payload, }); }); + + this.on('server:shutdown', () => { + // this will send an abort signal to the Core.onceWithTimeout pEvent + // allowing the worker to shutdown without receiving a response to + // a sendSliceComplete() message + this.abortController.abort(); + }); } onExecutionFinished(fn: () => void) { @@ -85,6 +94,7 @@ export class Client extends core.Client { return this.send('worker:slice:complete', withoutNil(payload), { response: true, volatile: false, + signal: this.abortController.signal }); } diff --git a/packages/teraslice-messaging/src/messenger/client.ts b/packages/teraslice-messaging/src/messenger/client.ts index 986bd8c1c46..38ad490a9fa 100644 --- a/packages/teraslice-messaging/src/messenger/client.ts +++ b/packages/teraslice-messaging/src/messenger/client.ts @@ -278,7 +278,7 @@ export class Client extends Core { respondBy, }; - const responseMsg = this.handleSendResponse(message); + const responseMsg = this.handleSendResponse(message, options.signal); this.socket.emit(eventName, message); return responseMsg; } diff --git a/packages/teraslice-messaging/src/messenger/core.ts b/packages/teraslice-messaging/src/messenger/core.ts index 609237fdd5e..f660dc23a7d 100644 --- a/packages/teraslice-messaging/src/messenger/core.ts +++ b/packages/teraslice-messaging/src/messenger/core.ts @@ -40,11 +40,19 @@ export class Core extends EventEmitter { this.removeAllListeners(); } - protected async handleSendResponse(sent: i.Message): Promise { + protected async handleSendResponse( + sent: i.Message, + signal?: AbortSignal + ): Promise { if (!sent.response) return null; const remaining = sent.respondBy - Date.now(); - const response = await this.onceWithTimeout(sent.id, remaining); + const response = await this.onceWithTimeout(sent.id, remaining, signal); + + // server shutdown + if (signal?.aborted) { + throw new Error(`Messaging server shutdown before responding to message "${sent.eventName}"`); + } // it is a timeout if (response == null) { @@ -136,12 +144,17 @@ export class Core extends EventEmitter { } } - async onceWithTimeout(eventName: string, timeout?: number): Promise { + async onceWithTimeout( + eventName: string, + timeout?: number, + abortSignal?: AbortSignal + ): Promise { const timeoutMs: number = this.getTimeout(timeout); try { const { payload } = (await pEvent(this, eventName, { rejectionEvents: [], timeout: timeoutMs, + signal: abortSignal })) as i.EventMessage; return payload; } catch (err) { diff --git a/packages/teraslice-messaging/src/messenger/interfaces.ts b/packages/teraslice-messaging/src/messenger/interfaces.ts index af6c2ab393d..a08878fb6a9 100644 --- a/packages/teraslice-messaging/src/messenger/interfaces.ts +++ b/packages/teraslice-messaging/src/messenger/interfaces.ts @@ -57,6 +57,7 @@ export interface SendOptions { volatile?: boolean; response?: boolean; timeout?: number; + signal?: AbortSignal; } export interface ConnectedClient { From d6d87f318dbccb8c479f25420d701e9975cf8477 Mon Sep 17 00:00:00 2001 From: busma13 Date: Wed, 20 Nov 2024 13:28:39 -0700 Subject: [PATCH 2/4] bump: (patch) @terascope/teraslice-messaging@1.7.1 --- packages/teraslice-messaging/package.json | 2 +- packages/teraslice/package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/teraslice-messaging/package.json b/packages/teraslice-messaging/package.json index d72415d5d12..328dca0dfaf 100644 --- a/packages/teraslice-messaging/package.json +++ b/packages/teraslice-messaging/package.json @@ -1,7 +1,7 @@ { "name": "@terascope/teraslice-messaging", "displayName": "Teraslice Messaging", - "version": "1.7.0", + "version": "1.7.1", "description": "An internal teraslice messaging library using socket.io", "homepage": "https://github.com/terascope/teraslice/tree/master/packages/teraslice-messaging#readme", "bugs": { diff --git a/packages/teraslice/package.json b/packages/teraslice/package.json index 24f9b7ac3ee..059b249829b 100644 --- a/packages/teraslice/package.json +++ b/packages/teraslice/package.json @@ -41,7 +41,7 @@ "@kubernetes/client-node": "~0.22.0", "@terascope/elasticsearch-api": "~4.4.0", "@terascope/job-components": "~1.6.0", - "@terascope/teraslice-messaging": "~1.7.0", + "@terascope/teraslice-messaging": "~1.7.1", "@terascope/types": "~1.3.0", "@terascope/utils": "~1.4.0", "async-mutex": "~0.5.0", From 5e71e50630b43143c3baf9f7f796423050002aed Mon Sep 17 00:00:00 2001 From: busma13 Date: Thu, 21 Nov 2024 11:30:39 -0700 Subject: [PATCH 3/4] better error message, don't retry if server shutdown --- packages/teraslice-messaging/src/messenger/core.ts | 7 +++++-- packages/teraslice/src/lib/workers/worker/index.ts | 3 +++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/teraslice-messaging/src/messenger/core.ts b/packages/teraslice-messaging/src/messenger/core.ts index f660dc23a7d..1a6fee2acde 100644 --- a/packages/teraslice-messaging/src/messenger/core.ts +++ b/packages/teraslice-messaging/src/messenger/core.ts @@ -2,7 +2,7 @@ import ms from 'ms'; import { pEvent } from 'p-event'; import { EventEmitter } from 'node:events'; import { - toString, isInteger, debugLogger, Logger + toString, isInteger, debugLogger, Logger, TSError } from '@terascope/utils'; import * as i from './interfaces.js'; @@ -51,7 +51,10 @@ export class Core extends EventEmitter { // server shutdown if (signal?.aborted) { - throw new Error(`Messaging server shutdown before responding to message "${sent.eventName}"`); + const msg = sent.eventName === 'worker:slice:complete' + ? `Execution controller shutdown before receiving worker slice analytics. Event: "${sent.eventName}"` + : `Execution controller shutdown before receiving "${sent.eventName}" event`; + throw new TSError(msg, { retryable: false }); } // it is a timeout diff --git a/packages/teraslice/src/lib/workers/worker/index.ts b/packages/teraslice/src/lib/workers/worker/index.ts index 18d4ddb4bd2..7a0807144c1 100644 --- a/packages/teraslice/src/lib/workers/worker/index.ts +++ b/packages/teraslice/src/lib/workers/worker/index.ts @@ -349,6 +349,9 @@ export class Worker { } catch (err) { if (this.isShuttingDown) { throw err; + } else if (err.retryable === false) { + this.logger.warn(`${err}, will not retry.`); + return true; } else { this.logger.warn(err); } From 3c296b0e62de6f92449f461f375f129c63228725 Mon Sep 17 00:00:00 2001 From: busma13 Date: Thu, 21 Nov 2024 11:33:11 -0700 Subject: [PATCH 4/4] fix import spacing --- packages/teraslice-messaging/src/messenger/core.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/teraslice-messaging/src/messenger/core.ts b/packages/teraslice-messaging/src/messenger/core.ts index 1a6fee2acde..8e952efbd52 100644 --- a/packages/teraslice-messaging/src/messenger/core.ts +++ b/packages/teraslice-messaging/src/messenger/core.ts @@ -2,7 +2,8 @@ import ms from 'ms'; import { pEvent } from 'p-event'; import { EventEmitter } from 'node:events'; import { - toString, isInteger, debugLogger, Logger, TSError + toString, isInteger, debugLogger, + Logger, TSError } from '@terascope/utils'; import * as i from './interfaces.js';