Skip to content

Commit

Permalink
feat(ydbcp): support ttl for backups
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Sep 17, 2024
1 parent 02af8fd commit f972b64
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 28 deletions.
26 changes: 26 additions & 0 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"flag"
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"
"os"
"os/signal"
"sync"
"syscall"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"ydbcp/internal/auth"
"ydbcp/internal/config"
Expand Down Expand Up @@ -142,8 +144,32 @@ func main() {
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeDOB,
handlers.NewDOBOperationHandler(dbConnector, queries.NewWriteTableQuery),
); err != nil {
xlog.Error(ctx, "failed to register DOB handler", zap.Error(err))
os.Exit(1)
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry)

now := timestamppb.Now()
op := &types.DeleteOutdatedBackupsOperation{
State: types.OperationStateRunning,
UpdatedAt: now,
Audit: &pb.AuditInfo{
CreatedAt: now,
Creator: "ydbcp",
},
}

_, err = dbConnector.CreateOperation(ctx, op)
if err != nil {
xlog.Error(ctx, "can't create DeleteOutdatedBackups operation", zap.Error(err))
os.Exit(1)
}

wg.Add(1)
go func() {
defer wg.Done()
Expand Down
80 changes: 59 additions & 21 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
creator *string
completedAt *time.Time
createdAt *time.Time
expireAt *time.Time
expireAtTs *timestamppb.Timestamp
)

err := res.ScanNamed(
Expand All @@ -91,6 +93,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
named.Optional("status", &status),
named.Optional("message", &message),
named.Optional("size", &size),
named.Optional("expire_at", &expireAt),

named.Optional("created_at", &createdAt),
named.Optional("completed_at", &completedAt),
Expand All @@ -100,6 +103,10 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
return nil, err
}

if expireAt != nil {
expireAtTs = timestamppb.New(*expireAt)
}

return &types.Backup{
ID: backupId,
ContainerID: containerId,
Expand All @@ -113,16 +120,17 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
Message: StringOrEmpty(message),
AuditInfo: auditFromDb(creator, createdAt, completedAt),
Size: Int64OrZero(size),
ExpireAt: expireAtTs,
}, nil
}

func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
var (
operationId string
containerId string
containerId *string
operationType string
databaseName string
databaseEndpoint string
databaseName *string
databaseEndpoint *string

backupId *string
ydbOperationId *string
Expand All @@ -138,10 +146,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
)
err := res.ScanNamed(
named.Required("id", &operationId),
named.Required("container_id", &containerId),
named.Optional("container_id", &containerId),
named.Required("type", &operationType),
named.Required("database", &databaseName),
named.Required("endpoint", &databaseEndpoint),
named.Optional("database", &databaseName),
named.Optional("endpoint", &databaseEndpoint),

named.Optional("backup_id", &backupId),
named.Optional("operation_id", &ydbOperationId),
Expand Down Expand Up @@ -175,19 +183,40 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
updatedTs = timestamppb.New(*updatedAt)
}

if operationType == string(types.OperationTypeTB) {
checkRequiredFieldsForUserOperation := func() error {
if backupId == nil {
return nil, fmt.Errorf("failed to read backup_id for TB operation: %s", operationId)
return fmt.Errorf("failed to read backup_id for TB operation: %s", operationId)
}

if databaseName == nil {
return fmt.Errorf("failed to read database for TB operation: %s", operationId)
}

if databaseEndpoint == nil {
return fmt.Errorf("failed to read endpoint for TB operation: %s", operationId)
}

if containerId == nil {
return fmt.Errorf("failed to read container_id for TB operation: %s", operationId)
}

return nil
}

if operationType == string(types.OperationTypeTB) {
if err := checkRequiredFieldsForUserOperation(); err != nil {
return nil, err
}

return &types.TakeBackupOperation{
ID: operationId,
BackupID: *backupId,
ContainerID: containerId,
ContainerID: *containerId,
State: operationState,
Message: StringOrEmpty(message),
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: databaseEndpoint,
DatabaseName: databaseName,
Endpoint: *databaseEndpoint,
DatabaseName: *databaseName,
},
SourcePaths: sourcePathsSlice,
SourcePathsToExclude: sourcePathsToExcludeSlice,
Expand All @@ -196,27 +225,28 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
UpdatedAt: updatedTs,
}, nil
} else if operationType == string(types.OperationTypeRB) {
if backupId == nil {
return nil, fmt.Errorf("failed to read backup_id for RB operation: %s", operationId)
if err := checkRequiredFieldsForUserOperation(); err != nil {
return nil, err
}

return &types.RestoreBackupOperation{
ID: operationId,
BackupId: *backupId,
ContainerID: containerId,
ContainerID: *containerId,
State: operationState,
Message: StringOrEmpty(message),
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: databaseEndpoint,
DatabaseName: databaseName,
Endpoint: *databaseEndpoint,
DatabaseName: *databaseName,
},
YdbOperationId: StringOrEmpty(ydbOperationId),
SourcePaths: sourcePathsSlice,
Audit: auditFromDb(creator, createdAt, completedAt),
UpdatedAt: updatedTs,
}, nil
} else if operationType == string(types.OperationTypeDB) {
if backupId == nil {
return nil, fmt.Errorf("failed to read backup_id for DB operation: %s", operationId)
if err := checkRequiredFieldsForUserOperation(); err != nil {
return nil, err
}

var pathPrefix string
Expand All @@ -227,17 +257,25 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
return &types.DeleteBackupOperation{
ID: operationId,
BackupID: *backupId,
ContainerID: containerId,
ContainerID: *containerId,
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: databaseEndpoint,
DatabaseName: databaseName,
Endpoint: *databaseEndpoint,
DatabaseName: *databaseName,
},
State: operationState,
Message: StringOrEmpty(message),
Audit: auditFromDb(creator, createdAt, completedAt),
PathPrefix: pathPrefix,
UpdatedAt: updatedTs,
}, nil
} else if operationType == string(types.OperationTypeDOB) {
return &types.DeleteOutdatedBackupsOperation{
ID: operationId,
State: operationState,
Message: StringOrEmpty(message),
Audit: auditFromDb(creator, createdAt, completedAt),
UpdatedAt: updatedTs,
}, nil
}

