Skip to content

Commit

Permalink
feat(ydbcp): support ttl for backups
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina committed Sep 24, 2024
1 parent 1e9293a commit aa60231
Show file tree
Hide file tree
Showing 15 changed files with 167 additions and 12 deletions.
1 change: 1 addition & 0 deletions cmd/ydbcp/config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
operation_ttl_seconds: 86400 # 24 hours
backup_ttl_seconds: 86400 * 7 # 1 week

db_connection:
connection_string: "grpcs://localhost:2135/domain/database"
Expand Down
2 changes: 2 additions & 0 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"ydbcp/internal/server/services/backup"
"ydbcp/internal/server/services/backup_schedule"
"ydbcp/internal/server/services/operation"
"ydbcp/internal/ttl_watcher"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
ap "ydbcp/pkg/plugins/auth"
Expand Down Expand Up @@ -144,6 +145,7 @@ func main() {
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry)
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)

wg.Add(1)
go func() {
Expand Down
8 changes: 7 additions & 1 deletion internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func MakeBackup(
allowedEndpointDomains []string,
allowInsecureEndpoint bool,
req *pb.MakeBackupRequest, scheduleId *string,
subject string,
subject string, ttlSeconds *int64,
) (*types.Backup, *types.TakeBackupOperation, error) {
if !IsAllowedEndpoint(req.DatabaseEndpoint, allowedEndpointDomains, allowInsecureEndpoint) {
xlog.Error(
Expand Down Expand Up @@ -144,6 +144,11 @@ func MakeBackup(
ctx = xlog.With(ctx, zap.String("ClientOperationID", clientOperationID))
xlog.Info(ctx, "Export operation started")

var expiredAt *timestamppb.Timestamp
if ttlSeconds != nil {
expiredAt = timestamppb.New(timestamppb.Now().AsTime().Add(time.Duration(*ttlSeconds) * time.Second))
}

now := timestamppb.Now()
backup := &types.Backup{
ID: types.GenerateObjectID(),
Expand All @@ -160,6 +165,7 @@ func MakeBackup(
Creator: subject,
},
ScheduleID: scheduleId,
ExpireAt: expiredAt,
}

op := &types.TakeBackupOperation{
Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Config struct {
OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"`
Auth AuthConfig `yaml:"auth"`
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
BackupTtlSeconds int64 `yaml:"backup_ttl_seconds"`
}

var (
Expand Down
8 changes: 8 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
creator *string
completedAt *time.Time
createdAt *time.Time
expireAt *time.Time
expireAtTs *timestamppb.Timestamp
)

err := res.ScanNamed(
Expand All @@ -93,6 +95,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
named.Optional("message", &message),
named.Optional("size", &size),
named.Optional("schedule_id", &scheduleId),
named.Optional("expire_at", &expireAt),

named.Optional("created_at", &createdAt),
named.Optional("completed_at", &completedAt),
Expand All @@ -102,6 +105,10 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
return nil, err
}

if expireAt != nil {
expireAtTs = timestamppb.New(*expireAt)
}

return &types.Backup{
ID: backupId,
ContainerID: containerId,
Expand All @@ -116,6 +123,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
AuditInfo: auditFromDb(creator, createdAt, completedAt),
Size: Int64OrZero(size),
ScheduleID: scheduleId,
ExpireAt: expireAtTs,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
"initiated", "created_at", "completed_at",
"s3_endpoint", "s3_region", "s3_bucket",
"s3_path_prefix", "status", "paths", "message",
"size", "schedule_id",
"size", "schedule_id", "expire_at",
}
AllOperationFields = []string{
"id", "type", "container_id", "database", "endpoint", "backup_id",
Expand Down
6 changes: 5 additions & 1 deletion internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle
"$message",
table_types.StringValueFromString(operation.GetMessage()),
)
if operation.GetAudit() != nil {
if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil {
d.AddValueParam(
"$completed_at",
table_types.TimestampValueFromTime(operation.GetAudit().GetCompletedAt().AsTime()),
Expand Down Expand Up @@ -266,6 +266,10 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl
d.AddValueParam("$completed_at", table_types.TimestampValueFromTime(b.AuditInfo.CompletedAt.AsTime()))
}
}

if b.ExpireAt != nil {
d.AddValueParam("$expire_at", table_types.TimestampValueFromTime(b.ExpireAt.AsTime()))
}
return d
}

Expand Down
1 change: 1 addition & 0 deletions internal/connectors/db/yql/schema/create_tables.yql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CREATE TABLE Backups (
status String,
message String,
size Int64,
expire_at Timestamp,

paths String,

Expand Down
9 changes: 5 additions & 4 deletions internal/handlers/schedule_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
func NewBackupScheduleHandler(
driver db.DBConnector,
clientConn client.ClientConnector,
s3 config.S3Config,
config config.Config,
allowedEndpointDomains []string,
allowInsecureEndpoint bool,
queryBuilderFactory queries.WriteQueryBulderFactory,
) types.BackupScheduleHandler {
return func(ctx context.Context, schedule types.BackupSchedule) error {
return BackupScheduleHandler(
ctx, schedule, driver, clientConn, s3, allowedEndpointDomains, allowInsecureEndpoint, queryBuilderFactory,
ctx, schedule, driver, clientConn, config, allowedEndpointDomains, allowInsecureEndpoint, queryBuilderFactory,
)
}
}
Expand All @@ -32,7 +32,7 @@ func BackupScheduleHandler(
schedule types.BackupSchedule,
driver db.DBConnector,
clientConn client.ClientConnector,
s3 config.S3Config,
config config.Config,
allowedEndpointDomains []string,
allowInsecureEndpoint bool,
queryBuilderFactory queries.WriteQueryBulderFactory,
Expand All @@ -43,13 +43,14 @@ func BackupScheduleHandler(
now := time.Now()
if schedule.NextLaunch != nil && schedule.NextLaunch.Before(now) {
b, op, err := backup_operations.MakeBackup(
ctx, clientConn, s3, allowedEndpointDomains, allowInsecureEndpoint, &pb.MakeBackupRequest{
ctx, clientConn, config.S3, allowedEndpointDomains, allowInsecureEndpoint, &pb.MakeBackupRequest{
ContainerId: schedule.ContainerID,
DatabaseName: schedule.DatabaseName,
DatabaseEndpoint: schedule.DatabaseEndpoint,
SourcePaths: schedule.SourcePaths,
SourcePathsToExclude: schedule.SourcePathsToExclude,
}, &schedule.ID, "YDBCP", //TODO: who to put as subject here?
&config.BackupTtlSeconds,
)
if err != nil {
return err
Expand Down
8 changes: 5 additions & 3 deletions internal/handlers/schedule_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ func TestBackupScheduleHandler(t *testing.T) {
handler := NewBackupScheduleHandler(
dbConnector,
clientConnector,
config.S3Config{
S3ForcePathStyle: false,
IsMock: true,
config.Config{
S3: config.S3Config{
S3ForcePathStyle: false,
IsMock: true,
},
},
[]string{".valid.com"},
true,
Expand Down
2 changes: 1 addition & 1 deletion internal/server/services/backup/backup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques
ctx = xlog.With(ctx, zap.String("SubjectID", subject))

backup, op, err := backup_operations.MakeBackup(
ctx, s.clientConn, s.s3, s.allowedEndpointDomains, s.allowInsecureEndpoint, req, nil, subject,
ctx, s.clientConn, s.s3, s.allowedEndpointDomains, s.allowInsecureEndpoint, req, nil, subject, nil,
)

if err != nil {
Expand Down
123 changes: 123 additions & 0 deletions internal/ttl_watcher/ttl_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package ttl_watcher

import (
"context"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"sync"
"time"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
"ydbcp/internal/util/ticker"
"ydbcp/internal/util/xlog"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)

type TtlWatcherImpl struct {
ctx context.Context
period time.Duration
handleOperationTimeout time.Duration
tickerProvider ticker.TickerProvider
db db.DBConnector
queryBuilderFactory queries.WriteQueryBulderFactory
}

func NewTtlWatcher(
ctx context.Context,
wg *sync.WaitGroup,
db db.DBConnector,
queryBuilderFactory queries.WriteQueryBulderFactory,
) *TtlWatcherImpl {
w := &TtlWatcherImpl{
ctx: ctx,
period: time.Hour,
db: db,
tickerProvider: ticker.NewRealTicker,
queryBuilderFactory: queryBuilderFactory,
}

wg.Add(1)
go w.run(wg)
return w
}

func (o *TtlWatcherImpl) run(wg *sync.WaitGroup) {
defer wg.Done()
xlog.Debug(o.ctx, "Ttl watcher started", zap.Duration("period", o.period))
ticker := o.tickerProvider(o.period)
for {
select {
case <-o.ctx.Done():
ticker.Stop()
xlog.Debug(o.ctx, "Ttl watcher stopped")
return
case <-ticker.Chan():
o.deleteOutdatedBackups()
}
}
}

func (o *TtlWatcherImpl) deleteOutdatedBackups() {
ctx, cancel := context.WithTimeout(o.ctx, o.period)
defer cancel()

backups, err := o.db.SelectBackups(
ctx, queries.NewReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "status",
Values: []table_types.Value{
table_types.StringValueFromString(types.BackupStateAvailable),
table_types.StringValueFromString(types.BackupStateError),
table_types.StringValueFromString(types.BackupStateCancelled),
},
},
),
),
)

if err != nil {
xlog.Error(ctx, "can't select backups", zap.Error(err))
return
}

for _, backup := range backups {
if backup.ExpireAt != nil && backup.ExpireAt.AsTime().Before(timestamppb.Now().AsTime()) {
now := timestamppb.Now()
dbOp := &types.DeleteBackupOperation{
ContainerID: backup.ContainerID,
BackupID: backup.ID,
State: types.OperationStatePending,
YdbConnectionParams: types.YdbConnectionParams{
DatabaseName: backup.DatabaseName,
Endpoint: backup.DatabaseEndpoint,
},
Audit: &pb.AuditInfo{
CreatedAt: now,
Creator: "ydbcp",
},
PathPrefix: backup.S3PathPrefix,
UpdatedAt: now,
}

backupToWrite := types.Backup{
ID: backup.ID,
Status: types.BackupStateDeleting,
}

err := o.db.ExecuteUpsert(
ctx, o.queryBuilderFactory().WithCreateOperation(dbOp).WithUpdateBackup(backupToWrite),
)

if err != nil {
xlog.Error(ctx, "can't create DeleteBackup operation", zap.String("BackupID", backup.ID), zap.Error(err))
}

xlog.Debug(ctx, "DeleteBackup operation was created successfully", zap.String("BackupID", backup.ID))
}
}
}
3 changes: 3 additions & 0 deletions internal/ttl_watcher/ttl_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package ttl_watcher

// TODO
4 changes: 3 additions & 1 deletion internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"

pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

Expand Down Expand Up @@ -37,6 +38,7 @@ type Backup struct {
AuditInfo *pb.AuditInfo
Size int64
ScheduleID *string
ExpireAt *timestamppb.Timestamp
}

func (o *Backup) String() string {
Expand Down Expand Up @@ -66,7 +68,7 @@ func (o *Backup) Proto() *pb.Backup {
Size: o.Size,
Status: pb.Backup_Status(pb.Backup_Status_value[o.Status]),
Message: o.Message,
ExpireAt: nil,
ExpireAt: o.ExpireAt,
}
}

Expand Down
1 change: 1 addition & 0 deletions local_config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
operation_ttl_seconds: 86400 # 24 hours
backup_ttl_seconds: 86400 * 7 # 1 week

db_connection:
connection_string: "grpcs://${YDB_NAME}:2135/local"
Expand Down

0 comments on commit aa60231

Please sign in to comment.