diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index ad444e344..4e165eb91 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -162,8 +162,8 @@ func run() int { hactx, cancelHactx := context.WithCancel(ctx) for hactx.Err() == nil { select { - case <-ha.Takeover(): - logger.Info("Taking over") + case takeoverReason := <-ha.Takeover(): + logger.Infow("Taking over", zap.String("reason", takeoverReason)) go func() { for hactx.Err() == nil { @@ -324,8 +324,8 @@ func run() int { } } }() - case <-ha.Handover(): - logger.Warn("Handing over") + case handoverReason := <-ha.Handover(): + logger.Warnw("Handing over", zap.String("reason", handoverReason)) cancelHactx() case <-hactx.Done(): diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index de31f773f..939446d06 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -23,7 +23,10 @@ import ( "time" ) -var timeout = 60 * time.Second +// peerTimeout defines the timeout for HA heartbeats, being used to detect absent nodes. +// +// Because this timeout relies on icingaredis.Timeout, it is icingaredis.Timeout plus a short grace period. +const peerTimeout = icingaredis.Timeout + 5*time.Second type haState struct { responsibleTsMilli int64 @@ -43,8 +46,8 @@ type HA struct { heartbeat *icingaredis.Heartbeat logger *logging.Logger responsible bool - handover chan struct{} - takeover chan struct{} + handover chan string + takeover chan string done chan struct{} errOnce sync.Once errMu sync.Mutex @@ -64,8 +67,8 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger db: db, heartbeat: heartbeat, logger: logger, - handover: make(chan struct{}), - takeover: make(chan struct{}), + handover: make(chan string), + takeover: make(chan string), done: make(chan struct{}), } @@ -107,13 +110,13 @@ func (h *HA) Err() error { return h.err } -// Handover returns a channel with which handovers are signaled. -func (h *HA) Handover() chan struct{} { +// Handover returns a channel with which handovers and their reasons are signaled. +func (h *HA) Handover() chan string { return h.handover } -// Takeover returns a channel with which takeovers are signaled. -func (h *HA) Takeover() chan struct{} { +// Takeover returns a channel with which takeovers and their reasons are signaled. +func (h *HA) Takeover() chan string { return h.takeover } @@ -141,9 +144,10 @@ func (h *HA) controller() { oldInstancesRemoved := false - logTicker := time.NewTicker(time.Second * 60) - defer logTicker.Stop() - shouldLog := true + // Suppress recurring log messages in the realize method to be only logged this often. + routineLogTicker := time.NewTicker(5 * time.Minute) + defer routineLogTicker.Stop() + shouldLogRoutineEvents := true for { select { @@ -158,9 +162,9 @@ func (h *HA) controller() { if tt.After(now.Add(1 * time.Second)) { h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) } - if tt.Before(now.Add(-1 * timeout)) { + if tt.Before(now.Add(-1 * peerTimeout)) { h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) - h.signalHandover() + h.signalHandover("received heartbeat from the past") h.realizeLostHeartbeat() continue } @@ -192,8 +196,8 @@ func (h *HA) controller() { } select { - case <-logTicker.C: - shouldLog = true + case <-routineLogTicker.C: + shouldLogRoutineEvents = true default: } @@ -204,10 +208,10 @@ func (h *HA) controller() { } else { realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx) } - err = h.realize(realizeCtx, s, t, envId, shouldLog) + err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) { - h.signalHandover() + h.signalHandover("context deadline exceeded") continue } if err != nil { @@ -219,10 +223,10 @@ func (h *HA) controller() { oldInstancesRemoved = true } - shouldLog = false + shouldLogRoutineEvents = false } else { h.logger.Error("Lost heartbeat") - h.signalHandover() + h.signalHandover("lost heartbeat") h.realizeLostHeartbeat() } case <-h.heartbeat.Done(): @@ -235,13 +239,25 @@ func (h *HA) controller() { } } -func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error { - var takeover, otherResponsible bool +// realize a HA cycle triggered by a heartbeat event. +// +// shouldLogRoutineEvents indicates if recurrent events should be logged. +func (h *HA) realize( + ctx context.Context, + s *icingaredisv1.IcingaStatus, + t *types.UnixMilli, + envId types.Binary, + shouldLogRoutineEvents bool, +) error { + var ( + takeover string + otherResponsible bool + ) err := retry.WithBackoff( ctx, func(ctx context.Context) error { - takeover = false + takeover = "" otherResponsible = false isoLvl := sql.LevelSerializable selectLock := "" @@ -259,25 +275,41 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type } query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+ - "WHERE environment_id = ? AND responsible = ? AND id <> ? AND heartbeat > ?") + selectLock + "WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock instance := &v1.IcingadbInstance{} + errQuery := tx.QueryRowxContext(ctx, query, envId, "y", h.instanceId).StructScan(instance) + + switch { + case errQuery == nil: + fields := []any{ + zap.String("instance_id", instance.Id.String()), + zap.String("environment", envId.String()), + zap.Time("heartbeat", instance.Heartbeat.Time()), + zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())), + } - errQuery := tx.QueryRowxContext( - ctx, query, envId, "y", h.instanceId, time.Now().Add(-1*timeout).UnixMilli(), - ).StructScan(instance) - switch errQuery { - case nil: - otherResponsible = true - if shouldLog { - h.logger.Infow("Another instance is active", - zap.String("instance_id", instance.Id.String()), - zap.String("environment", envId.String()), - "heartbeat", instance.Heartbeat, - zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) + if instance.Heartbeat.Time().Before(time.Now().Add(-1 * peerTimeout)) { + takeover = "other instance's heartbeat has expired" + h.logger.Debugw("Preparing to take over HA as other instance's heartbeat has expired", fields...) + } else { + otherResponsible = true + if shouldLogRoutineEvents { + h.logger.Infow("Another instance is active", fields...) + } } - case sql.ErrNoRows: - takeover = true + + case errors.Is(errQuery, sql.ErrNoRows): + fields := []any{ + zap.String("instance_id", h.instanceId.String()), + zap.String("environment", envId.String())} + if !h.responsible { + takeover = "no other instance is active" + h.logger.Debugw("Preparing to take over HA as no instance is active", fields...) + } else if h.responsible && shouldLogRoutineEvents { + h.logger.Debugw("Continuing being the active instance", fields...) + } + default: return internal.CantPerformQuery(errQuery, query) } @@ -292,7 +324,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type EnvironmentId: envId, }, Heartbeat: *t, - Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true}, + Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true}, EndpointId: s.EndpointId, Icinga2Version: s.Version, Icinga2StartTime: s.ProgramStart, @@ -309,7 +341,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return internal.CantPerformQuery(err, stmt) } - if takeover { + if takeover != "" { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?") _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId) @@ -343,14 +375,14 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return err } - if takeover { + if takeover != "" { // Insert the environment after each heartbeat takeover if it does not already exist in the database // as the environment may have changed, although this is likely to happen very rarely. if err := h.insertEnvironment(); err != nil { return errors.Wrap(err, "can't insert environment") } - h.signalTakeover() + h.signalTakeover(takeover) } else if otherResponsible { if state, _ := h.state.Load(); !state.otherResponsible { state.otherResponsible = true @@ -361,6 +393,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return nil } +// realizeLostHeartbeat updates "responsible = n" for this HA into the database. func (h *HA) realizeLostHeartbeat() { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE id = ?") if _, err := h.db.ExecContext(h.ctx, stmt, "n", h.instanceId); err != nil && !utils.IsContextCanceled(err) { @@ -394,10 +427,10 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar select { case <-h.ctx.Done(): return - case <-time.After(timeout): + case <-time.After(peerTimeout): query := h.db.Rebind("DELETE FROM icingadb_instance " + "WHERE id <> ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?") - heartbeat := types.UnixMilli(time.Now().Add(-timeout)) + heartbeat := types.UnixMilli(time.Now().Add(-1 * peerTimeout)) result, err := h.db.ExecContext(h.ctx, query, h.instanceId, envId, s.EndpointId, heartbeat) if err != nil { @@ -416,7 +449,8 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar } } -func (h *HA) signalHandover() { +// signalHandover gives up HA.responsible and notifies the HA.Handover chan. +func (h *HA) signalHandover(reason string) { if h.responsible { h.state.Store(haState{ responsibleTsMilli: time.Now().UnixMilli(), @@ -425,7 +459,7 @@ func (h *HA) signalHandover() { }) select { - case h.handover <- struct{}{}: + case h.handover <- reason: h.responsible = false case <-h.ctx.Done(): // Noop @@ -433,7 +467,8 @@ func (h *HA) signalHandover() { } } -func (h *HA) signalTakeover() { +// signalTakeover claims HA.responsible and notifies the HA.Takeover chan. +func (h *HA) signalTakeover(reason string) { if !h.responsible { h.state.Store(haState{ responsibleTsMilli: time.Now().UnixMilli(), @@ -442,7 +477,7 @@ func (h *HA) signalTakeover() { }) select { - case h.takeover <- struct{}{}: + case h.takeover <- reason: h.responsible = true case <-h.ctx.Done(): // Noop diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 97e99038b..8c4fc80ff 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -16,9 +16,9 @@ import ( "time" ) -// timeout defines how long a heartbeat may be absent if a heartbeat has already been received. +// Timeout defines how long a heartbeat may be absent if a heartbeat has already been received. // After this time, a heartbeat loss is propagated. -var timeout = 60 * time.Second +const Timeout = time.Minute // Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received. // Also signals on if the heartbeat is Lost. @@ -141,9 +141,9 @@ func (h *Heartbeat) controller(ctx context.Context) { atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli()) h.sendEvent(m) - case <-time.After(timeout): + case <-time.After(Timeout): if h.active { - h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout)) + h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", Timeout)) h.sendEvent(nil) h.active = false } else { @@ -217,5 +217,5 @@ func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) { // ExpiryTime returns the timestamp when the heartbeat expires. func (m *HeartbeatMessage) ExpiryTime() time.Time { - return m.received.Add(timeout) + return m.received.Add(Timeout) }