diff --git a/engine/cleanupmanager/apikeys.go b/engine/cleanupmanager/apikeys.go new file mode 100644 index 0000000000..bc1c5c12d8 --- /dev/null +++ b/engine/cleanupmanager/apikeys.go @@ -0,0 +1,50 @@ +package cleanupmanager + +import ( + "context" + "database/sql" + "fmt" + + "github.com/riverqueue/river" + "github.com/target/goalert/config" + "github.com/target/goalert/gadb" +) + +type APIKeysArgs struct{} + +func (APIKeysArgs) Kind() string { return "cleanup-manager-api-keys" } + +// CleanupAPIKeys will revoke access to the API from unused tokens, including both user sessions and calendar subscriptions. +func (db *DB) CleanupAPIKeys(ctx context.Context, j *river.Job[APIKeysArgs]) error { + err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { + // After 30 days, the token is no longer valid, so delete it. + // + // This is defined by how the keyring system works for session signing, and is not influenced by the APIKeyExpireDays config. + count, err := gadb.New(tx).CleanupMgrDeleteOldSessions(ctx, 30) + if err != nil { + return false, fmt.Errorf("delete old user sessions: %w", err) + } + return count < 100, nil + }) + if err != nil { + return err + } + + cfg := config.FromContext(ctx) + if cfg.Maintenance.APIKeyExpireDays <= 0 { + return nil + } + + err = db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { + count, err := gadb.New(tx).CleanupMgrDisableOldCalSub(ctx, int32(cfg.Maintenance.APIKeyExpireDays)) + if err != nil { + return false, fmt.Errorf("disable unused calsub keys: %w", err) + } + return count < 100, nil + }) + if err != nil { + return err + } + + return nil +} diff --git a/engine/cleanupmanager/db.go b/engine/cleanupmanager/db.go index 605f0c855a..fc85319a00 100644 --- a/engine/cleanupmanager/db.go +++ b/engine/cleanupmanager/db.go @@ -7,7 +7,6 @@ import ( "github.com/target/goalert/alert" "github.com/target/goalert/engine/processinglock" - "github.com/target/goalert/util" ) // DB handles updating escalation policies. @@ -15,11 +14,6 @@ type DB struct { db *sql.DB lock *processinglock.Lock - cleanupAPIKeys *sql.Stmt - setTimeout *sql.Stmt - - cleanupSessions *sql.Stmt - alertStore *alert.Store logger *slog.Logger @@ -38,20 +32,11 @@ func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store, log *slog.L return nil, err } - p := &util.Prepare{Ctx: ctx, DB: db} - return &DB{ db: db, lock: lock, logger: log, - // Abort any cleanup operation that takes longer than 3 seconds - // error will be logged. - setTimeout: p.P(`SET LOCAL statement_timeout = 3000`), - cleanupAPIKeys: p.P(`update user_calendar_subscriptions set disabled = true where id = any(select id from user_calendar_subscriptions where greatest(last_access, last_update) < (now() - $1::interval) order by id limit 100 for update skip locked)`), - - cleanupSessions: p.P(`DELETE FROM auth_user_sessions WHERE id = any(select id from auth_user_sessions where last_access_at < (now() - '30 days'::interval) LIMIT 100 for update skip locked)`), - alertStore: alertstore, - }, p.Err + }, nil } diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql index ae34196666..7eb5e013fc 100644 --- a/engine/cleanupmanager/queries.sql +++ b/engine/cleanupmanager/queries.sql @@ -189,3 +189,37 @@ _delete AS ( scope OFFSET @batch_size - 1 LIMIT 1; +-- name: CleanupMgrDeleteOldSessions :execrows +-- CleanupMgrDeleteOldSessions will delete old sessions from the auth_user_sessions table that are older than the given number of days before now. +DELETE FROM auth_user_sessions +WHERE id = ANY ( + SELECT + id + FROM + auth_user_sessions + WHERE + last_access_at <(now() - '1 day'::interval * sqlc.arg(max_session_age_days)::int) + LIMIT 100 + FOR UPDATE + SKIP LOCKED); + +-- name: CleanupMgrDisableOldCalSub :execrows +-- CleanupMgrDeleteOldCalSub will disable old calendar subscriptions from the user_calendar_subscriptions table that are unused for at least the given number of days. +UPDATE + user_calendar_subscriptions +SET + disabled = TRUE +WHERE + id = ANY ( + SELECT + id + FROM + user_calendar_subscriptions + WHERE + greatest(last_access, last_update) <(now() - '1 day'::interval * sqlc.arg(inactivity_threshold_days)::int) + ORDER BY + id + LIMIT 100 + FOR UPDATE + SKIP LOCKED); + diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index 787c34df38..fa1a8f3ef9 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -18,6 +18,7 @@ const QueueName = "cleanup-manager" const ( PriorityAlertCleanup = 1 PrioritySchedHistory = 1 + PriorityAPICleanup = 1 PriorityTempSchedLFW = 2 PriorityAlertLogsLFW = 2 PriorityTempSched = 3 @@ -57,6 +58,7 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkScheduleData)) river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlertLogs)) river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkAlertLogs)) + river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAPIKeys)) err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 5}) if err != nil { @@ -117,5 +119,18 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { ), }) + args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ + river.NewPeriodicJob( + river.PeriodicInterval(24*time.Hour), + func() (river.JobArgs, *river.InsertOpts) { + return APIKeysArgs{}, &river.InsertOpts{ + Queue: QueueName, + Priority: PriorityAPICleanup, + } + }, + &river.PeriodicJobOpts{RunOnStart: true}, + ), + }) + return nil } diff --git a/engine/cleanupmanager/update.go b/engine/cleanupmanager/update.go deleted file mode 100644 index 3e0ffc817b..0000000000 --- a/engine/cleanupmanager/update.go +++ /dev/null @@ -1,56 +0,0 @@ -package cleanupmanager - -import ( - "context" - "fmt" - - "github.com/jackc/pgtype" - "github.com/target/goalert/config" - "github.com/target/goalert/permission" - "github.com/target/goalert/util/log" - "github.com/target/goalert/util/sqlutil" -) - -// UpdateAll will update the state of all active escalation policies. -func (db *DB) UpdateAll(ctx context.Context) error { - err := db.update(ctx) - return err -} - -func (db *DB) update(ctx context.Context) error { - err := permission.LimitCheckAny(ctx, permission.System) - if err != nil { - return err - } - log.Debugf(ctx, "Running cleanup operations.") - - tx, err := db.lock.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("begin tx: %w", err) - } - defer sqlutil.Rollback(ctx, "cleanup manager", tx) - - _, err = tx.StmtContext(ctx, db.setTimeout).ExecContext(ctx) - if err != nil { - return fmt.Errorf("set timeout: %w", err) - } - - _, err = tx.StmtContext(ctx, db.cleanupSessions).ExecContext(ctx) - if err != nil { - return fmt.Errorf("cleanup sessions: %w", err) - } - - cfg := config.FromContext(ctx) - - if cfg.Maintenance.APIKeyExpireDays > 0 { - var dur pgtype.Interval - dur.Days = int32(cfg.Maintenance.APIKeyExpireDays) - dur.Status = pgtype.Present - _, err = tx.StmtContext(ctx, db.cleanupAPIKeys).ExecContext(ctx, &dur) - if err != nil { - return err - } - } - - return tx.Commit() -} diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index ab21f275ad..d481086b78 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -1015,6 +1015,29 @@ func (q *Queries) CleanupMgrDeleteOldScheduleShifts(ctx context.Context, history return result.RowsAffected() } +const cleanupMgrDeleteOldSessions = `-- name: CleanupMgrDeleteOldSessions :execrows +DELETE FROM auth_user_sessions +WHERE id = ANY ( + SELECT + id + FROM + auth_user_sessions + WHERE + last_access_at <(now() - '1 day'::interval * $1::int) + LIMIT 100 + FOR UPDATE + SKIP LOCKED) +` + +// CleanupMgrDeleteOldSessions will delete old sessions from the auth_user_sessions table that are older than the given number of days before now. +func (q *Queries) CleanupMgrDeleteOldSessions(ctx context.Context, maxSessionAgeDays int32) (int64, error) { + result, err := q.db.ExecContext(ctx, cleanupMgrDeleteOldSessions, maxSessionAgeDays) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + const cleanupMgrDeleteOldStepShifts = `-- name: CleanupMgrDeleteOldStepShifts :execrows DELETE FROM ep_step_on_call_users WHERE id = ANY ( @@ -1038,6 +1061,35 @@ func (q *Queries) CleanupMgrDeleteOldStepShifts(ctx context.Context, historyThre return result.RowsAffected() } +const cleanupMgrDisableOldCalSub = `-- name: CleanupMgrDisableOldCalSub :execrows +UPDATE + user_calendar_subscriptions +SET + disabled = TRUE +WHERE + id = ANY ( + SELECT + id + FROM + user_calendar_subscriptions + WHERE + greatest(last_access, last_update) <(now() - '1 day'::interval * $1::int) + ORDER BY + id + LIMIT 100 + FOR UPDATE + SKIP LOCKED) +` + +// CleanupMgrDeleteOldCalSub will disable old calendar subscriptions from the user_calendar_subscriptions table that are unused for at least the given number of days. +func (q *Queries) CleanupMgrDisableOldCalSub(ctx context.Context, inactivityThresholdDays int32) (int64, error) { + result, err := q.db.ExecContext(ctx, cleanupMgrDisableOldCalSub, inactivityThresholdDays) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + const cleanupMgrFindStaleAlerts = `-- name: CleanupMgrFindStaleAlerts :many SELECT id