Skip to content

Commit

Permalink
Simplify alert LF observer interface
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Lodovice <[email protected]>
  • Loading branch information
emanlodovice committed Sep 5, 2023
1 parent 3370982 commit a8d13d2
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 142 deletions.
28 changes: 19 additions & 9 deletions alertobserver/alertobserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,26 @@ package alertobserver

import (
"context"

"github.com/prometheus/alertmanager/types"
)

type AlertLifeCycleObserver interface {
Received(alerts ...*types.Alert)
Rejected(reason string, alerts ...*types.Alert)
AddedAggrGroup(groupKey string, alert *types.Alert)
PipelineStart(ctx context.Context, alerts ...*types.Alert)
PipelinePassStage(ctx context.Context, stageName string, alerts ...*types.Alert)
Sent(ctx context.Context, integration string, alerts ...*types.Alert)
SendFailed(ctx context.Context, integration, reason string, alerts ...*types.Alert)
const (
EventAlertReceived string = "received"
EventAlertRejected string = "rejected"
EventAlertAddedToAggrGroup string = "addedAggrGroup"
EventAlertPipelineStart string = "pipelineStart"
EventAlertPipelinePassStage string = "pipelinePassStage"
EventAlertSent string = "sent"
EventAlertSendFailed string = "sendFailed"
)

type AlertEventMeta struct {
Ctx context.Context
Msg string
Integration string
StageName string
}

type LifeCycleObserver interface {
Observe(event string, alerts []*types.Alert, meta *AlertEventMeta)
}
53 changes: 11 additions & 42 deletions alertobserver/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,56 +14,25 @@
package alertobserver

import (
"context"

"github.com/prometheus/alertmanager/types"
)

type FakeAlertLifeCycleObserver struct {
ReceivedAlerts []*types.Alert
RejectedAlerts []*types.Alert
SentAlerts []*types.Alert
FailedAlerts []*types.Alert
AggrGroupAlerts []*types.Alert
PipelineAlerts []*types.Alert
type FakeLifeCycleObserver struct {
AlertsPerEvent map[string][]*types.Alert
PipelineStageAlerts map[string][]*types.Alert
}

func (o *FakeAlertLifeCycleObserver) Received(alerts ...*types.Alert) {
o.ReceivedAlerts = append(o.ReceivedAlerts, alerts...)
}

func (o *FakeAlertLifeCycleObserver) Rejected(reason string, alerts ...*types.Alert) {
o.RejectedAlerts = append(o.RejectedAlerts, alerts...)
}

func (o *FakeAlertLifeCycleObserver) AddedAggrGroup(groupKey string, alert *types.Alert) {
o.AggrGroupAlerts = append(o.AggrGroupAlerts, alert)
}

func (o *FakeAlertLifeCycleObserver) PipelineStart(ctx context.Context, alerts ...*types.Alert) {
o.PipelineAlerts = append(o.PipelineAlerts, alerts...)
}

func (o *FakeAlertLifeCycleObserver) Sent(ctx context.Context, integration string, alerts ...*types.Alert) {
o.SentAlerts = append(o.SentAlerts, alerts...)
}

func (o *FakeAlertLifeCycleObserver) SendFailed(
ctx context.Context,
integration string,
reason string,
alerts ...*types.Alert,
) {
o.FailedAlerts = append(o.FailedAlerts, alerts...)
}

func (o *FakeAlertLifeCycleObserver) PipelinePassStage(ctx context.Context, stageName string, alerts ...*types.Alert) {
o.PipelineStageAlerts[stageName] = append(o.PipelineStageAlerts[stageName], alerts...)
func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta *AlertEventMeta) {
if event == EventAlertPipelinePassStage {
o.PipelineStageAlerts[meta.StageName] = append(o.PipelineStageAlerts[meta.StageName], alerts...)
} else {
o.AlertsPerEvent[event] = append(o.AlertsPerEvent[event], alerts...)
}
}

func NewFakeAlertLifeCycleObserver() *FakeAlertLifeCycleObserver {
return &FakeAlertLifeCycleObserver{
func NewFakeLifeCycleObserver() *FakeLifeCycleObserver {
return &FakeLifeCycleObserver{
PipelineStageAlerts: map[string][]*types.Alert{},
AlertsPerEvent: map[string][]*types.Alert{},
}
}
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Options struct {
GroupFunc func(func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string)
// AlertLCObserver is used to add hooks to the different alert life cycle events.
// If nil then no observer methods will be invoked in the life cycle events.
AlertLCObserver alertobserver.AlertLifeCycleObserver
AlertLCObserver alertobserver.LifeCycleObserver
}

func (o Options) validate() error {
Expand Down
12 changes: 7 additions & 5 deletions api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type API struct {
peer cluster.ClusterPeer
logger log.Logger
m *metrics.Alerts
alertLCObserver alertobserver.AlertLifeCycleObserver
alertLCObserver alertobserver.LifeCycleObserver

getAlertStatus getAlertStatusFn

Expand All @@ -93,7 +93,7 @@ func New(
peer cluster.ClusterPeer,
l log.Logger,
r prometheus.Registerer,
o alertobserver.AlertLifeCycleObserver,
o alertobserver.LifeCycleObserver,
) *API {
if l == nil {
l = log.NewNopLogger()
Expand Down Expand Up @@ -452,7 +452,8 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
validationErrs.Add(err)
api.m.Invalid().Inc()
if api.alertLCObserver != nil {
api.alertLCObserver.Rejected(err.Error(), a)
m := &alertobserver.AlertEventMeta{Msg: err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
}
continue
}
Expand All @@ -464,12 +465,13 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
err: err,
}, nil)
if api.alertLCObserver != nil {
api.alertLCObserver.Rejected(err.Error(), validAlerts...)
m := &alertobserver.AlertEventMeta{Msg: err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
}
return
}
if api.alertLCObserver != nil {
api.alertLCObserver.Received(validAlerts...)
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, &alertobserver.AlertEventMeta{})
}

if validationErrs.Len() > 0 {
Expand Down
12 changes: 6 additions & 6 deletions api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func TestAddAlerts(t *testing.T) {

require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body)))

observer := alertobserver.NewFakeAlertLifeCycleObserver()
observer := alertobserver.NewFakeLifeCycleObserver()
api.alertLCObserver = observer
r, err = http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
w = httptest.NewRecorder()
Expand All @@ -164,9 +164,9 @@ func TestAddAlerts(t *testing.T) {
}
api.addAlerts(w, r)
if tc.code == 200 {
require.Equal(t, observer.ReceivedAlerts[0].Fingerprint(), alerts[0].Fingerprint())
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint())
} else {
require.Equal(t, observer.RejectedAlerts[0].Fingerprint(), alerts[0].Fingerprint())
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint())
}
}
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestAddAlertsWithAlertLCObserver(t *testing.T) {
}

alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
observer := alertobserver.NewFakeAlertLifeCycleObserver()
observer := alertobserver.NewFakeLifeCycleObserver()
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, observer)
defaultGlobalConfig := config.DefaultGlobalConfig()
route := config.Route{}
Expand All @@ -218,9 +218,9 @@ func TestAddAlertsWithAlertLCObserver(t *testing.T) {

require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body)))
if tc.code == 200 {
require.Equal(t, observer.ReceivedAlerts[0].Fingerprint(), alerts[0].Fingerprint())
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint())
} else {
require.Equal(t, observer.RejectedAlerts[0].Fingerprint(), alerts[0].Fingerprint())
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint())
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type API struct {

logger log.Logger
m *metrics.Alerts
alertLCObserver alertobserver.AlertLifeCycleObserver
alertLCObserver alertobserver.LifeCycleObserver

Handler http.Handler
}
Expand All @@ -91,7 +91,7 @@ func NewAPI(
peer cluster.ClusterPeer,
l log.Logger,
r prometheus.Registerer,
o alertobserver.AlertLifeCycleObserver,
o alertobserver.LifeCycleObserver,
) (*API, error) {
api := API{
alerts: alerts,
Expand Down Expand Up @@ -355,7 +355,8 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
validationErrs.Add(err)
api.m.Invalid().Inc()
if api.alertLCObserver != nil {
api.alertLCObserver.Rejected(err.Error(), a)
m := &alertobserver.AlertEventMeta{Msg: err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
}
continue
}
Expand All @@ -364,7 +365,8 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
if err := api.alerts.Put(validAlerts...); err != nil {
level.Error(logger).Log("msg", "Failed to create alerts", "err", err)
if api.alertLCObserver != nil {
api.alertLCObserver.Rejected(err.Error(), validAlerts...)
m := &alertobserver.AlertEventMeta{Msg: err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
}
return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
}
Expand All @@ -374,7 +376,7 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error())
}
if api.alertLCObserver != nil {
api.alertLCObserver.Received(validAlerts...)
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, &alertobserver.AlertEventMeta{})
}

