Skip to content

Commit

Permalink
Merge pull request #17 from qrort/noop-api
Browse files Browse the repository at this point in the history
implement scratch ListBackups version. Support general querying of YDBCP db
  • Loading branch information
qrort authored Jul 16, 2024
2 parents 3896c7f + e38ef91 commit a442424
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 54 deletions.
55 changes: 48 additions & 7 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"net"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"ydbcp/internal/config"
configInit "ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/handlers"
"ydbcp/internal/processor"
"ydbcp/internal/types"
Expand Down Expand Up @@ -42,7 +44,7 @@ type server struct {
// GetBackup implements BackupService
func (s *server) GetBackup(ctx context.Context, in *pb.GetBackupRequest) (*pb.Backup, error) {
log.Printf("Received: %v", in.GetId())
backups, err := s.driver.SelectBackups(ctx, types.BackupStatePending)
backups, err := s.driver.SelectBackupsByStatus(ctx, types.BackupStatePending)
if err != nil {
xlog.Error(ctx, "can't select backups", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -105,6 +107,38 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb
}, nil
}

func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest) (*pb.ListBackupsResponse, error) {
log.Printf("ListBackups: %s", request.String())
backups, err := s.driver.SelectBackups(
ctx, queries.MakeReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter[string]{
Field: "container_id",
Values: []string{request.ContainerId},
},
queries.QueryFilter[string]{
Field: "database",
Values: []string{request.DatabaseNameMask},
IsLike: true,
},
),
),
)
if err != nil {
return nil, fmt.Errorf("error getting backups: %w", err)
}
pbBackups := make([]*pb.Backup, 0, len(backups))
for _, backup := range backups {
pbBackups = append(pbBackups, backup.Proto())
}
return &pb.ListBackupsResponse{
Backups: pbBackups,
NextPageToken: strconv.Itoa(len(backups)),
}, nil
}

func main() {
var confPath string

Expand All @@ -126,13 +160,13 @@ func main() {
}
}(logger)

config, err := configInit.InitConfig(ctx, confPath)
configInstance, err := configInit.InitConfig(ctx, confPath)

