diff --git a/engine/compatmanager/db.go b/engine/compatmanager/db.go index 72292694a6..87a998657e 100644 --- a/engine/compatmanager/db.go +++ b/engine/compatmanager/db.go @@ -6,7 +6,6 @@ import ( "github.com/target/goalert/engine/processinglock" "github.com/target/goalert/notification/slack" - "github.com/target/goalert/util" ) // DB handles keeping compatibility-related data in sync. @@ -15,13 +14,6 @@ type DB struct { lock *processinglock.Lock cs *slack.ChannelSender - - slackSubMissingCM *sql.Stmt - updateSubCMID *sql.Stmt - insertCM *sql.Stmt - - cmMissingSub *sql.Stmt - insertSub *sql.Stmt } // Name returns the name of the module. @@ -37,50 +29,9 @@ func NewDB(ctx context.Context, db *sql.DB, cs *slack.ChannelSender) (*DB, error return nil, err } - p := &util.Prepare{Ctx: ctx, DB: db} - return &DB{ db: db, lock: lock, cs: cs, - - // get all entries missing cm_id where provider_id starts with "slack:" - slackSubMissingCM: p.P(` - select id, user_id, subject_id, provider_id from auth_subjects where - provider_id like 'slack:%' and cm_id is null - for update skip locked - limit 10 - `), - - // update cm_id for a given user_id and subject_id - updateSubCMID: p.P(` - update auth_subjects - set cm_id = ( - select id from user_contact_methods - where type = 'SLACK_DM' and value = $2 - ) where id = $1 - `), - - insertCM: p.P(` - insert into user_contact_methods (id, name, type, value, user_id, pending) - values ($1, $2, $3, $4, $5, false) - on conflict (type, value) do nothing - `), - - // find verified contact methods (disabled false) with no auth subject - cmMissingSub: p.P(` - select id, user_id, value from user_contact_methods where - type = 'SLACK_DM' and not disabled and not exists ( - select 1 from auth_subjects where cm_id = user_contact_methods.id - ) - for update skip locked - limit 10 - `), - - insertSub: p.P(` - insert into auth_subjects (user_id, subject_id, provider_id, cm_id) - values ($1, $2, $3, $4) - on conflict (subject_id, provider_id) do update set user_id = $1, cm_id = $4 - `), - }, p.Err + }, nil } diff --git a/engine/compatmanager/queries.sql b/engine/compatmanager/queries.sql new file mode 100644 index 0000000000..87c7f49d52 --- /dev/null +++ b/engine/compatmanager/queries.sql @@ -0,0 +1,66 @@ +-- name: CompatAuthSubSlackMissingCM :many +-- Get up to 10 auth_subjects (slack only) missing a contact method. +SELECT + * +FROM + auth_subjects +WHERE + provider_id LIKE 'slack:%' + AND cm_id IS NULL +FOR UPDATE + SKIP LOCKED +LIMIT 10; + +-- name: CompatAuthSubSetCMID :exec +-- Updates the contact method id for an auth_subject with the given destination. +UPDATE + auth_subjects +SET + cm_id =( + SELECT + id + FROM + user_contact_methods + WHERE + type = 'SLACK_DM' + AND value = $2) +WHERE + auth_subjects.id = $1; + +-- name: CompatInsertUserCM :exec +-- Inserts a new contact method for a user. +INSERT INTO user_contact_methods(id, name, type, value, user_id, pending) + VALUES ($1, $2, $3, $4, $5, FALSE) +ON CONFLICT (type, value) + DO NOTHING; + +-- name: CompatCMMissingSub :many +-- Get up to 10 contact methods missing an auth_subjects link. +SELECT + id, + user_id, + value +FROM + user_contact_methods +WHERE + type = 'SLACK_DM' + AND NOT disabled + AND NOT EXISTS ( + SELECT + 1 + FROM + auth_subjects + WHERE + cm_id = user_contact_methods.id) +FOR UPDATE + SKIP LOCKED +LIMIT 10; + +-- name: CompatUpsertAuthSubject :exec +-- Inserts a new auth_subject for a user. +INSERT INTO auth_subjects(user_id, subject_id, provider_id, cm_id) + VALUES ($1, $2, $3, $4) +ON CONFLICT (subject_id, provider_id) + DO UPDATE SET + user_id = $1, cm_id = $4; + diff --git a/engine/compatmanager/update.go b/engine/compatmanager/update.go index 679b67530c..746ed19783 100644 --- a/engine/compatmanager/update.go +++ b/engine/compatmanager/update.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/google/uuid" + "github.com/target/goalert/gadb" "github.com/target/goalert/permission" "github.com/target/goalert/util/log" "github.com/target/goalert/util/sqlutil" @@ -39,39 +40,26 @@ func (db *DB) updateAuthSubjects(ctx context.Context) error { } defer sqlutil.Rollback(ctx, "engine: update auth subjects", tx) - type cm struct { - ID uuid.UUID - UserID uuid.UUID - SlackUserID string - SlackTeamID string - } - - var cms []cm - rows, err := tx.StmtContext(ctx, db.cmMissingSub).QueryContext(ctx) + q := gadb.New(tx) + rows, err := q.CompatCMMissingSub(ctx) if err != nil { return fmt.Errorf("query: %w", err) } - for rows.Next() { - var c cm - err = rows.Scan(&c.ID, &c.UserID, &c.SlackUserID) + for _, row := range rows { + u, err := db.cs.User(ctx, row.Value) if err != nil { - return fmt.Errorf("scan: %w", err) - } - - u, err := db.cs.User(ctx, c.SlackUserID) - if err != nil { - log.Log(ctx, fmt.Errorf("update auth subjects: lookup Slack user (%s): %w", c.SlackUserID, err)) + log.Log(ctx, fmt.Errorf("update auth subjects: lookup Slack user (%s): %w", row.Value, err)) continue } - c.SlackTeamID = u.TeamID - cms = append(cms, c) - } - - for _, c := range cms { - _, err = tx.StmtContext(ctx, db.insertSub).ExecContext(ctx, c.UserID, c.SlackUserID, "slack:"+c.SlackTeamID, c.ID) + err = q.CompatUpsertAuthSubject(ctx, gadb.CompatUpsertAuthSubjectParams{ + UserID: row.UserID, + ProviderID: "slack:" + u.TeamID, + SubjectID: u.ID, + CmID: uuid.NullUUID{UUID: row.ID, Valid: true}, + }) if err != nil { - return fmt.Errorf("insert: %w", err) + return fmt.Errorf("upsert auth subject: %w", err) } } @@ -83,6 +71,11 @@ func (db *DB) updateAuthSubjects(ctx context.Context) error { return nil } +// updateContactMethods will create contact methods for associated auth_subjects (e.g. Slack direct message). +// +// To do this, we look for auth_subjects that are missing the contact method ID +// field (`cm_id`) for slack, and create a Slack DM contact method for the user +// associated with the record. func (db *DB) updateContactMethods(ctx context.Context) error { tx, err := db.lock.BeginTx(ctx, nil) if err != nil { @@ -90,29 +83,13 @@ func (db *DB) updateContactMethods(ctx context.Context) error { } defer sqlutil.Rollback(ctx, "engine: update contact methods", tx) - type sub struct { - ID int - UserID string - SubjectID string - ProviderID string - } - - var subs []sub - rows, err := tx.StmtContext(ctx, db.slackSubMissingCM).QueryContext(ctx) + q := gadb.New(tx) + rows, err := q.CompatAuthSubSlackMissingCM(ctx) if err != nil { return fmt.Errorf("query: %w", err) } - for rows.Next() { - var s sub - err = rows.Scan(&s.ID, &s.UserID, &s.SubjectID, &s.ProviderID) - if err != nil { - return fmt.Errorf("scan: %w", err) - } - subs = append(subs, s) - } - - for _, s := range subs { + for _, s := range rows { // provider id contains the team id in the format "slack:team_id" // but we need to store the contact method id in the format "team_id:subject_id" teamID := strings.TrimPrefix(s.ProviderID, "slack:") @@ -123,12 +100,21 @@ func (db *DB) updateContactMethods(ctx context.Context) error { continue } - _, err = tx.StmtContext(ctx, db.insertCM).ExecContext(ctx, uuid.New(), team.Name, "SLACK_DM", value, s.UserID) + err = q.CompatInsertUserCM(ctx, gadb.CompatInsertUserCMParams{ + ID: uuid.New(), + Name: team.Name, + Type: gadb.EnumUserContactMethodTypeSLACKDM, + Value: value, + UserID: s.UserID, + }) if err != nil { return fmt.Errorf("insert cm: %w", err) } - _, err = tx.StmtContext(ctx, db.updateSubCMID).ExecContext(ctx, s.ID, value) + err = q.CompatAuthSubSetCMID(ctx, gadb.CompatAuthSubSetCMIDParams{ + ID: s.ID, + Value: value, + }) if err != nil { return fmt.Errorf("update sub cm_id: %w", err) } diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index f653d991ce..02292e44a0 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -835,6 +835,181 @@ func (q *Queries) CalSubUserNames(ctx context.Context, dollar_1 []uuid.UUID) ([] return items, nil } +const compatAuthSubSetCMID = `-- name: CompatAuthSubSetCMID :exec +UPDATE + auth_subjects +SET + cm_id =( + SELECT + id + FROM + user_contact_methods + WHERE + type = 'SLACK_DM' + AND value = $2) +WHERE + auth_subjects.id = $1 +` + +type CompatAuthSubSetCMIDParams struct { + ID int64 + Value string +} + +// Updates the contact method id for an auth_subject with the given destination. +func (q *Queries) CompatAuthSubSetCMID(ctx context.Context, arg CompatAuthSubSetCMIDParams) error { + _, err := q.db.ExecContext(ctx, compatAuthSubSetCMID, arg.ID, arg.Value) + return err +} + +const compatAuthSubSlackMissingCM = `-- name: CompatAuthSubSlackMissingCM :many +SELECT + cm_id, id, provider_id, subject_id, user_id +FROM + auth_subjects +WHERE + provider_id LIKE 'slack:%' + AND cm_id IS NULL +FOR UPDATE + SKIP LOCKED +LIMIT 10 +` + +// Get up to 10 auth_subjects (slack only) missing a contact method. +func (q *Queries) CompatAuthSubSlackMissingCM(ctx context.Context) ([]AuthSubject, error) { + rows, err := q.db.QueryContext(ctx, compatAuthSubSlackMissingCM) + if err != nil { + return nil, err + } + defer rows.Close() + var items []AuthSubject + for rows.Next() { + var i AuthSubject + if err := rows.Scan( + &i.CmID, + &i.ID, + &i.ProviderID, + &i.SubjectID, + &i.UserID, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const compatCMMissingSub = `-- name: CompatCMMissingSub :many +SELECT + id, + user_id, + value +FROM + user_contact_methods +WHERE + type = 'SLACK_DM' + AND NOT disabled + AND NOT EXISTS ( + SELECT + 1 + FROM + auth_subjects + WHERE + cm_id = user_contact_methods.id) +FOR UPDATE + SKIP LOCKED +LIMIT 10 +` + +type CompatCMMissingSubRow struct { + ID uuid.UUID + UserID uuid.UUID + Value string +} + +// Get up to 10 contact methods missing an auth_subjects link. +func (q *Queries) CompatCMMissingSub(ctx context.Context) ([]CompatCMMissingSubRow, error) { + rows, err := q.db.QueryContext(ctx, compatCMMissingSub) + if err != nil { + return nil, err + } + defer rows.Close() + var items []CompatCMMissingSubRow + for rows.Next() { + var i CompatCMMissingSubRow + if err := rows.Scan(&i.ID, &i.UserID, &i.Value); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const compatInsertUserCM = `-- name: CompatInsertUserCM :exec +INSERT INTO user_contact_methods(id, name, type, value, user_id, pending) + VALUES ($1, $2, $3, $4, $5, FALSE) +ON CONFLICT (type, value) + DO NOTHING +` + +type CompatInsertUserCMParams struct { + ID uuid.UUID + Name string + Type EnumUserContactMethodType + Value string + UserID uuid.UUID +} + +// Inserts a new contact method for a user. +func (q *Queries) CompatInsertUserCM(ctx context.Context, arg CompatInsertUserCMParams) error { + _, err := q.db.ExecContext(ctx, compatInsertUserCM, + arg.ID, + arg.Name, + arg.Type, + arg.Value, + arg.UserID, + ) + return err +} + +const compatUpsertAuthSubject = `-- name: CompatUpsertAuthSubject :exec +INSERT INTO auth_subjects(user_id, subject_id, provider_id, cm_id) + VALUES ($1, $2, $3, $4) +ON CONFLICT (subject_id, provider_id) + DO UPDATE SET + user_id = $1, cm_id = $4 +` + +type CompatUpsertAuthSubjectParams struct { + UserID uuid.UUID + SubjectID string + ProviderID string + CmID uuid.NullUUID +} + +// Inserts a new auth_subject for a user. +func (q *Queries) CompatUpsertAuthSubject(ctx context.Context, arg CompatUpsertAuthSubjectParams) error { + _, err := q.db.ExecContext(ctx, compatUpsertAuthSubject, + arg.UserID, + arg.SubjectID, + arg.ProviderID, + arg.CmID, + ) + return err +} + const contactMethodAdd = `-- name: ContactMethodAdd :exec INSERT INTO user_contact_methods(id, name, dest, disabled, user_id, enable_status_updates) VALUES ($1, $2, $3, $4, $5, $6) diff --git a/sqlc.yaml b/sqlc.yaml index d46710722a..fe8bf70507 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -21,6 +21,7 @@ sql: - alert/queries.sql - notice/queries.sql - graphql2/graphqlapp/queries.sql + - engine/compatmanager/queries.sql - engine/statusmgr/queries.sql - engine/message/queries.sql - engine/schedulemanager/queries.sql