
Commit

Address PR comments
Signed-off-by: Emmanuel Lodovice <[email protected]>
emanlodovice committed Oct 11, 2023
1 parent 261befb commit 9a6a3ea
Showing 5 changed files with 42 additions and 19 deletions.
17 changes: 9 additions & 8 deletions alertobserver/alertobserver.go
@@ -18,14 +18,15 @@ import (
 )
 
 const (
-	EventAlertReceived          string = "received"
-	EventAlertRejected          string = "rejected"
-	EventAlertAddedToAggrGroup  string = "addedAggrGroup"
-	EventAlertPipelineStart     string = "pipelineStart"
-	EventAlertPipelinePassStage string = "pipelinePassStage"
-	EventAlertMuted             string = "muted"
-	EventAlertSent              string = "sent"
-	EventAlertSendFailed        string = "sendFailed"
+	EventAlertReceived             string = "received"
+	EventAlertRejected             string = "rejected"
+	EventAlertAddedToAggrGroup     string = "addedAggrGroup"
+	EventAlertFailedAddToAggrGroup string = "failedAddAggrGroup"
+	EventAlertPipelineStart        string = "pipelineStart"
+	EventAlertPipelinePassStage    string = "pipelinePassStage"
+	EventAlertMuted                string = "muted"
+	EventAlertSent                 string = "sent"
+	EventAlertSendFailed           string = "sendFailed"
 )
 
 type AlertEventMeta map[string]interface{}
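These constants are the event names passed to an observer's Observe method. As a hypothetical illustration only (the eventCounter type and metric name below are not part of this change), an observer could tally events by name; the Observe signature is taken from FakeLifeCycleObserver in the next file, and the package's concrete observer interface is assumed to match it:

package example

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/types"
)

// eventCounter tallies alert life-cycle events by event name.
type eventCounter struct {
	events *prometheus.CounterVec
}

func newEventCounter(r prometheus.Registerer) *eventCounter {
	c := &eventCounter{
		events: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "alert_lifecycle_events_total",
			Help: "Observed alert life-cycle events, labelled by event type.",
		}, []string{"event"}),
	}
	r.MustRegister(c.events)
	return c
}

// Observe counts the event once per alert it carries.
func (c *eventCounter) Observe(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
	c.events.WithLabelValues(event).Add(float64(len(alerts)))
}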
5 changes: 5 additions & 0 deletions alertobserver/testing.go
@@ -14,16 +14,21 @@
 package alertobserver
 
 import (
+	"sync"
+
 	"github.com/prometheus/alertmanager/types"
 )
 
 type FakeLifeCycleObserver struct {
 	AlertsPerEvent      map[string][]*types.Alert
 	PipelineStageAlerts map[string][]*types.Alert
 	MetaPerEvent        map[string][]AlertEventMeta
+	Mtx                 sync.RWMutex
 }
 
 func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) {
	o.Mtx.Lock()
	defer o.Mtx.Unlock()
 	if event == EventAlertPipelinePassStage {
 		o.PipelineStageAlerts[meta["stageName"].(string)] = append(o.PipelineStageAlerts[meta["stageName"].(string)], alerts...)
 	} else {
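The new RWMutex exists because the code under test calls Observe from its own goroutines while the test inspects the recorded maps. A minimal sketch of the resulting access pattern (pollEventCount is a hypothetical helper, not part of this change), mirroring the polling done in the dispatcher test further down:

package example

import (
	"time"

	"github.com/prometheus/alertmanager/alertobserver"
)

// pollEventCount reads the fake observer under the read lock and polls rather
// than assuming the event has already been recorded.
func pollEventCount(o *alertobserver.FakeLifeCycleObserver, event string) int {
	for i := 0; i < 10; i++ {
		o.Mtx.RLock()
		n := len(o.AlertsPerEvent[event])
		o.Mtx.RUnlock()
		if n > 0 {
			return n
		}
		time.Sleep(100 * time.Millisecond)
	}
	return 0
}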
6 changes: 5 additions & 1 deletion dispatch/dispatch.go
@@ -332,7 +332,11 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
 	// If the group does not exist, create it. But check the limit first.
 	if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
 		d.metrics.aggrGroupLimitReached.Inc()
-		level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
+		errMsg := "Too many aggregation groups, cannot create new group for alert"
+		level.Error(d.logger).Log("msg", errMsg, "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
+		if d.alertLCObserver != nil {
+			d.alertLCObserver.Observe(alertobserver.EventAlertFailedAddToAggrGroup, []*types.Alert{alert}, alertobserver.AlertEventMeta{"msg": errMsg})
+		}
 		return
 	}
 
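On the consuming side, an observer can react specifically to this new event. A hypothetical sketch (droppedAlertLogger is illustrative only); the "msg" meta key and its string type come from the Observe call added above:

package example

import (
	"log"

	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/types"
)

// droppedAlertLogger reacts only to alerts that could not be placed into an
// aggregation group and logs the reason carried in the event metadata.
type droppedAlertLogger struct{}

func (droppedAlertLogger) Observe(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
	if event != alertobserver.EventAlertFailedAddToAggrGroup {
		return
	}
	reason, _ := meta["msg"].(string)
	for _, a := range alerts {
		log.Printf("alert %s dropped before aggregation: %s", a.Name(), reason)
	}
}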
27 changes: 20 additions & 7 deletions dispatch/dispatch_test.go
@@ -596,27 +596,40 @@ route:
 	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
 	m := NewDispatcherMetrics(true, prometheus.NewRegistry())
 	observer := alertobserver.NewFakeLifeCycleObserver()
-	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, m, observer)
+	lim := limits{groups: 1}
+	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m, observer)
 	go dispatcher.Run()
 	defer dispatcher.Stop()
 
 	// Create alerts. the dispatcher will automatically create the groups.
-	inputAlerts := []*types.Alert{
-		newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}),
-	}
-	err = alerts.Put(inputAlerts...)
+	alert1 := newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"})
+	alert2 := newAlert(model.LabelSet{"alertname": "YetAnotherAlert", "cluster": "cc", "service": "db"})
+	err = alerts.Put(alert1)
 	if err != nil {
 		t.Fatal(err)
 	}
 	// Let alerts get processed.
 	for i := 0; len(recorder.Alerts()) != 1 && i < 10; i++ {
 		time.Sleep(200 * time.Millisecond)
 	}
+	err = alerts.Put(alert2)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Let alert get processed.
+	for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ {
+		time.Sleep(200 * time.Millisecond)
+	}
+	observer.Mtx.RLock()
+	defer observer.Mtx.RUnlock()
 	require.Equal(t, 1, len(recorder.Alerts()))
-	require.Equal(t, inputAlerts[0].Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint())
-	groupFp := getGroupLabels(inputAlerts[0], route).Fingerprint()
+	require.Equal(t, alert1.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint())
+	groupFp := getGroupLabels(alert1, route).Fingerprint()
 	groupKey := dispatcher.aggrGroupsPerRoute[route][groupFp].GroupKey()
 	require.Equal(t, groupKey, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["groupKey"].(string))
+
+	require.Equal(t, 1, len(observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup]))
+	require.Equal(t, alert2.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup][0].Fingerprint())
 }
 
 type recordStage struct {
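The limits{groups: 1} value used above refers to a test helper defined elsewhere in dispatch_test.go and not shown in this diff. A minimal stub consistent with the d.limits.MaxNumberOfAggregationGroups() call in dispatch.go above might look like this (hypothetical sketch, not the actual helper):

// limits caps the number of aggregation groups the dispatcher may create.
type limits struct {
	groups int
}

func (l limits) MaxNumberOfAggregationGroups() int {
	return l.groups
}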
6 changes: 3 additions & 3 deletions notify/notify.go
@@ -837,7 +837,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
 				"integration": r.integration.Name(),
 				"stageName":   "RetryStage",
 			}
-			r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, alerts, m)
+			r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, sent, m)
 		}
 		return ctx, alerts, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i)
 	}
@@ -860,7 +860,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
 				"integration": r.integration.Name(),
 				"stageName":   "RetryStage",
 			}
-			r.alertLCObserver.Observe(alertobserver.EventAlertSent, alerts, m)
+			r.alertLCObserver.Observe(alertobserver.EventAlertSent, sent, m)
 		}
 		lvl.Log("msg", "Notify success", "attempts", i)
 
@@ -877,7 +877,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
 				"integration": r.integration.Name(),
 				"stageName":   "RetryStage",
 			}
-			r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, alerts, m)
+			r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, sent, m)
 		}
 		return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
 	}
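These hunks change what the RetryStage reports: the observer now receives sent, the subset of alerts actually handed to the integration for this attempt, rather than the stage's full alerts input. A hypothetical consumer that depends on that distinction (deliveryTracker is a sketch only and not part of Alertmanager):

package example

import (
	"sync"

	"github.com/prometheus/common/model"

	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/types"
)

// deliveryTracker records, per alert fingerprint, whether the most recent
// notification attempt succeeded. Receiving only the sent subset keeps this
// bookkeeping accurate: alerts that were never handed to the integration are
// not marked at all.
type deliveryTracker struct {
	mtx  sync.Mutex
	last map[model.Fingerprint]bool
}

func (d *deliveryTracker) Observe(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
	if event != alertobserver.EventAlertSent && event != alertobserver.EventAlertSendFailed {
		return
	}
	d.mtx.Lock()
	defer d.mtx.Unlock()
	if d.last == nil {
		d.last = map[model.Fingerprint]bool{}
	}
	for _, a := range alerts {
		d.last[a.Fingerprint()] = event == alertobserver.EventAlertSent
	}
}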
