From c5f95294592fe59d1c19200cc8169a363e9eb912 Mon Sep 17 00:00:00 2001 From: akorotkov Date: Tue, 24 Dec 2024 11:46:15 +0200 Subject: [PATCH 01/13] replace *BackupRoutineHandler with interface --- internal/server/handlers/backup.go | 2 +- pkg/model/current_backup.go | 2 + pkg/service/backup_backend.go | 35 +++------------ pkg/service/backup_job.go | 34 ++++++++++----- pkg/service/backup_routine_handler.go | 19 ++++---- pkg/service/backup_routine_handler_test.go | 23 +++++----- pkg/service/backup_scheduler.go | 51 ++++------------------ pkg/service/backup_scheduler_test.go | 46 ++----------------- pkg/service/config_applier.go | 4 +- pkg/service/metrics.go | 2 +- 10 files changed, 67 insertions(+), 151 deletions(-) diff --git a/internal/server/handlers/backup.go b/internal/server/handlers/backup.go index bb228988..a6949c49 100644 --- a/internal/server/handlers/backup.go +++ b/internal/server/handlers/backup.go @@ -311,7 +311,7 @@ func (s *Service) GetCurrentBackupInfo(w http.ResponseWriter, r *http.Request) { return } - currentBackups := dto.NewCurrentBackupsFromModel(handler.GetCurrentStat()) + currentBackups := dto.NewCurrentBackupsFromModel(handler.CurrentStat()) response, err := dto.Serialize(currentBackups, dto.JSON) if err != nil { hLogger.Error("failed to marshal statistics", diff --git a/pkg/model/current_backup.go b/pkg/model/current_backup.go index 704848c5..b7890252 100644 --- a/pkg/model/current_backup.go +++ b/pkg/model/current_backup.go @@ -10,6 +10,8 @@ type CurrentBackups struct { Full *RunningJob // Incremental represents the state of an incremental backup. Nil if no incremental backup is running. Incremental *RunningJob + // LastRunTime: the last time when a backup was run + LastRunTime LastBackupRun } // RunningJob tracks progress of currently running job. diff --git a/pkg/service/backup_backend.go b/pkg/service/backup_backend.go index 2ab9068b..034e84cc 100644 --- a/pkg/service/backup_backend.go +++ b/pkg/service/backup_backend.go @@ -34,24 +34,21 @@ func newBackend(routineName string, storage model.Storage) *BackupBackend { } } -func (b *BackupBackend) findLastRun(ctx context.Context) lastBackupRun { +func (b *BackupBackend) findLastRun(ctx context.Context) model.LastBackupRun { fullBackupList, _ := b.FullBackupList(ctx, model.TimeBounds{}) lastFullBackup := lastBackupTime(fullBackupList) - incrementalBackupList, _ := b.IncrementalBackupList(ctx, model.NewTimeBoundsFrom(lastFullBackup)) + incrementalBackupList, _ := b.IncrementalBackupList(ctx, model.TimeBounds{FromTime: lastFullBackup}) lastIncrBackup := lastBackupTime(incrementalBackupList) - return lastBackupRun{ - full: lastFullBackup, - incremental: lastIncrBackup, - } + return model.NewLastRun(lastFullBackup, lastIncrBackup) } -func lastBackupTime(b []model.BackupDetails) time.Time { +func lastBackupTime(b []model.BackupDetails) *time.Time { if len(b) > 0 { - return latestBackupBeforeTime(b, nil)[0].Created + return &latestBackupBeforeTime(b, nil)[0].Created } - return time.Time{} + return nil } func (b *BackupBackend) writeBackupMetadata(ctx context.Context, path string, metadata model.BackupMetadata) error { @@ -206,23 +203,3 @@ func (b *BackupBackend) packageFiles(buffers []*bytes.Buffer) ([]byte, error) { return buf.Bytes(), nil } - -// lastBackupRun stores the last run times for both full and incremental backups. -type lastBackupRun struct { - // Last time the full backup was performed. - full time.Time - // Last time the incremental backup was performed. - incremental time.Time -} - -func (r *lastBackupRun) noFullBackup() bool { - return r.full.Equal(time.Time{}) -} - -func (r *lastBackupRun) lastAnyRun() time.Time { - if r.incremental.After(r.full) { - return r.incremental - } - - return r.full -} diff --git a/pkg/service/backup_job.go b/pkg/service/backup_job.go index 7fcb10ed..b8e6b364 100644 --- a/pkg/service/backup_job.go +++ b/pkg/service/backup_job.go @@ -5,25 +5,33 @@ import ( "fmt" "log/slog" "sync/atomic" + "time" + "github.com/aerospike/aerospike-backup-service/v2/pkg/model" "github.com/aerospike/aerospike-backup-service/v2/pkg/util" "github.com/reugn/go-quartz/quartz" ) +type backupRunner interface { + runFullBackup(context.Context, time.Time) + runIncrementalBackup(context.Context, time.Time) + Cancel() + CurrentStat() *model.CurrentBackups +} + // backupJob implements the quartz.Job interface. type backupJob struct { - handler *BackupRoutineHandler - jobType jobType - isRunning atomic.Bool + handler backupRunner + jobType jobType + isRunning atomic.Bool + routineName string + logger *slog.Logger } var _ quartz.Job = (*backupJob)(nil) // Execute is called by a Scheduler when the Trigger associated with this job fires. func (j *backupJob) Execute(ctx context.Context) error { - logger := slog.Default().With(slog.String("routine", j.handler.routineName), - slog.Any("type", j.jobType)) - if j.isRunning.CompareAndSwap(false, true) { defer j.isRunning.Store(false) switch j.jobType { @@ -32,10 +40,10 @@ func (j *backupJob) Execute(ctx context.Context) error { case jobTypeIncremental: j.handler.runIncrementalBackup(ctx, util.NowWithZeroNanoseconds()) default: - logger.Error("Unsupported backup type") + j.logger.Error("Unsupported backup type") } } else { - logger.Debug("Backup is currently in progress, skipping it") + j.logger.Debug("Backup is currently in progress, skipping it") incrementSkippedCounters(j.jobType) } @@ -53,13 +61,15 @@ func incrementSkippedCounters(jobType jobType) { // Description returns the description of the backup job. func (j *backupJob) Description() string { - return fmt.Sprintf("%s %s backup job", j.handler.routineName, j.jobType) + return fmt.Sprintf("%s %s backup job", j.routineName, j.jobType) } // newBackupJob creates a new backup job. -func newBackupJob(handler *BackupRoutineHandler, jobType jobType) quartz.Job { +func newBackupJob(handler backupRunner, jobType jobType, routineName string) quartz.Job { return &backupJob{ - handler: handler, - jobType: jobType, + handler: handler, + jobType: jobType, + routineName: routineName, + logger: slog.Default().With(slog.String("routine", routineName), slog.Any("type", jobType)), } } diff --git a/pkg/service/backup_routine_handler.go b/pkg/service/backup_routine_handler.go index 4465f0d9..56a2da81 100644 --- a/pkg/service/backup_routine_handler.go +++ b/pkg/service/backup_routine_handler.go @@ -26,7 +26,7 @@ type BackupRoutineHandler struct { namespaces []string storage model.Storage secretAgent *model.SecretAgent - lastRun lastBackupRun + lastRun model.LastBackupRun retry executor clientManager aerospike.ClientManager logger *slog.Logger @@ -79,7 +79,7 @@ type ClusterConfigWriter interface { } // BackupHandlerHolder stores backupHandlers by routine name -type BackupHandlerHolder map[string]*BackupRoutineHandler +type BackupHandlerHolder map[string]backupRunner // newBackupRoutineHandler returns a new BackupRoutineHandler instance. func newBackupRoutineHandler( @@ -88,7 +88,7 @@ func newBackupRoutineHandler( backupService Backup, routineName string, backupBackend *BackupBackend, - lastRun lastBackupRun, + lastRun model.LastBackupRun, ) *BackupRoutineHandler { backupRoutine := config.BackupRoutines[routineName] backupPolicy := backupRoutine.BackupPolicy @@ -158,7 +158,7 @@ func (h *BackupRoutineHandler) runFullBackupInternal(ctx context.Context, now ti return err } - h.lastRun.full = now + h.lastRun.Full = &now h.clusterConfigWriter.Write(ctx, client.AerospikeClient(), now) @@ -225,8 +225,8 @@ func (h *BackupRoutineHandler) createTimebounds(fullBackup bool, now time.Time) ) if !fullBackup { - lastRun := h.lastRun.lastAnyRun() - fromTime = &lastRun + lastRun := h.lastRun.LastAnyRun() + fromTime = lastRun } if h.backupFullPolicy.IsSealedOrDefault() { @@ -292,7 +292,7 @@ func (h *BackupRoutineHandler) runIncrementalBackup(ctx context.Context, now tim } func (h *BackupRoutineHandler) skipIncrementalBackup() bool { - if h.lastRun.noFullBackup() { + if h.lastRun.NoFullBackup() { h.logger.Debug("Skip incremental backup until initial full backup is done") return true } @@ -328,7 +328,7 @@ func (h *BackupRoutineHandler) runIncrementalBackupInternal(ctx context.Context, return err } - h.lastRun.incremental = now + h.lastRun.Incremental = &now return nil } @@ -372,10 +372,11 @@ func (h *BackupRoutineHandler) waitForIncrementalBackups(ctx context.Context) er return aggregatedErr } -func (h *BackupRoutineHandler) GetCurrentStat() *model.CurrentBackups { +func (h *BackupRoutineHandler) CurrentStat() *model.CurrentBackups { return &model.CurrentBackups{ Full: currentBackupStatus(h.fullBackupHandlers), Incremental: currentBackupStatus(h.incrBackupHandlers), + LastRunTime: h.lastRun, } } diff --git a/pkg/service/backup_routine_handler_test.go b/pkg/service/backup_routine_handler_test.go index 119b08ea..51db7875 100644 --- a/pkg/service/backup_routine_handler_test.go +++ b/pkg/service/backup_routine_handler_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/aerospike/aerospike-backup-service/v2/pkg/model" + "github.com/aerospike/aerospike-backup-service/v2/pkg/util" "github.com/aerospike/backup-go" "github.com/aerospike/backup-go/models" "github.com/stretchr/testify/assert" @@ -113,7 +114,7 @@ func setupTestHandler( backupFullPolicy: &model.BackupPolicy{}, fullBackupHandlers: make(map[string]CancelableBackupHandler), incrBackupHandlers: make(map[string]CancelableBackupHandler), - lastRun: lastBackupRun{}, + lastRun: model.LastBackupRun{}, storage: &model.LocalStorage{Path: "/tmp"}, logger: slog.Default(), retry: &simpleExecutor{}, @@ -222,7 +223,7 @@ func TestRunIncrementalBackup_NoFullBackupYet(t *testing.T) { retentionManager := new(mockRetentionManager) handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) - handler.lastRun = lastBackupRun{} // Ensure empty lastRun + handler.lastRun = model.LastBackupRun{} // Ensure empty lastRun handler.runIncrementalBackup(context.Background(), time.Now()) @@ -238,8 +239,8 @@ func TestRunIncrementalBackup_SkipIfFullBackupInProgress(t *testing.T) { retentionManager := new(mockRetentionManager) handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) - handler.lastRun = lastBackupRun{ - full: time.Now(), // Set last full run + handler.lastRun = model.LastBackupRun{ + Full: util.Ptr(time.Now()), // Set last full run } handler.fullBackupHandlers["ns1"] = &mockBackupHandler{} @@ -258,8 +259,8 @@ func TestRunIncrementalBackup_SkipIfIncrementalBackupInProgress(t *testing.T) { retentionManager := new(mockRetentionManager) handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) - handler.lastRun = lastBackupRun{ - full: time.Now(), // Set last full run + handler.lastRun = model.LastBackupRun{ + Full: util.Ptr(time.Now()), // Set last full run } handler.incrBackupHandlers["test"] = &mockBackupHandler{} @@ -278,8 +279,8 @@ func TestRunIncrementalBackup_ClientError(t *testing.T) { retentionManager := new(mockRetentionManager) handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) - handler.lastRun = lastBackupRun{ - full: time.Now(), + handler.lastRun = model.LastBackupRun{ + Full: util.Ptr(time.Now()), } expectedErr := errors.New("client error") @@ -301,8 +302,8 @@ func TestRunIncrementalBackup_Success(t *testing.T) { handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) now := time.Now() lastRun := now.Add(-1 * time.Hour) - handler.lastRun = lastBackupRun{ - full: lastRun, + handler.lastRun = model.LastBackupRun{ + Full: &lastRun, } backupHandler := new(mockBackupHandler) @@ -346,7 +347,7 @@ func TestRunIncrementalBackup_Success(t *testing.T) { clientManager.AssertExpectations(t) backupService.AssertExpectations(t) backupHandler.AssertExpectations(t) - assert.Equal(t, now, handler.lastRun.incremental) + assert.Equal(t, now, *handler.CurrentStat().LastRunTime.Incremental) } func TestRunFullBackup_PartialFailure(t *testing.T) { diff --git a/pkg/service/backup_scheduler.go b/pkg/service/backup_scheduler.go index b865ed62..06214fc1 100644 --- a/pkg/service/backup_scheduler.go +++ b/pkg/service/backup_scheduler.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "log/slog" "sync" "time" @@ -99,54 +98,36 @@ func scheduleRoutines( } func scheduleFullBackup( - scheduler Scheduler, handler *BackupRoutineHandler, interval string, routineName string, + scheduler Scheduler, handler backupRunner, interval string, routineName string, ) error { fullCronTrigger, err := quartz.NewCronTrigger(interval) if err != nil { return err } - fullJob := newBackupJob(handler, jobTypeFull) + fullJob := newBackupJob(handler, jobTypeFull, routineName) fullJobDetail := quartz.NewJobDetail(fullJob, fullJobKey(routineName)) - - if err = scheduler.ScheduleJob(fullJobDetail, fullCronTrigger); err != nil { - return err - } - jobStore.put(fullJobDetail.JobKey().String(), fullJobDetail) - if needToRunFullBackupNow(handler.lastRun.full, fullCronTrigger) { - slog.Debug("Schedule initial full backup", "name", routineName) - fullJobDetail := quartz.NewJobDetail( - fullJob, - quartz.NewJobKey(routineName), - ) - if err = scheduler.ScheduleJob(fullJobDetail, quartz.NewRunOnceTrigger(0)); err != nil { - return err - } - } - return nil + + return scheduler.ScheduleJob(fullJobDetail, fullCronTrigger) } func scheduleIncrementalBackup( - scheduler Scheduler, handler *BackupRoutineHandler, interval string, routineName string, + scheduler Scheduler, handler backupRunner, interval string, routineName string, ) error { incrCronTrigger, err := quartz.NewCronTrigger(interval) if err != nil { return err } - incrementalJob := newBackupJob(handler, jobTypeIncremental) + incrementalJob := newBackupJob(handler, jobTypeIncremental, routineName) incrJobDetail := quartz.NewJobDetail( incrementalJob, incrJobKey(routineName), ) - - if err = scheduler.ScheduleJob(incrJobDetail, incrCronTrigger); err != nil { - return err - } - jobStore.put(incrJobDetail.JobKey().String(), incrJobDetail) - return nil + + return scheduler.ScheduleJob(incrJobDetail, incrCronTrigger) } func incrJobKey(routineName string) *quartz.JobKey { @@ -163,19 +144,3 @@ func adhocKey(name string) *quartz.JobKey { jobName := fmt.Sprintf("%s-adhoc-%d", name, time.Now().UnixMilli()) return quartz.NewJobKeyWithGroup(jobName, string(quartzGroupAdHoc)) } - -func needToRunFullBackupNow(lastFullRun time.Time, trigger *quartz.CronTrigger) bool { - if lastFullRun.Equal(time.Time{}) { - return true // no previous run - } - - fireTimeNano, err := trigger.NextFireTime(lastFullRun.UnixNano()) - if err != nil { - return true // some error, run backup to be safe - } - if time.Unix(0, fireTimeNano).Before(time.Now()) { - return true // next scheduled backup is in the past - } - - return false -} diff --git a/pkg/service/backup_scheduler_test.go b/pkg/service/backup_scheduler_test.go index 256f6970..b28b0c8e 100644 --- a/pkg/service/backup_scheduler_test.go +++ b/pkg/service/backup_scheduler_test.go @@ -5,52 +5,12 @@ import ( "time" "github.com/aerospike/aerospike-backup-service/v2/pkg/model" + "github.com/aerospike/aerospike-backup-service/v2/pkg/util" "github.com/reugn/go-quartz/quartz" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -func TestNeedToRunFullBackupNow(t *testing.T) { - tests := []struct { - name string - lastFullRun time.Time - trigger *quartz.CronTrigger - expected bool - }{ - { - name: "NoPreviousBackup", - lastFullRun: time.Time{}, - trigger: newTrigger(""), - expected: true, - }, - { - name: "DueForBackup", - lastFullRun: time.Now().Add(-25 * time.Hour), - trigger: newTrigger("@daily"), - expected: true, - }, - { - name: "NoNeedForBackup", - lastFullRun: time.Now().Add(-10 * time.Second), - trigger: newTrigger("@daily"), - expected: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := needToRunFullBackupNow(tt.lastFullRun, tt.trigger); got != tt.expected { - t.Errorf("needToRunFullBackupNow() = %v, want %v", got, tt.expected) - } - }) - } -} - -func newTrigger(expression string) *quartz.CronTrigger { - trigger, _ := quartz.NewCronTrigger(expression) - return trigger -} - // MockScheduler implements quartz.Scheduler for testing. type MockScheduler struct { mock.Mock @@ -75,8 +35,8 @@ func TestDisabledRoutine(t *testing.T) { handlers := BackupHandlerHolder{ "routine1": &BackupRoutineHandler{}, - "routine2": &BackupRoutineHandler{lastRun: lastBackupRun{ - full: time.Now(), + "routine2": &BackupRoutineHandler{lastRun: model.LastBackupRun{ + Full: util.Ptr(time.Now()), }}, } diff --git a/pkg/service/config_applier.go b/pkg/service/config_applier.go index 521fad39..d3c88b9e 100644 --- a/pkg/service/config_applier.go +++ b/pkg/service/config_applier.go @@ -123,9 +123,9 @@ func makeHandler( backend, _ := backends.Get(routineName) // try to reuse lastRun from previous handler if it exists. - var lastRun lastBackupRun + var lastRun model.LastBackupRun if old, ok := oldHandlers[routineName]; ok { - lastRun = old.lastRun + lastRun = old.CurrentStat().LastRunTime } else { lastRun = backend.findLastRun(ctx) // this scan can take some time. } diff --git a/pkg/service/metrics.go b/pkg/service/metrics.go index 606f5189..54ed8021 100644 --- a/pkg/service/metrics.go +++ b/pkg/service/metrics.go @@ -121,7 +121,7 @@ func (mc *MetricsCollector) collectBackupMetrics() { backupProgress.Reset() for routineName, handler := range mc.backupHandler { - currentStat := handler.GetCurrentStat() + currentStat := handler.CurrentStat() // Update Full backup metric if running if currentStat.Full != nil { From 79bf8f3c74e2f3560e93176399ac79ae97bf0c33 Mon Sep 17 00:00:00 2001 From: akorotkov Date: Tue, 24 Dec 2024 11:47:24 +0200 Subject: [PATCH 02/13] add lst run --- pkg/model/last_run.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 pkg/model/last_run.go diff --git a/pkg/model/last_run.go b/pkg/model/last_run.go new file mode 100644 index 00000000..222036fd --- /dev/null +++ b/pkg/model/last_run.go @@ -0,0 +1,30 @@ +package model + +import "time" + +// LastBackupRun stores the last run times for both full and incremental backups. +type LastBackupRun struct { + // Last time the Full backup was performed. + Full *time.Time + // Last time the Incremental backup was performed. + Incremental *time.Time +} + +func NewLastRun(lastFullBackup *time.Time, lastIncrBackup *time.Time) LastBackupRun { + return LastBackupRun{ + Full: lastFullBackup, + Incremental: lastIncrBackup, + } +} + +func (r *LastBackupRun) NoFullBackup() bool { + return r.Full == nil +} + +func (r *LastBackupRun) LastAnyRun() *time.Time { + if r.Incremental != nil && r.Full != nil && r.Incremental.After(*r.Full) { + return r.Incremental + } + + return r.Full +} From 30e79723f07eede8e9a3811d4651988733195f1c Mon Sep 17 00:00:00 2001 From: akorotkov Date: Tue, 24 Dec 2024 12:13:52 +0200 Subject: [PATCH 03/13] add last run to cureent state --- docs/docs.go | 8 ++++++++ docs/openapi.json | 8 ++++++++ docs/openapi.yaml | 12 ++++++++++++ pkg/dto/current_backup.go | 8 ++++++++ 4 files changed, 36 insertions(+) diff --git a/docs/docs.go b/docs/docs.go index c3b536f5..bc688479 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -2085,6 +2085,14 @@ const docTemplate = `{ "$ref": "#/definitions/dto.RunningJob" } ] + }, + "last-full": { + "description": "LastFull: the timestamp of the last successful full backup.\nA nil value indicates that there has never been a full backup.", + "type": "string" + }, + "last-incremental": { + "description": "LastIncremental: the timestamp of the last successful incremental backup.\nA nil value indicates that there has never been an incremental backup.", + "type": "string" } } }, diff --git a/docs/openapi.json b/docs/openapi.json index 97bef681..580ab510 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -2265,6 +2265,14 @@ } ], "description" : "Incremental represents the state of an incremental backup. Nil if no incremental backup is running.", "type" : "object" + }, + "last-full" : { + "description" : "LastFull: the timestamp of the last successful full backup.\nA nil value indicates that there has never been a full backup.", + "type" : "string" + }, + "last-incremental" : { + "description" : "LastIncremental: the timestamp of the last successful incremental backup.\nA nil value indicates that there has never been an incremental backup.", + "type" : "string" } }, "type" : "object" diff --git a/docs/openapi.yaml b/docs/openapi.yaml index ea852a1e..1aff77cb 100644 --- a/docs/openapi.yaml +++ b/docs/openapi.yaml @@ -1781,7 +1781,9 @@ components: dto.CurrentBackups: example: incremental: "{}" + last-incremental: last-incremental full: "{}" + last-full: last-full properties: full: allOf: @@ -1795,6 +1797,16 @@ components: description: Incremental represents the state of an incremental backup. Nil if no incremental backup is running. type: object + last-full: + description: |- + LastFull: the timestamp of the last successful full backup. + A nil value indicates that there has never been a full backup. + type: string + last-incremental: + description: |- + LastIncremental: the timestamp of the last successful incremental backup. + A nil value indicates that there has never been an incremental backup. + type: string type: object dto.EncryptionPolicy: description: EncryptionPolicy contains backup encryption information. diff --git a/pkg/dto/current_backup.go b/pkg/dto/current_backup.go index 9f0e546a..0ca0c02d 100644 --- a/pkg/dto/current_backup.go +++ b/pkg/dto/current_backup.go @@ -12,6 +12,12 @@ type CurrentBackups struct { Full *RunningJob `json:"full,omitempty"` // Incremental represents the state of an incremental backup. Nil if no incremental backup is running. Incremental *RunningJob `json:"incremental,omitempty"` + // LastFull: the timestamp of the last successful full backup. + // A nil value indicates that there has never been a full backup. + LastFull *time.Time `json:"last-full,omitempty"` + // LastIncremental: the timestamp of the last successful incremental backup. + // A nil value indicates that there has never been an incremental backup. + LastIncremental *time.Time `json:"last-incremental,omitempty"` } func NewCurrentBackupsFromModel(m *model.CurrentBackups) *CurrentBackups { @@ -27,6 +33,8 @@ func NewCurrentBackupsFromModel(m *model.CurrentBackups) *CurrentBackups { func (c *CurrentBackups) fromModel(m *model.CurrentBackups) { c.Full = NewRunningJobFromModel(m.Full) c.Incremental = NewRunningJobFromModel(m.Incremental) + c.LastFull = m.LastRunTime.Full + c.LastIncremental = m.LastRunTime.Incremental } // RunningJob tracks progress of currently running job. From 37cdf36f2a9638c9235e97337e6ffb816795097d Mon Sep 17 00:00:00 2001 From: akorotkov Date: Tue, 24 Dec 2024 12:40:26 +0200 Subject: [PATCH 04/13] use routines --- pkg/service/backup_scheduler.go | 4 ++-- pkg/service/backup_scheduler_test.go | 2 +- pkg/service/config_applier.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/service/backup_scheduler.go b/pkg/service/backup_scheduler.go index 06214fc1..82cc5f24 100644 --- a/pkg/service/backup_scheduler.go +++ b/pkg/service/backup_scheduler.go @@ -71,10 +71,10 @@ func NewScheduler(ctx context.Context) quartz.Scheduler { // scheduleRoutines schedules the given handlers using the scheduler. func scheduleRoutines( - scheduler Scheduler, config *model.Config, handlers BackupHandlerHolder, + scheduler Scheduler, routines map[string]*model.BackupRoutine, handlers BackupHandlerHolder, ) error { var errs error - for routineName, routine := range config.BackupRoutines { + for routineName, routine := range routines { if routine.Disabled { continue } diff --git a/pkg/service/backup_scheduler_test.go b/pkg/service/backup_scheduler_test.go index b28b0c8e..fa43d04e 100644 --- a/pkg/service/backup_scheduler_test.go +++ b/pkg/service/backup_scheduler_test.go @@ -40,7 +40,7 @@ func TestDisabledRoutine(t *testing.T) { }}, } - err := scheduleRoutines(mockScheduler, config, handlers) + err := scheduleRoutines(mockScheduler, config.BackupRoutines, handlers) require.NoError(t, err) mockScheduler.AssertNumberOfCalls(t, "ScheduleJob", 1) diff --git a/pkg/service/config_applier.go b/pkg/service/config_applier.go index d3c88b9e..06f73577 100644 --- a/pkg/service/config_applier.go +++ b/pkg/service/config_applier.go @@ -59,7 +59,7 @@ func (a *DefaultConfigApplier) ApplyNewConfig(ctx context.Context) error { (a.handlerHolder)[k] = v } - err = scheduleRoutines(a.scheduler, a.config, a.handlerHolder) + err = scheduleRoutines(a.scheduler, a.config.BackupRoutines, a.handlerHolder) if err != nil { return fmt.Errorf("failed to schedule periodic backups: %w", err) } From 72424e2154ff94d7b27a0ae821cebead934f529b Mon Sep 17 00:00:00 2001 From: akorotkov Date: Tue, 24 Dec 2024 14:10:46 +0200 Subject: [PATCH 05/13] add unit test --- pkg/service/backup_scheduler_test.go | 79 ++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/pkg/service/backup_scheduler_test.go b/pkg/service/backup_scheduler_test.go index fa43d04e..ce812cb5 100644 --- a/pkg/service/backup_scheduler_test.go +++ b/pkg/service/backup_scheduler_test.go @@ -1,6 +1,7 @@ package service import ( + "context" "testing" "time" @@ -45,3 +46,81 @@ func TestDisabledRoutine(t *testing.T) { require.NoError(t, err) mockScheduler.AssertNumberOfCalls(t, "ScheduleJob", 1) } + +// MockBackupRunner is a mock implementation of backupRunner interface +type MockBackupRunner struct { + mock.Mock +} + +func (m *MockBackupRunner) runFullBackup(ctx context.Context, t time.Time) { + m.Called(ctx, t) +} + +func (m *MockBackupRunner) runIncrementalBackup(ctx context.Context, t time.Time) { + m.Called(ctx, t) +} + +func (m *MockBackupRunner) Cancel() { + m.Called() +} + +func (m *MockBackupRunner) CurrentStat() *model.CurrentBackups { + args := m.Called() + return args.Get(0).(*model.CurrentBackups) +} + +func TestScheduleRoutines(t *testing.T) { + holder := BackupHandlerHolder{ + "routine": &MockBackupRunner{}, + "disabled-routine": &MockBackupRunner{}, + "full-only": &MockBackupRunner{}, + } + tests := []struct { + name string + routines map[string]*model.BackupRoutine + expectedCalls int + }{ + { + name: "successful scheduling of full and incremental backups", + routines: map[string]*model.BackupRoutine{ + "routine": { + IntervalCron: "0 0 * * * *", + IncrIntervalCron: "0 */6 * * * *", + }, + }, + expectedCalls: 2, // One for full backup, one for incremental + }, + { + name: "skip disabled routine", + routines: map[string]*model.BackupRoutine{ + "disabled-routine": { + IntervalCron: "0 0 * * * *", + IncrIntervalCron: "0 */6 * * * *", + Disabled: true, + }, + }, + expectedCalls: 0, // No calls expected for disabled routine + }, + { + name: "full backup only", + routines: map[string]*model.BackupRoutine{ + "full-only": { + IntervalCron: "0 0 * * * *", + }, + }, + expectedCalls: 1, // One call for full backup only + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scheduler := new(MockScheduler) + scheduler.On("ScheduleJob", mock.Anything, mock.Anything).Return(nil) + + err := scheduleRoutines(scheduler, tt.routines, holder) + + require.NoError(t, err) + scheduler.AssertNumberOfCalls(t, "ScheduleJob", tt.expectedCalls) + }) + } +} From 69ee08ab5501476d20e5886797ceb895dc407dcd Mon Sep 17 00:00:00 2001 From: akorotkov Date: Tue, 24 Dec 2024 15:04:04 +0200 Subject: [PATCH 06/13] don't save incremental jobs to store --- pkg/service/backup_scheduler.go | 57 ++++++++++++---------------- pkg/service/backup_scheduler_test.go | 18 ++++++--- 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/pkg/service/backup_scheduler.go b/pkg/service/backup_scheduler.go index 82cc5f24..4497ebf8 100644 --- a/pkg/service/backup_scheduler.go +++ b/pkg/service/backup_scheduler.go @@ -46,7 +46,7 @@ func NewAdHocFullBackupJobForRoutine(routineName string) *quartz.JobDetail { jobStore.Lock() defer jobStore.Unlock() - key := fullJobKey(routineName).String() + key := jobKey(routineName, jobTypeFull).String() job := jobStore.jobs[key] if job == nil { return nil @@ -86,57 +86,50 @@ func scheduleRoutines( continue } - if routine.IncrIntervalCron != "" { - // schedule an incremental backup job for the routine - if err := scheduleIncrementalBackup(scheduler, handler, routine.IncrIntervalCron, routineName); err != nil { - errs = errors.Join(errs, fmt.Errorf("failed to schedule incremental backup: %w", err)) - continue - } + // schedule an incremental backup job for the routine + if err := scheduleIncrementalBackup(scheduler, handler, routine.IncrIntervalCron, routineName); err != nil { + errs = errors.Join(errs, fmt.Errorf("failed to schedule incremental backup: %w", err)) } } + return errs } func scheduleFullBackup( scheduler Scheduler, handler backupRunner, interval string, routineName string, ) error { - fullCronTrigger, err := quartz.NewCronTrigger(interval) - if err != nil { - return err - } - - fullJob := newBackupJob(handler, jobTypeFull, routineName) - fullJobDetail := quartz.NewJobDetail(fullJob, fullJobKey(routineName)) - jobStore.put(fullJobDetail.JobKey().String(), fullJobDetail) - - return scheduler.ScheduleJob(fullJobDetail, fullCronTrigger) + job := createJobDetail(handler, routineName, jobTypeFull) + jobStore.put(job.JobKey().String(), job) + return schedule(scheduler, interval, job) } func scheduleIncrementalBackup( scheduler Scheduler, handler backupRunner, interval string, routineName string, ) error { - incrCronTrigger, err := quartz.NewCronTrigger(interval) - if err != nil { - return err + if len(interval) == 0 { // no need to schedule if there is no interval set + return nil } - incrementalJob := newBackupJob(handler, jobTypeIncremental, routineName) - incrJobDetail := quartz.NewJobDetail( - incrementalJob, - incrJobKey(routineName), - ) - jobStore.put(incrJobDetail.JobKey().String(), incrJobDetail) + job := createJobDetail(handler, routineName, jobTypeIncremental) + return schedule(scheduler, interval, job) +} - return scheduler.ScheduleJob(incrJobDetail, incrCronTrigger) +func createJobDetail(handler backupRunner, routineName string, jobType jobType) *quartz.JobDetail { + job := newBackupJob(handler, jobType, routineName) + return quartz.NewJobDetail(job, jobKey(routineName, jobType)) } -func incrJobKey(routineName string) *quartz.JobKey { - jobName := fmt.Sprintf("%s-%s", routineName, jobTypeIncremental) - return quartz.NewJobKeyWithGroup(jobName, string(quartzGroupScheduled)) +func schedule(scheduler Scheduler, interval string, jobDetail *quartz.JobDetail) error { + cronTrigger, err := quartz.NewCronTrigger(interval) + if err != nil { + return err + } + + return scheduler.ScheduleJob(jobDetail, cronTrigger) } -func fullJobKey(routineName string) *quartz.JobKey { - jobName := fmt.Sprintf("%s-%s", routineName, jobTypeFull) +func jobKey(routineName string, jobType jobType) *quartz.JobKey { + jobName := fmt.Sprintf("%s-%s", routineName, jobType) return quartz.NewJobKeyWithGroup(jobName, string(quartzGroupScheduled)) } diff --git a/pkg/service/backup_scheduler_test.go b/pkg/service/backup_scheduler_test.go index ce812cb5..9fb1f0e5 100644 --- a/pkg/service/backup_scheduler_test.go +++ b/pkg/service/backup_scheduler_test.go @@ -76,9 +76,10 @@ func TestScheduleRoutines(t *testing.T) { "full-only": &MockBackupRunner{}, } tests := []struct { - name string - routines map[string]*model.BackupRoutine - expectedCalls int + name string + routines map[string]*model.BackupRoutine + expectedCalls int + expectedJobsAdded int }{ { name: "successful scheduling of full and incremental backups", @@ -88,7 +89,8 @@ func TestScheduleRoutines(t *testing.T) { IncrIntervalCron: "0 */6 * * * *", }, }, - expectedCalls: 2, // One for full backup, one for incremental + expectedCalls: 2, // One for full backup, one for incremental + expectedJobsAdded: 1, // Only full backup job added }, { name: "skip disabled routine", @@ -99,7 +101,8 @@ func TestScheduleRoutines(t *testing.T) { Disabled: true, }, }, - expectedCalls: 0, // No calls expected for disabled routine + expectedCalls: 0, // No calls expected for disabled routine + expectedJobsAdded: 0, }, { name: "full backup only", @@ -108,7 +111,8 @@ func TestScheduleRoutines(t *testing.T) { IntervalCron: "0 0 * * * *", }, }, - expectedCalls: 1, // One call for full backup only + expectedCalls: 1, // One call for full backup only + expectedJobsAdded: 1, // Only full backup job added }, } @@ -117,10 +121,12 @@ func TestScheduleRoutines(t *testing.T) { scheduler := new(MockScheduler) scheduler.On("ScheduleJob", mock.Anything, mock.Anything).Return(nil) + jobsBefore := len(jobStore.jobs) err := scheduleRoutines(scheduler, tt.routines, holder) require.NoError(t, err) scheduler.AssertNumberOfCalls(t, "ScheduleJob", tt.expectedCalls) + require.Equal(t, len(jobStore.jobs), jobsBefore+tt.expectedJobsAdded) }) } } From 6d9e503f1c01b43b9785da694b1124e5099b5510 Mon Sep 17 00:00:00 2001 From: akorotkov Date: Tue, 24 Dec 2024 15:19:28 +0200 Subject: [PATCH 07/13] clear jobs --- pkg/service/backup_scheduler.go | 8 ++++++++ pkg/service/backup_scheduler_test.go | 3 +-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/service/backup_scheduler.go b/pkg/service/backup_scheduler.go index 4497ebf8..308b137f 100644 --- a/pkg/service/backup_scheduler.go +++ b/pkg/service/backup_scheduler.go @@ -41,6 +41,12 @@ func (b *backupJobs) put(key string, value *quartz.JobDetail) { b.jobs[key] = value } +func (b *backupJobs) clear() { + b.Lock() + defer b.Unlock() + clear(b.jobs) +} + // NewAdHocFullBackupJobForRoutine returns a new full backup job for the routine name. func NewAdHocFullBackupJobForRoutine(routineName string) *quartz.JobDetail { jobStore.Lock() @@ -73,6 +79,8 @@ func NewScheduler(ctx context.Context) quartz.Scheduler { func scheduleRoutines( scheduler Scheduler, routines map[string]*model.BackupRoutine, handlers BackupHandlerHolder, ) error { + jobStore.clear() + var errs error for routineName, routine := range routines { if routine.Disabled { diff --git a/pkg/service/backup_scheduler_test.go b/pkg/service/backup_scheduler_test.go index 9fb1f0e5..0fd62bdf 100644 --- a/pkg/service/backup_scheduler_test.go +++ b/pkg/service/backup_scheduler_test.go @@ -121,12 +121,11 @@ func TestScheduleRoutines(t *testing.T) { scheduler := new(MockScheduler) scheduler.On("ScheduleJob", mock.Anything, mock.Anything).Return(nil) - jobsBefore := len(jobStore.jobs) err := scheduleRoutines(scheduler, tt.routines, holder) require.NoError(t, err) scheduler.AssertNumberOfCalls(t, "ScheduleJob", tt.expectedCalls) - require.Equal(t, len(jobStore.jobs), jobsBefore+tt.expectedJobsAdded) + require.Equal(t, len(jobStore.jobs), tt.expectedJobsAdded) }) } } From e1b26fb4f7e407a1bb913e4559fd836e24a7c784 Mon Sep 17 00:00:00 2001 From: akorotkov Date: Tue, 24 Dec 2024 16:09:23 +0200 Subject: [PATCH 08/13] add interface description --- pkg/service/backup_job.go | 9 --------- pkg/service/backup_routine_handler.go | 16 +++++++++++++++- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/pkg/service/backup_job.go b/pkg/service/backup_job.go index b8e6b364..4fa4a6f0 100644 --- a/pkg/service/backup_job.go +++ b/pkg/service/backup_job.go @@ -5,20 +5,11 @@ import ( "fmt" "log/slog" "sync/atomic" - "time" - "github.com/aerospike/aerospike-backup-service/v2/pkg/model" "github.com/aerospike/aerospike-backup-service/v2/pkg/util" "github.com/reugn/go-quartz/quartz" ) -type backupRunner interface { - runFullBackup(context.Context, time.Time) - runIncrementalBackup(context.Context, time.Time) - Cancel() - CurrentStat() *model.CurrentBackups -} - // backupJob implements the quartz.Job interface. type backupJob struct { handler backupRunner diff --git a/pkg/service/backup_routine_handler.go b/pkg/service/backup_routine_handler.go index 56a2da81..22540b77 100644 --- a/pkg/service/backup_routine_handler.go +++ b/pkg/service/backup_routine_handler.go @@ -78,7 +78,21 @@ type ClusterConfigWriter interface { Write(ctx context.Context, client backup.AerospikeClient, timestamp time.Time) } -// BackupHandlerHolder stores backupHandlers by routine name +// backupRunner runs backup operations. +type backupRunner interface { + // runFullBackup starts full backup. + runFullBackup(context.Context, time.Time) + // runIncrementalBackup starts incremental backup. + runIncrementalBackup(context.Context, time.Time) + // Cancel cancels all running backup jobs. + Cancel() + // CurrentStat returns current status of backup routines. + CurrentStat() *model.CurrentBackups +} + +var _ backupRunner = (*BackupRoutineHandler)(nil) + +// BackupHandlerHolder stores backupRunners by routine name type BackupHandlerHolder map[string]backupRunner // newBackupRoutineHandler returns a new BackupRoutineHandler instance. From ce1f834ed0d976ff91ee54519f64e474ffab0ebe Mon Sep 17 00:00:00 2001 From: akorotkov Date: Tue, 24 Dec 2024 20:14:54 +0200 Subject: [PATCH 09/13] renamings --- pkg/model/{last_run.go => last_backup_run.go} | 10 ++++++---- pkg/service/backup_backend.go | 2 +- pkg/service/backup_routine_handler.go | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) rename pkg/model/{last_run.go => last_backup_run.go} (83%) diff --git a/pkg/model/last_run.go b/pkg/model/last_backup_run.go similarity index 83% rename from pkg/model/last_run.go rename to pkg/model/last_backup_run.go index 222036fd..02d88890 100644 --- a/pkg/model/last_run.go +++ b/pkg/model/last_backup_run.go @@ -1,6 +1,8 @@ package model -import "time" +import ( + "time" +) // LastBackupRun stores the last run times for both full and incremental backups. type LastBackupRun struct { @@ -10,8 +12,8 @@ type LastBackupRun struct { Incremental *time.Time } -func NewLastRun(lastFullBackup *time.Time, lastIncrBackup *time.Time) LastBackupRun { - return LastBackupRun{ +func NewLastRun(lastFullBackup *time.Time, lastIncrBackup *time.Time) *LastBackupRun { + return &LastBackupRun{ Full: lastFullBackup, Incremental: lastIncrBackup, } @@ -21,7 +23,7 @@ func (r *LastBackupRun) NoFullBackup() bool { return r.Full == nil } -func (r *LastBackupRun) LastAnyRun() *time.Time { +func (r *LastBackupRun) LatestRun() *time.Time { if r.Incremental != nil && r.Full != nil && r.Incremental.After(*r.Full) { return r.Incremental } diff --git a/pkg/service/backup_backend.go b/pkg/service/backup_backend.go index 034e84cc..a8dd4cb3 100644 --- a/pkg/service/backup_backend.go +++ b/pkg/service/backup_backend.go @@ -40,7 +40,7 @@ func (b *BackupBackend) findLastRun(ctx context.Context) model.LastBackupRun { incrementalBackupList, _ := b.IncrementalBackupList(ctx, model.TimeBounds{FromTime: lastFullBackup}) lastIncrBackup := lastBackupTime(incrementalBackupList) - return model.NewLastRun(lastFullBackup, lastIncrBackup) + return *model.NewLastRun(lastFullBackup, lastIncrBackup) } func lastBackupTime(b []model.BackupDetails) *time.Time { diff --git a/pkg/service/backup_routine_handler.go b/pkg/service/backup_routine_handler.go index 22540b77..f7ac82c7 100644 --- a/pkg/service/backup_routine_handler.go +++ b/pkg/service/backup_routine_handler.go @@ -239,7 +239,7 @@ func (h *BackupRoutineHandler) createTimebounds(fullBackup bool, now time.Time) ) if !fullBackup { - lastRun := h.lastRun.LastAnyRun() + lastRun := h.lastRun.LatestRun() fromTime = lastRun } From bf22cba1c87a741858eaf0fdd5a6a7d7480a644d Mon Sep 17 00:00:00 2001 From: akorotkov Date: Tue, 24 Dec 2024 20:16:25 +0200 Subject: [PATCH 10/13] comments --- pkg/model/current_backup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/model/current_backup.go b/pkg/model/current_backup.go index b7890252..984079b0 100644 --- a/pkg/model/current_backup.go +++ b/pkg/model/current_backup.go @@ -10,7 +10,7 @@ type CurrentBackups struct { Full *RunningJob // Incremental represents the state of an incremental backup. Nil if no incremental backup is running. Incremental *RunningJob - // LastRunTime: the last time when a backup was run + // LastRunTime contains information about the latest run time for both full and incremental backups. LastRunTime LastBackupRun } From afc186cf4afe1ecab08fab96e0c0867ac2482c79 Mon Sep 17 00:00:00 2001 From: akorotkov Date: Wed, 25 Dec 2024 10:09:57 +0200 Subject: [PATCH 11/13] fix datarace --- pkg/model/current_backup.go | 2 +- pkg/model/last_backup_run.go | 24 ++++++++++++++++++---- pkg/service/backup_backend.go | 4 ++-- pkg/service/backup_routine_handler.go | 8 ++++---- pkg/service/backup_routine_handler_test.go | 12 +++++------ pkg/service/backup_scheduler_test.go | 2 +- pkg/service/config_applier.go | 2 +- 7 files changed, 35 insertions(+), 19 deletions(-) diff --git a/pkg/model/current_backup.go b/pkg/model/current_backup.go index 984079b0..9dc3ce58 100644 --- a/pkg/model/current_backup.go +++ b/pkg/model/current_backup.go @@ -11,7 +11,7 @@ type CurrentBackups struct { // Incremental represents the state of an incremental backup. Nil if no incremental backup is running. Incremental *RunningJob // LastRunTime contains information about the latest run time for both full and incremental backups. - LastRunTime LastBackupRun + LastRunTime *LastBackupRun } // RunningJob tracks progress of currently running job. diff --git a/pkg/model/last_backup_run.go b/pkg/model/last_backup_run.go index 02d88890..d74ca5da 100644 --- a/pkg/model/last_backup_run.go +++ b/pkg/model/last_backup_run.go @@ -1,14 +1,14 @@ package model import ( + "sync" "time" ) // LastBackupRun stores the last run times for both full and incremental backups. type LastBackupRun struct { - // Last time the Full backup was performed. - Full *time.Time - // Last time the Incremental backup was performed. + mu sync.RWMutex + Full *time.Time Incremental *time.Time } @@ -20,13 +20,29 @@ func NewLastRun(lastFullBackup *time.Time, lastIncrBackup *time.Time) *LastBacku } func (r *LastBackupRun) NoFullBackup() bool { + r.mu.RLock() + defer r.mu.RUnlock() return r.Full == nil } func (r *LastBackupRun) LatestRun() *time.Time { + r.mu.RLock() + defer r.mu.RUnlock() + if r.Incremental != nil && r.Full != nil && r.Incremental.After(*r.Full) { return r.Incremental } - return r.Full } + +func (r *LastBackupRun) SetFullBackupTime(t *time.Time) { + r.mu.Lock() + defer r.mu.Unlock() + r.Full = t +} + +func (r *LastBackupRun) SetIncrementalBackupTime(t *time.Time) { + r.mu.Lock() + defer r.mu.Unlock() + r.Incremental = t +} diff --git a/pkg/service/backup_backend.go b/pkg/service/backup_backend.go index a8dd4cb3..4c194dbf 100644 --- a/pkg/service/backup_backend.go +++ b/pkg/service/backup_backend.go @@ -34,13 +34,13 @@ func newBackend(routineName string, storage model.Storage) *BackupBackend { } } -func (b *BackupBackend) findLastRun(ctx context.Context) model.LastBackupRun { +func (b *BackupBackend) findLastRun(ctx context.Context) *model.LastBackupRun { fullBackupList, _ := b.FullBackupList(ctx, model.TimeBounds{}) lastFullBackup := lastBackupTime(fullBackupList) incrementalBackupList, _ := b.IncrementalBackupList(ctx, model.TimeBounds{FromTime: lastFullBackup}) lastIncrBackup := lastBackupTime(incrementalBackupList) - return *model.NewLastRun(lastFullBackup, lastIncrBackup) + return model.NewLastRun(lastFullBackup, lastIncrBackup) } func lastBackupTime(b []model.BackupDetails) *time.Time { diff --git a/pkg/service/backup_routine_handler.go b/pkg/service/backup_routine_handler.go index f7ac82c7..24d2eb31 100644 --- a/pkg/service/backup_routine_handler.go +++ b/pkg/service/backup_routine_handler.go @@ -26,7 +26,7 @@ type BackupRoutineHandler struct { namespaces []string storage model.Storage secretAgent *model.SecretAgent - lastRun model.LastBackupRun + lastRun *model.LastBackupRun retry executor clientManager aerospike.ClientManager logger *slog.Logger @@ -102,7 +102,7 @@ func newBackupRoutineHandler( backupService Backup, routineName string, backupBackend *BackupBackend, - lastRun model.LastBackupRun, + lastRun *model.LastBackupRun, ) *BackupRoutineHandler { backupRoutine := config.BackupRoutines[routineName] backupPolicy := backupRoutine.BackupPolicy @@ -172,7 +172,7 @@ func (h *BackupRoutineHandler) runFullBackupInternal(ctx context.Context, now ti return err } - h.lastRun.Full = &now + h.lastRun.SetFullBackupTime(&now) h.clusterConfigWriter.Write(ctx, client.AerospikeClient(), now) @@ -342,7 +342,7 @@ func (h *BackupRoutineHandler) runIncrementalBackupInternal(ctx context.Context, return err } - h.lastRun.Incremental = &now + h.lastRun.SetIncrementalBackupTime(&now) return nil } diff --git a/pkg/service/backup_routine_handler_test.go b/pkg/service/backup_routine_handler_test.go index 51db7875..2558bdf9 100644 --- a/pkg/service/backup_routine_handler_test.go +++ b/pkg/service/backup_routine_handler_test.go @@ -114,7 +114,7 @@ func setupTestHandler( backupFullPolicy: &model.BackupPolicy{}, fullBackupHandlers: make(map[string]CancelableBackupHandler), incrBackupHandlers: make(map[string]CancelableBackupHandler), - lastRun: model.LastBackupRun{}, + lastRun: &model.LastBackupRun{}, storage: &model.LocalStorage{Path: "/tmp"}, logger: slog.Default(), retry: &simpleExecutor{}, @@ -223,7 +223,7 @@ func TestRunIncrementalBackup_NoFullBackupYet(t *testing.T) { retentionManager := new(mockRetentionManager) handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) - handler.lastRun = model.LastBackupRun{} // Ensure empty lastRun + handler.lastRun = &model.LastBackupRun{} // Ensure empty lastRun handler.runIncrementalBackup(context.Background(), time.Now()) @@ -239,7 +239,7 @@ func TestRunIncrementalBackup_SkipIfFullBackupInProgress(t *testing.T) { retentionManager := new(mockRetentionManager) handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) - handler.lastRun = model.LastBackupRun{ + handler.lastRun = &model.LastBackupRun{ Full: util.Ptr(time.Now()), // Set last full run } @@ -259,7 +259,7 @@ func TestRunIncrementalBackup_SkipIfIncrementalBackupInProgress(t *testing.T) { retentionManager := new(mockRetentionManager) handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) - handler.lastRun = model.LastBackupRun{ + handler.lastRun = &model.LastBackupRun{ Full: util.Ptr(time.Now()), // Set last full run } @@ -279,7 +279,7 @@ func TestRunIncrementalBackup_ClientError(t *testing.T) { retentionManager := new(mockRetentionManager) handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) - handler.lastRun = model.LastBackupRun{ + handler.lastRun = &model.LastBackupRun{ Full: util.Ptr(time.Now()), } @@ -302,7 +302,7 @@ func TestRunIncrementalBackup_Success(t *testing.T) { handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) now := time.Now() lastRun := now.Add(-1 * time.Hour) - handler.lastRun = model.LastBackupRun{ + handler.lastRun = &model.LastBackupRun{ Full: &lastRun, } diff --git a/pkg/service/backup_scheduler_test.go b/pkg/service/backup_scheduler_test.go index 0fd62bdf..5fd5165d 100644 --- a/pkg/service/backup_scheduler_test.go +++ b/pkg/service/backup_scheduler_test.go @@ -36,7 +36,7 @@ func TestDisabledRoutine(t *testing.T) { handlers := BackupHandlerHolder{ "routine1": &BackupRoutineHandler{}, - "routine2": &BackupRoutineHandler{lastRun: model.LastBackupRun{ + "routine2": &BackupRoutineHandler{lastRun: &model.LastBackupRun{ Full: util.Ptr(time.Now()), }}, } diff --git a/pkg/service/config_applier.go b/pkg/service/config_applier.go index 06f73577..d3d128ea 100644 --- a/pkg/service/config_applier.go +++ b/pkg/service/config_applier.go @@ -123,7 +123,7 @@ func makeHandler( backend, _ := backends.Get(routineName) // try to reuse lastRun from previous handler if it exists. - var lastRun model.LastBackupRun + var lastRun *model.LastBackupRun if old, ok := oldHandlers[routineName]; ok { lastRun = old.CurrentStat().LastRunTime } else { From 611c884ad71161426579356b2e3c1a86769b844e Mon Sep 17 00:00:00 2001 From: akorotkov Date: Wed, 25 Dec 2024 10:11:02 +0200 Subject: [PATCH 12/13] update docs --- pkg/model/last_backup_run.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/model/last_backup_run.go b/pkg/model/last_backup_run.go index d74ca5da..d313bd9f 100644 --- a/pkg/model/last_backup_run.go +++ b/pkg/model/last_backup_run.go @@ -7,8 +7,10 @@ import ( // LastBackupRun stores the last run times for both full and incremental backups. type LastBackupRun struct { - mu sync.RWMutex - Full *time.Time + mu sync.RWMutex + // Last time the Full backup was performed. + Full *time.Time + // Last time the Incremental backup was performed. Incremental *time.Time } From 26d1482459beb63ef7225d130c666f3adaeb9b50 Mon Sep 17 00:00:00 2001 From: akorotkov Date: Wed, 25 Dec 2024 11:58:21 +0200 Subject: [PATCH 13/13] make fields private --- pkg/dto/current_backup.go | 4 +-- pkg/model/last_backup_run.go | 34 +++++++++++++++------- pkg/service/backup_backend.go | 2 +- pkg/service/backup_routine_handler_test.go | 18 ++++-------- pkg/service/backup_scheduler_test.go | 4 +-- 5 files changed, 32 insertions(+), 30 deletions(-) diff --git a/pkg/dto/current_backup.go b/pkg/dto/current_backup.go index 0ca0c02d..368e36ff 100644 --- a/pkg/dto/current_backup.go +++ b/pkg/dto/current_backup.go @@ -33,8 +33,8 @@ func NewCurrentBackupsFromModel(m *model.CurrentBackups) *CurrentBackups { func (c *CurrentBackups) fromModel(m *model.CurrentBackups) { c.Full = NewRunningJobFromModel(m.Full) c.Incremental = NewRunningJobFromModel(m.Incremental) - c.LastFull = m.LastRunTime.Full - c.LastIncremental = m.LastRunTime.Incremental + c.LastFull = m.LastRunTime.FullBackupTime() + c.LastIncremental = m.LastRunTime.IncrementalBackupTime() } // RunningJob tracks progress of currently running job. diff --git a/pkg/model/last_backup_run.go b/pkg/model/last_backup_run.go index d313bd9f..b3a22cf1 100644 --- a/pkg/model/last_backup_run.go +++ b/pkg/model/last_backup_run.go @@ -9,42 +9,54 @@ import ( type LastBackupRun struct { mu sync.RWMutex // Last time the Full backup was performed. - Full *time.Time + full *time.Time // Last time the Incremental backup was performed. - Incremental *time.Time + incremental *time.Time } -func NewLastRun(lastFullBackup *time.Time, lastIncrBackup *time.Time) *LastBackupRun { +func NewLastBackupRun(lastFullBackup *time.Time, lastIncrBackup *time.Time) *LastBackupRun { return &LastBackupRun{ - Full: lastFullBackup, - Incremental: lastIncrBackup, + full: lastFullBackup, + incremental: lastIncrBackup, } } func (r *LastBackupRun) NoFullBackup() bool { r.mu.RLock() defer r.mu.RUnlock() - return r.Full == nil + return r.full == nil } func (r *LastBackupRun) LatestRun() *time.Time { r.mu.RLock() defer r.mu.RUnlock() - if r.Incremental != nil && r.Full != nil && r.Incremental.After(*r.Full) { - return r.Incremental + if r.incremental != nil && r.full != nil && r.incremental.After(*r.full) { + return r.incremental } - return r.Full + return r.full } func (r *LastBackupRun) SetFullBackupTime(t *time.Time) { r.mu.Lock() defer r.mu.Unlock() - r.Full = t + r.full = t } func (r *LastBackupRun) SetIncrementalBackupTime(t *time.Time) { r.mu.Lock() defer r.mu.Unlock() - r.Incremental = t + r.incremental = t +} + +func (r *LastBackupRun) FullBackupTime() *time.Time { + r.mu.RLock() + defer r.mu.RUnlock() + return r.full +} + +func (r *LastBackupRun) IncrementalBackupTime() *time.Time { + r.mu.RLock() + defer r.mu.RUnlock() + return r.incremental } diff --git a/pkg/service/backup_backend.go b/pkg/service/backup_backend.go index 4c194dbf..c5018b1b 100644 --- a/pkg/service/backup_backend.go +++ b/pkg/service/backup_backend.go @@ -40,7 +40,7 @@ func (b *BackupBackend) findLastRun(ctx context.Context) *model.LastBackupRun { incrementalBackupList, _ := b.IncrementalBackupList(ctx, model.TimeBounds{FromTime: lastFullBackup}) lastIncrBackup := lastBackupTime(incrementalBackupList) - return model.NewLastRun(lastFullBackup, lastIncrBackup) + return model.NewLastBackupRun(lastFullBackup, lastIncrBackup) } func lastBackupTime(b []model.BackupDetails) *time.Time { diff --git a/pkg/service/backup_routine_handler_test.go b/pkg/service/backup_routine_handler_test.go index 2558bdf9..8004a334 100644 --- a/pkg/service/backup_routine_handler_test.go +++ b/pkg/service/backup_routine_handler_test.go @@ -239,9 +239,7 @@ func TestRunIncrementalBackup_SkipIfFullBackupInProgress(t *testing.T) { retentionManager := new(mockRetentionManager) handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) - handler.lastRun = &model.LastBackupRun{ - Full: util.Ptr(time.Now()), // Set last full run - } + handler.lastRun = model.NewLastBackupRun(util.Ptr(time.Now()), nil) handler.fullBackupHandlers["ns1"] = &mockBackupHandler{} @@ -259,9 +257,7 @@ func TestRunIncrementalBackup_SkipIfIncrementalBackupInProgress(t *testing.T) { retentionManager := new(mockRetentionManager) handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) - handler.lastRun = &model.LastBackupRun{ - Full: util.Ptr(time.Now()), // Set last full run - } + handler.lastRun = model.NewLastBackupRun(util.Ptr(time.Now()), nil) handler.incrBackupHandlers["test"] = &mockBackupHandler{} @@ -279,9 +275,7 @@ func TestRunIncrementalBackup_ClientError(t *testing.T) { retentionManager := new(mockRetentionManager) handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) - handler.lastRun = &model.LastBackupRun{ - Full: util.Ptr(time.Now()), - } + handler.lastRun = model.NewLastBackupRun(util.Ptr(time.Now()), nil) expectedErr := errors.New("client error") clientManager.On("GetClient", mock.Anything).Return(nil, expectedErr) @@ -302,9 +296,7 @@ func TestRunIncrementalBackup_Success(t *testing.T) { handler := setupTestHandler(backupService, clientManager, metadataWriter, configWriter, retentionManager) now := time.Now() lastRun := now.Add(-1 * time.Hour) - handler.lastRun = &model.LastBackupRun{ - Full: &lastRun, - } + handler.lastRun = model.NewLastBackupRun(&lastRun, nil) backupHandler := new(mockBackupHandler) stats := &models.BackupStats{} @@ -347,7 +339,7 @@ func TestRunIncrementalBackup_Success(t *testing.T) { clientManager.AssertExpectations(t) backupService.AssertExpectations(t) backupHandler.AssertExpectations(t) - assert.Equal(t, now, *handler.CurrentStat().LastRunTime.Incremental) + assert.Equal(t, now, *handler.CurrentStat().LastRunTime.IncrementalBackupTime()) } func TestRunFullBackup_PartialFailure(t *testing.T) { diff --git a/pkg/service/backup_scheduler_test.go b/pkg/service/backup_scheduler_test.go index 5fd5165d..090dba95 100644 --- a/pkg/service/backup_scheduler_test.go +++ b/pkg/service/backup_scheduler_test.go @@ -36,9 +36,7 @@ func TestDisabledRoutine(t *testing.T) { handlers := BackupHandlerHolder{ "routine1": &BackupRoutineHandler{}, - "routine2": &BackupRoutineHandler{lastRun: &model.LastBackupRun{ - Full: util.Ptr(time.Now()), - }}, + "routine2": &BackupRoutineHandler{lastRun: model.NewLastBackupRun(util.Ptr(time.Now()), nil)}, } err := scheduleRoutines(mockScheduler, config.BackupRoutines, handlers)