return &types.GenericOperation{ID: operationId}, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
"initiated", "created_at", "completed_at",
"s3_endpoint", "s3_region", "s3_bucket",
"s3_path_prefix", "status", "paths", "message",
"size",
"size", "expire_at",
}
AllOperationFields = []string{
"id", "type", "container_id", "database", "endpoint", "backup_id",
Expand Down
12 changes: 10 additions & 2 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
)

d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix))

} else if operation.GetType() == types.OperationTypeDOB {
_, ok := operation.(*types.DeleteOutdatedBackupsOperation)
if !ok {
log.Fatalf("error cast operation to DeleteOutdatedBackupsOperation operation_id %s", operation.GetID())
}
} else {
log.Fatalf(
"unknown operation type write to db operation_id %s, operation_type %s",
Expand All @@ -205,7 +209,7 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle
"$message",
table_types.StringValueFromString(operation.GetMessage()),
)
if operation.GetAudit() != nil {
if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil {
d.AddValueParam(
"$completed_at",
table_types.TimestampValueFromTime(operation.GetAudit().GetCompletedAt().AsTime()),
Expand Down Expand Up @@ -263,6 +267,10 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl
d.AddValueParam("$completed_at", table_types.TimestampValueFromTime(b.AuditInfo.CompletedAt.AsTime()))
}
}

if b.ExpireAt != nil {
d.AddValueParam("$expire_at", table_types.TimestampValueFromTime(b.ExpireAt.AsTime()))
}
return d
}

Expand Down
7 changes: 4 additions & 3 deletions internal/connectors/db/yql/schema/create_tables.yql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CREATE TABLE Backups (
status String,
message String,
size Int64,
expire_at Timestamp,

paths String,

Expand All @@ -36,9 +37,9 @@ CREATE TABLE OperationTypes (
CREATE TABLE Operations (
id String NOT NULL,
type String NOT NULL,
container_id String NOT NULL,
database String NOT NULL,
endpoint String NOT NULL,
container_id String,
database String,
endpoint String,
backup_id String,

initiated String,
Expand Down
101 changes: 101 additions & 0 deletions internal/handlers/delete_outdated_backups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package handlers

import (
"context"
"fmt"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)

func NewDOBOperationHandler(
db db.DBConnector,
queryBuilderFactory queries.WriteQueryBulderFactory,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return DOBOperationHandler(ctx, op, db, queryBuilderFactory)
}
}

func DOBOperationHandler(
ctx context.Context,
operation types.Operation,
db db.DBConnector,
queryBuilderFactory queries.WriteQueryBulderFactory,
) error {
xlog.Info(ctx, "DOBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

if operation.GetType() != types.OperationTypeDOB {
return fmt.Errorf(
"wrong type %s != %s for operation %s",
operation.GetType(), types.OperationTypeDOB, types.OperationToString(operation),
)
}

_, ok := operation.(*types.DeleteOutdatedBackupsOperation)
if !ok {
return fmt.Errorf("can't cast operation to DeleteOutadatedBackupsOperation %s", types.OperationToString(operation))
}

backups, err := db.SelectBackups(
ctx, queries.NewReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "status",
Values: []table_types.Value{
table_types.StringValueFromString(types.BackupStateAvailable),
table_types.StringValueFromString(types.BackupStateError),
table_types.StringValueFromString(types.BackupStateCancelled),
},
},
),
),
)

if err != nil {
return fmt.Errorf("can't select backups: %v", err)
}

for _, backup := range backups {
if backup.ExpireAt != nil && backup.ExpireAt.AsTime().Before(timestamppb.Now().AsTime()) {
now := timestamppb.Now()
dbOp := &types.DeleteBackupOperation{
ContainerID: backup.ContainerID,
BackupID: backup.ID,
State: types.OperationStatePending,
YdbConnectionParams: types.YdbConnectionParams{
DatabaseName: backup.DatabaseName,
Endpoint: backup.DatabaseEndpoint,
},
Audit: &pb.AuditInfo{
CreatedAt: now,
Creator: "ydbcp",
},
PathPrefix: backup.S3PathPrefix,
UpdatedAt: now,
}

backupToWrite := types.Backup{
ID: backup.ID,
Status: types.BackupStateDeleting,
}

err := db.ExecuteUpsert(
ctx, queryBuilderFactory().WithCreateOperation(dbOp).WithUpdateBackup(backupToWrite),
)

if err != nil {
return fmt.Errorf("can't create DeleteBackup operation: %v", err)
}
}
}

return db.UpdateOperation(ctx, operation)
}
3 changes: 3 additions & 0 deletions internal/handlers/delete_outdated_backups_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package handlers

// TODO: add tests
1 change: 1 addition & 0 deletions internal/server/services/backup/backup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques
S3Bucket: s.s3.Bucket,
S3PathPrefix: destinationPrefix,
Status: types.BackupStateRunning,
ExpireAt: nil, // TODO: set ExpireAt based on user backup ttl
AuditInfo: &pb.AuditInfo{
CreatedAt: now,
Creator: subject,
Expand Down
Loading

0 comments on commit f972b64

Please sign in to comment.