Skip to content

Commit

Permalink
feat(mvp): implement handler for make restore operation
Browse files Browse the repository at this point in the history
  • Loading branch information
Iuliia Sidorina committed Jul 19, 2024
1 parent a442424 commit 6412074
Show file tree
Hide file tree
Showing 9 changed files with 793 additions and 7 deletions.
1 change: 1 addition & 0 deletions cmd/ydbcp/config.yaml
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
ydbcp_db_connection_string: "grpc://localhost:2136/local"
operation_ttl_seconds: 86400 # 24 hours
15 changes: 12 additions & 3 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func main() {

lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
xlog.Error(ctx, "failed to listen", zap.Error(err))
return
}
s := grpc.NewServer()
Expand All @@ -204,10 +204,19 @@ func main() {

handlersRegistry := processor.NewOperationHandlerRegistry()
err = handlersRegistry.Add(
types.OperationType("TB"), handlers.MakeTBOperationHandler(dbConnector, client.NewClientYdbConnector()),
types.OperationTypeTB, handlers.MakeTBOperationHandler(dbConnector, client.NewClientYdbConnector()),
)
if err != nil {
log.Fatalf("failed to register handler: %v", err)
xlog.Error(ctx, "failed to register TB handler", zap.Error(err))
return
}

err = handlersRegistry.Add(
types.OperationTypeRB, handlers.MakeRBOperationHandler(dbConnector, client.NewClientYdbConnector(), configInstance),
)

if err != nil {
xlog.Error(ctx, "failed to register RB handler", zap.Error(err))
return
}

Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type S3Config struct {
type Config struct {
YdbcpDbConnectionString string `yaml:"ydbcp_db_connection_string"`
S3 S3Config `yaml:"s3"`
OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"`
}

func (config Config) ToString() (string, error) {
Expand Down
25 changes: 25 additions & 0 deletions internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ClientConnector interface {
ImportFromS3(ctx context.Context, clientDb *ydb.Driver, s3Settings *Ydb_Import.ImportFromS3Settings) (string, error)
GetOperationStatus(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.GetOperationResponse, error)
ForgetOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.ForgetOperationResponse, error)
CancelOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.CancelOperationResponse, error)
}

type ClientYdbConnector struct {
Expand Down Expand Up @@ -179,3 +180,27 @@ func (d *ClientYdbConnector) ForgetOperation(ctx context.Context, clientDb *ydb.

return response, nil
}

func (d *ClientYdbConnector) CancelOperation(ctx context.Context, clientDb *ydb.Driver, operationId string) (*Ydb_Operations.CancelOperationResponse, error) {
if clientDb == nil {
return nil, fmt.Errorf("unititialized client db driver")
}

client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(clientDb))
xlog.Info(ctx, "Cancelling operation",
zap.String("id", operationId),
)

response, err := client.CancelOperation(
ctx,
&Ydb_Operations.CancelOperationRequest{
Id: operationId,
},
)

if err != nil {
return nil, fmt.Errorf("error cancelling operation: %s", err.Error())
}

return response, nil
}
43 changes: 41 additions & 2 deletions internal/connectors/client/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"fmt"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"

"github.com/google/uuid"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
Expand Down Expand Up @@ -95,7 +96,17 @@ func (m *MockClientConnector) ImportFromS3(_ context.Context, _ *ydb.Driver, s3S
func (m *MockClientConnector) GetOperationStatus(_ context.Context, _ *ydb.Driver, operationId string) (*Ydb_Operations.GetOperationResponse, error) {
op, exist := m.operations[operationId]
if !exist {
return nil, fmt.Errorf("operation %s doesn't exist", operationId)
issues := make([]*Ydb_Issue.IssueMessage, 1)
issues[0] = &Ydb_Issue.IssueMessage{
Message: "operation not found",
}

return &Ydb_Operations.GetOperationResponse{
Operation: &Ydb_Operations.Operation{
Status: Ydb.StatusIds_NOT_FOUND,
Issues: issues,
},
}, nil
}

return &Ydb_Operations.GetOperationResponse{
Expand All @@ -106,11 +117,39 @@ func (m *MockClientConnector) GetOperationStatus(_ context.Context, _ *ydb.Drive
func (m *MockClientConnector) ForgetOperation(_ context.Context, _ *ydb.Driver, operationId string) (*Ydb_Operations.ForgetOperationResponse, error) {
_, exist := m.operations[operationId]
if !exist {
return nil, fmt.Errorf("operation %s doesn't exist", operationId)
issues := make([]*Ydb_Issue.IssueMessage, 1)
issues[0] = &Ydb_Issue.IssueMessage{
Message: "operation not found",
}

return &Ydb_Operations.ForgetOperationResponse{
Status: Ydb.StatusIds_NOT_FOUND,
Issues: issues,
}, nil
}

delete(m.operations, operationId)
return &Ydb_Operations.ForgetOperationResponse{
Status: Ydb.StatusIds_SUCCESS,
}, nil
}

func (m *MockClientConnector) CancelOperation(_ context.Context, _ *ydb.Driver, operationId string) (*Ydb_Operations.CancelOperationResponse, error) {
op, exist := m.operations[operationId]
if !exist {
issues := make([]*Ydb_Issue.IssueMessage, 1)
issues[0] = &Ydb_Issue.IssueMessage{
Message: "operation not found",
}

return &Ydb_Operations.CancelOperationResponse{
Status: Ydb.StatusIds_NOT_FOUND,
Issues: issues,
}, nil
}

op.Status = Ydb.StatusIds_CANCELLED
return &Ydb_Operations.CancelOperationResponse{
Status: Ydb.StatusIds_SUCCESS,
}, nil
}
13 changes: 13 additions & 0 deletions internal/connectors/db/process_result_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ func ReadOperationFromResultSet(res result.Result) (types.Operation, error) {
YdbConnectionParams: types.GetYdbConnectionParams(*database),
YdbOperationId: *ydbOperationId,
}, nil
} else if operationType == types.OperationTypeRB {
if backupId == nil || database == nil || ydbOperationId == nil {
return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String())
}
return &types.RestoreBackupOperation{
Id: operationId,
BackupId: *backupId,
State: types.OperationState(operationState),
Message: "",
YdbConnectionParams: types.GetYdbConnectionParams(*database),
YdbOperationId: *ydbOperationId,
}, nil
}

return &types.GenericOperation{Id: operationId}, nil
}
199 changes: 199 additions & 0 deletions internal/handlers/restore_backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package handlers

import (
"context"
"fmt"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"go.uber.org/zap"
"time"
"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
)

func MakeRBOperationHandler(db db.DBConnector, client client.ClientConnector, config config.Config) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return RBOperationHandler(ctx, op, db, client, config)
}
}

