Skip to content

Commit

Permalink
refactor: hide more internal API; move half-open delay to option
Browse files Browse the repository at this point in the history
  • Loading branch information
costela committed Oct 10, 2023
1 parent 5bcc25e commit 20f8c01
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 207 deletions.
25 changes: 21 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,32 @@ h := hoglet.NewCircuit(
}
return Foo{}, fmt.Errorf("bar is not 42")
},
hoglet.NewSlidingWindowBreaker(10, 0.1, 5*time.Second),
hoglet.NewSlidingWindowBreaker(10, 0.1),
hoglet.WithFailureCondition(hoglet.IgnoreContextCancelation),
)
f, _ := h.Do(context.Background(), 42)
f, _ := h.Call(context.Background(), 42)
fmt.Println(f.Bar) // 42

_, err = h.Do(context.Background(), 0)
_, err = h.Call(context.Background(), 0)
fmt.Println(err) // bar is not 42

_, err = h.Do(context.Background(), 42)
_, err = h.Call(context.Background(), 42)
fmt.Println(err) // hoglet: breaker is open
```

## Operation

Each call to the wrapped function (via `Circuit.Call`) is tracked and its result "observed". Breakers then react to
these observations according to their own logic, optionally opening the circuit.

An open circuit does not allow any calls to go through, and will return an error immediately.

If the wrapped function blocks, `Circuit.Call` will block as well, but any context cancelations or expirations will
count towards the failure rate, allowing the circuit to respond timely to failures, while still having well-defined and
non-racy behavior around the failed function.


## Design

Hoglet prefers throughput to correctness (e.g. by avoiding locks), which means it cannot guarantee an exact number of
calls will go through.
162 changes: 59 additions & 103 deletions breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,69 +7,20 @@ import (
"time"
)

// state is unexported because its usefulness is unclear outside of tests.
type state int

const (
stateClosed state = iota
stateHalfOpen
stateOpen
)

func (s state) String() string {
switch s {
case stateClosed:
return "closed"
case stateHalfOpen:
return "half-open"
case stateOpen:
return "open"
default:
return "unknown"
}
// untypedCircuit is used to avoid type annotations when implementing a breaker.
type untypedCircuit interface {
stateForCall() State
setOpenedAt(int64)
}

type Observable interface {
// Observe is called after the wrapped function returns. If [Circuit.Do] returns a non-nil [Observable], it will be
type observer interface {
// observe is called after the wrapped function returns. If [Circuit.Do] returns a non-nil [Observable], it will be
// called exactly once.
Observe(failure bool)
}

// callable encapsulates the common open/half-open/closed logic for all breakers. Each breaker implements decision of
// *when* to open based on its own state.
type callable struct {
halfOpenDelay time.Duration

// State

openedAt atomic.Int64 // unix microseconds
}

// state checks if the callable can be called. If it returns true, the caller MUST then proceed to Call() to ensure
// the half-open logic is respected.
func (c *callable) state() state {
oa := c.openedAt.Load()

if oa == 0 {
// closed
return stateClosed
}

if time.Since(time.UnixMicro(oa)) < c.halfOpenDelay {
// open
return stateOpen
}

// We reset openedAt to block further calls to pass through when half-open. A success will cause the breaker to
// close.
// CompareAndSwap is needed to avoid clobbering another goroutine's openedAt value.
c.openedAt.CompareAndSwap(oa, time.Now().UnixMicro())
// half-open
return stateHalfOpen
observe(failure bool)
}

// newObservableCall creates a new [Observable] that ensure it can only be observed a single time.
func newObservableCall(f func(bool)) Observable {
func newObservableCall(f func(bool)) observer {
o := sync.Once{}
return observableCall(func(failure bool) {
o.Do(func() {
Expand All @@ -82,61 +33,62 @@ func newObservableCall(f func(bool)) Observable {
// It should be instantiated via [newObservableCall] to ensure the observer is only called once.
type observableCall func(bool)

func (o observableCall) Observe(failure bool) {
func (o observableCall) observe(failure bool) {
o(failure)
}

// EWMABreaker is a [Breaker] that uses an exponentially weighted moving failure rate between 0 and 1. Each failure
// counting as 1, and each success as 0.
// It assumes the wrapped function is called with an approximately constant interval and will skew results otherwise.
// A zero EWMABreaker is ready to use, but will never open.
// EWMABreaker is a [Breaker] that uses an exponentially weighted moving failure rate. See [NewEWMABreaker] for details.
//
// Compared to the [SlidingWindowBreaker], this breaker responds faster to failure bursts, but is more lenient with
// constant failure rates.
// A zero EWMABreaker is ready to use, but will never open.
type EWMABreaker struct {
decay float64
threshold float64
circuit untypedCircuit

// State
failureRate atomic.Value
callable
}

// NewEWMABreaker creates a new [EWMABreaker] with the given sample count and threshold.
// NewEWMABreaker creates a new [EWMABreaker] with the given sample count and threshold. It uses an Exponentially
// Weighted Moving Average to calculate the current failure rate.
//
// ⚠️ This is an observation-based breaker, which means it requires new calls to be able to update the failure rate, and
// therefore REQUIRES the circuit to set a half-open threshold via [WithHalfOpenDelay]. Otherwise an open circuit will
// never observe any successes and thus never close.
//
// Compared to the [SlidingWindowBreaker], this breaker responds faster to failure bursts, but is more lenient with
// constant failure rates.
//
// The sample count is used to determine how fast previous observations "decay". A value of 1 causes a single sample to
// be considered. A higher value slows down convergence. As a rule of thumb, breakers with higher throughput should use
// higher sample counts to avoid opening up on small hiccups.
//
// The threshold is the failure rate above which the breaker should open.
//
// The halfOpenDelay is the duration the breaker will stay open before switching to the half-open state, where a
// limited amount of calls are allowed and - if successful - may re-close the breaker.
// Setting it to 0 will cause the breaker to effectively never open.
func NewEWMABreaker(sampleCount int, threshold float64, halfOpenDelay time.Duration) *EWMABreaker {
// The failureThreshold is the failure rate above which the breaker should open (0.0-1.0).
func NewEWMABreaker(sampleCount int, failureThreshold float64) *EWMABreaker {
e := &EWMABreaker{
// https://en.wikipedia.org/wiki/Exponential_smoothing
decay: 2 / (float64(sampleCount)/2 + 1),
threshold: threshold,
callable: callable{
halfOpenDelay: halfOpenDelay,
},
threshold: failureThreshold,
}

e.failureRate.Store(float64(math.SmallestNonzeroFloat64)) // start closed; also work around "initial value" problem

return e
}

func (e *EWMABreaker) Call() Observable {
state := e.callable.state()
func (e *EWMABreaker) connect(c untypedCircuit) {
e.circuit = c
}

func (e *EWMABreaker) observerForCall() observer {
state := e.circuit.stateForCall()

if state == stateOpen {
if state == StateOpen {
return nil
}

return newObservableCall(func(failure bool) {
e.observe(state == stateHalfOpen, failure)
e.observe(state == StateHalfOpen, failure)
})
}

Expand All @@ -146,7 +98,7 @@ func (e *EWMABreaker) observe(halfOpen, failure bool) {
}

if !failure && halfOpen {
e.openedAt.Store(0)
e.circuit.setOpenedAt(0)
e.failureRate.Store(e.threshold)
return
}
Expand All @@ -166,16 +118,17 @@ func (e *EWMABreaker) observe(halfOpen, failure bool) {
}

if failureRate > e.threshold {
e.openedAt.CompareAndSwap(0, time.Now().UnixMicro())
e.circuit.setOpenedAt(time.Now().UnixMicro())
} else {
e.openedAt.Store(0)
e.circuit.setOpenedAt(0)
}
}

// SlidingWindowBreaker is a [Breaker] that uses a sliding window to determine the error rate.
type SlidingWindowBreaker struct {
windowSize time.Duration
threshold float64
circuit untypedCircuit

// State

Expand All @@ -184,39 +137,42 @@ type SlidingWindowBreaker struct {
currentFailureCount atomic.Int64
lastSuccessCount atomic.Int64
lastFailureCount atomic.Int64

callable
}

// NewSlidingWindowBreaker creates a new [SlidingWindowBreaker] with the given window size, failure rate threshold and
// half-open delay.
// NewSlidingWindowBreaker creates a new [SlidingWindowBreaker] with the given window size and failure rate threshold.
//
// If no observations are made in the window, the breaker will default to closed.
// This means: if halfOpenDelay is bigger than windowSize, the breaker will never enter half-open state and instead
// directly close.
// This is a time-based breaker, which means it will revert back to closed after its window size has passed: if no
// observations are made in the window, the failure rate is effectively zero.
// This also means: if the circuit has a halfOpenDelay and it is bigger than windowSize, the breaker will never enter
// half-open state and will directly close instead.
// Conversely, if halfOpenDelay is smaller than windowSize, the errors observed in the last window will still count
// proportionally in half-open state, which will lead to faster re-opening on errors.
func NewSlidingWindowBreaker(windowSize time.Duration, threshold float64, halfOpenDelay time.Duration) *SlidingWindowBreaker {
//
// The windowSize is the time interval over which to calculate the failure rate.
//
// The failureThreshold is the failure rate above which the breaker should open (0.0-1.0).
func NewSlidingWindowBreaker(windowSize time.Duration, failureThreshold float64) *SlidingWindowBreaker {
s := &SlidingWindowBreaker{
windowSize: windowSize,
threshold: threshold,
callable: callable{
halfOpenDelay: halfOpenDelay,
},
threshold: failureThreshold,
}

return s
}

func (s *SlidingWindowBreaker) Call() Observable {
state := s.callable.state()
func (s *SlidingWindowBreaker) connect(c untypedCircuit) {
s.circuit = c
}

func (s *SlidingWindowBreaker) observerForCall() observer {
state := s.circuit.stateForCall()

if state == stateOpen {
if state == StateOpen {
return nil
}

return newObservableCall(func(failure bool) {
s.observe(state == stateHalfOpen, failure)
s.observe(state == StateHalfOpen, failure)
})
}

Expand All @@ -229,7 +185,7 @@ func (s *SlidingWindowBreaker) observe(halfOpen, failure bool) {
)

if !failure && halfOpen {
s.openedAt.Store(0)
s.circuit.setOpenedAt(0)
return
}

Expand All @@ -238,7 +194,7 @@ func (s *SlidingWindowBreaker) observe(halfOpen, failure bool) {
if currentStartMicros := s.currentStart.Load(); sinceMicros(currentStartMicros) > s.windowSize && s.currentStart.CompareAndSwap(currentStartMicros, time.Now().UnixMicro()) {
lastFailureCount = s.lastFailureCount.Swap(s.currentFailureCount.Swap(0))
lastSuccessCount = s.lastSuccessCount.Swap(s.currentSuccessCount.Swap(0))
s.openedAt.Store(0)
s.circuit.setOpenedAt(0)
} else {
lastFailureCount = s.lastFailureCount.Load()
lastSuccessCount = s.lastSuccessCount.Load()
Expand All @@ -261,9 +217,9 @@ func (s *SlidingWindowBreaker) observe(halfOpen, failure bool) {
failureRate := weightedFailures / weightedTotal

if failureRate > s.threshold {
s.openedAt.CompareAndSwap(0, time.Now().UnixMicro())
s.circuit.setOpenedAt(time.Now().UnixMicro())
} else {
s.openedAt.Store(0)
s.circuit.setOpenedAt(0)
}
}

Expand Down
Loading

0 comments on commit 20f8c01

Please sign in to comment.