Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ydbcp): support ttl for backups #65

Merged
merged 1 commit into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"ydbcp/internal/server/services/operation"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
"ydbcp/internal/watchers/ttl_watcher"
ap "ydbcp/pkg/plugins/auth"

"go.uber.org/automaxprocs/maxprocs"
Expand Down Expand Up @@ -144,6 +145,7 @@ func main() {
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry)
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)

wg.Add(1)
go func() {
Expand Down
9 changes: 8 additions & 1 deletion internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func MakeBackup(
allowedEndpointDomains []string,
allowInsecureEndpoint bool,
req *pb.MakeBackupRequest, scheduleId *string,
subject string,
subject string, ttl *time.Duration,
) (*types.Backup, *types.TakeBackupOperation, error) {
if !IsAllowedEndpoint(req.DatabaseEndpoint, allowedEndpointDomains, allowInsecureEndpoint) {
xlog.Error(
Expand Down Expand Up @@ -144,6 +144,12 @@ func MakeBackup(
ctx = xlog.With(ctx, zap.String("ClientOperationID", clientOperationID))
xlog.Info(ctx, "Export operation started")

var expireAt *time.Time
if ttl != nil {
expireAt = new(time.Time)
*expireAt = time.Now().Add(*ttl)
}

now := timestamppb.Now()
backup := &types.Backup{
ID: types.GenerateObjectID(),
Expand All @@ -160,6 +166,7 @@ func MakeBackup(
Creator: subject,
},
ScheduleID: scheduleId,
ExpireAt: expireAt,
}

op := &types.TakeBackupOperation{
Expand Down
3 changes: 3 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
creator *string
completedAt *time.Time
createdAt *time.Time
expireAt *time.Time
)

err := res.ScanNamed(
Expand All @@ -93,6 +94,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
named.Optional("message", &message),
named.Optional("size", &size),
named.Optional("schedule_id", &scheduleId),
named.Optional("expire_at", &expireAt),

named.Optional("created_at", &createdAt),
named.Optional("completed_at", &completedAt),
Expand All @@ -116,6 +118,7 @@ func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) {
AuditInfo: auditFromDb(creator, createdAt, completedAt),
Size: Int64OrZero(size),
ScheduleID: scheduleId,
ExpireAt: expireAt,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/connectors/db/yql/queries/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
"initiated", "created_at", "completed_at",
"s3_endpoint", "s3_region", "s3_bucket",
"s3_path_prefix", "status", "paths", "message",
"size", "schedule_id",
"size", "schedule_id", "expire_at",
}
AllOperationFields = []string{
"id", "type", "container_id", "database", "endpoint", "backup_id",
Expand Down
6 changes: 5 additions & 1 deletion internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle
"$message",
table_types.StringValueFromString(operation.GetMessage()),
)
if operation.GetAudit() != nil {
if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil {
d.AddValueParam(
"$completed_at",
table_types.TimestampValueFromTime(operation.GetAudit().GetCompletedAt().AsTime()),
Expand Down Expand Up @@ -266,6 +266,10 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl
d.AddValueParam("$completed_at", table_types.TimestampValueFromTime(b.AuditInfo.CompletedAt.AsTime()))
}
}

if b.ExpireAt != nil {
d.AddValueParam("$expire_at", table_types.TimestampValueFromTime(*b.ExpireAt))
}
return d
}

Expand Down
1 change: 1 addition & 0 deletions internal/connectors/db/yql/schema/create_tables.yql
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CREATE TABLE Backups (
status String,
message String,
size Int64,
expire_at Timestamp,

paths String,

Expand Down
9 changes: 8 additions & 1 deletion internal/handlers/schedule_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ func BackupScheduleHandler(
now := time.Now()
// do not handle last_backup_id status = (failed | deleted) for now, just do backups on cron.
if schedule.NextLaunch != nil && schedule.NextLaunch.Before(now) {
var ttl *time.Duration
if schedule.ScheduleSettings != nil && schedule.ScheduleSettings.Ttl != nil {
ttl = new(time.Duration)
*ttl = schedule.ScheduleSettings.Ttl.AsDuration()
}

b, op, err := backup_operations.MakeBackup(
ctx, clientConn, s3, allowedEndpointDomains, allowInsecureEndpoint, &pb.MakeBackupRequest{
ContainerId: schedule.ContainerID,
DatabaseName: schedule.DatabaseName,
DatabaseEndpoint: schedule.DatabaseEndpoint,
SourcePaths: schedule.SourcePaths,
SourcePathsToExclude: schedule.SourcePathsToExclude,
}, &schedule.ID, "YDBCP", //TODO: who to put as subject here?
}, &schedule.ID, types.OperationCreatorName, //TODO: who to put as subject here?
ttl,
)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/server/services/backup/backup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques
ctx = xlog.With(ctx, zap.String("SubjectID", subject))

backup, op, err := backup_operations.MakeBackup(
ctx, s.clientConn, s.s3, s.allowedEndpointDomains, s.allowInsecureEndpoint, req, nil, subject,
ctx, s.clientConn, s.s3, s.allowedEndpointDomains, s.allowInsecureEndpoint, req, nil, subject, nil,
)

if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions internal/types/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package types

import (
"fmt"
"google.golang.org/protobuf/types/known/timestamppb"
"time"

pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

Expand Down Expand Up @@ -37,6 +39,7 @@ type Backup struct {
AuditInfo *pb.AuditInfo
Size int64
ScheduleID *string
ExpireAt *time.Time
}

func (o *Backup) String() string {
Expand Down Expand Up @@ -71,6 +74,11 @@ func (o *Backup) Proto() *pb.Backup {
if o.ScheduleID != nil {
backup.ScheduleId = *o.ScheduleID
}

if o.ExpireAt != nil {
backup.ExpireAt = timestamppb.New(*o.ExpireAt)
}

return backup
}

Expand Down
2 changes: 1 addition & 1 deletion internal/types/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ const (
OperationTypeRB = OperationType("RB")
OperationTypeDB = OperationType("DB")
BackupTimestampFormat = "20060102_150405"
S3ForcePathStyle = true
OperationCreatorName = "ydbcp"
)

func OperationToString(o Operation) string {
Expand Down
103 changes: 103 additions & 0 deletions internal/watchers/ttl_watcher/ttl_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package ttl_watcher

import (
"context"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"sync"
"time"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
"ydbcp/internal/watchers"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)

func NewTtlWatcher(
ctx context.Context,
wg *sync.WaitGroup,
db db.DBConnector,
queryBuilderFactory queries.WriteQueryBulderFactory,
options ...watchers.Option,
) *watchers.WatcherImpl {
return watchers.NewWatcher(
ctx,
wg,
func(ctx context.Context, period time.Duration) {
TtlWatcherAction(ctx, period, db, queryBuilderFactory)
},
time.Hour,
"Ttl",
options...,
)
}

func TtlWatcherAction(
baseCtx context.Context,
period time.Duration,
db db.DBConnector,
queryBuilderFactory queries.WriteQueryBulderFactory,
) {
ctx, cancel := context.WithTimeout(baseCtx, period)
defer cancel()

backups, err := db.SelectBackups(
ctx, queries.NewReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "status",
Values: []table_types.Value{
table_types.StringValueFromString(types.BackupStateAvailable),
table_types.StringValueFromString(types.BackupStateError),
table_types.StringValueFromString(types.BackupStateCancelled),
},
},
),
),
)

if err != nil {
xlog.Error(ctx, "can't select backups", zap.Error(err))
return
}

for _, backup := range backups {
if backup.ExpireAt != nil && backup.ExpireAt.Before(time.Now()) {
now := timestamppb.Now()
dbOp := &types.DeleteBackupOperation{
ContainerID: backup.ContainerID,
BackupID: backup.ID,
State: types.OperationStatePending,
YdbConnectionParams: types.YdbConnectionParams{
DatabaseName: backup.DatabaseName,
Endpoint: backup.DatabaseEndpoint,
},
Audit: &pb.AuditInfo{
CreatedAt: now,
Creator: types.OperationCreatorName,
},
PathPrefix: backup.S3PathPrefix,
UpdatedAt: now,
}

backupToWrite := types.Backup{
ID: backup.ID,
Status: types.BackupStateDeleting,
}

err := db.ExecuteUpsert(
ctx, queryBuilderFactory().WithCreateOperation(dbOp).WithUpdateBackup(backupToWrite),
)

if err != nil {
xlog.Error(ctx, "can't create DeleteBackup operation", zap.String("BackupID", backup.ID), zap.Error(err))
}

xlog.Debug(ctx, "DeleteBackup operation was created successfully", zap.String("BackupID", backup.ID))
}
}
}
77 changes: 77 additions & 0 deletions internal/watchers/ttl_watcher/ttl_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package ttl_watcher

import (
"context"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"sync"
"testing"
"time"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
"ydbcp/internal/util/ticker"
"ydbcp/internal/watchers"
)

func TestTtlWatcher(t *testing.T) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Prepare fake clock and ticker
clock := clockwork.NewFakeClock()
var fakeTicker *ticker.FakeTicker
tickerInitialized := make(chan struct{})
tickerProvider := func(duration time.Duration) ticker.Ticker {
assert.Empty(t, fakeTicker, "ticker reuse")
fakeTicker = ticker.NewFakeTicker(duration)
tickerInitialized <- struct{}{}
return fakeTicker
}

// Prepare a backup
backupID := types.GenerateObjectID()
expireAt := time.Now()
backup := types.Backup{
ID: backupID,
Status: types.BackupStateAvailable,
ExpireAt: &expireAt,
}
backupMap := make(map[string]types.Backup)
backupMap[backupID] = backup

// Prepare mock db and ttl watcher
db := db.NewMockDBConnector(
db.WithBackups(backupMap),
)
_ = NewTtlWatcher(
ctx,
&wg,
db,
queries.NewWriteTableQueryMock,
watchers.WithTickerProvider(tickerProvider),
)

// Wait for the ticker to be initialized
select {
case <-ctx.Done():
t.Error("ticker not initialized")
case <-tickerInitialized:
assert.Equal(t, fakeTicker.Period, time.Hour, "incorrect period")
}

// Send a tick to the fake ticker
t0 := clock.Now().Add(time.Hour)
fakeTicker.Send(t0)

cancel()
wg.Wait()

// Check that DeleteBackup operation was created
ops, err := db.ActiveOperations(ctx)
assert.Empty(t, err)
assert.Equal(t, len(ops), 1)
assert.Equal(t, ops[0].GetType(), types.OperationTypeDB, "operation type should be DB")
assert.Equal(t, ops[0].GetState(), types.OperationStatePending, "operation state should be Pending")
}
Loading
Loading