Skip to content

Commit

Permalink
implement scratch ListOperations version. General upserts into DB (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort authored Jul 22, 2024
1 parent 96d5717 commit 2c63a1b
Show file tree
Hide file tree
Showing 13 changed files with 609 additions and 239 deletions.
128 changes: 98 additions & 30 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"net"
"os"
"os/signal"
Expand Down Expand Up @@ -37,26 +38,42 @@ var (
// server is used to implement BackupService.
type server struct {
pb.UnimplementedBackupServiceServer
pb.UnimplementedOperationServiceServer
driver db.DBConnector
s3 config.S3Config
}

// GetBackup implements BackupService
func (s *server) GetBackup(ctx context.Context, in *pb.GetBackupRequest) (*pb.Backup, error) {
log.Printf("Received: %v", in.GetId())
backups, err := s.driver.SelectBackupsByStatus(ctx, types.BackupStatePending)
func (s *server) GetBackup(ctx context.Context, request *pb.GetBackupRequest) (*pb.Backup, error) {
xlog.Debug(ctx, "GetBackup", zap.String("request", request.String()))
requestId, err := types.ParseObjectId(request.GetId())
if err != nil {
return nil, fmt.Errorf("failed to parse uuid %s: %w", request.GetId(), err)
}
backups, err := s.driver.SelectBackups(
ctx, queries.MakeReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "id",
Values: []table_types.Value{table_types.UUIDValue(requestId)},
},
),
),
)
if err != nil {
xlog.Error(ctx, "can't select backups", zap.Error(err))
return nil, err
}
for _, backup := range backups {
fmt.Println("backup:", backup.ID.String())
if len(backups) == 0 {
return nil, errors.New("No backup with such Id")
}
return &pb.Backup{Id: in.GetId()}, nil
xlog.Debug(ctx, "GetBackup", zap.String("backup", backups[0].String()))
return backups[0].Proto(), nil
}

func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb.Operation, error) {
xlog.Info(ctx, "MakeBackup called", zap.String("request", req.String()))
xlog.Info(ctx, "MakeBackup", zap.String("request", req.String()))

backup := types.Backup{
ContainerID: req.GetContainerId(),
Expand All @@ -78,8 +95,9 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb
}

op := &types.TakeBackupOperation{
BackupId: backupID,
State: types.OperationStatePending,
BackupId: backupID,
ContainerID: req.ContainerId,
State: types.OperationStatePending,
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: req.GetEndpoint(),
DatabaseName: req.GetDatabaseName(),
Expand All @@ -94,33 +112,28 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb
return nil, err
}

// TODO: get pb.Operation from Operation in one place
return &pb.Operation{
Id: operationID.String(),
ContainerId: req.ContainerId,
Type: string(op.GetType()),
DatabaseName: op.YdbConnectionParams.DatabaseName,
BackupId: backupID.String(),
SourcePaths: op.SourcePaths,
SourcePathsToExclude: op.SourcePathToExclude,
Status: types.OperationState(op.GetState()).Enum(),
}, nil
op.Id = operationID
return op.Proto(), nil
}

func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest) (*pb.ListBackupsResponse, error) {
log.Printf("ListBackups: %s", request.String())
xlog.Debug(ctx, "ListBackups", zap.String("request", request.String()))
backups, err := s.driver.SelectBackups(
ctx, queries.MakeReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter[string]{
Field: "container_id",
Values: []string{request.ContainerId},
queries.QueryFilter{
Field: "container_id",
Values: []table_types.Value{
table_types.StringValueFromString(request.ContainerId),
},
},
queries.QueryFilter[string]{
Field: "database",
Values: []string{request.DatabaseNameMask},
queries.QueryFilter{
Field: "database",
Values: []table_types.Value{
table_types.StringValueFromString(request.DatabaseNameMask),
},
IsLike: true,
},
),
Expand All @@ -139,6 +152,59 @@ func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest
}, nil
}

func (s *server) ListOperations(ctx context.Context, request *pb.ListOperationsRequest) (
*pb.ListOperationsResponse, error,
) {
xlog.Debug(ctx, "ListOperations", zap.String("request", request.String()))
operations, err := s.driver.SelectOperations(
ctx, queries.MakeReadTableQuery(
queries.WithTableName("Operations"),
queries.WithSelectFields(queries.AllOperationFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "container_id",
Values: []table_types.Value{
table_types.StringValueFromString(request.ContainerId),
},
},
queries.QueryFilter{
Field: "database",
Values: []table_types.Value{
table_types.StringValueFromString(request.DatabaseNameMask),
},
IsLike: true,
},
),
),
)
if err != nil {
return nil, fmt.Errorf("error getting backups: %w", err)
}
pbOperations := make([]*pb.Operation, 0, len(operations))
for _, operation := range operations {
pbOperations = append(pbOperations, operation.Proto())
}
return &pb.ListOperationsResponse{
Operations: pbOperations,
NextPageToken: strconv.Itoa(len(operations)),
}, nil
}

func (s *server) CancelOperation(ctx context.Context, request *pb.CancelOperationRequest) (*pb.Operation, error) {
//TODO implement me
panic("implement me")
}

func (s *server) GetOperation(ctx context.Context, request *pb.GetOperationRequest) (*pb.Operation, error) {
//TODO implement me
panic("implement me")
}

func (s *server) mustEmbedUnimplementedOperationServiceServer() {
//TODO implement me
panic("implement me")
}

func main() {
var confPath string

Expand Down Expand Up @@ -189,6 +255,7 @@ func main() {
defer server.driver.Close()

pb.RegisterBackupServiceServer(s, &server)
pb.RegisterOperationServiceServer(s, &server)

wg.Add(1)
go func() {
Expand All @@ -212,7 +279,8 @@ func main() {
}

err = handlersRegistry.Add(
types.OperationTypeRB, handlers.MakeRBOperationHandler(dbConnector, client.NewClientYdbConnector(), configInstance),
types.OperationTypeRB,
handlers.MakeRBOperationHandler(dbConnector, client.NewClientYdbConnector(), configInstance),
)

if err != nil {
Expand Down
Loading

0 comments on commit 2c63a1b

Please sign in to comment.