Skip to content

Commit

Permalink
Add schedule metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Dec 4, 2024
1 parent 6738b02 commit ae95cff
Show file tree
Hide file tree
Showing 16 changed files with 369 additions and 105 deletions.
6 changes: 2 additions & 4 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,9 @@ func main() {
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)
xlog.Info(ctx, "Created TtlWatcher")

backupScheduleHandler := handlers.NewBackupScheduleHandler(
queries.NewWriteTableQuery, clockwork.NewRealClock(), metrics,
)
backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, metrics, clockwork.NewRealClock())

schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler)
schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler, metrics, clockwork.NewRealClock())

xlog.Info(ctx, "Created ScheduleWatcher")

Expand Down
20 changes: 10 additions & 10 deletions internal/handlers/delete_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ func NewDBOperationHandler(
mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return DBOperationHandler(ctx, op, db, s3, config, queryBuilderFactory, mon)
err := DBOperationHandler(ctx, op, db, s3, config, queryBuilderFactory, mon)
if err == nil {
mon.ReportOperationMetrics(op)
}
return err
}
}

Expand All @@ -52,7 +56,7 @@ func DBOperationHandler(
return fmt.Errorf("can't cast operation to DeleteBackupOperation %s", types.OperationToString(operation))
}

upsertAndReportMetrics := func(operation types.Operation, backup *types.Backup) error {
executeUpsert := func(operation types.Operation, backup *types.Backup) error {
var err error

if backup != nil {
Expand All @@ -63,10 +67,6 @@ func DBOperationHandler(
err = db.UpdateOperation(ctx, operation)
}

if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}

Expand All @@ -90,7 +90,7 @@ func DBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Backup not found")
operation.GetAudit().CompletedAt = timestamppb.Now()
return upsertAndReportMetrics(operation, nil)
return executeUpsert(operation, nil)
}

backup := backups[0]
Expand All @@ -100,14 +100,14 @@ func DBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
operation.GetAudit().CompletedAt = timestamppb.Now()
return upsertAndReportMetrics(operation, backup)
return executeUpsert(operation, backup)
}

if backup.Status != types.BackupStateDeleting {
operation.SetState(types.OperationStateError)
operation.SetMessage(fmt.Sprintf("Unexpected backup status: %s", backup.Status))
operation.GetAudit().CompletedAt = timestamppb.Now()
return upsertAndReportMetrics(operation, nil)
return executeUpsert(operation, nil)
}

deleteBackup := func(pathPrefix string, bucket string) error {
Expand Down Expand Up @@ -150,5 +150,5 @@ func DBOperationHandler(
return fmt.Errorf("unexpected operation state %s", dbOp.State)
}

return upsertAndReportMetrics(operation, backup)
return executeUpsert(operation, backup)
}
6 changes: 3 additions & 3 deletions internal/handlers/delete_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) {
assert.Empty(t, objects)

// check metrics
assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_deleted"])
assert.Equal(t, float64(450), mon.GetMetrics()["storage_bytes_deleted"])
}

func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) {
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) {
assert.Empty(t, objects)

// check metrics
assert.Equal(t, int64(450), mon.GetMetrics()["storage_bytes_deleted"])
assert.Equal(t, float64(450), mon.GetMetrics()["storage_bytes_deleted"])
}

func TestDBOperationHandlerUnexpectedBackupStatus(t *testing.T) {
Expand Down Expand Up @@ -383,5 +383,5 @@ func TestDBOperationHandlerDeleteMoreThanAllowedLimit(t *testing.T) {
assert.Empty(t, objects)

// check metrics
assert.Equal(t, int64(objectsListSize*10), mon.GetMetrics()["storage_bytes_deleted"])
assert.Equal(t, float64(objectsListSize*10), mon.GetMetrics()["storage_bytes_deleted"])
}
23 changes: 8 additions & 15 deletions internal/handlers/restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ func NewRBOperationHandler(
db db.DBConnector, client client.ClientConnector, config config.Config, mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return RBOperationHandler(ctx, op, db, client, config, mon)
err := RBOperationHandler(ctx, op, db, client, config)
if err == nil {
mon.ReportOperationMetrics(op)
}
return err
}
}

