Skip to content

Commit

Permalink
feat: implement dynamic behavior for halfOpenAfter
Browse files Browse the repository at this point in the history
Closes #68
  • Loading branch information
ghost91- authored and connor4312 committed Jul 22, 2024
1 parent 9dca8f8 commit 99e5887
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 8 deletions.
66 changes: 66 additions & 0 deletions src/CircuitBreakerPolicy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { abortedSignal } from './common/abort';
import { BrokenCircuitError, TaskCancelledError } from './errors/Errors';
import { IsolatedCircuitError } from './errors/IsolatedCircuitError';
import { circuitBreaker, handleAll, handleType } from './Policy';
import { IterableBackoff } from './backoff/IterableBackoff';

class MyException extends Error {}

Expand Down Expand Up @@ -79,6 +80,71 @@ describe('CircuitBreakerPolicy', () => {
expect(onReset).calledOnce;
});

it('uses the given backof factory to decide whether to enter the half open state', async () => {
p = circuitBreaker(handleType(MyException), {
halfOpenAfter: new IterableBackoff([1000, 2000]),
breaker: new ConsecutiveBreaker(2),
});
p.onReset(onReset);
p.onHalfOpen(onHalfOpen);

await openBreaker();

clock.tick(1000);

const failedAttempt = p.execute(stub().throws(new MyException()));
expect(p.state).to.equal(CircuitState.HalfOpen);
expect(onHalfOpen).calledOnce;
await expect(failedAttempt).to.be.rejectedWith(MyException);
expect(p.state).to.equal(CircuitState.Open);

clock.tick(1000);

await expect(p.execute(stub().throws(new MyException()))).to.be.rejectedWith(
BrokenCircuitError,
);

clock.tick(1000);

const result = p.execute(stub().resolves(42));
expect(p.state).to.equal(CircuitState.HalfOpen);
expect(onHalfOpen).calledTwice;
expect(await result).to.equal(42);
expect(p.state).to.equal(CircuitState.Closed);
expect(onReset).calledOnce;
});

it('resets the backoff when closing the circuit', async () => {
p = circuitBreaker(handleType(MyException), {
halfOpenAfter: new IterableBackoff([1000, 2000]),
breaker: new ConsecutiveBreaker(2),
});
p.onReset(onReset);
p.onHalfOpen(onHalfOpen);

await openBreaker();

clock.tick(1000);

const halfOpenTest1 = p.execute(stub().resolves(42));
expect(p.state).to.equal(CircuitState.HalfOpen);
expect(onHalfOpen).calledOnce;
expect(await halfOpenTest1).to.equal(42);
expect(p.state).to.equal(CircuitState.Closed);
expect(onReset).calledOnce;

await openBreaker();

clock.tick(1000);

const halfOpenTest2 = p.execute(stub().resolves(42));
expect(p.state).to.equal(CircuitState.HalfOpen);
expect(onHalfOpen).calledTwice;
expect(await halfOpenTest2).to.equal(42);
expect(p.state).to.equal(CircuitState.Closed);
expect(onReset).calledTwice;
});

