-
Notifications
You must be signed in to change notification settings - Fork 92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add maintenance mode for upgrades #2211
base: main
Are you sure you want to change the base?
Conversation
fb17402
to
ec62418
Compare
} | ||
|
||
func UpdatePeerDBMaintenanceModeEnabled(ctx context.Context, pool *pgxpool.Pool, enabled bool) error { | ||
return UpdateDynamicSetting(ctx, pool, "PEERDB_MAINTENANCE_MODE_ENABLED", ptr.String(strconv.FormatBool(enabled))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is controlled by the system, I'm not sure we want to expose it to users where they can toggle it; in that case it shouldn't be in dynconf.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be helpful for manual intervention or maintenance. We can hide it later from dynamic config if needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been thinking it'd be nice to be able to auth into the UI with a peerdb/clickhouse email & have access to extended functionality. Then this could be in the catalog & have a UI for us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, maybe via oauth where different roles can map to different permissions like superadmin, admin or readonly
logEvery time.Duration, | ||
alertEvery time.Duration, | ||
) (protos.FlowStatus, error) { | ||
// In case a mirror was just kicked off, it shows up in the running state, we wait for a bit before checking for snapshot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this really shouldn't happen, seems like a bug
b9db36e
to
ce78585
Compare
- also shift to tickers for waiting
09f7882
to
1d2b024
Compare
|
||
flowStatus, err := RunEveryIntervalUntilFinish(ctx, func() (bool, protos.FlowStatus, error) { | ||
activity.RecordHeartbeat(ctx, fmt.Sprintf("Waiting for mirror %s to finish snapshot", mirror.MirrorName)) | ||
mirrorStatus, err = a.getMirrorStatus(ctx, mirror) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mirrorStatus, err = a.getMirrorStatus(ctx, mirror) | |
mirrorStatus, err := a.getMirrorStatus(ctx, mirror) |
mirrorStatus, err := a.getMirrorStatus(ctx, mirror) | ||
if err != nil { | ||
return mirrorStatus, err | ||
} | ||
|
||
slog.Info("Checking and waiting if mirror is snapshot", "mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId, "status", | ||
mirrorStatus.String()) | ||
if mirrorStatus != protos.FlowStatus_STATUS_SNAPSHOT && mirrorStatus != protos.FlowStatus_STATUS_SETUP { | ||
return mirrorStatus, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mirrorStatus, err := a.getMirrorStatus(ctx, mirror) | |
if err != nil { | |
return mirrorStatus, err | |
} | |
slog.Info("Checking and waiting if mirror is snapshot", "mirror", mirror.MirrorName, "workflowId", mirror.WorkflowId, "status", | |
mirrorStatus.String()) | |
if mirrorStatus != protos.FlowStatus_STATUS_SNAPSHOT && mirrorStatus != protos.FlowStatus_STATUS_SETUP { | |
return mirrorStatus, nil | |
} |
err = model.FlowSignal.SignalClientWorkflow(ctx, a.TemporalClient, mirror.WorkflowId, "", model.PauseSignal) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err = model.FlowSignal.SignalClientWorkflow(ctx, a.TemporalClient, mirror.WorkflowId, "", model.PauseSignal) | |
if err != nil { | |
if err := model.FlowSignal.SignalClientWorkflow(ctx, a.TemporalClient, mirror.WorkflowId, "", model.PauseSignal); err != nil { |
elsewhere too
row := pool.QueryRow(ctx, ` | ||
select cli_version, api_version, skipped, skipped_reason | ||
from maintenance.start_maintenance_outputs | ||
order by created_at desc | ||
limit 1 | ||
`) | ||
var result StartMaintenanceResult | ||
err = row.Scan(&result.CLIVersion, &result.APIVersion, &result.Skipped, &result.SkippedReason) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
row := pool.QueryRow(ctx, ` | |
select cli_version, api_version, skipped, skipped_reason | |
from maintenance.start_maintenance_outputs | |
order by created_at desc | |
limit 1 | |
`) | |
var result StartMaintenanceResult | |
err = row.Scan(&result.CLIVersion, &result.APIVersion, &result.Skipped, &result.SkippedReason) | |
if err != nil { | |
var result StartMaintenanceResult | |
if err := pool.QueryRow(ctx, ` | |
select cli_version, api_version, skipped, skipped_reason | |
from maintenance.start_maintenance_outputs | |
order by created_at desc | |
limit 1 | |
`).Scan(&result.CLIVersion, &result.APIVersion, &result.Skipped, &result.SkippedReason); err != nil { |
var state protos.FlowStatus | ||
err = res.Get(&state) | ||
if err != nil { | ||
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) | |
slog.Error("failed to get status in workflow with ID "+workflowID, slog.Any("error", err))) |
err = res.Get(&state) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
err = res.Get(&state) | |
if err != nil { | |
if err := res.Get(&state); err != nil { |
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) | ||
return protos.FlowStatus_STATUS_UNKNOWN, | ||
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
slog.Error(fmt.Sprintf("failed to get status in workflow with ID %s: %s", workflowID, err.Error())) | |
return protos.FlowStatus_STATUS_UNKNOWN, | |
fmt.Errorf("failed to get status in workflow with ID %s: %w", workflowID, err) | |
slog.Error(fmt.Sprintf("failed to query status in workflow with ID %s: %s", workflowID, err.Error())) | |
return protos.FlowStatus_STATUS_UNKNOWN, | |
fmt.Errorf("failed to query status in workflow with ID %s: %w", workflowID, err) |
makes error messages distinct so that when looking up error messages in code we know which line was hit
PEERDB_MAINTENANCE_MODE_ENABLED
)
StartMaintenance - for pre-upgrade, responsible for
EndMaintenance - for post-upgrade, responsible for
Start and End), mirrors cannot be mutated/created in any way,
Ready/Maintenance which can be used for UI changes later.
There are 2 ways to trigger these 2 workflows:
maintenance entrypoint with the respective args
A new task queue is added so that the maintenance tasks can be spun up even during pre-upgrade hooks (from versions earlier than the ones containing this PR); this also ensures that the latest version of the maintenance flows always runs, irrespective of the old version.