diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 9f476e06..d9267f3b 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -4,10 +4,12 @@ import ( "context" "flag" "fmt" + "google.golang.org/protobuf/types/known/timestamppb" "os" "os/signal" "sync" "syscall" + pb "ydbcp/pkg/proto/ydbcp/v1alpha1" "ydbcp/internal/auth" "ydbcp/internal/config" @@ -142,8 +144,32 @@ func main() { os.Exit(1) } + if err := handlersRegistry.Add( + types.OperationTypeDOB, + handlers.NewDOBOperationHandler(dbConnector, queries.NewWriteTableQuery), + ); err != nil { + xlog.Error(ctx, "failed to register DOB handler", zap.Error(err)) + os.Exit(1) + } + processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry) + now := timestamppb.Now() + op := &types.DeleteOutdatedBackupsOperation{ + State: types.OperationStateRunning, + UpdatedAt: now, + Audit: &pb.AuditInfo{ + CreatedAt: now, + Creator: "ydbcp", + }, + } + + _, err = dbConnector.CreateOperation(ctx, op) + if err != nil { + xlog.Error(ctx, "can't create DeleteOutdatedBackups operation", zap.Error(err)) + os.Exit(1) + } + wg.Add(1) go func() { defer wg.Done() diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index bdcccb6f..ebdd52d1 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -77,6 +77,8 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) { creator *string completedAt *time.Time createdAt *time.Time + expireAt *time.Time + expireAtTs *timestamppb.Timestamp ) err := res.ScanNamed( @@ -91,6 +93,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) { named.Optional("status", &status), named.Optional("message", &message), named.Optional("size", &size), + named.Optional("expire_at", &expireAt), named.Optional("created_at", &createdAt), named.Optional("completed_at", &completedAt), @@ -100,6 +103,10 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) { return nil, err } + if expireAt != nil { + expireAtTs = timestamppb.New(*expireAt) + } + return &types.Backup{ ID: backupId, ContainerID: containerId, @@ -113,16 +120,17 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) { Message: StringOrEmpty(message), AuditInfo: auditFromDb(creator, createdAt, completedAt), Size: Int64OrZero(size), + ExpireAt: expireAtTs, }, nil } func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { var ( operationId string - containerId string + containerId *string operationType string - databaseName string - databaseEndpoint string + databaseName *string + databaseEndpoint *string backupId *string ydbOperationId *string @@ -138,10 +146,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { ) err := res.ScanNamed( named.Required("id", &operationId), - named.Required("container_id", &containerId), + named.Optional("container_id", &containerId), named.Required("type", &operationType), - named.Required("database", &databaseName), - named.Required("endpoint", &databaseEndpoint), + named.Optional("database", &databaseName), + named.Optional("endpoint", &databaseEndpoint), named.Optional("backup_id", &backupId), named.Optional("operation_id", &ydbOperationId), @@ -175,19 +183,40 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { updatedTs = timestamppb.New(*updatedAt) } - if operationType == string(types.OperationTypeTB) { + checkRequiredFieldsForUserOperation := func() error { if backupId == nil { - return nil, fmt.Errorf("failed to read backup_id for TB operation: %s", operationId) + return fmt.Errorf("failed to read backup_id for TB operation: %s", operationId) + } + + if databaseName == nil { + return fmt.Errorf("failed to read database for TB operation: %s", operationId) + } + + if databaseEndpoint == nil { + return fmt.Errorf("failed to read endpoint for TB operation: %s", operationId) } + + if containerId == nil { + return fmt.Errorf("failed to read container_id for TB operation: %s", operationId) + } + + return nil + } + + if operationType == string(types.OperationTypeTB) { + if err := checkRequiredFieldsForUserOperation(); err != nil { + return nil, err + } + return &types.TakeBackupOperation{ ID: operationId, BackupID: *backupId, - ContainerID: containerId, + ContainerID: *containerId, State: operationState, Message: StringOrEmpty(message), YdbConnectionParams: types.YdbConnectionParams{ - Endpoint: databaseEndpoint, - DatabaseName: databaseName, + Endpoint: *databaseEndpoint, + DatabaseName: *databaseName, }, SourcePaths: sourcePathsSlice, SourcePathsToExclude: sourcePathsToExcludeSlice, @@ -196,18 +225,19 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { UpdatedAt: updatedTs, }, nil } else if operationType == string(types.OperationTypeRB) { - if backupId == nil { - return nil, fmt.Errorf("failed to read backup_id for RB operation: %s", operationId) + if err := checkRequiredFieldsForUserOperation(); err != nil { + return nil, err } + return &types.RestoreBackupOperation{ ID: operationId, BackupId: *backupId, - ContainerID: containerId, + ContainerID: *containerId, State: operationState, Message: StringOrEmpty(message), YdbConnectionParams: types.YdbConnectionParams{ - Endpoint: databaseEndpoint, - DatabaseName: databaseName, + Endpoint: *databaseEndpoint, + DatabaseName: *databaseName, }, YdbOperationId: StringOrEmpty(ydbOperationId), SourcePaths: sourcePathsSlice, @@ -215,8 +245,8 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { UpdatedAt: updatedTs, }, nil } else if operationType == string(types.OperationTypeDB) { - if backupId == nil { - return nil, fmt.Errorf("failed to read backup_id for DB operation: %s", operationId) + if err := checkRequiredFieldsForUserOperation(); err != nil { + return nil, err } var pathPrefix string @@ -227,10 +257,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { return &types.DeleteBackupOperation{ ID: operationId, BackupID: *backupId, - ContainerID: containerId, + ContainerID: *containerId, YdbConnectionParams: types.YdbConnectionParams{ - Endpoint: databaseEndpoint, - DatabaseName: databaseName, + Endpoint: *databaseEndpoint, + DatabaseName: *databaseName, }, State: operationState, Message: StringOrEmpty(message), @@ -238,6 +268,14 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { PathPrefix: pathPrefix, UpdatedAt: updatedTs, }, nil + } else if operationType == string(types.OperationTypeDOB) { + return &types.DeleteOutdatedBackupsOperation{ + ID: operationId, + State: operationState, + Message: StringOrEmpty(message), + Audit: auditFromDb(creator, createdAt, completedAt), + UpdatedAt: updatedTs, + }, nil } return &types.GenericOperation{ID: operationId}, nil diff --git a/internal/connectors/db/yql/queries/read.go b/internal/connectors/db/yql/queries/read.go index cef2695c..4f557ff3 100644 --- a/internal/connectors/db/yql/queries/read.go +++ b/internal/connectors/db/yql/queries/read.go @@ -20,7 +20,7 @@ var ( "initiated", "created_at", "completed_at", "s3_endpoint", "s3_region", "s3_bucket", "s3_path_prefix", "status", "paths", "message", - "size", + "size", "expire_at", } AllOperationFields = []string{ "id", "type", "container_id", "database", "endpoint", "backup_id", diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 6b43cbf6..5b7e3106 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -180,7 +180,11 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle ) d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix)) - + } else if operation.GetType() == types.OperationTypeDOB { + _, ok := operation.(*types.DeleteOutdatedBackupsOperation) + if !ok { + log.Fatalf("error cast operation to DeleteOutdatedBackupsOperation operation_id %s", operation.GetID()) + } } else { log.Fatalf( "unknown operation type write to db operation_id %s, operation_type %s", @@ -205,7 +209,7 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle "$message", table_types.StringValueFromString(operation.GetMessage()), ) - if operation.GetAudit() != nil { + if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil { d.AddValueParam( "$completed_at", table_types.TimestampValueFromTime(operation.GetAudit().GetCompletedAt().AsTime()), @@ -263,6 +267,10 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl d.AddValueParam("$completed_at", table_types.TimestampValueFromTime(b.AuditInfo.CompletedAt.AsTime())) } } + + if b.ExpireAt != nil { + d.AddValueParam("$expire_at", table_types.TimestampValueFromTime(b.ExpireAt.AsTime())) + } return d } diff --git a/internal/connectors/db/yql/schema/create_tables.yql b/internal/connectors/db/yql/schema/create_tables.yql index 71092efa..c8d59141 100644 --- a/internal/connectors/db/yql/schema/create_tables.yql +++ b/internal/connectors/db/yql/schema/create_tables.yql @@ -16,6 +16,7 @@ CREATE TABLE Backups ( status String, message String, size Int64, + expire_at Timestamp, paths String, @@ -36,9 +37,9 @@ CREATE TABLE OperationTypes ( CREATE TABLE Operations ( id String NOT NULL, type String NOT NULL, - container_id String NOT NULL, - database String NOT NULL, - endpoint String NOT NULL, + container_id String, + database String, + endpoint String, backup_id String, initiated String, diff --git a/internal/handlers/delete_outdated_backups.go b/internal/handlers/delete_outdated_backups.go new file mode 100644 index 00000000..adc3c078 --- /dev/null +++ b/internal/handlers/delete_outdated_backups.go @@ -0,0 +1,101 @@ +package handlers + +import ( + "context" + "fmt" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "go.uber.org/zap" + "google.golang.org/protobuf/types/known/timestamppb" + "ydbcp/internal/connectors/db" + "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/types" + "ydbcp/internal/util/xlog" + pb "ydbcp/pkg/proto/ydbcp/v1alpha1" +) + +func NewDOBOperationHandler( + db db.DBConnector, + queryBuilderFactory queries.WriteQueryBulderFactory, +) types.OperationHandler { + return func(ctx context.Context, op types.Operation) error { + return DOBOperationHandler(ctx, op, db, queryBuilderFactory) + } +} + +func DOBOperationHandler( + ctx context.Context, + operation types.Operation, + db db.DBConnector, + queryBuilderFactory queries.WriteQueryBulderFactory, +) error { + xlog.Info(ctx, "DOBOperationHandler", zap.String("OperationMessage", operation.GetMessage())) + + if operation.GetType() != types.OperationTypeDOB { + return fmt.Errorf( + "wrong type %s != %s for operation %s", + operation.GetType(), types.OperationTypeDOB, types.OperationToString(operation), + ) + } + + _, ok := operation.(*types.DeleteOutdatedBackupsOperation) + if !ok { + return fmt.Errorf("can't cast operation to DeleteOutadatedBackupsOperation %s", types.OperationToString(operation)) + } + + backups, err := db.SelectBackups( + ctx, queries.NewReadTableQuery( + queries.WithTableName("Backups"), + queries.WithSelectFields(queries.AllBackupFields...), + queries.WithQueryFilters( + queries.QueryFilter{ + Field: "status", + Values: []table_types.Value{ + table_types.StringValueFromString(types.BackupStateAvailable), + table_types.StringValueFromString(types.BackupStateError), + table_types.StringValueFromString(types.BackupStateCancelled), + }, + }, + ), + ), + ) + + if err != nil { + return fmt.Errorf("can't select backups: %v", err) + } + + for _, backup := range backups { + if backup.ExpireAt != nil && backup.ExpireAt.AsTime().Before(timestamppb.Now().AsTime()) { + now := timestamppb.Now() + dbOp := &types.DeleteBackupOperation{ + ContainerID: backup.ContainerID, + BackupID: backup.ID, + State: types.OperationStatePending, + YdbConnectionParams: types.YdbConnectionParams{ + DatabaseName: backup.DatabaseName, + Endpoint: backup.DatabaseEndpoint, + }, + Audit: &pb.AuditInfo{ + CreatedAt: now, + Creator: "ydbcp", + }, + PathPrefix: backup.S3PathPrefix, + UpdatedAt: now, + } + + backupToWrite := types.Backup{ + ID: backup.ID, + Status: types.BackupStateDeleting, + } + + err := db.ExecuteUpsert( + ctx, queryBuilderFactory().WithCreateOperation(dbOp).WithUpdateBackup(backupToWrite), + ) + + if err != nil { + return fmt.Errorf("can't create DeleteBackup operation: %v", err) + } + } + } + + return db.UpdateOperation(ctx, operation) +} diff --git a/internal/handlers/delete_outdated_backups_test.go b/internal/handlers/delete_outdated_backups_test.go new file mode 100644 index 00000000..abf248be --- /dev/null +++ b/internal/handlers/delete_outdated_backups_test.go @@ -0,0 +1,3 @@ +package handlers + +// TODO: add tests diff --git a/internal/server/services/backup/backup_service.go b/internal/server/services/backup/backup_service.go index 264d257b..b56b6216 100644 --- a/internal/server/services/backup/backup_service.go +++ b/internal/server/services/backup/backup_service.go @@ -217,6 +217,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques S3Bucket: s.s3.Bucket, S3PathPrefix: destinationPrefix, Status: types.BackupStateRunning, + ExpireAt: nil, // TODO: set ExpireAt based on user backup ttl AuditInfo: &pb.AuditInfo{ CreatedAt: now, Creator: subject, diff --git a/internal/types/backup.go b/internal/types/backup.go index 4a2232af..8a90cf37 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -2,6 +2,7 @@ package types import ( "fmt" + "google.golang.org/protobuf/types/known/timestamppb" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" @@ -36,6 +37,7 @@ type Backup struct { Message string AuditInfo *pb.AuditInfo Size int64 + ExpireAt *timestamppb.Timestamp } func (o *Backup) String() string { @@ -65,7 +67,7 @@ func (o *Backup) Proto() *pb.Backup { Size: o.Size, Status: pb.Backup_Status(pb.Backup_Status_value[o.Status]), Message: o.Message, - ExpireAt: nil, + ExpireAt: o.ExpireAt, } } diff --git a/internal/types/operation.go b/internal/types/operation.go index f35bbefe..6565e33e 100644 --- a/internal/types/operation.go +++ b/internal/types/operation.go @@ -254,6 +254,73 @@ func (o *DeleteBackupOperation) Proto() *pb.Operation { } } +type DeleteOutdatedBackupsOperation struct { + ID string + State OperationState + Message string + Audit *pb.AuditInfo + UpdatedAt *timestamppb.Timestamp +} + +func (o *DeleteOutdatedBackupsOperation) GetID() string { + return o.ID +} +func (o *DeleteOutdatedBackupsOperation) SetID(id string) { + o.ID = id +} +func (o *DeleteOutdatedBackupsOperation) GetContainerID() string { + return "" +} +func (o *DeleteOutdatedBackupsOperation) GetType() OperationType { + return OperationTypeDOB +} +func (o *DeleteOutdatedBackupsOperation) SetType(_ OperationType) { +} +func (o *DeleteOutdatedBackupsOperation) GetState() OperationState { + return o.State +} +func (o *DeleteOutdatedBackupsOperation) SetState(s OperationState) { + o.State = s +} +func (o *DeleteOutdatedBackupsOperation) GetMessage() string { + return o.Message +} +func (o *DeleteOutdatedBackupsOperation) SetMessage(m string) { + o.Message = m +} +func (o *DeleteOutdatedBackupsOperation) GetAudit() *pb.AuditInfo { + return o.Audit +} +func (o *DeleteOutdatedBackupsOperation) GetUpdatedAt() *timestamppb.Timestamp { + return o.UpdatedAt +} +func (o *DeleteOutdatedBackupsOperation) SetUpdatedAt(t *timestamppb.Timestamp) { + o.UpdatedAt = t +} +func (o *DeleteOutdatedBackupsOperation) Copy() Operation { + copy := *o + return © +} + +func (o *DeleteOutdatedBackupsOperation) Proto() *pb.Operation { + return &pb.Operation{ + Id: o.ID, + ContainerId: "", + Type: string(OperationTypeDOB), + DatabaseName: "", + DatabaseEndpoint: "", + YdbServerOperationId: "", + BackupId: "", + SourcePaths: nil, + SourcePathsToExclude: nil, + RestorePaths: nil, + Audit: o.Audit, + Status: o.State.Enum(), + Message: o.Message, + UpdatedAt: o.UpdatedAt, + } +} + type GenericOperation struct { ID string ContainerID string @@ -333,6 +400,7 @@ const ( OperationTypeTB = OperationType("TB") OperationTypeRB = OperationType("RB") OperationTypeDB = OperationType("DB") + OperationTypeDOB = OperationType("DOB") BackupTimestampFormat = "20060102_150405" S3ForcePathStyle = true )