it('dedupes half-open tests', async () => {
await openBreaker();
clock.tick(1000);
Expand Down
48 changes: 42 additions & 6 deletions src/CircuitBreakerPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { ConstantBackoff, IBackoff, IBackoffFactory } from './backoff/Backoff';
import { IBreaker } from './breaker/Breaker';
import { neverAbortedSignal } from './common/abort';
import { EventEmitter } from './common/Event';
Expand Down Expand Up @@ -30,9 +31,31 @@ export enum CircuitState {
Isolated,
}

/**
* Context passed into halfOpenAfter backoff delegate.
*/
export interface IHalfOpenAfterBackoffContext extends IDefaultPolicyContext {
/**
* The consecutive number of times the circuit has entered the
* {@link CircuitState.Open} state.
*/
attempt: number;
/**
* The result of the last method call that caused the circuit to enter the
* {@link CircuitState.Open} state. Either a thrown error, or a value that we
* determined should open the circuit.
*/
result: FailureReason<unknown>;
}

export interface ICircuitBreakerOptions {
breaker: IBreaker;
halfOpenAfter: number;
/**
* When to (potentially) enter the {@link CircuitState.HalfOpen} state from
* the {@link CircuitState.Open} state. Either a duration in milliseconds or a
* backoff factory.
*/
halfOpenAfter: number | IBackoffFactory<IHalfOpenAfterBackoffContext>;
}

type InnerState =
Expand All @@ -48,8 +71,11 @@ export class CircuitBreakerPolicy implements IPolicy {
private readonly resetEmitter = new EventEmitter<void>();
private readonly halfOpenEmitter = new EventEmitter<void>();
private readonly stateChangeEmitter = new EventEmitter<CircuitState>();
private readonly halfOpenAfterBackoffFactory: IBackoffFactory<IHalfOpenAfterBackoffContext>;
private innerLastFailure?: FailureReason<unknown>;
private innerState: InnerState = { value: CircuitState.Closed };
private openEnteredCount = 0;
private halfOpenAfterBackof: IBackoff<IHalfOpenAfterBackoffContext> | undefined;

/**
* Event emitted when the circuit breaker opens.
Expand Down Expand Up @@ -99,7 +125,12 @@ export class CircuitBreakerPolicy implements IPolicy {
constructor(
private readonly options: ICircuitBreakerOptions,
private readonly executor: ExecuteWrapper,
) {}
) {
this.halfOpenAfterBackoffFactory =
typeof options.halfOpenAfter === 'number'
? new ConstantBackoff(options.halfOpenAfter)
: options.halfOpenAfter;
}

/**
* Manually holds open the circuit breaker.
Expand Down Expand Up @@ -152,7 +183,7 @@ export class CircuitBreakerPolicy implements IPolicy {
} else {
this.innerLastFailure = result;
if (this.options.breaker.failure(state.value)) {
this.open(result);
this.open(result, signal);
}
}

Expand All @@ -167,7 +198,7 @@ export class CircuitBreakerPolicy implements IPolicy {
return this.execute(fn);

case CircuitState.Open:
if (Date.now() - state.openedAt < this.options.halfOpenAfter) {
if (Date.now() - state.openedAt < (this.halfOpenAfterBackof?.duration ?? 0)) {
throw new BrokenCircuitError();
}
const test = this.halfOpen(fn, signal);
Expand Down Expand Up @@ -197,7 +228,7 @@ export class CircuitBreakerPolicy implements IPolicy {
} else {
this.innerLastFailure = result;
this.options.breaker.failure(CircuitState.HalfOpen);
this.open(result);
this.open(result, signal);
}

return returnOrThrow(result);
Expand All @@ -209,21 +240,26 @@ export class CircuitBreakerPolicy implements IPolicy {
}
}

private open(reason: FailureReason<unknown>) {
private open(reason: FailureReason<unknown>, signal: AbortSignal) {
if (this.state === CircuitState.Isolated || this.state === CircuitState.Open) {
return;
}

this.innerState = { value: CircuitState.Open, openedAt: Date.now() };
this.breakEmitter.emit(reason);
this.stateChangeEmitter.emit(CircuitState.Open);
const context = { attempt: ++this.openEnteredCount, result: reason, signal };
this.halfOpenAfterBackof =
this.halfOpenAfterBackof?.next(context) ?? this.halfOpenAfterBackoffFactory.next(context);
}

private close() {
if (this.state === CircuitState.HalfOpen) {
this.innerState = { value: CircuitState.Closed };
this.resetEmitter.emit();
this.stateChangeEmitter.emit(CircuitState.Closed);
this.halfOpenAfterBackof = undefined;
this.openEnteredCount = 0;
}
}
}
10 changes: 8 additions & 2 deletions src/Policy.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ConstantBackoff, IBackoffFactory } from './backoff/Backoff';
import { IBreaker } from './breaker/Breaker';
import { BulkheadPolicy } from './BulkheadPolicy';
import { CircuitBreakerPolicy } from './CircuitBreakerPolicy';
import { CircuitBreakerPolicy, IHalfOpenAfterBackoffContext } from './CircuitBreakerPolicy';
import { Event } from './common/Event';
import { ExecuteWrapper } from './common/Executor';
import { FallbackPolicy } from './FallbackPolicy';
Expand Down Expand Up @@ -450,7 +450,13 @@ export function retry(
* @param breaker The circuit breaker to use. This package exports
* ConsecutiveBreaker and SamplingBreakers for you to use.
*/
export function circuitBreaker(policy: Policy, opts: { halfOpenAfter: number; breaker: IBreaker }) {
export function circuitBreaker(
policy: Policy,
opts: {
halfOpenAfter: number | IBackoffFactory<IHalfOpenAfterBackoffContext>;
breaker: IBreaker;
},
) {
return new CircuitBreakerPolicy(
opts,
new ExecuteWrapper(policy.options.errorFilter, policy.options.resultFilter),
Expand Down

0 comments on commit 99e5887

Please sign in to comment.