func valid(status Ydb.StatusIds_StatusCode) bool {
return status == Ydb.StatusIds_SUCCESS || status == Ydb.StatusIds_CANCELLED
}

func retriable(status Ydb.StatusIds_StatusCode) bool {
return status == Ydb.StatusIds_OVERLOADED || status == Ydb.StatusIds_UNAVAILABLE
}

func RBOperationHandler(
ctx context.Context,
operation types.Operation,
db db.DBConnector,
client client.ClientConnector,
config config.Config,
) error {
xlog.Info(ctx, "received operation",
zap.String("id", operation.GetId().String()),
zap.String("type", string(operation.GetType())),
zap.String("state", string(operation.GetState())),
zap.String("message", operation.GetMessage()),
)

if operation.GetType() != types.OperationTypeRB {
return fmt.Errorf("wrong type %s != %s for operation %s",
operation.GetType(), types.OperationTypeRB, types.OperationToString(operation),
)
}

mr, ok := operation.(*types.RestoreBackupOperation)
if !ok {
return fmt.Errorf("can't cast operation to RestoreBackupOperation %s", types.OperationToString(operation))
}

conn, err := client.Open(ctx, types.MakeYdbConnectionString(mr.YdbConnectionParams))
if err != nil {
return fmt.Errorf("error initializing client connector for operation #%s: %w",
mr.GetId().String(), err,
)
}

defer func() { _ = client.Close(ctx, conn) }()

xlog.Info(ctx, "getting operation status",
zap.String("id", mr.Id.String()),
zap.String("type", string(operation.GetType())),
zap.String("ydb_operation_id", mr.YdbOperationId),
)

opResponse, err := client.GetOperationStatus(ctx, conn, mr.YdbOperationId)
if err != nil {
if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() {
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
return db.UpdateOperation(ctx, operation)
}

return fmt.Errorf(
"failed to get operation status for operation #%s, import operation id %s: %w",
mr.GetId().String(),
mr.YdbOperationId,
err,
)
}

if retriable(opResponse.GetOperation().GetStatus()) {
xlog.Info(ctx, "received retriable error",
zap.String("id", mr.Id.String()),
zap.String("type", string(operation.GetType())),
zap.String("ydb_operation_id", mr.YdbOperationId),
)

return nil
}

if !valid(opResponse.GetOperation().GetStatus()) {
operation.SetState(types.OperationStateError)
operation.SetMessage(fmt.Sprintf("Error status: %s, issues: %s",
opResponse.GetOperation().GetStatus(),
types.IssuesToString(opResponse.GetOperation().Issues)),
)
return db.UpdateOperation(ctx, operation)
}

switch mr.State {
case types.OperationStatePending:
{
if !opResponse.GetOperation().Ready {
if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() {
xlog.Info(ctx, "cancelling operation due to ttl",
zap.String("id", mr.Id.String()),
zap.String("type", string(operation.GetType())),
zap.String("ydb_operation_id", mr.YdbOperationId),
)

response, err := client.CancelOperation(ctx, conn, mr.YdbOperationId)
if err != nil {
return fmt.Errorf(
"error cancelling operation #%s, import operation id %s: %w",
mr.GetId().String(),
mr.YdbOperationId,
err,
)
}

if response == nil || response.GetStatus() != Ydb.StatusIds_SUCCESS {
return fmt.Errorf(
"error cancelling operation id %s, import operation id %s, issues: %s",
mr.GetId().String(),
mr.YdbOperationId,
types.IssuesToString(response.GetIssues()),
)
}

operation.SetState(types.OperationStateCancelling)
operation.SetMessage("Operation deadline exceeded")
return db.UpdateOperation(ctx, operation)
}

return nil
}

if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
operation.SetState(types.OperationStateDone)
operation.SetMessage("Success")
} else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED {
operation.SetState(types.OperationStateCancelled)
operation.SetMessage("Pending operation wac cancelled")
}
}
case types.OperationStateCancelling:
{
if !opResponse.GetOperation().Ready {
if (mr.CreatedAt.Unix() + config.OperationTtlSeconds) <= time.Now().Unix() {
operation.SetState(types.OperationStateError)
operation.SetMessage("Operation deadline exceeded")
return db.UpdateOperation(ctx, operation)
}

return nil
}

if opResponse.GetOperation().Status == Ydb.StatusIds_SUCCESS {
operation.SetState(types.OperationStateDone)
operation.SetMessage("Operation was completed despite cancellation")
} else if opResponse.GetOperation().Status == Ydb.StatusIds_CANCELLED {
operation.SetState(types.OperationStateCancelled)
operation.SetMessage("Success")
}
}
}

xlog.Info(ctx, "forgetting operation",
zap.String("id", mr.Id.String()),
zap.String("type", string(operation.GetType())),
zap.String("ydb_operation_id", mr.YdbOperationId),
)

response, err := client.ForgetOperation(ctx, conn, mr.YdbOperationId)
if err != nil {
return fmt.Errorf(
"error forgetting operation #%s, import operation id %s: %w",
mr.GetId().String(),
mr.YdbOperationId,
err,
)
}

if response == nil || response.GetStatus() != Ydb.StatusIds_SUCCESS {
return fmt.Errorf(
"error forgetting operation #%s, import operation id %s, issues: %s",
mr.GetId().String(),
mr.YdbOperationId,
types.IssuesToString(response.GetIssues()),
)
}

return db.UpdateOperation(ctx, operation)
}
Loading

0 comments on commit 6412074

Please sign in to comment.