Skip to content

Commit

Permalink
MetricsRegistry moved to a global variable
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Dec 6, 2024
1 parent ae95cff commit a0c5e18
Show file tree
Hide file tree
Showing 22 changed files with 130 additions and 170 deletions.
20 changes: 9 additions & 11 deletions cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func main() {
zap.String("config", confStr),
)
}
metrics := metrics.NewMetricsRegistry(ctx, &wg, &configInstance.MetricsServer)
metrics.InitializeMetricsRegistry(ctx, &wg, &configInstance.MetricsServer)
xlog.Info(ctx, "Initialized metrics registry")
server, err := server.NewServer(&configInstance.GRPCServer)
if err != nil {
Expand Down Expand Up @@ -127,10 +127,9 @@ func main() {
authProvider,
configInstance.ClientConnection.AllowedEndpointDomains,
configInstance.ClientConnection.AllowInsecureEndpoint,
metrics,
).Register(server)
operation.NewOperationService(dbConnector, authProvider, metrics).Register(server)
backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider, metrics).Register(server)
operation.NewOperationService(dbConnector, authProvider).Register(server)
backup_schedule.NewBackupScheduleService(dbConnector, clientConnector, authProvider).Register(server)
if err := server.Start(ctx, &wg); err != nil {
xlog.Error(ctx, "Error start GRPC server", zap.Error(err))
os.Exit(1)
Expand All @@ -142,7 +141,7 @@ func main() {
if err := handlersRegistry.Add(
types.OperationTypeTB,
handlers.NewTBOperationHandler(
dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics,
dbConnector, clientConnector, s3Connector, configInstance, queries.NewWriteTableQuery,
),
); err != nil {
xlog.Error(ctx, "failed to register TB handler", zap.Error(err))
Expand All @@ -151,15 +150,15 @@ func main() {

if err := handlersRegistry.Add(
types.OperationTypeRB,
handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance, metrics),
handlers.NewRBOperationHandler(dbConnector, clientConnector, configInstance),
); err != nil {
xlog.Error(ctx, "failed to register RB handler", zap.Error(err))
os.Exit(1)
}

if err := handlersRegistry.Add(
types.OperationTypeDB,
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery, metrics),
handlers.NewDBOperationHandler(dbConnector, s3Connector, configInstance, queries.NewWriteTableQuery),
); err != nil {
xlog.Error(ctx, "failed to register DB handler", zap.Error(err))
os.Exit(1)
Expand All @@ -174,21 +173,20 @@ func main() {
configInstance.ClientConnection,
queries.NewWriteTableQuery,
clockwork.NewRealClock(),
metrics,
),
); err != nil {
xlog.Error(ctx, "failed to register TBWR handler", zap.Error(err))
os.Exit(1)
}

processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry, metrics)
processor.NewOperationProcessor(ctx, &wg, dbConnector, handlersRegistry)
xlog.Info(ctx, "Initialized OperationProcessor")
ttl_watcher.NewTtlWatcher(ctx, &wg, dbConnector, queries.NewWriteTableQuery)
xlog.Info(ctx, "Created TtlWatcher")

backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, metrics, clockwork.NewRealClock())
backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock())

schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler, metrics, clockwork.NewRealClock())
schedule_watcher.NewScheduleWatcher(ctx, &wg, dbConnector, backupScheduleHandler, clockwork.NewRealClock())

xlog.Info(ctx, "Created ScheduleWatcher")

Expand Down
4 changes: 2 additions & 2 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func (c *MockDBConnector) SelectBackupSchedules(
}

func (c *MockDBConnector) SelectBackupSchedulesWithRPOInfo(
_ context.Context, _ queries.ReadTableQuery,
a context.Context, b queries.ReadTableQuery,
) ([]*types.BackupSchedule, error) {
panic("not implemented")
return c.SelectBackupSchedules(a, b)
}

