diff --git a/cmd/ydbcp/main.go b/cmd/ydbcp/main.go index f9d55537..8078354b 100644 --- a/cmd/ydbcp/main.go +++ b/cmd/ydbcp/main.go @@ -79,7 +79,7 @@ func main() { zap.String("config", confStr), ) } - metrics.InitializeMetricsRegistry(ctx, &wg, &configInstance.MetricsServer) + metrics.InitializeMetricsRegistry(ctx, &wg, &configInstance.MetricsServer, clockwork.NewRealClock()) xlog.Info(ctx, "Initialized metrics registry") server, err := server.NewServer(&configInstance.GRPCServer) if err != nil { diff --git a/internal/handlers/take_backup_retry_test.go b/internal/handlers/take_backup_retry_test.go index cd288e9d..cffddb97 100644 --- a/internal/handlers/take_backup_retry_test.go +++ b/internal/handlers/take_backup_retry_test.go @@ -348,14 +348,15 @@ func TestTBWRHandlerSuccess(t *testing.T) { clientConnector := client.NewMockClientConnector() - metrics.InitializeMockMetricsRegistry() + clock := clockwork.NewFakeClockAt(t1.AsTime()) + metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := NewTBWROperationHandler( dbConnector, clientConnector, config.S3Config{}, config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, - clockwork.NewFakeClockAt(t1.AsTime()), + clock, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -412,14 +413,15 @@ func TestTBWRHandlerSkipRunning(t *testing.T) { clientConnector := client.NewMockClientConnector() - metrics.InitializeMockMetricsRegistry() + clock := clockwork.NewFakeClockAt(t1.AsTime()) + metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := NewTBWROperationHandler( dbConnector, clientConnector, config.S3Config{}, config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, - clockwork.NewFakeClockAt(t1.AsTime()), + clock, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -534,14 +536,15 @@ func TestTBWRHandlerError(t *testing.T) { clientConnector := client.NewMockClientConnector() - metrics.InitializeMockMetricsRegistry() + clock := clockwork.NewFakeClockAt(t2.AsTime()) + metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := NewTBWROperationHandler( dbConnector, clientConnector, config.S3Config{}, config.ClientConnectionConfig{}, queries.NewWriteTableQueryMock, - clockwork.NewFakeClockAt(t2.AsTime()), + clock, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -557,7 +560,8 @@ func TestTBWRHandlerError(t *testing.T) { } func TestTBWRHandlerAlwaysRunOnce(t *testing.T) { - metrics.InitializeMockMetricsRegistry() + clock := clockwork.NewFakeClockAt(t1.AsTime()) + metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) ctx := context.Background() tbwrID := types.GenerateObjectID() tbwr := types.TakeBackupWithRetryOperation{ @@ -600,7 +604,7 @@ func TestTBWRHandlerAlwaysRunOnce(t *testing.T) { AllowInsecureEndpoint: true, }, queries.NewWriteTableQueryMock, - clockwork.NewFakeClockAt(t1.AsTime()), + clock, ) err := handler(ctx, &tbwr) assert.Empty(t, err) @@ -851,7 +855,8 @@ func TestTBWRHandlerFullCancel(t *testing.T) { clientConnector := client.NewMockClientConnector() - metrics.InitializeMockMetricsRegistry() + clock := clockwork.NewFakeClockAt(t1.AsTime()) + metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := NewTBWROperationHandler( dbConnector, clientConnector, @@ -863,7 +868,7 @@ func TestTBWRHandlerFullCancel(t *testing.T) { AllowInsecureEndpoint: true, }, queries.NewWriteTableQueryMock, - clockwork.NewFakeClockAt(t1.AsTime()), + clock, ) err := handler(ctx, &tbwr) assert.Empty(t, err) diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 37a46980..c6186b07 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -38,13 +38,14 @@ type MetricsRegistry interface { IncFailedHandlerRunsCount(containerId string, operationType string) IncSuccessfulHandlerRunsCount(containerId string, operationType string) IncCompletedBackupsCount(containerId string, database string, scheduleId *string, code Ydb.StatusIds_StatusCode) - IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error) + IncScheduleCounters(schedule *types.BackupSchedule, err error) } type MetricsRegistryImpl struct { server *http.Server reg *prometheus.Registry cfg config.MetricsServerConfig + clock clockwork.Clock // api metrics apiCallsCounter *prometheus.CounterVec @@ -54,10 +55,11 @@ type MetricsRegistryImpl struct { bytesDeletedCounter *prometheus.CounterVec // operation metrics - operationsDuration *prometheus.HistogramVec - operationsStarted *prometheus.CounterVec - operationsFinished *prometheus.CounterVec - operationsInflight *prometheus.GaugeVec + completedOperationsDuration *prometheus.HistogramVec + inflightOperationsDuration *prometheus.HistogramVec + operationsStarted *prometheus.CounterVec + operationsFinished *prometheus.CounterVec + operationsInflight *prometheus.GaugeVec // operation processor metrics handlerRunsCount *prometheus.CounterVec @@ -105,6 +107,7 @@ func (s *MetricsRegistryImpl) IncOperationsStartedCounter(operation types.Operat func (s *MetricsRegistryImpl) ResetOperationsInflight() { s.operationsInflight.Reset() + s.inflightOperationsDuration.Reset() } func (s *MetricsRegistryImpl) ReportOperationInflight(operation types.Operation) { @@ -123,13 +126,23 @@ func (s *MetricsRegistryImpl) ReportOperationInflight(operation types.Operation) operation.GetState().String(), label, ).Inc() + + if operation.GetAudit() != nil && operation.GetAudit().CreatedAt != nil { + duration := s.clock.Now().Sub(operation.GetAudit().CreatedAt.AsTime()) + s.inflightOperationsDuration.WithLabelValues( + operation.GetContainerID(), + operation.GetDatabaseName(), + operation.GetType().String(), + operation.GetState().String(), + ).Observe(duration.Seconds()) + } } func (s *MetricsRegistryImpl) ReportOperationMetrics(operation types.Operation) { if !types.IsActive(operation) { if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil { duration := operation.GetAudit().CompletedAt.AsTime().Sub(operation.GetAudit().CreatedAt.AsTime()) - s.operationsDuration.WithLabelValues( + s.completedOperationsDuration.WithLabelValues( operation.GetContainerID(), operation.GetDatabaseName(), operation.GetType().String(), @@ -173,15 +186,15 @@ func (s *MetricsRegistryImpl) IncCompletedBackupsCount(containerId string, datab } if code == Ydb.StatusIds_SUCCESS { - s.backupsSucceededCount.WithLabelValues(containerId, database, scheduleIdLabel).Inc() + s.backupsSucceededCount.WithLabelValues(containerId, database, scheduleIdLabel).Set(1) s.backupsFailedCount.WithLabelValues(containerId, database, scheduleIdLabel).Set(0) } else { s.backupsSucceededCount.WithLabelValues(containerId, database, scheduleIdLabel).Set(0) - s.backupsFailedCount.WithLabelValues(containerId, database, scheduleIdLabel).Inc() + s.backupsFailedCount.WithLabelValues(containerId, database, scheduleIdLabel).Set(1) } } -func (s *MetricsRegistryImpl) IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error) { +func (s *MetricsRegistryImpl) IncScheduleCounters(schedule *types.BackupSchedule, err error) { if err != nil { s.scheduleActionFailedCount.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Inc() } else { @@ -190,20 +203,21 @@ func (s *MetricsRegistryImpl) IncScheduleCounters(schedule *types.BackupSchedule if schedule.RecoveryPoint != nil { s.scheduleLastBackupTimestamp.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Set(float64(schedule.RecoveryPoint.Unix())) } - info := schedule.GetBackupInfo(clock) + info := schedule.GetBackupInfo(s.clock) if info != nil { s.scheduleRPOMarginRatio.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Set(info.LastBackupRpoMarginRatio) } } -func InitializeMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.MetricsServerConfig) { - GlobalMetricsRegistry = newMetricsRegistry(ctx, wg, cfg) +func InitializeMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.MetricsServerConfig, clock clockwork.Clock) { + GlobalMetricsRegistry = newMetricsRegistry(ctx, wg, cfg, clock) } -func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.MetricsServerConfig) *MetricsRegistryImpl { +func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.MetricsServerConfig, clock clockwork.Clock) *MetricsRegistryImpl { s := &MetricsRegistryImpl{ - reg: prometheus.NewRegistry(), - cfg: *cfg, + reg: prometheus.NewRegistry(), + cfg: *cfg, + clock: clock, } s.apiCallsCounter = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ @@ -224,13 +238,20 @@ func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met Help: "Count of bytes deleted from storage", }, []string{"container_id", "bucket", "database"}) - s.operationsDuration = promauto.With(s.reg).NewHistogramVec(prometheus.HistogramOpts{ + s.completedOperationsDuration = promauto.With(s.reg).NewHistogramVec(prometheus.HistogramOpts{ Subsystem: "operations", Name: "duration_seconds", - Help: "Duration of operations in seconds", + Help: "Duration of completed operations in seconds", Buckets: prometheus.ExponentialBuckets(10, 2, 8), }, []string{"container_id", "database", "type", "status"}) + s.inflightOperationsDuration = promauto.With(s.reg).NewHistogramVec(prometheus.HistogramOpts{ + Subsystem: "operations", + Name: "inflight_duration_seconds", + Help: "Duration of running operations in seconds", + Buckets: prometheus.ExponentialBuckets(10, 2, 8), + }, []string{"container_id", "database", "type", "state"}) + s.operationsStarted = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ Subsystem: "operations", Name: "started_counter", diff --git a/internal/metrics/metrics_mock.go b/internal/metrics/metrics_mock.go index 727b98c1..dcdca009 100644 --- a/internal/metrics/metrics_mock.go +++ b/internal/metrics/metrics_mock.go @@ -3,18 +3,25 @@ package metrics import ( "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "sync" "ydbcp/internal/types" ) type MockMetricsRegistry struct { + mutex sync.Mutex metrics map[string]float64 + clock clockwork.Clock } func (s *MockMetricsRegistry) IncOperationsStartedCounter(operation types.Operation) { + s.mutex.Lock() + defer s.mutex.Unlock() s.metrics["operations_started_count"]++ } func (s *MockMetricsRegistry) IncCompletedBackupsCount(containerId string, database string, scheduleId *string, code Ydb.StatusIds_StatusCode) { + s.mutex.Lock() + defer s.mutex.Unlock() if code == Ydb.StatusIds_SUCCESS { s.metrics["backups_succeeded_count"]++ } else { @@ -27,25 +34,39 @@ func (s *MockMetricsRegistry) GetMetrics() map[string]float64 { } func (s *MockMetricsRegistry) IncApiCallsCounter(serviceName string, methodName string, status string) { + s.mutex.Lock() + defer s.mutex.Unlock() s.metrics["api_calls_count"]++ } func (s *MockMetricsRegistry) IncBytesWrittenCounter(containerId string, bucket string, database string, bytes int64) { + s.mutex.Lock() + defer s.mutex.Unlock() s.metrics["storage_bytes_written"] += float64(bytes) } func (s *MockMetricsRegistry) IncBytesDeletedCounter(containerId string, bucket string, database string, bytes int64) { + s.mutex.Lock() + defer s.mutex.Unlock() s.metrics["storage_bytes_deleted"] += float64(bytes) } func (s *MockMetricsRegistry) ResetOperationsInflight() { + s.mutex.Lock() + defer s.mutex.Unlock() s.metrics["operations_inflight"] = 0 + s.metrics["operations_inflight_duration_seconds"] = 0 } func (s *MockMetricsRegistry) ReportOperationInflight(operation types.Operation) { + s.mutex.Lock() + defer s.mutex.Unlock() s.metrics["operations_inflight"]++ + s.metrics["operations_inflight_duration_seconds"]++ } func (s *MockMetricsRegistry) ReportOperationMetrics(operation types.Operation) { + s.mutex.Lock() + defer s.mutex.Unlock() if !types.IsActive(operation) { s.metrics["operations_duration_seconds"]++ s.metrics["operations_finished_count"]++ @@ -53,22 +74,32 @@ func (s *MockMetricsRegistry) ReportOperationMetrics(operation types.Operation) } func (s *MockMetricsRegistry) IncHandlerRunsCount(containerId string, operationType string) { + s.mutex.Lock() + defer s.mutex.Unlock() s.metrics["operation_processor_handler_runs_count"]++ } func (s *MockMetricsRegistry) IncFailedHandlerRunsCount(containerId string, operationType string) { + s.mutex.Lock() + defer s.mutex.Unlock() s.metrics["operation_processor_handler_runs_failed_count"]++ } func (s *MockMetricsRegistry) IncSuccessfulHandlerRunsCount(containerId string, operationType string) { + s.mutex.Lock() + defer s.mutex.Unlock() s.metrics["operation_processor_handler_runs_successful_count"]++ } func (s *MockMetricsRegistry) IncScheduledBackupsCount(schedule *types.BackupSchedule) { + s.mutex.Lock() + defer s.mutex.Unlock() s.metrics["schedules_launched_take_backup_with_retry_count"]++ } -func (s *MockMetricsRegistry) IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error) { +func (s *MockMetricsRegistry) IncScheduleCounters(schedule *types.BackupSchedule, err error) { + s.mutex.Lock() + defer s.mutex.Unlock() if err != nil { s.metrics["schedules_failed_count"]++ } else { @@ -77,22 +108,37 @@ func (s *MockMetricsRegistry) IncScheduleCounters(schedule *types.BackupSchedule if schedule.RecoveryPoint != nil { s.metrics["schedules_last_backup_timestamp"] = float64(schedule.RecoveryPoint.Unix()) if schedule.ScheduleSettings.RecoveryPointObjective != nil { - info := schedule.GetBackupInfo(clock) + info := schedule.GetBackupInfo(s.clock) s.metrics["schedules_recovery_point_objective"] = info.LastBackupRpoMarginRatio } } } -func InitializeMockMetricsRegistry() { - GlobalMetricsRegistry = newMockMetricsRegistry() +type Option func(*MockMetricsRegistry) + +func WithClock(clock clockwork.Clock) Option { + return func(s *MockMetricsRegistry) { + s.clock = clock + } +} + +func InitializeMockMetricsRegistry(options ...Option) { + GlobalMetricsRegistry = newMockMetricsRegistry(options...) } func GetMetrics() map[string]float64 { return GlobalMetricsRegistry.(*MockMetricsRegistry).GetMetrics() } -func newMockMetricsRegistry() *MockMetricsRegistry { - return &MockMetricsRegistry{ +func newMockMetricsRegistry(options ...Option) *MockMetricsRegistry { + mock := &MockMetricsRegistry{ metrics: make(map[string]float64), + clock: clockwork.NewFakeClock(), } + + for _, option := range options { + option(mock) + } + + return mock } diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index 9ee0a613..773366b7 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -3,6 +3,7 @@ package metrics import ( "bytes" "context" + "github.com/jonboulle/clockwork" "io" "net/http" "strconv" @@ -24,7 +25,7 @@ func TestMetricsCount(t *testing.T) { BindPort: 8080, BindAddress: "127.0.0.1", } - InitializeMetricsRegistry(ctx, &wg, cfg) + InitializeMetricsRegistry(ctx, &wg, cfg, clockwork.NewFakeClock()) s := GlobalMetricsRegistry.(*MetricsRegistryImpl) s.apiCallsCounter.WithLabelValues("test_service", "test_method", "test_status").Add(123) diff --git a/internal/processor/processor_test.go b/internal/processor/processor_test.go index fb3f3445..9b6c7b17 100644 --- a/internal/processor/processor_test.go +++ b/internal/processor/processor_test.go @@ -18,7 +18,8 @@ import ( ) func TestProcessor(t *testing.T) { - metrics.InitializeMockMetricsRegistry() + clock := clockwork.NewFakeClock() + metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -32,8 +33,6 @@ func TestProcessor(t *testing.T) { return fakeTicker } - clock := clockwork.NewFakeClock() - db := db.NewMockDBConnector() handlers := NewOperationHandlerRegistry() handlerCalled := make(chan struct{}) @@ -103,4 +102,7 @@ func TestProcessor(t *testing.T) { val, ok := metrics.GetMetrics()["operations_inflight"] assert.True(t, ok) // to show that it has been incremented assert.Equal(t, float64(0), val) //to show it has been reset to 0 + val, ok = metrics.GetMetrics()["operations_inflight_duration_seconds"] + assert.True(t, ok) // to show that it has been incremented + assert.Equal(t, float64(0), val) //to show it has been reset to 0 } diff --git a/internal/watchers/schedule_watcher/schedule_watcher.go b/internal/watchers/schedule_watcher/schedule_watcher.go index b1019cfe..a6f49c39 100644 --- a/internal/watchers/schedule_watcher/schedule_watcher.go +++ b/internal/watchers/schedule_watcher/schedule_watcher.go @@ -68,7 +68,7 @@ func ScheduleWatcherAction( for _, schedule := range schedules { err = handler(ctx, db, schedule) - metrics.GlobalMetricsRegistry.IncScheduleCounters(schedule, clock, err) + metrics.GlobalMetricsRegistry.IncScheduleCounters(schedule, err) if err != nil { xlog.Error(ctx, "error handling backup schedule", zap.String("scheduleID", schedule.ID), zap.Error(err)) } diff --git a/internal/watchers/schedule_watcher/schedule_watcher_test.go b/internal/watchers/schedule_watcher/schedule_watcher_test.go index 41f4a5ee..777be5dc 100644 --- a/internal/watchers/schedule_watcher/schedule_watcher_test.go +++ b/internal/watchers/schedule_watcher/schedule_watcher_test.go @@ -61,7 +61,7 @@ func TestScheduleWatcherSimple(t *testing.T) { db.WithBackupSchedules(scheduleMap), ) - metrics.InitializeMockMetricsRegistry() + metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := handlers.NewBackupScheduleHandler( queries.NewWriteTableQueryMock, clock, @@ -174,7 +174,7 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { db.WithBackupSchedules(scheduleMap), ) - metrics.InitializeMockMetricsRegistry() + metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := handlers.NewBackupScheduleHandler( queries.NewWriteTableQueryMock, clock, ) @@ -293,7 +293,7 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { db.WithBackupSchedules(scheduleMap), ) - metrics.InitializeMockMetricsRegistry() + metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := handlers.NewBackupScheduleHandler( queries.NewWriteTableQueryMock, clock, @@ -409,7 +409,7 @@ func TestAllScheduleMetrics(t *testing.T) { db.WithBackupSchedules(scheduleMap), ) - metrics.InitializeMockMetricsRegistry() + metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) handler := handlers.NewBackupScheduleHandler( queries.NewWriteTableQueryMock, clock, diff --git a/internal/watchers/ttl_watcher/ttl_watcher_test.go b/internal/watchers/ttl_watcher/ttl_watcher_test.go index 89f96ce3..021d9d50 100644 --- a/internal/watchers/ttl_watcher/ttl_watcher_test.go +++ b/internal/watchers/ttl_watcher/ttl_watcher_test.go @@ -16,13 +16,13 @@ import ( ) func TestTtlWatcher(t *testing.T) { - metrics.InitializeMockMetricsRegistry() + clock := clockwork.NewFakeClock() + metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock)) var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Prepare fake clock and ticker - clock := clockwork.NewFakeClock() var fakeTicker *ticker.FakeTicker tickerInitialized := make(chan struct{}) tickerProvider := func(duration time.Duration) ticker.Ticker {