Expand All @@ -30,7 +34,6 @@ func RBOperationHandler(
db db.DBConnector,
client client.ClientConnector,
config config.Config,
mon metrics.MetricsRegistry,
) error {
xlog.Info(ctx, "RBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

Expand All @@ -56,16 +59,6 @@ func RBOperationHandler(

defer func() { _ = client.Close(ctx, conn) }()

upsertAndReportMetrics := func(operation types.Operation) error {
err := db.UpdateOperation(ctx, operation)

if err == nil {
mon.ObserveOperationDuration(operation)
}

return err
}

ydbOpResponse, err := lookupYdbOperationStatus(
ctx, client, conn, operation, mr.YdbOperationId, mr.Audit.CreatedAt, config,
)
Expand All @@ -76,7 +69,7 @@ func RBOperationHandler(
operation.SetState(ydbOpResponse.opState)
operation.SetMessage(ydbOpResponse.opMessage)
operation.GetAudit().CompletedAt = timestamppb.Now()
return upsertAndReportMetrics(operation)
return db.UpdateOperation(ctx, operation)
}

if ydbOpResponse.opResponse == nil {
Expand Down Expand Up @@ -126,7 +119,7 @@ func RBOperationHandler(
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
operation.GetAudit().CompletedAt = timestamppb.Now()
return upsertAndReportMetrics(operation)
return db.UpdateOperation(ctx, operation)
}

return db.UpdateOperation(ctx, operation)
Expand Down Expand Up @@ -173,5 +166,5 @@ func RBOperationHandler(
}

operation.GetAudit().CompletedAt = timestamppb.Now()
return upsertAndReportMetrics(operation)
return db.UpdateOperation(ctx, operation)
}
17 changes: 9 additions & 8 deletions internal/handlers/schedule_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,33 @@ import (
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)

type BackupScheduleHandlerType func(context.Context, db.DBConnector, types.BackupSchedule) error
type BackupScheduleHandlerType func(context.Context, db.DBConnector, *types.BackupSchedule) error

func NewBackupScheduleHandler(
queryBuilderFactory queries.WriteQueryBuilderFactory,
clock clockwork.Clock,
mon metrics.MetricsRegistry,
clock clockwork.Clock,
) BackupScheduleHandlerType {
return func(ctx context.Context, driver db.DBConnector, schedule types.BackupSchedule) error {
return func(ctx context.Context, driver db.DBConnector, schedule *types.BackupSchedule) error {
return BackupScheduleHandler(
ctx, driver, schedule,
queryBuilderFactory, clock, mon,
queryBuilderFactory, mon, clock,
)
}
}

func BackupScheduleHandler(
ctx context.Context,
driver db.DBConnector,
schedule types.BackupSchedule,
schedule *types.BackupSchedule,
queryBuilderFactory queries.WriteQueryBuilderFactory,
clock clockwork.Clock,
mon metrics.MetricsRegistry,
clock clockwork.Clock,
) error {
if schedule.Status != types.BackupScheduleStateActive {
xlog.Error(ctx, "backup schedule is not active", zap.String("scheduleID", schedule.ID))
return errors.New("backup schedule is not active")
}
// do not handle last_backup_id status = (failed | deleted) for now, just do backups on cron.
if schedule.NextLaunch != nil && schedule.NextLaunch.Before(clock.Now()) {
backoff, err := schedule.GetCronDuration()
if err != nil {
Expand Down Expand Up @@ -84,13 +83,15 @@ func BackupScheduleHandler(
zap.String("TakeBackupWithRetryOperation", tbwr.Proto().String()),
)

mon.IncScheduledBackupsCount(schedule)

err = schedule.UpdateNextLaunch(clock.Now())
if err != nil {
return err
}
return driver.ExecuteUpsert(
ctx,
queryBuilderFactory().WithCreateOperation(tbwr).WithUpdateBackupSchedule(schedule),
queryBuilderFactory().WithCreateOperation(tbwr).WithUpdateBackupSchedule(*schedule),
)
}
return nil
Expand Down
6 changes: 2 additions & 4 deletions internal/handlers/schedule_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ func TestBackupScheduleHandler(t *testing.T) {
)

handler := NewBackupScheduleHandler(
queries.NewWriteTableQueryMock,
clock,
metrics.NewMockMetricsRegistry(),
queries.NewWriteTableQueryMock, metrics.NewMockMetricsRegistry(), clock,
)
err := handler(ctx, dbConnector, schedule)
err := handler(ctx, dbConnector, &schedule)
assert.Empty(t, err)

// check operation status (should be running)
Expand Down
7 changes: 5 additions & 2 deletions internal/handlers/take_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ func NewTBOperationHandler(
queryBuilderFactory queries.WriteQueryBuilderFactory, mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return TBOperationHandler(ctx, op, db, client, s3, config, queryBuilderFactory, mon)
err := TBOperationHandler(ctx, op, db, client, s3, config, queryBuilderFactory, mon)
if err == nil {
mon.ReportOperationMetrics(op)
}
return err
}
}

Expand Down Expand Up @@ -65,7 +69,6 @@ func TBOperationHandler(
)

if err == nil {
mon.ObserveOperationDuration(operation)
mon.IncCompletedBackupsCount(containerId, database, status)
}

Expand Down
7 changes: 5 additions & 2 deletions internal/handlers/take_backup_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ func NewTBWROperationHandler(
mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return TBWROperationHandler(ctx, op, db, client, s3, clientConfig, queryBuilderFactory, clock, mon)
err := TBWROperationHandler(ctx, op, db, client, s3, clientConfig, queryBuilderFactory, clock)
if err == nil {
mon.ReportOperationMetrics(op)
}
return err
}
}

Expand Down Expand Up @@ -148,7 +152,6 @@ func TBWROperationHandler(
clientConfig config.ClientConnectionConfig,
queryBuilderFactory queries.WriteQueryBuilderFactory,
clock clockwork.Clock,
mon metrics.MetricsRegistry,
) error {
ctx = xlog.With(ctx, zap.String("OperationID", operation.GetID()))

Expand Down
21 changes: 17 additions & 4 deletions internal/handlers/take_backup_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,15 @@ func TestTBWRHandlerSuccess(t *testing.T) {

clientConnector := client.NewMockClientConnector()

mon := metrics.NewMockMetricsRegistry()
handler := NewTBWROperationHandler(
dbConnector,
clientConnector,
config.S3Config{},
config.ClientConnectionConfig{},
queries.NewWriteTableQueryMock,
clockwork.NewFakeClockAt(t1.AsTime()),
metrics.NewMockMetricsRegistry(),
mon,
)
err := handler(ctx, &tbwr)
assert.Empty(t, err)
Expand All @@ -253,6 +254,8 @@ func TestTBWRHandlerSuccess(t *testing.T) {
assert.Empty(t, err)
assert.NotEmpty(t, op)
assert.Equal(t, types.OperationStateDone, op.GetState())
assert.Equal(t, float64(1), mon.GetMetrics()["operations_duration_seconds"])
assert.Equal(t, float64(1), mon.GetMetrics()["schedule_finished_take_backup_with_retry_count"])
}

func TestTBWRHandlerSkipRunning(t *testing.T) {
Expand Down Expand Up @@ -299,14 +302,15 @@ func TestTBWRHandlerSkipRunning(t *testing.T) {

clientConnector := client.NewMockClientConnector()

mon := metrics.NewMockMetricsRegistry()
handler := NewTBWROperationHandler(
dbConnector,
clientConnector,
config.S3Config{},
config.ClientConnectionConfig{},
queries.NewWriteTableQueryMock,
clockwork.NewFakeClockAt(t1.AsTime()),
metrics.NewMockMetricsRegistry(),
mon,
)
err := handler(ctx, &tbwr)
assert.Empty(t, err)
Expand All @@ -318,6 +322,8 @@ func TestTBWRHandlerSkipRunning(t *testing.T) {
operations, err := dbConnector.SelectOperations(ctx, queries.NewReadTableQuery())
assert.Empty(t, err)
assert.Equal(t, 3, len(operations))
assert.Equal(t, float64(0), mon.GetMetrics()["operations_duration_seconds"])
assert.Equal(t, float64(0), mon.GetMetrics()["schedule_finished_take_backup_with_retry_count"])
}

func TestTBWRHandlerSkipError(t *testing.T) {
Expand Down Expand Up @@ -420,14 +426,15 @@ func TestTBWRHandlerError(t *testing.T) {

clientConnector := client.NewMockClientConnector()

mon := metrics.NewMockMetricsRegistry()
handler := NewTBWROperationHandler(
dbConnector,
clientConnector,
config.S3Config{},
config.ClientConnectionConfig{},
queries.NewWriteTableQueryMock,
clockwork.NewFakeClockAt(t2.AsTime()),
metrics.NewMockMetricsRegistry(),
mon,
)
err := handler(ctx, &tbwr)
assert.Empty(t, err)
Expand All @@ -437,6 +444,9 @@ func TestTBWRHandlerError(t *testing.T) {
assert.NotEmpty(t, op)
assert.Equal(t, types.OperationStateError, op.GetState())
assert.Equal(t, fmt.Sprintf("retry attempts exceeded limit: 1. Launched operations %s", ops[0].GetID()), op.GetMessage())
assert.Equal(t, float64(1), mon.GetMetrics()["operations_duration_seconds"])
assert.Equal(t, float64(1), mon.GetMetrics()["schedule_finished_take_backup_with_retry_count"])

}

func TestTBWRHandlerAlwaysRunOnce(t *testing.T) {
Expand Down Expand Up @@ -620,6 +630,7 @@ func TestTBWRHandlerFullCancel(t *testing.T) {

clientConnector := client.NewMockClientConnector()

mon := metrics.NewMockMetricsRegistry()
handler := NewTBWROperationHandler(
dbConnector,
clientConnector,
Expand All @@ -632,7 +643,7 @@ func TestTBWRHandlerFullCancel(t *testing.T) {
},
queries.NewWriteTableQueryMock,
clockwork.NewFakeClockAt(t1.AsTime()),
metrics.NewMockMetricsRegistry(),
mon,
)
err := handler(ctx, &tbwr)
assert.Empty(t, err)
Expand All @@ -656,4 +667,6 @@ func TestTBWRHandlerFullCancel(t *testing.T) {
assert.Equal(t, types.OperationStateCancelled, tb.State)
assert.Equal(t, types.OperationStateCancelled, tbwr.State)
assert.Equal(t, "Success", tbwr.Message)
assert.Equal(t, float64(1), mon.GetMetrics()["operations_duration_seconds"])
assert.Equal(t, float64(1), mon.GetMetrics()["schedule_finished_take_backup_with_retry_count"])
}
Loading

0 comments on commit ae95cff

Please sign in to comment.