diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 0ee41087..66a8bc5b 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -12,7 +12,10 @@ import ( "syscall" "ydbcp/internal/config" configInit "ydbcp/internal/config" + "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" + "ydbcp/internal/handlers" + "ydbcp/internal/processor" "ydbcp/internal/types" "ydbcp/internal/util/xlog" @@ -145,10 +148,12 @@ func main() { s := grpc.NewServer() reflection.Register(s) - ydbServer := server{driver: db.NewYdbConnector(config)} - defer ydbServer.driver.Close() + db := db.NewYdbConnector(config) - pb.RegisterBackupServiceServer(s, &ydbServer) + server := server{driver: db} + defer server.driver.Close() + + pb.RegisterBackupServiceServer(s, &server) wg.Add(1) go func() { @@ -162,6 +167,11 @@ func main() { } }() + handlersRegistry := processor.NewOperationHandlerRegistry() + handlersRegistry.Add(types.OperationType("TB"), handlers.MakeTBOperationHandler(db, client.NewClientYdbConnector())) + + processor.NewOperationProcessor(ctx, &wg, db, handlersRegistry) + wg.Add(1) go func() { defer wg.Done() diff --git a/internal/connectors/client/connector.go b/internal/connectors/client/connector.go index 26a76dc4..8bcd2c7b 100644 --- a/internal/connectors/client/connector.go +++ b/internal/connectors/client/connector.go @@ -31,6 +31,10 @@ type ClientConnector interface { type ClientYdbConnector struct { } +func NewClientYdbConnector() *ClientYdbConnector { + return &ClientYdbConnector{} +} + func (d *ClientYdbConnector) Open(ctx context.Context, dsn string) (*ydb.Driver, error) { xlog.Info(ctx, "Connecting to client db", zap.String("dsn", dsn)) db, connErr := ydb.Open(ctx, dsn, ydb.WithAnonymousCredentials()) diff --git a/internal/processor/operation_handlers.go b/internal/handlers/take_backup.go similarity index 60% rename from internal/processor/operation_handlers.go rename to internal/handlers/take_backup.go index fffa0601..c74e4274 100644 --- a/internal/processor/operation_handlers.go +++ b/internal/handlers/take_backup.go @@ -1,24 +1,17 @@ -package processor +package handlers import ( "context" - "errors" "fmt" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" "ydbcp/internal/types" - "ydbcp/internal/util/xlog" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" - "go.uber.org/zap" ) -type OperationHandler func(context.Context, types.Operation) ( - types.Operation, error, -) - -func MakeTBOperationHandler(db db.DBConnector, client client.ClientConnector) OperationHandler { - return func(ctx context.Context, op types.Operation) (types.Operation, error) { +func MakeTBOperationHandler(db db.DBConnector, client client.ClientConnector) types.OperationHandler { + return func(ctx context.Context, op types.Operation) error { return TBOperationHandler(ctx, op, db, client) } } @@ -28,19 +21,18 @@ func TBOperationHandler( operation types.Operation, db db.DBConnector, client client.ClientConnector, -) (types.Operation, error) { +) error { if operation.GetType() != types.OperationTypeTB { - return operation, errors.New("Passed wrong operation type to TBOperationHandler") + return fmt.Errorf("wrong operation type %s != %s", operation.GetType(), types.OperationTypeTB) } tb, ok := operation.(*types.TakeBackupOperation) if !ok { - return operation, fmt.Errorf("can't cast Operation to TakeBackupOperation %s", types.OperationToString(operation)) + return fmt.Errorf("can't cast Operation to TakeBackupOperation %s", types.OperationToString(operation)) } conn, err := client.Open(ctx, types.MakeYdbConnectionString(tb.YdbConnectionParams)) if err != nil { - xlog.Error(ctx, "error initializing client db driver", zap.Error(err)) - return operation, nil + return fmt.Errorf("error initializing client connector %w", err) } defer func() { _ = client.Close(ctx, conn) }() @@ -50,13 +42,12 @@ func TBOperationHandler( if err != nil { //skip, write log //upsert message into operation? - xlog.Error( - ctx, - "Failed to lookup operation status for", - zap.String("operation_id", tb.YdbOperationId), - zap.Error(err), + return fmt.Errorf( + "failed to lookup operation status for operation id %s, export operation id %s: %w", + tb.GetId().String(), + tb.YdbOperationId, + err, ) - return operation, nil } switch tb.State { case types.OperationStatePending: @@ -64,7 +55,7 @@ func TBOperationHandler( if !opInfo.GetOperation().Ready { //if pending: return op, nil //if backup deadline failed: cancel operation. (skip for now) - return operation, nil + return nil } if opInfo.GetOperation().Status == Ydb.StatusIds_SUCCESS { //upsert into operations (id, status) values (id, done)? @@ -73,8 +64,11 @@ func TBOperationHandler( //.WithYUpdateOperation() err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateAvailable) if err != nil { - xlog.Error(ctx, "error updating backup table", zap.Error(err)) - return operation, nil + return fmt.Errorf( + "error updating backup table, operation id %s: %w", + tb.GetId().String(), + err, + ) } operation.SetState(types.OperationStateDone) operation.SetMessage("Success") @@ -83,8 +77,11 @@ func TBOperationHandler( //upsert into operations (id, status, message) values (id, error, message)? err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateError) if err != nil { - xlog.Error(ctx, "error updating backup table", zap.Error(err)) - return operation, nil + return fmt.Errorf( + "error updating backup table, operation id %s: %w", + tb.GetId().String(), + err, + ) } if opInfo.GetOperation().Status == Ydb.StatusIds_CANCELLED { operation.SetMessage("got CANCELLED status for PENDING operation") @@ -93,28 +90,22 @@ func TBOperationHandler( } operation.SetState(types.OperationStateError) } - - response, err := client.ForgetOperation(ctx, conn, tb.YdbOperationId) - if err != nil { - xlog.Error(ctx, err.Error()) - } - - if response != nil && response.GetStatus() != Ydb.StatusIds_SUCCESS { - xlog.Error(ctx, "error forgetting operation", zap.Any("issues", response.GetIssues())) - } } case types.OperationStateCancelling: { if !opInfo.GetOperation().Ready { //can this hang in cancelling state? - return operation, nil + return nil } if opInfo.GetOperation().Status == Ydb.StatusIds_CANCELLED { //upsert into operations (id, status, message) values (id, cancelled)? err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateCancelled) if err != nil { - xlog.Error(ctx, "error updating backup table", zap.Error(err)) - return operation, nil + return fmt.Errorf( + "error updating backup table, operation id %s: %w", + tb.GetId().String(), + err, + ) } operation.SetState(types.OperationStateCancelled) operation.SetMessage("Success") @@ -122,22 +113,34 @@ func TBOperationHandler( //upsert into operations (id, status, message) values (id, error, error.message)? err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateError) if err != nil { - xlog.Error(ctx, "error updating backup table", zap.Error(err)) - return operation, nil + return fmt.Errorf( + "error updating backup table, operation id %s: %w", + tb.GetId().String(), + err, + ) } operation.SetState(types.OperationStateError) operation.SetMessage(types.IssuesToString(opInfo.GetOperation().Issues)) } - - response, err := client.ForgetOperation(ctx, conn, tb.YdbOperationId) - if err != nil { - xlog.Error(ctx, err.Error()) - } - - if response != nil && response.GetStatus() != Ydb.StatusIds_SUCCESS { - xlog.Error(ctx, "error forgetting operation", zap.Any("issues", response.GetIssues())) - } } } - return operation, nil + response, err := client.ForgetOperation(ctx, conn, tb.YdbOperationId) + if err != nil { + return fmt.Errorf( + "error forgetting operation id %s, export operation id %s: %w", + tb.GetId().String(), + tb.YdbOperationId, + err, + ) + } + + if response == nil || response.GetStatus() != Ydb.StatusIds_SUCCESS { + return fmt.Errorf( + "error forgetting operation id %s, export operation id %s, issues: %s", + tb.GetId().String(), + tb.YdbOperationId, + types.IssuesToString(response.GetIssues()), + ) + } + return db.UpdateOperation(ctx, operation) } diff --git a/internal/processor/operation_handlers_test.go b/internal/handlers/take_backup_test.go similarity index 94% rename from internal/processor/operation_handlers_test.go rename to internal/handlers/take_backup_test.go index 5cc20fa4..4ccc95e7 100644 --- a/internal/processor/operation_handlers_test.go +++ b/internal/handlers/take_backup_test.go @@ -1,4 +1,4 @@ -package processor +package handlers import ( "context" @@ -57,8 +57,10 @@ func TestTBOperationHandler(t *testing.T) { handler := MakeTBOperationHandler(db, client) - result, err := handler(ctx, &tbOp) + err := handler(ctx, &tbOp) + assert.Empty(t, err) + result, err := db.GetOperation(ctx, opId) assert.Empty(t, err) assert.Equal( t, result.GetState(), types.OperationStateDone, diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 2fa20f73..afc17cb7 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -24,12 +24,7 @@ type OperationProcessorImpl struct { db db.DBConnector runningOperations map[types.ObjectID]bool - results chan OperationHandlerResult -} - -type OperationHandlerResult struct { - old types.Operation - new types.Operation + results chan types.ObjectID } type Option func(*OperationProcessorImpl) @@ -66,7 +61,7 @@ func NewOperationProcessor( db: db, tickerProvider: ticker.NewRealTicker, runningOperations: make(map[types.ObjectID]bool), - results: make(chan OperationHandlerResult), + results: make(chan types.ObjectID), } for _, opt := range options { opt(op) @@ -89,8 +84,8 @@ func (o *OperationProcessorImpl) run() { return case <-ticker.Chan(): o.processOperations() - case result := <-o.results: - o.handleOperationResult(result) + case operationID := <-o.results: + o.handleOperationResult(operationID) } } } @@ -108,9 +103,7 @@ func (o *OperationProcessorImpl) processOperations() { } } -func (o *OperationProcessorImpl) processOperation( - ctx context.Context, op types.Operation, -) { +func (o *OperationProcessorImpl) processOperation(ctx context.Context, op types.Operation) { if _, exist := o.runningOperations[op.GetId()]; exist { xlog.Debug( ctx, "operation already running", @@ -126,42 +119,30 @@ func (o *OperationProcessorImpl) processOperation( ctx, "start operation handler", zap.String("operation", types.OperationToString(op)), ) - genericOp := types.GenericOperation{ - Id: op.GetId(), - Type: op.GetType(), - State: op.GetState(), - Message: op.GetMessage(), - } - result, err := o.handlers.Call(ctx, op) + err := o.handlers.Call(ctx, op) if err != nil { xlog.Error( ctx, "operation handler failed", zap.String("operation", types.OperationToString(op)), ) } - o.results <- OperationHandlerResult{old: &genericOp, new: result} + o.results <- op.GetId() }() } -func (o *OperationProcessorImpl) handleOperationResult(result OperationHandlerResult) { +func (o *OperationProcessorImpl) handleOperationResult(operationID types.ObjectID) { ctx, cancel := context.WithTimeout(o.ctx, o.handleOperationResultTimeout) defer cancel() - xlog.Debug( - ctx, - "operation handler result", - zap.String("oldOperation", types.OperationToString(result.old)), - zap.String("newOperation", types.OperationToString(result.new)), - ) - if _, exist := o.runningOperations[result.old.GetId()]; !exist { + xlog.Debug(ctx, "operation handler is finished", zap.String("operationID", operationID.String())) + if _, exist := o.runningOperations[operationID]; !exist { xlog.Error( ctx, "got result from not running operation", - zap.String("operation", types.OperationToString(result.old)), + zap.String("operationID", operationID.String()), ) return } - o.updateOperationState(ctx, result.old, result.new) - delete(o.runningOperations, result.old.GetId()) + delete(o.runningOperations, operationID) } func (o *OperationProcessorImpl) updateOperationState( diff --git a/internal/processor/processor_test.go b/internal/processor/processor_test.go index edd21990..9064042c 100644 --- a/internal/processor/processor_test.go +++ b/internal/processor/processor_test.go @@ -37,15 +37,16 @@ func TestProcessor(t *testing.T) { handlerCalled := make(chan struct{}) handlers.Add( operationTypeTB, - func(ctx context.Context, op types.Operation) (types.Operation, error) { + func(ctx context.Context, op types.Operation) error { xlog.Debug( ctx, "TB handler called for operation", zap.String("operation", types.OperationToString(op)), ) op.SetState(types.OperationStateDone) op.SetMessage("Success") + db.UpdateOperation(ctx, op) handlerCalled <- struct{}{} - return op, nil + return nil }, ) diff --git a/internal/processor/registry.go b/internal/processor/registry.go index 93a46ecf..36d41438 100644 --- a/internal/processor/registry.go +++ b/internal/processor/registry.go @@ -7,23 +7,23 @@ import ( ) type OperationHandlerRegistry interface { - Add(types.OperationType, OperationHandler) error - Call(context.Context, types.Operation) (types.Operation, error) + Add(types.OperationType, types.OperationHandler) error + Call(context.Context, types.Operation) error } type OperationHandlerRegistryImpl struct { - handlers map[types.OperationType]OperationHandler + handlers map[types.OperationType]types.OperationHandler } func NewOperationHandlerRegistry() *OperationHandlerRegistryImpl { return &OperationHandlerRegistryImpl{ - handlers: make(map[types.OperationType]OperationHandler), + handlers: make(map[types.OperationType]types.OperationHandler), } } func (r OperationHandlerRegistryImpl) Add( operationType types.OperationType, - handler OperationHandler, + handler types.OperationHandler, ) error { if _, ok := r.handlers[operationType]; ok { return fmt.Errorf("OperationType %s already registred", operationType) @@ -32,14 +32,11 @@ func (r OperationHandlerRegistryImpl) Add( return nil } -func (r OperationHandlerRegistryImpl) Call( - ctx context.Context, - op types.Operation, -) (types.Operation, error) { +func (r OperationHandlerRegistryImpl) Call(ctx context.Context, op types.Operation) error { operationType := op.GetType() handler, ok := r.handlers[operationType] if !ok { - return op, fmt.Errorf("unknown OperationType %s", operationType) + return fmt.Errorf("unknown OperationType %s", operationType) } return handler(ctx, op) } diff --git a/internal/processor/registry_test.go b/internal/processor/registry_test.go index e468a199..ac1bf6d9 100644 --- a/internal/processor/registry_test.go +++ b/internal/processor/registry_test.go @@ -14,25 +14,27 @@ func TestOperationHandlerRegistry(t *testing.T) { op := &types.GenericOperation{ Type: types.OperationType("UNKNOWN"), } - _, err := registry.Call(ctx, op) + err := registry.Call(ctx, op) assert.NotEmpty(t, err, "unknown operation type should raise error") opType := types.OperationType("TEST") expectedMessage := "Test message" + var result types.Operation err = registry.Add( opType, - func(ctx context.Context, op types.Operation) (types.Operation, error) { + func(ctx context.Context, op types.Operation) error { op.SetState(types.OperationStateDone) op.SetMessage(expectedMessage) - return op, nil + result = op + return nil }, ) assert.Empty(t, err) err = registry.Add( opType, - func(_ context.Context, op types.Operation) (types.Operation, error) { - return op, nil + func(_ context.Context, op types.Operation) error { + return nil }, ) assert.NotEmpty(t, err, "registry must prohibit re-register handlers") @@ -42,7 +44,8 @@ func TestOperationHandlerRegistry(t *testing.T) { Type: opType, State: types.OperationStatePending, } - result, err := registry.Call(ctx, op) + err = registry.Call(ctx, op) + assert.Empty(t, err) assert.Equal(t, result.GetState(), types.OperationStateDone) assert.Equal(t, result.GetMessage(), expectedMessage) } diff --git a/internal/types/backup.go b/internal/types/backup.go index be35e59f..8ea14fe1 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -212,3 +212,5 @@ type YdbConnectionParams struct { func MakeYdbConnectionString(params YdbConnectionParams) string { return params.Endpoint + params.DatabaseName } + +type OperationHandler func(context.Context, Operation) error