Skip to content

Commit

Permalink
Add retries for failures when creating a TakeBackupOperation
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Dec 12, 2024
1 parent 93964f2 commit 8c64851
Show file tree
Hide file tree
Showing 17 changed files with 660 additions and 213 deletions.
126 changes: 119 additions & 7 deletions cmd/integration/make_backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,137 @@ package main

import (
"context"
"errors"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"log"
"strings"
"time"
"ydbcp/cmd/integration/common"

"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"google.golang.org/grpc"
)

const (
containerID = "abcde"
databaseName = "/local"
ydbcpEndpoint = "0.0.0.0:50051"
databaseEndpoint = "grpcs://local-ydb:2135"
containerID = "abcde"
databaseName = "/local"
ydbcpEndpoint = "0.0.0.0:50051"
databaseEndpoint = "grpcs://local-ydb:2135"
invalidDatabaseEndpoint = "xzche"
)

// OpenYdb opens a YDB driver for the integration-test database located at
// databaseEndpoint + "/" + databaseName.
//
// The connection uses a 5-second dial timeout, skips TLS certificate
// verification, a single-connection balancer and anonymous credentials.
// The function panics (via log.Panicf) when the driver cannot be opened.
func OpenYdb() *ydb.Driver {
	dsn := databaseEndpoint + "/" + databaseName
	driver, err := ydb.Open(
		context.Background(),
		dsn,
		ydb.WithDialTimeout(5*time.Second),
		ydb.WithTLSSInsecureSkipVerify(),
		ydb.WithBalancer(balancers.SingleConn()),
		ydb.WithAnonymousCredentials(),
	)
	if err != nil {
		log.Panicf("failed to open database: %v", err)
	}
	return driver
}

// TestInvalidDatabaseBackup verifies that a TBWR (take backup with retries)
// operation targeting an unreachable database ends in an error once its retry
// budget is spent.
//
// The scenario:
//  1. A RUNNING TBWR operation with endpoint invalidDatabaseEndpoint,
//     retries = 0 and retries_count = 3 is upserted directly into the
//     Operations table.
//  2. The operation is read back through opClient and its type is checked.
//  3. After a 10-second pause the function asserts that no backups and no TB
//     operations exist, and that the TBWR operation has status ERROR with the
//     message "retry attempts exceeded limit: 3.".
//
// Any deviation aborts the test with log.Panicf.
func TestInvalidDatabaseBackup(client pb.BackupServiceClient, opClient pb.OperationServiceClient) {
	driver := OpenYdb()
	// Release the driver on every exit path; deferred calls also run when
	// log.Panicf unwinds the stack.
	defer func() {
		if closeErr := driver.Close(context.Background()); closeErr != nil {
			log.Printf("failed to close YDB driver: %v", closeErr)
		}
	}()

	opID := types.GenerateObjectID()
	insertTBWRquery := fmt.Sprintf(`
		UPSERT INTO Operations
		(id, type, container_id, database, endpoint, created_at, status, retries, retries_count)
		VALUES
		("%s", "TBWR", "%s", "%s", "%s", CurrentUTCTimestamp(), "RUNNING", 0, 3)
	`, opID, containerID, databaseName, invalidDatabaseEndpoint)
	err := driver.Table().Do(
		context.Background(), func(ctx context.Context, s table.Session) error {
			_, res, err := s.Execute(
				ctx,
				table.TxControl(
					table.BeginTx(
						table.WithSerializableReadWrite(),
					),
					table.CommitTx(),
				),
				insertTBWRquery,
				nil,
			)
			if err != nil {
				return err
			}
			// The result must be closed. A dedicated variable is used so the
			// close error does not overwrite the callback's err.
			defer func(res result.Result) {
				if closeErr := res.Close(); closeErr != nil {
					xlog.Error(ctx, "Error closing transaction result")
				}
			}(res)
			if res.ResultSetCount() != 0 {
				return errors.New("expected 0 result set")
			}
			return res.Err()
		},
	)
	if err != nil {
		log.Panicf("failed to initialize YDBCP db: %v", err)
	}

	op, err := opClient.GetOperation(context.Background(), &pb.GetOperationRequest{
		Id: opID,
	})
	if err != nil {
		log.Panicf("failed to get operation: %v", err)
	}
	if op.GetType() != types.OperationTypeTBWR.String() {
		log.Panicf("unexpected operation type: %v", op.GetType())
	}
	time.Sleep(time.Second * 10) // to wait for four operation handlers

	backups, err := client.ListBackups(
		context.Background(), &pb.ListBackupsRequest{
			ContainerId:      containerID,
			DatabaseNameMask: "%",
		},
	)
	if err != nil {
		log.Panicf("failed to list backups: %v", err)
	}
	if len(backups.Backups) != 0 {
		log.Panicf("expected no backups by this time, got %v", backups.Backups)
	}

	ops, err := opClient.ListOperations(context.Background(), &pb.ListOperationsRequest{
		ContainerId:      containerID,
		DatabaseNameMask: databaseName,
		OperationTypes:   []string{types.OperationTypeTB.String()},
	})
	if err != nil {
		log.Panicf("failed to list operations: %v", err)
	}
	if len(ops.Operations) != 0 {
		log.Panicf("expected zero TB operations, got %d", len(ops.Operations))
	}

	tbwr, err := opClient.GetOperation(context.Background(), &pb.GetOperationRequest{
		Id: opID,
	})
	if err != nil {
		// This is a GetOperation call, so report it as such.
		log.Panicf("failed to get operation: %v", err)
	}
	if tbwr.Status != pb.Operation_ERROR {
		log.Panicf("unexpected operation status: %v", tbwr.Status)
	}
	if tbwr.Message != "retry attempts exceeded limit: 3." {
		log.Panicf("unexpected operation message: %v", tbwr.Message)
	}
}