return alert_ops.NewPostAlertsOK()
Expand Down
6 changes: 3 additions & 3 deletions api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ func TestPostAlertHandler(t *testing.T) {

require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, response: %s", i, string(body)))

observer := alertobserver.NewFakeAlertLifeCycleObserver()
observer := alertobserver.NewFakeLifeCycleObserver()
api.alertLCObserver = observer
r, err = http.NewRequest("POST", "/api/v2/alerts", bytes.NewReader(alertsBytes))
require.NoError(t, err)
Expand All @@ -568,9 +568,9 @@ func TestPostAlertHandler(t *testing.T) {
})
amAlert := OpenAPIAlertsToAlerts(alerts)
if tc.code == 200 {
require.Equal(t, observer.ReceivedAlerts[0].Fingerprint(), amAlert[0].Fingerprint())
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), amAlert[0].Fingerprint())
} else {
require.Equal(t, observer.RejectedAlerts[0].Fingerprint(), amAlert[0].Fingerprint())
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), amAlert[0].Fingerprint())
}
}
}
1 change: 1 addition & 0 deletions cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ func run() int {
timeIntervals,
notificationLog,
pipelinePeer,
nil,
)
configuredReceivers.Set(float64(len(activeReceivers)))
configuredIntegrations.Set(float64(integrationsNum))
Expand Down
16 changes: 7 additions & 9 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type Dispatcher struct {
cancel func()

logger log.Logger
alertLCObserver alertobserver.AlertLifeCycleObserver
alertLCObserver alertobserver.LifeCycleObserver
}

