Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the neverAbortedSignal #94

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/BulkheadPolicy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,17 @@ describe('Bulkhead', () => {
expect(b.queueSlots).to.equal(2);
});

it('links parent cancellation token', async () => {
it('links parent abort signal', async () => {
const b = bulkhead(1, Infinity);
const todo: Array<PromiseLike<void>> = [];
for (let i = 0; i < 3; i++) {
const parent = new AbortController();
todo.push(
b.execute(async ({ signal }) => {
await delay(1);
expect(signal.aborted).to.be.false;
expect(signal?.aborted).to.be.false;
parent.abort();
expect(signal.aborted).to.be.true;
expect(signal?.aborted).to.be.true;
}, parent.signal),
);
}
Expand Down
7 changes: 3 additions & 4 deletions src/BulkheadPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { neverAbortedSignal } from './common/abort';
import { defer } from './common/defer';
import { EventEmitter } from './common/Event';
import { ExecuteWrapper } from './common/Executor';
Expand All @@ -7,7 +6,7 @@ import { TaskCancelledError } from './errors/Errors';
import { IDefaultPolicyContext, IPolicy } from './Policy';

interface IQueueItem<T> {
signal: AbortSignal;
signal?: AbortSignal;
fn(context: IDefaultPolicyContext): Promise<T> | T;
resolve(value: T): void;
reject(error: Error): void;
Expand Down Expand Up @@ -62,9 +61,9 @@ export class BulkheadPolicy implements IPolicy {
*/
public async execute<T>(
fn: (context: IDefaultPolicyContext) => PromiseLike<T> | T,
signal = neverAbortedSignal,
signal?: AbortSignal,
): Promise<T> {
if (signal.aborted) {
if (signal?.aborted) {
throw new TaskCancelledError();
}

Expand Down
6 changes: 3 additions & 3 deletions src/CircuitBreakerPolicy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,15 @@ describe('CircuitBreakerPolicy', () => {
expect(await p.execute(() => 42)).to.equal(42);
});

it('links parent cancellation token', async () => {
it('links parent abort signal', async () => {
const parent = new AbortController();
await circuitBreaker(handleAll, {
halfOpenAfter: 1000,
breaker: new ConsecutiveBreaker(3),
}).execute(({ signal }) => {
expect(signal.aborted).to.be.false;
expect(signal?.aborted).to.be.false;
parent.abort();
expect(signal.aborted).to.be.true;
expect(signal?.aborted).to.be.true;
}, parent.signal);
});

Expand Down
7 changes: 3 additions & 4 deletions src/CircuitBreakerPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { IBreaker } from './breaker/Breaker';
import { neverAbortedSignal } from './common/abort';
import { EventEmitter } from './common/Event';
import { ExecuteWrapper, returnOrThrow } from './common/Executor';
import { BrokenCircuitError, TaskCancelledError } from './errors/Errors';
Expand Down Expand Up @@ -141,7 +140,7 @@ export class CircuitBreakerPolicy implements IPolicy {
*/
public async execute<T>(
fn: (context: IDefaultPolicyContext) => PromiseLike<T> | T,
signal = neverAbortedSignal,
signal?: AbortSignal,
): Promise<T> {
const state = this.innerState;
switch (state.value) {
Expand All @@ -160,7 +159,7 @@ export class CircuitBreakerPolicy implements IPolicy {

case CircuitState.HalfOpen:
await state.test.catch(() => undefined);
if (this.state === CircuitState.Closed && signal.aborted) {
if (this.state === CircuitState.Closed && signal?.aborted) {
throw new TaskCancelledError();
}

Expand All @@ -185,7 +184,7 @@ export class CircuitBreakerPolicy implements IPolicy {

private async halfOpen<T>(
fn: (context: IDefaultPolicyContext) => PromiseLike<T> | T,
signal: AbortSignal,
signal?: AbortSignal,
): Promise<T> {
this.halfOpenEmitter.emit();

Expand Down
6 changes: 3 additions & 3 deletions src/FallbackPolicy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ describe('FallbackPolicy', () => {
});
});

it('links parent cancellation token', async () => {
it('links parent abort signal', async () => {
const parent = new AbortController();
await fallback(handleAll, 'error').execute(({ signal }) => {
expect(signal.aborted).to.be.false;
expect(signal?.aborted).to.be.false;
parent.abort();
expect(signal.aborted).to.be.true;
expect(signal?.aborted).to.be.true;
}, parent.signal);
});
});
3 changes: 1 addition & 2 deletions src/FallbackPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { neverAbortedSignal } from './common/abort';
import { ExecuteWrapper } from './common/Executor';
import { IDefaultPolicyContext, IPolicy } from './Policy';

Expand All @@ -24,7 +23,7 @@ export class FallbackPolicy<AltReturn> implements IPolicy<IDefaultPolicyContext,
*/
public async execute<T>(
fn: (context: IDefaultPolicyContext) => PromiseLike<T> | T,
signal = neverAbortedSignal,
signal?: AbortSignal,
): Promise<T | AltReturn> {
const result = await this.executor.invoke(fn, { signal });
if ('success' in result) {
Expand Down
3 changes: 1 addition & 2 deletions src/NoopPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { neverAbortedSignal } from './common/abort';
import { ExecuteWrapper, returnOrThrow } from './common/Executor';
import { IDefaultPolicyContext, IPolicy } from './Policy';

Expand All @@ -13,7 +12,7 @@ export class NoopPolicy implements IPolicy {

public async execute<T>(
fn: (context: IDefaultPolicyContext) => PromiseLike<T> | T,
signal: AbortSignal = neverAbortedSignal,
signal?: AbortSignal,
): Promise<T> {
return returnOrThrow(await this.executor.invoke(fn, { signal }));
}
Expand Down
6 changes: 3 additions & 3 deletions src/Policy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ describe('Policy', () => {
});
});

it('uses cancellation token in use', async () => {
it('uses abort signal in use', async () => {
class Calculator {
@usePolicy(retry(handleAll, { maxAttempts: 5 }))
public double(n: number, context: IRetryContext) {
expect(n).to.equal(2);
expect(context.signal.aborted).to.be.false;
expect(context.signal?.aborted).to.be.false;
cts.abort();
expect(context.signal.aborted).to.be.true;
expect(context.signal?.aborted).to.be.true;
return n * 2;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Policy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export interface IDefaultPolicyContext {
* Abort signal for the operation. This is propagated through multiple
* retry policies.
*/
signal: AbortSignal;
signal?: AbortSignal;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/RetryPolicy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ describe('RetryPolicy', () => {
await expect(
retry(handleAll, { maxAttempts: 3 }).execute(({ signal }) => {
calls++;
expect(signal.aborted).to.be.false;
expect(signal?.aborted).to.be.false;
parent.abort();
expect(signal.aborted).to.be.true;
expect(signal?.aborted).to.be.true;
throw err;
}, parent.signal),
).to.eventually.be.rejectedWith(err);
Expand Down
5 changes: 2 additions & 3 deletions src/RetryPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { IBackoff, IBackoffFactory } from './backoff/Backoff';
import { ConstantBackoff } from './backoff/ConstantBackoff';
import { neverAbortedSignal } from './common/abort';
import { EventEmitter } from './common/Event';
import { ExecuteWrapper } from './common/Executor';
import { FailureReason, IDefaultPolicyContext, IPolicy } from './Policy';
Expand Down Expand Up @@ -94,7 +93,7 @@ export class RetryPolicy implements IPolicy<IRetryContext> {
*/
public async execute<T>(
fn: (context: IRetryContext) => PromiseLike<T> | T,
signal = neverAbortedSignal,
signal?: AbortSignal,
): Promise<T> {
const factory: IBackoffFactory<IRetryBackoffContext<unknown>> =
this.options.backoff || new ConstantBackoff(0);
Expand All @@ -105,7 +104,7 @@ export class RetryPolicy implements IPolicy<IRetryContext> {
return result.success;
}

if (!signal.aborted && retries < this.options.maxAttempts) {
if (!signal?.aborted && retries < this.options.maxAttempts) {
const context = { attempt: retries + 1, signal, result };
backoff = backoff ? backoff.next(context) : factory.next(context);
const delayDuration = backoff.duration;
Expand Down
2 changes: 1 addition & 1 deletion src/TimeoutPolicy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ describe('TimeoutPolicy', () => {
expect(output).to.be.empty;
});

it('links parent cancellation token', async () => {
it('links parent abort signal', async () => {
const parent = new AbortController();
await timeout(1000, TimeoutStrategy.Cooperative).execute((_, signal) => {
expect(signal.aborted).to.be.false;
Expand Down
4 changes: 2 additions & 2 deletions src/TimeoutPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { IPolicy } from './Policy';

export enum TimeoutStrategy {
/**
* Cooperative timeouts will simply revoke the inner cancellation token,
* Cooperative timeouts will simply abort the inner abort signal,
* assuming the caller handles cancellation and throws or returns appropriately.
*/
Cooperative = 'optimistic',
Expand Down Expand Up @@ -72,7 +72,7 @@ export class TimeoutPolicy implements IPolicy<ICancellationContext> {

/**
* Executes the given function.
* @param fn Function to execute. Takes in a nested cancellation token.
* @param fn Function to execute. Takes in a nested abort signal.
* @throws a {@link TaskCancelledError} if a timeout occurs
*/
public async execute<T>(
Expand Down
9 changes: 3 additions & 6 deletions src/common/abort.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { onAbort } from './Event';

export const neverAbortedSignal = new AbortController().signal;

const cancelledSrc = new AbortController();
cancelledSrc.abort();
export const abortedSignal = cancelledSrc.signal;
Expand All @@ -18,12 +16,11 @@ export const deriveAbortController = (signal?: AbortSignal) => {

if (signal.aborted) {
ctrl.abort();
return ctrl;
}

if (signal !== neverAbortedSignal) {
const ref = new WeakRef(ctrl);
onAbort(signal)(() => ref.deref()?.abort());
}
const ref = new WeakRef(ctrl);
onAbort(signal)(() => ref.deref()?.abort());

return ctrl;
};