From 2c63a1bf58b9fa0da46a292889718622ea709892 Mon Sep 17 00:00:00 2001 From: qrort <31865255+qrort@users.noreply.github.com> Date: Mon, 22 Jul 2024 16:39:09 +0300 Subject: [PATCH] implement scratch ListOperations version. General upserts into DB (#20) --- cmd/ydbcp/main.go | 128 +++++++-- internal/connectors/db/connector.go | 134 ++++----- internal/connectors/db/mock.go | 6 + internal/connectors/db/process_result_set.go | 87 ++++-- internal/connectors/db/yql/queries/read.go | 54 ++-- .../connectors/db/yql/queries/read_test.go | 27 +- internal/connectors/db/yql/queries/write.go | 260 +++++++++++++----- .../connectors/db/yql/queries/write_test.go | 53 ++++ .../db/yql/schema/create_tables.yql | 8 +- .../connectors/db/yql/schema/fill_tables.yql | 7 +- internal/handlers/restore_backup_test.go | 4 +- internal/types/backup.go | 60 +++- test.sh | 20 +- 13 files changed, 609 insertions(+), 239 deletions(-) create mode 100644 internal/connectors/db/yql/queries/write_test.go diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index 61b89d1a..e6da997b 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -2,9 +2,10 @@ package main import ( "context" + "errors" "flag" "fmt" - "log" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "net" "os" "os/signal" @@ -37,26 +38,42 @@ var ( // server is used to implement BackupService. type server struct { pb.UnimplementedBackupServiceServer + pb.UnimplementedOperationServiceServer driver db.DBConnector s3 config.S3Config } -// GetBackup implements BackupService -func (s *server) GetBackup(ctx context.Context, in *pb.GetBackupRequest) (*pb.Backup, error) { - log.Printf("Received: %v", in.GetId()) - backups, err := s.driver.SelectBackupsByStatus(ctx, types.BackupStatePending) +func (s *server) GetBackup(ctx context.Context, request *pb.GetBackupRequest) (*pb.Backup, error) { + xlog.Debug(ctx, "GetBackup", zap.String("request", request.String())) + requestId, err := types.ParseObjectId(request.GetId()) + if err != nil { + return nil, fmt.Errorf("failed to parse uuid %s: %w", request.GetId(), err) + } + backups, err := s.driver.SelectBackups( + ctx, queries.MakeReadTableQuery( + queries.WithTableName("Backups"), + queries.WithSelectFields(queries.AllBackupFields...), + queries.WithQueryFilters( + queries.QueryFilter{ + Field: "id", + Values: []table_types.Value{table_types.UUIDValue(requestId)}, + }, + ), + ), + ) if err != nil { xlog.Error(ctx, "can't select backups", zap.Error(err)) return nil, err } - for _, backup := range backups { - fmt.Println("backup:", backup.ID.String()) + if len(backups) == 0 { + return nil, errors.New("No backup with such Id") } - return &pb.Backup{Id: in.GetId()}, nil + xlog.Debug(ctx, "GetBackup", zap.String("backup", backups[0].String())) + return backups[0].Proto(), nil } func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb.Operation, error) { - xlog.Info(ctx, "MakeBackup called", zap.String("request", req.String())) + xlog.Info(ctx, "MakeBackup", zap.String("request", req.String())) backup := types.Backup{ ContainerID: req.GetContainerId(), @@ -78,8 +95,9 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb } op := &types.TakeBackupOperation{ - BackupId: backupID, - State: types.OperationStatePending, + BackupId: backupID, + ContainerID: req.ContainerId, + State: types.OperationStatePending, YdbConnectionParams: types.YdbConnectionParams{ Endpoint: req.GetEndpoint(), DatabaseName: req.GetDatabaseName(), @@ -94,33 +112,28 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb return nil, err } - // TODO: get pb.Operation from Operation in one place - return &pb.Operation{ - Id: operationID.String(), - ContainerId: req.ContainerId, - Type: string(op.GetType()), - DatabaseName: op.YdbConnectionParams.DatabaseName, - BackupId: backupID.String(), - SourcePaths: op.SourcePaths, - SourcePathsToExclude: op.SourcePathToExclude, - Status: types.OperationState(op.GetState()).Enum(), - }, nil + op.Id = operationID + return op.Proto(), nil } func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest) (*pb.ListBackupsResponse, error) { - log.Printf("ListBackups: %s", request.String()) + xlog.Debug(ctx, "ListBackups", zap.String("request", request.String())) backups, err := s.driver.SelectBackups( ctx, queries.MakeReadTableQuery( queries.WithTableName("Backups"), queries.WithSelectFields(queries.AllBackupFields...), queries.WithQueryFilters( - queries.QueryFilter[string]{ - Field: "container_id", - Values: []string{request.ContainerId}, + queries.QueryFilter{ + Field: "container_id", + Values: []table_types.Value{ + table_types.StringValueFromString(request.ContainerId), + }, }, - queries.QueryFilter[string]{ - Field: "database", - Values: []string{request.DatabaseNameMask}, + queries.QueryFilter{ + Field: "database", + Values: []table_types.Value{ + table_types.StringValueFromString(request.DatabaseNameMask), + }, IsLike: true, }, ), @@ -139,6 +152,59 @@ func (s *server) ListBackups(ctx context.Context, request *pb.ListBackupsRequest }, nil } +func (s *server) ListOperations(ctx context.Context, request *pb.ListOperationsRequest) ( + *pb.ListOperationsResponse, error, +) { + xlog.Debug(ctx, "ListOperations", zap.String("request", request.String())) + operations, err := s.driver.SelectOperations( + ctx, queries.MakeReadTableQuery( + queries.WithTableName("Operations"), + queries.WithSelectFields(queries.AllOperationFields...), + queries.WithQueryFilters( + queries.QueryFilter{ + Field: "container_id", + Values: []table_types.Value{ + table_types.StringValueFromString(request.ContainerId), + }, + }, + queries.QueryFilter{ + Field: "database", + Values: []table_types.Value{ + table_types.StringValueFromString(request.DatabaseNameMask), + }, + IsLike: true, + }, + ), + ), + ) + if err != nil { + return nil, fmt.Errorf("error getting backups: %w", err) + } + pbOperations := make([]*pb.Operation, 0, len(operations)) + for _, operation := range operations { + pbOperations = append(pbOperations, operation.Proto()) + } + return &pb.ListOperationsResponse{ + Operations: pbOperations, + NextPageToken: strconv.Itoa(len(operations)), + }, nil +} + +func (s *server) CancelOperation(ctx context.Context, request *pb.CancelOperationRequest) (*pb.Operation, error) { + //TODO implement me + panic("implement me") +} + +func (s *server) GetOperation(ctx context.Context, request *pb.GetOperationRequest) (*pb.Operation, error) { + //TODO implement me + panic("implement me") +} + +func (s *server) mustEmbedUnimplementedOperationServiceServer() { + //TODO implement me + panic("implement me") +} + func main() { var confPath string @@ -189,6 +255,7 @@ func main() { defer server.driver.Close() pb.RegisterBackupServiceServer(s, &server) + pb.RegisterOperationServiceServer(s, &server) wg.Add(1) go func() { @@ -212,7 +279,8 @@ func main() { } err = handlersRegistry.Add( - types.OperationTypeRB, handlers.MakeRBOperationHandler(dbConnector, client.NewClientYdbConnector(), configInstance), + types.OperationTypeRB, + handlers.MakeRBOperationHandler(dbConnector, client.NewClientYdbConnector(), configInstance), ) if err != nil { diff --git a/internal/connectors/db/connector.go b/internal/connectors/db/connector.go index 25ec2dcb..84d0f638 100644 --- a/internal/connectors/db/connector.go +++ b/internal/connectors/db/connector.go @@ -3,6 +3,7 @@ package db import ( "context" "errors" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "ydbcp/internal/config" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" @@ -10,7 +11,6 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/result" - table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" "go.uber.org/zap" "ydbcp/internal/util/xlog" @@ -38,6 +38,9 @@ type DBConnector interface { SelectBackups(ctx context.Context, queryBuilder queries.ReadTableQuery) ( []*types.Backup, error, ) + SelectOperations(ctx context.Context, queryBuilder queries.ReadTableQuery) ( + []types.Operation, error, + ) SelectBackupsByStatus(ctx context.Context, backupStatus string) ([]*types.Backup, error) ActiveOperations(context.Context) ([]types.Operation, error) UpdateOperation(context.Context, types.Operation) error @@ -98,12 +101,12 @@ func DoStructSelect[T any]( var ( entities []*T ) - queryFormat, err := queryBuilder.FormatQuery() + queryFormat, err := queryBuilder.FormatQuery(ctx) if err != nil { return nil, err } err = d.GetTableClient().Do( - ctx, func(ctx context.Context, s table.Session) (err error) { + ctx, func(ctx context.Context, s table.Session) error { var ( res result.Result ) @@ -154,12 +157,12 @@ func DoInterfaceSelect[T any]( var ( entities []T ) - queryFormat, err := queryBuilder.FormatQuery() + queryFormat, err := queryBuilder.FormatQuery(ctx) if err != nil { return nil, err } err = d.GetTableClient().Do( - ctx, func(ctx context.Context, s table.Session) (err error) { + ctx, func(ctx context.Context, s table.Session) error { var ( res result.Result ) @@ -201,14 +204,18 @@ func DoInterfaceSelect[T any]( return entities, nil } -func (d *YdbConnector) ExecuteUpsert(ctx context.Context, query string, parameters *table.QueryParameters) error { - err := d.GetTableClient().Do( +func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.WriteTableQuery) error { + queryFormat, err := queryBuilder.FormatQuery(ctx) + if err != nil { + return err + } + err = d.GetTableClient().Do( ctx, func(ctx context.Context, s table.Session) (err error) { _, _, err = s.Execute( ctx, writeTx, - query, - parameters, + queryFormat.QueryText, + queryFormat.QueryParams, ) if err != nil { return err @@ -233,11 +240,11 @@ func (d *YdbConnector) SelectBackupsByStatus( d, queries.MakeReadTableQuery( queries.WithTableName("Backups"), - queries.WithSelectFields("id", "operation_id"), - queries.WithQueryFilters[string]( - queries.QueryFilter[string]{ - Field: "Status", - Values: []string{backupStatus}, + queries.WithSelectFields("id"), + queries.WithQueryFilters( + queries.QueryFilter{ + Field: "status", + Values: []table_types.Value{table_types.StringValueFromString(backupStatus)}, }, ), ), @@ -256,6 +263,17 @@ func (d *YdbConnector) SelectBackups( ) } +func (d *YdbConnector) SelectOperations( + ctx context.Context, queryBuilder queries.ReadTableQuery, +) ([]types.Operation, error) { + return DoInterfaceSelect[types.Operation]( + ctx, + d, + queryBuilder, + ReadOperationFromResultSet, + ) +} + func (d *YdbConnector) ActiveOperations(ctx context.Context) ( []types.Operation, error, ) { @@ -264,10 +282,14 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) ( d, queries.MakeReadTableQuery( queries.WithTableName("Operations"), + queries.WithSelectFields(queries.AllOperationFields...), queries.WithQueryFilters( - queries.QueryFilter[types.OperationState]{ - Field: "Status", - Values: []types.OperationState{types.OperationStatePending, types.OperationStateCancelling}, + queries.QueryFilter{ + Field: "status", + Values: []table_types.Value{ + table_types.StringValueFromString(string(types.OperationStatePending)), + table_types.StringValueFromString(string(types.OperationStateCancelling)), + }, }, ), ), @@ -275,69 +297,17 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) ( ) } -func BuildCreateOperationParams(operation types.Operation) *table.QueryParameters { - return table.NewQueryParameters( - table.ValueParam("$id", table_types.UUIDValue(operation.GetId())), - table.ValueParam( - "$type", - table_types.StringValueFromString(string(operation.GetType())), - ), - table.ValueParam( - "$container_id", table_types.StringValueFromString(""), - ), - table.ValueParam("$database", table_types.StringValueFromString("")), - table.ValueParam( - "$backup_id", table_types.UUIDValue(types.ObjectID{}), - ), //TODO: change to actual backup id. - table.ValueParam( - "$initiated", - table_types.StringValueFromString(operation.GetMessage()), - ), - table.ValueParam( - "$created_at", table_types.UUIDValue(operation.GetId()), - ), - table.ValueParam( - "$status", table_types.StringValueFromString(operation.GetState().String()), - ), - table.ValueParam( - "$operation_id", - table_types.StringValueFromString(operation.GetMessage()), - ), - ) -} - -func BuildUpdateOperationParams(operation types.Operation) *table.QueryParameters { - return table.NewQueryParameters( - table.ValueParam("$id", table_types.UUIDValue(operation.GetId())), - table.ValueParam( - "$status", table_types.StringValueFromString(operation.GetState().String()), - ), - table.ValueParam( - "$message", - table_types.StringValueFromString(operation.GetMessage()), - ), - ) -} - -func BuildUpdateBackupParams(id types.ObjectID, status string) *table.QueryParameters { - return table.NewQueryParameters( - table.ValueParam("$id", table_types.UUIDValue(id)), - table.ValueParam("$status", table_types.StringValueFromString(status)), - ) -} - func (d *YdbConnector) UpdateOperation( ctx context.Context, operation types.Operation, ) error { - return d.ExecuteUpsert(ctx, queries.UpdateOperationQuery(), BuildUpdateOperationParams(operation)) + return d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithUpdateOperation(operation))) } -// draft, not used. imo we can not use types.Operation here. it better be a more specific struct func (d *YdbConnector) CreateOperation( ctx context.Context, operation types.Operation, ) (types.ObjectID, error) { operation.SetId(types.GenerateObjectID()) - err := d.ExecuteUpsert(ctx, queries.CreateOperationQuery(), BuildCreateOperationParams(operation)) + err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithCreateOperation(operation))) if err != nil { return types.ObjectID{}, err } @@ -349,7 +319,7 @@ func (d *YdbConnector) CreateBackup( ) (types.ObjectID, error) { id := types.GenerateObjectID() backup.ID = id - err := d.ExecuteUpsert(ctx, queries.CreateBackupQuery(), BuildCreateBackupParams(backup)) + err := d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithCreateBackup(backup))) if err != nil { return types.ObjectID{}, err } @@ -359,19 +329,9 @@ func (d *YdbConnector) CreateBackup( func (d *YdbConnector) UpdateBackup( context context.Context, id types.ObjectID, backupStatus string, ) error { - return d.ExecuteUpsert(context, queries.UpdateBackupQuery(), BuildUpdateBackupParams(id, backupStatus)) -} - -func BuildCreateBackupParams(b types.Backup) *table.QueryParameters { - return table.NewQueryParameters( - table.ValueParam("$id", table_types.UUIDValue(b.ID)), - table.ValueParam("$container_id", table_types.StringValueFromString(b.ContainerID)), - table.ValueParam("$database", table_types.StringValueFromString(b.DatabaseName)), - table.ValueParam("$initiated", table_types.StringValueFromString("")), // TODO - table.ValueParam("$s3_endpoint", table_types.StringValueFromString(b.S3Endpoint)), - table.ValueParam("$s3_region", table_types.StringValueFromString(b.S3Region)), - table.ValueParam("$s3_bucket", table_types.StringValueFromString(b.S3Bucket)), - table.ValueParam("$s3_path_prefix", table_types.StringValueFromString(b.S3PathPrefix)), - table.ValueParam("$status", table_types.StringValueFromString(b.Status)), - ) + backup := types.Backup{ + ID: id, + Status: backupStatus, + } + return d.ExecuteUpsert(context, queries.MakeWriteTableQuery(queries.WithCreateBackup(backup))) } diff --git a/internal/connectors/db/mock.go b/internal/connectors/db/mock.go index 851b2241..dc9f73ea 100644 --- a/internal/connectors/db/mock.go +++ b/internal/connectors/db/mock.go @@ -139,3 +139,9 @@ func (c *MockDBConnector) GetOperation( "operation not found, id %s", operationID.String(), ) } + +func (d *MockDBConnector) SelectOperations( + ctx context.Context, queryBuilder queries.ReadTableQuery, +) ([]types.Operation, error) { + return nil, errors.New("Do not call this method") +} diff --git a/internal/connectors/db/process_result_set.go b/internal/connectors/db/process_result_set.go index c28a58cc..a6a6c12b 100644 --- a/internal/connectors/db/process_result_set.go +++ b/internal/connectors/db/process_result_set.go @@ -2,72 +2,119 @@ package db import ( "fmt" - "ydbcp/internal/types" - + "github.com/google/uuid" "github.com/ydb-platform/ydb-go-sdk/v3/table/result" "github.com/ydb-platform/ydb-go-sdk/v3/table/result/named" + "ydbcp/internal/types" ) type StructFromResultSet[T any] func(result result.Result) (*T, error) type InterfaceFromResultSet[T any] func(result result.Result) (T, error) +func StringOrDefault(str *string, def string) string { + if str == nil { + return def + } + return *str +} + +func StringOrEmpty(str *string) string { + return StringOrDefault(str, "") +} + func ReadBackupFromResultSet(res result.Result) (*types.Backup, error) { var ( - backupId types.ObjectID - operationId types.ObjectID + backupId [16]byte + containerId string + databaseName string + s3endpoint *string + s3region *string + s3pathprefix *string + status *string ) err := res.ScanNamed( named.Required("id", &backupId), - named.Optional("operation_id", &operationId), + named.Required("container_id", &containerId), + named.Required("database", &databaseName), + named.Optional("s3_endpoint", &s3endpoint), + named.Optional("s3_region", &s3region), + named.Optional("s3_path_prefix", &s3pathprefix), + named.Optional("status", &status), ) if err != nil { return nil, err } - return &types.Backup{ID: backupId}, nil + + id, err := uuid.FromBytes(backupId[:]) + + if err != nil { + return nil, err + } + + return &types.Backup{ + ID: types.ObjectID(id), + ContainerID: containerId, + DatabaseName: databaseName, + S3Endpoint: StringOrEmpty(s3endpoint), + S3Region: StringOrEmpty(s3region), + S3Bucket: "", + S3PathPrefix: StringOrEmpty(s3pathprefix), + Status: StringOrDefault(status, types.BackupStateUnknown), + }, nil } func ReadOperationFromResultSet(res result.Result) (types.Operation, error) { var ( - operationId types.ObjectID - backupId *types.ObjectID - operationType types.OperationType - operationState string - database *string - ydbOperationId *string + operationId types.ObjectID + containerId string + operationType string + + operationStateBuf *string + backupId *types.ObjectID + ydbOperationId *string + database *string ) err := res.ScanNamed( named.Required("id", &operationId), - named.Optional("backup_id", &backupId), + named.Required("container_id", &containerId), named.Required("type", &operationType), - named.Required("status", &operationState), + + named.Optional("status", &operationStateBuf), + named.Optional("backup_id", &backupId), named.Optional("operation_id", &ydbOperationId), named.Optional("database", &database), ) if err != nil { return nil, err } - if operationType == types.OperationTypeTB { + operationState := types.OperationStateUnknown + if operationStateBuf != nil { + operationState = types.OperationState(*operationStateBuf) + } + if operationType == string(types.OperationTypeTB) { if backupId == nil || database == nil || ydbOperationId == nil { return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String()) } return &types.TakeBackupOperation{ Id: operationId, - BackupId: *backupId, - State: types.OperationState(operationState), + BackupId: types.ObjectID(*backupId), + ContainerID: containerId, + State: operationState, Message: "", YdbConnectionParams: types.GetYdbConnectionParams(*database), YdbOperationId: *ydbOperationId, }, nil - } else if operationType == types.OperationTypeRB { + } else if operationType == string(types.OperationTypeRB) { if backupId == nil || database == nil || ydbOperationId == nil { return nil, fmt.Errorf("failed to read required fields of operation %s", operationId.String()) } return &types.RestoreBackupOperation{ Id: operationId, - BackupId: *backupId, - State: types.OperationState(operationState), + BackupId: types.ObjectID(*backupId), + ContainerID: containerId, + State: operationState, Message: "", YdbConnectionParams: types.GetYdbConnectionParams(*database), YdbOperationId: *ydbOperationId, diff --git a/internal/connectors/db/yql/queries/read.go b/internal/connectors/db/yql/queries/read.go index 75d9c027..97c047fb 100644 --- a/internal/connectors/db/yql/queries/read.go +++ b/internal/connectors/db/yql/queries/read.go @@ -1,29 +1,33 @@ package queries import ( + "context" "errors" "fmt" "github.com/ydb-platform/ydb-go-sdk/v3/table" table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "go.uber.org/zap" "strings" + "ydbcp/internal/util/xlog" ) var ( AllBackupFields = []string{ "id", "container_id", "database", - "initiated", "created_at", "compileted_at", - "s3_bucket", "s3_key_prefix", "status", + "initiated", "created_at", "completed_at", + "s3_endpoint", "s3_region", "s3_bucket", + "s3_path_prefix", "status", "paths", + } + AllOperationFields = []string{ + "id", "type", "container_id", "database", "backup_id", + "initiated", "created_at", "completed_at", "status", "paths", "operation_id", } ) -type StringLike interface { - ~string -} - -type QueryFilter[T StringLike] struct { +type QueryFilter struct { Field string - Values []T + Values []table_types.Value IsLike bool } @@ -34,13 +38,13 @@ type FormatQueryResult struct { type ReadTableQuery interface { MakeFilterString() string - FormatQuery() (*FormatQueryResult, error) + FormatQuery(ctx context.Context) (*FormatQueryResult, error) } type ReadTableQueryImpl struct { tableName string selectFields []string - filters [][]string + filters [][]table_types.Value filterFields []string isLikeFilter map[string]bool tableQueryParams []table.ParameterOption @@ -50,7 +54,7 @@ type ReadTableQueryOption func(*ReadTableQueryImpl) func MakeReadTableQuery(options ...ReadTableQueryOption) *ReadTableQueryImpl { d := &ReadTableQueryImpl{} - d.filters = make([][]string, 0) + d.filters = make([][]table_types.Value, 0) d.filterFields = make([]string, 0) d.isLikeFilter = make(map[string]bool) for _, opt := range options { @@ -71,13 +75,13 @@ func WithSelectFields(fields ...string) ReadTableQueryOption { } } -func WithQueryFilters[T StringLike](filters ...QueryFilter[T]) ReadTableQueryOption { +func WithQueryFilters(filters ...QueryFilter) ReadTableQueryOption { return func(d *ReadTableQueryImpl) { for _, filter := range filters { d.filterFields = append(d.filterFields, filter.Field) - newFilters := make([]string, 0, len(filter.Values)) + newFilters := make([]table_types.Value, 0, len(filter.Values)) for _, value := range filter.Values { - newFilters = append(newFilters, string(value)) + newFilters = append(newFilters, value) } d.filters = append(d.filters, newFilters) if filter.IsLike { @@ -87,14 +91,22 @@ func WithQueryFilters[T StringLike](filters ...QueryFilter[T]) ReadTableQueryOpt } } -func (d *ReadTableQueryImpl) AddTableQueryParam(paramValue string) string { +func (d *ReadTableQueryImpl) AddTableQueryParam(paramValue table_types.Value) string { paramName := fmt.Sprintf("$param%d", len(d.tableQueryParams)) d.tableQueryParams = append( - d.tableQueryParams, table.ValueParam(paramName, table_types.StringValueFromString(paramValue)), + d.tableQueryParams, table.ValueParam(paramName, paramValue), ) return paramName } +func (d *ReadTableQueryImpl) DeclareParameters() string { + declares := make([]string, len(d.tableQueryParams)) + for i, param := range d.tableQueryParams { + declares[i] = fmt.Sprintf("DECLARE %s AS %s", param.Name(), param.Value().Type().String()) + } + return strings.Join(declares, ";\n") +} + func (d *ReadTableQueryImpl) MakeFilterString() string { filterStrings := make([]string, 0, len(d.filters)) for i := 0; i < len(d.filterFields); i++ { @@ -108,12 +120,11 @@ func (d *ReadTableQueryImpl) MakeFilterString() string { fieldFilterStrings = append(fieldFilterStrings, fmt.Sprintf("%s %s %s", d.filterFields[i], op, paramName)) } filterStrings = append(filterStrings, fmt.Sprintf("(%s)", strings.Join(fieldFilterStrings, " OR "))) - } return strings.Join(filterStrings, " AND ") } -func (d *ReadTableQueryImpl) FormatQuery() (*FormatQueryResult, error) { +func (d *ReadTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) { if len(d.selectFields) == 0 { return nil, errors.New("No fields to select") } @@ -123,12 +134,15 @@ func (d *ReadTableQueryImpl) FormatQuery() (*FormatQueryResult, error) { if len(d.filters) == 0 { return nil, errors.New("No filters") } + filter := d.MakeFilterString() res := fmt.Sprintf( - "SELECT %s FROM %s WHERE %s", + "%s;\nSELECT %s FROM %s WHERE %s", + d.DeclareParameters(), strings.Join(d.selectFields, ", "), d.tableName, - d.MakeFilterString(), + filter, ) + xlog.Debug(ctx, "read query", zap.String("yql", res)) return &FormatQueryResult{ QueryText: res, QueryParams: table.NewQueryParameters(d.tableQueryParams...), diff --git a/internal/connectors/db/yql/queries/read_test.go b/internal/connectors/db/yql/queries/read_test.go index 2d43e9e2..19b3047e 100644 --- a/internal/connectors/db/yql/queries/read_test.go +++ b/internal/connectors/db/yql/queries/read_test.go @@ -1,6 +1,7 @@ package queries import ( + "context" "github.com/stretchr/testify/assert" "github.com/ydb-platform/ydb-go-sdk/v3/table" table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" @@ -9,7 +10,11 @@ import ( func TestQueryBuilder_Read(t *testing.T) { const ( - queryString = `SELECT column1, column2, column3 FROM table1 WHERE (column1 = $param0 OR column1 = $param1) AND (column2 = $param2 OR column2 = $param3)` + queryString = `DECLARE $param0 AS String; +DECLARE $param1 AS String; +DECLARE $param2 AS String; +DECLARE $param3 AS String; +SELECT column1, column2, column3 FROM table1 WHERE (column1 = $param0 OR column1 = $param1) AND (column2 = $param2 OR column2 = $param3)` ) var ( queryParams = table.NewQueryParameters( @@ -23,19 +28,25 @@ func TestQueryBuilder_Read(t *testing.T) { WithTableName("table1"), WithSelectFields("column1", "column2", "column3"), WithQueryFilters( - QueryFilter[string]{ - Field: "column1", - Values: []string{"value1", "value2"}, + QueryFilter{ + Field: "column1", + Values: []table_types.Value{ + table_types.StringValueFromString("value1"), + table_types.StringValueFromString("value2"), + }, }, ), WithQueryFilters( - QueryFilter[string]{ - Field: "column2", - Values: []string{"xxx", "yyy"}, + QueryFilter{ + Field: "column2", + Values: []table_types.Value{ + table_types.StringValueFromString("xxx"), + table_types.StringValueFromString("yyy"), + }, }, ), ) - query, err := builder.FormatQuery() + query, err := builder.FormatQuery(context.Background()) assert.Empty(t, err) assert.Equal( t, queryString, query.QueryText, diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index ff4721c8..da83cb79 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -1,69 +1,197 @@ package queries -func UpdateBackupQuery() string { - return ` - UPSERT INTO Backups (id, status) VALUES ($id, $status); - ` -} - -func UpdateOperationQuery() string { - return ` - UPSERT INTO Operations (id, status, message) VALUES ($id, $status, $message); - ` -} - -func CreateOperationQuery() string { - //skipped completed_at and paths - return ` - UPSERT INTO Operations ( - id, - type, - container_id, - database, - backup_id, - initiated, - created_at, - status, - operation_id - ) VALUES ( - $id, - $type, - $container_id, - $database, - $backup_id, - $initiated, - $created_at, - $status, - $operation_id - ); - ` -} - -func CreateBackupQuery() string { - //skipped completed_at - return ` - UPSERT INTO Backup ( - id, - container_id, - database, - initiated, - s3_endpoint, - s3_region, - s3_bucket, - s3_path_prefix, - status, - paths - ) VALUES ( - $id, - $container_id, - $database, - $initiated, - $s3_endpoint, - $s3_region, - $s3_bucket, - $s3_path_prefix, - $status, - $paths - ); - ` +import ( + "context" + "errors" + "fmt" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "go.uber.org/zap" + "strings" + "ydbcp/internal/types" + "ydbcp/internal/util/xlog" +) + +type WriteTableQuery interface { + FormatQuery(ctx context.Context) (*FormatQueryResult, error) +} + +type WriteTableQueryImpl struct { + tableQueries []WriteSingleTableQueryImpl +} + +type WriteSingleTableQueryImpl struct { + index int + tableName string + upsertFields []string + tableQueryParams []table.ParameterOption +} + +func (d *WriteSingleTableQueryImpl) AddValueParam(name string, value table_types.Value) { + d.upsertFields = append(d.upsertFields, name[1:]) + d.tableQueryParams = append(d.tableQueryParams, table.ValueParam(fmt.Sprintf("%s_%d", name, d.index), value)) +} + +func (d *WriteSingleTableQueryImpl) GetParamNames() []string { + res := make([]string, len(d.tableQueryParams)) + for i, p := range d.tableQueryParams { + res[i] = p.Name() + } + return res +} + +func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingleTableQueryImpl { + d := WriteSingleTableQueryImpl{ + index: index, + tableName: "Operations", + } + d.AddValueParam("$id", table_types.UUIDValue(operation.GetId())) + d.AddValueParam("$type", table_types.StringValueFromString(string(operation.GetType()))) + d.AddValueParam( + "$status", table_types.StringValueFromString(operation.GetState().String()), + ) + if operation.GetType() == types.OperationType("TB") { + tb := operation.(*types.TakeBackupOperation) + d.AddValueParam( + "$container_id", table_types.StringValueFromString(tb.ContainerID), + ) + d.AddValueParam( + "$database", + table_types.StringValueFromString(tb.YdbConnectionParams.DatabaseName), + ) + d.AddValueParam( + "$backup_id", + table_types.UUIDValue(tb.BackupId), + ) + d.AddValueParam( + "$initiated", + table_types.StringValueFromString(""), //TODO + ) + d.AddValueParam( + "$created_at", + table_types.StringValueFromString(""), //TODO + ) + d.AddValueParam( + "$operation_id", + table_types.StringValueFromString(tb.YdbOperationId), + ) + } + + return d +} + +func BuildUpdateOperationQuery(operation types.Operation, index int) WriteSingleTableQueryImpl { + d := WriteSingleTableQueryImpl{ + index: index, + tableName: "Operations", + } + d.AddValueParam("$id", table_types.UUIDValue(operation.GetId())) + d.AddValueParam( + "$status", table_types.StringValueFromString(operation.GetState().String()), + ) + d.AddValueParam( + "$message", + table_types.StringValueFromString(operation.GetMessage()), + ) + return d +} + +func BuildUpdateBackupQuery(backup types.Backup, index int) WriteSingleTableQueryImpl { + d := WriteSingleTableQueryImpl{ + index: index, + tableName: "Backups", + } + d.AddValueParam("$id", table_types.UUIDValue(backup.ID)) + d.AddValueParam("$status", table_types.StringValueFromString(backup.Status)) + return d +} + +func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl { + d := WriteSingleTableQueryImpl{ + index: index, + tableName: "Backups", + } + d.AddValueParam("$id", table_types.UUIDValue(b.ID)) + d.AddValueParam("$container_id", table_types.StringValueFromString(b.ContainerID)) + d.AddValueParam("$database", table_types.StringValueFromString(b.DatabaseName)) + d.AddValueParam("$initiated", table_types.StringValueFromString("")) // TODO + d.AddValueParam("$s3_endpoint", table_types.StringValueFromString(b.S3Endpoint)) + d.AddValueParam("$s3_region", table_types.StringValueFromString(b.S3Region)) + d.AddValueParam("$s3_bucket", table_types.StringValueFromString(b.S3Bucket)) + d.AddValueParam("$s3_path_prefix", table_types.StringValueFromString(b.S3PathPrefix)) + d.AddValueParam("$status", table_types.StringValueFromString(b.Status)) + return d +} + +type WriteTableQueryOption func(*WriteTableQueryImpl) + +func MakeWriteTableQuery(options ...WriteTableQueryOption) *WriteTableQueryImpl { + d := &WriteTableQueryImpl{} + for _, opt := range options { + opt(d) + } + return d +} + +func WithCreateBackup(backup types.Backup) WriteTableQueryOption { + return func(d *WriteTableQueryImpl) { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index)) + } +} + +func WithUpdateBackup(backup types.Backup) WriteTableQueryOption { + return func(d *WriteTableQueryImpl) { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildUpdateBackupQuery(backup, index)) + } +} + +func WithUpdateOperation(operation types.Operation) WriteTableQueryOption { + return func(d *WriteTableQueryImpl) { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildUpdateOperationQuery(operation, index)) + } +} + +func WithCreateOperation(operation types.Operation) WriteTableQueryOption { + return func(d *WriteTableQueryImpl) { + index := len(d.tableQueries) + d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(operation, index)) + } +} + +func (d *WriteSingleTableQueryImpl) DeclareParameters() string { + declares := make([]string, len(d.tableQueryParams)) + for i, param := range d.tableQueryParams { + declares[i] = fmt.Sprintf("DECLARE %s AS %s", param.Name(), param.Value().Type().String()) + } + return strings.Join(declares, ";\n") +} + +func (d *WriteTableQueryImpl) FormatQuery(ctx context.Context) (*FormatQueryResult, error) { + queryStrings := make([]string, len(d.tableQueries)) + allParams := make([]table.ParameterOption, 0) + for i, t := range d.tableQueries { + if len(t.upsertFields) == 0 { + return nil, errors.New("No fields to upsert") + } + if len(t.tableName) == 0 { + return nil, errors.New("No table") + } + declares := t.DeclareParameters() + queryStrings[i] = fmt.Sprintf( + "%s;\nUPSERT INTO %s (%s) VALUES (%s)", declares, t.tableName, strings.Join(t.upsertFields, ", "), + strings.Join(t.GetParamNames(), ", "), + ) + for _, p := range t.tableQueryParams { + allParams = append(allParams, p) + } + } + res := strings.Join(queryStrings, ";\n") + xlog.Debug(ctx, "write query", zap.String("yql", res)) + return &FormatQueryResult{ + QueryText: res, + QueryParams: table.NewQueryParameters(allParams...), + }, nil } diff --git a/internal/connectors/db/yql/queries/write_test.go b/internal/connectors/db/yql/queries/write_test.go new file mode 100644 index 00000000..4c039aab --- /dev/null +++ b/internal/connectors/db/yql/queries/write_test.go @@ -0,0 +1,53 @@ +package queries + +import ( + "context" + "github.com/stretchr/testify/assert" + "github.com/ydb-platform/ydb-go-sdk/v3/table" + table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types" + "testing" + "ydbcp/internal/types" +) + +func TestQueryBuilder_Write(t *testing.T) { + const ( + queryString = `DECLARE $id_0 AS Uuid; +DECLARE $status_0 AS String; +UPSERT INTO Backups (id, status) VALUES ($id_0, $status_0); +DECLARE $id_1 AS Uuid; +DECLARE $status_1 AS String; +DECLARE $message_1 AS String; +UPSERT INTO Operations (id, status, message) VALUES ($id_1, $status_1, $message_1)` + ) + opId := types.GenerateObjectID() + backupId := types.GenerateObjectID() + op := types.TakeBackupOperation{ + Id: opId, + State: "Done", + Message: "Abcde", + } + backup := types.Backup{ + ID: backupId, + Status: "Available", + } + builder := MakeWriteTableQuery( + WithUpdateBackup(backup), + WithUpdateOperation(&op), + ) + var ( + queryParams = table.NewQueryParameters( + table.ValueParam("$id_0", table_types.UUIDValue(backupId)), + table.ValueParam("$status_0", table_types.StringValueFromString("Available")), + table.ValueParam("$id_1", table_types.UUIDValue(opId)), + table.ValueParam("$status_1", table_types.StringValueFromString("Done")), + table.ValueParam("$message_1", table_types.StringValueFromString("Abcde")), + ) + ) + query, err := builder.FormatQuery(context.Background()) + assert.Empty(t, err) + assert.Equal( + t, queryString, query.QueryText, + "bad query format", + ) + assert.Equal(t, queryParams, query.QueryParams, "bad query params") +} diff --git a/internal/connectors/db/yql/schema/create_tables.yql b/internal/connectors/db/yql/schema/create_tables.yql index cb01bcb9..d0f75cb8 100644 --- a/internal/connectors/db/yql/schema/create_tables.yql +++ b/internal/connectors/db/yql/schema/create_tables.yql @@ -1,3 +1,7 @@ +DROP TABLE Backups; +DROP TABLE OperationTypes; +DROP TABLE Operations; + CREATE TABLE Backups ( id UUID NOT NULL, container_id String NOT NULL, @@ -30,10 +34,10 @@ CREATE TABLE OperationTypes ( CREATE TABLE Operations ( id UUID NOT NULL, - type String, + type String NOT NULL, container_id String NOT NULL, database String NOT NULL, - backup_id String, + backup_id UUID, initiated String, created_at Timestamp, diff --git a/internal/connectors/db/yql/schema/fill_tables.yql b/internal/connectors/db/yql/schema/fill_tables.yql index 215b0692..7dfa4be1 100644 --- a/internal/connectors/db/yql/schema/fill_tables.yql +++ b/internal/connectors/db/yql/schema/fill_tables.yql @@ -4,5 +4,8 @@ UPSERT INTO `OperationTypes` (code, description, is_cancellable) VALUES ('RM', 'Remove backup', False); --for testing purposes -UPSERT INTO `Backups` (id, container_id, database, operation_id, status) VALUES - (Uuid('12345678-1234-5678-1234-567812345678'), '', '', Uuid('12345678-1234-5678-1234-567812345678'), 'Pending'); +UPSERT INTO `Backups` (id, container_id, database, status) VALUES + (Uuid('12345678-1234-5678-1234-567812345678'), '', '', 'PENDING'); + +UPSERT INTO `Operations` (id, container_id, database, type, status, operation_id, backup_id) VALUES + (Uuid('11111111-1111-1111-1111-111111111111'), '', '', 'TB', 'DONE', '', Uuid('11111111-1111-1111-1111-111111111112')); diff --git a/internal/handlers/restore_backup_test.go b/internal/handlers/restore_backup_test.go index f9dddbbe..b502aad7 100644 --- a/internal/handlers/restore_backup_test.go +++ b/internal/handlers/restore_backup_test.go @@ -43,7 +43,7 @@ func TestRBOperationHandlerInvalidOperationResponse(t *testing.T) { assert.Empty(t, err) assert.NotEmpty(t, op) assert.Equal(t, types.OperationStateError, op.GetState()) - assert.Equal(t, "error status: NOT_FOUND, issues: message:\"operation not found\"", op.GetMessage()) + assert.Equal(t, "Error status: NOT_FOUND, issues: message:\"operation not found\"", op.GetMessage()) } func TestRBOperationHandlerDeadlineExceededForPendingOperation(t *testing.T) { @@ -85,7 +85,7 @@ func TestRBOperationHandlerDeadlineExceededForPendingOperation(t *testing.T) { assert.Empty(t, err) assert.NotEmpty(t, op) assert.Equal(t, types.OperationStateCancelling, op.GetState()) - assert.Equal(t, "operation deadline exceeded", op.GetMessage()) + assert.Equal(t, "Operation deadline exceeded", op.GetMessage()) // check ydb operation status (should be cancelled) ydbOpStatus, err := clientConnector.GetOperationStatus(ctx, nil, rbOp.YdbOperationId) diff --git a/internal/types/backup.go b/internal/types/backup.go index 2285cd6a..e63ea567 100644 --- a/internal/types/backup.go +++ b/internal/types/backup.go @@ -3,10 +3,13 @@ package types import ( "context" "fmt" + "google.golang.org/protobuf/types/known/timestamppb" + "log" + "strings" + "github.com/google/uuid" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" "go.uber.org/zap" - "strings" "time" "ydbcp/internal/util/xlog" @@ -36,6 +39,17 @@ func GenerateObjectID() ObjectID { return ObjectID(uuid.New()) } +func ParseObjectId(objectId string) (ObjectID, error) { + parsed, err := uuid.Parse(objectId) + if err != nil { + return ObjectID{}, fmt.Errorf("Invalid uuid: %w", err) + } + if parsed.Variant() != uuid.RFC4122 && parsed.Version() != 4 { + return ObjectID{}, fmt.Errorf("ObjectId is not UUID4: %w", err) + } + return ObjectID(parsed), nil +} + type Backup struct { ID ObjectID ContainerID string @@ -83,6 +97,7 @@ type Operation interface { SetState(s OperationState) GetMessage() string SetMessage(m string) + Proto() *pb.Operation } type TakeBackupOperation struct { @@ -121,6 +136,23 @@ func (o *TakeBackupOperation) SetMessage(m string) { o.Message = m } +func (o *TakeBackupOperation) Proto() *pb.Operation { + return &pb.Operation{ + Id: o.Id.String(), + ContainerId: o.ContainerID, + Type: string(OperationTypeTB), + DatabaseName: o.YdbConnectionParams.DatabaseName, + YdbServerOperationId: o.YdbOperationId, + BackupId: o.BackupId.String(), + SourcePaths: o.SourcePaths, + SourcePathsToExclude: o.SourcePathToExclude, + RestorePaths: nil, + Audit: nil, + Status: o.State.Enum(), + Message: o.Message, + } +} + type RestoreBackupOperation struct { Id ObjectID ContainerID string @@ -157,6 +189,27 @@ func (o *RestoreBackupOperation) SetMessage(m string) { o.Message = m } +func (o *RestoreBackupOperation) Proto() *pb.Operation { + return &pb.Operation{ + Id: o.Id.String(), + ContainerId: o.ContainerID, + Type: string(OperationTypeTB), + DatabaseName: o.YdbConnectionParams.DatabaseName, + YdbServerOperationId: o.YdbOperationId, + BackupId: o.BackupId.String(), + SourcePaths: nil, + SourcePathsToExclude: nil, + RestorePaths: o.DestinationPaths, + Audit: &pb.AuditInfo{ + Creator: "", + CreatedAt: timestamppb.New(o.CreatedAt), + CompletedAt: nil, + }, + Status: o.State.Enum(), + Message: o.Message, + } +} + type GenericOperation struct { Id ObjectID ContainerID string @@ -189,6 +242,10 @@ func (o *GenericOperation) GetMessage() string { func (o *GenericOperation) SetMessage(m string) { o.Message = m } +func (o *GenericOperation) Proto() *pb.Operation { + log.Fatalf("Converting GenericOperation to Proto: %s", o.Id) + return nil +} var ( OperationStateUnknown = OperationState(pb.Operation_STATUS_UNSPECIFIED.String()) @@ -203,6 +260,7 @@ const ( OperationTypeTB = OperationType("TB") OperationTypeRB = OperationType("RB") + BackupStateUnknown = "Unknown" BackupStatePending = "Pending" BackupStateAvailable = "Available" BackupStateError = "Error" diff --git a/test.sh b/test.sh index 73157381..09760bf8 100755 --- a/test.sh +++ b/test.sh @@ -1 +1,19 @@ -grpcurl -plaintext -format text -d 'id: "my_backup_id"' localhost:50051 ydbcp.BackupService.GetBackup +if [[ -z "$1" ]]; then + echo "Specify oneof request examples: GetBackup, ListBackups" +fi +doneflag=0 +if [[ "GetBackup" == "$1" ]]; then + grpcurl -plaintext -d '{"id": "12345678-1234-5678-1234-567812345678"}' localhost:50051 ydbcp.BackupService.GetBackup + doneflag=1 +fi +if [[ "ListBackups" == "$1" ]]; then + grpcurl -plaintext -d '{"databaseNameMask": "", "containerId": ""}' localhost:50051 ydbcp.BackupService.ListBackups + doneflag=1 +fi +if [[ "ListOperations" == "$1" ]]; then + grpcurl -plaintext -d '{"databaseNameMask": "", "containerId": ""}' localhost:50051 ydbcp.OperationService.ListOperations + doneflag=1 +fi +if [[ 0 == $doneflag ]]; then + echo "Failed to parse command; nothing done" +fi