Skip to content

Commit

Permalink
Add abort controller for use with sendSliceComplete
Browse files Browse the repository at this point in the history
  • Loading branch information
busma13 committed Nov 20, 2024
1 parent c99f3e7 commit 260f94e
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 4 deletions.
10 changes: 10 additions & 0 deletions packages/teraslice-messaging/src/execution-controller/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const ONE_MIN = 60 * 1000;

export class Client extends core.Client {
public workerId: string;
private abortController: AbortController;

constructor(opts: i.ClientOptions) {
const {
Expand Down Expand Up @@ -47,6 +48,7 @@ export class Client extends core.Client {

this.workerId = workerId;
this.available = false;
this.abortController = new AbortController();
}

async start() {
Expand Down Expand Up @@ -75,6 +77,13 @@ export class Client extends core.Client {
payload: msg.payload,
});
});

this.on('server:shutdown', () => {
// this will send an abort signal to the Core.onceWithTimeout pEvent
// allowing the worker to shutdown without receiving a response to
// a sendSliceComplete() message
this.abortController.abort();
});
}

onExecutionFinished(fn: () => void) {
Expand All @@ -85,6 +94,7 @@ export class Client extends core.Client {
return this.send('worker:slice:complete', withoutNil(payload), {
response: true,
volatile: false,
signal: this.abortController.signal
});
}

Expand Down
2 changes: 1 addition & 1 deletion packages/teraslice-messaging/src/messenger/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ export class Client extends Core {
respondBy,
};

const responseMsg = this.handleSendResponse(message);
const responseMsg = this.handleSendResponse(message, options.signal);
this.socket.emit(eventName, message);
return responseMsg;
}
Expand Down
19 changes: 16 additions & 3 deletions packages/teraslice-messaging/src/messenger/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,19 @@ export class Core extends EventEmitter {
this.removeAllListeners();
}

protected async handleSendResponse(sent: i.Message): Promise<i.Message | null> {
protected async handleSendResponse(
sent: i.Message,
signal?: AbortSignal
): Promise<i.Message | null> {
if (!sent.response) return null;

const remaining = sent.respondBy - Date.now();
const response = await this.onceWithTimeout(sent.id, remaining);
const response = await this.onceWithTimeout(sent.id, remaining, signal);

// server shutdown
if (signal?.aborted) {
throw new Error(`Messaging server shutdown before responding to message "${sent.eventName}"`);
}

// it is a timeout
if (response == null) {
Expand Down Expand Up @@ -136,12 +144,17 @@ export class Core extends EventEmitter {
}
}

async onceWithTimeout(eventName: string, timeout?: number): Promise<any> {
async onceWithTimeout(
eventName: string,
timeout?: number,
abortSignal?: AbortSignal
): Promise<any> {
const timeoutMs: number = this.getTimeout(timeout);
try {
const { payload } = (await pEvent(this, eventName, {
rejectionEvents: [],
timeout: timeoutMs,
signal: abortSignal
})) as i.EventMessage;
return payload;
} catch (err) {
Expand Down
1 change: 1 addition & 0 deletions packages/teraslice-messaging/src/messenger/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface SendOptions {
volatile?: boolean;
response?: boolean;
timeout?: number;
signal?: AbortSignal;
}

export interface ConnectedClient {
Expand Down

0 comments on commit 260f94e

Please sign in to comment.