func main() {
conn := common.CreateGRPCClient(ydbcpEndpoint)
defer func(conn *grpc.ClientConn) {
Expand Down Expand Up @@ -63,6 +173,8 @@ func main() {
log.Panicf("unexpected error code: %v", err)
}

TestInvalidDatabaseBackup(client, opClient)

tbwr, err := client.MakeBackup(
context.Background(), &pb.MakeBackupRequest{
ContainerId: containerID,
Expand All @@ -84,7 +196,7 @@ func main() {
if op.GetType() != types.OperationTypeTBWR.String() {
log.Panicf("unexpected operation type: %v", op.GetType())
}
time.Sleep(time.Second * 11) // to wait for operation handler
time.Sleep(time.Second * 3) // to wait for operation handler
backups, err = client.ListBackups(
context.Background(), &pb.ListBackupsRequest{
ContainerId: containerID,
Expand Down Expand Up @@ -133,7 +245,7 @@ func main() {
if !done {
log.Panicln("failed to complete a backup in 30 seconds")
}
time.Sleep(time.Second * 11) // to wait for operation handler
time.Sleep(time.Second * 3) // to wait for operation handler
tbwr, err = opClient.GetOperation(context.Background(), &pb.GetOperationRequest{
Id: op.Id,
})
Expand Down
6 changes: 6 additions & 0 deletions cmd/integration/orm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func OperationsToInsert() []types.TakeBackupWithRetryOperation {
},
ScheduleID: &schedule,
Ttl: &ttl,
Retries: 0,
RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_Count{Count: 5}},
},
{
Expand All @@ -78,6 +79,7 @@ func OperationsToInsert() []types.TakeBackupWithRetryOperation {
},
ScheduleID: &schedule,
Ttl: &ttl,
Retries: 0,
RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(ttl)}},
},
}
Expand Down Expand Up @@ -119,6 +121,7 @@ func TestTBWROperationORM(ctx context.Context, ydbConn *db.YdbConnector, operati
log.Panicf("operation %v corrupted after update and read\ngot %v", operation, *tbwr)
}
operation.Message = "xxx"
operation.IncRetries()
err = ydbConn.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(&operation))
if err != nil {
log.Panicf("failed to insert operation: %v", err)
Expand All @@ -127,6 +130,9 @@ func TestTBWROperationORM(ctx context.Context, ydbConn *db.YdbConnector, operati
if "xxx" != tbwr.Message {
log.Panicf("operation %v did not change after update", *tbwr)
}
if tbwr.Retries != 1 {
log.Panicf("operation %v did not update retries", *tbwr)
}
}

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func main() {
os.Exit(1)
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry)
processor.NewOperationProcessor(ctx, &wg, configInstance.ProcessorIntervalSeconds, dbConnector, handlersRegistry)
xlog.Info(ctx, "Initialized OperationProcessor")
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)
xlog.Info(ctx, "Created TtlWatcher")
Expand Down
49 changes: 44 additions & 5 deletions internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package backup_operations

import (
"context"
"errors"
"fmt"
"github.com/jonboulle/clockwork"
"github.com/ydb-platform/ydb-go-sdk/v3"
"path"
Expand Down Expand Up @@ -137,11 +139,48 @@ func ValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, cli

if len(pathsForExport) == 0 {
xlog.Error(ctx, "empty list of paths for export")
return nil, status.Error(codes.FailedPrecondition, "empty list of paths for export")
return nil, NewEmptyDatabaseError(codes.FailedPrecondition, "empty list of paths for export")
}
return pathsForExport, nil
}

// ClientConnectionError reports that the client database connection could not
// be established (for example, the endpoint is not allowed or the connection
// cannot be opened). It wraps a gRPC status error carrying the code and
// message.
type ClientConnectionError struct {
	err error
}

// NewClientConnectionError returns a ClientConnectionError whose underlying
// error is a gRPC status built from code and message.
//
// The message is taken verbatim: it goes through status.Error rather than
// status.Errorf, so any '%' it contains (callers pass pre-formatted text that
// may embed a DSN or endpoint) is not interpreted as a formatting verb.
func NewClientConnectionError(code codes.Code, message string) *ClientConnectionError {
	return &ClientConnectionError{err: status.Error(code, message)}
}

// Error returns the text of the underlying status error.
func (e ClientConnectionError) Error() string {
	return e.err.Error()
}

// EmptyDatabaseError reports that there is nothing to back up, i.e. the list
// of paths for export turned out to be empty. It wraps a gRPC status error
// carrying the code and message.
type EmptyDatabaseError struct {
	err error
}

// Error returns the text of the underlying status error.
func (e EmptyDatabaseError) Error() string {
	return e.err.Error()
}

// NewEmptyDatabaseError returns an EmptyDatabaseError whose underlying error
// is a gRPC status built from code and message.
//
// The message is taken verbatim: it goes through status.Error rather than
// status.Errorf, so any '%' it contains is not interpreted as a formatting
// verb.
func NewEmptyDatabaseError(code codes.Code, message string) *EmptyDatabaseError {
	return &EmptyDatabaseError{err: status.Error(code, message)}
}

// ErrToStatus strips the backup-specific error wrappers from err.
//
// If err is, or wraps, a *ClientConnectionError or an *EmptyDatabaseError,
// the error stored inside that wrapper is returned. Any other error,
// including nil, is returned unchanged. ClientConnectionError is checked
// first.
func ErrToStatus(err error) error {
	var connErr *ClientConnectionError
	if errors.As(err, &connErr) {
		return connErr.err
	}

	var emptyErr *EmptyDatabaseError
	if errors.As(err, &emptyErr) {
		return emptyErr.err
	}

	return err
}

func MakeBackup(
ctx context.Context,
clientConn client.ClientConnector,
Expand All @@ -161,9 +200,9 @@ func MakeBackup(
"endpoint of database is invalid or not allowed",
zap.String("DatabaseEndpoint", req.DatabaseEndpoint),
)
return nil, nil, status.Errorf(
codes.InvalidArgument,
"endpoint of database is invalid or not allowed, endpoint %s", req.DatabaseEndpoint,
return nil, nil, NewClientConnectionError(
codes.FailedPrecondition,
fmt.Sprintf("endpoint of database is invalid or not allowed, endpoint %s", req.DatabaseEndpoint),
)
}

Expand All @@ -176,7 +215,7 @@ func MakeBackup(
client, err := clientConn.Open(ctx, dsn)
if err != nil {
xlog.Error(ctx, "can't open client connection", zap.Error(err))
return nil, nil, status.Errorf(codes.Unknown, "can't open client connection, dsn %s", dsn)
return nil, nil, NewClientConnectionError(codes.Unknown, fmt.Sprintf("can't open client connection, dsn %s", dsn))
}
defer func() {
if err := clientConn.Close(ctx, client); err != nil {
Expand Down
17 changes: 9 additions & 8 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ type MetricsServerConfig struct {
}

type Config struct {
DBConnection YDBConnectionConfig `yaml:"db_connection"`
ClientConnection ClientConnectionConfig `yaml:"client_connection"`
S3 S3Config `yaml:"s3"`
OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"`
Auth AuthConfig `yaml:"auth"`
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
MetricsServer MetricsServerConfig `yaml:"metrics_server"`
SchedulesLimitPerDB int `yaml:"schedules_limit_per_db" default:"10"`
DBConnection YDBConnectionConfig `yaml:"db_connection"`
ClientConnection ClientConnectionConfig `yaml:"client_connection"`
S3 S3Config `yaml:"s3"`
OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"`
Auth AuthConfig `yaml:"auth"`
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
MetricsServer MetricsServerConfig `yaml:"metrics_server"`
SchedulesLimitPerDB int `yaml:"schedules_limit_per_db" default:"10"`
ProcessorIntervalSeconds int64 `yaml:"processor_interval_seconds" default:"10"`
}

var (
Expand Down
8 changes: 5 additions & 3 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,11 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.
defer c.guard.Unlock()

queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock)
if queryBuilderMock.Operation != nil {
c.operations[(*queryBuilderMock.Operation).GetID()] = *queryBuilderMock.Operation
metrics.GlobalMetricsRegistry.IncOperationsStartedCounter(*queryBuilderMock.Operation)
if queryBuilderMock.Operations != nil {
for _, op := range queryBuilderMock.Operations {
c.operations[op.GetID()] = op
metrics.GlobalMetricsRegistry.IncOperationsStartedCounter(op)
}
}
if queryBuilderMock.Backup != nil {
c.backups[queryBuilderMock.Backup.ID] = *queryBuilderMock.Backup
Expand Down
7 changes: 7 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
parentOperationID *string
scheduleID *string
ttl *time.Duration
retries *uint32
retriesCount *uint32
maxBackoff *time.Duration
)
Expand All @@ -176,6 +177,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
named.Optional("parent_operation_id", &parentOperationID),
named.Optional("schedule_id", &scheduleID),
named.Optional("ttl", &ttl),
named.Optional("retries", &retries),
named.Optional("retries_count", &retriesCount),
named.Optional("retries_max_backoff", &maxBackoff),
)
Expand Down Expand Up @@ -277,6 +279,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
Retries: &pb.RetryConfig_Count{Count: *retriesCount},
}
}
retryNum := 0
if retries != nil {
retryNum = int(*retries)
}
return &types.TakeBackupWithRetryOperation{
TakeBackupOperation: types.TakeBackupOperation{
ID: operationId,
Expand All @@ -294,6 +300,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
},
ScheduleID: scheduleID,
Ttl: ttl,
Retries: retryNum,
RetryConfig: retryConfig,
}, nil
}
Expand Down
4 changes: 4 additions & 0 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
table_types.StringValueFromString(strings.Join(tbwr.SourcePathsToExclude, ",")),
)
}
d.AddValueParam("$retries", table_types.Uint32Value(uint32(tbwr.Retries)))
if tbwr.RetryConfig != nil {
switch r := tbwr.RetryConfig.Retries.(type) {
case *pb.RetryConfig_Count:
Expand Down Expand Up @@ -270,6 +271,9 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle
table_types.TimestampValueFromTime(operation.GetUpdatedAt().AsTime()),
)
}
if tbwr, ok := operation.(*types.TakeBackupWithRetryOperation); ok {
d.AddValueParam("$retries", table_types.Uint32Value(uint32(tbwr.Retries)))
}
return d
}

Expand Down
Loading

0 comments on commit 8c64851

Please sign in to comment.