Skip to content

Commit

Permalink
feat!: turn prometheus middleware into collector
Browse files Browse the repository at this point in the history
This makes integration with the "recursive collectors" pattern easier, since no additional registerer is necessary.
Users who already have a prometheus.Registerer in scope can explicitly register the middleware.

BREAKING CHANGE: all prometheus usage must be refactored
  • Loading branch information
costela committed Feb 14, 2024
1 parent f2c0810 commit 018d790
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 85 deletions.
146 changes: 76 additions & 70 deletions extensions/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package hogprom
import (
"context"
"errors"
"fmt"
"strconv"
"time"

Expand All @@ -13,95 +12,102 @@ import (

const (
namespace = "hoglet"
subsystem = "circuit"
)

// WithPrometheusMetrics returns a [hoglet.BreakerMiddleware] that registers prometheus metrics for the circuit.
// NewCollector returns a [hoglet.BreakerMiddleware] that exposes prometheus metrics for the circuit.
// It implements prometheus.Collector and can therefore be registered with a prometheus.Registerer.
//
// ⚠️ Note: the provided name must be unique across all hoglet instances using the same registerer.
func WithPrometheusMetrics(circuitName string, reg prometheus.Registerer) hoglet.BreakerMiddleware {
return func(next hoglet.ObserverFactory) (hoglet.ObserverFactory, error) {
callDurations := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: "circuit",
Name: "call_durations_seconds",
Help: "Call durations in seconds",
ConstLabels: prometheus.Labels{
"circuit": circuitName,
},
// ⚠️ Note: the provided name must be unique across all hoglet instances ultimately registered to the same
// prometheus.Registerer.
func NewCollector(circuitName string) *Middleware {
callDurations := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "call_durations_seconds",
Help: "Call durations in seconds",
ConstLabels: prometheus.Labels{
"circuit": circuitName,
},
[]string{"success"},
)

droppedCalls := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "circuit",
Name: "dropped_calls_total",
Help: "Total number of calls with an open circuit (i.e.: calls that did not reach the wrapped function)",
ConstLabels: prometheus.Labels{
"circuit": circuitName,
},
},
[]string{"success"},
)

droppedCalls := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "dropped_calls_total",
Help: "Total number of calls with an open circuit (i.e.: calls that did not reach the wrapped function)",
ConstLabels: prometheus.Labels{
"circuit": circuitName,
},
[]string{"cause"},
)

inflightCalls := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "circuit",
Name: "inflight_calls_current",
Help: "Current number of calls in-flight",
ConstLabels: prometheus.Labels{
"circuit": circuitName,
},
},
[]string{"cause"},
)

inflightCalls := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "inflight_calls_current",
Help: "Current number of calls in-flight",
ConstLabels: prometheus.Labels{
"circuit": circuitName,
},
)

for _, c := range []prometheus.Collector{
callDurations,
droppedCalls,
inflightCalls,
} {
if err := reg.Register(c); err != nil {
return nil, fmt.Errorf("hoglet: registering collector: %w", err)
}
}

return &prometheusObserverFactory{
next: next,
},
)

timesource: wallclock{},

callDurations: callDurations,
droppedCalls: droppedCalls,
inflightCalls: inflightCalls,
}, nil
return &Middleware{
callDurations: callDurations,
droppedCalls: droppedCalls,
inflightCalls: inflightCalls,
}
}

