Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APPS-1417 Respect scheduled full backup timing #291

Merged
merged 13 commits into from
Dec 25, 2024
8 changes: 8 additions & 0 deletions docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2085,6 +2085,14 @@ const docTemplate = `{
"$ref": "#/definitions/dto.RunningJob"
}
]
},
"last-full": {
"description": "LastFull: the timestamp of the last successful full backup.\nA nil value indicates that there has never been a full backup.",
"type": "string"
},
"last-incremental": {
"description": "LastIncremental: the timestamp of the last successful incremental backup.\nA nil value indicates that there has never been an incremental backup.",
"type": "string"
}
}
},
Expand Down
8 changes: 8 additions & 0 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -2265,6 +2265,14 @@
} ],
"description" : "Incremental represents the state of an incremental backup. Nil if no incremental backup is running.",
"type" : "object"
},
"last-full" : {
"description" : "LastFull: the timestamp of the last successful full backup.\nA nil value indicates that there has never been a full backup.",
"type" : "string"
},
"last-incremental" : {
"description" : "LastIncremental: the timestamp of the last successful incremental backup.\nA nil value indicates that there has never been an incremental backup.",
"type" : "string"
}
},
"type" : "object"
Expand Down
12 changes: 12 additions & 0 deletions docs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1781,7 +1781,9 @@ components:
dto.CurrentBackups:
example:
incremental: "{}"
last-incremental: last-incremental
full: "{}"
last-full: last-full
properties:
full:
allOf:
Expand All @@ -1795,6 +1797,16 @@ components:
description: Incremental represents the state of an incremental backup.
Nil if no incremental backup is running.
type: object
last-full:
description: |-
LastFull: the timestamp of the last successful full backup.
A nil value indicates that there has never been a full backup.
type: string
last-incremental:
description: |-
LastIncremental: the timestamp of the last successful incremental backup.
A nil value indicates that there has never been an incremental backup.
type: string
type: object
dto.EncryptionPolicy:
description: EncryptionPolicy contains backup encryption information.
Expand Down
2 changes: 1 addition & 1 deletion internal/server/handlers/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (s *Service) GetCurrentBackupInfo(w http.ResponseWriter, r *http.Request) {
return
}

currentBackups := dto.NewCurrentBackupsFromModel(handler.GetCurrentStat())
currentBackups := dto.NewCurrentBackupsFromModel(handler.CurrentStat())
response, err := dto.Serialize(currentBackups, dto.JSON)
if err != nil {
hLogger.Error("failed to marshal statistics",
Expand Down
8 changes: 8 additions & 0 deletions pkg/dto/current_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ type CurrentBackups struct {
Full *RunningJob `json:"full,omitempty"`
// Incremental represents the state of an incremental backup. Nil if no incremental backup is running.
Incremental *RunningJob `json:"incremental,omitempty"`
// LastFull: the timestamp of the last successful full backup.
// A nil value indicates that there has never been a full backup.
LastFull *time.Time `json:"last-full,omitempty"`
// LastIncremental: the timestamp of the last successful incremental backup.
// A nil value indicates that there has never been an incremental backup.
LastIncremental *time.Time `json:"last-incremental,omitempty"`
}

func NewCurrentBackupsFromModel(m *model.CurrentBackups) *CurrentBackups {
Expand All @@ -27,6 +33,8 @@ func NewCurrentBackupsFromModel(m *model.CurrentBackups) *CurrentBackups {
func (c *CurrentBackups) fromModel(m *model.CurrentBackups) {
c.Full = NewRunningJobFromModel(m.Full)
c.Incremental = NewRunningJobFromModel(m.Incremental)
c.LastFull = m.LastRunTime.Full
c.LastIncremental = m.LastRunTime.Incremental
}

// RunningJob tracks progress of currently running job.
Expand Down
2 changes: 2 additions & 0 deletions pkg/model/current_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ type CurrentBackups struct {
Full *RunningJob
// Incremental represents the state of an incremental backup. Nil if no incremental backup is running.
Incremental *RunningJob
// LastRunTime: the last time when a backup was run
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LastRunTime contains information about the latest run time for both full and incremental backups.

LastRunTime LastBackupRun
}

// RunningJob tracks progress of currently running job.
Expand Down
30 changes: 30 additions & 0 deletions pkg/model/last_run.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

last_backup_run.go

Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package model

import "time"

// LastBackupRun stores the last run times for both full and incremental backups.
type LastBackupRun struct {
// Last time the Full backup was performed.
Full *time.Time
// Last time the Incremental backup was performed.
Incremental *time.Time
}

func NewLastRun(lastFullBackup *time.Time, lastIncrBackup *time.Time) LastBackupRun {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming to MakeLastBackupRun or CreateLastBackupRun, since it is not a good practice to return a value for New constructors.

return LastBackupRun{
Full: lastFullBackup,
Incremental: lastIncrBackup,
}
}

func (r *LastBackupRun) NoFullBackup() bool {
return r.Full == nil
}

func (r *LastBackupRun) LastAnyRun() *time.Time {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming to LatestRun.

if r.Incremental != nil && r.Full != nil && r.Incremental.After(*r.Full) {
return r.Incremental
}

return r.Full
}
35 changes: 6 additions & 29 deletions pkg/service/backup_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,21 @@ func newBackend(routineName string, storage model.Storage) *BackupBackend {
}
}

func (b *BackupBackend) findLastRun(ctx context.Context) lastBackupRun {
func (b *BackupBackend) findLastRun(ctx context.Context) model.LastBackupRun {
fullBackupList, _ := b.FullBackupList(ctx, model.TimeBounds{})
lastFullBackup := lastBackupTime(fullBackupList)
incrementalBackupList, _ := b.IncrementalBackupList(ctx, model.NewTimeBoundsFrom(lastFullBackup))
incrementalBackupList, _ := b.IncrementalBackupList(ctx, model.TimeBounds{FromTime: lastFullBackup})
lastIncrBackup := lastBackupTime(incrementalBackupList)

return lastBackupRun{
full: lastFullBackup,
incremental: lastIncrBackup,
}
return model.NewLastRun(lastFullBackup, lastIncrBackup)
}

func lastBackupTime(b []model.BackupDetails) time.Time {
func lastBackupTime(b []model.BackupDetails) *time.Time {
if len(b) > 0 {
return latestBackupBeforeTime(b, nil)[0].Created
return &latestBackupBeforeTime(b, nil)[0].Created
}

return time.Time{}
return nil
}

func (b *BackupBackend) writeBackupMetadata(ctx context.Context, path string, metadata model.BackupMetadata) error {
Expand Down Expand Up @@ -206,23 +203,3 @@ func (b *BackupBackend) packageFiles(buffers []*bytes.Buffer) ([]byte, error) {

return buf.Bytes(), nil
}

// lastBackupRun stores the last run times for both full and incremental backups.
type lastBackupRun struct {
// Last time the full backup was performed.
full time.Time
// Last time the incremental backup was performed.
incremental time.Time
}

func (r *lastBackupRun) noFullBackup() bool {
return r.full.Equal(time.Time{})
}

func (r *lastBackupRun) lastAnyRun() time.Time {
if r.incremental.After(r.full) {
return r.incremental
}

return r.full
}
25 changes: 13 additions & 12 deletions pkg/service/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,17 @@ import (

// backupJob implements the quartz.Job interface.
type backupJob struct {
handler *BackupRoutineHandler
jobType jobType
isRunning atomic.Bool
handler backupRunner
jobType jobType
isRunning atomic.Bool
routineName string
logger *slog.Logger
}

var _ quartz.Job = (*backupJob)(nil)

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (j *backupJob) Execute(ctx context.Context) error {
logger := slog.Default().With(slog.String("routine", j.handler.routineName),
slog.Any("type", j.jobType))

if j.isRunning.CompareAndSwap(false, true) {
defer j.isRunning.Store(false)
switch j.jobType {
Expand All @@ -32,10 +31,10 @@ func (j *backupJob) Execute(ctx context.Context) error {
case jobTypeIncremental:
j.handler.runIncrementalBackup(ctx, util.NowWithZeroNanoseconds())
default:
logger.Error("Unsupported backup type")
j.logger.Error("Unsupported backup type")
}
} else {
logger.Debug("Backup is currently in progress, skipping it")
j.logger.Debug("Backup is currently in progress, skipping it")
incrementSkippedCounters(j.jobType)
}

Expand All @@ -53,13 +52,15 @@ func incrementSkippedCounters(jobType jobType) {

// Description returns the description of the backup job.
func (j *backupJob) Description() string {
return fmt.Sprintf("%s %s backup job", j.handler.routineName, j.jobType)
return fmt.Sprintf("%s %s backup job", j.routineName, j.jobType)
}

// newBackupJob creates a new backup job.
func newBackupJob(handler *BackupRoutineHandler, jobType jobType) quartz.Job {
func newBackupJob(handler backupRunner, jobType jobType, routineName string) quartz.Job {
return &backupJob{
handler: handler,
jobType: jobType,
handler: handler,
jobType: jobType,
routineName: routineName,
logger: slog.Default().With(slog.String("routine", routineName), slog.Any("type", jobType)),
}
}
35 changes: 25 additions & 10 deletions pkg/service/backup_routine_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type BackupRoutineHandler struct {
namespaces []string
storage model.Storage
secretAgent *model.SecretAgent
lastRun lastBackupRun
lastRun model.LastBackupRun
retry executor
clientManager aerospike.ClientManager
logger *slog.Logger
Expand Down Expand Up @@ -78,8 +78,22 @@ type ClusterConfigWriter interface {
Write(ctx context.Context, client backup.AerospikeClient, timestamp time.Time)
}

// BackupHandlerHolder stores backupHandlers by routine name
type BackupHandlerHolder map[string]*BackupRoutineHandler
// backupRunner runs backup operations.
type backupRunner interface {
// runFullBackup starts full backup.
runFullBackup(context.Context, time.Time)
// runIncrementalBackup starts incremental backup.
runIncrementalBackup(context.Context, time.Time)
// Cancel cancels all running backup jobs.
Cancel()
// CurrentStat returns current status of backup routines.
CurrentStat() *model.CurrentBackups
}

var _ backupRunner = (*BackupRoutineHandler)(nil)

// BackupHandlerHolder stores backupRunners by routine name
type BackupHandlerHolder map[string]backupRunner

// newBackupRoutineHandler returns a new BackupRoutineHandler instance.
func newBackupRoutineHandler(
Expand All @@ -88,7 +102,7 @@ func newBackupRoutineHandler(
backupService Backup,
routineName string,
backupBackend *BackupBackend,
lastRun lastBackupRun,
lastRun model.LastBackupRun,
) *BackupRoutineHandler {
backupRoutine := config.BackupRoutines[routineName]
backupPolicy := backupRoutine.BackupPolicy
Expand Down Expand Up @@ -158,7 +172,7 @@ func (h *BackupRoutineHandler) runFullBackupInternal(ctx context.Context, now ti
return err
}

h.lastRun.full = now
h.lastRun.Full = &now
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not a new thing, but it is better to synchronize writes and reads to prevent data races.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No backup of the same type for the same routine will run simultaneously, no need for lock

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two jobs scheduled for the full and incremental backup that may concurrently access those variables.


h.clusterConfigWriter.Write(ctx, client.AerospikeClient(), now)

Expand Down Expand Up @@ -225,8 +239,8 @@ func (h *BackupRoutineHandler) createTimebounds(fullBackup bool, now time.Time)
)

if !fullBackup {
lastRun := h.lastRun.lastAnyRun()
fromTime = &lastRun
lastRun := h.lastRun.LastAnyRun()
fromTime = lastRun
}

if h.backupFullPolicy.IsSealedOrDefault() {
Expand Down Expand Up @@ -292,7 +306,7 @@ func (h *BackupRoutineHandler) runIncrementalBackup(ctx context.Context, now tim
}

func (h *BackupRoutineHandler) skipIncrementalBackup() bool {
if h.lastRun.noFullBackup() {
if h.lastRun.NoFullBackup() {
h.logger.Debug("Skip incremental backup until initial full backup is done")
return true
}
Expand Down Expand Up @@ -328,7 +342,7 @@ func (h *BackupRoutineHandler) runIncrementalBackupInternal(ctx context.Context,
return err
}

h.lastRun.incremental = now
h.lastRun.Incremental = &now
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider synchronizing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No backup of the same type for the same routine will run simultaneously, no need for lock

return nil
}

Expand Down Expand Up @@ -372,10 +386,11 @@ func (h *BackupRoutineHandler) waitForIncrementalBackups(ctx context.Context) er
return aggregatedErr
}

func (h *BackupRoutineHandler) GetCurrentStat() *model.CurrentBackups {
func (h *BackupRoutineHandler) CurrentStat() *model.CurrentBackups {
return &model.CurrentBackups{
Full: currentBackupStatus(h.fullBackupHandlers),
Incremental: currentBackupStatus(h.incrBackupHandlers),
LastRunTime: h.lastRun,
}
}

Expand Down
Loading
Loading