diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 5a2ff5250..4584ccaf8 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -162,8 +162,8 @@ func run() int { hactx, cancelHactx := context.WithCancel(ctx) for hactx.Err() == nil { select { - case <-ha.Takeover(): - logger.Info("Taking over") + case takeoverReason := <-ha.Takeover(): + logger.Infow("Taking over", zap.String("reason", takeoverReason)) go func() { for hactx.Err() == nil { @@ -324,8 +324,8 @@ func run() int { } } }() - case <-ha.Handover(): - logger.Warn("Handing over") + case handoverReason := <-ha.Handover(): + logger.Warnw("Handing over", zap.String("reason", handoverReason)) cancelHactx() case <-hactx.Done(): diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 74d3b3234..db4cca3c0 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -43,8 +43,8 @@ type HA struct { heartbeat *icingaredis.Heartbeat logger *logging.Logger responsible bool - handover chan struct{} - takeover chan struct{} + handover chan string + takeover chan string done chan struct{} errOnce sync.Once errMu sync.Mutex @@ -64,8 +64,8 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger db: db, heartbeat: heartbeat, logger: logger, - handover: make(chan struct{}), - takeover: make(chan struct{}), + handover: make(chan string), + takeover: make(chan string), done: make(chan struct{}), } @@ -107,13 +107,13 @@ func (h *HA) Err() error { return h.err } -// Handover returns a channel with which handovers are signaled. -func (h *HA) Handover() chan struct{} { +// Handover returns a channel with which handovers and their reasons are signaled. +func (h *HA) Handover() chan string { return h.handover } -// Takeover returns a channel with which takeovers are signaled. -func (h *HA) Takeover() chan struct{} { +// Takeover returns a channel with which takeovers and their reasons are signaled. +func (h *HA) Takeover() chan string { return h.takeover } @@ -141,9 +141,10 @@ func (h *HA) controller() { oldInstancesRemoved := false - logTicker := time.NewTicker(time.Second * 60) - defer logTicker.Stop() - shouldLog := true + // Suppress recurring log messages in the realize method to be only logged this often. + routineLogTicker := time.NewTicker(time.Second * 60) + defer routineLogTicker.Stop() + shouldLogRoutineEvents := true for { select { @@ -160,7 +161,7 @@ func (h *HA) controller() { } if tt.Before(now.Add(-1 * timeout)) { h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) - h.signalHandover() + h.signalHandover("received heartbeat from the past") h.realizeLostHeartbeat() continue } @@ -197,8 +198,8 @@ func (h *HA) controller() { } select { - case <-logTicker.C: - shouldLog = true + case <-routineLogTicker.C: + shouldLogRoutineEvents = true default: } @@ -209,10 +210,10 @@ func (h *HA) controller() { } else { realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx) } - err = h.realize(realizeCtx, s, t, envId, shouldLog) + err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) { - h.signalHandover() + h.signalHandover("context deadline exceeded") continue } if err != nil { @@ -224,10 +225,10 @@ func (h *HA) controller() { oldInstancesRemoved = true } - shouldLog = false + shouldLogRoutineEvents = false } else { h.logger.Error("Lost heartbeat") - h.signalHandover() + h.signalHandover("lost heartbeat") h.realizeLostHeartbeat() } case <-h.heartbeat.Done(): @@ -240,7 +241,16 @@ func (h *HA) controller() { } } -func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error { +// realize a HA cycle triggered by a heartbeat event. +// +// shouldLogRoutineEvents indicates if recurrent events should be logged. +func (h *HA) realize( + ctx context.Context, + s *icingaredisv1.IcingaStatus, + t *types.UnixMilli, + envId types.Binary, + shouldLogRoutineEvents bool, +) error { var takeover, otherResponsible bool err := retry.WithBackoff( @@ -271,17 +281,25 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type errQuery := tx.QueryRowxContext( ctx, query, envId, "y", h.instanceId, time.Now().Add(-1*timeout).UnixMilli(), ).StructScan(instance) - switch errQuery { - case nil: + switch { + case errQuery == nil: otherResponsible = true - if shouldLog { - h.logger.Infow("Another instance is active", + if shouldLogRoutineEvents { + h.logger.Debugw("Another instance is active", zap.String("instance_id", instance.Id.String()), zap.String("environment", envId.String()), - "heartbeat", instance.Heartbeat, + zap.Time("heartbeat", instance.Heartbeat.Time()), zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) } - case sql.ErrNoRows: + case errors.Is(errQuery, sql.ErrNoRows): + fields := []any{ + zap.String("instance_id", h.instanceId.String()), + zap.String("environment", envId.String())} + if !h.responsible { + h.logger.Debugw("Preparing to take over HA as no instance is active", fields...) + } else if h.responsible && shouldLogRoutineEvents { + h.logger.Debugw("Continuing being the active instance", fields...) + } takeover = true default: return internal.CantPerformQuery(errQuery, query) @@ -355,7 +373,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return errors.Wrap(err, "can't insert environment") } - h.signalTakeover() + h.signalTakeover("no other instance is active") } else if otherResponsible { if state, _ := h.state.Load(); !state.otherResponsible { state.otherResponsible = true @@ -366,6 +384,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return nil } +// realizeLostHeartbeat updates "responsible = n" for this HA into the database. func (h *HA) realizeLostHeartbeat() { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE id = ?") if _, err := h.db.ExecContext(h.ctx, stmt, "n", h.instanceId); err != nil && !utils.IsContextCanceled(err) { @@ -421,7 +440,8 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar } } -func (h *HA) signalHandover() { +// signalHandover gives up HA.responsible and notifies the HA.Handover chan. +func (h *HA) signalHandover(reason string) { if h.responsible { h.state.Store(haState{ responsibleTsMilli: time.Now().UnixMilli(), @@ -430,7 +450,7 @@ func (h *HA) signalHandover() { }) select { - case h.handover <- struct{}{}: + case h.handover <- reason: h.responsible = false case <-h.ctx.Done(): // Noop @@ -438,7 +458,8 @@ func (h *HA) signalHandover() { } } -func (h *HA) signalTakeover() { +// signalTakeover claims HA.responsible and notifies the HA.Takeover chan. +func (h *HA) signalTakeover(reason string) { if !h.responsible { h.state.Store(haState{ responsibleTsMilli: time.Now().UnixMilli(), @@ -447,7 +468,7 @@ func (h *HA) signalTakeover() { }) select { - case h.takeover <- struct{}{}: + case h.takeover <- reason: h.responsible = true case <-h.ctx.Done(): // Noop