// Limits describes limits used by Dispatcher.
Expand All @@ -113,7 +113,7 @@ func NewDispatcher(
lim Limits,
l log.Logger,
m *DispatcherMetrics,
o alertobserver.AlertLifeCycleObserver,
o alertobserver.LifeCycleObserver,
) *Dispatcher {
if lim == nil {
lim = nilLimits{}
Expand All @@ -140,11 +140,7 @@ func (d *Dispatcher) Run() {
d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
d.aggrGroupsNum = 0
d.metrics.aggrGroups.Set(0)
ctx := context.Background()
if d.alertLCObserver != nil {
ctx = notify.WithAlertLCObserver(ctx, d.alertLCObserver)
}
d.ctx, d.cancel = context.WithCancel(ctx)
d.ctx, d.cancel = context.WithCancel(context.Background())
d.mtx.Unlock()

d.run(d.alerts.Subscribe())
Expand Down Expand Up @@ -328,7 +324,8 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
if ok {
ag.insert(alert)
if d.alertLCObserver != nil {
d.alertLCObserver.AddedAggrGroup(ag.GroupKey(), alert)
dummyCtx := notify.WithGroupKey(context.TODO(), ag.GroupKey())
d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, &alertobserver.AlertEventMeta{Ctx: dummyCtx})
}
return
}
Expand All @@ -350,7 +347,8 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// alert is already there.
ag.insert(alert)
if d.alertLCObserver != nil {
d.alertLCObserver.AddedAggrGroup(ag.GroupKey(), alert)
dummyCtx := notify.WithGroupKey(context.TODO(), ag.GroupKey())
d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, &alertobserver.AlertEventMeta{Ctx: dummyCtx})
}

go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
Expand Down
7 changes: 2 additions & 5 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ route:
timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
m := NewDispatcherMetrics(true, prometheus.NewRegistry())
observer := alertobserver.NewFakeAlertLifeCycleObserver()
observer := alertobserver.NewFakeLifeCycleObserver()
dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, m, observer)
go dispatcher.Run()
defer dispatcher.Stop()
Expand All @@ -613,10 +613,7 @@ route:
time.Sleep(200 * time.Millisecond)
}
require.Equal(t, 1, len(recorder.Alerts()))
require.Equal(t, inputAlerts[0].Fingerprint(), observer.AggrGroupAlerts[0].Fingerprint())
o, ok := notify.AlertLCObserver(dispatcher.ctx)
require.True(t, ok)
require.Equal(t, observer, o)
require.Equal(t, inputAlerts[0].Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint())
}

type recordStage struct {
Expand Down
Loading

0 comments on commit a8d13d2

Please sign in to comment.