Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries for failures to create a TakeBackupOperation #114

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 119 additions & 7 deletions cmd/integration/make_backup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,137 @@ package main

import (
"context"
"errors"
"fmt"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"log"
"strings"
"time"
"ydbcp/cmd/integration/common"

"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"

"google.golang.org/grpc"
)

// Fixed settings used by the make_backup integration scenario: the container
// and database under test, the YDBCP gRPC endpoint, and the database endpoints.
// NOTE(review): the first four entries appear twice below — this looks like the
// old and new sides of a diff rendered together; confirm against the real file.
const (
containerID = "abcde"
databaseName = "/local"
ydbcpEndpoint = "0.0.0.0:50051"
databaseEndpoint = "grpcs://local-ydb:2135"
containerID = "abcde"
databaseName = "/local"
ydbcpEndpoint = "0.0.0.0:50051"
databaseEndpoint = "grpcs://local-ydb:2135"
// invalidDatabaseEndpoint is written as the endpoint of the TBWR operation
// inserted by TestInvalidDatabaseBackup.
invalidDatabaseEndpoint = "xzche"
)

// OpenYdb connects to the database at databaseEndpoint/databaseName and
// returns the resulting driver. The connection uses a five-second dial
// timeout, skips TLS certificate verification, a single-connection balancer
// and anonymous credentials. It panics (via log.Panicf) if the driver cannot
// be opened.
func OpenYdb() *ydb.Driver {
	const dialTimeout = 5 * time.Second

	dsn := databaseEndpoint + "/" + databaseName
	driver, err := ydb.Open(
		context.Background(),
		dsn,
		ydb.WithDialTimeout(dialTimeout),
		ydb.WithTLSSInsecureSkipVerify(),
		ydb.WithBalancer(balancers.SingleConn()),
		ydb.WithAnonymousCredentials(),
	)
	if err != nil {
		log.Panicf("failed to open database: %v", err)
	}
	return driver
}

// TestInvalidDatabaseBackup checks that a TBWR operation whose endpoint is
// invalidDatabaseEndpoint ends in ERROR without producing any backup or TB
// operation.
//
// It upserts a RUNNING TBWR row (retries = 0, retries_count = 3) directly into
// the Operations table, verifies the row is visible through opClient, sleeps
// to let the operation processor handle it, and then asserts that:
//   - no backups exist for the container,
//   - no TB operations exist for the database,
//   - the TBWR operation has status ERROR and the retry-limit message.
//
// Any failed expectation aborts the scenario via log.Panicf.
func TestInvalidDatabaseBackup(client pb.BackupServiceClient, opClient pb.OperationServiceClient) {
	driver := OpenYdb()
	// Release the driver when the scenario ends; deferred calls also run when
	// log.Panicf unwinds the stack.
	defer func() {
		if closeErr := driver.Close(context.Background()); closeErr != nil {
			log.Printf("failed to close YDB driver: %v", closeErr)
		}
	}()

	opID := types.GenerateObjectID()
	insertTBWRquery := fmt.Sprintf(`
		UPSERT INTO Operations
		(id, type, container_id, database, endpoint, created_at, status, retries, retries_count)
		VALUES
		("%s", "TBWR", "%s", "%s", "%s", CurrentUTCTimestamp(), "RUNNING", 0, 3)
	`, opID, containerID, databaseName, invalidDatabaseEndpoint)

	err := driver.Table().Do(
		context.Background(), func(ctx context.Context, s table.Session) error {
			_, res, err := s.Execute(
				ctx,
				table.TxControl(
					table.BeginTx(
						table.WithSerializableReadWrite(),
					),
					table.CommitTx(),
				),
				insertTBWRquery,
				nil,
			)
			if err != nil {
				return err
			}
			defer func(res result.Result) {
				// The result must be closed; a close failure is only logged and
				// kept in a local so it cannot clobber the closure's err.
				if closeErr := res.Close(); closeErr != nil {
					xlog.Error(ctx, "Error closing transaction result")
				}
			}(res)
			if res.ResultSetCount() != 0 {
				return errors.New("expected 0 result set")
			}
			return res.Err()
		},
	)
	if err != nil {
		log.Panicf("failed to initialize YDBCP db: %v", err)
	}

	// The inserted row must be readable through the service API as a TBWR operation.
	op, err := opClient.GetOperation(context.Background(), &pb.GetOperationRequest{
		Id: opID,
	})
	if err != nil {
		log.Panicf("failed to get operation: %v", err)
	}
	if op.GetType() != types.OperationTypeTBWR.String() {
		log.Panicf("unexpected operation type: %v", op.GetType())
	}
	time.Sleep(time.Second * 10) // to wait for four operation handlers

	backups, err := client.ListBackups(
		context.Background(), &pb.ListBackupsRequest{
			ContainerId:      containerID,
			DatabaseNameMask: "%",
		},
	)
	if err != nil {
		log.Panicf("failed to list backups: %v", err)
	}
	if len(backups.Backups) != 0 {
		log.Panicf("expected no backups by this time, got %v", backups.Backups)
	}

	ops, err := opClient.ListOperations(context.Background(), &pb.ListOperationsRequest{
		ContainerId:      containerID,
		DatabaseNameMask: databaseName,
		OperationTypes:   []string{types.OperationTypeTB.String()},
	})
	if err != nil {
		log.Panicf("failed to list operations: %v", err)
	}
	if len(ops.Operations) != 0 {
		log.Panicf("expected zero TB operations, got %d", len(ops.Operations))
	}

	tbwr, err := opClient.GetOperation(context.Background(), &pb.GetOperationRequest{
		Id: opID,
	})
	if err != nil {
		// This is a GetOperation call, so report it as such.
		log.Panicf("failed to get operation: %v", err)
	}
	if tbwr.Status != pb.Operation_ERROR {
		log.Panicf("unexpected operation status: %v", tbwr.Status)
	}
	// The expected limit (3) matches the retries_count written by the upsert above.
	if tbwr.Message != "retry attempts exceeded limit: 3." {
		log.Panicf("unexpected operation message: %v", tbwr.Message)
	}
}

