Skip to content

Commit

Permalink
Start export s3 and fixes for demo
Browse files Browse the repository at this point in the history
  • Loading branch information
bma13 committed Jul 26, 2024
1 parent 2c63a1b commit 0f96998
Show file tree
Hide file tree
Showing 20 changed files with 544 additions and 350 deletions.
15 changes: 15 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
TOPTARGETS := all
FILES ?= $(shell find . -type f -name '*.go')
PACKAGES ?= $(shell go list ./...)

SUBDIRS := pkg/proto

Expand All @@ -7,3 +9,16 @@ $(SUBDIRS):
$(MAKE) -C $@ $(MAKECMDGOALS)

.PHONY: $(TOPTARGETS) $(SUBDIRS)

test:
go test -v ./... -short

fmt:
go fmt ./...
goimports -w $(FILES)

lint:
golint $(PACKAGES)

vet:
go vet ./...
14 changes: 13 additions & 1 deletion cmd/ydbcp/config.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,14 @@
ydbcp_db_connection_string: "grpc://localhost:2136/local"
operation_ttl_seconds: 86400 # 24 hours

db_connection:
connection_string: "grpcs://localhost:2135/domain/database"
insecure: true
discovery: false

s3:
endpoint: s3.endpoint.com
region: s3-region
bucket: s3-bucket
path_prefix: cluster-domain
access_key_id_path: path-to-s3-key
secret_access_key_path: path-to-s3-sec
86 changes: 78 additions & 8 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"errors"
"flag"
"fmt"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"net"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"ydbcp/internal/config"
configInit "ydbcp/internal/config"
"ydbcp/internal/connectors/client"
Expand All @@ -22,6 +23,9 @@ import (
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Export"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"

_ "go.uber.org/automaxprocs"
"go.uber.org/zap"

Expand All @@ -39,8 +43,9 @@ var (
type server struct {
pb.UnimplementedBackupServiceServer
pb.UnimplementedOperationServiceServer
driver db.DBConnector
s3 config.S3Config
driver db.DBConnector
clientConn client.ClientConnector
s3 config.S3Config
}

func (s *server) GetBackup(ctx context.Context, request *pb.GetBackupRequest) (*pb.Backup, error) {
Expand Down Expand Up @@ -75,13 +80,68 @@ func (s *server) GetBackup(ctx context.Context, request *pb.GetBackupRequest) (*
func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb.Operation, error) {
xlog.Info(ctx, "MakeBackup", zap.String("request", req.String()))

clientConnectionParams := types.YdbConnectionParams{
Endpoint: req.GetDatabaseEndpoint(),
DatabaseName: req.GetDatabaseName(),
}
dsn := types.MakeYdbConnectionString(clientConnectionParams)
client, err := s.clientConn.Open(ctx, dsn)
if err != nil {
// xlog.Error(ctx, "can't open client connection", zap.Error(err), zap.String("dsn", dsn))
return nil, fmt.Errorf("can't open client connection, dsn %s: %w", dsn, err)
}
defer func() {
if err := s.clientConn.Close(ctx, client); err != nil {
xlog.Error(ctx, "can't close client connection", zap.Error(err))
}
}()

accessKey, err := s.s3.AccessKey()
if err != nil {
xlog.Error(ctx, "can't get S3AccessKey", zap.Error(err))
return nil, fmt.Errorf("can't get S3AccessKey: %w", err)
}
secretKey, err := s.s3.SecretKey()
if err != nil {
xlog.Error(ctx, "can't get S3SecretKey", zap.Error(err))
return nil, fmt.Errorf("can't get S3SecretKey: %w", err)
}
exportItems := make([]*Ydb_Export.ExportToS3Settings_Item, 0, len(s.s3.PathPrefix))
databaseName := strings.Replace(req.DatabaseName, "/", "_", -1) // TODO: checking user imput
databaseName = strings.Trim(databaseName, "_")
dstPrefix := fmt.Sprintf("%s/%s/%s", s.s3.PathPrefix, databaseName, time.Now().Format("20060102_150405"))
for _, path := range req.GetSourcePaths() {
exportItems = append(exportItems, &Ydb_Export.ExportToS3Settings_Item{
SourcePath: path,
DestinationPrefix: dstPrefix, // TODO: check if we need different destination prefixes?
})
}

s3Settings := &Ydb_Export.ExportToS3Settings{
Endpoint: s.s3.Endpoint,
Region: s.s3.Region,
Bucket: s.s3.Bucket,
AccessKey: accessKey,
SecretKey: secretKey,
Description: "ydbcp backup", // TODO: the description shoud be better
NumberOfRetries: 10, // TODO: get it from configuration
Items: exportItems,
}

clientOperationID, err := s.clientConn.ExportToS3(ctx, client, s3Settings)
if err != nil {
xlog.Error(ctx, "can't start export operation", zap.Error(err), zap.String("dns", dsn))
return nil, fmt.Errorf("can't start export operation, dsn %s: %w", dsn, err)
}
xlog.Debug(ctx, "export operation started", zap.String("clientOperationID", clientOperationID), zap.String("dsn", dsn))

backup := types.Backup{
ContainerID: req.GetContainerId(),
DatabaseName: req.GetDatabaseName(),
S3Endpoint: s.s3.Endpoint,
S3Region: s.s3.Region,
S3Bucket: s.s3.Bucket,
S3PathPrefix: s.s3.PathPrefix,
S3PathPrefix: dstPrefix,
Status: types.BackupStatePending,
}
backupID, err := s.driver.CreateBackup(ctx, backup)
Expand All @@ -99,11 +159,13 @@ func (s *server) MakeBackup(ctx context.Context, req *pb.MakeBackupRequest) (*pb
ContainerID: req.ContainerId,
State: types.OperationStatePending,
YdbConnectionParams: types.YdbConnectionParams{
Endpoint: req.GetEndpoint(),
Endpoint: req.GetDatabaseEndpoint(),
DatabaseName: req.GetDatabaseName(),
},
SourcePaths: req.GetSourcePaths(),
SourcePathToExclude: req.GetSourcePathsToExclude(),
CreatedAt: time.Now(),
YdbOperationId: clientOperationID,
}

operationID, err := s.driver.CreateOperation(ctx, op)
Expand Down Expand Up @@ -249,10 +311,18 @@ func main() {
s := grpc.NewServer()
reflection.Register(s)

dbConnector := db.NewYdbConnector(configInstance)
dbConnector, err := db.NewYdbConnector(ctx, configInstance.DBConnection)
if err != nil {
xlog.Error(ctx, "Error init DBConnector", zap.Error(err))
os.Exit(1)
}

server := server{driver: dbConnector}
defer server.driver.Close()
server := server{
driver: dbConnector,
clientConn: client.NewClientYdbConnector(),
s3: configInstance.S3,
}
defer server.driver.Close(ctx)

pb.RegisterBackupServiceServer(s, &server)
pb.RegisterOperationServiceServer(s, &server)
Expand Down
32 changes: 29 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package config
import (
"context"
"errors"
"fmt"
"os"
"strings"
"ydbcp/internal/util/xlog"

"go.uber.org/zap"
Expand All @@ -19,10 +21,17 @@ type S3Config struct {
SecretAccessKeyPath string `yaml:"secret_access_key_path"`
}

type YDBConnectionConfig struct {
ConnectionString string `yaml:"connection_string"`
Insecure bool `yaml:"insecure"`
Discovery bool `yaml:"discovery" default:"true"`
DialTimeoutSeconds uint32 `yaml:"dial_timeout_seconds" default:"5"`
}

type Config struct {
YdbcpDbConnectionString string `yaml:"ydbcp_db_connection_string"`
S3 S3Config `yaml:"s3"`
OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"`
DBConnection YDBConnectionConfig `yaml:"db_connection"`
S3 S3Config `yaml:"s3"`
OperationTtlSeconds int64 `yaml:"operation_ttl_seconds"`
}

func (config Config) ToString() (string, error) {
Expand Down Expand Up @@ -54,3 +63,20 @@ func InitConfig(ctx context.Context, confPath string) (Config, error) {
}
return Config{}, errors.New("configuration file path is empty")
}

func readSecret(filename string) (string, error) {
rawSecret, err := os.ReadFile(filename)
if err != nil {
return "", fmt.Errorf("can't read file %s: %w", filename, err)
}
return strings.TrimSpace(string(rawSecret)), nil
}

func (c *S3Config) AccessKey() (string, error) {
return readSecret(c.AccessKeyIDPath)
}

func (c *S3Config) SecretKey() (string, error) {
return readSecret(c.SecretAccessKeyPath)

}
10 changes: 9 additions & 1 deletion internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Import"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/durationpb"
)
Expand All @@ -38,7 +39,14 @@ func NewClientYdbConnector() *ClientYdbConnector {

func (d *ClientYdbConnector) Open(ctx context.Context, dsn string) (*ydb.Driver, error) {
xlog.Info(ctx, "Connecting to client db", zap.String("dsn", dsn))
db, connErr := ydb.Open(ctx, dsn, ydb.WithAnonymousCredentials())
opts := []ydb.Option{
ydb.WithAnonymousCredentials(),
ydb.WithTLSSInsecureSkipVerify(),
ydb.WithDialTimeout(time.Second * 10),
ydb.WithBalancer(balancers.SingleConn()),
}

db, connErr := ydb.Open(ctx, dsn, opts...)

if connErr != nil {
return nil, fmt.Errorf("error connecting to client db: %s", connErr.Error())
Expand Down
77 changes: 47 additions & 30 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package db
import (
"context"
"errors"
table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"
"fmt"
"time"
"ydbcp/internal/config"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"

table_types "github.com/ydb-platform/ydb-go-sdk/v3/table/types"

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/balancers"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/result"
"go.uber.org/zap"
Expand Down Expand Up @@ -47,45 +51,38 @@ type DBConnector interface {
CreateOperation(context.Context, types.Operation) (types.ObjectID, error)
CreateBackup(context.Context, types.Backup) (types.ObjectID, error)
UpdateBackup(context context.Context, id types.ObjectID, backupState string) error
Close()
Close(context.Context)
}

type YdbConnector struct {
driver *ydb.Driver
}

func NewYdbConnector(config config.Config) *YdbConnector {
p := new(YdbConnector)
p.driver = InitDriver(config.YdbcpDbConnectionString)
return p
}

func InitDriver(dsn string) *ydb.Driver {
ctx := context.Background()

func NewYdbConnector(ctx context.Context, config config.YDBConnectionConfig) (*YdbConnector, error) {
opts := []ydb.Option{
ydb.WithAnonymousCredentials(),
ydb.WithDialTimeout(time.Second * time.Duration(config.DialTimeoutSeconds)),
}
if config.Insecure {
opts = append(opts, ydb.WithTLSSInsecureSkipVerify())
}
if !config.Discovery {
opts = append(opts, ydb.WithBalancer(balancers.SingleConn()))
}
xlog.Info(ctx, "connecting to ydb", zap.String("dsn", dsn))
db, err := ydb.Open(ctx, dsn, opts...)

xlog.Info(ctx, "connecting to ydb", zap.String("dsn", config.ConnectionString))
driver, err := ydb.Open(ctx, config.ConnectionString, opts...)
if err != nil {
// handle a connection error
xlog.Error(
ctx, "Error connecting to YDB", zap.String("message", err.Error()),
)
return nil
return nil, fmt.Errorf("can't connect to YDB, dsn %s: %w", config.ConnectionString, err)
}

return db
return &YdbConnector{driver: driver}, nil
}

func (d *YdbConnector) GetTableClient() table.Client {
return d.driver.Table()
}

func (d *YdbConnector) Close() {
ctx := context.Background()
func (d *YdbConnector) Close(ctx context.Context) {
err := d.driver.Close(ctx)
if err != nil {
xlog.Error(ctx, "Error closing YDB driver")
Expand Down Expand Up @@ -287,8 +284,8 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) (
queries.QueryFilter{
Field: "status",
Values: []table_types.Value{
table_types.StringValueFromString(string(types.OperationStatePending)),
table_types.StringValueFromString(string(types.OperationStateCancelling)),
table_types.StringValueFromString(types.OperationStatePending.String()),
table_types.StringValueFromString(types.OperationStateCancelling.String()),
},
},
),
Expand All @@ -300,7 +297,8 @@ func (d *YdbConnector) ActiveOperations(ctx context.Context) (
func (d *YdbConnector) UpdateOperation(
ctx context.Context, operation types.Operation,
) error {
return d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithUpdateOperation(operation)))
// return d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithUpdateOperation(operation)))
return d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithCreateOperation(operation)))
}

func (d *YdbConnector) CreateOperation(
Expand All @@ -327,11 +325,30 @@ func (d *YdbConnector) CreateBackup(
}

func (d *YdbConnector) UpdateBackup(
context context.Context, id types.ObjectID, backupStatus string,
ctx context.Context, id types.ObjectID, backupStatus string,
) error {
backup := types.Backup{
ID: id,
Status: backupStatus,
// TODO: We must't select here
backups, err := d.SelectBackups(
ctx, queries.MakeReadTableQuery(
queries.WithTableName("Backups"),
queries.WithSelectFields(queries.AllBackupFields...),
queries.WithQueryFilters(
queries.QueryFilter{
Field: "id",
Values: []table_types.Value{table_types.UUIDValue(id)},
},
),
),
)
if err != nil {
xlog.Error(ctx, "can't select backups", zap.Error(err))
return err
}
return d.ExecuteUpsert(context, queries.MakeWriteTableQuery(queries.WithCreateBackup(backup)))
if len(backups) != 1 {
return errors.New("No backup with such Id")
}
backup := backups[0]
backup.Status = backupStatus

return d.ExecuteUpsert(ctx, queries.MakeWriteTableQuery(queries.WithCreateBackup(*backup)))
}
Loading

0 comments on commit 0f96998

Please sign in to comment.