From 0c3dbd66d663c6a939224ff21bda13e3afe253d9 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 9 Dec 2024 15:34:37 -0600 Subject: [PATCH 01/34] cleanup: implement context-aware sleep and cleanup manager functionality --- app/initriver.go | 12 +++++ engine/cleanupmanager/alerts.go | 85 +++++++++++++++++++++++++++++++ engine/cleanupmanager/db.go | 16 +----- engine/cleanupmanager/queries.sql | 37 ++++++++++++++ engine/cleanupmanager/setup.go | 37 ++++++++++++++ engine/cleanupmanager/update.go | 34 ------------- gadb/queries.sql.go | 75 +++++++++++++++++++++++++++ util/contextsleep.go | 16 ++++++ 8 files changed, 263 insertions(+), 49 deletions(-) create mode 100644 engine/cleanupmanager/alerts.go create mode 100644 engine/cleanupmanager/queries.sql create mode 100644 engine/cleanupmanager/setup.go create mode 100644 util/contextsleep.go diff --git a/app/initriver.go b/app/initriver.go index 4bc2c48a78..8d6d8da522 100644 --- a/app/initriver.go +++ b/app/initriver.go @@ -75,6 +75,12 @@ func (i *ignoreCancel) WithAttrs(attrs []slog.Attr) slog.Handler { return &ignoreCancel{h: i.h.WithAttrs(attrs)} } +type workerMiddlewareFunc func(context.Context, func(ctx context.Context) error) error + +func (w workerMiddlewareFunc) Work(ctx context.Context, job *rivertype.JobRow, doInner func(ctx context.Context) error) error { + return w(ctx, doInner) +} + func (app *App) initRiver(ctx context.Context) error { app.RiverWorkers = river.NewWorkers() @@ -94,6 +100,12 @@ func (app *App) initRiver(ctx context.Context) error { Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, + WorkerMiddleware: []rivertype.WorkerMiddleware{ + workerMiddlewareFunc(func(ctx context.Context, doInner func(ctx context.Context) error) error { + // Ensure config is set in the context for all workers. 
+ return doInner(app.ConfigStore.Config().Context(ctx)) + }), + }, ErrorHandler: &riverErrs{ // The error handler logger is used differently than the main logger, so it should be separate, and doesn't need the wrapper. Logger: app.Logger.With("module", "river"), diff --git a/engine/cleanupmanager/alerts.go b/engine/cleanupmanager/alerts.go new file mode 100644 index 0000000000..c3c9c38d38 --- /dev/null +++ b/engine/cleanupmanager/alerts.go @@ -0,0 +1,85 @@ +package cleanupmanager + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/riverqueue/river" + "github.com/target/goalert/alert" + "github.com/target/goalert/alert/alertlog" + "github.com/target/goalert/config" + "github.com/target/goalert/gadb" + "github.com/target/goalert/util" +) + +type AlertArgs struct{} + +func (AlertArgs) Kind() string { return "cleanup-manager-alerts" } + +// CleanupAlerts will automatically close and delete old alerts. +func (db *DB) CleanupAlerts(ctx context.Context, j *river.Job[AlertArgs]) error { + cfg := config.FromContext(ctx) + + for cfg.Maintenance.AlertAutoCloseDays > 0 { + var count int64 + err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { + ids, err := gadb.New(tx).CleanupMgrFindStaleAlerts(ctx, gadb.CleanupMgrFindStaleAlertsParams{ + AutoCloseDays: int64(cfg.Maintenance.AlertAutoCloseDays), + IncludeAcked: cfg.Maintenance.AutoCloseAckedAlerts, + }) + if err != nil { + return fmt.Errorf("find stale alerts: %w", err) + } + + var idsInt []int + for _, id := range ids { + idsInt = append(idsInt, int(id)) + } + + _, err = db.alertStore.UpdateManyAlertStatus(ctx, alert.StatusClosed, idsInt, alertlog.AutoClose{AlertAutoCloseDays: cfg.Maintenance.AlertAutoCloseDays}) + if err != nil { + return fmt.Errorf("update alerts: %w", err) + } + + count = int64(len(ids)) + return nil + }) + if err != nil { + return fmt.Errorf("auto close alerts: %w", err) + } + if count < 100 { + // Assume we've closed all old alerts, since we got less than 
100 (which is the max we close at once). + break + } + + err = util.ContextSleep(ctx, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("auto close alerts: sleep: %w", err) + } + } + + for cfg.Maintenance.AlertCleanupDays > 0 { + var count int64 + err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { + var err error + count, err = gadb.New(tx).CleanupMgrDeleteOldAlerts(ctx, int64(cfg.Maintenance.AlertCleanupDays)) + return err + }) + if err != nil { + return fmt.Errorf("delete old alerts: %w", err) + } + if count < 100 { + // Assume we've deleted all old alerts, since we got less than 100 (which is the max we delete at once). + break + } + + err = util.ContextSleep(ctx, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("delete old alerts: sleep: %w", err) + } + } + + return nil +} diff --git a/engine/cleanupmanager/db.go b/engine/cleanupmanager/db.go index 317194ac41..9cd67aebfc 100644 --- a/engine/cleanupmanager/db.go +++ b/engine/cleanupmanager/db.go @@ -17,7 +17,6 @@ type DB struct { now *sql.Stmt userIDs *sql.Stmt - cleanupAlerts *sql.Stmt cleanupAPIKeys *sql.Stmt setTimeout *sql.Stmt @@ -31,7 +30,6 @@ type DB struct { cleanupOverrides *sql.Stmt cleanupSchedOnCall *sql.Stmt cleanupEPOnCall *sql.Stmt - staleAlerts *sql.Stmt alertStore *alert.Store logIndex int @@ -62,7 +60,6 @@ func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store) (*DB, error // Abort any cleanup operation that takes longer than 3 seconds // error will be logged. 
setTimeout: p.P(`SET LOCAL statement_timeout = 3000`), - cleanupAlerts: p.P(`delete from alerts where id = any(select id from alerts where status = 'closed' AND created_at < (now() - $1::interval) order by id limit 100 for update skip locked)`), cleanupAPIKeys: p.P(`update user_calendar_subscriptions set disabled = true where id = any(select id from user_calendar_subscriptions where greatest(last_access, last_update) < (now() - $1::interval) order by id limit 100 for update skip locked)`), schedData: p.P(` @@ -94,17 +91,6 @@ func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store) (*DB, error cleanupOverrides: p.P(`DELETE FROM user_overrides WHERE id = ANY(SELECT id FROM user_overrides WHERE end_time < (now() - $1::interval) LIMIT 100 FOR UPDATE SKIP LOCKED)`), cleanupSchedOnCall: p.P(`DELETE FROM schedule_on_call_users WHERE id = ANY(SELECT id FROM schedule_on_call_users WHERE end_time < (now() - $1::interval) LIMIT 100 FOR UPDATE SKIP LOCKED)`), cleanupEPOnCall: p.P(`DELETE FROM ep_step_on_call_users WHERE id = ANY(SELECT id FROM ep_step_on_call_users WHERE end_time < (now() - $1::interval) LIMIT 100 FOR UPDATE SKIP LOCKED)`), - staleAlerts: p.P(` - select id from alerts a - where - (a.status='triggered' or ($2 and a.status = 'active')) and - created_at <= now() - '1 day'::interval * $1 and - not exists ( - select 1 from alert_logs log - where timestamp > now() - '1 day'::interval * $1 and - log.alert_id = a.id - ) - limit 100`), - alertStore: alertstore, + alertStore: alertstore, }, p.Err } diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql new file mode 100644 index 0000000000..d6bbf430a8 --- /dev/null +++ b/engine/cleanupmanager/queries.sql @@ -0,0 +1,37 @@ +-- name: CleanupMgrDeleteOldAlerts :execrows +-- CleanupMgrDeleteOldAlerts will delete old alerts from the alerts table that are closed and older than the given number of days before now. 
+DELETE FROM alerts +WHERE id = ANY ( + SELECT + id + FROM + alerts a + WHERE + status = 'closed' + AND a.created_at < now() -(sqlc.arg(cleanup_days)::bigint * '1 day'::interval) + ORDER BY + id + LIMIT 100 + FOR UPDATE + SKIP LOCKED); + +-- name: CleanupMgrFindStaleAlerts :many +-- CleanupMgrFindStaleAlerts will find alerts that are triggered or active and have no activity in specified number of days. +SELECT + id +FROM + alerts a +WHERE (a.status = 'triggered' + OR (sqlc.arg(include_acked) + AND a.status = 'active')) +AND created_at <= now() - '1 day'::interval * sqlc.arg(auto_close_days) +AND NOT EXISTS ( + SELECT + 1 + FROM + alert_logs log + WHERE + timestamp > now() - '1 day'::interval * sqlc.arg(auto_close_days) + AND log.alert_id = a.id) +LIMIT 100; + diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go new file mode 100644 index 0000000000..5b8673fdea --- /dev/null +++ b/engine/cleanupmanager/setup.go @@ -0,0 +1,37 @@ +package cleanupmanager + +import ( + "context" + "fmt" + "time" + + "github.com/riverqueue/river" + "github.com/target/goalert/engine/processinglock" +) + +var _ processinglock.Setupable = &DB{} + +const QueueName = "cleanup-manager" + +func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { + river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlerts)) + + err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 2}) + if err != nil { + return fmt.Errorf("add queue: %w", err) + } + + args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ + river.NewPeriodicJob( + river.PeriodicInterval(time.Hour), + func() (river.JobArgs, *river.InsertOpts) { + return AlertArgs{}, &river.InsertOpts{ + Queue: QueueName, + } + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + }) + + return nil +} diff --git a/engine/cleanupmanager/update.go b/engine/cleanupmanager/update.go index cb5518065c..ff3a3b9950 100644 --- a/engine/cleanupmanager/update.go +++ b/engine/cleanupmanager/update.go @@ 
-9,8 +9,6 @@ import ( "time" "github.com/jackc/pgtype" - "github.com/target/goalert/alert" - "github.com/target/goalert/alert/alertlog" "github.com/target/goalert/config" "github.com/target/goalert/permission" "github.com/target/goalert/schedule" @@ -55,38 +53,6 @@ func (db *DB) update(ctx context.Context) error { } cfg := config.FromContext(ctx) - if cfg.Maintenance.AlertCleanupDays > 0 { - var dur pgtype.Interval - dur.Days = int32(cfg.Maintenance.AlertCleanupDays) - dur.Status = pgtype.Present - _, err = tx.StmtContext(ctx, db.cleanupAlerts).ExecContext(ctx, &dur) - if err != nil { - return fmt.Errorf("cleanup alerts: %w", err) - } - } - - if cfg.Maintenance.AlertAutoCloseDays > 0 { - rows, err := tx.StmtContext(ctx, db.staleAlerts).QueryContext(ctx, cfg.Maintenance.AlertAutoCloseDays, cfg.Maintenance.AutoCloseAckedAlerts) - if err != nil { - return fmt.Errorf("query auto-close alerts: %w", err) - } - defer rows.Close() - var ids []int - for rows.Next() { - var id int - err = rows.Scan(&id) - if err != nil { - return fmt.Errorf("cleanup auto-close alerts: scan : %w", err) - } - ids = append(ids, id) - } - var autoCloseDays alertlog.AutoClose - autoCloseDays.AlertAutoCloseDays = cfg.Maintenance.AlertAutoCloseDays - _, err = db.alertStore.UpdateManyAlertStatus(ctx, alert.StatusClosed, ids, autoCloseDays) - if err != nil { - return fmt.Errorf("cleanup auto-close alerts: %w", err) - } - } if cfg.Maintenance.APIKeyExpireDays > 0 { var dur pgtype.Interval diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index 5a257ea21c..e558dbd26b 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -851,6 +851,81 @@ func (q *Queries) CalSubUserNames(ctx context.Context, dollar_1 []uuid.UUID) ([] return items, nil } +const cleanupMgrDeleteOldAlerts = `-- name: CleanupMgrDeleteOldAlerts :execrows +DELETE FROM alerts +WHERE id = ANY ( + SELECT + id + FROM + alerts a + WHERE + status = 'closed' + AND a.created_at < now() -($1::bigint * '1 day'::interval) + ORDER BY + id + 
LIMIT 100 + FOR UPDATE + SKIP LOCKED) +` + +// CleanupMgrDeleteOldAlerts will delete old alerts from the alerts table that are closed and older than the given number of days before now. +func (q *Queries) CleanupMgrDeleteOldAlerts(ctx context.Context, cleanupDays int64) (int64, error) { + result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldAlerts, cleanupDays) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const cleanupMgrFindStaleAlerts = `-- name: CleanupMgrFindStaleAlerts :many +SELECT + id +FROM + alerts a +WHERE (a.status = 'triggered' + OR ($1 + AND a.status = 'active')) +AND created_at <= now() - '1 day'::interval * $2 +AND NOT EXISTS ( + SELECT + 1 + FROM + alert_logs log + WHERE + timestamp > now() - '1 day'::interval * $2 + AND log.alert_id = a.id) +LIMIT 100 +` + +type CleanupMgrFindStaleAlertsParams struct { + IncludeAcked interface{} + AutoCloseDays interface{} +} + +// CleanupMgrFindStaleAlerts will find alerts that are triggered or active and have no activity in specified number of days. 
+func (q *Queries) CleanupMgrFindStaleAlerts(ctx context.Context, arg CleanupMgrFindStaleAlertsParams) ([]int64, error) { + rows, err := q.db.QueryContext(ctx, cleanupMgrFindStaleAlerts, arg.IncludeAcked, arg.AutoCloseDays) + if err != nil { + return nil, err + } + defer rows.Close() + var items []int64 + for rows.Next() { + var id int64 + if err := rows.Scan(&id); err != nil { + return nil, err + } + items = append(items, id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const connectionInfo = `-- name: ConnectionInfo :many SELECT application_name AS NAME, COUNT(*) diff --git a/util/contextsleep.go b/util/contextsleep.go new file mode 100644 index 0000000000..6ff6a0574a --- /dev/null +++ b/util/contextsleep.go @@ -0,0 +1,16 @@ +package util + +import ( + "context" + "time" +) + +// ContextSleep will sleep for the specified duration or until the context is canceled. +func ContextSleep(ctx context.Context, d time.Duration) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(d): + return nil + } +} From 9cc35dd27f63ddeb085d9c7fc4ceb221264a26f4 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 9 Dec 2024 15:36:16 -0600 Subject: [PATCH 02/34] cleanup: format code for consistency in alert store initialization --- engine/cleanupmanager/db.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/cleanupmanager/db.go b/engine/cleanupmanager/db.go index 9cd67aebfc..642d02ce2d 100644 --- a/engine/cleanupmanager/db.go +++ b/engine/cleanupmanager/db.go @@ -91,6 +91,7 @@ func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store) (*DB, error cleanupOverrides: p.P(`DELETE FROM user_overrides WHERE id = ANY(SELECT id FROM user_overrides WHERE end_time < (now() - $1::interval) LIMIT 100 FOR UPDATE SKIP LOCKED)`), cleanupSchedOnCall: p.P(`DELETE FROM schedule_on_call_users WHERE id = ANY(SELECT id FROM 
schedule_on_call_users WHERE end_time < (now() - $1::interval) LIMIT 100 FOR UPDATE SKIP LOCKED)`), cleanupEPOnCall: p.P(`DELETE FROM ep_step_on_call_users WHERE id = ANY(SELECT id FROM ep_step_on_call_users WHERE end_time < (now() - $1::interval) LIMIT 100 FOR UPDATE SKIP LOCKED)`), - alertStore: alertstore, + + alertStore: alertstore, }, p.Err } From e5c72d79ad5012eded5ad4cd569bb60664ede1b4 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 9 Dec 2024 15:42:11 -0600 Subject: [PATCH 03/34] cleanup: remove unused ConfigSource field from SetupArgs struct --- engine/engine.go | 9 ++++----- engine/processinglock/module.go | 10 ++++------ 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index d830fb7ad2..e2faa84186 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -167,11 +167,10 @@ func NewEngine(ctx context.Context, db *sql.DB, c *Config) (*Engine, error) { } args := processinglock.SetupArgs{ - DB: db, - Workers: c.RiverWorkers, - ConfigSource: c.ConfigSource, - EventBus: c.EventBus, - River: c.River, + DB: db, + Workers: c.RiverWorkers, + EventBus: c.EventBus, + River: c.River, } for _, m := range p.modules { if s, ok := m.(processinglock.Setupable); ok { diff --git a/engine/processinglock/module.go b/engine/processinglock/module.go index 4cb700ea75..7b80e82582 100644 --- a/engine/processinglock/module.go +++ b/engine/processinglock/module.go @@ -6,7 +6,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/riverqueue/river" - "github.com/target/goalert/config" "github.com/target/goalert/event" ) @@ -23,11 +22,10 @@ type Updatable interface { // SetupArgs is a struct that contains the arguments for the setup function. 
type SetupArgs struct { - DB *sql.DB - Workers *river.Workers - ConfigSource config.Source - EventBus *event.Bus - River *river.Client[pgx.Tx] + DB *sql.DB + Workers *river.Workers + EventBus *event.Bus + River *river.Client[pgx.Tx] } // Setupable is an interface for types that can be set up using the job queue system. From 7ba52bb8e8865f3f11b74fb8824316ab514a8a2e Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 10 Dec 2024 11:11:36 -0600 Subject: [PATCH 04/34] cleanup: refactor alert cleanup logic and add shift cleanup functionality --- engine/cleanupmanager/alerts.go | 48 ++++++--------------- engine/cleanupmanager/queries.sql | 42 +++++++++++++++++++ engine/cleanupmanager/setup.go | 27 ++++++++++++ engine/cleanupmanager/shifts.go | 58 ++++++++++++++++++++++++++ gadb/queries.sql.go | 69 +++++++++++++++++++++++++++++++ 5 files changed, 209 insertions(+), 35 deletions(-) create mode 100644 engine/cleanupmanager/shifts.go diff --git a/engine/cleanupmanager/alerts.go b/engine/cleanupmanager/alerts.go index c3c9c38d38..58de9e7488 100644 --- a/engine/cleanupmanager/alerts.go +++ b/engine/cleanupmanager/alerts.go @@ -4,14 +4,12 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/riverqueue/river" "github.com/target/goalert/alert" "github.com/target/goalert/alert/alertlog" "github.com/target/goalert/config" "github.com/target/goalert/gadb" - "github.com/target/goalert/util" ) type AlertArgs struct{} @@ -21,16 +19,14 @@ func (AlertArgs) Kind() string { return "cleanup-manager-alerts" } // CleanupAlerts will automatically close and delete old alerts. 
func (db *DB) CleanupAlerts(ctx context.Context, j *river.Job[AlertArgs]) error { cfg := config.FromContext(ctx) - - for cfg.Maintenance.AlertAutoCloseDays > 0 { - var count int64 - err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { + if cfg.Maintenance.AlertAutoCloseDays > 0 { + err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { ids, err := gadb.New(tx).CleanupMgrFindStaleAlerts(ctx, gadb.CleanupMgrFindStaleAlertsParams{ AutoCloseDays: int64(cfg.Maintenance.AlertAutoCloseDays), IncludeAcked: cfg.Maintenance.AutoCloseAckedAlerts, }) if err != nil { - return fmt.Errorf("find stale alerts: %w", err) + return false, fmt.Errorf("find stale alerts: %w", err) } var idsInt []int @@ -40,44 +36,26 @@ func (db *DB) CleanupAlerts(ctx context.Context, j *river.Job[AlertArgs]) error _, err = db.alertStore.UpdateManyAlertStatus(ctx, alert.StatusClosed, idsInt, alertlog.AutoClose{AlertAutoCloseDays: cfg.Maintenance.AlertAutoCloseDays}) if err != nil { - return fmt.Errorf("update alerts: %w", err) + return false, fmt.Errorf("update alerts: %w", err) } - count = int64(len(ids)) - return nil + return len(ids) < 100, nil }) if err != nil { return fmt.Errorf("auto close alerts: %w", err) } - if count < 100 { - // Assume we've closed all old alerts, since we got less than 100 (which is the max we close at once). 
- break - } - - err = util.ContextSleep(ctx, 100*time.Millisecond) - if err != nil { - return fmt.Errorf("auto close alerts: sleep: %w", err) - } } - for cfg.Maintenance.AlertCleanupDays > 0 { - var count int64 - err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { - var err error - count, err = gadb.New(tx).CleanupMgrDeleteOldAlerts(ctx, int64(cfg.Maintenance.AlertCleanupDays)) - return err + if cfg.Maintenance.AlertCleanupDays > 0 { + err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { + count, err := gadb.New(tx).CleanupMgrDeleteOldAlerts(ctx, int64(cfg.Maintenance.AlertCleanupDays)) + if err != nil { + return false, fmt.Errorf("delete old alerts: %w", err) + } + return count < 100, nil }) if err != nil { - return fmt.Errorf("delete old alerts: %w", err) - } - if count < 100 { - // Assume we've deleted all old alerts, since we got less than 100 (which is the max we delete at once). - break - } - - err = util.ContextSleep(ctx, 100*time.Millisecond) - if err != nil { - return fmt.Errorf("delete old alerts: sleep: %w", err) + return err } } diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql index d6bbf430a8..5c3c6576a0 100644 --- a/engine/cleanupmanager/queries.sql +++ b/engine/cleanupmanager/queries.sql @@ -35,3 +35,45 @@ AND NOT EXISTS ( AND log.alert_id = a.id) LIMIT 100; +-- name: CleanupMgrDeleteOldOverrides :execrows +-- CleanupMgrDeleteOldOverrides will delete old overrides from the user_overrides table that are older than the given number of days before now. 
+DELETE FROM user_overrides +WHERE id = ANY ( + SELECT + id + FROM + user_overrides + WHERE + end_time <(now() - '1 day'::interval * sqlc.arg(cleanup_days)) + LIMIT 100 + FOR UPDATE + SKIP LOCKED); + +-- name: CleanupMgrDeleteOldScheduleShifts :execrows +-- CleanupMgrDeleteOldScheduleShifts will delete old schedule shifts from the schedule_on_call_users table that are older than the given number of days before now. +DELETE FROM schedule_on_call_users +WHERE id = ANY ( + SELECT + id + FROM + schedule_on_call_users + WHERE + end_time <(now() - '1 day'::interval * sqlc.arg(cleanup_days)) + LIMIT 100 + FOR UPDATE + SKIP LOCKED); + +-- name: CleanupMgrDeleteOldStepShifts :execrows +-- CleanupMgrDeleteOldStepShifts will delete old EP step shifts from the ep_step_on_call_users table that are older than the given number of days before now. +DELETE FROM ep_step_on_call_users +WHERE id = ANY ( + SELECT + id + FROM + ep_step_on_call_users + WHERE + end_time <(now() - '1 day'::interval * sqlc.arg(cleanup_days)) + LIMIT 100 + FOR UPDATE + SKIP LOCKED); + diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index 5b8673fdea..bcbdbeaf77 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -2,17 +2,44 @@ package cleanupmanager import ( "context" + "database/sql" "fmt" "time" "github.com/riverqueue/river" "github.com/target/goalert/engine/processinglock" + "github.com/target/goalert/util" ) var _ processinglock.Setupable = &DB{} const QueueName = "cleanup-manager" +// whileWork will run the provided function in a loop until it returns done=true. 
+func (db *DB) whileWork(ctx context.Context, run func(ctx context.Context, tx *sql.Tx) (done bool, err error)) error { + var done bool + for { + err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { + var err error + done, err = run(ctx, tx) + return err + }) + if err != nil { + return fmt.Errorf("do work: %w", err) + } + if done { + break + } + + err = util.ContextSleep(ctx, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("sleep: %w", err) + } + } + + return nil +} + func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlerts)) diff --git a/engine/cleanupmanager/shifts.go b/engine/cleanupmanager/shifts.go new file mode 100644 index 0000000000..62da58030d --- /dev/null +++ b/engine/cleanupmanager/shifts.go @@ -0,0 +1,58 @@ +package cleanupmanager + +import ( + "context" + "database/sql" + "fmt" + + "github.com/riverqueue/river" + "github.com/target/goalert/config" + "github.com/target/goalert/gadb" +) + +type ShiftArgs struct{} + +func (ShiftArgs) Kind() string { return "cleanup-manager-alerts" } + +// CleanupShifts will automatically cleanup old shift and override records. 
+func (db *DB) CleanupShifts(ctx context.Context, j *river.Job[ShiftArgs]) error { + cfg := config.FromContext(ctx) + if cfg.Maintenance.ScheduleCleanupDays <= 0 { + return nil + } + + err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { + count, err := gadb.New(tx).CleanupMgrDeleteOldScheduleShifts(ctx, int64(cfg.Maintenance.ScheduleCleanupDays)) + if err != nil { + return false, fmt.Errorf("delete old shifts: %w", err) + } + return count < 100, nil + }) + if err != nil { + return err + } + + err = db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { + count, err := gadb.New(tx).CleanupMgrDeleteOldOverrides(ctx, int64(cfg.Maintenance.ScheduleCleanupDays)) + if err != nil { + return false, fmt.Errorf("delete old overrides: %w", err) + } + return count < 100, nil + }) + if err != nil { + return err + } + + err = db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { + count, err := gadb.New(tx).CleanupMgrDeleteOldStepShifts(ctx, int64(cfg.Maintenance.ScheduleCleanupDays)) + if err != nil { + return false, fmt.Errorf("delete old step shifts: %w", err) + } + return count < 100, nil + }) + if err != nil { + return err + } + + return nil +} diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index 01850e1aee..0d000523b4 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -877,6 +877,75 @@ func (q *Queries) CleanupMgrDeleteOldAlerts(ctx context.Context, cleanupDays int return result.RowsAffected() } +const cleanupMgrDeleteOldOverrides = `-- name: CleanupMgrDeleteOldOverrides :execrows +DELETE FROM user_overrides +WHERE id = ANY ( + SELECT + id + FROM + user_overrides + WHERE + end_time <(now() - '1 day'::interval * $1) + LIMIT 100 + FOR UPDATE + SKIP LOCKED) +` + +// CleanupMgrDeleteOldOverrides will delete old overrides from the user_overrides table that are older than the given number of days before now. 
+func (q *Queries) CleanupMgrDeleteOldOverrides(ctx context.Context, cleanupDays interface{}) (int64, error) { + result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldOverrides, cleanupDays) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const cleanupMgrDeleteOldScheduleShifts = `-- name: CleanupMgrDeleteOldScheduleShifts :execrows +DELETE FROM schedule_on_call_users +WHERE id = ANY ( + SELECT + id + FROM + schedule_on_call_users + WHERE + end_time <(now() - '1 day'::interval * $1) + LIMIT 100 + FOR UPDATE + SKIP LOCKED) +` + +// CleanupMgrDeleteOldScheduleShifts will delete old schedule shifts from the schedule_on_call_users table that are older than the given number of days before now. +func (q *Queries) CleanupMgrDeleteOldScheduleShifts(ctx context.Context, cleanupDays interface{}) (int64, error) { + result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldScheduleShifts, cleanupDays) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const cleanupMgrDeleteOldStepShifts = `-- name: CleanupMgrDeleteOldStepShifts :execrows +DELETE FROM ep_step_on_call_users +WHERE id = ANY ( + SELECT + id + FROM + ep_step_on_call_users + WHERE + end_time <(now() - '1 day'::interval * $1) + LIMIT 100 + FOR UPDATE + SKIP LOCKED) +` + +// CleanupMgrDeleteOldStepShifts will delete old EP step shifts from the ep_step_on_call_users table that are older than the given number of days before now. 
+func (q *Queries) CleanupMgrDeleteOldStepShifts(ctx context.Context, cleanupDays interface{}) (int64, error) { + result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldStepShifts, cleanupDays) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + const cleanupMgrFindStaleAlerts = `-- name: CleanupMgrFindStaleAlerts :many SELECT id From bb0b63ece5f8c90cac69a72574538dc8f8b05701 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 10 Dec 2024 11:12:31 -0600 Subject: [PATCH 05/34] cleanup: refactor alert cleanup logic to use whileWork for better control flow --- engine/cleanupmanager/alerts.go | 48 +++++++++------------------------ engine/cleanupmanager/setup.go | 27 +++++++++++++++++++ 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/engine/cleanupmanager/alerts.go b/engine/cleanupmanager/alerts.go index c3c9c38d38..58de9e7488 100644 --- a/engine/cleanupmanager/alerts.go +++ b/engine/cleanupmanager/alerts.go @@ -4,14 +4,12 @@ import ( "context" "database/sql" "fmt" - "time" "github.com/riverqueue/river" "github.com/target/goalert/alert" "github.com/target/goalert/alert/alertlog" "github.com/target/goalert/config" "github.com/target/goalert/gadb" - "github.com/target/goalert/util" ) type AlertArgs struct{} @@ -21,16 +19,14 @@ func (AlertArgs) Kind() string { return "cleanup-manager-alerts" } // CleanupAlerts will automatically close and delete old alerts. 
func (db *DB) CleanupAlerts(ctx context.Context, j *river.Job[AlertArgs]) error { cfg := config.FromContext(ctx) - - for cfg.Maintenance.AlertAutoCloseDays > 0 { - var count int64 - err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { + if cfg.Maintenance.AlertAutoCloseDays > 0 { + err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { ids, err := gadb.New(tx).CleanupMgrFindStaleAlerts(ctx, gadb.CleanupMgrFindStaleAlertsParams{ AutoCloseDays: int64(cfg.Maintenance.AlertAutoCloseDays), IncludeAcked: cfg.Maintenance.AutoCloseAckedAlerts, }) if err != nil { - return fmt.Errorf("find stale alerts: %w", err) + return false, fmt.Errorf("find stale alerts: %w", err) } var idsInt []int @@ -40,44 +36,26 @@ func (db *DB) CleanupAlerts(ctx context.Context, j *river.Job[AlertArgs]) error _, err = db.alertStore.UpdateManyAlertStatus(ctx, alert.StatusClosed, idsInt, alertlog.AutoClose{AlertAutoCloseDays: cfg.Maintenance.AlertAutoCloseDays}) if err != nil { - return fmt.Errorf("update alerts: %w", err) + return false, fmt.Errorf("update alerts: %w", err) } - count = int64(len(ids)) - return nil + return len(ids) < 100, nil }) if err != nil { return fmt.Errorf("auto close alerts: %w", err) } - if count < 100 { - // Assume we've closed all old alerts, since we got less than 100 (which is the max we close at once). 
- break - } - - err = util.ContextSleep(ctx, 100*time.Millisecond) - if err != nil { - return fmt.Errorf("auto close alerts: sleep: %w", err) - } } - for cfg.Maintenance.AlertCleanupDays > 0 { - var count int64 - err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { - var err error - count, err = gadb.New(tx).CleanupMgrDeleteOldAlerts(ctx, int64(cfg.Maintenance.AlertCleanupDays)) - return err + if cfg.Maintenance.AlertCleanupDays > 0 { + err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { + count, err := gadb.New(tx).CleanupMgrDeleteOldAlerts(ctx, int64(cfg.Maintenance.AlertCleanupDays)) + if err != nil { + return false, fmt.Errorf("delete old alerts: %w", err) + } + return count < 100, nil }) if err != nil { - return fmt.Errorf("delete old alerts: %w", err) - } - if count < 100 { - // Assume we've deleted all old alerts, since we got less than 100 (which is the max we delete at once). - break - } - - err = util.ContextSleep(ctx, 100*time.Millisecond) - if err != nil { - return fmt.Errorf("delete old alerts: sleep: %w", err) + return err } } diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index 5b8673fdea..bcbdbeaf77 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -2,17 +2,44 @@ package cleanupmanager import ( "context" + "database/sql" "fmt" "time" "github.com/riverqueue/river" "github.com/target/goalert/engine/processinglock" + "github.com/target/goalert/util" ) var _ processinglock.Setupable = &DB{} const QueueName = "cleanup-manager" +// whileWork will run the provided function in a loop until it returns done=true. 
+func (db *DB) whileWork(ctx context.Context, run func(ctx context.Context, tx *sql.Tx) (done bool, err error)) error { + var done bool + for { + err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { + var err error + done, err = run(ctx, tx) + return err + }) + if err != nil { + return fmt.Errorf("do work: %w", err) + } + if done { + break + } + + err = util.ContextSleep(ctx, 100*time.Millisecond) + if err != nil { + return fmt.Errorf("sleep: %w", err) + } + } + + return nil +} + func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlerts)) From 859194a5ba3da55995c8cb263d03ef2c36ec9eda Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 10 Dec 2024 11:14:19 -0600 Subject: [PATCH 06/34] cleanup: add comment --- engine/cleanupmanager/setup.go | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index bcbdbeaf77..9e121210f6 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -40,6 +40,7 @@ func (db *DB) whileWork(ctx context.Context, run func(ctx context.Context, tx *s return nil } +// Setup implements processinglock.Setupable. 
func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlerts)) From be2a5d14640041413b648fca9dc5a92d4ea7234c Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 10 Dec 2024 11:15:48 -0600 Subject: [PATCH 07/34] cleanup: remove unused cleanup statements from DB and update logic --- engine/cleanupmanager/db.go | 9 +-------- engine/cleanupmanager/update.go | 19 ------------------- 2 files changed, 1 insertion(+), 27 deletions(-) diff --git a/engine/cleanupmanager/db.go b/engine/cleanupmanager/db.go index 642d02ce2d..37c63526a9 100644 --- a/engine/cleanupmanager/db.go +++ b/engine/cleanupmanager/db.go @@ -27,10 +27,7 @@ type DB struct { cleanupAlertLogs *sql.Stmt - cleanupOverrides *sql.Stmt - cleanupSchedOnCall *sql.Stmt - cleanupEPOnCall *sql.Stmt - alertStore *alert.Store + alertStore *alert.Store logIndex int } @@ -88,10 +85,6 @@ func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store) (*DB, error select id from scope offset 99 `), - cleanupOverrides: p.P(`DELETE FROM user_overrides WHERE id = ANY(SELECT id FROM user_overrides WHERE end_time < (now() - $1::interval) LIMIT 100 FOR UPDATE SKIP LOCKED)`), - cleanupSchedOnCall: p.P(`DELETE FROM schedule_on_call_users WHERE id = ANY(SELECT id FROM schedule_on_call_users WHERE end_time < (now() - $1::interval) LIMIT 100 FOR UPDATE SKIP LOCKED)`), - cleanupEPOnCall: p.P(`DELETE FROM ep_step_on_call_users WHERE id = ANY(SELECT id FROM ep_step_on_call_users WHERE end_time < (now() - $1::interval) LIMIT 100 FOR UPDATE SKIP LOCKED)`), - alertStore: alertstore, }, p.Err } diff --git a/engine/cleanupmanager/update.go b/engine/cleanupmanager/update.go index ff3a3b9950..fb77932bef 100644 --- a/engine/cleanupmanager/update.go +++ b/engine/cleanupmanager/update.go @@ -63,25 +63,6 @@ func (db *DB) update(ctx context.Context) error { return err } } - if cfg.Maintenance.ScheduleCleanupDays > 0 { - var dur pgtype.Interval - dur.Days = 
int32(cfg.Maintenance.ScheduleCleanupDays) - dur.Status = pgtype.Present - _, err = tx.StmtContext(ctx, db.cleanupOverrides).ExecContext(ctx, &dur) - if err != nil { - return fmt.Errorf("cleanup overrides: %w", err) - } - - _, err = tx.StmtContext(ctx, db.cleanupSchedOnCall).ExecContext(ctx, &dur) - if err != nil { - return fmt.Errorf("cleanup schedule on-call: %w", err) - } - - _, err = tx.StmtContext(ctx, db.cleanupEPOnCall).ExecContext(ctx, &dur) - if err != nil { - return fmt.Errorf("cleanup escalation policy on-call: %w", err) - } - } rows, err := tx.StmtContext(ctx, db.schedData).QueryContext(ctx) if err != nil { From f3bf6ddcc9545e2ae4890ba88e220f48a9941b40 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 10 Dec 2024 14:16:16 -0600 Subject: [PATCH 08/34] test: refactor alert auto-close and cleanup tests for improved reliability --- test/smoke/alertautoclose_test.go | 23 ++++++++++++-------- test/smoke/alertcleanup_test.go | 31 ++++++++++++++------------- test/smoke/harness/harness.go | 35 +++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 23 deletions(-) diff --git a/test/smoke/alertautoclose_test.go b/test/smoke/alertautoclose_test.go index 5040db720f..244f0e38ff 100644 --- a/test/smoke/alertautoclose_test.go +++ b/test/smoke/alertautoclose_test.go @@ -3,6 +3,7 @@ package smoke import ( "encoding/json" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/target/goalert/test/smoke/harness" @@ -50,14 +51,18 @@ func TestAlertAutoClose(t *testing.T) { assert.Equal(t, "2", data.B.ID) assert.Equal(t, "StatusUnacknowledged", data.B.Status) - h.SetConfigValue("Maintenance.AlertAutoCloseDays", "1") + cfg := h.Config() + cfg.Maintenance.AlertAutoCloseDays = 1 + h.RestartGoAlertWithConfig(cfg) - res = h.GraphQLQuery2("{a:alert(id: 1){id, status} b:alert(id: 2){id, status}}") - assert.Empty(t, res.Errors, "errors") - err = json.Unmarshal(res.Data, &data) - assert.NoError(t, err) - assert.Equal(t, "1", data.A.ID) - assert.Equal(t, 
"StatusClosed", data.A.Status) - assert.Equal(t, "2", data.B.ID) - assert.Equal(t, "StatusUnacknowledged", data.B.Status) + assert.EventuallyWithT(t, func(t *assert.CollectT) { + res = h.GraphQLQuery2("{a:alert(id: 1){id, status} b:alert(id: 2){id, status}}") + assert.Empty(t, res.Errors, "errors") + err = json.Unmarshal(res.Data, &data) + assert.NoError(t, err) + assert.Equal(t, "1", data.A.ID) + assert.Equal(t, "StatusClosed", data.A.Status) + assert.Equal(t, "2", data.B.ID) + assert.Equal(t, "StatusUnacknowledged", data.B.Status) + }, 15*time.Second, time.Second) } diff --git a/test/smoke/alertcleanup_test.go b/test/smoke/alertcleanup_test.go index 5524d74533..cee510abd4 100644 --- a/test/smoke/alertcleanup_test.go +++ b/test/smoke/alertcleanup_test.go @@ -3,6 +3,7 @@ package smoke import ( "encoding/json" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/target/goalert/test/smoke/harness" @@ -42,20 +43,22 @@ func TestAlertCleanup(t *testing.T) { assert.Equal(t, "1", data.A.ID) assert.Equal(t, "2", data.B.ID) - h.SetConfigValue("Maintenance.AlertCleanupDays", "1") + cfg := h.Config() + cfg.Maintenance.AlertCleanupDays = 1 + h.RestartGoAlertWithConfig(cfg) - h.Trigger() + assert.EventuallyWithT(t, func(t *assert.CollectT) { + res = h.GraphQLQuery2("{a:alert(id: 1){id}}") + assert.Empty(t, res.Errors, "errors") + err = json.Unmarshal(res.Data, &data) + assert.NoError(t, err) + assert.Equal(t, "1", data.A.ID) - res = h.GraphQLQuery2("{a:alert(id: 1){id}}") - assert.Empty(t, res.Errors, "errors") - err = json.Unmarshal(res.Data, &data) - assert.NoError(t, err) - assert.Equal(t, "1", data.A.ID) - - res = h.GraphQLQuery2("{a:alert(id: 2){id}}") - assert.Empty(t, res.Errors, "errors") - err = json.Unmarshal(res.Data, &data) - assert.NoError(t, err) - // #2 should have been cleaned up - assert.Nil(t, data.A) + res = h.GraphQLQuery2("{a:alert(id: 2){id}}") + assert.Empty(t, res.Errors, "errors") + err = json.Unmarshal(res.Data, &data) + 
assert.NoError(t, err) + // #2 should have been cleaned up + assert.Nil(t, data.A) + }, 15*time.Second, time.Second) } diff --git a/test/smoke/harness/harness.go b/test/smoke/harness/harness.go index 38739fa6b3..6b49b060b6 100644 --- a/test/smoke/harness/harness.go +++ b/test/smoke/harness/harness.go @@ -88,6 +88,8 @@ type Harness struct { cfg config.Config + appCfg app.Config + email *emailServer slack *slackServer slackS *httptest.Server @@ -309,6 +311,7 @@ func (h *Harness) Start() { appCfg.SMTPListenAddr = "localhost:0" appCfg.EmailIntegrationDomain = "smoketest.example.com" appCfg.InitialConfig = &h.cfg + h.appCfg = appCfg r, w := io.Pipe() h.backendLogs = w @@ -343,6 +346,38 @@ func (h *Harness) Start() { } } +// RestartGoAlertWithConfig will restart the backend with the provided config. +func (h *Harness) RestartGoAlertWithConfig(cfg config.Config) { + h.t.Helper() + + h.t.Logf("Stopping backend for restart") + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + err := h.backend.Shutdown(ctx) + if err != nil { + h.t.Error("failed to shutdown backend cleanly:", err) + } + + h.t.Logf("Restarting backend") + h.appCfg.InitialConfig = &cfg + h.backend, err = app.NewApp(h.appCfg, h.appPool) + if err != nil { + h.t.Fatalf("failed to start backend: %v", err) + } + h.slack.SetActionURL(h.slackApp.ClientID, h.backend.URL()+"/api/v2/slack/message-action") + + go func() { + assert.NoError(h.t, h.backend.Run(context.Background())) // can't use require.NoError because we're in the background + }() + err = h.backend.WaitForStartup(ctx) + if err != nil { + h.t.Fatalf("failed to start backend: %v", err) + } + + h.t.Logf("Backend restarted") +} + // URL returns the backend server's URL func (h *Harness) URL() string { return h.backend.URL() From eeea937d150868ff0bec2f754cc9615549839eb9 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 16 Dec 2024 13:19:13 -0600 Subject: [PATCH 09/34] fix: update ShiftArgs Kind to reflect 
cleanup-manager-shifts --- engine/cleanupmanager/shifts.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/cleanupmanager/shifts.go b/engine/cleanupmanager/shifts.go index 62da58030d..39354679b6 100644 --- a/engine/cleanupmanager/shifts.go +++ b/engine/cleanupmanager/shifts.go @@ -12,7 +12,7 @@ import ( type ShiftArgs struct{} -func (ShiftArgs) Kind() string { return "cleanup-manager-alerts" } +func (ShiftArgs) Kind() string { return "cleanup-manager-shifts" } // CleanupShifts will automatically cleanup old shift and override records. func (db *DB) CleanupShifts(ctx context.Context, j *river.Job[ShiftArgs]) error { From 26523b7ea94f5e5a840d867a547410a2f8e28aa3 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 16 Dec 2024 17:44:41 -0600 Subject: [PATCH 10/34] feat: add periodic jobs for schedule data cleanup and update queries --- engine/cleanupmanager/db.go | 19 +--- engine/cleanupmanager/queries.sql | 45 +++++++++ engine/cleanupmanager/scheddata.go | 143 +++++++++++++++++++++++++++++ engine/cleanupmanager/setup.go | 24 +++++ engine/cleanupmanager/update.go | 98 -------------------- gadb/queries.sql.go | 99 ++++++++++++++++++++ 6 files changed, 313 insertions(+), 115 deletions(-) create mode 100644 engine/cleanupmanager/scheddata.go diff --git a/engine/cleanupmanager/db.go b/engine/cleanupmanager/db.go index 37c63526a9..21b7d15cc5 100644 --- a/engine/cleanupmanager/db.go +++ b/engine/cleanupmanager/db.go @@ -3,6 +3,7 @@ package cleanupmanager import ( "context" "database/sql" + "log/slog" "github.com/target/goalert/alert" "github.com/target/goalert/engine/processinglock" @@ -14,15 +15,9 @@ type DB struct { db *sql.DB lock *processinglock.Lock - now *sql.Stmt - - userIDs *sql.Stmt cleanupAPIKeys *sql.Stmt setTimeout *sql.Stmt - schedData *sql.Stmt - setSchedData *sql.Stmt - cleanupSessions *sql.Stmt cleanupAlertLogs *sql.Stmt @@ -30,6 +25,7 @@ type DB struct { alertStore *alert.Store logIndex int + logger *slog.Logger } // Name 
returns the name of the module. @@ -51,22 +47,11 @@ func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store) (*DB, error db: db, lock: lock, - now: p.P(`select now()`), - userIDs: p.P(`select id from users`), - // Abort any cleanup operation that takes longer than 3 seconds // error will be logged. setTimeout: p.P(`SET LOCAL statement_timeout = 3000`), cleanupAPIKeys: p.P(`update user_calendar_subscriptions set disabled = true where id = any(select id from user_calendar_subscriptions where greatest(last_access, last_update) < (now() - $1::interval) order by id limit 100 for update skip locked)`), - schedData: p.P(` - select schedule_id, data from schedule_data - where data notnull and (last_cleanup_at isnull or last_cleanup_at <= now() - '1 month'::interval) - order by last_cleanup_at asc nulls first - for update skip locked - limit 100 - `), - setSchedData: p.P(`update schedule_data set last_cleanup_at = now(), data = $2 where schedule_id = $1`), cleanupSessions: p.P(`DELETE FROM auth_user_sessions WHERE id = any(select id from auth_user_sessions where last_access_at < (now() - '30 days'::interval) LIMIT 100 for update skip locked)`), cleanupAlertLogs: p.P(` diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql index 5c3c6576a0..43328f3c14 100644 --- a/engine/cleanupmanager/queries.sql +++ b/engine/cleanupmanager/queries.sql @@ -77,3 +77,48 @@ WHERE id = ANY ( FOR UPDATE SKIP LOCKED); +-- name: CleanupMgrScheduleData :one +-- CleanupMgrScheduleData will find the next schedule data that needs to be cleaned up. +SELECT + schedule_id, + data +FROM + schedule_data +WHERE + data NOTNULL + AND (last_cleanup_at ISNULL + OR last_cleanup_at <= now() - '1 month'::interval) +ORDER BY + last_cleanup_at ASC nulls FIRST +FOR UPDATE + SKIP LOCKED +LIMIT 1; + +-- name: CleanupMgrUpdateScheduleData :exec +-- CleanupMgrUpdateScheduleData will update the last_cleanup_at and data fields in the schedule_data table. 
+UPDATE + schedule_data +SET + last_cleanup_at = now(), + data = $2 +WHERE + schedule_id = $1; + +-- name: CleanupMgrScheduleDataSkip :exec +-- CleanupMgrScheduleDataSkip will update the last_cleanup_at field in the schedule_data table. +UPDATE + schedule_data +SET + last_cleanup_at = now() +WHERE + schedule_id = $1; + +-- name: CleanupMgrVerifyUsers :many +-- CleanupMgrVerifyUsers will verify that the given user ids exist in the users table. +SELECT + id +FROM + users +WHERE + id = ANY (sqlc.arg(user_ids)::uuid[]); + diff --git a/engine/cleanupmanager/scheddata.go b/engine/cleanupmanager/scheddata.go new file mode 100644 index 0000000000..670d6fd0f2 --- /dev/null +++ b/engine/cleanupmanager/scheddata.go @@ -0,0 +1,143 @@ +package cleanupmanager + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "log/slog" + "slices" + + "github.com/google/uuid" + "github.com/riverqueue/river" + "github.com/target/goalert/config" + "github.com/target/goalert/gadb" + "github.com/target/goalert/schedule" +) + +type SchedDataArgs struct{} + +func (SchedDataArgs) Kind() string { return "cleanup-manager-sched-data" } + +// CleanupScheduleData will automatically cleanup schedule data. +// - Remove temporary-schedule shifts for users that no longer exist. +// - Remove temporary-schedule shifts that occur in the past. 
+func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArgs]) error { + cfg := config.FromContext(ctx) + if cfg.Maintenance.ScheduleCleanupDays <= 0 { + return nil + } + + err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { + dataRow, err := gadb.New(tx).CleanupMgrScheduleData(ctx) + if errors.Is(err, sql.ErrNoRows) { + return true, nil + } + if err != nil { + return false, fmt.Errorf("get schedule data: %w", err) + } + + var data schedule.Data + err = json.Unmarshal(dataRow.Data, &data) + if err != nil { + db.logger.ErrorContext(ctx, "failed to unmarshal schedule data, skipping.", slog.String("error", err.Error()), slog.String("schedule_id", dataRow.ScheduleID.String())) + + // Mark as skipped so we don't keep trying to process it. + return false, gadb.New(tx).CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID) + } + + now, err := gadb.New(tx).Now(ctx) + if err != nil { + return false, fmt.Errorf("get current time: %w", err) + } + + var users []uuid.UUID + var changed bool + newTempSched := data.V1.TemporarySchedules[:0] + for _, sched := range data.V1.TemporarySchedules { + if sched.End.Before(now) { + changed = true + continue + } + cleanShifts := sched.Shifts[:0] + for _, shift := range sched.Shifts { + if shift.End.Before(now) { + changed = true + continue + } + id, err := uuid.Parse(shift.UserID) + if err != nil { + changed = true + // invalid shift, delete it + continue + } + + cleanShifts = append(cleanShifts, shift) + if slices.Contains(users, id) { + continue + } + + users = append(users, id) + } + if len(cleanShifts) == 0 { + changed = true + continue + } + sched.Shifts = cleanShifts + newTempSched = append(newTempSched, sched) + } + data.V1.TemporarySchedules = newTempSched + + validUsers, err := gadb.New(tx).CleanupMgrVerifyUsers(ctx, users) + if err != nil { + return false, fmt.Errorf("verify users: %w", err) + } + + // repeat loop, but this time validating users + + newTempSched = 
data.V1.TemporarySchedules[:0] + for _, temp := range data.V1.TemporarySchedules { + cleanShifts := temp.Shifts[:0] + for _, shift := range temp.Shifts { + id, err := uuid.Parse(shift.UserID) + if err != nil { + changed = true + // invalid shift, delete it + continue + } + if !slices.Contains(validUsers, id) { + changed = true + continue + } + cleanShifts = append(cleanShifts, shift) + } + if len(cleanShifts) == 0 { + changed = true + continue + } + temp.Shifts = cleanShifts + newTempSched = append(newTempSched, temp) + } + data.V1.TemporarySchedules = newTempSched + + if !changed { + return false, gadb.New(tx).CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID) + } + + rawData, err := json.Marshal(data) + if err != nil { + return false, fmt.Errorf("marshal schedule data: %w", err) + } + + return false, gadb.New(tx).CleanupMgrUpdateScheduleData(ctx, gadb.CleanupMgrUpdateScheduleDataParams{ + ScheduleID: dataRow.ScheduleID, + Data: rawData, + }) + }) + if err != nil { + return err + } + + return nil +} diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index 9e121210f6..9c86dd04ed 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -61,5 +61,29 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { ), }) + args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ + river.NewPeriodicJob( + river.PeriodicInterval(24*time.Hour), + func() (river.JobArgs, *river.InsertOpts) { + return ShiftArgs{}, &river.InsertOpts{ + Queue: QueueName, + } + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + }) + + args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ + river.NewPeriodicJob( + river.PeriodicInterval(7*24*time.Hour), + func() (river.JobArgs, *river.InsertOpts) { + return SchedDataArgs{}, &river.InsertOpts{ + Queue: QueueName, + } + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + }) + return nil } diff --git a/engine/cleanupmanager/update.go b/engine/cleanupmanager/update.go 
index fb77932bef..6c4dd53982 100644 --- a/engine/cleanupmanager/update.go +++ b/engine/cleanupmanager/update.go @@ -3,16 +3,12 @@ package cleanupmanager import ( "context" "database/sql" - "encoding/json" "errors" "fmt" - "time" "github.com/jackc/pgtype" "github.com/target/goalert/config" "github.com/target/goalert/permission" - "github.com/target/goalert/schedule" - "github.com/target/goalert/util/jsonutil" "github.com/target/goalert/util/log" "github.com/target/goalert/util/sqlutil" ) @@ -41,12 +37,6 @@ func (db *DB) update(ctx context.Context) error { return fmt.Errorf("set timeout: %w", err) } - var now time.Time - err = tx.StmtContext(ctx, db.now).QueryRowContext(ctx).Scan(&now) - if err != nil { - return err - } - _, err = tx.StmtContext(ctx, db.cleanupSessions).ExecContext(ctx) if err != nil { return fmt.Errorf("cleanup sessions: %w", err) @@ -64,51 +54,6 @@ func (db *DB) update(ctx context.Context) error { } } - rows, err := tx.StmtContext(ctx, db.schedData).QueryContext(ctx) - if err != nil { - return err - } - defer rows.Close() - - type schedData struct { - ID string - Data schedule.Data - Raw json.RawMessage - } - var m []schedData - for rows.Next() { - var data schedData - err = rows.Scan(&data.ID, &data.Raw) - if err != nil { - return err - } - err = json.Unmarshal(data.Raw, &data.Data) - if err != nil { - return err - } - m = append(m, data) - } - var currentUsers []string - if len(m) > 0 { - currentUsers, err = db.getUsers(ctx, tx) - if err != nil { - return err - } - } - lookup := lookupMap(currentUsers) - schedCuttoff := now.AddDate(-1, 0, 0) - for _, dat := range m { - cleanupScheduleData(&dat.Data, lookup, schedCuttoff) - rawData, err := jsonutil.Apply(dat.Raw, dat.Data) - if err != nil { - return err - } - _, err = tx.StmtContext(ctx, db.setSchedData).ExecContext(ctx, dat.ID, rawData) - if err != nil { - return fmt.Errorf("cleanup api keys: %w", err) - } - } - err = tx.StmtContext(ctx, db.cleanupAlertLogs).QueryRowContext(ctx, 
db.logIndex).Scan(&db.logIndex) if errors.Is(err, sql.ErrNoRows) { // repeat @@ -121,46 +66,3 @@ func (db *DB) update(ctx context.Context) error { return tx.Commit() } - -func lookupMap(users []string) map[string]struct{} { - userLookup := make(map[string]struct{}, len(users)) - for _, id := range users { - userLookup[id] = struct{}{} - } - return userLookup -} - -func cleanupScheduleData(data *schedule.Data, userMap map[string]struct{}, cutoff time.Time) { - filtered := data.V1.TemporarySchedules[:0] - for _, temp := range data.V1.TemporarySchedules { - if temp.End.Before(cutoff) { - continue - } - filtered = append(filtered, temp) - } - data.V1.TemporarySchedules = filtered -} - -// getUsers retrieves the current set of user IDs -func (db *DB) getUsers(ctx context.Context, tx *sql.Tx) ([]string, error) { - rows, err := tx.StmtContext(ctx, db.userIDs).QueryContext(ctx) - if err != nil { - return nil, err - } - defer rows.Close() - if err == sql.ErrNoRows { - return nil, nil - } - - var users []string - var id string - for rows.Next() { - err = rows.Scan(&id) - if err != nil { - return nil, err - } - users = append(users, id) - } - - return users, nil -} diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index 0d000523b4..55f218ef64 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -995,6 +995,105 @@ func (q *Queries) CleanupMgrFindStaleAlerts(ctx context.Context, arg CleanupMgrF return items, nil } +const cleanupMgrScheduleData = `-- name: CleanupMgrScheduleData :one +SELECT + schedule_id, + data +FROM + schedule_data +WHERE + data NOTNULL + AND (last_cleanup_at ISNULL + OR last_cleanup_at <= now() - '1 month'::interval) +ORDER BY + last_cleanup_at ASC nulls FIRST +FOR UPDATE + SKIP LOCKED +LIMIT 1 +` + +type CleanupMgrScheduleDataRow struct { + ScheduleID uuid.UUID + Data json.RawMessage +} + +// CleanupMgrScheduleData will find the next schedule data that needs to be cleaned up. 
+func (q *Queries) CleanupMgrScheduleData(ctx context.Context) (CleanupMgrScheduleDataRow, error) { + row := q.db.QueryRowContext(ctx, cleanupMgrScheduleData) + var i CleanupMgrScheduleDataRow + err := row.Scan(&i.ScheduleID, &i.Data) + return i, err +} + +const cleanupMgrScheduleDataSkip = `-- name: CleanupMgrScheduleDataSkip :exec +UPDATE + schedule_data +SET + last_cleanup_at = now() +WHERE + schedule_id = $1 +` + +// CleanupMgrScheduleDataSkip will update the last_cleanup_at field in the schedule_data table. +func (q *Queries) CleanupMgrScheduleDataSkip(ctx context.Context, scheduleID uuid.UUID) error { + _, err := q.db.ExecContext(ctx, cleanupMgrScheduleDataSkip, scheduleID) + return err +} + +const cleanupMgrUpdateScheduleData = `-- name: CleanupMgrUpdateScheduleData :exec +UPDATE + schedule_data +SET + last_cleanup_at = now(), + data = $2 +WHERE + schedule_id = $1 +` + +type CleanupMgrUpdateScheduleDataParams struct { + ScheduleID uuid.UUID + Data json.RawMessage +} + +// CleanupMgrUpdateScheduleData will update the last_cleanup_at and data fields in the schedule_data table. +func (q *Queries) CleanupMgrUpdateScheduleData(ctx context.Context, arg CleanupMgrUpdateScheduleDataParams) error { + _, err := q.db.ExecContext(ctx, cleanupMgrUpdateScheduleData, arg.ScheduleID, arg.Data) + return err +} + +const cleanupMgrVerifyUsers = `-- name: CleanupMgrVerifyUsers :many +SELECT + id +FROM + users +WHERE + id = ANY ($1::uuid[]) +` + +// CleanupMgrVerifyUsers will verify that the given user ids exist in the users table. 
+func (q *Queries) CleanupMgrVerifyUsers(ctx context.Context, userIds []uuid.UUID) ([]uuid.UUID, error) { + rows, err := q.db.QueryContext(ctx, cleanupMgrVerifyUsers, pq.Array(userIds)) + if err != nil { + return nil, err + } + defer rows.Close() + var items []uuid.UUID + for rows.Next() { + var id uuid.UUID + if err := rows.Scan(&id); err != nil { + return nil, err + } + items = append(items, id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const connectionInfo = `-- name: ConnectionInfo :many SELECT application_name AS NAME, COUNT(*) From 2a2888778fdd94e78a369750b3c54a0bd2441135 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 16 Dec 2024 17:45:23 -0600 Subject: [PATCH 11/34] feat: add workers for cleanup of shifts and schedule data --- engine/cleanupmanager/setup.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index 9c86dd04ed..68b456e1ed 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -43,6 +43,8 @@ func (db *DB) whileWork(ctx context.Context, run func(ctx context.Context, tx *s // Setup implements processinglock.Setupable. 
func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlerts)) + river.AddWorker(args.Workers, river.WorkFunc(db.CleanupShifts)) + river.AddWorker(args.Workers, river.WorkFunc(db.CleanupScheduleData)) err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 2}) if err != nil { From f8c555de2d7fcdd4d197fe86a0447f08b6f22ebc Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 10:08:50 -0600 Subject: [PATCH 12/34] fix: update periodic job interval for schedule data cleanup to 24 hours --- engine/cleanupmanager/setup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index 68b456e1ed..a2ea7cd095 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -77,7 +77,7 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ river.NewPeriodicJob( - river.PeriodicInterval(7*24*time.Hour), + river.PeriodicInterval(24*time.Hour), func() (river.JobArgs, *river.InsertOpts) { return SchedDataArgs{}, &river.InsertOpts{ Queue: QueueName, From 9ee4383371f6cc89c87c630a65b817fc3669c975 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 10:19:28 -0600 Subject: [PATCH 13/34] feat: add logging support to cleanup manager and engine initialization --- app/initengine.go | 1 + engine/cleanupmanager/db.go | 7 ++++--- engine/config.go | 2 ++ engine/engine.go | 3 ++- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/app/initengine.go b/app/initengine.go index 03efe31816..51e7014c3e 100644 --- a/app/initengine.go +++ b/app/initengine.go @@ -56,6 +56,7 @@ func (app *App) initEngine(ctx context.Context) error { River: app.River, RiverDBSQL: app.RiverDBSQL, RiverWorkers: app.RiverWorkers, + Logger: app.Logger, }) if err != nil { return errors.Wrap(err, "init engine") diff 
--git a/engine/cleanupmanager/db.go b/engine/cleanupmanager/db.go index 21b7d15cc5..56f37e8a1a 100644 --- a/engine/cleanupmanager/db.go +++ b/engine/cleanupmanager/db.go @@ -32,7 +32,7 @@ type DB struct { func (db *DB) Name() string { return "Engine.CleanupManager" } // NewDB creates a new DB. -func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store) (*DB, error) { +func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store, log *slog.Logger) (*DB, error) { lock, err := processinglock.NewLock(ctx, db, processinglock.Config{ Version: 1, Type: processinglock.TypeCleanup, @@ -44,8 +44,9 @@ func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store) (*DB, error p := &util.Prepare{Ctx: ctx, DB: db} return &DB{ - db: db, - lock: lock, + db: db, + lock: lock, + logger: log, // Abort any cleanup operation that takes longer than 3 seconds // error will be logged. diff --git a/engine/config.go b/engine/config.go index 68250418c0..8f4e2a28e7 100644 --- a/engine/config.go +++ b/engine/config.go @@ -2,6 +2,7 @@ package engine import ( "database/sql" + "log/slog" "time" "github.com/jackc/pgx/v5" @@ -49,6 +50,7 @@ type Config struct { DisableCycle bool LogCycles bool + Logger *slog.Logger CycleTime time.Duration } diff --git a/engine/engine.go b/engine/engine.go index b77c3e68a8..1eac751e73 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "log/slog" "strings" "time" @@ -122,7 +123,7 @@ func NewEngine(ctx context.Context, db *sql.DB, c *Config) (*Engine, error) { if err != nil { return nil, errors.Wrap(err, "heartbeat processing backend") } - cleanMgr, err := cleanupmanager.NewDB(ctx, db, c.AlertStore) + cleanMgr, err := cleanupmanager.NewDB(ctx, db, c.AlertStore, c.Logger.With(slog.String("module", "cleanup"))) if err != nil { return nil, errors.Wrap(err, "cleanup backend") } From 8494e919a4577895949c9a8fe97db5f037ddd20a Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 
10:44:28 -0600 Subject: [PATCH 14/34] refactor: streamline schedule data cleanup by extracting user validation and shift trimming logic --- engine/cleanupmanager/scheddata.go | 134 +++++++++++++++-------------- 1 file changed, 70 insertions(+), 64 deletions(-) diff --git a/engine/cleanupmanager/scheddata.go b/engine/cleanupmanager/scheddata.go index 670d6fd0f2..25fe6b8934 100644 --- a/engine/cleanupmanager/scheddata.go +++ b/engine/cleanupmanager/scheddata.go @@ -8,6 +8,7 @@ import ( "fmt" "log/slog" "slices" + "time" "github.com/google/uuid" "github.com/riverqueue/river" @@ -52,75 +53,13 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg return false, fmt.Errorf("get current time: %w", err) } - var users []uuid.UUID - var changed bool - newTempSched := data.V1.TemporarySchedules[:0] - for _, sched := range data.V1.TemporarySchedules { - if sched.End.Before(now) { - changed = true - continue - } - cleanShifts := sched.Shifts[:0] - for _, shift := range sched.Shifts { - if shift.End.Before(now) { - changed = true - continue - } - id, err := uuid.Parse(shift.UserID) - if err != nil { - changed = true - // invalid shift, delete it - continue - } - - cleanShifts = append(cleanShifts, shift) - if slices.Contains(users, id) { - continue - } - - users = append(users, id) - } - if len(cleanShifts) == 0 { - changed = true - continue - } - sched.Shifts = cleanShifts - newTempSched = append(newTempSched, sched) - } - data.V1.TemporarySchedules = newTempSched - + changed, users := trimExpiredShifts(&data, now) validUsers, err := gadb.New(tx).CleanupMgrVerifyUsers(ctx, users) if err != nil { return false, fmt.Errorf("verify users: %w", err) } - // repeat loop, but this time validating users - - newTempSched = data.V1.TemporarySchedules[:0] - for _, temp := range data.V1.TemporarySchedules { - cleanShifts := temp.Shifts[:0] - for _, shift := range temp.Shifts { - id, err := uuid.Parse(shift.UserID) - if err != nil { - changed = true - // invalid 
shift, delete it - continue - } - if !slices.Contains(validUsers, id) { - changed = true - continue - } - cleanShifts = append(cleanShifts, shift) - } - if len(cleanShifts) == 0 { - changed = true - continue - } - temp.Shifts = cleanShifts - newTempSched = append(newTempSched, temp) - } - data.V1.TemporarySchedules = newTempSched - + changed = trimInvalidUsers(&data, validUsers) || changed if !changed { return false, gadb.New(tx).CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID) } @@ -141,3 +80,70 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg return nil } + +func trimInvalidUsers(data *schedule.Data, validUsers []uuid.UUID) (changed bool) { + newTempSched := data.V1.TemporarySchedules[:0] + for _, temp := range data.V1.TemporarySchedules { + cleanShifts := temp.Shifts[:0] + for _, shift := range temp.Shifts { + id, err := uuid.Parse(shift.UserID) + if err != nil { + changed = true + // invalid shift, delete it + continue + } + if !slices.Contains(validUsers, id) { + changed = true + continue + } + cleanShifts = append(cleanShifts, shift) + } + if len(cleanShifts) == 0 { + changed = true + continue + } + temp.Shifts = cleanShifts + newTempSched = append(newTempSched, temp) + } + data.V1.TemporarySchedules = newTempSched + return changed +} + +// trimExpiredShifts will trim any past shifts and collect all remaining user IDs. 
+func trimExpiredShifts(data *schedule.Data, cutoff time.Time) (changed bool, users []uuid.UUID) { + newTempSched := data.V1.TemporarySchedules[:0] + for _, sched := range data.V1.TemporarySchedules { + if sched.End.Before(cutoff) { + changed = true + continue + } + cleanShifts := sched.Shifts[:0] + for _, shift := range sched.Shifts { + if shift.End.Before(cutoff) { + changed = true + continue + } + id, err := uuid.Parse(shift.UserID) + if err != nil { + changed = true + // invalid shift, delete it + continue + } + + cleanShifts = append(cleanShifts, shift) + if slices.Contains(users, id) { + continue + } + + users = append(users, id) + } + if len(cleanShifts) == 0 { + changed = true + continue + } + sched.Shifts = cleanShifts + newTempSched = append(newTempSched, sched) + } + data.V1.TemporarySchedules = newTempSched + return changed, users +} From fb7a78246c5b0dbecfea8f697d6889d52044dbff Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 10:57:40 -0600 Subject: [PATCH 15/34] feat: add logging for schedule data updates in cleanup manager --- engine/cleanupmanager/scheddata.go | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/cleanupmanager/scheddata.go b/engine/cleanupmanager/scheddata.go index 25fe6b8934..df80059deb 100644 --- a/engine/cleanupmanager/scheddata.go +++ b/engine/cleanupmanager/scheddata.go @@ -69,6 +69,7 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg return false, fmt.Errorf("marshal schedule data: %w", err) } + db.logger.InfoContext(ctx, "Updated schedule data.", slog.String("schedule_id", dataRow.ScheduleID.String())) return false, gadb.New(tx).CleanupMgrUpdateScheduleData(ctx, gadb.CleanupMgrUpdateScheduleDataParams{ ScheduleID: dataRow.ScheduleID, Data: rawData, From fdad66de74b9fd6c6de9f38842e1ce166281a73c Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 11:00:07 -0600 Subject: [PATCH 16/34] refactor: remove unnecessary checks for empty shifts in user and shift 
trimming functions --- engine/cleanupmanager/scheddata.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/engine/cleanupmanager/scheddata.go b/engine/cleanupmanager/scheddata.go index df80059deb..c19ddfece5 100644 --- a/engine/cleanupmanager/scheddata.go +++ b/engine/cleanupmanager/scheddata.go @@ -99,10 +99,6 @@ func trimInvalidUsers(data *schedule.Data, validUsers []uuid.UUID) (changed bool } cleanShifts = append(cleanShifts, shift) } - if len(cleanShifts) == 0 { - changed = true - continue - } temp.Shifts = cleanShifts newTempSched = append(newTempSched, temp) } @@ -138,10 +134,6 @@ func trimExpiredShifts(data *schedule.Data, cutoff time.Time) (changed bool, use users = append(users, id) } - if len(cleanShifts) == 0 { - changed = true - continue - } sched.Shifts = cleanShifts newTempSched = append(newTempSched, sched) } From 2c9ffe6609817e14833d37120022a495d076ffa9 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 11:09:45 -0600 Subject: [PATCH 17/34] refactor: enhance schedule data cleanup by improving user validation and logging --- engine/cleanupmanager/scheddata.go | 65 ++++++++++++++---------------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/engine/cleanupmanager/scheddata.go b/engine/cleanupmanager/scheddata.go index c19ddfece5..c251cdeef2 100644 --- a/engine/cleanupmanager/scheddata.go +++ b/engine/cleanupmanager/scheddata.go @@ -38,30 +38,36 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg if err != nil { return false, fmt.Errorf("get schedule data: %w", err) } + log := db.logger.With(slog.String("schedule_id", dataRow.ScheduleID.String())) + gdb := gadb.New(tx) var data schedule.Data err = json.Unmarshal(dataRow.Data, &data) if err != nil { - db.logger.ErrorContext(ctx, "failed to unmarshal schedule data, skipping.", slog.String("error", err.Error()), slog.String("schedule_id", dataRow.ScheduleID.String())) + log.ErrorContext(ctx, + "failed to unmarshal schedule data, 
skipping.", + slog.String("error", err.Error())) // Mark as skipped so we don't keep trying to process it. - return false, gadb.New(tx).CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID) + return false, gdb.CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID) } - now, err := gadb.New(tx).Now(ctx) - if err != nil { - return false, fmt.Errorf("get current time: %w", err) + users := collectUsers(data) + var validUsers []uuid.UUID + if len(users) > 0 { + validUsers, err = gdb.CleanupMgrVerifyUsers(ctx, users) + if err != nil { + return false, fmt.Errorf("lookup valid users: %w", err) + } } - changed, users := trimExpiredShifts(&data, now) - validUsers, err := gadb.New(tx).CleanupMgrVerifyUsers(ctx, users) + now, err := gdb.Now(ctx) if err != nil { - return false, fmt.Errorf("verify users: %w", err) + return false, fmt.Errorf("get current time: %w", err) } - - changed = trimInvalidUsers(&data, validUsers) || changed + changed := cleanupData(&data, validUsers, now) if !changed { - return false, gadb.New(tx).CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID) + return false, gdb.CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID) } rawData, err := json.Marshal(data) @@ -69,8 +75,8 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg return false, fmt.Errorf("marshal schedule data: %w", err) } - db.logger.InfoContext(ctx, "Updated schedule data.", slog.String("schedule_id", dataRow.ScheduleID.String())) - return false, gadb.New(tx).CleanupMgrUpdateScheduleData(ctx, gadb.CleanupMgrUpdateScheduleDataParams{ + log.InfoContext(ctx, "Updated schedule data.") + return false, gdb.CleanupMgrUpdateScheduleData(ctx, gadb.CleanupMgrUpdateScheduleDataParams{ ScheduleID: dataRow.ScheduleID, Data: rawData, }) @@ -82,15 +88,20 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg return nil } -func trimInvalidUsers(data *schedule.Data, validUsers []uuid.UUID) (changed bool) { +func cleanupData(data *schedule.Data, validUsers 
[]uuid.UUID, now time.Time) (changed bool) { newTempSched := data.V1.TemporarySchedules[:0] for _, temp := range data.V1.TemporarySchedules { + if temp.End.Before(now) { + changed = true + continue + } + cleanShifts := temp.Shifts[:0] for _, shift := range temp.Shifts { id, err := uuid.Parse(shift.UserID) if err != nil { changed = true - // invalid shift, delete it + // invalid user id/shift, delete it continue } if !slices.Contains(validUsers, id) { @@ -106,37 +117,23 @@ func trimInvalidUsers(data *schedule.Data, validUsers []uuid.UUID) (changed bool return changed } -// trimExpiredShifts will trim any past shifts and collect all remaining user IDs. -func trimExpiredShifts(data *schedule.Data, cutoff time.Time) (changed bool, users []uuid.UUID) { - newTempSched := data.V1.TemporarySchedules[:0] +// collectUsers will collect all user ids from the schedule data. +func collectUsers(data schedule.Data) (users []uuid.UUID) { for _, sched := range data.V1.TemporarySchedules { - if sched.End.Before(cutoff) { - changed = true - continue - } - cleanShifts := sched.Shifts[:0] for _, shift := range sched.Shifts { - if shift.End.Before(cutoff) { - changed = true - continue - } id, err := uuid.Parse(shift.UserID) if err != nil { - changed = true - // invalid shift, delete it + // invalid id, skip it continue } - cleanShifts = append(cleanShifts, shift) if slices.Contains(users, id) { continue } users = append(users, id) } - sched.Shifts = cleanShifts - newTempSched = append(newTempSched, sched) } - data.V1.TemporarySchedules = newTempSched - return changed, users + + return users } From 8592ff8e29162b2a5909bf658e530521191aedb0 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 11:11:28 -0600 Subject: [PATCH 18/34] refactor: improve formatting of schedule data update call in cleanup manager --- engine/cleanupmanager/scheddata.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/engine/cleanupmanager/scheddata.go 
b/engine/cleanupmanager/scheddata.go index c251cdeef2..fdf8614971 100644 --- a/engine/cleanupmanager/scheddata.go +++ b/engine/cleanupmanager/scheddata.go @@ -76,10 +76,11 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg } log.InfoContext(ctx, "Updated schedule data.") - return false, gdb.CleanupMgrUpdateScheduleData(ctx, gadb.CleanupMgrUpdateScheduleDataParams{ - ScheduleID: dataRow.ScheduleID, - Data: rawData, - }) + return false, gdb.CleanupMgrUpdateScheduleData(ctx, + gadb.CleanupMgrUpdateScheduleDataParams{ + ScheduleID: dataRow.ScheduleID, + Data: rawData, + }) }) if err != nil { return err From 86f7b84fab4eaad550b35f461017b56b61c42ae8 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 11:12:39 -0600 Subject: [PATCH 19/34] docs: add comment to clarify cleanupData function purpose in schedule data management --- engine/cleanupmanager/scheddata.go | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/cleanupmanager/scheddata.go b/engine/cleanupmanager/scheddata.go index fdf8614971..28a20f3c33 100644 --- a/engine/cleanupmanager/scheddata.go +++ b/engine/cleanupmanager/scheddata.go @@ -89,6 +89,7 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg return nil } +// cleanupData will cleanup the schedule data, removing temporary-schedules that occur in the past; and removing shifts for users that no longer exist. 
func cleanupData(data *schedule.Data, validUsers []uuid.UUID, now time.Time) (changed bool) { newTempSched := data.V1.TemporarySchedules[:0] for _, temp := range data.V1.TemporarySchedules { From 0899d6c2886d4278d84002140822df72c5e1b6f3 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 11:33:59 -0600 Subject: [PATCH 20/34] feat: add CleanupAlertLogs function to manage alert log entries for deleted alerts --- engine/cleanupmanager/alertlogs.go | 42 +++++++++++++++++++ engine/cleanupmanager/queries.sql | 52 ++++++++++++++++++++++++ engine/cleanupmanager/setup.go | 13 ++++++ gadb/queries.sql.go | 65 ++++++++++++++++++++++++++++++ 4 files changed, 172 insertions(+) create mode 100644 engine/cleanupmanager/alertlogs.go diff --git a/engine/cleanupmanager/alertlogs.go b/engine/cleanupmanager/alertlogs.go new file mode 100644 index 0000000000..c0b6b10734 --- /dev/null +++ b/engine/cleanupmanager/alertlogs.go @@ -0,0 +1,42 @@ +package cleanupmanager + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/riverqueue/river" + "github.com/target/goalert/gadb" +) + +type AlertLogArgs struct{} + +func (AlertLogArgs) Kind() string { return "cleanup-manager-alert-logs" } + +// CleanupAlertLogs will remove alert log entries for deleted alerts. 
+func (db *DB) CleanupAlertLogs(ctx context.Context, j *river.Job[AlertLogArgs]) error { + var lastID int64 // start at zero, we will scan _all_ logs + + err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { + db.logger.DebugContext(ctx, "Cleaning up alert logs...", "lastID", lastID) + lastID, err = gadb.New(tx).CleanupAlertLogs(ctx, + gadb.CleanupAlertLogsParams{ + BatchSize: 10000, + AfterID: lastID, + }) + if errors.Is(err, sql.ErrNoRows) { + return true, nil + } + if err != nil { + return false, fmt.Errorf("cleanup alert logs: %w", err) + } + + return false, nil + }) + if err != nil { + return fmt.Errorf("cleanup alert logs: %w", err) + } + + return nil +} diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql index 43328f3c14..9b94c00a5c 100644 --- a/engine/cleanupmanager/queries.sql +++ b/engine/cleanupmanager/queries.sql @@ -122,3 +122,55 @@ FROM WHERE id = ANY (sqlc.arg(user_ids)::uuid[]); +-- name: CleanupAlertLogs :one +WITH scope AS ( + SELECT + id + FROM + alert_logs l + WHERE + l.id > @after_id + ORDER BY + l.id + LIMIT @batch_size +), +id_range AS ( + SELECT + min(id), + max(id) + FROM + scope +), +_delete AS ( + DELETE FROM alert_logs + WHERE id = ANY ( + SELECT + id + FROM + alert_logs + WHERE + id BETWEEN ( + SELECT + min + FROM + id_range) + AND ( + SELECT + max + FROM + id_range) + AND NOT EXISTS ( + SELECT + 1 + FROM + alerts + WHERE + alert_id = id) + FOR UPDATE + SKIP LOCKED)) + SELECT + id + FROM + scope OFFSET @batch_size - 1 + LIMIT 1; + diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index a2ea7cd095..eac9e9a311 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -45,6 +45,7 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlerts)) river.AddWorker(args.Workers, river.WorkFunc(db.CleanupShifts)) river.AddWorker(args.Workers, 
river.WorkFunc(db.CleanupScheduleData)) + river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlertLogs)) err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 2}) if err != nil { @@ -87,5 +88,17 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { ), }) + args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ + river.NewPeriodicJob( + river.PeriodicInterval(7*24*time.Hour), + func() (river.JobArgs, *river.InsertOpts) { + return AlertLogArgs{}, &river.InsertOpts{ + Queue: QueueName, + } + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + }) + return nil } diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index 55f218ef64..30e110e4a2 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -851,6 +851,71 @@ func (q *Queries) CalSubUserNames(ctx context.Context, dollar_1 []uuid.UUID) ([] return items, nil } +const cleanupAlertLogs = `-- name: CleanupAlertLogs :one +WITH scope AS ( + SELECT + id + FROM + alert_logs l + WHERE + l.id > $2 + ORDER BY + l.id + LIMIT $3 +), +id_range AS ( + SELECT + min(id), + max(id) + FROM + scope +), +_delete AS ( + DELETE FROM alert_logs + WHERE id = ANY ( + SELECT + id + FROM + alert_logs + WHERE + id BETWEEN ( + SELECT + min + FROM + id_range) + AND ( + SELECT + max + FROM + id_range) + AND NOT EXISTS ( + SELECT + 1 + FROM + alerts + WHERE + alert_id = id) + FOR UPDATE + SKIP LOCKED)) + SELECT + id + FROM + scope OFFSET $1- 1 + LIMIT 1 +` + +type CleanupAlertLogsParams struct { + BatchSize int32 + AfterID int64 +} + +func (q *Queries) CleanupAlertLogs(ctx context.Context, arg CleanupAlertLogsParams) (int64, error) { + row := q.db.QueryRowContext(ctx, cleanupAlertLogs, arg.BatchSize, arg.AfterID, arg.BatchSize) + var id int64 + err := row.Scan(&id) + return id, err +} + const cleanupMgrDeleteOldAlerts = `-- name: CleanupMgrDeleteOldAlerts :execrows DELETE FROM alerts WHERE id = ANY ( From df8cdcb1b4e5e78a9c13bd0631152d092c560c72 Mon Sep 17 00:00:00 2001 From: Nathaniel 
Caza Date: Tue, 17 Dec 2024 11:34:09 -0600 Subject: [PATCH 21/34] refactor: remove unused cleanupAlertLogs statement and related logic from cleanup manager --- engine/cleanupmanager/db.go | 21 +-------------------- engine/cleanupmanager/update.go | 12 ------------ 2 files changed, 1 insertion(+), 32 deletions(-) diff --git a/engine/cleanupmanager/db.go b/engine/cleanupmanager/db.go index 56f37e8a1a..605f0c855a 100644 --- a/engine/cleanupmanager/db.go +++ b/engine/cleanupmanager/db.go @@ -20,12 +20,9 @@ type DB struct { cleanupSessions *sql.Stmt - cleanupAlertLogs *sql.Stmt - alertStore *alert.Store - logIndex int - logger *slog.Logger + logger *slog.Logger } // Name returns the name of the module. @@ -55,22 +52,6 @@ func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store, log *slog.L cleanupSessions: p.P(`DELETE FROM auth_user_sessions WHERE id = any(select id from auth_user_sessions where last_access_at < (now() - '30 days'::interval) LIMIT 100 for update skip locked)`), - cleanupAlertLogs: p.P(` - with - scope as (select id from alert_logs where id > $1 order by id limit 100), - id_range as (select min(id), max(id) from scope), - _delete as ( - delete from alert_logs where id = any( - select id from alert_logs - where - id between (select min from id_range) and (select max from id_range) and - not exists (select 1 from alerts where alert_id = id) - for update skip locked - ) - ) - select id from scope offset 99 - `), - alertStore: alertstore, }, p.Err } diff --git a/engine/cleanupmanager/update.go b/engine/cleanupmanager/update.go index 6c4dd53982..3e0ffc817b 100644 --- a/engine/cleanupmanager/update.go +++ b/engine/cleanupmanager/update.go @@ -2,8 +2,6 @@ package cleanupmanager import ( "context" - "database/sql" - "errors" "fmt" "github.com/jackc/pgtype" @@ -54,15 +52,5 @@ func (db *DB) update(ctx context.Context) error { } } - err = tx.StmtContext(ctx, db.cleanupAlertLogs).QueryRowContext(ctx, db.logIndex).Scan(&db.logIndex) - if errors.Is(err, 
sql.ErrNoRows) { - // repeat - db.logIndex = 0 - err = nil - } - if err != nil { - return fmt.Errorf("cleanup alert_logs: %w", err) - } - return tx.Commit() } From 634bd2be4aa418cb67bdb2a01bcbfc2814306796 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 11:49:25 -0600 Subject: [PATCH 22/34] feat: implement timeout for CleanupAlertLogs worker to handle longer job durations --- app/initriver.go | 10 ++-------- engine/cleanupmanager/alertlogs.go | 11 +++++++++++ engine/cleanupmanager/setup.go | 7 ++++++- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/app/initriver.go b/app/initriver.go index 8d6d8da522..969f5f34e0 100644 --- a/app/initriver.go +++ b/app/initriver.go @@ -3,6 +3,7 @@ package app import ( "context" "log/slog" + "time" "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverdatabasesql" @@ -84,14 +85,6 @@ func (w workerMiddlewareFunc) Work(ctx context.Context, job *rivertype.JobRow, d func (app *App) initRiver(ctx context.Context) error { app.RiverWorkers = river.NewWorkers() - // TODO: remove once a worker is added that's not behind a feature flag - // - // Without this, it will complain about no workers being registered. - river.AddWorker(app.RiverWorkers, river.WorkFunc(func(ctx context.Context, j *river.Job[noopWorker]) error { - // Do something with the job - return nil - })) - var err error app.River, err = river.NewClient(riverpgxv5.New(app.pgx), &river.Config{ // River tends to log "context canceled" errors while shutting down @@ -100,6 +93,7 @@ func (app *App) initRiver(ctx context.Context) error { Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, + RescueStuckJobsAfter: 5 * time.Minute, WorkerMiddleware: []rivertype.WorkerMiddleware{ workerMiddlewareFunc(func(ctx context.Context, doInner func(ctx context.Context) error) error { // Ensure config is set in the context for all workers. 
diff --git a/engine/cleanupmanager/alertlogs.go b/engine/cleanupmanager/alertlogs.go index c0b6b10734..6a678b8484 100644 --- a/engine/cleanupmanager/alertlogs.go +++ b/engine/cleanupmanager/alertlogs.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "time" "github.com/riverqueue/river" "github.com/target/goalert/gadb" @@ -14,6 +15,16 @@ type AlertLogArgs struct{} func (AlertLogArgs) Kind() string { return "cleanup-manager-alert-logs" } +type timeoutWorker[T river.JobArgs] struct { + river.Worker[T] + timeout time.Duration +} + +// Timeout implements Worker interface. +func (w *timeoutWorker[T]) Timeout(job *river.Job[T]) time.Duration { + return w.timeout +} + // CleanupAlertLogs will remove alert log entries for deleted alerts. func (db *DB) CleanupAlertLogs(ctx context.Context, j *river.Job[AlertLogArgs]) error { var lastID int64 // start at zero, we will scan _all_ logs diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index eac9e9a311..430850baba 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -45,7 +45,9 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlerts)) river.AddWorker(args.Workers, river.WorkFunc(db.CleanupShifts)) river.AddWorker(args.Workers, river.WorkFunc(db.CleanupScheduleData)) - river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlertLogs)) + + // the alert log cleanup job can last longer than a minute (the default timeout) so we set a longer timeout + river.AddWorker(args.Workers, &timeoutWorker[AlertLogArgs]{Worker: river.WorkFunc(db.CleanupAlertLogs), timeout: time.Hour}) err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 2}) if err != nil { @@ -94,6 +96,9 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { func() (river.JobArgs, *river.InsertOpts) { return AlertLogArgs{}, &river.InsertOpts{ Queue: QueueName, + UniqueOpts: 
river.UniqueOpts{ + ByArgs: true, + }, } }, &river.PeriodicJobOpts{RunOnStart: true}, From dff614d17507e2de02d7ed7191a06f4ed0d59e52 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 14:51:28 -0600 Subject: [PATCH 23/34] refactor: rename cleanupDays to more descriptive staleThresholdDays in cleanup manager functions --- engine/cleanupmanager/alerts.go | 4 ++-- engine/cleanupmanager/queries.sql | 14 +++++++------- engine/cleanupmanager/scheddata.go | 4 +++- gadb/queries.sql.go | 28 ++++++++++++++-------------- 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/engine/cleanupmanager/alerts.go b/engine/cleanupmanager/alerts.go index 58de9e7488..bf3416bafc 100644 --- a/engine/cleanupmanager/alerts.go +++ b/engine/cleanupmanager/alerts.go @@ -22,8 +22,8 @@ func (db *DB) CleanupAlerts(ctx context.Context, j *river.Job[AlertArgs]) error if cfg.Maintenance.AlertAutoCloseDays > 0 { err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { ids, err := gadb.New(tx).CleanupMgrFindStaleAlerts(ctx, gadb.CleanupMgrFindStaleAlertsParams{ - AutoCloseDays: int64(cfg.Maintenance.AlertAutoCloseDays), - IncludeAcked: cfg.Maintenance.AutoCloseAckedAlerts, + AutoCloseThresholdDays: int64(cfg.Maintenance.AlertAutoCloseDays), + IncludeAcked: cfg.Maintenance.AutoCloseAckedAlerts, }) if err != nil { return false, fmt.Errorf("find stale alerts: %w", err) diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql index 43328f3c14..cd0bbc5d12 100644 --- a/engine/cleanupmanager/queries.sql +++ b/engine/cleanupmanager/queries.sql @@ -8,7 +8,7 @@ WHERE id = ANY ( alerts a WHERE status = 'closed' - AND a.created_at < now() -(sqlc.arg(cleanup_days)::bigint * '1 day'::interval) + AND a.created_at < now() -(sqlc.arg(stale_threshold_days)::bigint * '1 day'::interval) ORDER BY id LIMIT 100 @@ -24,14 +24,14 @@ FROM WHERE (a.status = 'triggered' OR (sqlc.arg(include_acked) AND a.status = 'active')) -AND created_at <= 
now() - '1 day'::interval * sqlc.arg(auto_close_days) +AND created_at <= now() - '1 day'::interval * sqlc.arg(auto_close_threshold_days) AND NOT EXISTS ( SELECT 1 FROM alert_logs log WHERE - timestamp > now() - '1 day'::interval * sqlc.arg(auto_close_days) + timestamp > now() - '1 day'::interval * sqlc.arg(auto_close_threshold_days) AND log.alert_id = a.id) LIMIT 100; @@ -44,7 +44,7 @@ WHERE id = ANY ( FROM user_overrides WHERE - end_time <(now() - '1 day'::interval * sqlc.arg(cleanup_days)) + end_time <(now() - '1 day'::interval * sqlc.arg(history_threshold_days)) LIMIT 100 FOR UPDATE SKIP LOCKED); @@ -58,7 +58,7 @@ WHERE id = ANY ( FROM schedule_on_call_users WHERE - end_time <(now() - '1 day'::interval * sqlc.arg(cleanup_days)) + end_time <(now() - '1 day'::interval * sqlc.arg(history_threshold_days)) LIMIT 100 FOR UPDATE SKIP LOCKED); @@ -72,7 +72,7 @@ WHERE id = ANY ( FROM ep_step_on_call_users WHERE - end_time <(now() - '1 day'::interval * sqlc.arg(cleanup_days)) + end_time <(now() - '1 day'::interval * sqlc.arg(history_threshold_days)) LIMIT 100 FOR UPDATE SKIP LOCKED); @@ -87,7 +87,7 @@ FROM WHERE data NOTNULL AND (last_cleanup_at ISNULL - OR last_cleanup_at <= now() - '1 month'::interval) + OR last_cleanup_at <= now() - '1 day'::interval * sqlc.arg(cleanup_interval_days)::int) ORDER BY last_cleanup_at ASC nulls FIRST FOR UPDATE diff --git a/engine/cleanupmanager/scheddata.go b/engine/cleanupmanager/scheddata.go index 28a20f3c33..f20d498272 100644 --- a/engine/cleanupmanager/scheddata.go +++ b/engine/cleanupmanager/scheddata.go @@ -31,7 +31,8 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg } err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { - dataRow, err := gadb.New(tx).CleanupMgrScheduleData(ctx) + // Grab the next schedule that hasn't been cleaned up in the last 30 days. 
+ dataRow, err := gadb.New(tx).CleanupMgrScheduleData(ctx, 30) if errors.Is(err, sql.ErrNoRows) { return true, nil } @@ -52,6 +53,7 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg return false, gdb.CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID) } + // We want to remove shifts for users that no longer exist, so to do that we'll get the set of users from the schedule data and verify them. users := collectUsers(data) var validUsers []uuid.UUID if len(users) > 0 { diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index 55f218ef64..de66dc66c1 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -869,8 +869,8 @@ WHERE id = ANY ( ` // CleanupMgrDeleteOldAlerts will delete old alerts from the alerts table that are closed and older than the given number of days before now. -func (q *Queries) CleanupMgrDeleteOldAlerts(ctx context.Context, cleanupDays int64) (int64, error) { - result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldAlerts, cleanupDays) +func (q *Queries) CleanupMgrDeleteOldAlerts(ctx context.Context, staleThresholdDays int64) (int64, error) { + result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldAlerts, staleThresholdDays) if err != nil { return 0, err } @@ -892,8 +892,8 @@ WHERE id = ANY ( ` // CleanupMgrDeleteOldOverrides will delete old overrides from the user_overrides table that are older than the given number of days before now. 
-func (q *Queries) CleanupMgrDeleteOldOverrides(ctx context.Context, cleanupDays interface{}) (int64, error) { - result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldOverrides, cleanupDays) +func (q *Queries) CleanupMgrDeleteOldOverrides(ctx context.Context, historyThresholdDays interface{}) (int64, error) { + result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldOverrides, historyThresholdDays) if err != nil { return 0, err } @@ -915,8 +915,8 @@ WHERE id = ANY ( ` // CleanupMgrDeleteOldScheduleShifts will delete old schedule shifts from the schedule_on_call_users table that are older than the given number of days before now. -func (q *Queries) CleanupMgrDeleteOldScheduleShifts(ctx context.Context, cleanupDays interface{}) (int64, error) { - result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldScheduleShifts, cleanupDays) +func (q *Queries) CleanupMgrDeleteOldScheduleShifts(ctx context.Context, historyThresholdDays interface{}) (int64, error) { + result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldScheduleShifts, historyThresholdDays) if err != nil { return 0, err } @@ -938,8 +938,8 @@ WHERE id = ANY ( ` // CleanupMgrDeleteOldStepShifts will delete old EP step shifts from the ep_step_on_call_users table that are older than the given number of days before now. 
-func (q *Queries) CleanupMgrDeleteOldStepShifts(ctx context.Context, cleanupDays interface{}) (int64, error) { - result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldStepShifts, cleanupDays) +func (q *Queries) CleanupMgrDeleteOldStepShifts(ctx context.Context, historyThresholdDays interface{}) (int64, error) { + result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldStepShifts, historyThresholdDays) if err != nil { return 0, err } @@ -967,13 +967,13 @@ LIMIT 100 ` type CleanupMgrFindStaleAlertsParams struct { - IncludeAcked interface{} - AutoCloseDays interface{} + IncludeAcked interface{} + AutoCloseThresholdDays interface{} } // CleanupMgrFindStaleAlerts will find alerts that are triggered or active and have no activity in specified number of days. func (q *Queries) CleanupMgrFindStaleAlerts(ctx context.Context, arg CleanupMgrFindStaleAlertsParams) ([]int64, error) { - rows, err := q.db.QueryContext(ctx, cleanupMgrFindStaleAlerts, arg.IncludeAcked, arg.AutoCloseDays) + rows, err := q.db.QueryContext(ctx, cleanupMgrFindStaleAlerts, arg.IncludeAcked, arg.AutoCloseThresholdDays) if err != nil { return nil, err } @@ -1004,7 +1004,7 @@ FROM WHERE data NOTNULL AND (last_cleanup_at ISNULL - OR last_cleanup_at <= now() - '1 month'::interval) + OR last_cleanup_at <= now() - '1 day'::interval * $1::int) ORDER BY last_cleanup_at ASC nulls FIRST FOR UPDATE @@ -1018,8 +1018,8 @@ type CleanupMgrScheduleDataRow struct { } // CleanupMgrScheduleData will find the next schedule data that needs to be cleaned up. 
-func (q *Queries) CleanupMgrScheduleData(ctx context.Context) (CleanupMgrScheduleDataRow, error) { - row := q.db.QueryRowContext(ctx, cleanupMgrScheduleData) +func (q *Queries) CleanupMgrScheduleData(ctx context.Context, cleanupIntervalDays int32) (CleanupMgrScheduleDataRow, error) { + row := q.db.QueryRowContext(ctx, cleanupMgrScheduleData, cleanupIntervalDays) var i CleanupMgrScheduleDataRow err := row.Scan(&i.ScheduleID, &i.Data) return i, err From bd31e64ef63e06f41017cd0245e9d4ca7eb16446 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 14:55:08 -0600 Subject: [PATCH 24/34] docs: enhance comment for CleanupMgrScheduleData to clarify last_cleanup_at usage --- engine/cleanupmanager/queries.sql | 2 +- gadb/queries.sql.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql index cd0bbc5d12..12df3babd8 100644 --- a/engine/cleanupmanager/queries.sql +++ b/engine/cleanupmanager/queries.sql @@ -78,7 +78,7 @@ WHERE id = ANY ( SKIP LOCKED); -- name: CleanupMgrScheduleData :one --- CleanupMgrScheduleData will find the next schedule data that needs to be cleaned up. +-- CleanupMgrScheduleData will find the next schedule data that needs to be cleaned up. The last_cleanup_at field is used to ensure we clean up each schedule data at most once per interval. SELECT schedule_id, data diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index de66dc66c1..51fa1b1101 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -1017,7 +1017,7 @@ type CleanupMgrScheduleDataRow struct { Data json.RawMessage } -// CleanupMgrScheduleData will find the next schedule data that needs to be cleaned up. +// CleanupMgrScheduleData will find the next schedule data that needs to be cleaned up. The last_cleanup_at field is used to ensure we clean up each schedule data at most once per interval. 
func (q *Queries) CleanupMgrScheduleData(ctx context.Context, cleanupIntervalDays int32) (CleanupMgrScheduleDataRow, error) { row := q.db.QueryRowContext(ctx, cleanupMgrScheduleData, cleanupIntervalDays) var i CleanupMgrScheduleDataRow From fd5ef14541976a01134984baee3adf11cfd5b7b0 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 16:18:00 -0600 Subject: [PATCH 25/34] engine/cleanupmgr: refactor schedule data cleanup logic and add job for looking up schedules needing cleanup --- engine/cleanupmanager/queries.sql | 19 ++++--- engine/cleanupmanager/scheddata.go | 86 +++++++++++++++++++++++------- engine/cleanupmanager/setup.go | 39 +++++++++++++- gadb/queries.sql.go | 62 ++++++++++++++------- 4 files changed, 160 insertions(+), 46 deletions(-) diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql index 12df3babd8..99398f4156 100644 --- a/engine/cleanupmanager/queries.sql +++ b/engine/cleanupmanager/queries.sql @@ -77,11 +77,10 @@ WHERE id = ANY ( FOR UPDATE SKIP LOCKED); --- name: CleanupMgrScheduleData :one --- CleanupMgrScheduleData will find the next schedule data that needs to be cleaned up. The last_cleanup_at field is used to ensure we clean up each schedule data at most once per interval. +-- name: CleanupMgrScheduleNeedsCleanup :many +-- CleanupMgrScheduleNeedsCleanup will find schedules that need to be cleaned up. The last_cleanup_at field is used to ensure we clean up each schedule data at most once per interval. SELECT - schedule_id, - data + schedule_id FROM schedule_data WHERE @@ -89,9 +88,17 @@ WHERE AND (last_cleanup_at ISNULL OR last_cleanup_at <= now() - '1 day'::interval * sqlc.arg(cleanup_interval_days)::int) ORDER BY - last_cleanup_at ASC nulls FIRST + last_cleanup_at ASC nulls FIRST; + +-- name: CleanupMgrScheduleData :one +-- CleanupMgrScheduleData will select the schedule data for the given schedule id. 
+SELECT + data +FROM + schedule_data +WHERE + schedule_id = $1 FOR UPDATE - SKIP LOCKED LIMIT 1; -- name: CleanupMgrUpdateScheduleData :exec diff --git a/engine/cleanupmanager/scheddata.go b/engine/cleanupmanager/scheddata.go index f20d498272..6c9b1c9ec4 100644 --- a/engine/cleanupmanager/scheddata.go +++ b/engine/cleanupmanager/scheddata.go @@ -11,16 +11,67 @@ import ( "time" "github.com/google/uuid" + "github.com/jackc/pgx/v5" "github.com/riverqueue/river" "github.com/target/goalert/config" "github.com/target/goalert/gadb" "github.com/target/goalert/schedule" ) -type SchedDataArgs struct{} +type SchedDataArgs struct { + ScheduleID uuid.UUID +} func (SchedDataArgs) Kind() string { return "cleanup-manager-sched-data" } +type SchedDataLookForWorkArgs struct{} + +func (SchedDataLookForWorkArgs) Kind() string { return "cleanup-manager-sched-data-look-for-work" } + +// LookForWorkScheduleData will automatically look for schedules that need their JSON data cleaned up and insert them into the queue. +func (db *DB) LookForWorkScheduleData(ctx context.Context, j *river.Job[SchedDataLookForWorkArgs]) error { + cfg := config.FromContext(ctx) + if cfg.Maintenance.ScheduleCleanupDays <= 0 { + return nil + } + var outOfDate []uuid.UUID + err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { + var err error + // Grab schedules that haven't been cleaned up in the last 30 days. 
+ outOfDate, err = gadb.New(tx).CleanupMgrScheduleNeedsCleanup(ctx, 30) + return err + }) + if errors.Is(err, sql.ErrNoRows) { + return nil + } + if err != nil { + return err + } + + var params []river.InsertManyParams + for _, id := range outOfDate { + params = append(params, river.InsertManyParams{ + Args: SchedDataArgs{ScheduleID: id}, + InsertOpts: &river.InsertOpts{ + Queue: QueueName, + Priority: PriorityTempSched, + UniqueOpts: river.UniqueOpts{ByArgs: true}, + }, + }) + } + + if len(params) == 0 { + return nil + } + + _, err = river.ClientFromContext[pgx.Tx](ctx).InsertMany(ctx, params) + if err != nil { + return fmt.Errorf("insert many: %w", err) + } + + return nil +} + // CleanupScheduleData will automatically cleanup schedule data. // - Remove temporary-schedule shifts for users that no longer exist. // - Remove temporary-schedule shifts that occur in the past. @@ -29,28 +80,23 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg if cfg.Maintenance.ScheduleCleanupDays <= 0 { return nil } + log := db.logger.With(slog.String("schedule_id", j.Args.ScheduleID.String())) - err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { + err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) (err error) { // Grab the next schedule that hasn't been cleaned up in the last 30 days. 
- dataRow, err := gadb.New(tx).CleanupMgrScheduleData(ctx, 30) + rawData, err := gadb.New(tx).CleanupMgrScheduleData(ctx, j.Args.ScheduleID) if errors.Is(err, sql.ErrNoRows) { - return true, nil + return nil } if err != nil { - return false, fmt.Errorf("get schedule data: %w", err) + return fmt.Errorf("get schedule data: %w", err) } - log := db.logger.With(slog.String("schedule_id", dataRow.ScheduleID.String())) gdb := gadb.New(tx) var data schedule.Data - err = json.Unmarshal(dataRow.Data, &data) + err = json.Unmarshal(rawData, &data) if err != nil { - log.ErrorContext(ctx, - "failed to unmarshal schedule data, skipping.", - slog.String("error", err.Error())) - - // Mark as skipped so we don't keep trying to process it. - return false, gdb.CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID) + return fmt.Errorf("unmarshal schedule data: %w", err) } // We want to remove shifts for users that no longer exist, so to do that we'll get the set of users from the schedule data and verify them. 
@@ -59,28 +105,28 @@ func (db *DB) CleanupScheduleData(ctx context.Context, j *river.Job[SchedDataArg if len(users) > 0 { validUsers, err = gdb.CleanupMgrVerifyUsers(ctx, users) if err != nil { - return false, fmt.Errorf("lookup valid users: %w", err) + return fmt.Errorf("lookup valid users: %w", err) } } now, err := gdb.Now(ctx) if err != nil { - return false, fmt.Errorf("get current time: %w", err) + return fmt.Errorf("get current time: %w", err) } changed := cleanupData(&data, validUsers, now) if !changed { - return false, gdb.CleanupMgrScheduleDataSkip(ctx, dataRow.ScheduleID) + return gdb.CleanupMgrScheduleDataSkip(ctx, j.Args.ScheduleID) } - rawData, err := json.Marshal(data) + rawData, err = json.Marshal(data) if err != nil { - return false, fmt.Errorf("marshal schedule data: %w", err) + return fmt.Errorf("marshal schedule data: %w", err) } log.InfoContext(ctx, "Updated schedule data.") - return false, gdb.CleanupMgrUpdateScheduleData(ctx, + return gdb.CleanupMgrUpdateScheduleData(ctx, gadb.CleanupMgrUpdateScheduleDataParams{ - ScheduleID: dataRow.ScheduleID, + ScheduleID: j.Args.ScheduleID, Data: rawData, }) }) diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index a2ea7cd095..0251dddd88 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -15,6 +15,13 @@ var _ processinglock.Setupable = &DB{} const QueueName = "cleanup-manager" +const ( + PriorityAlertCleanup = iota + 1 + PrioritySchedHistory + PriorityTempSchedLFW + PriorityTempSched +) + // whileWork will run the provided function in a loop until it returns done=true. 
func (db *DB) whileWork(ctx context.Context, run func(ctx context.Context, tx *sql.Tx) (done bool, err error)) error { var done bool @@ -45,8 +52,9 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlerts)) river.AddWorker(args.Workers, river.WorkFunc(db.CleanupShifts)) river.AddWorker(args.Workers, river.WorkFunc(db.CleanupScheduleData)) + river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkScheduleData)) - err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 2}) + err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 5}) if err != nil { return fmt.Errorf("add queue: %w", err) } @@ -56,7 +64,34 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.PeriodicInterval(time.Hour), func() (river.JobArgs, *river.InsertOpts) { return AlertArgs{}, &river.InsertOpts{ - Queue: QueueName, + Queue: QueueName, + Priority: PriorityAlertCleanup, + } + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + }) + + args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ + river.NewPeriodicJob( + river.PeriodicInterval(24*time.Hour), + func() (river.JobArgs, *river.InsertOpts) { + return ShiftArgs{}, &river.InsertOpts{ + Queue: QueueName, + Priority: PrioritySchedHistory, + } + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + }) + + args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ + river.NewPeriodicJob( + river.PeriodicInterval(24*time.Hour), + func() (river.JobArgs, *river.InsertOpts) { + return SchedDataLookForWorkArgs{}, &river.InsertOpts{ + Queue: QueueName, + Priority: PriorityTempSchedLFW, } }, &river.PeriodicJobOpts{RunOnStart: true}, diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index 51fa1b1101..dd66aa74cb 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -997,32 +997,21 @@ func (q *Queries) CleanupMgrFindStaleAlerts(ctx context.Context, arg CleanupMgrF const 
cleanupMgrScheduleData = `-- name: CleanupMgrScheduleData :one SELECT - schedule_id, data FROM schedule_data WHERE - data NOTNULL - AND (last_cleanup_at ISNULL - OR last_cleanup_at <= now() - '1 day'::interval * $1::int) -ORDER BY - last_cleanup_at ASC nulls FIRST + schedule_id = $1 FOR UPDATE - SKIP LOCKED LIMIT 1 ` -type CleanupMgrScheduleDataRow struct { - ScheduleID uuid.UUID - Data json.RawMessage -} - -// CleanupMgrScheduleData will find the next schedule data that needs to be cleaned up. The last_cleanup_at field is used to ensure we clean up each schedule data at most once per interval. -func (q *Queries) CleanupMgrScheduleData(ctx context.Context, cleanupIntervalDays int32) (CleanupMgrScheduleDataRow, error) { - row := q.db.QueryRowContext(ctx, cleanupMgrScheduleData, cleanupIntervalDays) - var i CleanupMgrScheduleDataRow - err := row.Scan(&i.ScheduleID, &i.Data) - return i, err +// CleanupMgrScheduleData will select the schedule data for the given schedule id. +func (q *Queries) CleanupMgrScheduleData(ctx context.Context, scheduleID uuid.UUID) (json.RawMessage, error) { + row := q.db.QueryRowContext(ctx, cleanupMgrScheduleData, scheduleID) + var data json.RawMessage + err := row.Scan(&data) + return data, err } const cleanupMgrScheduleDataSkip = `-- name: CleanupMgrScheduleDataSkip :exec @@ -1040,6 +1029,43 @@ func (q *Queries) CleanupMgrScheduleDataSkip(ctx context.Context, scheduleID uui return err } +const cleanupMgrScheduleNeedsCleanup = `-- name: CleanupMgrScheduleNeedsCleanup :many +SELECT + schedule_id +FROM + schedule_data +WHERE + data NOTNULL + AND (last_cleanup_at ISNULL + OR last_cleanup_at <= now() - '1 day'::interval * $1::int) +ORDER BY + last_cleanup_at ASC nulls FIRST +` + +// CleanupMgrScheduleNeedsCleanup will find schedules that need to be cleaned up. The last_cleanup_at field is used to ensure we clean up each schedule data at most once per interval. 
+func (q *Queries) CleanupMgrScheduleNeedsCleanup(ctx context.Context, cleanupIntervalDays int32) ([]uuid.UUID, error) { + rows, err := q.db.QueryContext(ctx, cleanupMgrScheduleNeedsCleanup, cleanupIntervalDays) + if err != nil { + return nil, err + } + defer rows.Close() + var items []uuid.UUID + for rows.Next() { + var schedule_id uuid.UUID + if err := rows.Scan(&schedule_id); err != nil { + return nil, err + } + items = append(items, schedule_id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const cleanupMgrUpdateScheduleData = `-- name: CleanupMgrUpdateScheduleData :exec UPDATE schedule_data From b85bbdbc85fc44db07a4bbd9b54315ccd66c0f55 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Tue, 17 Dec 2024 16:57:15 -0600 Subject: [PATCH 26/34] engine/cleanupmgr: implement alert log cleanup job scheduling and refactor related queries --- engine/cleanupmanager/alertlogs.go | 79 +++++++++++++++++++++++++----- engine/cleanupmanager/queries.sql | 10 +++- engine/cleanupmanager/setup.go | 65 ++++-------------------- gadb/queries.sql.go | 35 +++++++++++-- 4 files changed, 116 insertions(+), 73 deletions(-) diff --git a/engine/cleanupmanager/alertlogs.go b/engine/cleanupmanager/alertlogs.go index 6a678b8484..004434e00b 100644 --- a/engine/cleanupmanager/alertlogs.go +++ b/engine/cleanupmanager/alertlogs.go @@ -5,36 +5,91 @@ import ( "database/sql" "errors" "fmt" - "time" + "github.com/jackc/pgx/v5" "github.com/riverqueue/river" "github.com/target/goalert/gadb" ) -type AlertLogArgs struct{} +type AlertLogLFWArgs struct{} -func (AlertLogArgs) Kind() string { return "cleanup-manager-alert-logs" } +func (AlertLogLFWArgs) Kind() string { return "cleanup-manager-alert-logs-lfw" } -type timeoutWorker[T river.JobArgs] struct { - river.Worker[T] - timeout time.Duration +type AlertLogArgs struct { + StartID int64 + EndID int64 } -// Timeout implements Worker interface. 
-func (w *timeoutWorker[T]) Timeout(job *river.Job[T]) time.Duration { - return w.timeout +const ( + batchSize = 5000 + blockSize = 100000 +) + +// LookForWorkAlertLogs will schedule alert log cleanup jobs for blocks of alert log IDs. +// +// The strategy here is to look for the minimum and maximum alert log IDs in the database, then schedule jobs for each `blockSize` block of IDs, +// and those jobs will then cleanup the alert logs in that range `batchSize` at a time. +func (db *DB) LookForWorkAlertLogs(ctx context.Context, j *river.Job[AlertLogLFWArgs]) error { + var min, max int64 + err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { + row, err := gadb.New(tx).CleanupMgrAlertLogsMinMax(ctx) + if err != nil { + return err + } + min, max = row.MinID, row.MaxID + return nil + }) + if errors.Is(err, sql.ErrNoRows) { + return nil + } + if err != nil { + return fmt.Errorf("get min/max alert log ID: %w", err) + } + + max++ + + var params []river.InsertManyParams + for i := int64(0); i < max; i += blockSize { + if i < min { + // skip sparse blocks + continue + } + + params = append(params, river.InsertManyParams{ + Args: AlertLogArgs{StartID: i, EndID: i + blockSize}, + InsertOpts: &river.InsertOpts{ + Queue: QueueName, + Priority: PriorityAlertLogs, + UniqueOpts: river.UniqueOpts{ByArgs: true}, + }, + }) + } + + if len(params) == 0 { + return nil + } + + _, err = river.ClientFromContext[pgx.Tx](ctx).InsertMany(ctx, params) + if err != nil { + return fmt.Errorf("insert many: %w", err) + } + + return nil } +func (AlertLogArgs) Kind() string { return "cleanup-manager-alert-logs" } + // CleanupAlertLogs will remove alert log entries for deleted alerts. 
func (db *DB) CleanupAlertLogs(ctx context.Context, j *river.Job[AlertLogArgs]) error { - var lastID int64 // start at zero, we will scan _all_ logs + lastID := j.Args.StartID err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { db.logger.DebugContext(ctx, "Cleaning up alert logs...", "lastID", lastID) lastID, err = gadb.New(tx).CleanupAlertLogs(ctx, gadb.CleanupAlertLogsParams{ - BatchSize: 10000, - AfterID: lastID, + BatchSize: batchSize, + StartID: lastID, + EndID: j.Args.EndID, }) if errors.Is(err, sql.ErrNoRows) { return true, nil diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql index 5c334285a1..c251ec9ff1 100644 --- a/engine/cleanupmanager/queries.sql +++ b/engine/cleanupmanager/queries.sql @@ -129,6 +129,14 @@ FROM WHERE id = ANY (sqlc.arg(user_ids)::uuid[]); +-- name: CleanupMgrAlertLogsMinMax :one +-- CleanupMgrAlertLogsMinMax will find the minimum and maximum id of the alert_logs table. +SELECT + min(id)::bigint AS min_id, + max(id)::bigint AS max_id +FROM + alert_logs; + -- name: CleanupAlertLogs :one WITH scope AS ( SELECT @@ -136,7 +144,7 @@ WITH scope AS ( FROM alert_logs l WHERE - l.id > @after_id + l.id BETWEEN @start_id AND @end_id - 1 ORDER BY l.id LIMIT @batch_size diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index 243ee13ddc..5bd382fd08 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -16,10 +16,12 @@ var _ processinglock.Setupable = &DB{} const QueueName = "cleanup-manager" const ( - PriorityAlertCleanup = iota + 1 - PrioritySchedHistory - PriorityTempSchedLFW - PriorityTempSched + PriorityAlertCleanup = 1 + PrioritySchedHistory = 1 + PriorityTempSchedLFW = 2 + PriorityAlertLogsLFW = 2 + PriorityTempSched = 3 + PriorityAlertLogs = 4 ) // whileWork will run the provided function in a loop until it returns done=true. 
@@ -52,10 +54,9 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlerts)) river.AddWorker(args.Workers, river.WorkFunc(db.CleanupShifts)) river.AddWorker(args.Workers, river.WorkFunc(db.CleanupScheduleData)) - - // the alert log cleanup job can last longer than a minute (the default timeout) so we set a longer timeout - river.AddWorker(args.Workers, &timeoutWorker[AlertLogArgs]{Worker: river.WorkFunc(db.CleanupAlertLogs), timeout: time.Hour}) river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkScheduleData)) + river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlertLogs)) + river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkAlertLogs)) err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 5}) if err != nil { @@ -101,59 +102,11 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { ), }) - args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ - river.NewPeriodicJob( - river.PeriodicInterval(24*time.Hour), - func() (river.JobArgs, *river.InsertOpts) { - return ShiftArgs{}, &river.InsertOpts{ - Queue: QueueName, - } - }, - &river.PeriodicJobOpts{RunOnStart: true}, - ), - }) - - args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ - river.NewPeriodicJob( - river.PeriodicInterval(24*time.Hour), - func() (river.JobArgs, *river.InsertOpts) { - return SchedDataArgs{}, &river.InsertOpts{ - Queue: QueueName, - } - }, - &river.PeriodicJobOpts{RunOnStart: true}, - ), - }) - - args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ - river.NewPeriodicJob( - river.PeriodicInterval(24*time.Hour), - func() (river.JobArgs, *river.InsertOpts) { - return ShiftArgs{}, &river.InsertOpts{ - Queue: QueueName, - } - }, - &river.PeriodicJobOpts{RunOnStart: true}, - ), - }) - - args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ - river.NewPeriodicJob( - river.PeriodicInterval(24*time.Hour), - func() (river.JobArgs, 
*river.InsertOpts) { - return SchedDataArgs{}, &river.InsertOpts{ - Queue: QueueName, - } - }, - &river.PeriodicJobOpts{RunOnStart: true}, - ), - }) - args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ river.NewPeriodicJob( river.PeriodicInterval(7*24*time.Hour), func() (river.JobArgs, *river.InsertOpts) { - return AlertLogArgs{}, &river.InsertOpts{ + return AlertLogLFWArgs{}, &river.InsertOpts{ Queue: QueueName, UniqueOpts: river.UniqueOpts{ ByArgs: true, diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index 5408ca493c..de1d85e934 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -858,10 +858,10 @@ WITH scope AS ( FROM alert_logs l WHERE - l.id > $2 + l.id BETWEEN $2 AND $3- 1 ORDER BY l.id - LIMIT $3 + LIMIT $4 ), id_range AS ( SELECT @@ -906,16 +906,43 @@ _delete AS ( type CleanupAlertLogsParams struct { BatchSize int32 - AfterID int64 + StartID int64 + EndID int64 } func (q *Queries) CleanupAlertLogs(ctx context.Context, arg CleanupAlertLogsParams) (int64, error) { - row := q.db.QueryRowContext(ctx, cleanupAlertLogs, arg.BatchSize, arg.AfterID, arg.BatchSize) + row := q.db.QueryRowContext(ctx, cleanupAlertLogs, + arg.BatchSize, + arg.StartID, + arg.EndID, + arg.BatchSize, + ) var id int64 err := row.Scan(&id) return id, err } +const cleanupMgrAlertLogsMinMax = `-- name: CleanupMgrAlertLogsMinMax :one +SELECT + min(id)::bigint AS min_id, + max(id)::bigint AS max_id +FROM + alert_logs +` + +type CleanupMgrAlertLogsMinMaxRow struct { + MinID int64 + MaxID int64 +} + +// CleanupMgrAlertLogsMinMax will find the minimum and maximum id of the alert_logs table. 
+func (q *Queries) CleanupMgrAlertLogsMinMax(ctx context.Context) (CleanupMgrAlertLogsMinMaxRow, error) { + row := q.db.QueryRowContext(ctx, cleanupMgrAlertLogsMinMax) + var i CleanupMgrAlertLogsMinMaxRow + err := row.Scan(&i.MinID, &i.MaxID) + return i, err +} + const cleanupMgrDeleteOldAlerts = `-- name: CleanupMgrDeleteOldAlerts :execrows DELETE FROM alerts WHERE id = ANY ( From aa505b2759673ecd24e7b14ddc0fb6d54d4136ea Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Wed, 18 Dec 2024 09:06:35 -0600 Subject: [PATCH 27/34] engine/initriver: remove unused noopWorker type --- app/initriver.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/app/initriver.go b/app/initriver.go index 969f5f34e0..9683332776 100644 --- a/app/initriver.go +++ b/app/initriver.go @@ -39,10 +39,6 @@ func (r *riverErrs) HandlePanic(ctx context.Context, job *rivertype.JobRow, pani return nil } -type noopWorker struct{} - -func (noopWorker) Kind() string { return "noop" } - // ignoreCancel is a slog.Handler that ignores log records with an "error" attribute of "context canceled". 
type ignoreCancel struct{ h slog.Handler } From 4bee0287b6bc8a01bb978f28054462f87ec58258 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Wed, 18 Dec 2024 09:38:05 -0600 Subject: [PATCH 28/34] engine/cleanupmgr: rename LookForWorkArgs types for consistency --- engine/cleanupmanager/scheddata.go | 6 +++--- engine/cleanupmanager/setup.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/cleanupmanager/scheddata.go b/engine/cleanupmanager/scheddata.go index 6c9b1c9ec4..b7307ce245 100644 --- a/engine/cleanupmanager/scheddata.go +++ b/engine/cleanupmanager/scheddata.go @@ -24,12 +24,12 @@ type SchedDataArgs struct { func (SchedDataArgs) Kind() string { return "cleanup-manager-sched-data" } -type SchedDataLookForWorkArgs struct{} +type SchedDataLFW struct{} -func (SchedDataLookForWorkArgs) Kind() string { return "cleanup-manager-sched-data-look-for-work" } +func (SchedDataLFW) Kind() string { return "cleanup-manager-sched-data-lfw" } // LookForWorkScheduleData will automatically look for schedules that need their JSON data cleaned up and insert them into the queue. 
-func (db *DB) LookForWorkScheduleData(ctx context.Context, j *river.Job[SchedDataLookForWorkArgs]) error { +func (db *DB) LookForWorkScheduleData(ctx context.Context, j *river.Job[SchedDataLFW]) error { cfg := config.FromContext(ctx) if cfg.Maintenance.ScheduleCleanupDays <= 0 { return nil diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index 0251dddd88..e3f3d773d2 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -89,7 +89,7 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.NewPeriodicJob( river.PeriodicInterval(24*time.Hour), func() (river.JobArgs, *river.InsertOpts) { - return SchedDataLookForWorkArgs{}, &river.InsertOpts{ + return SchedDataLFW{}, &river.InsertOpts{ Queue: QueueName, Priority: PriorityTempSchedLFW, } From acc14dab15c412e472e196611ece36d9ec15a06a Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Wed, 18 Dec 2024 14:52:09 -0600 Subject: [PATCH 29/34] feat(cleanupmanager): implement API key cleanup functionality --- engine/cleanupmanager/apikeys.go | 50 +++++++++++++++++++++++++++++ engine/cleanupmanager/queries.sql | 34 ++++++++++++++++++++ gadb/queries.sql.go | 52 +++++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+) create mode 100644 engine/cleanupmanager/apikeys.go diff --git a/engine/cleanupmanager/apikeys.go b/engine/cleanupmanager/apikeys.go new file mode 100644 index 0000000000..4a991333a5 --- /dev/null +++ b/engine/cleanupmanager/apikeys.go @@ -0,0 +1,50 @@ +package cleanupmanager + +import ( + "context" + "database/sql" + "fmt" + + "github.com/riverqueue/river" + "github.com/target/goalert/config" + "github.com/target/goalert/gadb" +) + +type APIKeysArgs struct{} + +func (APIKeysArgs) Kind() string { return "cleanup-manager-api-keys" } + +// CleanupAPIKeys will revoke access to the API from unused tokens, including both user sessions and calendar subscriptions. 
+func (db *DB) CleanupAPIKeys(ctx context.Context, j *river.Job[ShiftArgs]) error {
+	err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) {
+		// After 30 days, the token is no longer valid, so delete it.
+		//
+		// This is defined by how the keyring system works for session signing, and is not influenced by the APIKeyExpireDays config.
+		count, err := gadb.New(tx).CleanupMgrDeleteOldSessions(ctx, 30)
+		if err != nil {
+			return false, fmt.Errorf("delete old user sessions: %w", err)
+		}
+		return count < 100, nil
+	})
+	if err != nil {
+		return err
+	}
+
+	cfg := config.FromContext(ctx)
+	if cfg.Maintenance.APIKeyExpireDays <= 0 {
+		return nil
+	}
+
+	err = db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) {
+		count, err := gadb.New(tx).CleanupMgrDisableOldCalSub(ctx, int32(cfg.Maintenance.APIKeyExpireDays))
+		if err != nil {
+			return false, fmt.Errorf("disable unused calsub keys: %w", err)
+		}
+		return count < 100, nil
+	})
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql
index c251ec9ff1..ad409147b2 100644
--- a/engine/cleanupmanager/queries.sql
+++ b/engine/cleanupmanager/queries.sql
@@ -189,3 +189,37 @@ _delete AS (
     scope OFFSET @batch_size - 1
 LIMIT 1;
+-- name: CleanupMgrDeleteOldSessions :execrows
+-- CleanupMgrDeleteOldSessions will delete old sessions from the auth_user_sessions table that are older than the given number of days before now.
+DELETE FROM auth_user_sessions
+WHERE id = ANY (
+    SELECT
+        id
+    FROM
+        auth_user_sessions
+    WHERE
+        last_access_at <(now() - '1 day'::interval * sqlc.arg(max_session_age_days)::int)
+    LIMIT 100
+    FOR UPDATE
+        SKIP LOCKED);
+
+-- name: CleanupMgrDisableOldCalSub :execrows
+-- CleanupMgrDisableOldCalSub will disable old calendar subscriptions from the user_calendar_subscriptions table that are unused for at least the given number of days. 
+UPDATE + user_calendar_subscriptions +SET + disabled = TRUE +WHERE + id = ANY ( + SELECT + id + FROM + user_calendar_subscriptions + WHERE + greatest(last_access, last_update) <(now() - '1 day'::interval * sqlc.arg(inactivity_threshold_days)::int) + ORDER BY + id + LIMIT 100 + FOR UPDATE + SKIP LOCKED); + diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index de1d85e934..00775a342f 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -1015,6 +1015,29 @@ func (q *Queries) CleanupMgrDeleteOldScheduleShifts(ctx context.Context, history return result.RowsAffected() } +const cleanupMgrDeleteOldSessions = `-- name: CleanupMgrDeleteOldSessions :execrows +DELETE FROM auth_user_sessions +WHERE id = ANY ( + SELECT + id + FROM + auth_user_sessions + WHERE + last_access_at <(now() - '1 day'::interval * $1::int) + LIMIT 100 + FOR UPDATE + SKIP LOCKED) +` + +// CleanupMgrDeleteOldSessions will delete old sessions from the auth_user_sessions table that are older than the given number of days before now. 
+func (q *Queries) CleanupMgrDeleteOldSessions(ctx context.Context, maxSessionAgeDays int32) (int64, error) {
+	result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldSessions, maxSessionAgeDays)
+	if err != nil {
+		return 0, err
+	}
+	return result.RowsAffected()
+}
+
 const cleanupMgrDeleteOldStepShifts = `-- name: CleanupMgrDeleteOldStepShifts :execrows
 DELETE FROM ep_step_on_call_users
 WHERE id = ANY (
@@ -1038,6 +1061,35 @@ func (q *Queries) CleanupMgrDeleteOldStepShifts(ctx context.Context, historyThre
 	return result.RowsAffected()
 }
 
+const cleanupMgrDisableOldCalSub = `-- name: CleanupMgrDisableOldCalSub :execrows
+UPDATE
+    user_calendar_subscriptions
+SET
+    disabled = TRUE
+WHERE
+    id = ANY (
+        SELECT
+            id
+        FROM
+            user_calendar_subscriptions
+        WHERE
+            greatest(last_access, last_update) <(now() - '1 day'::interval * $1::int)
+        ORDER BY
+            id
+        LIMIT 100
+        FOR UPDATE
+            SKIP LOCKED)
+`
+
+// CleanupMgrDisableOldCalSub will disable old calendar subscriptions from the user_calendar_subscriptions table that are unused for at least the given number of days. 
+func (q *Queries) CleanupMgrDisableOldCalSub(ctx context.Context, inactivityThresholdDays int32) (int64, error) { + result, err := q.db.ExecContext(ctx, cleanupMgrDisableOldCalSub, inactivityThresholdDays) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + const cleanupMgrFindStaleAlerts = `-- name: CleanupMgrFindStaleAlerts :many SELECT id From 77a229c62bd6044ddaa37cc3833d9c24454058ca Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Wed, 18 Dec 2024 14:54:18 -0600 Subject: [PATCH 30/34] feat(cleanupmanager): add periodic job for API key cleanup and remove unused update logic --- engine/cleanupmanager/db.go | 17 +--------- engine/cleanupmanager/setup.go | 15 +++++++++ engine/cleanupmanager/update.go | 56 --------------------------------- 3 files changed, 16 insertions(+), 72 deletions(-) delete mode 100644 engine/cleanupmanager/update.go diff --git a/engine/cleanupmanager/db.go b/engine/cleanupmanager/db.go index 605f0c855a..fc85319a00 100644 --- a/engine/cleanupmanager/db.go +++ b/engine/cleanupmanager/db.go @@ -7,7 +7,6 @@ import ( "github.com/target/goalert/alert" "github.com/target/goalert/engine/processinglock" - "github.com/target/goalert/util" ) // DB handles updating escalation policies. @@ -15,11 +14,6 @@ type DB struct { db *sql.DB lock *processinglock.Lock - cleanupAPIKeys *sql.Stmt - setTimeout *sql.Stmt - - cleanupSessions *sql.Stmt - alertStore *alert.Store logger *slog.Logger @@ -38,20 +32,11 @@ func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store, log *slog.L return nil, err } - p := &util.Prepare{Ctx: ctx, DB: db} - return &DB{ db: db, lock: lock, logger: log, - // Abort any cleanup operation that takes longer than 3 seconds - // error will be logged. 
- setTimeout: p.P(`SET LOCAL statement_timeout = 3000`), - cleanupAPIKeys: p.P(`update user_calendar_subscriptions set disabled = true where id = any(select id from user_calendar_subscriptions where greatest(last_access, last_update) < (now() - $1::interval) order by id limit 100 for update skip locked)`), - - cleanupSessions: p.P(`DELETE FROM auth_user_sessions WHERE id = any(select id from auth_user_sessions where last_access_at < (now() - '30 days'::interval) LIMIT 100 for update skip locked)`), - alertStore: alertstore, - }, p.Err + }, nil } diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index 787c34df38..fa1a8f3ef9 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -18,6 +18,7 @@ const QueueName = "cleanup-manager" const ( PriorityAlertCleanup = 1 PrioritySchedHistory = 1 + PriorityAPICleanup = 1 PriorityTempSchedLFW = 2 PriorityAlertLogsLFW = 2 PriorityTempSched = 3 @@ -57,6 +58,7 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkScheduleData)) river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlertLogs)) river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkAlertLogs)) + river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAPIKeys)) err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 5}) if err != nil { @@ -117,5 +119,18 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { ), }) + args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ + river.NewPeriodicJob( + river.PeriodicInterval(24*time.Hour), + func() (river.JobArgs, *river.InsertOpts) { + return APIKeysArgs{}, &river.InsertOpts{ + Queue: QueueName, + Priority: PriorityAPICleanup, + } + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + }) + return nil } diff --git a/engine/cleanupmanager/update.go b/engine/cleanupmanager/update.go deleted file mode 100644 index 3e0ffc817b..0000000000 
--- a/engine/cleanupmanager/update.go +++ /dev/null @@ -1,56 +0,0 @@ -package cleanupmanager - -import ( - "context" - "fmt" - - "github.com/jackc/pgtype" - "github.com/target/goalert/config" - "github.com/target/goalert/permission" - "github.com/target/goalert/util/log" - "github.com/target/goalert/util/sqlutil" -) - -// UpdateAll will update the state of all active escalation policies. -func (db *DB) UpdateAll(ctx context.Context) error { - err := db.update(ctx) - return err -} - -func (db *DB) update(ctx context.Context) error { - err := permission.LimitCheckAny(ctx, permission.System) - if err != nil { - return err - } - log.Debugf(ctx, "Running cleanup operations.") - - tx, err := db.lock.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("begin tx: %w", err) - } - defer sqlutil.Rollback(ctx, "cleanup manager", tx) - - _, err = tx.StmtContext(ctx, db.setTimeout).ExecContext(ctx) - if err != nil { - return fmt.Errorf("set timeout: %w", err) - } - - _, err = tx.StmtContext(ctx, db.cleanupSessions).ExecContext(ctx) - if err != nil { - return fmt.Errorf("cleanup sessions: %w", err) - } - - cfg := config.FromContext(ctx) - - if cfg.Maintenance.APIKeyExpireDays > 0 { - var dur pgtype.Interval - dur.Days = int32(cfg.Maintenance.APIKeyExpireDays) - dur.Status = pgtype.Present - _, err = tx.StmtContext(ctx, db.cleanupAPIKeys).ExecContext(ctx, &dur) - if err != nil { - return err - } - } - - return tx.Commit() -} From 256dd0f2f060fb8c5e9940e4bda45fa38ac267bd Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 23 Dec 2024 16:58:57 -0600 Subject: [PATCH 31/34] fix merge issue --- engine/cleanupmanager/alertlogs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/cleanupmanager/alertlogs.go b/engine/cleanupmanager/alertlogs.go index 004434e00b..0d1a594695 100644 --- a/engine/cleanupmanager/alertlogs.go +++ b/engine/cleanupmanager/alertlogs.go @@ -39,7 +39,7 @@ func (db *DB) LookForWorkAlertLogs(ctx context.Context, j *river.Job[AlertLogLFW 
min, max = row.MinID, row.MaxID return nil }) - if errors.Is(err, sql.ErrNoRows) { + if min == 0 && max == 0 { return nil } if err != nil { From 483ebc96d53e5e7a017d8d8e01d332852d8b749a Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 23 Dec 2024 17:00:19 -0600 Subject: [PATCH 32/34] fix merge issue --- engine/cleanupmanager/queries.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql index ad409147b2..7eb5e013fc 100644 --- a/engine/cleanupmanager/queries.sql +++ b/engine/cleanupmanager/queries.sql @@ -132,8 +132,8 @@ WHERE -- name: CleanupMgrAlertLogsMinMax :one -- CleanupMgrAlertLogsMinMax will find the minimum and maximum id of the alert_logs table. SELECT - min(id)::bigint AS min_id, - max(id)::bigint AS max_id + coalesce(min(id), 0)::bigint AS min_id, + coalesce(max(id), 0)::bigint AS max_id FROM alert_logs; From a63d12b74a875a51154def605decbe6868a6b1a6 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 23 Dec 2024 17:00:45 -0600 Subject: [PATCH 33/34] regen --- gadb/queries.sql.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index 00775a342f..d481086b78 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -924,8 +924,8 @@ func (q *Queries) CleanupAlertLogs(ctx context.Context, arg CleanupAlertLogsPara const cleanupMgrAlertLogsMinMax = `-- name: CleanupMgrAlertLogsMinMax :one SELECT - min(id)::bigint AS min_id, - max(id)::bigint AS max_id + coalesce(min(id), 0)::bigint AS min_id, + coalesce(max(id), 0)::bigint AS max_id FROM alert_logs ` From 75e282593b0c9966c0cf4552e433db8e1e5a76b8 Mon Sep 17 00:00:00 2001 From: Nathaniel Caza Date: Mon, 23 Dec 2024 17:05:14 -0600 Subject: [PATCH 34/34] fix arg type --- engine/cleanupmanager/apikeys.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/cleanupmanager/apikeys.go b/engine/cleanupmanager/apikeys.go index 
4a991333a5..bc1c5c12d8 100644 --- a/engine/cleanupmanager/apikeys.go +++ b/engine/cleanupmanager/apikeys.go @@ -15,7 +15,7 @@ type APIKeysArgs struct{} func (APIKeysArgs) Kind() string { return "cleanup-manager-api-keys" } // CleanupAPIKeys will revoke access to the API from unused tokens, including both user sessions and calendar subscriptions. -func (db *DB) CleanupAPIKeys(ctx context.Context, j *river.Job[ShiftArgs]) error { +func (db *DB) CleanupAPIKeys(ctx context.Context, j *river.Job[APIKeysArgs]) error { err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { // After 30 days, the token is no longer valid, so delete it. //