func main() {
conn := common.CreateGRPCClient(ydbcpEndpoint)
defer func(conn *grpc.ClientConn) {
Expand Down Expand Up @@ -63,6 +173,8 @@ func main() {
log.Panicf("unexpected error code: %v", err)
}

TestInvalidDatabaseBackup(client, opClient)

tbwr, err := client.MakeBackup(
context.Background(), &pb.MakeBackupRequest{
ContainerId: containerID,
Expand All @@ -84,7 +196,7 @@ func main() {
if op.GetType() != types.OperationTypeTBWR.String() {
log.Panicf("unexpected operation type: %v", op.GetType())
}
time.Sleep(time.Second * 11) // to wait for operation handler
time.Sleep(time.Second * 3) // to wait for operation handler
backups, err = client.ListBackups(
context.Background(), &pb.ListBackupsRequest{
ContainerId: containerID,
Expand Down Expand Up @@ -133,7 +245,7 @@ func main() {
if !done {
log.Panicln("failed to complete a backup in 30 seconds")
}
time.Sleep(time.Second * 11) // to wait for operation handler
time.Sleep(time.Second * 3) // to wait for operation handler
tbwr, err = opClient.GetOperation(context.Background(), &pb.GetOperationRequest{
Id: op.Id,
})
Expand Down
6 changes: 6 additions & 0 deletions cmd/integration/orm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func OperationsToInsert() []types.TakeBackupWithRetryOperation {
},
ScheduleID: &schedule,
Ttl: &ttl,
Retries: 0,
RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_Count{Count: 5}},
},
{
Expand All @@ -78,6 +79,7 @@ func OperationsToInsert() []types.TakeBackupWithRetryOperation {
},
ScheduleID: &schedule,
Ttl: &ttl,
Retries: 0,
RetryConfig: &pb.RetryConfig{Retries: &pb.RetryConfig_MaxBackoff{MaxBackoff: durationpb.New(ttl)}},
},
}
Expand Down Expand Up @@ -119,6 +121,7 @@ func TestTBWROperationORM(ctx context.Context, ydbConn *db.YdbConnector, operati
log.Panicf("operation %v corrupted after update and read\ngot %v", operation, *tbwr)
}
operation.Message = "xxx"
operation.IncRetries()
err = ydbConn.ExecuteUpsert(ctx, queries.NewWriteTableQuery().WithUpdateOperation(&operation))
if err != nil {
log.Panicf("failed to insert operation: %v", err)
Expand All @@ -127,6 +130,9 @@ func TestTBWROperationORM(ctx context.Context, ydbConn *db.YdbConnector, operati
if "xxx" != tbwr.Message {
log.Panicf("operation %v did not change after update", *tbwr)
}
if tbwr.Retries != 1 {
log.Panicf("operation %v did not update retries", *tbwr)
}
}

func main() {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func main() {
os.Exit(1)
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry)
processor.NewOperationProcessor(ctx, &wg, configInstance.ProcessorIntervalSeconds, dbConnector, handlersRegistry)
xlog.Info(ctx, "Initialized OperationProcessor")
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)
xlog.Info(ctx, "Created TtlWatcher")
Expand Down
49 changes: 44 additions & 5 deletions internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package backup_operations

import (
"context"
"errors"
"fmt"
"github.com/jonboulle/clockwork"
"github.com/ydb-platform/ydb-go-sdk/v3"
"path"
Expand Down Expand Up @@ -137,11 +139,48 @@ func ValidateSourcePaths(ctx context.Context, req MakeBackupInternalRequest, cli

if len(pathsForExport) == 0 {
xlog.Error(ctx, "empty list of paths for export")
return nil, status.Error(codes.FailedPrecondition, "empty list of paths for export")
return nil, NewEmptyDatabaseError(codes.FailedPrecondition, "empty list of paths for export")
}
return pathsForExport, nil
}

// ClientConnectionError wraps a gRPC status error produced when a client
// connection to the target database cannot be used (it is returned by
// MakeBackup for a rejected endpoint or a failed connection open).
type ClientConnectionError struct {
	err error
}

// NewClientConnectionError builds a ClientConnectionError whose underlying
// error is a gRPC status with the given code and message.
//
// The message is used verbatim: it is passed to status.Error rather than
// status.Errorf, so a '%' inside it (for example in an endpoint or DSN) is not
// interpreted as a format verb.
func NewClientConnectionError(code codes.Code, message string) *ClientConnectionError {
	return &ClientConnectionError{err: status.Error(code, message)}
}

// Error returns the text of the wrapped status error.
func (e ClientConnectionError) Error() string {
	return e.err.Error()
}

// EmptyDatabaseError wraps a gRPC status error produced when there is nothing
// to export (ValidateSourcePaths returns it for an empty list of paths).
type EmptyDatabaseError struct {
	err error
}

// Error returns the text of the wrapped status error.
func (e EmptyDatabaseError) Error() string {
	return e.err.Error()
}

// NewEmptyDatabaseError builds an EmptyDatabaseError whose underlying error is
// a gRPC status with the given code and message.
//
// The message is used verbatim: it is passed to status.Error rather than
// status.Errorf, so a '%' inside it is not interpreted as a format verb.
func NewEmptyDatabaseError(code codes.Code, message string) *EmptyDatabaseError {
	return &EmptyDatabaseError{err: status.Error(code, message)}
}

// ErrToStatus unwraps the backup-specific error types back to the status
// error they carry. If err is (or wraps) a *ClientConnectionError or an
// *EmptyDatabaseError, the inner error is returned; any other error,
// including nil, is returned unchanged.
func ErrToStatus(err error) error {
	// Connection errors are checked first, then empty-database errors.
	if connErr := (*ClientConnectionError)(nil); errors.As(err, &connErr) {
		return connErr.err
	}
	if emptyErr := (*EmptyDatabaseError)(nil); errors.As(err, &emptyErr) {
		return emptyErr.err
	}
	return err
}

func MakeBackup(
ctx context.Context,
clientConn client.ClientConnector,
Expand All @@ -161,9 +200,9 @@ func MakeBackup(
"endpoint of database is invalid or not allowed",
zap.String("DatabaseEndpoint", req.DatabaseEndpoint),
)
return nil, nil, status.Errorf(
codes.InvalidArgument,
"endpoint of database is invalid or not allowed, endpoint %s", req.DatabaseEndpoint,
return nil, nil, NewClientConnectionError(
codes.FailedPrecondition,
fmt.Sprintf("endpoint of database is invalid or not allowed, endpoint %s", req.DatabaseEndpoint),
)
}

Expand All @@ -176,7 +215,7 @@ func MakeBackup(
client, err := clientConn.Open(ctx, dsn)
if err != nil {
xlog.Error(ctx, "can't open client connection", zap.Error(err))
return nil, nil, status.Errorf(codes.Unknown, "can't open client connection, dsn %s", dsn)
return nil, nil, NewClientConnectionError(codes.Unknown, fmt.Sprintf("can't open client connection, dsn %s", dsn))
}
defer func() {
if err := clientConn.Close(ctx, client); err != nil {
Expand Down
17 changes: 9 additions & 8 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ type MetricsServerConfig struct {
}

// Config is the top-level service configuration; each field is mapped to a
// YAML key through its struct tag, and some fields declare a default value.
// NOTE(review): the first eight fields appear twice below — this looks like the
// old and new sides of a diff rendered together; confirm against the real file.
type Config struct {
DBConnection YDBConnectionConfig `yaml:"db_connection"`
ClientConnection ClientConnectionConfig `yaml:"client_connection"`
S3 S3Config `yaml:"s3"`
OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"`
Auth AuthConfig `yaml:"auth"`
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
MetricsServer MetricsServerConfig `yaml:"metrics_server"`
SchedulesLimitPerDB int `yaml:"schedules_limit_per_db" default:"10"`
DBConnection YDBConnectionConfig `yaml:"db_connection"`
ClientConnection ClientConnectionConfig `yaml:"client_connection"`
S3 S3Config `yaml:"s3"`
OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"`
Auth AuthConfig `yaml:"auth"`
GRPCServer GRPCServerConfig `yaml:"grpc_server"`
MetricsServer MetricsServerConfig `yaml:"metrics_server"`
SchedulesLimitPerDB int `yaml:"schedules_limit_per_db" default:"10"`
// ProcessorIntervalSeconds is the value handed to processor.NewOperationProcessor
// by cmd/ydbcp/main.go. Presumably the polling interval in seconds — TODO confirm.
ProcessorIntervalSeconds int64 `yaml:"processor_interval_seconds" default:"10"`
}

var (
Expand Down
8 changes: 5 additions & 3 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,11 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.
defer c.guard.Unlock()

queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock)
if queryBuilderMock.Operation != nil {
c.operations[(*queryBuilderMock.Operation).GetID()] = *queryBuilderMock.Operation
metrics.GlobalMetricsRegistry.IncOperationsStartedCounter(*queryBuilderMock.Operation)
if queryBuilderMock.Operations != nil {
for _, op := range queryBuilderMock.Operations {
c.operations[op.GetID()] = op
metrics.GlobalMetricsRegistry.IncOperationsStartedCounter(op)
}
}
if queryBuilderMock.Backup != nil {
c.backups[queryBuilderMock.Backup.ID] = *queryBuilderMock.Backup
Expand Down
7 changes: 7 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
parentOperationID *string
scheduleID *string
ttl *time.Duration
retries *uint32
retriesCount *uint32
maxBackoff *time.Duration
)
Expand All @@ -176,6 +177,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
named.Optional("parent_operation_id", &parentOperationID),
named.Optional("schedule_id", &scheduleID),
named.Optional("ttl", &ttl),
named.Optional("retries", &retries),
named.Optional("retries_count", &retriesCount),
named.Optional("retries_max_backoff", &maxBackoff),
)
Expand Down Expand Up @@ -277,6 +279,10 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
Retries: &pb.RetryConfig_Count{Count: *retriesCount},
}
}
retryNum := 0
if retries != nil {
retryNum = int(*retries)
}
return &types.TakeBackupWithRetryOperation{
TakeBackupOperation: types.TakeBackupOperation{
ID: operationId,
Expand All @@ -294,6 +300,7 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
},
ScheduleID: scheduleID,
Ttl: ttl,
Retries: retryNum,
RetryConfig: retryConfig,
}, nil
}
Expand Down
4 changes: 4 additions & 0 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
table_types.StringValueFromString(strings.Join(tbwr.SourcePathsToExclude, ",")),
)
}
d.AddValueParam("$retries", table_types.Uint32Value(uint32(tbwr.Retries)))
if tbwr.RetryConfig != nil {
switch r := tbwr.RetryConfig.Retries.(type) {
case *pb.RetryConfig_Count:
Expand Down Expand Up @@ -270,6 +271,9 @@ func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingle
table_types.TimestampValueFromTime(operation.GetUpdatedAt().AsTime()),
)
}
if tbwr, ok := operation.(*types.TakeBackupWithRetryOperation); ok {
d.AddValueParam("$retries", table_types.Uint32Value(uint32(tbwr.Retries)))
}
return d
}

Expand Down
Loading
Loading