func (c *MockDBConnector) SelectBackupsByStatus(
Expand Down
8 changes: 3 additions & 5 deletions internal/handlers/delete_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ func NewDBOperationHandler(
s3 s3.S3Connector,
config config.Config,
queryBuilderFactory queries.WriteQueryBuilderFactory,
mon metrics.MetricsRegistry,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
err := DBOperationHandler(ctx, op, db, s3, config, queryBuilderFactory, mon)
err := DBOperationHandler(ctx, op, db, s3, config, queryBuilderFactory)
if err == nil {
mon.ReportOperationMetrics(op)
metrics.GlobalMetricsRegistry.ReportOperationMetrics(op)
}
return err
}
Expand All @@ -40,7 +39,6 @@ func DBOperationHandler(
s3 s3.S3Connector,
config config.Config,
queryBuilderFactory queries.WriteQueryBuilderFactory,
mon metrics.MetricsRegistry,
) error {
xlog.Info(ctx, "DBOperationHandler", zap.String("OperationMessage", operation.GetMessage()))

Expand Down Expand Up @@ -116,7 +114,7 @@ func DBOperationHandler(
return fmt.Errorf("failed to delete backup data: %v", err)
}

mon.IncBytesDeletedCounter(backup.ContainerID, backup.S3Bucket, backup.DatabaseName, size)
metrics.GlobalMetricsRegistry.IncBytesDeletedCounter(backup.ContainerID, backup.S3Bucket, backup.DatabaseName, size)

backup.Status = types.BackupStateDeleted
operation.SetState(types.OperationStateDone)
Expand Down
18 changes: 7 additions & 11 deletions internal/handlers/delete_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

func TestDBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) {
metrics.InitializeMockMetricsRegistry()
ctx := context.Background()
opId := types.GenerateObjectID()
backupID := types.GenerateObjectID()
Expand Down Expand Up @@ -54,7 +55,6 @@ func TestDBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) {
dbConnector, s3Connector, config.Config{
OperationTtlSeconds: 0,
}, queries.NewWriteTableQueryMock,
metrics.NewMockMetricsRegistry(),
)

err := handler(ctx, &dbOp)
Expand Down Expand Up @@ -122,12 +122,11 @@ func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) {

s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap)

mon := metrics.NewMockMetricsRegistry()
metrics.InitializeMockMetricsRegistry()
handler := NewDBOperationHandler(
dbConnector, s3Connector, config.Config{
OperationTtlSeconds: 1000,
}, queries.NewWriteTableQueryMock,
mon,
)

err := handler(ctx, &dbOp)
Expand All @@ -152,7 +151,7 @@ func TestDBOperationHandlerPendingOperationCompletedSuccessfully(t *testing.T) {
assert.Empty(t, objects)

// check metrics
assert.Equal(t, float64(450), mon.GetMetrics()["storage_bytes_deleted"])
assert.Equal(t, float64(450), metrics.GetMetrics()["storage_bytes_deleted"])
}

func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) {
Expand Down Expand Up @@ -203,12 +202,11 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) {

s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap)

mon := metrics.NewMockMetricsRegistry()
metrics.InitializeMockMetricsRegistry()
handler := NewDBOperationHandler(
dbConnector, s3Connector, config.Config{
OperationTtlSeconds: 1000,
}, queries.NewWriteTableQueryMock,
mon,
)

err := handler(ctx, &dbOp)
Expand All @@ -233,7 +231,7 @@ func TestDBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) {
assert.Empty(t, objects)

// check metrics
assert.Equal(t, float64(450), mon.GetMetrics()["storage_bytes_deleted"])
assert.Equal(t, float64(450), metrics.GetMetrics()["storage_bytes_deleted"])
}

func TestDBOperationHandlerUnexpectedBackupStatus(t *testing.T) {
Expand Down Expand Up @@ -287,7 +285,6 @@ func TestDBOperationHandlerUnexpectedBackupStatus(t *testing.T) {
dbConnector, s3Connector, config.Config{
OperationTtlSeconds: 1000,
}, queries.NewWriteTableQueryMock,
metrics.NewMockMetricsRegistry(),
)

err := handler(ctx, &dbOp)
Expand Down Expand Up @@ -353,12 +350,11 @@ func TestDBOperationHandlerDeleteMoreThanAllowedLimit(t *testing.T) {

s3Connector := s3Client.NewMockS3Connector(s3ObjectsMap)

mon := metrics.NewMockMetricsRegistry()
metrics.InitializeMockMetricsRegistry()
handler := NewDBOperationHandler(
dbConnector, s3Connector, config.Config{
OperationTtlSeconds: 1000,
}, queries.NewWriteTableQueryMock,
mon,
)

err := handler(ctx, &dbOp)
Expand All @@ -383,5 +379,5 @@ func TestDBOperationHandlerDeleteMoreThanAllowedLimit(t *testing.T) {
assert.Empty(t, objects)

// check metrics
assert.Equal(t, float64(objectsListSize*10), mon.GetMetrics()["storage_bytes_deleted"])
assert.Equal(t, float64(objectsListSize*10), metrics.GetMetrics()["storage_bytes_deleted"])
}
7 changes: 3 additions & 4 deletions internal/handlers/restore_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package handlers
import (
"context"
"fmt"
"ydbcp/internal/metrics"

"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
"ydbcp/internal/metrics"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"

Expand All @@ -17,12 +16,12 @@ import (
)

func NewRBOperationHandler(
db db.DBConnector, client client.ClientConnector, config config.Config, mon metrics.MetricsRegistry,
db db.DBConnector, client client.ClientConnector, config config.Config,
) types.OperationHandler {
return func(ctx context.Context, op types.Operation) error {
err := RBOperationHandler(ctx, op, db, client, config)
if err == nil {
mon.ReportOperationMetrics(op)
metrics.GlobalMetricsRegistry.ReportOperationMetrics(op)
}
return err
}
Expand Down
15 changes: 3 additions & 12 deletions internal/handlers/restore_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package handlers
import (
"context"
"testing"
"ydbcp/internal/metrics"

"ydbcp/internal/config"
"ydbcp/internal/connectors/client"
"ydbcp/internal/connectors/db"
Expand Down Expand Up @@ -42,7 +40,7 @@ func TestRBOperationHandlerInvalidOperationResponse(t *testing.T) {
)

// try to handle rb operation with non-existing ydb operation id
handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}, metrics.NewMockMetricsRegistry())
handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{})
err := handler(ctx, &rbOp)
assert.Empty(t, err)

Expand Down Expand Up @@ -85,7 +83,7 @@ func TestRBOperationHandlerDeadlineExceededForRunningOperation(t *testing.T) {
dbConnector := db.NewMockDBConnector(db.WithOperations(opMap))

// try to handle pending rb operation with zero ttl
handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}, metrics.NewMockMetricsRegistry())
handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{})
err := handler(ctx, &rbOp)
assert.Empty(t, err)

