From e38ef9126695f43ae73402d222ee0259542c3354 Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Fri, 12 Jul 2024 17:52:43 +0300 Subject: [PATCH] implement scratch ListBackups version. General querying of YDBCP db --- cmd/ydbcp/main.go | 55 ++++++- go.mod | 2 +- internal/connectors/db/connector.go | 60 ++++++-- internal/connectors/db/mock.go | 17 ++- internal/connectors/db/yql/queries/read.go | 136 ++++++++++++++++++ .../connectors/db/yql/queries/read_test.go | 45 ++++++ .../db/yql/queries/{backup.go => write.go} | 26 ---- internal/handlers/take_backup_test.go | 10 +- internal/types/backup.go | 14 ++ 9 files changed, 311 insertions(+), 54 deletions(-) create mode 100644 internal/connectors/db/yql/queries/read.go create mode 100644 internal/connectors/db/yql/queries/read_test.go rename internal/connectors/db/yql/queries/{backup.go => write.go} (69%) diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 66a8bc5b..9231d732 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -8,12 +8,14 @@ import ( "net" "os" "os/signal" + "strconv" "sync" "syscall" "ydbcp/internal/config" configInit "ydbcp/internal/config" "ydbcp/internal/connectors/client" "ydbcp/internal/connectors/db" + "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/handlers" "ydbcp/internal/processor" "ydbcp/internal/types" @@ -42,7 +44,7 @@ type server struct { // GetBackup implements BackupService func (s *server) GetBackup(ctx context.Context, in *pb.GetBackupRequest) (*pb.Backup, error) { log.Printf("Received: %v", in.GetId()) - backups, err := s.driver.SelectBackups(ctx, types.BackupStatePending) + backups, err := s.driver.SelectBackupsByStatus(ctx, types.BackupStatePending) if err != nil { xlog.Error(ctx, "can't select backups", zap.Error(err)) return nil, err @@ -105,6 +107,38 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb }, nil } +func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest) (*pb.ListBackupsResponse, error) { + log.Printf("ListBackups: %s", request.String()) + backups, err := s.driver.SelectBackups( + ctx, queries.MakeReadTableQuery( + queries.WithTableName("Backups"), + queries.WithSelectFields(queries.AllBackupFields...), + queries.WithQueryFilters( + queries.QueryFilter[string]{ + Field: "container_id", + Values: []string{request.ContainerId}, + }, + queries.QueryFilter[string]{ + Field: "database", + Values: []string{request.DatabaseNameMask}, + IsLike: true, + }, + ), + ), + ) + if err != nil { + return nil, fmt.Errorf("error getting backups: %w", err) + } + pbBackups := make([]*pb.Backup, 0, len(backups)) + for _, backup := range backups { + pbBackups = append(pbBackups, backup.Proto()) + } + return &pb.ListBackupsResponse{ + Backups: pbBackups, + NextPageToken: strconv.Itoa(len(backups)), + }, nil +} + func main() { var confPath string @@ -126,13 +160,13 @@ func main() { } }(logger) - config, err := configInit.InitConfig(ctx, confPath) + configInstance, err := configInit.InitConfig(ctx, confPath) if err != nil { xlog.Error(ctx, "Unable to initialize config", zap.Error(err)) os.Exit(1) } - confStr, err := config.ToString() + confStr, err := configInstance.ToString() if err == nil { xlog.Debug( ctx, "Use configuration file", @@ -144,13 +178,14 @@ func main() { lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) + return } s := grpc.NewServer() reflection.Register(s) - db := db.NewYdbConnector(config) + dbConnector := db.NewYdbConnector(configInstance) - server := server{driver: db} + server := server{driver: dbConnector} defer server.driver.Close() pb.RegisterBackupServiceServer(s, &server) @@ -168,9 +203,15 @@ func main() { }() handlersRegistry := processor.NewOperationHandlerRegistry() - handlersRegistry.Add(types.OperationType("TB"), handlers.MakeTBOperationHandler(db, client.NewClientYdbConnector())) + err = handlersRegistry.Add( + types.OperationType("TB"), handlers.MakeTBOperationHandler(dbConnector, client.NewClientYdbConnector()), + ) + if err != nil { + log.Fatalf("failed to register handler: %v", err) + return + } - processor.NewOperationProcessor(ctx, &wg, db, handlersRegistry) + processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry) wg.Add(1) go func() { diff --git a/go.mod b/go.mod index 5ebd8574..df72f677 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/google/uuid v1.6.0 github.com/jonboulle/clockwork v0.3.0 github.com/stretchr/testify v1.8.1 + github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7 github.com/ydb-platform/ydb-go-sdk/v3 v3.74.2 go.uber.org/automaxprocs v1.5.3 go.uber.org/zap v1.27.0 @@ -20,7 +21,6 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang-jwt/jwt/v4 v4.4.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/ydb-platform/ydb-go-genproto v0.0.0-20240528144234-5d5a685e41f7 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sync v0.6.0 // indirect diff --git a/internal/connectors/db/connector.go b/internal/connectors/db/connector.go index 370357d8..25ec2dcb 100644 --- a/internal/connectors/db/connector.go +++ b/internal/connectors/db/connector.go @@ -35,7 +35,10 @@ var ErrUnimplemented = errors.New("unimplemented") type DBConnector interface { GetTableClient() table.Client - SelectBackups(ctx context.Context, backupStatus string) ([]*types.Backup, error) + SelectBackups(ctx context.Context, queryBuilder queries.ReadTableQuery) ( + []*types.Backup, error, + ) + SelectBackupsByStatus(ctx context.Context, backupStatus string) ([]*types.Backup, error) ActiveOperations(context.Context) ([]types.Operation, error) UpdateOperation(context.Context, types.Operation) error CreateOperation(context.Context, types.Operation) (types.ObjectID, error) @@ -89,13 +92,17 @@ func (d *YdbConnector) Close() { func DoStructSelect[T any]( ctx context.Context, d *YdbConnector, - query string, + queryBuilder queries.ReadTableQuery, readLambda StructFromResultSet[T], ) ([]*T, error) { var ( entities []*T ) - err := d.GetTableClient().Do( + queryFormat, err := queryBuilder.FormatQuery() + if err != nil { + return nil, err + } + err = d.GetTableClient().Do( ctx, func(ctx context.Context, s table.Session) (err error) { var ( res result.Result @@ -103,7 +110,7 @@ func DoStructSelect[T any]( _, res, err = s.Execute( ctx, readTx, - query, table.NewQueryParameters(), + queryFormat.QueryText, queryFormat.QueryParams, ) if err != nil { return err @@ -141,13 +148,17 @@ func DoStructSelect[T any]( func DoInterfaceSelect[T any]( ctx context.Context, d *YdbConnector, - query string, + queryBuilder queries.ReadTableQuery, readLambda InterfaceFromResultSet[T], ) ([]T, error) { var ( entities []T ) - err := d.GetTableClient().Do( + queryFormat, err := queryBuilder.FormatQuery() + if err != nil { + return nil, err + } + err = d.GetTableClient().Do( ctx, func(ctx context.Context, s table.Session) (err error) { var ( res result.Result @@ -155,7 +166,7 @@ func DoInterfaceSelect[T any]( _, res, err = s.Execute( ctx, readTx, - query, table.NewQueryParameters(), + queryFormat.QueryText, queryFormat.QueryParams, ) if err != nil { return err @@ -214,13 +225,33 @@ func (d *YdbConnector) ExecuteUpsert(ctx context.Context, query string, paramete return nil } -func (d *YdbConnector) SelectBackups( +func (d *YdbConnector) SelectBackupsByStatus( ctx context.Context, backupStatus string, ) ([]*types.Backup, error) { return DoStructSelect[types.Backup]( ctx, d, - queries.SelectEntitiesQuery("Backups", backupStatus), + queries.MakeReadTableQuery( + queries.WithTableName("Backups"), + queries.WithSelectFields("id", "operation_id"), + queries.WithQueryFilters[string]( + queries.QueryFilter[string]{ + Field: "Status", + Values: []string{backupStatus}, + }, + ), + ), + ReadBackupFromResultSet, + ) +} + +func (d *YdbConnector) SelectBackups( + ctx context.Context, queryBuilder queries.ReadTableQuery, +) ([]*types.Backup, error) { + return DoStructSelect[types.Backup]( + ctx, + d, + queryBuilder, ReadBackupFromResultSet, ) } @@ -231,9 +262,14 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) ( return DoInterfaceSelect[types.Operation]( ctx, d, - queries.SelectEntitiesQuery( - "Operations", types.OperationStatePending.String(), - types.OperationStateCancelling.String(), + queries.MakeReadTableQuery( + queries.WithTableName("Operations"), + queries.WithQueryFilters( + queries.QueryFilter[types.OperationState]{ + Field: "Status", + Values: []types.OperationState{types.OperationStatePending, types.OperationStateCancelling}, + }, + ), ), ReadOperationFromResultSet, ) diff --git a/internal/connectors/db/mock.go b/internal/connectors/db/mock.go index 1be1b7f6..851b2241 100644 --- a/internal/connectors/db/mock.go +++ b/internal/connectors/db/mock.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" "github.com/ydb-platform/ydb-go-sdk/v3/table" @@ -40,7 +41,17 @@ func WithBackups(backups map[types.ObjectID]types.Backup) Option { } func (c *MockDBConnector) SelectBackups( - ctx context.Context, backupStatus string, + _ context.Context, _ queries.ReadTableQuery, +) ([]*types.Backup, error) { + backups := make([]*types.Backup, 0, len(c.backups)) + for _, backup := range c.backups { + backups = append(backups, &backup) + } + return backups, nil +} + +func (c *MockDBConnector) SelectBackupsByStatus( + _ context.Context, _ string, ) ([]*types.Backup, error) { backups := make([]*types.Backup, 0, len(c.backups)) for _, backup := range c.backups { @@ -50,7 +61,7 @@ func (c *MockDBConnector) SelectBackups( } func (c *MockDBConnector) UpdateBackup( - ctx context.Context, id types.ObjectID, backupStatus string, + _ context.Context, id types.ObjectID, backupStatus string, ) error { if _, ok := c.backups[id]; !ok { return errors.New(fmt.Sprintf("no backup found for id %v", id)) @@ -66,7 +77,7 @@ func (c *MockDBConnector) GetTableClient() table.Client { return nil } -func (c *MockDBConnector) CreateBackup(ctx context.Context, backup types.Backup) (types.ObjectID, error) { +func (c *MockDBConnector) CreateBackup(_ context.Context, backup types.Backup) (types.ObjectID, error) { var id types.ObjectID for { id = types.GenerateObjectID() diff --git a/internal/connectors/db/yql/queries/read.go b/internal/connectors/db/yql/queries/read.go new file mode 100644 index 00000000..75d9c027 --- /dev/null +++ b/internal/connectors/db/yql/queries/read.go @@ -0,0 +1,136 @@ +package queries + +import ( + "errors" + "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "strings" +) + +var ( + AllBackupFields = []string{ + "id", "container_id", "database", + "initiated", "created_at", "compileted_at", + "s3_bucket", "s3_key_prefix", "status", + "paths", "operation_id", + } +) + +type StringLike interface { + ~string +} + +type QueryFilter[T StringLike] struct { + Field string + Values []T + IsLike bool +} + +type FormatQueryResult struct { + QueryText string + QueryParams *table.QueryParameters +} + +type ReadTableQuery interface { + MakeFilterString() string + FormatQuery() (*FormatQueryResult, error) +} + +type ReadTableQueryImpl struct { + tableName string + selectFields []string + filters [][]string + filterFields []string + isLikeFilter map[string]bool + tableQueryParams []table.ParameterOption +} + +type ReadTableQueryOption func(*ReadTableQueryImpl) + +func MakeReadTableQuery(options ...ReadTableQueryOption) *ReadTableQueryImpl { + d := &ReadTableQueryImpl{} + d.filters = make([][]string, 0) + d.filterFields = make([]string, 0) + d.isLikeFilter = make(map[string]bool) + for _, opt := range options { + opt(d) + } + return d +} + +func WithTableName(tableName string) ReadTableQueryOption { + return func(d *ReadTableQueryImpl) { + d.tableName = tableName + } +} + +func WithSelectFields(fields ...string) ReadTableQueryOption { + return func(d *ReadTableQueryImpl) { + d.selectFields = fields + } +} + +func WithQueryFilters[T StringLike](filters ...QueryFilter[T]) ReadTableQueryOption { + return func(d *ReadTableQueryImpl) { + for _, filter := range filters { + d.filterFields = append(d.filterFields, filter.Field) + newFilters := make([]string, 0, len(filter.Values)) + for _, value := range filter.Values { + newFilters = append(newFilters, string(value)) + } + d.filters = append(d.filters, newFilters) + if filter.IsLike { + d.isLikeFilter[filter.Field] = true + } + } + } +} + +func (d *ReadTableQueryImpl) AddTableQueryParam(paramValue string) string { + paramName := fmt.Sprintf("$param%d", len(d.tableQueryParams)) + d.tableQueryParams = append( + d.tableQueryParams, table.ValueParam(paramName, table_types.StringValueFromString(paramValue)), + ) + return paramName +} + +func (d *ReadTableQueryImpl) MakeFilterString() string { + filterStrings := make([]string, 0, len(d.filters)) + for i := 0; i < len(d.filterFields); i++ { + fieldFilterStrings := make([]string, 0, len(d.filters[i])) + op := "=" + if d.isLikeFilter[d.filterFields[i]] { + op = "LIKE" + } + for _, value := range d.filters[i] { + paramName := d.AddTableQueryParam(value) + fieldFilterStrings = append(fieldFilterStrings, fmt.Sprintf("%s %s %s", d.filterFields[i], op, paramName)) + } + filterStrings = append(filterStrings, fmt.Sprintf("(%s)", strings.Join(fieldFilterStrings, " OR "))) + + } + return strings.Join(filterStrings, " AND ") +} + +func (d *ReadTableQueryImpl) FormatQuery() (*FormatQueryResult, error) { + if len(d.selectFields) == 0 { + return nil, errors.New("No fields to select") + } + if len(d.tableName) == 0 { + return nil, errors.New("No table") + } + if len(d.filters) == 0 { + return nil, errors.New("No filters") + } + res := fmt.Sprintf( + "SELECT %s FROM %s WHERE %s", + strings.Join(d.selectFields, ", "), + d.tableName, + d.MakeFilterString(), + ) + return &FormatQueryResult{ + QueryText: res, + QueryParams: table.NewQueryParameters(d.tableQueryParams...), + }, nil +} diff --git a/internal/connectors/db/yql/queries/read_test.go b/internal/connectors/db/yql/queries/read_test.go new file mode 100644 index 00000000..2d43e9e2 --- /dev/null +++ b/internal/connectors/db/yql/queries/read_test.go @@ -0,0 +1,45 @@ +package queries + +import ( + "github.com/stretchr/testify/assert" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "testing" +) + +func TestQueryBuilder_Read(t *testing.T) { + const ( + queryString = `SELECT column1, column2, column3 FROM table1 WHERE (column1 = $param0 OR column1 = $param1) AND (column2 = $param2 OR column2 = $param3)` + ) + var ( + queryParams = table.NewQueryParameters( + table.ValueParam("$param0", table_types.StringValueFromString("value1")), + table.ValueParam("$param1", table_types.StringValueFromString("value2")), + table.ValueParam("$param2", table_types.StringValueFromString("xxx")), + table.ValueParam("$param3", table_types.StringValueFromString("yyy")), + ) + ) + builder := MakeReadTableQuery( + WithTableName("table1"), + WithSelectFields("column1", "column2", "column3"), + WithQueryFilters( + QueryFilter[string]{ + Field: "column1", + Values: []string{"value1", "value2"}, + }, + ), + WithQueryFilters( + QueryFilter[string]{ + Field: "column2", + Values: []string{"xxx", "yyy"}, + }, + ), + ) + query, err := builder.FormatQuery() + assert.Empty(t, err) + assert.Equal( + t, queryString, query.QueryText, + "bad query format", + ) + assert.Equal(t, queryParams, query.QueryParams, "bad query params") +} diff --git a/internal/connectors/db/yql/queries/backup.go b/internal/connectors/db/yql/queries/write.go similarity index 69% rename from internal/connectors/db/yql/queries/backup.go rename to internal/connectors/db/yql/queries/write.go index 3fa5abcd..ff4721c8 100644 --- a/internal/connectors/db/yql/queries/backup.go +++ b/internal/connectors/db/yql/queries/write.go @@ -1,31 +1,5 @@ package queries -import ( - "fmt" - "strings" -) - -func MakeStatusFilter(statuses ...string) string { - for i := range statuses { - statuses[i] = fmt.Sprintf("status = '%s'", statuses[i]) - } - return strings.Join(statuses, " OR ") -} - -func SelectEntitiesQuery(tableName string, entityStatuses ...string) string { - return fmt.Sprintf( - ` - SELECT - id, - operation_id, - FROM - %s - WHERE - %s; - `, tableName, MakeStatusFilter(entityStatuses...), - ) -} - func UpdateBackupQuery() string { return ` UPSERT INTO Backups (id, status) VALUES ($id, $status); diff --git a/internal/handlers/take_backup_test.go b/internal/handlers/take_backup_test.go index 4ccc95e7..41871ce1 100644 --- a/internal/handlers/take_backup_test.go +++ b/internal/handlers/take_backup_test.go @@ -47,27 +47,27 @@ func TestTBOperationHandler(t *testing.T) { backupMap[backupID] = backup opMap[opId] = &tbOp ydbOpMap["1"] = ydbOp - db := db.NewMockDBConnector( + dbConnector := db.NewMockDBConnector( db.WithBackups(backupMap), db.WithOperations(opMap), ) - client := client.NewMockClientConnector( + clientConnector := client.NewMockClientConnector( client.WithOperations(ydbOpMap), ) - handler := MakeTBOperationHandler(db, client) + handler := MakeTBOperationHandler(dbConnector, clientConnector) err := handler(ctx, &tbOp) assert.Empty(t, err) - result, err := db.GetOperation(ctx, opId) + result, err := dbConnector.GetOperation(ctx, opId) assert.Empty(t, err) assert.Equal( t, result.GetState(), types.OperationStateDone, "operation state should be Done", ) - backups, err2 := db.SelectBackups(ctx, types.BackupStateAvailable) + backups, err2 := dbConnector.SelectBackupsByStatus(ctx, types.BackupStateAvailable) assert.Empty(t, err2) assert.Equal(t, 1, len(backups)) assert.Equal(t, types.BackupStateAvailable, backups[0].Status) diff --git a/internal/types/backup.go b/internal/types/backup.go index 8ea14fe1..6ac453bb 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -57,6 +57,20 @@ func (o *Backup) String() string { ) } +func (o *Backup) Proto() *pb.Backup { + return &pb.Backup{ + Id: o.ID.String(), + ContainerId: o.ContainerID, + DatabaseName: o.DatabaseName, + Location: nil, + Audit: nil, + Size: 0, + Status: pb.Backup_Status(pb.Backup_Status_value[o.Status]), + Message: "", + ExpireAt: nil, + } +} + type OperationType string type OperationState string