Skip to content

Commit

Permalink
feat(ydbcp): support ttl for backups
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Sep 17, 2024
1 parent 02af8fd commit 8d8058d
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 2 deletions.
26 changes: 26 additions & 0 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"flag"
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"
"os"
"os/signal"
"sync"
"syscall"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"ydbcp/internal/auth"
"ydbcp/internal/config"
Expand Down Expand Up @@ -142,8 +144,32 @@ func main() {
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeDOB,
handlers.NewDOBOperationHandler(dbConnector, queries.NewWriteTableQuery),
); err != nil {
xlog.Error(ctx, "failed to register DOB handler", zap.Error(err))
os.Exit(1)
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry)

now := timestamppb.Now()
op := &types.DeleteOutdatedBackupsOperation{
State: types.OperationStateRunning,
UpdatedAt: now,
Audit: &pb.AuditInfo{
CreatedAt: now,
Creator: "ydbcp",
},
}

_, err = dbConnector.CreateOperation(ctx, op)
if err != nil {
xlog.Error(ctx, "can't create DeleteOutdatedBackups operation", zap.Error(err))
os.Exit(1)
}

wg.Add(1)
go func() {
defer wg.Done()
Expand Down
16 changes: 16 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
creator *string
completedAt *time.Time
createdAt *time.Time
expireAt *time.Time
expireAtTs *timestamppb.Timestamp
)

err := res.ScanNamed(
Expand All @@ -91,6 +93,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
named.Optional("status", &status),
named.Optional("message", &message),
named.Optional("size", &size),
named.Optional("expire_at", &expireAt),

named.Optional("created_at", &createdAt),
named.Optional("completed_at", &completedAt),
Expand All @@ -100,6 +103,10 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
return nil, err
}

if expireAt != nil {
expireAtTs = timestamppb.New(*expireAt)
}

return &types.Backup{
ID: backupId,
ContainerID: containerId,
Expand All @@ -113,6 +120,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
Message: StringOrEmpty(message),
AuditInfo: auditFromDb(creator, createdAt, completedAt),
Size: Int64OrZero(size),
ExpireAt: expireAtTs,
}, nil
}

Expand Down Expand Up @@ -238,6 +246,14 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
PathPrefix: pathPrefix,
UpdatedAt: updatedTs,
}, nil
} else if operationType == string(types.OperationTypeDOB) {
return &types.DeleteOutdatedBackupsOperation{
ID: operationId,
State: operationState,
Message: StringOrEmpty(message),
Audit: auditFromDb(creator, createdAt, completedAt),
UpdatedAt: updatedTs,
}, nil
}

return &types.GenericOperation{ID: operationId}, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
"initiated", "created_at", "completed_at",
"s3_endpoint", "s3_region", "s3_bucket",
"s3_path_prefix", "status", "paths", "message",
"size",
"size", "expire_at",
}
AllOperationFields = []string{
"id", "type", "container_id", "database", "endpoint", "backup_id",
Expand Down
23 changes: 23 additions & 0 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,26 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
)

d.AddValueParam("$paths", table_types.StringValueFromString(db.PathPrefix))
} else if operation.GetType() == types.OperationTypeDOB {
_, ok := operation.(*types.DeleteOutdatedBackupsOperation)
if !ok {
log.Fatalf("error cast operation to DeleteOutdatedBackupsOperation operation_id %s", operation.GetID())
}

d.AddValueParam(
"$container_id",
table_types.StringValueFromString(""), // TODO: this column is required, but this operation doesn't have it
)

d.AddValueParam(
"$database",
table_types.StringValueFromString(""), // TODO: this column is required, but this operation doesn't have it
)

d.AddValueParam(
"$endpoint",
table_types.StringValueFromString(""), // TODO: this column is required, but this operation doesn't have it
)
} else {
log.Fatalf(
"unknown operation type write to db operation_id %s, operation_type %s",
Expand Down Expand Up @@ -263,6 +282,10 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl
d.AddValueParam("$completed_at", table_types.TimestampValueFromTime(b.AuditInfo.CompletedAt.AsTime()))
}
}

if b.ExpireAt != nil {
d.AddValueParam("$expire_at", table_types.TimestampValueFromTime(b.ExpireAt.AsTime()))
}
return d
}

Expand Down
1 change: 1 addition & 0 deletions internal/connectors/db/yql/schema/create_tables.yql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CREATE TABLE Backups (
status String,
message String,
size Int64,
expire_at Timestamp,

paths String,

Expand Down
101 changes: 101 additions & 0 deletions internal/handlers/delete_outdated_backups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package handlers

import (
"context"
"fmt"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)

func NewDOBOperationHandler(
db db.DBConnector,
queryBuilderFactory queries.WriteQueryBulderFactory,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return DOBOperationHandler(ctx, op, db, queryBuilderFactory)
}
}

func DOBOperationHandler(
ctx context.Context,
operation types.Operation,
db db.DBConnector,
queryBuilderFactory queries.WriteQueryBulderFactory,
) error {
xlog.Info(ctx, "DOBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

if operation.GetType() != types.OperationTypeDOB {
return fmt.Errorf(
"wrong type %s != %s for operation %s",
operation.GetType(), types.OperationTypeDOB, types.OperationToString(operation),
)
}

_, ok := operation.(*types.DeleteOutdatedBackupsOperation)
if !ok {
return fmt.Errorf("can't cast operation to DeleteOutadatedBackupsOperation %s", types.OperationToString(operation))
}

backups, err := db.SelectBackups(
ctx, queries.NewReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "status",
Values: []table_types.Value{
table_types.StringValueFromString(types.BackupStateAvailable),
table_types.StringValueFromString(types.BackupStateError),
table_types.StringValueFromString(types.BackupStateCancelled),
},
},
),
),
)

