From ae95cff912fced6a4d15077f2a3eb0ddb51d8d0c Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Wed, 4 Dec 2024 18:18:46 +0100 Subject: [PATCH] Add schedule metrics --- cmd/ydbcp/main.go | 6 +- internal/handlers/delete_backup.go | 20 +-- internal/handlers/delete_backup_test.go | 6 +- internal/handlers/restore_backup.go | 23 +-- internal/handlers/schedule_backup.go | 17 ++- internal/handlers/schedule_backup_test.go | 6 +- internal/handlers/take_backup.go | 7 +- internal/handlers/take_backup_retry.go | 7 +- internal/handlers/take_backup_retry_test.go | 21 ++- internal/handlers/take_backup_test.go | 38 ++--- internal/metrics/metrics.go | 99 ++++++++++-- internal/metrics/metrics_moc.go | 47 ++++-- internal/types/backup_schedule.go | 8 +- internal/types/operation.go | 16 ++ .../schedule_watcher/schedule_watcher.go | 11 +- .../schedule_watcher/schedule_watcher_test.go | 142 ++++++++++++++++-- 16 files changed, 369 insertions(+), 105 deletions(-) diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 59bfb358..bcaf17f4 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -186,11 +186,9 @@ func main() { ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery) xlog.Info(ctx, "Created TtlWatcher") - backupScheduleHandler := handlers.NewBackupScheduleHandler( - queries.NewWriteTableQuery, clockwork.NewRealClock(), metrics, - ) + backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, metrics, clockwork.NewRealClock()) - schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler) + schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler, metrics, clockwork.NewRealClock()) xlog.Info(ctx, "Created ScheduleWatcher") diff --git a/internal/handlers/delete_backup.go b/internal/handlers/delete_backup.go index e7f082cb..00604e4d 100644 --- a/internal/handlers/delete_backup.go +++ b/internal/handlers/delete_backup.go @@ -25,7 +25,11 @@ func NewDBOperationHandler( mon metrics.MetricsRegistry, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return DBOperationHandler(ctx, op, db, s3, config, queryBuilderFactory, mon) + err := DBOperationHandler(ctx, op, db, s3, config, queryBuilderFactory, mon) + if err == nil { + mon.ReportOperationMetrics(op) + } + return err } } @@ -52,7 +56,7 @@ func DBOperationHandler( return fmt.Errorf("can't cast operation to DeleteBackupOperation %s", types.OperationToString(operation)) } - upsertAndReportMetrics := func(operation types.Operation, backup *types.Backup) error { + executeUpsert := func(operation types.Operation, backup *types.Backup) error { var err error if backup != nil { @@ -63,10 +67,6 @@ func DBOperationHandler( err = db.UpdateOperation(ctx, operation) } - if err == nil { - mon.ObserveOperationDuration(operation) - } - return err } @@ -90,7 +90,7 @@ func DBOperationHandler( operation.SetState(types.OperationStateError) operation.SetMessage("Backup not found") operation.GetAudit().CompletedAt = timestamppb.Now() - return upsertAndReportMetrics(operation, nil) + return executeUpsert(operation, nil) } backup := backups[0] @@ -100,14 +100,14 @@ func DBOperationHandler( operation.SetState(types.OperationStateError) operation.SetMessage("Operation deadline exceeded") operation.GetAudit().CompletedAt = timestamppb.Now() - return upsertAndReportMetrics(operation, backup) + return executeUpsert(operation, backup) } if backup.Status != types.BackupStateDeleting { operation.SetState(types.OperationStateError) 
operation.SetMessage(fmt.Sprintf("Unexpected backup status: %s", backup.Status)) operation.GetAudit().CompletedAt = timestamppb.Now() - return upsertAndReportMetrics(operation, nil) + return executeUpsert(operation, nil) } deleteBackup := func(pathPrefix string, bucket string) error { @@ -150,5 +150,5 @@ func DBOperationHandler( return fmt.Errorf("unexpected operation state %s", dbOp.State) } - return upsertAndReportMetrics(operation, backup) + return executeUpsert(operation, backup) } diff --git a/internal/handlers/delete_backup_test.go b/internal/handlers/delete_backup_test.go index b5de060b..c06c9776 100644 --- a/internal/handlers/delete_backup_test.go +++ b/internal/handlers/delete_backup_test.go @@ -152,7 +152,7 @@ func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { assert.Empty(t, objects) // check metrics - assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_deleted"]) + assert.Equal(t, float64(450), mon.GetMetrics()["storage_bytes_deleted"]) } func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { @@ -233,7 +233,7 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { assert.Empty(t, objects) // check metrics - assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_deleted"]) + assert.Equal(t, float64(450), mon.GetMetrics()["storage_bytes_deleted"]) } func TestDBOperationHandlerUnexpectedBackupStatus(t *testing.T) { @@ -383,5 +383,5 @@ func TestDBOperationHandlerDeleteMoreThanAllowedLimit(t *testing.T) { assert.Empty(t, objects) // check metrics - assert.Equal(t, int64(objectsListSize*10), mon.GetMetrics()["storage_bytes_deleted"]) + assert.Equal(t, float64(objectsListSize*10), mon.GetMetrics()["storage_bytes_deleted"]) } diff --git a/internal/handlers/restore_backup.go b/internal/handlers/restore_backup.go index 6329ba92..4c425d4a 100644 --- a/internal/handlers/restore_backup.go +++ b/internal/handlers/restore_backup.go @@ -20,7 +20,11 @@ func NewRBOperationHandler( db db.DBConnector, client client.ClientConnector, config config.Config, mon metrics.MetricsRegistry, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return RBOperationHandler(ctx, op, db, client, config, mon) + err := RBOperationHandler(ctx, op, db, client, config) + if err == nil { + mon.ReportOperationMetrics(op) + } + return err } } @@ -30,7 +34,6 @@ func RBOperationHandler( db db.DBConnector, client client.ClientConnector, config config.Config, - mon metrics.MetricsRegistry, ) error { xlog.Info(ctx, "RBOperationHandler", zap.String("OperationMessage", operation.GetMessage())) @@ -56,16 +59,6 @@ func RBOperationHandler( defer func() { _ = client.Close(ctx, conn) }() - upsertAndReportMetrics := func(operation types.Operation) error { - err := db.UpdateOperation(ctx, operation) - - if err == nil { - mon.ObserveOperationDuration(operation) - } - - return err - } - ydbOpResponse, err := lookupYdbOperationStatus( ctx, client, conn, operation, mr.YdbOperationId, mr.Audit.CreatedAt, config, ) @@ -76,7 +69,7 @@ func RBOperationHandler( operation.SetState(ydbOpResponse.opState) operation.SetMessage(ydbOpResponse.opMessage) operation.GetAudit().CompletedAt = timestamppb.Now() - return upsertAndReportMetrics(operation) + return db.UpdateOperation(ctx, operation) } if ydbOpResponse.opResponse == nil { @@ -126,7 +119,7 @@ func RBOperationHandler( operation.SetState(types.OperationStateError) operation.SetMessage("Operation deadline exceeded") operation.GetAudit().CompletedAt = 
timestamppb.Now() - return upsertAndReportMetrics(operation) + return db.UpdateOperation(ctx, operation) } return db.UpdateOperation(ctx, operation) @@ -173,5 +166,5 @@ func RBOperationHandler( } operation.GetAudit().CompletedAt = timestamppb.Now() - return upsertAndReportMetrics(operation) + return db.UpdateOperation(ctx, operation) } diff --git a/internal/handlers/schedule_backup.go b/internal/handlers/schedule_backup.go index ba91151c..ff6e03a8 100644 --- a/internal/handlers/schedule_backup.go +++ b/internal/handlers/schedule_backup.go @@ -15,17 +15,17 @@ import ( pb "ydbcp/pkg/proto/ydbcp/v1alpha1" ) -type BackupScheduleHandlerType func(context.Context, db.DBConnector, types.BackupSchedule) error +type BackupScheduleHandlerType func(context.Context, db.DBConnector, *types.BackupSchedule) error func NewBackupScheduleHandler( queryBuilderFactory queries.WriteQueryBuilderFactory, - clock clockwork.Clock, mon metrics.MetricsRegistry, + clock clockwork.Clock, ) BackupScheduleHandlerType { - return func(ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule) error { + return func(ctx context.Context, driver db.DBConnector, schedule *types.BackupSchedule) error { return BackupScheduleHandler( ctx, driver, schedule, - queryBuilderFactory, clock, mon, + queryBuilderFactory, mon, clock, ) } } @@ -33,16 +33,15 @@ func NewBackupScheduleHandler( func BackupScheduleHandler( ctx context.Context, driver db.DBConnector, - schedule types.BackupSchedule, + schedule *types.BackupSchedule, queryBuilderFactory queries.WriteQueryBuilderFactory, - clock clockwork.Clock, mon metrics.MetricsRegistry, + clock clockwork.Clock, ) error { if schedule.Status != types.BackupScheduleStateActive { xlog.Error(ctx, "backup schedule is not active", zap.String("scheduleID", schedule.ID)) return errors.New("backup schedule is not active") } - // do not handle last_backup_id status = (failed | deleted) for now, just do backups on cron. 
if schedule.NextLaunch != nil && schedule.NextLaunch.Before(clock.Now()) { backoff, err := schedule.GetCronDuration() if err != nil { @@ -84,13 +83,15 @@ func BackupScheduleHandler( zap.String("TakeBackupWithRetryOperation", tbwr.Proto().String()), ) + mon.IncScheduledBackupsCount(schedule) + err = schedule.UpdateNextLaunch(clock.Now()) if err != nil { return err } return driver.ExecuteUpsert( ctx, - queryBuilderFactory().WithCreateOperation(tbwr).WithUpdateBackupSchedule(schedule), + queryBuilderFactory().WithCreateOperation(tbwr).WithUpdateBackupSchedule(*schedule), ) } return nil diff --git a/internal/handlers/schedule_backup_test.go b/internal/handlers/schedule_backup_test.go index afd19cb0..97fd3dab 100644 --- a/internal/handlers/schedule_backup_test.go +++ b/internal/handlers/schedule_backup_test.go @@ -41,11 +41,9 @@ func TestBackupScheduleHandler(t *testing.T) { ) handler := NewBackupScheduleHandler( - queries.NewWriteTableQueryMock, - clock, - metrics.NewMockMetricsRegistry(), + queries.NewWriteTableQueryMock, metrics.NewMockMetricsRegistry(), clock, ) - err := handler(ctx, dbConnector, schedule) + err := handler(ctx, dbConnector, &schedule) assert.Empty(t, err) // check operation status (should be running) diff --git a/internal/handlers/take_backup.go b/internal/handlers/take_backup.go index 829ffd08..ef961ed3 100644 --- a/internal/handlers/take_backup.go +++ b/internal/handlers/take_backup.go @@ -22,7 +22,11 @@ func NewTBOperationHandler( queryBuilderFactory queries.WriteQueryBuilderFactory, mon metrics.MetricsRegistry, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return TBOperationHandler(ctx, op, db, client, s3, config, queryBuilderFactory, mon) + err := TBOperationHandler(ctx, op, db, client, s3, config, queryBuilderFactory, mon) + if err == nil { + mon.ReportOperationMetrics(op) + } + return err } } @@ -65,7 +69,6 @@ func TBOperationHandler( ) if err == nil { - mon.ObserveOperationDuration(operation) mon.IncCompletedBackupsCount(containerId, database, status) } diff --git a/internal/handlers/take_backup_retry.go b/internal/handlers/take_backup_retry.go index 865667cd..7e15bdca 100644 --- a/internal/handlers/take_backup_retry.go +++ b/internal/handlers/take_backup_retry.go @@ -32,7 +32,11 @@ func NewTBWROperationHandler( mon metrics.MetricsRegistry, ) types.OperationHandler { return func(ctx context.Context, op types.Operation) error { - return TBWROperationHandler(ctx, op, db, client, s3, clientConfig, queryBuilderFactory, clock, mon) + err := TBWROperationHandler(ctx, op, db, client, s3, clientConfig, queryBuilderFactory, clock) + if err == nil { + mon.ReportOperationMetrics(op) + } + return err } } @@ -148,7 +152,6 @@ func TBWROperationHandler( clientConfig config.ClientConnectionConfig, queryBuilderFactory queries.WriteQueryBuilderFactory, clock clockwork.Clock, - mon metrics.MetricsRegistry, ) error { ctx = xlog.With(ctx, zap.String("OperationID", operation.GetID())) diff --git a/internal/handlers/take_backup_retry_test.go b/internal/handlers/take_backup_retry_test.go index 66a0944d..6d5b1563 100644 --- a/internal/handlers/take_backup_retry_test.go +++ b/internal/handlers/take_backup_retry_test.go @@ -237,6 +237,7 @@ func TestTBWRHandlerSuccess(t *testing.T) { clientConnector := client.NewMockClientConnector() + mon := metrics.NewMockMetricsRegistry() handler := NewTBWROperationHandler( dbConnector, clientConnector, @@ -244,7 +245,7 @@ func TestTBWRHandlerSuccess(t *testing.T) { config.ClientConnectionConfig{}, 
queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), - metrics.NewMockMetricsRegistry(), + mon, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -253,6 +254,8 @@ func TestTBWRHandlerSuccess(t *testing.T) { assert.Empty(t, err) assert.NotEmpty(t, op) assert.Equal(t, types.OperationStateDone, op.GetState()) + assert.Equal(t, float64(1), mon.GetMetrics()["operations_duration_seconds"]) + assert.Equal(t, float64(1), mon.GetMetrics()["schedule_finished_take_backup_with_retry_count"]) } func TestTBWRHandlerSkipRunning(t *testing.T) { @@ -299,6 +302,7 @@ func TestTBWRHandlerSkipRunning(t *testing.T) { clientConnector := client.NewMockClientConnector() + mon := metrics.NewMockMetricsRegistry() handler := NewTBWROperationHandler( dbConnector, clientConnector, @@ -306,7 +310,7 @@ func TestTBWRHandlerSkipRunning(t *testing.T) { config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), - metrics.NewMockMetricsRegistry(), + mon, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -318,6 +322,8 @@ func TestTBWRHandlerSkipRunning(t *testing.T) { operations, err := dbConnector.SelectOperations(ctx, queries.NewReadTableQuery()) assert.Empty(t, err) assert.Equal(t, 3, len(operations)) + assert.Equal(t, float64(0), mon.GetMetrics()["operations_duration_seconds"]) + assert.Equal(t, float64(0), mon.GetMetrics()["schedule_finished_take_backup_with_retry_count"]) } func TestTBWRHandlerSkipError(t *testing.T) { @@ -420,6 +426,7 @@ func TestTBWRHandlerError(t *testing.T) { clientConnector := client.NewMockClientConnector() + mon := metrics.NewMockMetricsRegistry() handler := NewTBWROperationHandler( dbConnector, clientConnector, @@ -427,7 +434,7 @@ func TestTBWRHandlerError(t *testing.T) { config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t2.AsTime()), - metrics.NewMockMetricsRegistry(), + mon, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -437,6 +444,9 @@ func TestTBWRHandlerError(t *testing.T) { assert.NotEmpty(t, op) assert.Equal(t, types.OperationStateError, op.GetState()) assert.Equal(t, fmt.Sprintf("retry attempts exceeded limit: 1. 
Launched operations %s", ops[0].GetID()), op.GetMessage()) + assert.Equal(t, float64(1), mon.GetMetrics()["operations_duration_seconds"]) + assert.Equal(t, float64(1), mon.GetMetrics()["schedule_finished_take_backup_with_retry_count"]) + } func TestTBWRHandlerAlwaysRunOnce(t *testing.T) { @@ -620,6 +630,7 @@ func TestTBWRHandlerFullCancel(t *testing.T) { clientConnector := client.NewMockClientConnector() + mon := metrics.NewMockMetricsRegistry() handler := NewTBWROperationHandler( dbConnector, clientConnector, @@ -632,7 +643,7 @@ func TestTBWRHandlerFullCancel(t *testing.T) { }, queries.NewWriteTableQueryMock, clockwork.NewFakeClockAt(t1.AsTime()), - metrics.NewMockMetricsRegistry(), + mon, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -656,4 +667,6 @@ func TestTBWRHandlerFullCancel(t *testing.T) { assert.Equal(t, types.OperationStateCancelled, tb.State) assert.Equal(t, types.OperationStateCancelled, tbwr.State) assert.Equal(t, "Success", tbwr.Message) + assert.Equal(t, float64(1), mon.GetMetrics()["operations_duration_seconds"]) + assert.Equal(t, float64(1), mon.GetMetrics()["schedule_finished_take_backup_with_retry_count"]) } diff --git a/internal/handlers/take_backup_test.go b/internal/handlers/take_backup_test.go index 2ad69c80..434aab23 100644 --- a/internal/handlers/take_backup_test.go +++ b/internal/handlers/take_backup_test.go @@ -307,9 +307,9 @@ func TestTBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) { assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) // check metrics - assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_written"]) - assert.Equal(t, int64(1), mon.GetMetrics()["backups_succeeded_count"]) - assert.Equal(t, int64(0), mon.GetMetrics()["backups_failed_count"]) + assert.Equal(t, float64(450), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, float64(1), mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, float64(0), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerRunningOperationCancelled(t *testing.T) { @@ -384,9 +384,9 @@ func TestTBOperationHandlerRunningOperationCancelled(t *testing.T) { assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) // check metrics - assert.Equal(t, int64(0), mon.GetMetrics()["storage_bytes_written"]) - assert.Equal(t, int64(0), mon.GetMetrics()["backups_succeeded_count"]) - assert.Equal(t, int64(1), mon.GetMetrics()["backups_failed_count"]) + assert.Equal(t, float64(0), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, float64(0), mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, float64(1), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T) { @@ -463,9 +463,9 @@ func TestTBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T) assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady()) // check metrics - assert.Equal(t, int64(0), mon.GetMetrics()["storage_bytes_written"]) - assert.Equal(t, int64(0), mon.GetMetrics()["backups_succeeded_count"]) - assert.Equal(t, int64(1), mon.GetMetrics()["backups_failed_count"]) + assert.Equal(t, float64(0), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, float64(0), mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, float64(1), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerCancellingOperationInProgress(t *testing.T) { @@ -630,9 +630,9 @@ func TestTBOperationHandlerCancellingOperationCompletedSuccessfully(t 
*testing.T assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) // check metrics - assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_written"]) - assert.Equal(t, int64(1), mon.GetMetrics()["backups_succeeded_count"]) - assert.Equal(t, int64(0), mon.GetMetrics()["backups_failed_count"]) + assert.Equal(t, float64(450), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, float64(1), mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, float64(0), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerCancellingOperationCancelled(t *testing.T) { @@ -707,9 +707,9 @@ func TestTBOperationHandlerCancellingOperationCancelled(t *testing.T) { assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) // check metrics - assert.Equal(t, int64(0), mon.GetMetrics()["storage_bytes_written"]) - assert.Equal(t, int64(0), mon.GetMetrics()["backups_succeeded_count"]) - assert.Equal(t, int64(1), mon.GetMetrics()["backups_failed_count"]) + assert.Equal(t, float64(0), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, float64(0), mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, float64(1), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerCancellingOperationCancelledWithRemovingDataFromS3(t *testing.T) { @@ -804,10 +804,10 @@ func TestTBOperationHandlerCancellingOperationCancelledWithRemovingDataFromS3(t assert.Equal(t, 0, len(objects)) // check metrics - assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_written"]) - assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_deleted"]) - assert.Equal(t, int64(0), mon.GetMetrics()["backups_succeeded_count"]) - assert.Equal(t, int64(1), mon.GetMetrics()["backups_failed_count"]) + assert.Equal(t, float64(450), mon.GetMetrics()["storage_bytes_written"]) + assert.Equal(t, float64(450), mon.GetMetrics()["storage_bytes_deleted"]) + assert.Equal(t, float64(0), mon.GetMetrics()["backups_succeeded_count"]) + assert.Equal(t, float64(1), mon.GetMetrics()["backups_failed_count"]) } func TestTBOperationHandlerRetriableErrorForRunningOperation(t *testing.T) { diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 1030226d..535af6ce 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "net/http" "sync" @@ -23,11 +24,13 @@ type MetricsRegistry interface { IncApiCallsCounter(serviceName string, methodName string, status string) IncBytesWrittenCounter(containerId string, bucket string, database string, bytes int64) IncBytesDeletedCounter(containerId string, bucket string, database string, bytes int64) - ObserveOperationDuration(operation types.Operation) + ReportOperationMetrics(operation types.Operation) IncHandlerRunsCount(containerId string, operationType string) IncFailedHandlerRunsCount(containerId string, operationType string) IncSuccessfulHandlerRunsCount(containerId string, operationType string) IncCompletedBackupsCount(containerId string, database string, code Ydb.StatusIds_StatusCode) + IncScheduledBackupsCount(schedule *types.BackupSchedule) + IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error) } type MetricsRegistryImpl struct { @@ -53,6 +56,14 @@ type MetricsRegistryImpl struct { // backup metrics backupsFailedCount *prometheus.CounterVec backupsSucceededCount *prometheus.CounterVec + + // schedule metrics + 
scheduleActionFailedCount *prometheus.CounterVec + scheduleActionSucceededCount *prometheus.CounterVec + scheduleLaunchedTBWRCount *prometheus.CounterVec + scheduleFinishedTBWRCount *prometheus.CounterVec + scheduleLastBackupTimestamp *prometheus.GaugeVec + scheduleRPOMarginRatio *prometheus.GaugeVec } func (s *MetricsRegistryImpl) IncApiCallsCounter(serviceName string, methodName string, code string) { @@ -67,14 +78,27 @@ func (s *MetricsRegistryImpl) IncBytesDeletedCounter(containerId string, bucket s.bytesDeletedCounter.WithLabelValues(containerId, bucket, database).Add(float64(bytes)) } -func (s *MetricsRegistryImpl) ObserveOperationDuration(operation types.Operation) { - if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil { - duration := operation.GetAudit().CompletedAt.AsTime().Sub(operation.GetAudit().CreatedAt.AsTime()) - s.operationsDuration.WithLabelValues( - operation.GetContainerID(), - operation.GetType().String(), - operation.GetState().String(), - ).Observe(duration.Seconds()) +func (s *MetricsRegistryImpl) ReportOperationMetrics(operation types.Operation) { + if !types.IsActive(operation) { + if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil { + duration := operation.GetAudit().CompletedAt.AsTime().Sub(operation.GetAudit().CreatedAt.AsTime()) + s.operationsDuration.WithLabelValues( + operation.GetContainerID(), + operation.GetDatabaseName(), + operation.GetType().String(), + operation.GetState().String(), + ).Observe(duration.Seconds()) + } + if operation.GetType() == types.OperationTypeTBWR { + tbwr := operation.(*types.TakeBackupWithRetryOperation) + label := "" + if tbwr.ScheduleID != nil { + label = *tbwr.ScheduleID + } + s.scheduleFinishedTBWRCount.WithLabelValues( + operation.GetContainerID(), operation.GetDatabaseName(), label, operation.GetState().String(), + ).Inc() + } } } @@ -98,6 +122,25 @@ func (s *MetricsRegistryImpl) IncCompletedBackupsCount(containerId string, datab } } +func (s *MetricsRegistryImpl) IncScheduledBackupsCount(schedule *types.BackupSchedule) { + s.scheduleLaunchedTBWRCount.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Inc() +} + +func (s *MetricsRegistryImpl) IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error) { + if err != nil { + s.scheduleActionFailedCount.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Inc() + } else { + s.scheduleActionSucceededCount.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Inc() + } + if schedule.RecoveryPoint != nil { + s.scheduleLastBackupTimestamp.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Set(float64(schedule.RecoveryPoint.Unix())) + } + info := schedule.GetBackupInfo(clock) + if info != nil { + s.scheduleRPOMarginRatio.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Set(info.LastBackupRpoMarginRatio) + } +} + func NewMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.MetricsServerConfig) *MetricsRegistryImpl { s := &MetricsRegistryImpl{ reg: prometheus.NewRegistry(), @@ -127,7 +170,7 @@ func NewMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met Name: "duration_seconds", Help: "Duration of operations in seconds", Buckets: prometheus.ExponentialBuckets(10, 2, 8), - }, []string{"container_id", "type", "status"}) + }, []string{"container_id", "database", "type", "status"}) s.handlerRunsCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ 
Subsystem: "operation_processor", @@ -159,6 +202,42 @@ func NewMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met Help: "Total count of successful backups", }, []string{"container_id", "database"}) + s.scheduleActionFailedCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ + Subsystem: "schedules", + Name: "failed_count", + Help: "Total count of failed scheduled backup runs", + }, []string{"container_id", "database", "schedule_id"}) + + s.scheduleActionSucceededCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ + Subsystem: "schedules", + Name: "succeeded_count", + Help: "Total count of successful scheduled backup runs", + }, []string{"container_id", "database", "schedule_id"}) + + s.scheduleFinishedTBWRCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ + Subsystem: "schedules", + Name: "finished_take_backup_with_retry_count", + Help: "Total count of TakeBackupWithRetry operations finished for this schedule", + }, []string{"container_id", "database", "schedule_id", "status"}) + + s.scheduleLaunchedTBWRCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ + Subsystem: "schedules", + Name: "launched_take_backup_with_retry_count", + Help: "Total count of TakeBackupWithRetry operations launched by this schedule", + }, []string{"container_id", "database", "schedule_id"}) + + s.scheduleLastBackupTimestamp = promauto.With(s.reg).NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: "schedules", + Name: "last_backup_timestamp", + Help: "Timestamp of last successful backup for this schedule", + }, []string{"container_id", "database", "schedule_id"}) + + s.scheduleRPOMarginRatio = promauto.With(s.reg).NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: "schedules", + Name: "rpo_margin_ratio", + Help: "if RPO is set for schedule, calculates a ratio to which RPO is satisfied", + }, []string{"container_id", "database", "schedule_id"}) + mux := http.NewServeMux() mux.Handle("/metrics", promhttp.HandlerFor(s.reg, promhttp.HandlerOpts{Registry: s.reg})) diff --git a/internal/metrics/metrics_moc.go b/internal/metrics/metrics_moc.go index 14dfe368..4daa68c5 100644 --- a/internal/metrics/metrics_moc.go +++ b/internal/metrics/metrics_moc.go @@ -1,15 +1,24 @@ package metrics import ( + "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "ydbcp/internal/types" ) type MockMetricsRegistry struct { - metrics map[string]int64 + metrics map[string]float64 } -func (s *MockMetricsRegistry) GetMetrics() map[string]int64 { +func (s *MockMetricsRegistry) IncCompletedBackupsCount(containerId string, database string, code Ydb.StatusIds_StatusCode) { + if code == Ydb.StatusIds_SUCCESS { + s.metrics["backups_succeeded_count"]++ + } else { + s.metrics["backups_failed_count"]++ + } +} + +func (s *MockMetricsRegistry) GetMetrics() map[string]float64 { return s.metrics } @@ -18,15 +27,20 @@ func (s *MockMetricsRegistry) IncApiCallsCounter(serviceName string, methodName } func (s *MockMetricsRegistry) IncBytesWrittenCounter(containerId string, bucket string, database string, bytes int64) { - s.metrics["storage_bytes_written"] += bytes + s.metrics["storage_bytes_written"] += float64(bytes) } func (s *MockMetricsRegistry) IncBytesDeletedCounter(containerId string, bucket string, database string, bytes int64) { - s.metrics["storage_bytes_deleted"] += bytes + s.metrics["storage_bytes_deleted"] += float64(bytes) } -func (s *MockMetricsRegistry) ObserveOperationDuration(operation types.Operation) { - 
s.metrics["operations_duration_seconds"]++ +func (s *MockMetricsRegistry) ReportOperationMetrics(operation types.Operation) { + if !types.IsActive(operation) { + s.metrics["operations_duration_seconds"]++ + if operation.GetType() == types.OperationTypeTBWR { + s.metrics["schedule_finished_take_backup_with_retry_count"]++ + } + } } func (s *MockMetricsRegistry) IncHandlerRunsCount(containerId string, operationType string) { @@ -41,16 +55,27 @@ func (s *MockMetricsRegistry) IncSuccessfulHandlerRunsCount(containerId string, s.metrics["operation_processor_handler_runs_successful_count"]++ } -func (s *MockMetricsRegistry) IncCompletedBackupsCount(containerId string, database string, code Ydb.StatusIds_StatusCode) { - if code == Ydb.StatusIds_SUCCESS { - s.metrics["backups_succeeded_count"]++ +func (s *MockMetricsRegistry) IncScheduledBackupsCount(schedule *types.BackupSchedule) { + s.metrics["schedules_launched_take_backup_with_retry_count"]++ +} + +func (s *MockMetricsRegistry) IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error) { + if err != nil { + s.metrics["schedules_failed_count"]++ } else { - s.metrics["backups_failed_count"]++ + s.metrics["schedules_succeeded_count"]++ + } + if schedule.RecoveryPoint != nil { + s.metrics["schedules_last_backup_timestamp"] = float64(schedule.RecoveryPoint.Unix()) + if schedule.ScheduleSettings.RecoveryPointObjective != nil { + info := schedule.GetBackupInfo(clock) + s.metrics["schedules_recovery_point_objective"] = info.LastBackupRpoMarginRatio + } } } func NewMockMetricsRegistry() *MockMetricsRegistry { return &MockMetricsRegistry{ - metrics: make(map[string]int64), + metrics: make(map[string]float64), } } diff --git a/internal/types/backup_schedule.go b/internal/types/backup_schedule.go index 37da86d2..30c1b898 100644 --- a/internal/types/backup_schedule.go +++ b/internal/types/backup_schedule.go @@ -45,7 +45,7 @@ func ParseCronExpr(str string) (*cronexpr.Expression, error) { return cronexpr.Parse(str) } -func (b *BackupSchedule) Proto(clock clockwork.Clock) *pb.BackupSchedule { +func (b *BackupSchedule) GetBackupInfo(clock clockwork.Clock) *pb.ScheduledBackupInfo { var backupInfo *pb.ScheduledBackupInfo if b.LastSuccessfulBackupID != nil { backupInfo = &pb.ScheduledBackupInfo{ @@ -61,6 +61,10 @@ func (b *BackupSchedule) Proto(clock clockwork.Clock) *pb.BackupSchedule { } } } + return backupInfo +} + +func (b *BackupSchedule) Proto(clock clockwork.Clock) *pb.BackupSchedule { var nextLaunchTs *timestamppb.Timestamp nextLaunchTs = nil if b.NextLaunch != nil { @@ -77,7 +81,7 @@ func (b *BackupSchedule) Proto(clock clockwork.Clock) *pb.BackupSchedule { SourcePaths: b.SourcePaths, SourcePathsToExclude: b.SourcePathsToExclude, NextLaunch: nextLaunchTs, - LastSuccessfulBackupInfo: backupInfo, + LastSuccessfulBackupInfo: b.GetBackupInfo(clock), } if b.Name != nil { schedule.ScheduleName = *b.Name diff --git a/internal/types/operation.go b/internal/types/operation.go index 8e24a223..f65992e7 100644 --- a/internal/types/operation.go +++ b/internal/types/operation.go @@ -31,6 +31,7 @@ type Operation interface { GetAudit() *pb.AuditInfo GetUpdatedAt() *timestamppb.Timestamp SetUpdatedAt(t *timestamppb.Timestamp) + GetDatabaseName() string Copy() Operation Proto() *pb.Operation } @@ -85,6 +86,9 @@ func (o *TakeBackupOperation) GetUpdatedAt() *timestamppb.Timestamp { func (o *TakeBackupOperation) SetUpdatedAt(t *timestamppb.Timestamp) { o.UpdatedAt = t } +func (o *TakeBackupOperation) GetDatabaseName() string { + return 
o.YdbConnectionParams.DatabaseName +} func (o *TakeBackupOperation) Copy() Operation { copy := *o return &copy } @@ -165,6 +169,9 @@ func (o *RestoreBackupOperation) GetUpdatedAt() *timestamppb.Timestamp { func (o *RestoreBackupOperation) SetUpdatedAt(t *timestamppb.Timestamp) { o.UpdatedAt = t } +func (o *RestoreBackupOperation) GetDatabaseName() string { + return o.YdbConnectionParams.DatabaseName +} func (o *RestoreBackupOperation) Copy() Operation { copy := *o return &copy } @@ -236,6 +243,9 @@ func (o *DeleteBackupOperation) GetUpdatedAt() *timestamppb.Timestamp { func (o *DeleteBackupOperation) SetUpdatedAt(t *timestamppb.Timestamp) { o.UpdatedAt = t } +func (o *DeleteBackupOperation) GetDatabaseName() string { + return o.YdbConnectionParams.DatabaseName +} func (o *DeleteBackupOperation) Copy() Operation { copy := *o return &copy } @@ -302,6 +312,9 @@ func (o *TakeBackupWithRetryOperation) GetUpdatedAt() *timestamppb.Timestamp { func (o *TakeBackupWithRetryOperation) SetUpdatedAt(t *timestamppb.Timestamp) { o.UpdatedAt = t } +func (o *TakeBackupWithRetryOperation) GetDatabaseName() string { + return o.YdbConnectionParams.DatabaseName +} func (o *TakeBackupWithRetryOperation) Copy() Operation { copy := *o return &copy } @@ -392,6 +405,9 @@ func (o *GenericOperation) GetUpdatedAt() *timestamppb.Timestamp { func (o *GenericOperation) SetUpdatedAt(t *timestamppb.Timestamp) { o.UpdatedAt = t } +func (o *GenericOperation) GetDatabaseName() string { + return "" +} func (o *GenericOperation) Copy() Operation { copy := *o return &copy } diff --git a/internal/watchers/schedule_watcher/schedule_watcher.go b/internal/watchers/schedule_watcher/schedule_watcher.go index f4290450..e5b19482 100644 --- a/internal/watchers/schedule_watcher/schedule_watcher.go +++ b/internal/watchers/schedule_watcher/schedule_watcher.go @@ -2,6 +2,7 @@ package schedule_watcher import ( "context" + "github.com/jonboulle/clockwork" table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "go.uber.org/zap" "sync" @@ -9,6 +10,7 @@ import ( "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/handlers" + "ydbcp/internal/metrics" "ydbcp/internal/types" "ydbcp/internal/util/xlog" "ydbcp/internal/watchers" @@ -19,13 +21,15 @@ func NewScheduleWatcher( wg *sync.WaitGroup, db db.DBConnector, handler handlers.BackupScheduleHandlerType, + mon metrics.MetricsRegistry, + clock clockwork.Clock, options ...watchers.Option, ) *watchers.WatcherImpl { return watchers.NewWatcher( ctx, wg, func(ctx context.Context, period time.Duration) { - ScheduleWatcherAction(ctx, period, db, handler) + ScheduleWatcherAction(ctx, period, db, handler, clock, mon) }, time.Minute, "BackupSchedule", @@ -38,6 +42,8 @@ func ScheduleWatcherAction( period time.Duration, db db.DBConnector, handler handlers.BackupScheduleHandlerType, + clock clockwork.Clock, + mon metrics.MetricsRegistry, ) { ctx, cancel := context.WithTimeout(baseCtx, period) defer cancel() @@ -63,7 +69,8 @@ func ScheduleWatcherAction( } for _, schedule := range schedules { - err = handler(ctx, db, *schedule) + err = handler(ctx, db, schedule) + mon.IncScheduleCounters(schedule, clock, err) if err != nil { xlog.Error(ctx, "error handling backup schedule", zap.String("scheduleID", schedule.ID), zap.Error(err)) } diff --git a/internal/watchers/schedule_watcher/schedule_watcher_test.go b/internal/watchers/schedule_watcher/schedule_watcher_test.go index 14654b2b..7d81f15d 100644 --- a/internal/watchers/schedule_watcher/schedule_watcher_test.go +++
b/internal/watchers/schedule_watcher/schedule_watcher_test.go @@ -4,6 +4,7 @@ import ( "context" "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/durationpb" "sync" "testing" "time" @@ -60,10 +61,9 @@ func TestScheduleWatcherSimple(t *testing.T) { db.WithBackupSchedules(scheduleMap), ) + metricsMock := metrics.NewMockMetricsRegistry() handler := handlers.NewBackupScheduleHandler( - queries.NewWriteTableQueryMock, - clock, - metrics.NewMockMetricsRegistry(), + queries.NewWriteTableQueryMock, metricsMock, clock, ) scheduleWatcherActionCompleted := make(chan struct{}) @@ -72,6 +72,8 @@ func TestScheduleWatcherSimple(t *testing.T) { &wg, dbConnector, handler, + metricsMock, + clock, watchers.WithTickerProvider(tickerProvider), watchers.WithActionCompletedChannel(&scheduleWatcherActionCompleted), ) @@ -114,6 +116,9 @@ func TestScheduleWatcherSimple(t *testing.T) { assert.NotEmpty(t, schedules) assert.Equal(t, len(schedules), 1) assert.Equal(t, *schedules[0].NextLaunch, now.Add(time.Minute)) + + assert.Equal(t, float64(1), metricsMock.GetMetrics()["schedules_succeeded_count"]) + assert.Equal(t, float64(1), metricsMock.GetMetrics()["schedules_launched_take_backup_with_retry_count"]) } func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { @@ -169,10 +174,9 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { db.WithBackupSchedules(scheduleMap), ) + metricsMock := metrics.NewMockMetricsRegistry() handler := handlers.NewBackupScheduleHandler( - queries.NewWriteTableQueryMock, - clock, - metrics.NewMockMetricsRegistry(), + queries.NewWriteTableQueryMock, metricsMock, clock, ) scheduleWatcherActionCompleted := make(chan struct{}) @@ -181,6 +185,8 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { &wg, dbConnector, handler, + metricsMock, + clock, watchers.WithTickerProvider(tickerProvider), watchers.WithActionCompletedChannel(&scheduleWatcherActionCompleted), ) @@ -231,6 +237,8 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { assert.Equal(t, len(schedules), 2) assert.Equal(t, *schedules[0].NextLaunch, m[schedules[0].ID]) assert.Equal(t, *schedules[1].NextLaunch, m[schedules[1].ID]) + assert.Equal(t, float64(2), metricsMock.GetMetrics()["schedules_succeeded_count"]) + assert.Equal(t, float64(1), metricsMock.GetMetrics()["schedules_launched_take_backup_with_retry_count"]) } func TestScheduleWatcherTwoBackups(t *testing.T) { @@ -286,10 +294,10 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { db.WithBackupSchedules(scheduleMap), ) + metricsMock := metrics.NewMockMetricsRegistry() + handler := handlers.NewBackupScheduleHandler( - queries.NewWriteTableQueryMock, - clock, - metrics.NewMockMetricsRegistry(), + queries.NewWriteTableQueryMock, metricsMock, clock, ) scheduleWatcherActionCompleted := make(chan struct{}) @@ -298,6 +306,8 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { &wg, dbConnector, handler, + metricsMock, + clock, watchers.WithTickerProvider(tickerProvider), watchers.WithActionCompletedChannel(&scheduleWatcherActionCompleted), ) @@ -351,4 +361,118 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { assert.Equal(t, len(schedules), 2) assert.Equal(t, *schedules[0].NextLaunch, m[schedules[0].ID]) assert.Equal(t, *schedules[1].NextLaunch, m[schedules[1].ID]) + assert.Equal(t, float64(2), metricsMock.GetMetrics()["schedules_succeeded_count"]) + assert.Equal(t, float64(2), metricsMock.GetMetrics()["schedules_launched_take_backup_with_retry_count"]) +} + +func 
TestAllScheduleMetrics(t *testing.T) { + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Prepare fake ticker + clock := clockwork.NewFakeClockAt(fourPM) + var fakeTicker *ticker.FakeTicker + tickerInitialized := make(chan struct{}) + tickerProvider := func(duration time.Duration) ticker.Ticker { + assert.Empty(t, fakeTicker, "ticker reuse") + fakeTicker = ticker.NewFakeTicker(duration) + tickerInitialized <- struct{}{} + return fakeTicker + } + + backupID := "1" + rp := fourPM.Add(time.Minute * 30) + nl := fourPM.Add(time.Hour) + clock.Advance(time.Hour + time.Minute) + s1 := types.BackupSchedule{ + ID: "1", + ContainerID: "abcde", + Status: types.BackupScheduleStateActive, + DatabaseName: "mydb", + DatabaseEndpoint: "mydb.valid.com", + SourcePaths: []string{"/path/to/table"}, + ScheduleSettings: &pb.BackupScheduleSettings{ + SchedulePattern: &pb.BackupSchedulePattern{Crontab: "* * * * *"}, // every minute + RecoveryPointObjective: durationpb.New(time.Hour), + }, + RecoveryPoint: &rp, + LastSuccessfulBackupID: &backupID, + NextLaunch: &nl, + } + + opMap := make(map[string]types.Operation) + backupMap := make(map[string]types.Backup) + scheduleMap := make(map[string]types.BackupSchedule) + scheduleMap[s1.ID] = s1 + dbConnector := db.NewMockDBConnector( + db.WithBackups(backupMap), + db.WithOperations(opMap), + db.WithBackupSchedules(scheduleMap), + ) + + metricsMock := metrics.NewMockMetricsRegistry() + + handler := handlers.NewBackupScheduleHandler( + queries.NewWriteTableQueryMock, metricsMock, clock, + ) + + scheduleWatcherActionCompleted := make(chan struct{}) + _ = NewScheduleWatcher( + ctx, + &wg, + dbConnector, + handler, + metricsMock, + clock, + watchers.WithTickerProvider(tickerProvider), + watchers.WithActionCompletedChannel(&scheduleWatcherActionCompleted), + ) + + // Wait for the ticker to be initialized + select { + case <-ctx.Done(): + t.Error("ticker not initialized") + case <-tickerInitialized: + assert.Equal(t, fakeTicker.Period, time.Minute, "incorrect period") + } + + fakeTicker.Send(clock.Now()) + + // Wait for the watcher action to be completed + select { + case <-ctx.Done(): + t.Error("action wasn't completed") + case <-scheduleWatcherActionCompleted: + cancel() + } + + wg.Wait() + + m := map[string]time.Time{ + "1": clock.Now().Add(time.Minute), + } + + // check operation status (should be running) + ops, err := dbConnector.SelectOperations(ctx, &queries.ReadTableQueryImpl{}) + assert.Empty(t, err) + assert.NotEmpty(t, ops) + assert.Equal(t, len(ops), 1) + for _, op := range ops { + assert.Equal(t, types.OperationStateRunning, op.GetState()) + assert.Equal(t, types.OperationTypeTBWR, op.GetType()) + _, ok := m[*op.(*types.TakeBackupWithRetryOperation).ScheduleID] + assert.True(t, ok) + } + + // check schedule next launch + schedules, err := dbConnector.SelectBackupSchedules(ctx, &queries.ReadTableQueryImpl{}) + assert.Empty(t, err) + assert.NotEmpty(t, schedules) + assert.Equal(t, len(schedules), 1) + assert.Equal(t, m[schedules[0].ID], *schedules[0].NextLaunch) + assert.Equal(t, float64(1), metricsMock.GetMetrics()["schedules_succeeded_count"]) + assert.Equal(t, float64(1), metricsMock.GetMetrics()["schedules_launched_take_backup_with_retry_count"]) + assert.Equal(t, float64(schedules[0].RecoveryPoint.Unix()), metricsMock.GetMetrics()["schedules_last_backup_timestamp"]) + assert.Equal(t, 0.5166666666666667, metricsMock.GetMetrics()["schedules_recovery_point_objective"]) }
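For reference, a rough sketch (not part of the patch) of how two of the new schedule metrics could render on the Prometheus /metrics endpoint, assuming no extra namespace is configured for the registry and using the label values exercised by TestAllScheduleMetrics (container "abcde", database "mydb", schedule "1"); names and values are illustrative only:

# HELP schedules_launched_take_backup_with_retry_count Total count of TakeBackupWithRetry operations launched by this schedule
# TYPE schedules_launched_take_backup_with_retry_count counter
schedules_launched_take_backup_with_retry_count{container_id="abcde",database="mydb",schedule_id="1"} 1
# HELP schedules_rpo_margin_ratio If an RPO is set for the schedule, the ratio to which the RPO is satisfied
# TYPE schedules_rpo_margin_ratio gauge
schedules_rpo_margin_ratio{container_id="abcde",database="mydb",schedule_id="1"} 0.5166666666666667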