Expand Down Expand Up @@ -138,7 +136,6 @@ func TestRBOperationHandlerRunningOperationInProgress(t *testing.T) {
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 1000},
metrics.NewMockMetricsRegistry(),
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -191,7 +188,6 @@ func TestRBOperationHandlerRunningOperationCompletedSuccessfully(t *testing.T) {
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 1000},
metrics.NewMockMetricsRegistry(),
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -243,7 +239,6 @@ func TestRBOperationHandlerRunningOperationCancelled(t *testing.T) {
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 10},
metrics.NewMockMetricsRegistry(),
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -293,7 +288,7 @@ func TestRBOperationHandlerDeadlineExceededForCancellingOperation(t *testing.T)
dbConnector := db.NewMockDBConnector(db.WithOperations(opMap))

// try to handle cancelling rb operation with zero ttl
handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{}, metrics.NewMockMetricsRegistry())
handler := NewRBOperationHandler(dbConnector, clientConnector, config.Config{})
err := handler(ctx, &rbOp)
assert.Empty(t, err)

Expand Down Expand Up @@ -346,7 +341,6 @@ func TestRBOperationHandlerCancellingOperationInProgress(t *testing.T) {
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 1000},
metrics.NewMockMetricsRegistry(),
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -399,7 +393,6 @@ func TestRBOperationHandlerCancellingOperationCompletedSuccessfully(t *testing.T
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 10},
metrics.NewMockMetricsRegistry(),
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -452,7 +445,6 @@ func TestRBOperationHandlerCancellingOperationCancelled(t *testing.T) {
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 10},
metrics.NewMockMetricsRegistry(),
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down Expand Up @@ -505,7 +497,6 @@ func TestRBOperationHandlerRetriableErrorForRunningOperation(t *testing.T) {
dbConnector,
clientConnector,
config.Config{OperationTtlSeconds: 10},
metrics.NewMockMetricsRegistry(),
)
err := handler(ctx, &rbOp)
assert.Empty(t, err)
Expand Down
6 changes: 2 additions & 4 deletions internal/handlers/schedule_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ type BackupScheduleHandlerType func(context.Context, db.DBConnector, *types.Back

func NewBackupScheduleHandler(
queryBuilderFactory queries.WriteQueryBuilderFactory,
mon metrics.MetricsRegistry,
clock clockwork.Clock,
) BackupScheduleHandlerType {
return func(ctx context.Context, driver db.DBConnector, schedule *types.BackupSchedule) error {
return BackupScheduleHandler(
ctx, driver, schedule,
queryBuilderFactory, mon, clock,
queryBuilderFactory, clock,
)
}
}
Expand All @@ -35,7 +34,6 @@ func BackupScheduleHandler(
driver db.DBConnector,
schedule *types.BackupSchedule,
queryBuilderFactory queries.WriteQueryBuilderFactory,
mon metrics.MetricsRegistry,
clock clockwork.Clock,
) error {
if schedule.Status != types.BackupScheduleStateActive {
Expand Down Expand Up @@ -83,7 +81,7 @@ func BackupScheduleHandler(
zap.String("TakeBackupWithRetryOperation", tbwr.Proto().String()),
)

mon.IncScheduledBackupsCount(schedule)
metrics.GlobalMetricsRegistry.IncScheduledBackupsCount(schedule)

err = schedule.UpdateNextLaunch(clock.Now())
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions internal/handlers/schedule_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/metrics"
"ydbcp/internal/types"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)
Expand Down Expand Up @@ -41,7 +40,7 @@ func TestBackupScheduleHandler(t *testing.T) {
)

handler := NewBackupScheduleHandler(
queries.NewWriteTableQueryMock, metrics.NewMockMetricsRegistry(), clock,
queries.NewWriteTableQueryMock, clock,
)
err := handler(ctx, dbConnector, &schedule)
assert.Empty(t, err)
Expand Down
Loading

0 comments on commit a0c5e18

Please sign in to comment.