diff --git a/cmd/integration/make_backup/main.go b/cmd/integration/make_backup/main.go index 7db401bf..ae3619c2 100644 --- a/cmd/integration/make_backup/main.go +++ b/cmd/integration/make_backup/main.go @@ -2,6 +2,12 @@ package main import ( "context" + "errors" + "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/balancers" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + "github.com/ydb-platform/ydb-go-sdk/v3/table/result" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" @@ -9,20 +15,124 @@ import ( "strings" "time" "ydbcp/cmd/integration/common" - "ydbcp/internal/types" + "ydbcp/internal/util/xlog" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" "google.golang.org/grpc" ) const ( - containerID = "abcde" - databaseName = "/local" - ydbcpEndpoint = "0.0.0.0:50051" - databaseEndpoint = "grpcs://local-ydb:2135" + containerID = "abcde" + databaseName = "/local" + ydbcpEndpoint = "0.0.0.0:50051" + databaseEndpoint = "grpcs://local-ydb:2135" + invalidDatabaseEndpoint = "xzche" ) +func OpenYdb() *ydb.Driver { + dialTimeout := time.Second * 5 + opts := []ydb.Option{ + ydb.WithDialTimeout(dialTimeout), + ydb.WithTLSSInsecureSkipVerify(), + ydb.WithBalancer(balancers.SingleConn()), + ydb.WithAnonymousCredentials(), + } + driver, err := ydb.Open(context.Background(), databaseEndpoint+"/"+databaseName, opts...) + if err != nil { + log.Panicf("failed to open database: %v", err) + } + return driver +} + +func TestInvalidDatabaseBackup(client pb.BackupServiceClient, opClient pb.OperationServiceClient) { + driver := OpenYdb() + opID := types.GenerateObjectID() + insertTBWRquery := fmt.Sprintf(` +UPSERT INTO Operations +(id, type, container_id, database, endpoint, created_at, status, retries, retries_count) +VALUES +("%s", "TBWR", "%s", "%s", "%s", CurrentUTCTimestamp(), "RUNNING", 0, 3) +`, opID, containerID, databaseName, invalidDatabaseEndpoint) + err := driver.Table().Do( + context.Background(), func(ctx context.Context, s table.Session) error { + _, res, err := s.Execute( + ctx, + table.TxControl( + table.BeginTx( + table.WithSerializableReadWrite(), + ), + table.CommitTx(), + ), + insertTBWRquery, + nil, + ) + if err != nil { + return err + } + defer func(res result.Result) { + err = res.Close() + if err != nil { + xlog.Error(ctx, "Error closing transaction result") + } + }(res) // result must be closed + if res.ResultSetCount() != 0 { + return errors.New("expected 0 result set") + } + return res.Err() + }, + ) + if err != nil { + log.Panicf("failed to initialize YDBCP db: %v", err) + } + op, err := opClient.GetOperation(context.Background(), &pb.GetOperationRequest{ + Id: opID, + }) + if err != nil { + log.Panicf("failed to get operation: %v", err) + } + if op.GetType() != types.OperationTypeTBWR.String() { + log.Panicf("unexpected operation type: %v", op.GetType()) + } + time.Sleep(time.Second * 10) // to wait for four operation handlers + + backups, err := client.ListBackups( + context.Background(), &pb.ListBackupsRequest{ + ContainerId: containerID, + DatabaseNameMask: "%", + }, + ) + if err != nil { + log.Panicf("failed to list backups: %v", err) + } + if len(backups.Backups) != 0 { + log.Panicf("expected no backups by this time, got %v", backups.Backups) + } + ops, err := opClient.ListOperations(context.Background(), &pb.ListOperationsRequest{ + ContainerId: containerID, + DatabaseNameMask: databaseName, + OperationTypes: []string{types.OperationTypeTB.String()}, + }) + if err != nil { + log.Panicf("failed to list operations: %v", err) + } + if len(ops.Operations) != 0 { + log.Panicf("expected zero TB operations, got %d", len(ops.Operations)) + } + tbwr, err := opClient.GetOperation(context.Background(), &pb.GetOperationRequest{ + Id: opID, + }) + if err != nil { + log.Panicf("failed to list operations: %v", err) + } + if tbwr.Status != pb.Operation_ERROR { + log.Panicf("unexpected operation status: %v", tbwr.Status) + } + if tbwr.Message != "retry attempts exceeded limit: 3." { + log.Panicf("unexpected operation message: %v", tbwr.Message) + } +} + func main() { conn := common.CreateGRPCClient(ydbcpEndpoint) defer func(conn *grpc.ClientConn) { @@ -63,6 +173,8 @@ func main() { log.Panicf("unexpected error code: %v", err) } + TestInvalidDatabaseBackup(client, opClient) + tbwr, err := client.MakeBackup( context.Background(), &pb.MakeBackupRequest{ ContainerId: containerID, @@ -84,7 +196,7 @@ func main() { if op.GetType() != types.OperationTypeTBWR.String() { log.Panicf("unexpected operation type: %v", op.GetType()) } - time.Sleep(time.Second * 11) // to wait for operation handler + time.Sleep(time.Second * 3) // to wait for operation handler backups, err = client.ListBackups( context.Background(), &pb.ListBackupsRequest{ ContainerId: containerID, @@ -133,7 +245,7 @@ func main() { if !done { log.Panicln("failed to complete a backup in 30 seconds") } - time.Sleep(time.Second * 11) // to wait for operation handler + time.Sleep(time.Second * 3) // to wait for operation handler tbwr, err = opClient.GetOperation(context.Background(), &pb.GetOperationRequest{ Id: op.Id, }) diff --git a/cmd/integration/orm/main.go b/cmd/integration/orm/main.go index 0254b297..118dadb3 100644 --- a/cmd/integration/orm/main.go +++ b/cmd/integration/orm/main.go @@ -55,6 +55,7 @@ func OperationsToInsert() []types.TakeBackupWithRetryOperation { }, ScheduleID: &schedule, Ttl: &ttl, + Retries: 0, RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_Count{Count: 5}}, }, { @@ -78,6 +79,7 @@ func OperationsToInsert() []types.TakeBackupWithRetryOperation { }, ScheduleID: &schedule, Ttl: &ttl, + Retries: 0, RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(ttl)}}, }, } @@ -119,6 +121,7 @@ func TestTBWROperationORM(ctx context.Context, ydbConn *db.YdbConnector, operati log.Panicf("operation %v corrupted after update and read\ngot %v", operation, *tbwr) } operation.Message = "xxx" + operation.IncRetries() err = ydbConn.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(&operation)) if err != nil { log.Panicf("failed to insert operation: %v", err) @@ -127,6 +130,9 @@ func TestTBWROperationORM(ctx context.Context, ydbConn *db.YdbConnector, operati if "xxx" != tbwr.Message { log.Panicf("operation %v did not change after update", *tbwr) } + if tbwr.Retries != 1 { + log.Panicf("operation %v did not update retries", *tbwr) + } } func main() { diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 7dad3ccd..f9d55537 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -179,7 +179,7 @@ func main() { os.Exit(1) } - processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry) + processor.NewOperationProcessor(ctx, &wg, configInstance.ProcessorIntervalSeconds, dbConnector, handlersRegistry) xlog.Info(ctx, "Initialized OperationProcessor") ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery) xlog.Info(ctx, "Created TtlWatcher") diff --git a/internal/backup_operations/make_backup.go b/internal/backup_operations/make_backup.go index 91bb2a54..2a2a41bd 100644 --- a/internal/backup_operations/make_backup.go +++ b/internal/backup_operations/make_backup.go @@ -2,6 +2,8 @@ package backup_operations import ( "context" + "errors" + "fmt" "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-sdk/v3" "path" @@ -137,11 +139,48 @@ func ValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, cli if len(pathsForExport) == 0 { xlog.Error(ctx, "empty list of paths for export") - return nil, status.Error(codes.FailedPrecondition, "empty list of paths for export") + return nil, NewEmptyDatabaseError(codes.FailedPrecondition, "empty list of paths for export") } return pathsForExport, nil } +type ClientConnectionError struct { + err error +} + +func NewClientConnectionError(code codes.Code, message string) *ClientConnectionError { + return &ClientConnectionError{err: status.Errorf(code, message)} +} + +func (e ClientConnectionError) Error() string { + return e.err.Error() +} + +type EmptyDatabaseError struct { + err error +} + +func (e EmptyDatabaseError) Error() string { + return e.err.Error() +} + +func NewEmptyDatabaseError(code codes.Code, message string) *EmptyDatabaseError { + return &EmptyDatabaseError{err: status.Errorf(code, message)} +} + +func ErrToStatus(err error) error { + var ce *ClientConnectionError + var ee *EmptyDatabaseError + + if errors.As(err, &ce) { + return ce.err + } + if errors.As(err, &ee) { + return ee.err + } + return err +} + func MakeBackup( ctx context.Context, clientConn client.ClientConnector, @@ -161,9 +200,9 @@ func MakeBackup( "endpoint of database is invalid or not allowed", zap.String("DatabaseEndpoint", req.DatabaseEndpoint), ) - return nil, nil, status.Errorf( - codes.InvalidArgument, - "endpoint of database is invalid or not allowed, endpoint %s", req.DatabaseEndpoint, + return nil, nil, NewClientConnectionError( + codes.FailedPrecondition, + fmt.Sprintf("endpoint of database is invalid or not allowed, endpoint %s", req.DatabaseEndpoint), ) } @@ -176,7 +215,7 @@ func MakeBackup( client, err := clientConn.Open(ctx, dsn) if err != nil { xlog.Error(ctx, "can't open client connection", zap.Error(err)) - return nil, nil, status.Errorf(codes.Unknown, "can't open client connection, dsn %s", dsn) + return nil, nil, NewClientConnectionError(codes.Unknown, fmt.Sprintf("can't open client connection, dsn %s", dsn)) } defer func() { if err := clientConn.Close(ctx, client); err != nil { diff --git a/internal/config/config.go b/internal/config/config.go index 4d2b61cf..5e7a2805 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -64,14 +64,15 @@ type MetricsServerConfig struct { } type Config struct { - DBConnection YDBConnectionConfig `yaml:"db_connection"` - ClientConnection ClientConnectionConfig `yaml:"client_connection"` - S3 S3Config `yaml:"s3"` - OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"` - Auth AuthConfig `yaml:"auth"` - GRPCServer GRPCServerConfig `yaml:"grpc_server"` - MetricsServer MetricsServerConfig `yaml:"metrics_server"` - SchedulesLimitPerDB int `yaml:"schedules_limit_per_db" default:"10"` + DBConnection YDBConnectionConfig `yaml:"db_connection"` + ClientConnection ClientConnectionConfig `yaml:"client_connection"` + S3 S3Config `yaml:"s3"` + OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"` + Auth AuthConfig `yaml:"auth"` + GRPCServer GRPCServerConfig `yaml:"grpc_server"` + MetricsServer MetricsServerConfig `yaml:"metrics_server"` + SchedulesLimitPerDB int `yaml:"schedules_limit_per_db" default:"10"` + ProcessorIntervalSeconds int64 `yaml:"processor_interval_seconds" default:"10"` } var ( diff --git a/internal/connectors/db/mock.go b/internal/connectors/db/mock.go index 112965cd..43ebc895 100644 --- a/internal/connectors/db/mock.go +++ b/internal/connectors/db/mock.go @@ -247,9 +247,11 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries. defer c.guard.Unlock() queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock) - if queryBuilderMock.Operation != nil { - c.operations[(*queryBuilderMock.Operation).GetID()] = *queryBuilderMock.Operation - metrics.GlobalMetricsRegistry.IncOperationsStartedCounter(*queryBuilderMock.Operation) + if queryBuilderMock.Operations != nil { + for _, op := range queryBuilderMock.Operations { + c.operations[op.GetID()] = op + metrics.GlobalMetricsRegistry.IncOperationsStartedCounter(op) + } } if queryBuilderMock.Backup != nil { c.backups[queryBuilderMock.Backup.ID] = *queryBuilderMock.Backup diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index 37e2652c..90c6184e 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -152,6 +152,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { parentOperationID *string scheduleID *string ttl *time.Duration + retries *uint32 retriesCount *uint32 maxBackoff *time.Duration ) @@ -176,6 +177,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { named.Optional("parent_operation_id", &parentOperationID), named.Optional("schedule_id", &scheduleID), named.Optional("ttl", &ttl), + named.Optional("retries", &retries), named.Optional("retries_count", &retriesCount), named.Optional("retries_max_backoff", &maxBackoff), ) @@ -277,6 +279,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { Retries: &pb.RetryConfig_Count{Count: *retriesCount}, } } + retryNum := 0 + if retries != nil { + retryNum = int(*retries) + } return &types.TakeBackupWithRetryOperation{ TakeBackupOperation: types.TakeBackupOperation{ ID: operationId, @@ -294,6 +300,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { }, ScheduleID: scheduleID, Ttl: ttl, + Retries: retryNum, RetryConfig: retryConfig, }, nil } diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 62adaf8f..cd5dd942 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -155,6 +155,7 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle table_types.StringValueFromString(strings.Join(tbwr.SourcePathsToExclude, ",")), ) } + d.AddValueParam("$retries", table_types.Uint32Value(uint32(tbwr.Retries))) if tbwr.RetryConfig != nil { switch r := tbwr.RetryConfig.Retries.(type) { case *pb.RetryConfig_Count: @@ -270,6 +271,9 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle table_types.TimestampValueFromTime(operation.GetUpdatedAt().AsTime()), ) } + if tbwr, ok := operation.(*types.TakeBackupWithRetryOperation); ok { + d.AddValueParam("$retries", table_types.Uint32Value(uint32(tbwr.Retries))) + } return d } diff --git a/internal/connectors/db/yql/queries/write_mock.go b/internal/connectors/db/yql/queries/write_mock.go index 2797e374..f4b390e7 100644 --- a/internal/connectors/db/yql/queries/write_mock.go +++ b/internal/connectors/db/yql/queries/write_mock.go @@ -7,7 +7,7 @@ import ( ) type WriteTableQueryMock struct { - Operation *types.Operation + Operations []types.Operation Backup *types.Backup BackupSchedule *types.BackupSchedule } @@ -19,10 +19,10 @@ func NewWriteTableQueryMock() WriteTableQuery { } func (w *WriteTableQueryMock) GetOperations() []types.Operation { - if w.Operation == nil { + if w.Operations == nil { return nil } else { - return []types.Operation{*w.Operation} + return w.Operations } } @@ -36,7 +36,7 @@ func (w *WriteTableQueryMock) WithCreateBackup(backup types.Backup) WriteTableQu } func (w *WriteTableQueryMock) WithCreateOperation(operation types.Operation) WriteTableQuery { - w.Operation = &operation + w.Operations = append(w.Operations, operation) return w } @@ -51,7 +51,7 @@ func (w *WriteTableQueryMock) WithUpdateBackup(backup types.Backup) WriteTableQu } func (w *WriteTableQueryMock) WithUpdateOperation(operation types.Operation) WriteTableQuery { - w.Operation = &operation + w.Operations = append(w.Operations, operation) return w } diff --git a/internal/connectors/db/yql/schema/create_tables.yql b/internal/connectors/db/yql/schema/create_tables.yql index 416ea3bc..e9c04a76 100644 --- a/internal/connectors/db/yql/schema/create_tables.yql +++ b/internal/connectors/db/yql/schema/create_tables.yql @@ -58,6 +58,7 @@ CREATE TABLE Operations ( --used only in TBWR schedule_id String, ttl Interval, + retries Uint32, retries_count Uint32, retries_max_backoff Interval, diff --git a/internal/handlers/take_backup_retry.go b/internal/handlers/take_backup_retry.go index 08a37b66..a96587c6 100644 --- a/internal/handlers/take_backup_retry.go +++ b/internal/handlers/take_backup_retry.go @@ -73,21 +73,22 @@ func exp(p int) time.Duration { return time.Duration(math.Pow(BACKOFF_EXP, float64(p))) } -func shouldRetry(config *pb.RetryConfig, tbOps []*types.TakeBackupOperation, clock clockwork.Clock) *time.Time { +func shouldRetry(config *pb.RetryConfig, count int, firstStart time.Time, lastEnd *time.Time, clock clockwork.Clock) *time.Time { if config == nil { + if count == 0 { + t := clock.Now() + return &t + } return nil } - ops := len(tbOps) - lastEnd := tbOps[ops-1].Audit.CompletedAt.AsTime() - firstStart := tbOps[0].Audit.CreatedAt.AsTime() - if ops == INTERNAL_MAX_RETRIES { + if count == INTERNAL_MAX_RETRIES { return nil } switch r := config.Retries.(type) { case *pb.RetryConfig_Count: { - if int(r.Count) == ops { + if r.Count == uint32(count) { return nil } } @@ -101,31 +102,43 @@ func shouldRetry(config *pb.RetryConfig, tbOps []*types.TakeBackupOperation, clo return nil } - res := lastEnd.Add(exp(ops) * MIN_BACKOFF) - return &res + var t time.Time + if lastEnd != nil { + t = *lastEnd + t = t.Add(exp(count) * MIN_BACKOFF) + } else { + t = clock.Now() + } + return &t } -func HandleTbOps(config *pb.RetryConfig, tbOps []*types.TakeBackupOperation, clock clockwork.Clock) (RetryDecision, error) { - //select last tbOp. - //if nothing, run new, skip +func MakeRetryDecision(ctx context.Context, tbwr *types.TakeBackupWithRetryOperation, tbOp *types.TakeBackupOperation, clock clockwork.Clock) (RetryDecision, error) { + //retrieve last tbOp run time //if there is a tbOp, check its status //if success: set success to itself //if cancelled: set error to itself - //if error: - //if we can retry it: retry, skip + //if error (or no tbOp): + //if we can retry: retry, skip //if no more retries: set error to itself - if len(tbOps) == 0 { - return RunNewTb, nil + lastState := types.OperationStateError //if no tbOp, same logic applies as after error + var lastTime time.Time + if tbOp != nil { + lastState = tbOp.State + lastTime = tbOp.Audit.CompletedAt.AsTime() } - last := tbOps[len(tbOps)-1] - switch last.State { + switch lastState { case types.OperationStateDone: return Success, nil case types.OperationStateError: { - t := shouldRetry(config, tbOps, clock) + //t := shouldRetryWithOps(ctx, tbwr.Config, tbOps, clock) + if tbwr.Audit == nil { + xlog.Error(ctx, "no audit for tbwr operation, unable to calculate retry need") + return Skip, fmt.Errorf("no audit for OperationID: %s", tbwr.ID) + } + t := shouldRetry(tbwr.RetryConfig, tbwr.Retries, tbwr.Audit.CreatedAt.AsTime(), &lastTime, clock) if t != nil { - if clock.Now().After(*t) { + if clock.Now().Compare(*t) >= 0 { return RunNewTb, nil } else { return Skip, nil @@ -142,6 +155,35 @@ func HandleTbOps(config *pb.RetryConfig, tbOps []*types.TakeBackupOperation, clo return Error, errors.New("unexpected tb op status") } +func setErrorToRetryOperation( + ctx context.Context, + tbwr *types.TakeBackupWithRetryOperation, + ops []types.Operation, + clock clockwork.Clock, +) { + operationIDs := strings.Join(func() []string { + var ids []string + for _, item := range ops { + ids = append(ids, item.GetID()) + } + return ids + }(), ", ") + tbwr.State = types.OperationStateError + now := clock.Now() + tbwr.UpdatedAt = timestamppb.New(now) + tbwr.Audit.CompletedAt = timestamppb.New(now) + + tbwr.Message = fmt.Sprintf("retry attempts exceeded limit: %d.", tbwr.Retries) + fields := []zap.Field{ + zap.Int("RetriesCount", len(ops)), + } + if len(ops) > 0 { + tbwr.Message = tbwr.Message + fmt.Sprintf(" Launched operations %s", operationIDs) + fields = append(fields, zap.String("OperationIDs", operationIDs)) + } + xlog.Error(ctx, "retry attempts exceeded limit for TBWR operation", fields...) +} + func TBWROperationHandler( ctx context.Context, operation types.Operation, @@ -169,14 +211,20 @@ func TBWROperationHandler( Field: "parent_operation_id", Values: []table_types.Value{table_types.StringValueFromString(tbwr.ID)}, }), - queries.WithOrderBy(queries.OrderSpec{ + queries.WithOrderBy(queries.OrderSpec{ //we need only last tbOp now, because retry count is saved in operation itself Field: "created_at", }), )) - tbOps := make([]*types.TakeBackupOperation, len(ops)) + var lastTbOp *types.TakeBackupOperation + var existingTbOps = 0 for i := range ops { - tbOps[i] = ops[i].(*types.TakeBackupOperation) + existingTbOps++ + lastTbOp = ops[i].(*types.TakeBackupOperation) } + //to update retry count for every operation that was launched before introducing + //persistent retry counter + tbwr.Retries = max(tbwr.Retries, existingTbOps) + if err != nil { return fmt.Errorf("can't select Operations for TBWR op %s", tbwr.ID) } @@ -184,7 +232,7 @@ func TBWROperationHandler( switch tbwr.State { case types.OperationStateRunning: { - do, err := HandleTbOps(tbwr.RetryConfig, tbOps, clock) + do, err := MakeRetryDecision(ctx, tbwr, lastTbOp, clock) if err != nil { tbwr.State = types.OperationStateError tbwr.Message = err.Error() @@ -198,19 +246,18 @@ func TBWROperationHandler( } return err } - if do != Error { - fields := []zap.Field{ - zap.String("decision", do.String()), - } - if len(ops) > 0 { - fields = append(fields, zap.String("TBOperationID", ops[len(ops)-1].GetID())) - } - xlog.Info( - ctx, - "TBWROperationHandler", - fields..., - ) + fields := []zap.Field{ + zap.String("decision", do.String()), } + if do != Error && len(ops) > 0 { + fields = append(fields, zap.String("TBOperationID", ops[len(ops)-1].GetID())) + } + xlog.Info( + ctx, + "TBWROperationHandler", + fields..., + ) + switch do { case Success: { @@ -225,27 +272,8 @@ func TBWROperationHandler( return nil case Error: { - operationIDs := strings.Join(func() []string { - var ids []string - for _, item := range ops { - ids = append(ids, item.GetID()) - } - return ids - }(), ", ") - tbwr.State = types.OperationStateError - now := clock.Now() - tbwr.UpdatedAt = timestamppb.New(now) - tbwr.Audit.CompletedAt = timestamppb.New(now) - - tbwr.Message = fmt.Sprintf("retry attempts exceeded limit: %d.", len(ops)) - fields := []zap.Field{ - zap.Int("RetriesCount", len(ops)), - } - if len(ops) > 0 { - tbwr.Message = tbwr.Message + fmt.Sprintf(" Launched operations %s", operationIDs) - fields = append(fields, zap.String("OperationIDs", operationIDs)) - } - xlog.Error(ctx, "retry attempts exceeded limit for TBWR operation", fields...) + setErrorToRetryOperation(ctx, tbwr, ops, clock) + xlog.Info(ctx, tbwr.Proto().String()) return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr)) } case RunNewTb: @@ -261,10 +289,24 @@ func TBWROperationHandler( clock, ) if err != nil { + var empty *backup_operations.EmptyDatabaseError + var connErr *backup_operations.ClientConnectionError + + if errors.As(err, &connErr) { + tbwr.IncRetries() + return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr)) + } + if errors.As(err, &empty) { + setErrorToRetryOperation(ctx, tbwr, ops, clock) + return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr)) + } + return err + } else { + xlog.Debug(ctx, "running new TB", zap.String("TBOperationID", tb.ID)) + tbwr.IncRetries() + return db.ExecuteUpsert(ctx, queryBuilderFactory().WithCreateBackup(*backup).WithCreateOperation(tb).WithUpdateOperation(tbwr)) } - xlog.Debug(ctx, "running new TB", zap.String("TBOperationID", tb.ID)) - return db.ExecuteUpsert(ctx, queryBuilderFactory().WithCreateBackup(*backup).WithCreateOperation(tb)) } default: tbwr.State = types.OperationStateError @@ -283,11 +325,7 @@ func TBWROperationHandler( //if cancelled, set cancelled to itself { xlog.Info(ctx, "cancelling TBWR operation") - var last *types.TakeBackupOperation - if len(tbOps) > 0 { - last = tbOps[len(tbOps)-1] - } - if last == nil || !types.IsActive(last) { + if lastTbOp == nil || !types.IsActive(lastTbOp) { tbwr.State = types.OperationStateCancelled tbwr.Message = "Success" now := clock.Now() @@ -295,12 +333,12 @@ func TBWROperationHandler( tbwr.Audit.CompletedAt = timestamppb.New(now) return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(tbwr)) } else { - if last.State == types.OperationStatePending || last.State == types.OperationStateRunning { - xlog.Info(ctx, "cancelling TB operation", zap.String("TBOperationID", last.ID)) - last.State = types.OperationStateStartCancelling - last.Message = "Cancelling by parent operation" - last.UpdatedAt = timestamppb.New(clock.Now()) - return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(last)) + if lastTbOp.State == types.OperationStatePending || lastTbOp.State == types.OperationStateRunning { + xlog.Info(ctx, "cancelling TB operation", zap.String("TBOperationID", lastTbOp.ID)) + lastTbOp.State = types.OperationStateStartCancelling + lastTbOp.Message = "Cancelling by parent operation" + lastTbOp.UpdatedAt = timestamppb.New(clock.Now()) + return db.ExecuteUpsert(ctx, queryBuilderFactory().WithUpdateOperation(lastTbOp)) } } } diff --git a/internal/handlers/take_backup_retry_test.go b/internal/handlers/take_backup_retry_test.go index 7d11690f..347c5781 100644 --- a/internal/handlers/take_backup_retry_test.go +++ b/internal/handlers/take_backup_retry_test.go @@ -20,168 +20,279 @@ import ( ) var ( - t1 = timestamppb.New(time.Date(2024, 01, 01, 0, 0, 0, 0, time.UTC)) - t2 = timestamppb.New(time.Date(2024, 01, 01, 1, 0, 0, 0, time.UTC)) - t3 = timestamppb.New(time.Date(2024, 01, 01, 2, 0, 0, 0, time.UTC)) - t4 = timestamppb.New(time.Date(2024, 01, 01, 3, 0, 0, 0, time.UTC)) - r1 = t3.AsTime().Add(exp(2) * time.Minute) - r2 = t4.AsTime().Add(exp(3) * time.Minute) + t1 = timestamppb.New(time.Date(2024, 01, 01, 0, 0, 0, 0, time.UTC)) + t2 = timestamppb.New(time.Date(2024, 01, 01, 1, 0, 0, 0, time.UTC)) + t3 = timestamppb.New(time.Date(2024, 01, 01, 2, 0, 0, 0, time.UTC)) + t4 = timestamppb.New(time.Date(2024, 01, 01, 3, 0, 0, 0, time.UTC)) + tt2 = t2.AsTime() + tt3 = t3.AsTime() + tt4 = t4.AsTime() + r1 = t3.AsTime().Add(exp(2) * time.Minute) + r2 = t4.AsTime().Add(exp(3) * time.Minute) ) -func TestRetryLogic(t *testing.T) { +func TestRetryWithOpsLogic(t *testing.T) { for _, tt := range []struct { config *pb.RetryConfig - ops []*types.TakeBackupOperation + tbwr *types.TakeBackupWithRetryOperation + tb *types.TakeBackupOperation res *time.Time clockTime time.Time + decision RetryDecision }{ - { - config: nil, - ops: []*types.TakeBackupOperation{ - { - Audit: &pb.AuditInfo{ - CreatedAt: t1, - CompletedAt: t2, - }, - }, - { - Audit: &pb.AuditInfo{ - CreatedAt: t2, - CompletedAt: t3, - }, + { //00 + tbwr: &types.TakeBackupWithRetryOperation{}, + tb: &types.TakeBackupOperation{ + State: types.OperationStateError, + Audit: &pb.AuditInfo{ + CreatedAt: t2, + CompletedAt: t3, }, }, res: nil, }, - { - config: &pb.RetryConfig{}, - ops: []*types.TakeBackupOperation{ - { - Audit: &pb.AuditInfo{ - CreatedAt: t1, - CompletedAt: t2, - }, - }, - { - Audit: &pb.AuditInfo{ - CreatedAt: t2, - CompletedAt: t3, - }, + { //01 + tbwr: &types.TakeBackupWithRetryOperation{ + Retries: 0, + RetryConfig: &pb.RetryConfig{}, + }, + tb: &types.TakeBackupOperation{ + State: types.OperationStateError, + Audit: &pb.AuditInfo{ + CreatedAt: t2, + CompletedAt: t3, }, }, res: nil, }, - { - config: &pb.RetryConfig{ - Retries: &pb.RetryConfig_Count{Count: 2}, - }, - ops: []*types.TakeBackupOperation{ - { + { //02 + tbwr: &types.TakeBackupWithRetryOperation{ + TakeBackupOperation: types.TakeBackupOperation{ Audit: &pb.AuditInfo{ - CreatedAt: t1, - CompletedAt: t2, + CreatedAt: t1, }, }, - { - Audit: &pb.AuditInfo{ - CreatedAt: t2, - CompletedAt: t3, - }, + Retries: 2, + RetryConfig: &pb.RetryConfig{ + Retries: &pb.RetryConfig_Count{Count: 2}, + }, + }, + tb: &types.TakeBackupOperation{ + State: types.OperationStateError, + Audit: &pb.AuditInfo{ + CreatedAt: t2, + CompletedAt: t3, }, }, res: nil, }, - { - config: &pb.RetryConfig{ - Retries: &pb.RetryConfig_Count{Count: 3}, - }, - ops: []*types.TakeBackupOperation{ - { + { //03 + tbwr: &types.TakeBackupWithRetryOperation{ + TakeBackupOperation: types.TakeBackupOperation{ Audit: &pb.AuditInfo{ - CreatedAt: t1, - CompletedAt: t2, + CreatedAt: t1, }, }, - { - Audit: &pb.AuditInfo{ - CreatedAt: t2, - CompletedAt: t3, - }, + Retries: 2, + RetryConfig: &pb.RetryConfig{ + Retries: &pb.RetryConfig_Count{Count: 3}, }, }, - res: &r1, - }, - { - config: &pb.RetryConfig{ - Retries: &pb.RetryConfig_Count{Count: 4}, + tb: &types.TakeBackupOperation{ + State: types.OperationStateError, + Audit: &pb.AuditInfo{ + CreatedAt: t2, + CompletedAt: t3, + }, }, - ops: []*types.TakeBackupOperation{ - { + clockTime: tt3.Add(time.Minute * 3), + res: &r1, + }, + { //04 + tbwr: &types.TakeBackupWithRetryOperation{ + TakeBackupOperation: types.TakeBackupOperation{ Audit: &pb.AuditInfo{ - CreatedAt: t1, - CompletedAt: t2, + CreatedAt: t1, }, }, - { - Audit: &pb.AuditInfo{ - CreatedAt: t2, - CompletedAt: t3, - }, + Retries: 3, + RetryConfig: &pb.RetryConfig{ + Retries: &pb.RetryConfig_Count{Count: 4}, }, - { - Audit: &pb.AuditInfo{ - CreatedAt: t3, - CompletedAt: t4, - }, + }, + tb: &types.TakeBackupOperation{ + State: types.OperationStateError, + Audit: &pb.AuditInfo{ + CreatedAt: t3, + CompletedAt: t4, }, }, - res: &r2, + clockTime: tt4.Add(time.Minute * 10), + res: &r2, }, - { - config: &pb.RetryConfig{ - Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(time.Hour)}, - }, - ops: []*types.TakeBackupOperation{ - { + { //05 + tbwr: &types.TakeBackupWithRetryOperation{ + TakeBackupOperation: types.TakeBackupOperation{ Audit: &pb.AuditInfo{ - CreatedAt: t1, - CompletedAt: t2, + CreatedAt: t1, }, }, + Retries: 5, + RetryConfig: &pb.RetryConfig{ + Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(time.Hour)}, + }, + }, + tb: &types.TakeBackupOperation{ + State: types.OperationStateError, + Audit: &pb.AuditInfo{ + CreatedAt: t1, + CompletedAt: t2, + }, }, clockTime: t4.AsTime(), res: nil, }, - { - config: &pb.RetryConfig{ - Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(time.Hour * 24)}, - }, - ops: []*types.TakeBackupOperation{ - { + { //06 + tbwr: &types.TakeBackupWithRetryOperation{ + TakeBackupOperation: types.TakeBackupOperation{ Audit: &pb.AuditInfo{ - CreatedAt: t1, - CompletedAt: t2, + CreatedAt: t1, }, }, - { - Audit: &pb.AuditInfo{ - CreatedAt: t2, - CompletedAt: t3, - }, + Retries: 3, + RetryConfig: &pb.RetryConfig{ + Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(time.Hour * 24)}, }, - { - Audit: &pb.AuditInfo{ - CreatedAt: t3, - CompletedAt: t4, - }, + }, + tb: &types.TakeBackupOperation{ + State: types.OperationStateError, + Audit: &pb.AuditInfo{ + CreatedAt: t3, + CompletedAt: t4, }, }, - clockTime: t4.AsTime().Add(time.Second), + clockTime: t4.AsTime().Add(time.Minute * 5), res: &r2, }, } { t.Run("", func(t *testing.T) { - require.Equal(t, tt.res, shouldRetry(tt.config, tt.ops, clockwork.NewFakeClockAt(tt.clockTime))) + decision, err := MakeRetryDecision(context.TODO(), tt.tbwr, tt.tb, clockwork.NewFakeClockAt(tt.clockTime)) + if err != nil { + require.Nil(t, tt.tbwr.Audit) + } else { + if decision == RunNewTb { + require.NotNil(t, tt.res) + cat := tt.tb.Audit.CompletedAt.AsTime() + r := shouldRetry( + tt.tbwr.RetryConfig, + tt.tbwr.Retries, + tt.tbwr.Audit.CreatedAt.AsTime(), + &cat, + clockwork.NewFakeClockAt(tt.clockTime), + ) + require.Equal(t, r, tt.res) + } else { + require.Equal(t, decision, Error) + require.Nil(t, tt.res) + } + } + }) + } +} + +func TestRetryLogic(t *testing.T) { + for _, tt := range []struct { + config *pb.RetryConfig + retries int + firstStart time.Time + lastEnd *time.Time + res *time.Time + clockTime time.Time + }{ + { //00 + firstStart: t1.AsTime(), + clockTime: tt2, + res: &tt2, + }, + { //01 + config: &pb.RetryConfig{}, //faulty config + firstStart: t1.AsTime(), + clockTime: tt2, + res: nil, + }, + { //02 + config: &pb.RetryConfig{}, + firstStart: t1.AsTime(), + retries: 2, + res: nil, + }, + { //03 + config: &pb.RetryConfig{ + Retries: &pb.RetryConfig_Count{Count: 2}, + }, + firstStart: t1.AsTime(), + clockTime: t2.AsTime(), + res: &tt2, + }, + { //04 + config: &pb.RetryConfig{ + Retries: &pb.RetryConfig_Count{Count: 2}, + }, + firstStart: t1.AsTime(), + + retries: 2, + }, + { //05 + config: &pb.RetryConfig{ + Retries: &pb.RetryConfig_Count{Count: 3}, + }, + retries: 2, + firstStart: t1.AsTime(), + lastEnd: &tt3, + res: &r1, + }, + { //06 + config: &pb.RetryConfig{ + Retries: &pb.RetryConfig_Count{Count: 4}, + }, + firstStart: t1.AsTime(), + lastEnd: &tt4, + retries: 3, + res: &r2, + }, + { //07 + config: &pb.RetryConfig{ + Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(time.Hour)}, + }, + firstStart: t1.AsTime(), + lastEnd: &tt2, + retries: 0, + clockTime: t4.AsTime(), + res: nil, + }, + { //08 + config: &pb.RetryConfig{ + Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(time.Hour * 24)}, + }, + firstStart: t1.AsTime(), + lastEnd: &tt4, + retries: 3, + clockTime: t4.AsTime().Add(time.Second), + res: &r2, + }, + { //09 + config: &pb.RetryConfig{ + Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(time.Hour * 24)}, + }, + firstStart: t1.AsTime(), + lastEnd: &tt4, + retries: 3, + clockTime: t4.AsTime().Add(time.Second), + res: &r2, + }, + } { + t.Run("", func(t *testing.T) { + require.Equal(t, tt.res, shouldRetry(tt.config, tt.retries, tt.firstStart, tt.lastEnd, clockwork.NewFakeClockAt(tt.clockTime))) }) } } @@ -446,6 +557,7 @@ func TestTBWRHandlerError(t *testing.T) { } func TestTBWRHandlerAlwaysRunOnce(t *testing.T) { + metrics.InitializeMockMetricsRegistry() ctx := context.Background() tbwrID := types.GenerateObjectID() tbwr := types.TakeBackupWithRetryOperation{ @@ -459,7 +571,9 @@ func TestTBWRHandlerAlwaysRunOnce(t *testing.T) { Endpoint: "i.valid.com", DatabaseName: "/mydb", }, - Audit: &pb.AuditInfo{}, + Audit: &pb.AuditInfo{ + CreatedAt: t1, + }, }, RetryConfig: nil, } @@ -511,6 +625,119 @@ func TestTBWRHandlerAlwaysRunOnce(t *testing.T) { assert.Equal(t, t1, tb.Audit.CreatedAt) } +func TestTBWRHandlerInvalidEndpointRetry(t *testing.T) { + ctx := context.Background() + tbwrID := types.GenerateObjectID() + tbwr := types.TakeBackupWithRetryOperation{ + TakeBackupOperation: types.TakeBackupOperation{ + ID: tbwrID, + ContainerID: "abcde", + State: types.OperationStateRunning, + Message: "", + SourcePaths: []string{"path"}, + YdbConnectionParams: types.YdbConnectionParams{ + Endpoint: "invalid.com", + DatabaseName: "/mydb", + }, + Audit: &pb.AuditInfo{}, + }, + RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_Count{Count: 3}}, + } + + ops := []types.Operation{ + &tbwr, + } + + dbConnector := db.NewMockDBConnector( + db.WithOperations(toMap(ops...)), + ) + dbConnector.SetOperationsIDSelector([]string{}) + + clientConnector := client.NewMockClientConnector() + + handler := NewTBWROperationHandler( + dbConnector, + clientConnector, + config.S3Config{ + IsMock: true, + }, + config.ClientConnectionConfig{ + AllowedEndpointDomains: []string{".valid.com"}, + AllowInsecureEndpoint: true, + }, + queries.NewWriteTableQueryMock, + clockwork.NewFakeClockAt(t1.AsTime()), + ) + err := handler(ctx, &tbwr) + assert.Empty(t, err) + + op, err := dbConnector.GetOperation(ctx, tbwrID) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateRunning, op.GetState()) + operations, err := dbConnector.SelectOperations(ctx, queries.NewReadTableQuery()) + assert.Empty(t, err) + assert.Equal(t, 1, len(operations)) + tbwr = *op.(*types.TakeBackupWithRetryOperation) + assert.Equal(t, 1, tbwr.Retries) +} + +func TestTBWRHandlerInvalidEndpointError(t *testing.T) { + ctx := context.Background() + tbwrID := types.GenerateObjectID() + tbwr := types.TakeBackupWithRetryOperation{ + TakeBackupOperation: types.TakeBackupOperation{ + ID: tbwrID, + ContainerID: "abcde", + State: types.OperationStateRunning, + Message: "", + SourcePaths: []string{"path"}, + YdbConnectionParams: types.YdbConnectionParams{ + Endpoint: "invalid.com", + DatabaseName: "/mydb", + }, + Audit: &pb.AuditInfo{}, + }, + RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_Count{Count: 1}}, + } + tbwr.IncRetries() + + ops := []types.Operation{ + &tbwr, + } + + dbConnector := db.NewMockDBConnector( + db.WithOperations(toMap(ops...)), + ) + dbConnector.SetOperationsIDSelector([]string{}) + + clientConnector := client.NewMockClientConnector() + + handler := NewTBWROperationHandler( + dbConnector, + clientConnector, + config.S3Config{ + IsMock: true, + }, + config.ClientConnectionConfig{ + AllowedEndpointDomains: []string{".valid.com"}, + AllowInsecureEndpoint: true, + }, + queries.NewWriteTableQueryMock, + clockwork.NewFakeClockAt(t1.AsTime()), + ) + err := handler(ctx, &tbwr) + assert.Empty(t, err) + + op, err := dbConnector.GetOperation(ctx, tbwrID) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateError, op.GetState()) + tbwr = *op.(*types.TakeBackupWithRetryOperation) + assert.Equal(t, 1, tbwr.Retries) + assert.Equal(t, "retry attempts exceeded limit: 0.", tbwr.Message) +} + func TestTBWRHandlerStartCancel(t *testing.T) { ctx := context.Background() tbwrID := types.GenerateObjectID() diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 4b16282e..9e726f5c 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -54,13 +54,14 @@ func WithHandleOperationTimeout(timeout time.Duration) Option { func NewOperationProcessor( ctx context.Context, wg *sync.WaitGroup, + interval int64, db db.DBConnector, handlers OperationHandlerRegistry, options ...Option, ) *OperationProcessorImpl { op := &OperationProcessorImpl{ ctx: ctx, - period: time.Second * 10, + period: time.Second * time.Duration(interval), handleOperationTimeout: time.Second * 600, handlers: handlers, db: db, diff --git a/internal/processor/processor_test.go b/internal/processor/processor_test.go index 8a221286..fb3f3445 100644 --- a/internal/processor/processor_test.go +++ b/internal/processor/processor_test.go @@ -55,6 +55,7 @@ func TestProcessor(t *testing.T) { _ = NewOperationProcessor( ctx, &wg, + 10, db, handlers, WithTickerProvider(tickerProvider), diff --git a/internal/server/services/backup/backup_service.go b/internal/server/services/backup/backup_service.go index 076ecde6..bddf9c33 100644 --- a/internal/server/services/backup/backup_service.go +++ b/internal/server/services/backup/backup_service.go @@ -120,6 +120,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques }, UpdatedAt: now, }, + Retries: 0, RetryConfig: &pb.RetryConfig{ Retries: &pb.RetryConfig_Count{Count: 3}, }, @@ -130,8 +131,9 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques _, err = backup_operations.OpenConnAndValidateSourcePaths(ctx, backup_operations.FromTBWROperation(tbwr), s.clientConn) if err != nil { - s.IncApiCallsCounter(methodName, status.Code(err)) - return nil, err + grpcError := backup_operations.ErrToStatus(err) + s.IncApiCallsCounter(methodName, status.Code(grpcError)) + return nil, grpcError } err = s.driver.ExecuteUpsert( diff --git a/internal/types/operation.go b/internal/types/operation.go index f65992e7..2f824fb8 100644 --- a/internal/types/operation.go +++ b/internal/types/operation.go @@ -274,6 +274,7 @@ type TakeBackupWithRetryOperation struct { TakeBackupOperation ScheduleID *string Ttl *time.Duration + Retries int RetryConfig *pb.RetryConfig } @@ -340,6 +341,10 @@ func (o *TakeBackupWithRetryOperation) Proto() *pb.Operation { } } +func (o *TakeBackupWithRetryOperation) IncRetries() { + o.Retries++ +} + func (o *TakeBackupWithRetryOperation) SpawnNewTBOperation(backupID string, subject string, ydbOperationId string) TakeBackupOperation { return TakeBackupOperation{ ID: GenerateObjectID(), diff --git a/local_config.yaml b/local_config.yaml index dceca100..05821f78 100644 --- a/local_config.yaml +++ b/local_config.yaml @@ -29,4 +29,5 @@ metrics_server: bind_address: 127.0.0.1 bind_port: 9090 -schedules_limit_per_db: 1 \ No newline at end of file +schedules_limit_per_db: 1 +processor_interval_seconds: 2 \ No newline at end of file