Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance HA "Taking over", "Handing over" logging #692

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func run() int {
hactx, cancelHactx := context.WithCancel(ctx)
for hactx.Err() == nil {
select {
case <-ha.Takeover():
logger.Info("Taking over")
case takeoverReason := <-ha.Takeover():
logger.Infow("Taking over", zap.String("reason", takeoverReason))

go func() {
for hactx.Err() == nil {
Expand Down Expand Up @@ -324,8 +324,8 @@ func run() int {
}
}
}()
case <-ha.Handover():
logger.Warn("Handing over")
case handoverReason := <-ha.Handover():
logger.Warnw("Handing over", zap.String("reason", handoverReason))

cancelHactx()
case <-hactx.Done():
Expand Down
131 changes: 83 additions & 48 deletions pkg/icingadb/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"time"
)

var timeout = 60 * time.Second
// peerTimeout defines the timeout for HA heartbeats, being used to detect absent nodes.
//
// Because this timeout relies on icingaredis.Timeout, it is icingaredis.Timeout plus a short grace period.
const peerTimeout = icingaredis.Timeout + 5*time.Second

type haState struct {
responsibleTsMilli int64
Expand All @@ -43,8 +46,8 @@ type HA struct {
heartbeat *icingaredis.Heartbeat
logger *logging.Logger
responsible bool
handover chan struct{}
takeover chan struct{}
handover chan string
takeover chan string
done chan struct{}
errOnce sync.Once
errMu sync.Mutex
Expand All @@ -64,8 +67,8 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger
db: db,
heartbeat: heartbeat,
logger: logger,
handover: make(chan struct{}),
takeover: make(chan struct{}),
handover: make(chan string),
takeover: make(chan string),
done: make(chan struct{}),
}

Expand Down Expand Up @@ -107,13 +110,13 @@ func (h *HA) Err() error {
return h.err
}

// Handover returns a channel with which handovers are signaled.
func (h *HA) Handover() chan struct{} {
// Handover returns a channel with which handovers and their reasons are signaled.
func (h *HA) Handover() chan string {
return h.handover
}

// Takeover returns a channel with which takeovers are signaled.
func (h *HA) Takeover() chan struct{} {
// Takeover returns a channel with which takeovers and their reasons are signaled.
func (h *HA) Takeover() chan string {
return h.takeover
}

Expand Down Expand Up @@ -141,9 +144,10 @@ func (h *HA) controller() {

oldInstancesRemoved := false

logTicker := time.NewTicker(time.Second * 60)
defer logTicker.Stop()
shouldLog := true
// Suppress recurring log messages in the realize method to be only logged this often.
routineLogTicker := time.NewTicker(5 * time.Minute)
defer routineLogTicker.Stop()
shouldLogRoutineEvents := true

for {
select {
Expand All @@ -158,9 +162,9 @@ func (h *HA) controller() {
if tt.After(now.Add(1 * time.Second)) {
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
}
if tt.Before(now.Add(-1 * timeout)) {
if tt.Before(now.Add(-1 * peerTimeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
h.signalHandover()
h.signalHandover("received heartbeat from the past")
h.realizeLostHeartbeat()
continue
}
Expand Down Expand Up @@ -192,8 +196,8 @@ func (h *HA) controller() {
}

select {
case <-logTicker.C:
shouldLog = true
case <-routineLogTicker.C:
shouldLogRoutineEvents = true
default:
}

Expand All @@ -204,10 +208,10 @@ func (h *HA) controller() {
} else {
realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx)
}
err = h.realize(realizeCtx, s, t, envId, shouldLog)
err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents)
cancelRealizeCtx()
if errors.Is(err, context.DeadlineExceeded) {
h.signalHandover()
h.signalHandover("context deadline exceeded")
continue
}
if err != nil {
Expand All @@ -219,10 +223,10 @@ func (h *HA) controller() {
oldInstancesRemoved = true
}

shouldLog = false
shouldLogRoutineEvents = false
} else {
h.logger.Error("Lost heartbeat")
h.signalHandover()
h.signalHandover("lost heartbeat")
h.realizeLostHeartbeat()
}
case <-h.heartbeat.Done():
Expand All @@ -235,13 +239,25 @@ func (h *HA) controller() {
}
}

func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error {
var takeover, otherResponsible bool
// realize a HA cycle triggered by a heartbeat event.
//
// shouldLogRoutineEvents indicates if recurrent events should be logged.
func (h *HA) realize(
ctx context.Context,
s *icingaredisv1.IcingaStatus,
t *types.UnixMilli,
envId types.Binary,
shouldLogRoutineEvents bool,
) error {
var (
takeover string
oxzi marked this conversation as resolved.
Show resolved Hide resolved
otherResponsible bool
)

err := retry.WithBackoff(
ctx,
func(ctx context.Context) error {
takeover = false
takeover = ""
otherResponsible = false
isoLvl := sql.LevelSerializable
selectLock := ""
Expand All @@ -259,25 +275,41 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
}

query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+
"WHERE environment_id = ? AND responsible = ? AND id <> ? AND heartbeat > ?") + selectLock
lippserd marked this conversation as resolved.
Show resolved Hide resolved
"WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock

instance := &v1.IcingadbInstance{}
errQuery := tx.QueryRowxContext(ctx, query, envId, "y", h.instanceId).StructScan(instance)

switch {
case errQuery == nil:
fields := []any{
zap.String("instance_id", instance.Id.String()),
oxzi marked this conversation as resolved.
Show resolved Hide resolved
zap.String("environment", envId.String()),
zap.Time("heartbeat", instance.Heartbeat.Time()),
zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())),
}

errQuery := tx.QueryRowxContext(
ctx, query, envId, "y", h.instanceId, time.Now().Add(-1*timeout).UnixMilli(),
).StructScan(instance)
switch errQuery {
case nil:
otherResponsible = true
if shouldLog {
h.logger.Infow("Another instance is active",
zap.String("instance_id", instance.Id.String()),
zap.String("environment", envId.String()),
"heartbeat", instance.Heartbeat,
zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time())))
if instance.Heartbeat.Time().Before(time.Now().Add(-1 * peerTimeout)) {
takeover = "other instance's heartbeat has expired"
h.logger.Debugw("Preparing to take over HA as other instance's heartbeat has expired", fields...)
} else {
otherResponsible = true
if shouldLogRoutineEvents {
h.logger.Infow("Another instance is active", fields...)
}
}
case sql.ErrNoRows:
takeover = true

case errors.Is(errQuery, sql.ErrNoRows):
fields := []any{
zap.String("instance_id", h.instanceId.String()),
zap.String("environment", envId.String())}
if !h.responsible {
takeover = "no other instance is active"
h.logger.Debugw("Preparing to take over HA as no instance is active", fields...)
} else if h.responsible && shouldLogRoutineEvents {
h.logger.Debugw("Continuing being the active instance", fields...)
}

default:
return internal.CantPerformQuery(errQuery, query)
}
Expand All @@ -292,7 +324,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
EnvironmentId: envId,
},
Heartbeat: *t,
Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true},
Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true},
EndpointId: s.EndpointId,
Icinga2Version: s.Version,
Icinga2StartTime: s.ProgramStart,
Expand All @@ -309,7 +341,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return internal.CantPerformQuery(err, stmt)
}

if takeover {
if takeover != "" {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?")
_, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId)

Expand Down Expand Up @@ -343,14 +375,14 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return err
}

if takeover {
if takeover != "" {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
if err := h.insertEnvironment(); err != nil {
return errors.Wrap(err, "can't insert environment")
}

h.signalTakeover()
h.signalTakeover(takeover)
} else if otherResponsible {
if state, _ := h.state.Load(); !state.otherResponsible {
state.otherResponsible = true
Expand All @@ -361,6 +393,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
return nil
}

// realizeLostHeartbeat updates "responsible = n" for this HA into the database.
func (h *HA) realizeLostHeartbeat() {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE id = ?")
if _, err := h.db.ExecContext(h.ctx, stmt, "n", h.instanceId); err != nil && !utils.IsContextCanceled(err) {
Expand Down Expand Up @@ -394,10 +427,10 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
select {
case <-h.ctx.Done():
return
case <-time.After(timeout):
case <-time.After(peerTimeout):
query := h.db.Rebind("DELETE FROM icingadb_instance " +
"WHERE id <> ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?")
heartbeat := types.UnixMilli(time.Now().Add(-timeout))
heartbeat := types.UnixMilli(time.Now().Add(-1 * peerTimeout))
result, err := h.db.ExecContext(h.ctx, query, h.instanceId, envId,
s.EndpointId, heartbeat)
if err != nil {
Expand All @@ -416,7 +449,8 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
}
}

func (h *HA) signalHandover() {
// signalHandover gives up HA.responsible and notifies the HA.Handover chan.
func (h *HA) signalHandover(reason string) {
if h.responsible {
h.state.Store(haState{
responsibleTsMilli: time.Now().UnixMilli(),
Expand All @@ -425,15 +459,16 @@ func (h *HA) signalHandover() {
})

select {
case h.handover <- struct{}{}:
case h.handover <- reason:
h.responsible = false
case <-h.ctx.Done():
// Noop
}
}
}

func (h *HA) signalTakeover() {
// signalTakeover claims HA.responsible and notifies the HA.Takeover chan.
func (h *HA) signalTakeover(reason string) {
if !h.responsible {
h.state.Store(haState{
responsibleTsMilli: time.Now().UnixMilli(),
Expand All @@ -442,7 +477,7 @@ func (h *HA) signalTakeover() {
})

select {
case h.takeover <- struct{}{}:
case h.takeover <- reason:
h.responsible = true
case <-h.ctx.Done():
// Noop
Expand Down
10 changes: 5 additions & 5 deletions pkg/icingaredis/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"time"
)

// timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
// Timeout defines how long a heartbeat may be absent if a heartbeat has already been received.
// After this time, a heartbeat loss is propagated.
var timeout = 60 * time.Second
const Timeout = time.Minute

// Heartbeat periodically reads heartbeats from a Redis stream and signals in Beat channels when they are received.
// Also signals on if the heartbeat is Lost.
Expand Down Expand Up @@ -141,9 +141,9 @@ func (h *Heartbeat) controller(ctx context.Context) {

atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli())
h.sendEvent(m)
case <-time.After(timeout):
case <-time.After(Timeout):
if h.active {
h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", timeout))
h.logger.Warnw("Lost Icinga heartbeat", zap.Duration("timeout", Timeout))
h.sendEvent(nil)
h.active = false
} else {
Expand Down Expand Up @@ -217,5 +217,5 @@ func (m *HeartbeatMessage) EnvironmentID() (types.Binary, error) {

// ExpiryTime returns the timestamp when the heartbeat expires.
func (m *HeartbeatMessage) ExpiryTime() time.Time {
return m.received.Add(timeout)
return m.received.Add(Timeout)
}
Loading