if err != nil {
return fmt.Errorf("can't select backups: %v", err)
}

for _, backup := range backups {
if backup.ExpireAt != nil && backup.ExpireAt.AsTime().Before(timestamppb.Now().AsTime()) {
now := timestamppb.Now()
dbOp := &types.DeleteBackupOperation{
ContainerID: backup.ContainerID,
BackupID: backup.ID,
State: types.OperationStatePending,
YdbConnectionParams: types.YdbConnectionParams{
DatabaseName: backup.DatabaseName,
Endpoint: backup.DatabaseEndpoint,
},
Audit: &pb.AuditInfo{
CreatedAt: now,
Creator: "ydbcp",
},
PathPrefix: backup.S3PathPrefix,
UpdatedAt: now,
}

backupToWrite := types.Backup{
ID: backup.ID,
Status: types.BackupStateDeleting,
}

err := db.ExecuteUpsert(
ctx, queryBuilderFactory().WithCreateOperation(dbOp).WithUpdateBackup(backupToWrite),
)

if err != nil {
return fmt.Errorf("can't create DeleteBackup operation: %v", err)
}
}
}

return db.UpdateOperation(ctx, operation)
}
3 changes: 3 additions & 0 deletions internal/handlers/delete_outdated_backups_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package handlers

// TODO: add tests
1 change: 1 addition & 0 deletions internal/server/services/backup/backup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques
S3Bucket: s.s3.Bucket,
S3PathPrefix: destinationPrefix,
Status: types.BackupStateRunning,
ExpireAt: nil, // TODO: set ExpireAt based on user backup ttl
AuditInfo: &pb.AuditInfo{
CreatedAt: now,
Creator: subject,
Expand Down
4 changes: 3 additions & 1 deletion internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"

pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

Expand Down Expand Up @@ -36,6 +37,7 @@ type Backup struct {
Message string
AuditInfo *pb.AuditInfo
Size int64
ExpireAt *timestamppb.Timestamp
}

func (o *Backup) String() string {
Expand Down Expand Up @@ -65,7 +67,7 @@ func (o *Backup) Proto() *pb.Backup {
Size: o.Size,
Status: pb.Backup_Status(pb.Backup_Status_value[o.Status]),
Message: o.Message,
ExpireAt: nil,
ExpireAt: o.ExpireAt,
}
}

Expand Down
68 changes: 68 additions & 0 deletions internal/types/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,73 @@ func (o *DeleteBackupOperation) Proto() *pb.Operation {
}
}

type DeleteOutdatedBackupsOperation struct {
ID string
State OperationState
Message string
Audit *pb.AuditInfo
UpdatedAt *timestamppb.Timestamp
}

func (o *DeleteOutdatedBackupsOperation) GetID() string {
return o.ID
}
func (o *DeleteOutdatedBackupsOperation) SetID(id string) {
o.ID = id
}
func (o *DeleteOutdatedBackupsOperation) GetContainerID() string {
return ""
}
func (o *DeleteOutdatedBackupsOperation) GetType() OperationType {
return OperationTypeDOB
}
func (o *DeleteOutdatedBackupsOperation) SetType(_ OperationType) {
}
func (o *DeleteOutdatedBackupsOperation) GetState() OperationState {
return o.State
}
func (o *DeleteOutdatedBackupsOperation) SetState(s OperationState) {
o.State = s
}
func (o *DeleteOutdatedBackupsOperation) GetMessage() string {
return o.Message
}
func (o *DeleteOutdatedBackupsOperation) SetMessage(m string) {
o.Message = m
}
func (o *DeleteOutdatedBackupsOperation) GetAudit() *pb.AuditInfo {
return o.Audit
}
func (o *DeleteOutdatedBackupsOperation) GetUpdatedAt() *timestamppb.Timestamp {
return o.UpdatedAt
}
func (o *DeleteOutdatedBackupsOperation) SetUpdatedAt(t *timestamppb.Timestamp) {
o.UpdatedAt = t
}
func (o *DeleteOutdatedBackupsOperation) Copy() Operation {
copy := *o
return &copy
}

func (o *DeleteOutdatedBackupsOperation) Proto() *pb.Operation {
return &pb.Operation{
Id: o.ID,
ContainerId: "",
Type: string(OperationTypeDOB),
DatabaseName: "",
DatabaseEndpoint: "",
YdbServerOperationId: "",
BackupId: "",
SourcePaths: nil,
SourcePathsToExclude: nil,
RestorePaths: nil,
Audit: o.Audit,
Status: o.State.Enum(),
Message: o.Message,
UpdatedAt: o.UpdatedAt,
}
}

type GenericOperation struct {
ID string
ContainerID string
Expand Down Expand Up @@ -333,6 +400,7 @@ const (
OperationTypeTB = OperationType("TB")
OperationTypeRB = OperationType("RB")
OperationTypeDB = OperationType("DB")
OperationTypeDOB = OperationType("DOB")
BackupTimestampFormat = "20060102_150405"
S3ForcePathStyle = true
)
Expand Down

0 comments on commit 8d8058d

Please sign in to comment.