diff --git a/cmd/ydbcp/config.yaml b/cmd/ydbcp/config.yaml index 61026482..dfb16345 100644 --- a/cmd/ydbcp/config.yaml +++ b/cmd/ydbcp/config.yaml @@ -1 +1,2 @@ ydbcp_db_connection_string: "grpc://localhost:2136/local" +operation_ttl_seconds: 86400 # 24 hours diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 9231d732..61b89d1a 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -177,7 +177,7 @@ func main() { lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { - log.Fatalf("failed to listen: %v", err) + xlog.Error(ctx, "failed to listen", zap.Error(err)) return } s := grpc.NewServer() @@ -204,10 +204,19 @@ func main() { handlersRegistry := processor.NewOperationHandlerRegistry() err = handlersRegistry.Add( - types.OperationType("TB"), handlers.MakeTBOperationHandler(dbConnector, client.NewClientYdbConnector()), + types.OperationTypeTB, handlers.MakeTBOperationHandler(dbConnector, client.NewClientYdbConnector()), ) if err != nil { - log.Fatalf("failed to register handler: %v", err) + xlog.Error(ctx, "failed to register TB handler", zap.Error(err)) + return + } + + err = handlersRegistry.Add( + types.OperationTypeRB, handlers.MakeRBOperationHandler(dbConnector, client.NewClientYdbConnector(), configInstance), + ) + + if err != nil { + xlog.Error(ctx, "failed to register RB handler", zap.Error(err)) return } diff --git a/internal/config/config.go b/internal/config/config.go index 55271323..bbd457a4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -22,6 +22,7 @@ type S3Config struct { type Config struct { YdbcpDbConnectionString string `yaml:"ydbcp_db_connection_string"` S3 S3Config `yaml:"s3"` + OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"` } func (config Config) ToString() (string, error) { diff --git a/internal/connectors/client/connector.go b/internal/connectors/client/connector.go index 8bcd2c7b..8d9f65e9 100644 --- a/internal/connectors/client/connector.go +++ b/internal/connectors/client/connector.go @@ -26,6 +26,7 @@ type ClientConnector interface { ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Import.ImportFromS3Settings) (string, error) GetOperationStatus(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.GetOperationResponse, error) ForgetOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.ForgetOperationResponse, error) + CancelOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.CancelOperationResponse, error) } type ClientYdbConnector struct { @@ -179,3 +180,27 @@ func (d *ClientYdbConnector) ForgetOperation(ctx context.Context, clientDb *ydb. return response, nil } + +func (d *ClientYdbConnector) CancelOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.CancelOperationResponse, error) { + if clientDb == nil { + return nil, fmt.Errorf("unititialized client db driver") + } + + client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(clientDb)) + xlog.Info(ctx, "Cancelling operation", + zap.String("id", operationId), + ) + + response, err := client.CancelOperation( + ctx, + &Ydb_Operations.CancelOperationRequest{ + Id: operationId, + }, + ) + + if err != nil { + return nil, fmt.Errorf("error cancelling operation: %s", err.Error()) + } + + return response, nil +} diff --git a/internal/connectors/client/mock.go b/internal/connectors/client/mock.go index 3bbc7f10..84a2e83b 100644 --- a/internal/connectors/client/mock.go +++ b/internal/connectors/client/mock.go @@ -3,6 +3,7 @@ package client import ( "context" "fmt" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" "github.com/google/uuid" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" @@ -95,7 +96,17 @@ func (m *MockClientConnector) ImportFromS3(_ context.Context, _ *ydb.Driver, s3S func (m *MockClientConnector) GetOperationStatus(_ context.Context, _ *ydb.Driver, operationId string) (*Ydb_Operations.GetOperationResponse, error) { op, exist := m.operations[operationId] if !exist { - return nil, fmt.Errorf("operation %s doesn't exist", operationId) + issues := make([]*Ydb_Issue.IssueMessage, 1) + issues[0] = &Ydb_Issue.IssueMessage{ + Message: "operation not found", + } + + return &Ydb_Operations.GetOperationResponse{ + Operation: &Ydb_Operations.Operation{ + Status: Ydb.StatusIds_NOT_FOUND, + Issues: issues, + }, + }, nil } return &Ydb_Operations.GetOperationResponse{ @@ -106,7 +117,15 @@ func (m *MockClientConnector) GetOperationStatus(_ context.Context, _ *ydb.Drive func (m *MockClientConnector) ForgetOperation(_ context.Context, _ *ydb.Driver, operationId string) (*Ydb_Operations.ForgetOperationResponse, error) { _, exist := m.operations[operationId] if !exist { - return nil, fmt.Errorf("operation %s doesn't exist", operationId) + issues := make([]*Ydb_Issue.IssueMessage, 1) + issues[0] = &Ydb_Issue.IssueMessage{ + Message: "operation not found", + } + + return &Ydb_Operations.ForgetOperationResponse{ + Status: Ydb.StatusIds_NOT_FOUND, + Issues: issues, + }, nil } delete(m.operations, operationId) @@ -114,3 +133,23 @@ func (m *MockClientConnector) ForgetOperation(_ context.Context, _ *ydb.Driver, Status: Ydb.StatusIds_SUCCESS, }, nil } + +func (m *MockClientConnector) CancelOperation(_ context.Context, _ *ydb.Driver, operationId string) (*Ydb_Operations.CancelOperationResponse, error) { + op, exist := m.operations[operationId] + if !exist { + issues := make([]*Ydb_Issue.IssueMessage, 1) + issues[0] = &Ydb_Issue.IssueMessage{ + Message: "operation not found", + } + + return &Ydb_Operations.CancelOperationResponse{ + Status: Ydb.StatusIds_NOT_FOUND, + Issues: issues, + }, nil + } + + op.Status = Ydb.StatusIds_CANCELLED + return &Ydb_Operations.CancelOperationResponse{ + Status: Ydb.StatusIds_SUCCESS, + }, nil +} diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index 112047e9..c28a58cc 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -60,6 +60,19 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { YdbConnectionParams: types.GetYdbConnectionParams(*database), YdbOperationId: *ydbOperationId, }, nil + } else if operationType == types.OperationTypeRB { + if backupId == nil || database == nil || ydbOperationId == nil { + return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String()) + } + return &types.RestoreBackupOperation{ + Id: operationId, + BackupId: *backupId, + State: types.OperationState(operationState), + Message: "", + YdbConnectionParams: types.GetYdbConnectionParams(*database), + YdbOperationId: *ydbOperationId, + }, nil } + return &types.GenericOperation{Id: operationId}, nil } diff --git a/internal/handlers/restore_backup.go b/internal/handlers/restore_backup.go new file mode 100644 index 00000000..b19ef796 --- /dev/null +++ b/internal/handlers/restore_backup.go @@ -0,0 +1,199 @@ +package handlers + +import ( + "context" + "fmt" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "go.uber.org/zap" + "time" + "ydbcp/internal/config" + "ydbcp/internal/connectors/client" + "ydbcp/internal/connectors/db" + "ydbcp/internal/types" + "ydbcp/internal/util/xlog" +) + +func MakeRBOperationHandler(db db.DBConnector, client client.ClientConnector, config config.Config) types.OperationHandler { + return func(ctx context.Context, op types.Operation) error { + return RBOperationHandler(ctx, op, db, client, config) + } +} + +func valid(status Ydb.StatusIds_StatusCode) bool { + return status == Ydb.StatusIds_SUCCESS || status == Ydb.StatusIds_CANCELLED +} + +func retriable(status Ydb.StatusIds_StatusCode) bool { + return status == Ydb.StatusIds_OVERLOADED || status == Ydb.StatusIds_UNAVAILABLE +} + +func RBOperationHandler( + ctx context.Context, + operation types.Operation, + db db.DBConnector, + client client.ClientConnector, + config config.Config, +) error { + xlog.Info(ctx, "received operation", + zap.String("id", operation.GetId().String()), + zap.String("type", string(operation.GetType())), + zap.String("state", string(operation.GetState())), + zap.String("message", operation.GetMessage()), + ) + + if operation.GetType() != types.OperationTypeRB { + return fmt.Errorf("wrong type %s != %s for operation %s", + operation.GetType(), types.OperationTypeRB, types.OperationToString(operation), + ) + } + + mr, ok := operation.(*types.RestoreBackupOperation) + if !ok { + return fmt.Errorf("can't cast operation to RestoreBackupOperation %s", types.OperationToString(operation)) + } + + conn, err := client.Open(ctx, types.MakeYdbConnectionString(mr.YdbConnectionParams)) + if err != nil { + return fmt.Errorf("error initializing client connector for operation #%s: %w", + mr.GetId().String(), err, + ) + } + + defer func() { _ = client.Close(ctx, conn) }() + + xlog.Info(ctx, "getting operation status", + zap.String("id", mr.Id.String()), + zap.String("type", string(operation.GetType())), + zap.String("ydb_operation_id", mr.YdbOperationId), + ) + + opResponse, err := client.GetOperationStatus(ctx, conn, mr.YdbOperationId) + if err != nil { + if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() { + operation.SetState(types.OperationStateError) + operation.SetMessage("Operation deadline exceeded") + return db.UpdateOperation(ctx, operation) + } + + return fmt.Errorf( + "failed to get operation status for operation #%s, import operation id %s: %w", + mr.GetId().String(), + mr.YdbOperationId, + err, + ) + } + + if retriable(opResponse.GetOperation().GetStatus()) { + xlog.Info(ctx, "received retriable error", + zap.String("id", mr.Id.String()), + zap.String("type", string(operation.GetType())), + zap.String("ydb_operation_id", mr.YdbOperationId), + ) + + return nil + } + + if !valid(opResponse.GetOperation().GetStatus()) { + operation.SetState(types.OperationStateError) + operation.SetMessage(fmt.Sprintf("Error status: %s, issues: %s", + opResponse.GetOperation().GetStatus(), + types.IssuesToString(opResponse.GetOperation().Issues)), + ) + return db.UpdateOperation(ctx, operation) + } + + switch mr.State { + case types.OperationStatePending: + { + if !opResponse.GetOperation().Ready { + if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() { + xlog.Info(ctx, "cancelling operation due to ttl", + zap.String("id", mr.Id.String()), + zap.String("type", string(operation.GetType())), + zap.String("ydb_operation_id", mr.YdbOperationId), + ) + + response, err := client.CancelOperation(ctx, conn, mr.YdbOperationId) + if err != nil { + return fmt.Errorf( + "error cancelling operation #%s, import operation id %s: %w", + mr.GetId().String(), + mr.YdbOperationId, + err, + ) + } + + if response == nil || response.GetStatus() != Ydb.StatusIds_SUCCESS { + return fmt.Errorf( + "error cancelling operation id %s, import operation id %s, issues: %s", + mr.GetId().String(), + mr.YdbOperationId, + types.IssuesToString(response.GetIssues()), + ) + } + + operation.SetState(types.OperationStateCancelling) + operation.SetMessage("Operation deadline exceeded") + return db.UpdateOperation(ctx, operation) + } + + return nil + } + + if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { + operation.SetState(types.OperationStateDone) + operation.SetMessage("Success") + } else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED { + operation.SetState(types.OperationStateCancelled) + operation.SetMessage("Pending operation wac cancelled") + } + } + case types.OperationStateCancelling: + { + if !opResponse.GetOperation().Ready { + if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() { + operation.SetState(types.OperationStateError) + operation.SetMessage("Operation deadline exceeded") + return db.UpdateOperation(ctx, operation) + } + + return nil + } + + if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS { + operation.SetState(types.OperationStateDone) + operation.SetMessage("Operation was completed despite cancellation") + } else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED { + operation.SetState(types.OperationStateCancelled) + operation.SetMessage("Success") + } + } + } + + xlog.Info(ctx, "forgetting operation", + zap.String("id", mr.Id.String()), + zap.String("type", string(operation.GetType())), + zap.String("ydb_operation_id", mr.YdbOperationId), + ) + + response, err := client.ForgetOperation(ctx, conn, mr.YdbOperationId) + if err != nil { + return fmt.Errorf( + "error forgetting operation #%s, import operation id %s: %w", + mr.GetId().String(), + mr.YdbOperationId, + err, + ) + } + + if response == nil || response.GetStatus() != Ydb.StatusIds_SUCCESS { + return fmt.Errorf( + "error forgetting operation #%s, import operation id %s, issues: %s", + mr.GetId().String(), + mr.YdbOperationId, + types.IssuesToString(response.GetIssues()), + ) + } + + return db.UpdateOperation(ctx, operation) +} diff --git a/internal/handlers/restore_backup_test.go b/internal/handlers/restore_backup_test.go new file mode 100644 index 00000000..f9dddbbe --- /dev/null +++ b/internal/handlers/restore_backup_test.go @@ -0,0 +1,462 @@ +package handlers + +import ( + "context" + "github.com/stretchr/testify/assert" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations" + "testing" + "time" + "ydbcp/internal/config" + "ydbcp/internal/connectors/client" + "ydbcp/internal/connectors/db" + "ydbcp/internal/types" +) + +func TestRBOperationHandlerInvalidOperationResponse(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + rbOp := types.RestoreBackupOperation{ + Id: types.GenerateObjectID(), + BackupId: types.GenerateObjectID(), + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: "1", + } + + opMap := make(map[types.ObjectID]types.Operation) + opMap[rbOp.Id] = &rbOp + + clientConnector := client.NewMockClientConnector() + dbConnector := db.NewMockDBConnector( + db.WithOperations(opMap), + ) + + // try to handle rb operation with non-existing ydb operation id + handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{}) + err := handler(ctx, &rbOp) + assert.Empty(t, err) + + op, err := dbConnector.GetOperation(ctx, rbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateError, op.GetState()) + assert.Equal(t, "error status: NOT_FOUND, issues: message:\"operation not found\"", op.GetMessage()) +} + +func TestRBOperationHandlerDeadlineExceededForPendingOperation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + rbOp := types.RestoreBackupOperation{ + Id: types.GenerateObjectID(), + BackupId: types.GenerateObjectID(), + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: ydbOp.Id, + CreatedAt: time.Now(), + } + + opMap := make(map[types.ObjectID]types.Operation) + opMap[rbOp.Id] = &rbOp + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + ydbOpMap[ydbOp.Id] = ydbOp + + clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) + dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) + + // try to handle pending rb operation with zero ttl + handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{}) + err := handler(ctx, &rbOp) + assert.Empty(t, err) + + // check operation status (should be cancelled because of deadline exceeded) + op, err := dbConnector.GetOperation(ctx, rbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateCancelling, op.GetState()) + assert.Equal(t, "operation deadline exceeded", op.GetMessage()) + + // check ydb operation status (should be cancelled) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_CANCELLED, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestRBOperationHandlerPendingOperationInProgress(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + rbOp := types.RestoreBackupOperation{ + Id: types.GenerateObjectID(), + BackupId: types.GenerateObjectID(), + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: ydbOp.Id, + CreatedAt: time.Now(), + } + + opMap := make(map[types.ObjectID]types.Operation) + opMap[rbOp.Id] = &rbOp + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + ydbOpMap[ydbOp.Id] = ydbOp + + clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) + dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) + + // try to handle pending rb operation with ttl + handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) + err := handler(ctx, &rbOp) + assert.Empty(t, err) + + // check operation status (should be pending) + op, err := dbConnector.GetOperation(ctx, rbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStatePending, op.GetState()) + + // check ydb operation status (should be in progress + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_SUCCESS, ydbOpStatus.GetOperation().GetStatus()) + assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady()) +} + +func TestRBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: true, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + rbOp := types.RestoreBackupOperation{ + Id: types.GenerateObjectID(), + BackupId: types.GenerateObjectID(), + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: ydbOp.Id, + CreatedAt: time.Now(), + } + + opMap := make(map[types.ObjectID]types.Operation) + opMap[rbOp.Id] = &rbOp + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + ydbOpMap[ydbOp.Id] = ydbOp + + clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) + dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) + + handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) + err := handler(ctx, &rbOp) + assert.Empty(t, err) + + // check operation status (should be done) + op, err := dbConnector.GetOperation(ctx, rbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateDone, op.GetState()) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestRBOperationHandlerPendingOperationCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: true, + Status: Ydb.StatusIds_CANCELLED, + Issues: nil, + } + + rbOp := types.RestoreBackupOperation{ + Id: types.GenerateObjectID(), + BackupId: types.GenerateObjectID(), + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: ydbOp.Id, + CreatedAt: time.Now(), + } + + opMap := make(map[types.ObjectID]types.Operation) + opMap[rbOp.Id] = &rbOp + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + ydbOpMap[ydbOp.Id] = ydbOp + + clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) + dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) + + handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + err := handler(ctx, &rbOp) + assert.Empty(t, err) + + // check operation status (should be cancelled) + op, err := dbConnector.GetOperation(ctx, rbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateCancelled, op.GetState()) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestRBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + rbOp := types.RestoreBackupOperation{ + Id: types.GenerateObjectID(), + BackupId: types.GenerateObjectID(), + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: ydbOp.Id, + CreatedAt: time.Now(), + } + + opMap := make(map[types.ObjectID]types.Operation) + opMap[rbOp.Id] = &rbOp + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + ydbOpMap[ydbOp.Id] = ydbOp + + clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) + dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) + + // try to handle cancelling rb operation with zero ttl + handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{}) + err := handler(ctx, &rbOp) + assert.Empty(t, err) + + // check operation status (should be failed because of deadline exceeded) + op, err := dbConnector.GetOperation(ctx, rbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateError, op.GetState()) + assert.Equal(t, "Operation deadline exceeded", op.GetMessage()) + + // check ydb operation status (should be in progress) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_SUCCESS, ydbOpStatus.GetOperation().GetStatus()) + assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady()) +} + +func TestRBOperationHandlerCancellingOperationInProgress(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + rbOp := types.RestoreBackupOperation{ + Id: types.GenerateObjectID(), + BackupId: types.GenerateObjectID(), + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: ydbOp.Id, + CreatedAt: time.Now(), + } + + opMap := make(map[types.ObjectID]types.Operation) + opMap[rbOp.Id] = &rbOp + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + ydbOpMap[ydbOp.Id] = ydbOp + + clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) + dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) + + handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 1000}) + err := handler(ctx, &rbOp) + assert.Empty(t, err) + + // check operation status (should be the same as before) + op, err := dbConnector.GetOperation(ctx, rbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateCancelling, op.GetState()) + + // check ydb operation status (should be in progress) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_SUCCESS, ydbOpStatus.GetOperation().GetStatus()) + assert.Equal(t, false, ydbOpStatus.GetOperation().GetReady()) +} + +func TestRBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: true, + Status: Ydb.StatusIds_SUCCESS, + Issues: nil, + } + + rbOp := types.RestoreBackupOperation{ + Id: types.GenerateObjectID(), + BackupId: types.GenerateObjectID(), + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: ydbOp.Id, + CreatedAt: time.Now(), + } + + opMap := make(map[types.ObjectID]types.Operation) + opMap[rbOp.Id] = &rbOp + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + ydbOpMap[ydbOp.Id] = ydbOp + + clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) + dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) + + handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + err := handler(ctx, &rbOp) + assert.Empty(t, err) + + // check operation status (should be done) + op, err := dbConnector.GetOperation(ctx, rbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateDone, op.GetState()) + assert.Equal(t, "Operation was completed despite cancellation", op.GetMessage()) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) +} + +func TestRBOperationHandlerCancellingOperationCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: true, + Status: Ydb.StatusIds_CANCELLED, + Issues: nil, + } + + rbOp := types.RestoreBackupOperation{ + Id: types.GenerateObjectID(), + BackupId: types.GenerateObjectID(), + State: types.OperationStateCancelling, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: ydbOp.Id, + CreatedAt: time.Now(), + } + + opMap := make(map[types.ObjectID]types.Operation) + opMap[rbOp.Id] = &rbOp + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + ydbOpMap[ydbOp.Id] = ydbOp + + clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) + dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) + + handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + err := handler(ctx, &rbOp) + assert.Empty(t, err) + + // check operation status (should be cancelled) + op, err := dbConnector.GetOperation(ctx, rbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStateCancelled, op.GetState()) + + // check ydb operation status (should be forgotten) + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_NOT_FOUND, ydbOpStatus.GetOperation().GetStatus()) + +} + +func TestRBOperationHandlerRetriableErrorForPendingOperation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ydbOp := &Ydb_Operations.Operation{ + Id: "1", + Ready: false, + Status: Ydb.StatusIds_UNAVAILABLE, + Issues: nil, + } + + rbOp := types.RestoreBackupOperation{ + Id: types.GenerateObjectID(), + BackupId: types.GenerateObjectID(), + State: types.OperationStatePending, + Message: "", + YdbConnectionParams: types.YdbConnectionParams{}, + YdbOperationId: ydbOp.Id, + CreatedAt: time.Now(), + } + + opMap := make(map[types.ObjectID]types.Operation) + opMap[rbOp.Id] = &rbOp + ydbOpMap := make(map[string]*Ydb_Operations.Operation) + ydbOpMap[ydbOp.Id] = ydbOp + + clientConnector := client.NewMockClientConnector(client.WithOperations(ydbOpMap)) + dbConnector := db.NewMockDBConnector(db.WithOperations(opMap)) + + handler := MakeRBOperationHandler(dbConnector, clientConnector, config.Config{OperationTtlSeconds: 10}) + err := handler(ctx, &rbOp) + assert.Empty(t, err) + + // check operation status (should be the same as before) + op, err := dbConnector.GetOperation(ctx, rbOp.Id) + assert.Empty(t, err) + assert.NotEmpty(t, op) + assert.Equal(t, types.OperationStatePending, op.GetState()) + + // check ydb operation status + ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) + assert.Empty(t, err) + assert.Equal(t, Ydb.StatusIds_UNAVAILABLE, ydbOpStatus.GetOperation().GetStatus()) +} diff --git a/internal/types/backup.go b/internal/types/backup.go index 6ac453bb..2285cd6a 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -3,11 +3,11 @@ package types import ( "context" "fmt" - "strings" - "github.com/google/uuid" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" "go.uber.org/zap" + "strings" + "time" "ydbcp/internal/util/xlog" pb "ydbcp/pkg/proto" @@ -121,6 +121,42 @@ func (o *TakeBackupOperation) SetMessage(m string) { o.Message = m } +type RestoreBackupOperation struct { + Id ObjectID + ContainerID string + BackupId ObjectID + State OperationState + Message string + YdbConnectionParams YdbConnectionParams + YdbOperationId string + DestinationPaths []string + CreatedAt time.Time +} + +func (o *RestoreBackupOperation) GetId() ObjectID { + return o.Id +} +func (o *RestoreBackupOperation) SetId(id ObjectID) { + o.Id = id +} +func (o *RestoreBackupOperation) GetType() OperationType { + return OperationTypeRB +} +func (o *RestoreBackupOperation) SetType(_ OperationType) { +} +func (o *RestoreBackupOperation) GetState() OperationState { + return o.State +} +func (o *RestoreBackupOperation) SetState(s OperationState) { + o.State = s +} +func (o *RestoreBackupOperation) GetMessage() string { + return o.Message +} +func (o *RestoreBackupOperation) SetMessage(m string) { + o.Message = m +} + type GenericOperation struct { Id ObjectID ContainerID string @@ -165,6 +201,7 @@ var ( const ( OperationTypeTB = OperationType("TB") + OperationTypeRB = OperationType("RB") BackupStatePending = "Pending" BackupStateAvailable = "Available"