Skip to content

Commit

Permalink
fix: restore breaker state when circuit state is restored
Browse files Browse the repository at this point in the history
  • Loading branch information
connor4312 committed Jul 22, 2024
1 parent aa54e83 commit 6015acd
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 16 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 3.2.1

- **fix:** restore breaker state when circuit state is restored

## 3.2.0

- **feat:** allow hydration of circuit breaker `initialState` ([#89](https://github.com/connor4312/cockatiel/issues/89))
Expand Down
20 changes: 20 additions & 0 deletions src/CircuitBreakerPolicy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,26 @@ describe('CircuitBreakerPolicy', () => {
await p2.execute(stub().resolves(42));
});

it('restores breaker state', async () => {
p = circuitBreaker(handleType(MyException), {
halfOpenAfter: new MyBreaker(0),
breaker: new ConsecutiveBreaker(2),
});

const s = stub().throws(new MyException());
await expect(p.execute(s)).to.be.rejectedWith(MyException);

const p2 = circuitBreaker(handleType(MyException), {
halfOpenAfter: new MyBreaker(0),
breaker: new ConsecutiveBreaker(2),
initialState: p.toJSON(),
});

expect(p2.state).to.equal(CircuitState.Closed);
await expect(p2.execute(s)).to.be.rejectedWith(MyException);
expect(p2.state).to.equal(CircuitState.Open);
});

it('restores open state', async () => {
p = circuitBreaker(handleType(MyException), {
halfOpenAfter: new MyBreaker(0),
Expand Down
22 changes: 16 additions & 6 deletions src/CircuitBreakerPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ type InnerState =
backoff: IBackoff<IHalfOpenAfterBackoffContext>;
};

interface ISerializedState {
ownState: Partial<InnerState>;
breakerState: unknown;
}

export class CircuitBreakerPolicy implements IPolicy {
declare readonly _altReturn: never;

Expand Down Expand Up @@ -146,7 +151,9 @@ export class CircuitBreakerPolicy implements IPolicy {
: options.halfOpenAfter;

if (options.initialState) {
this.innerState = options.initialState as InnerState;
const initialState = options.initialState as ISerializedState;
this.innerState = initialState.ownState as InnerState;
this.options.breaker.state = initialState.breakerState;

if (
this.innerState.value === CircuitState.Open ||
Expand Down Expand Up @@ -265,21 +272,24 @@ export class CircuitBreakerPolicy implements IPolicy {
*/
public toJSON(): unknown {
const state = this.innerState;
let ownState: Partial<InnerState>;
if (state.value === CircuitState.HalfOpen) {
return {
ownState = {
value: CircuitState.Open,
openedAt: 0,
attemptNo: state.attemptNo,
} satisfies Partial<InnerState>;
};
} else if (state.value === CircuitState.Open) {
return {
ownState = {
value: CircuitState.Open,
openedAt: state.openedAt,
attemptNo: state.attemptNo,
} satisfies Partial<InnerState>;
};
} else {
return state;
ownState = state;
}

return { ownState, breakerState: this.options.breaker.state } satisfies ISerializedState;
}

private async halfOpen<T>(
Expand Down
9 changes: 8 additions & 1 deletion src/breaker/Breaker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import { CircuitState } from '../CircuitBreakerPolicy';
* The breaker determines when the circuit breaker should open.
*/
export interface IBreaker {
/**
* Gets or sets the internal state of the breaker. Used for serialization
* with {@link CircuitBreaker.toJSON}.
*/
state: unknown;

/**
* Called when a call succeeds.
*/
Expand All @@ -15,6 +21,7 @@ export interface IBreaker {
failure(state: CircuitState): boolean;
}

export * from './SamplingBreaker';
export * from './ConsecutiveBreaker';
export * from './CountBreaker';
export * from './SamplingBreaker';

16 changes: 16 additions & 0 deletions src/breaker/ConsecutiveBreaker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,20 @@ describe('ConsecutiveBreaker', () => {
expect(c.failure()).to.be.false;
expect(c.failure()).to.be.true;
});

it('serializes and deserializes', () => {
const c = new ConsecutiveBreaker(3);
expect(c.failure()).to.be.false;
expect(c.failure()).to.be.false;
expect(c.failure()).to.be.true;

const c2 = new ConsecutiveBreaker(3);
c2.state = c.state;
expect(c.failure()).to.be.true;

c.success();
expect(c.failure()).to.be.false;
expect(c.failure()).to.be.false;
expect(c.failure()).to.be.true;
});
});
9 changes: 6 additions & 3 deletions src/breaker/ConsecutiveBreaker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { IBreaker } from './Breaker';

export class ConsecutiveBreaker implements IBreaker {
private count = 0;
/**
* @inheritdoc
*/
public state = 0;

/**
* ConsecutiveBreaker breaks if more than `threshold` exceptions are received
Expand All @@ -13,13 +16,13 @@ export class ConsecutiveBreaker implements IBreaker {
* @inheritdoc
*/
public success() {
this.count = 0;
this.state = 0;
}

/**
* @inheritdoc
*/
public failure() {
return ++this.count >= this.threshold;
return ++this.state >= this.threshold;
}
}
19 changes: 19 additions & 0 deletions src/breaker/CountBreaker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,25 @@ describe('CountBreaker', () => {
expect(state.currentSample).to.equal(4);
expect(state.samples).to.deep.equal([true, false, true, true, true]);
});


it('serializes and deserializes', () => {
let b = new CountBreaker({ threshold: 0.5, size: 5 });
for (let i = 0; i < 9; i++) {
if (i % 3 === 0) {
b.failure(CircuitState.Closed);
} else {
b.success(CircuitState.Closed);
}
const state = b.state;
b = new CountBreaker({ threshold: 0.5, size: 5 });
b.state = state;
}

const state = getState(b);
expect(state.currentSample).to.equal(4);
expect(state.samples).to.deep.equal([true, false, true, true, true]);
});
});

describe('functionality', () => {
Expand Down
38 changes: 32 additions & 6 deletions src/breaker/CountBreaker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,47 @@ export interface ICountBreakerOptions {
minimumNumberOfCalls?: number;
}


interface ICountBreakerState {
samples: (boolean | null)[];
currentSample: number;
failures: number;
successes: number;
}


export class CountBreaker implements IBreaker {
private readonly threshold: number;
private readonly minimumNumberOfCalls: number;

/**
* The samples in the sliding window. `true` means "success", `false` means
* "failure" and `undefined` means that there is no sample yet.
* "failure" and `null` means that there is no sample yet.
*/
private readonly samples: (boolean | undefined)[];
private samples: (boolean | null)[];
private successes = 0;
private failures = 0;
private currentSample = 0;

/**
* @inheritdoc
*/
public get state(): unknown {
return {
samples: this.samples,
currentSample: this.currentSample,
failures: this.failures,
successes: this.successes,
} satisfies ICountBreakerState;
}

/**
* @inheritdoc
*/
public set state(value: unknown) {
Object.assign(this, value);
}

/**
* CountBreaker breaks if more than `threshold` percentage of the last `size`
* calls failed, so long as at least `minimumNumberOfCalls` calls have been
Expand All @@ -59,7 +87,7 @@ export class CountBreaker implements IBreaker {

this.threshold = threshold;
this.minimumNumberOfCalls = minimumNumberOfCalls;
this.samples = Array.from<undefined>({ length: size }).fill(undefined);
this.samples = Array.from({ length: size }, () => null);
}

/**
Expand Down Expand Up @@ -97,9 +125,7 @@ export class CountBreaker implements IBreaker {
}

private reset() {
for (let i = 0; i < this.samples.length; i++) {
this.samples[i] = undefined;
}
this.samples.fill(null);
this.successes = 0;
this.failures = 0;
}
Expand Down
31 changes: 31 additions & 0 deletions src/breaker/SamplingBreaker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,39 @@ describe('SamplingBreaker', () => {
{ failures: 4, successes: 8, startedAt: 4000 },
]);
});

it('serializes and deserializes', () => {
for (let i = 0; i < 7; i++) {
for (let k = 0; k < i; k++) {
b.failure(CircuitState.Closed);
b.success(CircuitState.Closed);
b.success(CircuitState.Closed);
}

clock.tick(1000);
const b2 = new SamplingBreaker({ threshold: 0.5, duration: 5_000, minimumRps: 3 });
b2.state = b.state;
b = b2;
}

expect(getState(b)).to.containSubset({
currentFailures: 20,
currentSuccesses: 40,
currentWindow: 1,
});
expect(getState(b).windows).to.deep.equal([
{ failures: 5, successes: 10, startedAt: 5000 },
{ failures: 6, successes: 12, startedAt: 6000 },
{ failures: 2, successes: 4, startedAt: 2000 },
{ failures: 3, successes: 6, startedAt: 3000 },
{ failures: 4, successes: 8, startedAt: 4000 },
]);
});
});




describe('functionality', () => {
let b: SamplingBreaker;
let clock: SinonFakeTimers;
Expand Down
26 changes: 26 additions & 0 deletions src/breaker/SamplingBreaker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ export interface ISamplingBreakerOptions {
minimumRps?: number;
}

interface ISamplingBreakerState {
windows: IWindow[];
currentWindow: number;
currentFailures: number;
currentSuccesses: number;
}

export class SamplingBreaker implements IBreaker {
private readonly threshold: number;
private readonly minimumRpms: number;
Expand All @@ -37,6 +44,25 @@ export class SamplingBreaker implements IBreaker {
private currentFailures = 0;
private currentSuccesses = 0;

/**
* @inheritdoc
*/
public get state(): unknown {
return {
windows: this.windows,
currentWindow: this.currentWindow,
currentFailures: this.currentFailures,
currentSuccesses: this.currentSuccesses,
} satisfies ISamplingBreakerState;
}

/**
* @inheritdoc
*/
public set state(value: unknown) {
Object.assign(this, value);
}

/**
* SamplingBreaker breaks if more than `threshold` percentage of calls over the
* last `samplingDuration`, so long as there's at least `minimumRps` (to avoid
Expand Down

0 comments on commit 6015acd

Please sign in to comment.