type prometheusObserverFactory struct {
next hoglet.ObserverFactory

timesource timesource

type Middleware struct {
callDurations *prometheus.HistogramVec
droppedCalls *prometheus.CounterVec
inflightCalls prometheus.Gauge
}

func (pos *prometheusObserverFactory) ObserverForCall(ctx context.Context, state hoglet.State) (hoglet.Observer, error) {
o, err := pos.next.ObserverForCall(ctx, state)
func (m Middleware) Collect(ch chan<- prometheus.Metric) {
m.callDurations.Collect(ch)
m.droppedCalls.Collect(ch)
m.inflightCalls.Collect(ch)
}

func (m Middleware) Describe(ch chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(m, ch)
}

func (m Middleware) Wrap(of hoglet.ObserverFactory) (hoglet.ObserverFactory, error) {
return &wrappedMiddleware{
Middleware: m,
next: of,
timesource: wallclock{},
}, nil
}

type wrappedMiddleware struct {
Middleware
next hoglet.ObserverFactory
timesource timesource
}

func (wm *wrappedMiddleware) ObserverForCall(ctx context.Context, state hoglet.State) (hoglet.Observer, error) {
o, err := wm.next.ObserverForCall(ctx, state)
if err != nil {
pos.droppedCalls.WithLabelValues(errToCause(err)).Inc()
wm.droppedCalls.WithLabelValues(errToCause(err)).Inc()
return nil, err
}
start := pos.timesource.Now()
pos.inflightCalls.Inc()
start := wm.timesource.Now()
wm.inflightCalls.Inc()
return hoglet.ObserverFunc(func(b bool) {
// invert failure → success to make the metric more intuitive
pos.callDurations.WithLabelValues(strconv.FormatBool(!b)).Observe(pos.timesource.Since(start).Seconds())
pos.inflightCalls.Dec()
wm.callDurations.WithLabelValues(strconv.FormatBool(!b)).Observe(wm.timesource.Since(start).Seconds())
wm.inflightCalls.Dec()
o.Observe(b)
}), nil
}
Expand Down
16 changes: 7 additions & 9 deletions extensions/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/exaring/hoglet"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -44,21 +43,20 @@ func (m mockTimesource) Since(t time.Time) time.Duration {
}

func TestWithPrometheusMetrics(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
m := WithPrometheusMetrics("test", reg)
of, err := m(&mockObserverFactory{})
m := NewCollector("test")
of, err := m.Wrap(&mockObserverFactory{})
require.NoError(t, err)

mt := &mockTimesource{time.Now()}

of.(*prometheusObserverFactory).timesource = mt
of.(*wrappedMiddleware).timesource = mt

inflightOut0 := `# HELP hoglet_circuit_inflight_calls_current Current number of calls in-flight
# TYPE hoglet_circuit_inflight_calls_current gauge
hoglet_circuit_inflight_calls_current{circuit="test"} 0
`

if err := testutil.GatherAndCompare(reg, strings.NewReader(inflightOut0)); err != nil {
if err := testutil.CollectAndCompare(m, strings.NewReader(inflightOut0)); err != nil {
t.Fatal(err)
}

Expand All @@ -72,7 +70,7 @@ func TestWithPrometheusMetrics(t *testing.T) {
# TYPE hoglet_circuit_inflight_calls_current gauge
hoglet_circuit_inflight_calls_current{circuit="test"} 0
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(droppedOut1)); err != nil {
if err := testutil.CollectAndCompare(m, strings.NewReader(droppedOut1)); err != nil {
t.Fatal(err)
}

Expand All @@ -86,7 +84,7 @@ func TestWithPrometheusMetrics(t *testing.T) {
# TYPE hoglet_circuit_inflight_calls_current gauge
hoglet_circuit_inflight_calls_current{circuit="test"} 1
`
if err := testutil.GatherAndCompare(reg, strings.NewReader(inflightOut1)); err != nil {
if err := testutil.CollectAndCompare(m, strings.NewReader(inflightOut1)); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -118,7 +116,7 @@ func TestWithPrometheusMetrics(t *testing.T) {
hoglet_circuit_inflight_calls_current{circuit="test"} 0
`

if err := testutil.GatherAndCompare(reg, strings.NewReader(durationsOut1)); err != nil {
if err := testutil.CollectAndCompare(m, strings.NewReader(durationsOut1)); err != nil {
t.Fatal(err)
}
}
12 changes: 10 additions & 2 deletions hoglet.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,16 @@ type ObserverFactory interface {
ObserverForCall(context.Context, State) (Observer, error)
}

// BreakerMiddleware is a function that wraps an [ObserverFactory] and returns a new [ObserverFactory].
type BreakerMiddleware func(ObserverFactory) (ObserverFactory, error)
// BreakerMiddleware wraps an [ObserverFactory] and returns a new [ObserverFactory].
type BreakerMiddleware interface {
Wrap(ObserverFactory) (ObserverFactory, error)
}

type BreakerMiddlewareFunc func(ObserverFactory) (ObserverFactory, error)

func (f BreakerMiddlewareFunc) Wrap(of ObserverFactory) (ObserverFactory, error) {
return f(of)
}

// WrappedFunc is the type of the function wrapped by a Breaker.
type WrappedFunc[IN, OUT any] func(context.Context, IN) (OUT, error)
Expand Down
4 changes: 2 additions & 2 deletions limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// - or blocks until a slot is available if blocking is true, potentially returning [ErrWaitingForSlot]. The returned
// error wraps the underlying cause (e.g. [context.Canceled] or [context.DeadlineExceeded]).
func ConcurrencyLimiter(limit int64, block bool) BreakerMiddleware {
return func(next ObserverFactory) (ObserverFactory, error) {
return BreakerMiddlewareFunc(func(next ObserverFactory) (ObserverFactory, error) {
cl := concurrencyLimiter{
sem: semaphore.NewWeighted(limit),
next: next,
Expand All @@ -26,7 +26,7 @@ func ConcurrencyLimiter(limit int64, block bool) BreakerMiddleware {
return concurrencyLimiterNonBlocking{
concurrencyLimiter: cl,
}, nil
}
})
}

type concurrencyLimiter struct {
Expand Down
2 changes: 1 addition & 1 deletion limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func Test_ConcurrencyLimiter(t *testing.T) {
defer wgStop.Wait()

cl := hoglet.ConcurrencyLimiter(tt.args.limit, tt.args.block)
of, err := cl(mockObserverFactory{})
of, err := cl.Wrap(mockObserverFactory{})
require.NoError(t, err)
for i := 0; i < tt.calls; i++ {
wantPanic := tt.wantPanicOn != nil && *tt.wantPanicOn == i
Expand Down
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func IgnoreContextCanceled(err error) bool {
// middleware and should therefore be AFTER it in the parameter list.
func WithBreakerMiddleware(bm BreakerMiddleware) Option {
return optionFunc(func(o *options) error {
b, err := bm(o.observerFactory)
b, err := bm.Wrap(o.observerFactory)
if err != nil {
return fmt.Errorf("creating middleware: %w", err)
}
Expand Down

0 comments on commit 018d790

Please sign in to comment.