Skip to content

Commit

Permalink
Change OperationHandler return value. Handlers have been moved to ano…
Browse files Browse the repository at this point in the history
…ther package (#18)
  • Loading branch information
bma13 authored Jul 16, 2024
1 parent afe3a8a commit 3896c7f
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 104 deletions.
16 changes: 13 additions & 3 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
"syscall"
"ydbcp/internal/config"
configInit "ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
"ydbcp/internal/handlers"
"ydbcp/internal/processor"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"

Expand Down Expand Up @@ -145,10 +148,12 @@ func main() {
s := grpc.NewServer()
reflection.Register(s)

ydbServer := server{driver: db.NewYdbConnector(config)}
defer ydbServer.driver.Close()
db := db.NewYdbConnector(config)

pb.RegisterBackupServiceServer(s, &ydbServer)
server := server{driver: db}
defer server.driver.Close()

pb.RegisterBackupServiceServer(s, &server)

wg.Add(1)
go func() {
Expand All @@ -162,6 +167,11 @@ func main() {
}
}()

handlersRegistry := processor.NewOperationHandlerRegistry()
handlersRegistry.Add(types.OperationType("TB"), handlers.MakeTBOperationHandler(db, client.NewClientYdbConnector()))

processor.NewOperationProcessor(ctx, &wg, db, handlersRegistry)

wg.Add(1)
go func() {
defer wg.Done()
Expand Down
4 changes: 4 additions & 0 deletions internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type ClientConnector interface {
type ClientYdbConnector struct {
}

func NewClientYdbConnector() *ClientYdbConnector {
return &ClientYdbConnector{}
}

func (d *ClientYdbConnector) Open(ctx context.Context, dsn string) (*ydb.Driver, error) {
xlog.Info(ctx, "Connecting to client db", zap.String("dsn", dsn))
db, connErr := ydb.Open(ctx, dsn, ydb.WithAnonymousCredentials())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
package processor
package handlers

import (
"context"
"errors"
"fmt"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"go.uber.org/zap"
)

type OperationHandler func(context.Context, types.Operation) (
types.Operation, error,
)

func MakeTBOperationHandler(db db.DBConnector, client client.ClientConnector) OperationHandler {
return func(ctx context.Context, op types.Operation) (types.Operation, error) {
func MakeTBOperationHandler(db db.DBConnector, client client.ClientConnector) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
return TBOperationHandler(ctx, op, db, client)
}
}
Expand All @@ -28,19 +21,18 @@ func TBOperationHandler(
operation types.Operation,
db db.DBConnector,
client client.ClientConnector,
) (types.Operation, error) {
) error {
if operation.GetType() != types.OperationTypeTB {
return operation, errors.New("Passed wrong operation type to TBOperationHandler")
return fmt.Errorf("wrong operation type %s != %s", operation.GetType(), types.OperationTypeTB)
}
tb, ok := operation.(*types.TakeBackupOperation)
if !ok {
return operation, fmt.Errorf("can't cast Operation to TakeBackupOperation %s", types.OperationToString(operation))
return fmt.Errorf("can't cast Operation to TakeBackupOperation %s", types.OperationToString(operation))
}

conn, err := client.Open(ctx, types.MakeYdbConnectionString(tb.YdbConnectionParams))
if err != nil {
xlog.Error(ctx, "error initializing client db driver", zap.Error(err))
return operation, nil
return fmt.Errorf("error initializing client connector %w", err)
}

defer func() { _ = client.Close(ctx, conn) }()
Expand All @@ -50,21 +42,20 @@ func TBOperationHandler(
if err != nil {
//skip, write log
//upsert message into operation?
xlog.Error(
ctx,
"Failed to lookup operation status for",
zap.String("operation_id", tb.YdbOperationId),
zap.Error(err),
return fmt.Errorf(
"failed to lookup operation status for operation id %s, export operation id %s: %w",
tb.GetId().String(),
tb.YdbOperationId,
err,
)
return operation, nil
}
switch tb.State {
case types.OperationStatePending:
{
if !opInfo.GetOperation().Ready {
//if pending: return op, nil
//if backup deadline failed: cancel operation. (skip for now)
return operation, nil
return nil
}
if opInfo.GetOperation().Status == Ydb.StatusIds_SUCCESS {
//upsert into operations (id, status) values (id, done)?
Expand All @@ -73,8 +64,11 @@ func TBOperationHandler(
//.WithYUpdateOperation()
err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateAvailable)
if err != nil {
xlog.Error(ctx, "error updating backup table", zap.Error(err))
return operation, nil
return fmt.Errorf(
"error updating backup table, operation id %s: %w",
tb.GetId().String(),
err,
)
}
operation.SetState(types.OperationStateDone)
operation.SetMessage("Success")
Expand All @@ -83,8 +77,11 @@ func TBOperationHandler(
//upsert into operations (id, status, message) values (id, error, message)?
err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateError)
if err != nil {
xlog.Error(ctx, "error updating backup table", zap.Error(err))
return operation, nil
return fmt.Errorf(
"error updating backup table, operation id %s: %w",
tb.GetId().String(),
err,
)
}
if opInfo.GetOperation().Status == Ydb.StatusIds_CANCELLED {
operation.SetMessage("got CANCELLED status for PENDING operation")
Expand All @@ -93,51 +90,57 @@ func TBOperationHandler(
}
operation.SetState(types.OperationStateError)
}

response, err := client.ForgetOperation(ctx, conn, tb.YdbOperationId)
if err != nil {
xlog.Error(ctx, err.Error())
}

if response != nil && response.GetStatus() != Ydb.StatusIds_SUCCESS {
xlog.Error(ctx, "error forgetting operation", zap.Any("issues", response.GetIssues()))
}
}
case types.OperationStateCancelling:
{
if !opInfo.GetOperation().Ready {
//can this hang in cancelling state?
return operation, nil
return nil
}
if opInfo.GetOperation().Status == Ydb.StatusIds_CANCELLED {
//upsert into operations (id, status, message) values (id, cancelled)?
err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateCancelled)
if err != nil {
xlog.Error(ctx, "error updating backup table", zap.Error(err))
return operation, nil
return fmt.Errorf(
"error updating backup table, operation id %s: %w",
tb.GetId().String(),
err,
)
}
operation.SetState(types.OperationStateCancelled)
operation.SetMessage("Success")
} else {
//upsert into operations (id, status, message) values (id, error, error.message)?
err = db.UpdateBackup(ctx, tb.BackupId, types.BackupStateError)
if err != nil {
xlog.Error(ctx, "error updating backup table", zap.Error(err))
return operation, nil
return fmt.Errorf(
"error updating backup table, operation id %s: %w",
tb.GetId().String(),
err,
)
}
operation.SetState(types.OperationStateError)
operation.SetMessage(types.IssuesToString(opInfo.GetOperation().Issues))
}

response, err := client.ForgetOperation(ctx, conn, tb.YdbOperationId)
if err != nil {
xlog.Error(ctx, err.Error())
}

if response != nil && response.GetStatus() != Ydb.StatusIds_SUCCESS {
xlog.Error(ctx, "error forgetting operation", zap.Any("issues", response.GetIssues()))
}
}
}
return operation, nil
response, err := client.ForgetOperation(ctx, conn, tb.YdbOperationId)
if err != nil {
return fmt.Errorf(
"error forgetting operation id %s, export operation id %s: %w",
tb.GetId().String(),
tb.YdbOperationId,
err,
)
}

if response == nil || response.GetStatus() != Ydb.StatusIds_SUCCESS {
return fmt.Errorf(
"error forgetting operation id %s, export operation id %s, issues: %s",
tb.GetId().String(),
tb.YdbOperationId,
types.IssuesToString(response.GetIssues()),
)
}
return db.UpdateOperation(ctx, operation)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package processor
package handlers

import (
"context"
Expand Down Expand Up @@ -57,8 +57,10 @@ func TestTBOperationHandler(t *testing.T) {

handler := MakeTBOperationHandler(db, client)

result, err := handler(ctx, &tbOp)
err := handler(ctx, &tbOp)
assert.Empty(t, err)

result, err := db.GetOperation(ctx, opId)
assert.Empty(t, err)
assert.Equal(
t, result.GetState(), types.OperationStateDone,
Expand Down
43 changes: 12 additions & 31 deletions internal/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@ type OperationProcessorImpl struct {
db db.DBConnector

runningOperations map[types.ObjectID]bool
results chan OperationHandlerResult
}

type OperationHandlerResult struct {
old types.Operation
new types.Operation
results chan types.ObjectID
}

type Option func(*OperationProcessorImpl)
Expand Down Expand Up @@ -66,7 +61,7 @@ func NewOperationProcessor(
db: db,
tickerProvider: ticker.NewRealTicker,
runningOperations: make(map[types.ObjectID]bool),
results: make(chan OperationHandlerResult),
results: make(chan types.ObjectID),
}
for _, opt := range options {
opt(op)
Expand All @@ -89,8 +84,8 @@ func (o *OperationProcessorImpl) run() {
return
case <-ticker.Chan():
o.processOperations()
case result := <-o.results:
o.handleOperationResult(result)
case operationID := <-o.results:
o.handleOperationResult(operationID)
}
}
}
Expand All @@ -108,9 +103,7 @@ func (o *OperationProcessorImpl) processOperations() {
}
}

func (o *OperationProcessorImpl) processOperation(
ctx context.Context, op types.Operation,
) {
func (o *OperationProcessorImpl) processOperation(ctx context.Context, op types.Operation) {
if _, exist := o.runningOperations[op.GetId()]; exist {
xlog.Debug(
ctx, "operation already running",
Expand All @@ -126,42 +119,30 @@ func (o *OperationProcessorImpl) processOperation(
ctx, "start operation handler",
zap.String("operation", types.OperationToString(op)),
)
genericOp := types.GenericOperation{
Id: op.GetId(),
Type: op.GetType(),
State: op.GetState(),
Message: op.GetMessage(),
}
result, err := o.handlers.Call(ctx, op)
err := o.handlers.Call(ctx, op)
if err != nil {
xlog.Error(
ctx, "operation handler failed",
zap.String("operation", types.OperationToString(op)),
)
}
o.results <- OperationHandlerResult{old: &genericOp, new: result}
o.results <- op.GetId()
}()
}

func (o *OperationProcessorImpl) handleOperationResult(result OperationHandlerResult) {
func (o *OperationProcessorImpl) handleOperationResult(operationID types.ObjectID) {
ctx, cancel := context.WithTimeout(o.ctx, o.handleOperationResultTimeout)
defer cancel()

xlog.Debug(
ctx,
"operation handler result",
zap.String("oldOperation", types.OperationToString(result.old)),
zap.String("newOperation", types.OperationToString(result.new)),
)
if _, exist := o.runningOperations[result.old.GetId()]; !exist {
xlog.Debug(ctx, "operation handler is finished", zap.String("operationID", operationID.String()))
if _, exist := o.runningOperations[operationID]; !exist {
xlog.Error(
ctx, "got result from not running operation",
zap.String("operation", types.OperationToString(result.old)),
zap.String("operationID", operationID.String()),
)
return
}
o.updateOperationState(ctx, result.old, result.new)
delete(o.runningOperations, result.old.GetId())
delete(o.runningOperations, operationID)
}

func (o *OperationProcessorImpl) updateOperationState(
Expand Down
5 changes: 3 additions & 2 deletions internal/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ func TestProcessor(t *testing.T) {
handlerCalled := make(chan struct{})
handlers.Add(
operationTypeTB,
func(ctx context.Context, op types.Operation) (types.Operation, error) {
func(ctx context.Context, op types.Operation) error {
xlog.Debug(
ctx, "TB handler called for operation",
zap.String("operation", types.OperationToString(op)),
)
op.SetState(types.OperationStateDone)
op.SetMessage("Success")
db.UpdateOperation(ctx, op)
handlerCalled <- struct{}{}
return op, nil
return nil
},
)

Expand Down
Loading

0 comments on commit 3896c7f

Please sign in to comment.