Skip to content

Commit

Permalink
refactor: store backoff in circuit breaker state
Browse files Browse the repository at this point in the history
  • Loading branch information
connor4312 committed Jul 22, 2024
1 parent 99e5887 commit 19b86a8
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 41 deletions.
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"sinon": "^14.0.0",
"sinon-chai": "^3.7.0",
"source-map-support": "^0.5.21",
"typescript": "^4.7.4"
"typescript": "^5.5.3"
},
"prettier": {
"printWidth": 100,
Expand Down
46 changes: 29 additions & 17 deletions src/CircuitBreakerPolicy.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import { expect } from 'chai';
import { SinonFakeTimers, SinonStub, stub, useFakeTimers } from 'sinon';
import { promisify } from 'util';
import { IBackoffFactory } from './backoff/Backoff';
import { IterableBackoff } from './backoff/IterableBackoff';
import { ConsecutiveBreaker } from './breaker/Breaker';
import { CircuitBreakerPolicy, CircuitState } from './CircuitBreakerPolicy';
import {
CircuitBreakerPolicy,
CircuitState,
IHalfOpenAfterBackoffContext,
} from './CircuitBreakerPolicy';
import { abortedSignal } from './common/abort';
import { BrokenCircuitError, TaskCancelledError } from './errors/Errors';
import { IsolatedCircuitError } from './errors/IsolatedCircuitError';
import { circuitBreaker, handleAll, handleType } from './Policy';
import { IterableBackoff } from './backoff/IterableBackoff';

class MyException extends Error {}

Expand Down Expand Up @@ -115,34 +120,41 @@ describe('CircuitBreakerPolicy', () => {
});

it('resets the backoff when closing the circuit', async () => {
let args: { duration: number; attempt: number }[] = [];
p = circuitBreaker(handleType(MyException), {
halfOpenAfter: new IterableBackoff([1000, 2000]),
halfOpenAfter: new (class MyBreaker implements IBackoffFactory<IHalfOpenAfterBackoffContext> {
constructor(public readonly duration: number) {}

next(context: IHalfOpenAfterBackoffContext) {
args.push({ duration: this.duration + 1, attempt: context.attempt });
expect('error' in context.result).to.be.true;
return new MyBreaker(this.duration + 1);
}
})(0),
breaker: new ConsecutiveBreaker(2),
});
p.onReset(onReset);
p.onHalfOpen(onHalfOpen);

await openBreaker();

clock.tick(1000);
expect(args).to.deep.equal([{ duration: 1, attempt: 1 }]);
clock.tick(args.pop()!.duration);

const halfOpenTest1 = p.execute(stub().resolves(42));
expect(p.state).to.equal(CircuitState.HalfOpen);
expect(onHalfOpen).calledOnce;
expect(await halfOpenTest1).to.equal(42);
expect(p.state).to.equal(CircuitState.Closed);
expect(onReset).calledOnce;
await expect(p.execute(stub().throws(new MyException()))).to.be.rejectedWith(MyException);
expect(args).to.deep.equal([{ duration: 2, attempt: 2 }]);
clock.tick(args.pop()!.duration);

await p.execute(stub().resolves(42));
expect(args).to.be.empty;

await openBreaker();

clock.tick(1000);
expect(args).to.deep.equal([{ duration: 1, attempt: 1 }]);
clock.tick(args.pop()!.duration);

const halfOpenTest2 = p.execute(stub().resolves(42));
expect(p.state).to.equal(CircuitState.HalfOpen);
expect(onHalfOpen).calledTwice;
expect(await halfOpenTest2).to.equal(42);
expect(p.state).to.equal(CircuitState.Closed);
expect(onReset).calledTwice;
await p.execute(stub().resolves(42));
expect(args).to.be.empty;
});

it('dedupes half-open tests', async () => {
Expand Down
40 changes: 28 additions & 12 deletions src/CircuitBreakerPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,18 @@ export interface ICircuitBreakerOptions {
type InnerState =
| { value: CircuitState.Closed }
| { value: CircuitState.Isolated; counters: number }
| { value: CircuitState.Open; openedAt: number }
| { value: CircuitState.HalfOpen; test: Promise<any> };
| {
value: CircuitState.Open;
openedAt: number;
attemptNo: number;
backoff: IBackoff<IHalfOpenAfterBackoffContext>;
}
| {
value: CircuitState.HalfOpen;
test: Promise<any>;
attemptNo: number;
backoff: IBackoff<IHalfOpenAfterBackoffContext>;
};

export class CircuitBreakerPolicy implements IPolicy {
declare readonly _altReturn: never;
Expand All @@ -74,8 +84,6 @@ export class CircuitBreakerPolicy implements IPolicy {
private readonly halfOpenAfterBackoffFactory: IBackoffFactory<IHalfOpenAfterBackoffContext>;
private innerLastFailure?: FailureReason<unknown>;
private innerState: InnerState = { value: CircuitState.Closed };
private openEnteredCount = 0;
private halfOpenAfterBackof: IBackoff<IHalfOpenAfterBackoffContext> | undefined;

/**
* Event emitted when the circuit breaker opens.
Expand Down Expand Up @@ -198,11 +206,16 @@ export class CircuitBreakerPolicy implements IPolicy {
return this.execute(fn);

case CircuitState.Open:
if (Date.now() - state.openedAt < (this.halfOpenAfterBackof?.duration ?? 0)) {
if (Date.now() - state.openedAt < state.backoff.duration) {
throw new BrokenCircuitError();
}
const test = this.halfOpen(fn, signal);
this.innerState = { value: CircuitState.HalfOpen, test };
this.innerState = {
value: CircuitState.HalfOpen,
test,
backoff: state.backoff,
attemptNo: state.attemptNo + 1,
};
this.stateChangeEmitter.emit(CircuitState.HalfOpen);
return test;

Expand Down Expand Up @@ -245,21 +258,24 @@ export class CircuitBreakerPolicy implements IPolicy {
return;
}

this.innerState = { value: CircuitState.Open, openedAt: Date.now() };
const attemptNo =
this.innerState.value === CircuitState.HalfOpen ? this.innerState.attemptNo : 1;
const context = { attempt: attemptNo, result: reason, signal };
const backoff =
this.innerState.value === CircuitState.HalfOpen
? this.innerState.backoff.next(context)
: this.halfOpenAfterBackoffFactory.next(context);

this.innerState = { value: CircuitState.Open, openedAt: Date.now(), backoff, attemptNo };
this.breakEmitter.emit(reason);
this.stateChangeEmitter.emit(CircuitState.Open);
const context = { attempt: ++this.openEnteredCount, result: reason, signal };
this.halfOpenAfterBackof =
this.halfOpenAfterBackof?.next(context) ?? this.halfOpenAfterBackoffFactory.next(context);
}

private close() {
if (this.state === CircuitState.HalfOpen) {
this.innerState = { value: CircuitState.Closed };
this.resetEmitter.emit();
this.stateChangeEmitter.emit(CircuitState.Closed);
this.halfOpenAfterBackof = undefined;
this.openEnteredCount = 0;
}
}
}
3 changes: 1 addition & 2 deletions src/backoff/Backoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
*/
export interface IBackoffFactory<T> {
/**
* Returns the first backoff duration. Can return "undefined" to signal
* that we should not back off.
* Returns the first backoff duration.
*/
next(context: T): IBackoff<T>;
}
Expand Down
2 changes: 1 addition & 1 deletion src/backoff/IterableBackoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export class IterableBackoff implements IBackoffFactory<unknown> {
/**
* @inheritdoc
*/
public next() {
public next(_context: unknown) {
return instance(this.durations, 0);
}
}
Expand Down

0 comments on commit 19b86a8

Please sign in to comment.