diff --git a/alertobserver/alertobserver.go b/alertobserver/alertobserver.go
index 708942175f..cf05d9210c 100644
--- a/alertobserver/alertobserver.go
+++ b/alertobserver/alertobserver.go
@@ -18,14 +18,15 @@ import (
 )
 
 const (
-	EventAlertReceived          string = "received"
-	EventAlertRejected          string = "rejected"
-	EventAlertAddedToAggrGroup  string = "addedAggrGroup"
-	EventAlertPipelineStart     string = "pipelineStart"
-	EventAlertPipelinePassStage string = "pipelinePassStage"
-	EventAlertMuted             string = "muted"
-	EventAlertSent              string = "sent"
-	EventAlertSendFailed        string = "sendFailed"
+	EventAlertReceived             string = "received"
+	EventAlertRejected             string = "rejected"
+	EventAlertAddedToAggrGroup     string = "addedAggrGroup"
+	EventAlertFailedAddToAggrGroup string = "failedAddAggrGroup"
+	EventAlertPipelineStart        string = "pipelineStart"
+	EventAlertPipelinePassStage    string = "pipelinePassStage"
+	EventAlertMuted                string = "muted"
+	EventAlertSent                 string = "sent"
+	EventAlertSendFailed           string = "sendFailed"
 )
 
 type AlertEventMeta map[string]interface{}
diff --git a/alertobserver/testing.go b/alertobserver/testing.go
index 45e3611fba..66f774fbb7 100644
--- a/alertobserver/testing.go
+++ b/alertobserver/testing.go
@@ -14,6 +14,8 @@
 package alertobserver
 
 import (
+	"sync"
+
 	"github.com/prometheus/alertmanager/types"
 )
 
@@ -21,9 +23,12 @@ type FakeLifeCycleObserver struct {
 	AlertsPerEvent      map[string][]*types.Alert
 	PipelineStageAlerts map[string][]*types.Alert
 	MetaPerEvent        map[string][]AlertEventMeta
+	Mtx                 sync.RWMutex
 }
 
 func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) {
+	o.Mtx.Lock()
+	defer o.Mtx.Unlock()
 	if event == EventAlertPipelinePassStage {
 		o.PipelineStageAlerts[meta["stageName"].(string)] = append(o.PipelineStageAlerts[meta["stageName"].(string)], alerts...)
 	} else {
diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index 4b7dc31ec9..833c70a544 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -332,7 +332,11 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
 	// If the group does not exist, create it. But check the limit first.
 	if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
 		d.metrics.aggrGroupLimitReached.Inc()
-		level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
+		errMsg := "Too many aggregation groups, cannot create new group for alert"
+		level.Error(d.logger).Log("msg", errMsg, "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
+		if d.alertLCObserver != nil {
+			d.alertLCObserver.Observe(alertobserver.EventAlertFailedAddToAggrGroup, []*types.Alert{alert}, alertobserver.AlertEventMeta{"msg": errMsg})
+		}
 		return
 	}
 
diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go
index 6a3bfdd166..e026dffb84 100644
--- a/dispatch/dispatch_test.go
+++ b/dispatch/dispatch_test.go
@@ -596,15 +596,15 @@ route:
 	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
 	m := NewDispatcherMetrics(true, prometheus.NewRegistry())
 	observer := alertobserver.NewFakeLifeCycleObserver()
-	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, m, observer)
+	lim := limits{groups: 1}
+	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m, observer)
 	go dispatcher.Run()
 	defer dispatcher.Stop()
 
 	// Create alerts. the dispatcher will automatically create the groups.
-	inputAlerts := []*types.Alert{
-		newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}),
-	}
-	err = alerts.Put(inputAlerts...)
+	alert1 := newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"})
+	alert2 := newAlert(model.LabelSet{"alertname": "YetAnotherAlert", "cluster": "cc", "service": "db"})
+	err = alerts.Put(alert1)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -612,11 +612,24 @@ route:
 	for i := 0; len(recorder.Alerts()) != 1 && i < 10; i++ {
 		time.Sleep(200 * time.Millisecond)
 	}
+	err = alerts.Put(alert2)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Let alert get processed.
+	for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ {
+		time.Sleep(200 * time.Millisecond)
+	}
+	observer.Mtx.RLock()
+	defer observer.Mtx.RUnlock()
 	require.Equal(t, 1, len(recorder.Alerts()))
-	require.Equal(t, inputAlerts[0].Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint())
-	groupFp := getGroupLabels(inputAlerts[0], route).Fingerprint()
+	require.Equal(t, alert1.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint())
+	groupFp := getGroupLabels(alert1, route).Fingerprint()
 	groupKey := dispatcher.aggrGroupsPerRoute[route][groupFp].GroupKey()
 	require.Equal(t, groupKey, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["groupKey"].(string))
+
+	require.Equal(t, 1, len(observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup]))
+	require.Equal(t, alert2.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup][0].Fingerprint())
 }
 
 type recordStage struct {
diff --git a/notify/notify.go b/notify/notify.go
index 30ab453160..43117c47af 100644
--- a/notify/notify.go
+++ b/notify/notify.go
@@ -837,7 +837,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
 					"integration": r.integration.Name(),
 					"stageName":   "RetryStage",
 				}
-				r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, alerts, m)
+				r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, sent, m)
 			}
 			return ctx, alerts, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i)
 		}
@@ -860,7 +860,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
 					"integration": r.integration.Name(),
 					"stageName":   "RetryStage",
 				}
-				r.alertLCObserver.Observe(alertobserver.EventAlertSent, alerts, m)
+				r.alertLCObserver.Observe(alertobserver.EventAlertSent, sent, m)
 			}
 
 			lvl.Log("msg", "Notify success", "attempts", i)
@@ -877,7 +877,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
 					"integration": r.integration.Name(),
 					"stageName":   "RetryStage",
 				}
-				r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, alerts, m)
+				r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, sent, m)
 			}
 			return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
 		}