if err != nil {
xlog.Error(ctx, "Unable to initialize config", zap.Error(err))
os.Exit(1)
}
confStr, err := config.ToString()
confStr, err := configInstance.ToString()
if err == nil {
xlog.Debug(
ctx, "Use configuration file",
Expand All @@ -144,13 +178,14 @@ func main() {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
return
}
s := grpc.NewServer()
reflection.Register(s)

db := db.NewYdbConnector(config)
dbConnector := db.NewYdbConnector(configInstance)

server := server{driver: db}
server := server{driver: dbConnector}
defer server.driver.Close()

pb.RegisterBackupServiceServer(s, &server)
Expand All @@ -168,9 +203,15 @@ func main() {
}()

handlersRegistry := processor.NewOperationHandlerRegistry()
handlersRegistry.Add(types.OperationType("TB"), handlers.MakeTBOperationHandler(db, client.NewClientYdbConnector()))
err = handlersRegistry.Add(
types.OperationType("TB"), handlers.MakeTBOperationHandler(dbConnector, client.NewClientYdbConnector()),
)
if err != nil {
log.Fatalf("failed to register handler: %v", err)
return
}

processor.NewOperationProcessor(ctx, &wg, db, handlersRegistry)
processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry)

wg.Add(1)
go func() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/google/uuid v1.6.0
github.com/jonboulle/clockwork v0.3.0
github.com/stretchr/testify v1.8.1
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7
github.com/ydb-platform/ydb-go-sdk/v3 v3.74.2
go.uber.org/automaxprocs v1.5.3
go.uber.org/zap v1.27.0
Expand All @@ -20,7 +21,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang-jwt/jwt/v4 v4.4.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.6.0 // indirect
Expand Down
60 changes: 48 additions & 12 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ var ErrUnimplemented = errors.New("unimplemented")

type DBConnector interface {
GetTableClient() table.Client
SelectBackups(ctx context.Context, backupStatus string) ([]*types.Backup, error)
SelectBackups(ctx context.Context, queryBuilder queries.ReadTableQuery) (
[]*types.Backup, error,
)
SelectBackupsByStatus(ctx context.Context, backupStatus string) ([]*types.Backup, error)
ActiveOperations(context.Context) ([]types.Operation, error)
UpdateOperation(context.Context, types.Operation) error
CreateOperation(context.Context, types.Operation) (types.ObjectID, error)
Expand Down Expand Up @@ -89,21 +92,25 @@ func (d *YdbConnector) Close() {
func DoStructSelect[T any](
ctx context.Context,
d *YdbConnector,
query string,
queryBuilder queries.ReadTableQuery,
readLambda StructFromResultSet[T],
) ([]*T, error) {
var (
entities []*T
)
err := d.GetTableClient().Do(
queryFormat, err := queryBuilder.FormatQuery()
if err != nil {
return nil, err
}
err = d.GetTableClient().Do(
ctx, func(ctx context.Context, s table.Session) (err error) {
var (
res result.Result
)
_, res, err = s.Execute(
ctx,
readTx,
query, table.NewQueryParameters(),
queryFormat.QueryText, queryFormat.QueryParams,
)
if err != nil {
return err
Expand Down Expand Up @@ -141,21 +148,25 @@ func DoStructSelect[T any](
func DoInterfaceSelect[T any](
ctx context.Context,
d *YdbConnector,
query string,
queryBuilder queries.ReadTableQuery,
readLambda InterfaceFromResultSet[T],
) ([]T, error) {
var (
entities []T
)
err := d.GetTableClient().Do(
queryFormat, err := queryBuilder.FormatQuery()
if err != nil {
return nil, err
}
err = d.GetTableClient().Do(
ctx, func(ctx context.Context, s table.Session) (err error) {
var (
res result.Result
)
_, res, err = s.Execute(
ctx,
readTx,
query, table.NewQueryParameters(),
queryFormat.QueryText, queryFormat.QueryParams,
)
if err != nil {
return err
Expand Down Expand Up @@ -214,13 +225,33 @@ func (d *YdbConnector) ExecuteUpsert(ctx context.Context, query string, paramete
return nil
}

func (d *YdbConnector) SelectBackups(
func (d *YdbConnector) SelectBackupsByStatus(
ctx context.Context, backupStatus string,
) ([]*types.Backup, error) {
return DoStructSelect[types.Backup](
ctx,
d,
queries.SelectEntitiesQuery("Backups", backupStatus),
queries.MakeReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields("id", "operation_id"),
queries.WithQueryFilters[string](
queries.QueryFilter[string]{
Field: "Status",
Values: []string{backupStatus},
},
),
),
ReadBackupFromResultSet,
)
}

func (d *YdbConnector) SelectBackups(
ctx context.Context, queryBuilder queries.ReadTableQuery,
) ([]*types.Backup, error) {
return DoStructSelect[types.Backup](
ctx,
d,
queryBuilder,
ReadBackupFromResultSet,
)
}
Expand All @@ -231,9 +262,14 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) (
return DoInterfaceSelect[types.Operation](
ctx,
d,
queries.SelectEntitiesQuery(
"Operations", types.OperationStatePending.String(),
types.OperationStateCancelling.String(),
queries.MakeReadTableQuery(
queries.WithTableName("Operations"),
queries.WithQueryFilters(
queries.QueryFilter[types.OperationState]{
Field: "Status",
Values: []types.OperationState{types.OperationStatePending, types.OperationStateCancelling},
},
),
),
ReadOperationFromResultSet,
)
Expand Down
17 changes: 14 additions & 3 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"

"github.com/ydb-platform/ydb-go-sdk/v3/table"
Expand Down Expand Up @@ -40,7 +41,17 @@ func WithBackups(backups map[types.ObjectID]types.Backup) Option {
}

func (c *MockDBConnector) SelectBackups(
ctx context.Context, backupStatus string,
_ context.Context, _ queries.ReadTableQuery,
) ([]*types.Backup, error) {
backups := make([]*types.Backup, 0, len(c.backups))
for _, backup := range c.backups {
backups = append(backups, &backup)
}
return backups, nil
}

func (c *MockDBConnector) SelectBackupsByStatus(
_ context.Context, _ string,
) ([]*types.Backup, error) {
backups := make([]*types.Backup, 0, len(c.backups))
for _, backup := range c.backups {
Expand All @@ -50,7 +61,7 @@ func (c *MockDBConnector) SelectBackups(
}

func (c *MockDBConnector) UpdateBackup(
ctx context.Context, id types.ObjectID, backupStatus string,
_ context.Context, id types.ObjectID, backupStatus string,
) error {
if _, ok := c.backups[id]; !ok {
return errors.New(fmt.Sprintf("no backup found for id %v", id))
Expand All @@ -66,7 +77,7 @@ func (c *MockDBConnector) GetTableClient() table.Client {
return nil
}

func (c *MockDBConnector) CreateBackup(ctx context.Context, backup types.Backup) (types.ObjectID, error) {
func (c *MockDBConnector) CreateBackup(_ context.Context, backup types.Backup) (types.ObjectID, error) {
var id types.ObjectID
for {
id = types.GenerateObjectID()
Expand Down
Loading

0 comments on commit a442424

Please sign in to comment.