Skip to content

Commit

Permalink
feat(metrics): add inflight operations duration histogram
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina authored and qrort committed Dec 17, 2024
1 parent 2a4f9f1 commit 4332f1c
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 45 deletions.
2 changes: 1 addition & 1 deletion cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func main() {
zap.String("config", confStr),
)
}
metrics.InitializeMetricsRegistry(ctx, &wg, &configInstance.MetricsServer)
metrics.InitializeMetricsRegistry(ctx, &wg, &configInstance.MetricsServer, clockwork.NewRealClock())
xlog.Info(ctx, "Initialized metrics registry")
server, err := server.NewServer(&configInstance.GRPCServer)
if err != nil {
Expand Down
25 changes: 15 additions & 10 deletions internal/handlers/take_backup_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,15 @@ func TestTBWRHandlerSuccess(t *testing.T) {

clientConnector := client.NewMockClientConnector()

metrics.InitializeMockMetricsRegistry()
clock := clockwork.NewFakeClockAt(t1.AsTime())
metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock))
handler := NewTBWROperationHandler(
dbConnector,
clientConnector,
config.S3Config{},
config.ClientConnectionConfig{},
queries.NewWriteTableQueryMock,
clockwork.NewFakeClockAt(t1.AsTime()),
clock,
)
err := handler(ctx, &tbwr)
assert.Empty(t, err)
Expand Down Expand Up @@ -412,14 +413,15 @@ func TestTBWRHandlerSkipRunning(t *testing.T) {

clientConnector := client.NewMockClientConnector()

metrics.InitializeMockMetricsRegistry()
clock := clockwork.NewFakeClockAt(t1.AsTime())
metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock))
handler := NewTBWROperationHandler(
dbConnector,
clientConnector,
config.S3Config{},
config.ClientConnectionConfig{},
queries.NewWriteTableQueryMock,
clockwork.NewFakeClockAt(t1.AsTime()),
clock,
)
err := handler(ctx, &tbwr)
assert.Empty(t, err)
Expand Down Expand Up @@ -534,14 +536,15 @@ func TestTBWRHandlerError(t *testing.T) {

clientConnector := client.NewMockClientConnector()

metrics.InitializeMockMetricsRegistry()
clock := clockwork.NewFakeClockAt(t2.AsTime())
metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock))
handler := NewTBWROperationHandler(
dbConnector,
clientConnector,
config.S3Config{},
config.ClientConnectionConfig{},
queries.NewWriteTableQueryMock,
clockwork.NewFakeClockAt(t2.AsTime()),
clock,
)
err := handler(ctx, &tbwr)
assert.Empty(t, err)
Expand All @@ -557,7 +560,8 @@ func TestTBWRHandlerError(t *testing.T) {
}

func TestTBWRHandlerAlwaysRunOnce(t *testing.T) {
metrics.InitializeMockMetricsRegistry()
clock := clockwork.NewFakeClockAt(t1.AsTime())
metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock))
ctx := context.Background()
tbwrID := types.GenerateObjectID()
tbwr := types.TakeBackupWithRetryOperation{
Expand Down Expand Up @@ -600,7 +604,7 @@ func TestTBWRHandlerAlwaysRunOnce(t *testing.T) {
AllowInsecureEndpoint: true,
},
queries.NewWriteTableQueryMock,
clockwork.NewFakeClockAt(t1.AsTime()),
clock,
)
err := handler(ctx, &tbwr)
assert.Empty(t, err)
Expand Down Expand Up @@ -851,7 +855,8 @@ func TestTBWRHandlerFullCancel(t *testing.T) {

clientConnector := client.NewMockClientConnector()

metrics.InitializeMockMetricsRegistry()
clock := clockwork.NewFakeClockAt(t1.AsTime())
metrics.InitializeMockMetricsRegistry(metrics.WithClock(clock))
handler := NewTBWROperationHandler(
dbConnector,
clientConnector,
Expand All @@ -863,7 +868,7 @@ func TestTBWRHandlerFullCancel(t *testing.T) {
AllowInsecureEndpoint: true,
},
queries.NewWriteTableQueryMock,
clockwork.NewFakeClockAt(t1.AsTime()),
clock,
)
err := handler(ctx, &tbwr)
assert.Empty(t, err)
Expand Down
55 changes: 38 additions & 17 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ type MetricsRegistry interface {
IncFailedHandlerRunsCount(containerId string, operationType string)
IncSuccessfulHandlerRunsCount(containerId string, operationType string)
IncCompletedBackupsCount(containerId string, database string, scheduleId *string, code Ydb.StatusIds_StatusCode)
IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error)
IncScheduleCounters(schedule *types.BackupSchedule, err error)
}

type MetricsRegistryImpl struct {
server *http.Server
reg *prometheus.Registry
cfg config.MetricsServerConfig
clock clockwork.Clock

// api metrics
apiCallsCounter *prometheus.CounterVec
Expand All @@ -54,10 +55,11 @@ type MetricsRegistryImpl struct {
bytesDeletedCounter *prometheus.CounterVec

// operation metrics
operationsDuration *prometheus.HistogramVec
operationsStarted *prometheus.CounterVec
operationsFinished *prometheus.CounterVec
operationsInflight *prometheus.GaugeVec
completedOperationsDuration *prometheus.HistogramVec
inflightOperationsDuration *prometheus.HistogramVec
operationsStarted *prometheus.CounterVec
operationsFinished *prometheus.CounterVec
operationsInflight *prometheus.GaugeVec

// operation processor metrics
handlerRunsCount *prometheus.CounterVec
Expand Down Expand Up @@ -105,6 +107,7 @@ func (s *MetricsRegistryImpl) IncOperationsStartedCounter(operation types.Operat

// ResetOperationsInflight resets both in-flight operation collectors: the
// operationsInflight gauge vector first, then the inflightOperationsDuration
// histogram vector.
func (s *MetricsRegistryImpl) ResetOperationsInflight() {
	inflightGauges, inflightDurations := s.operationsInflight, s.inflightOperationsDuration
	inflightGauges.Reset()
	inflightDurations.Reset()
}

func (s *MetricsRegistryImpl) ReportOperationInflight(operation types.Operation) {
Expand All @@ -123,13 +126,23 @@ func (s *MetricsRegistryImpl) ReportOperationInflight(operation types.Operation)
operation.GetState().String(),
label,
).Inc()

if operation.GetAudit() != nil && operation.GetAudit().CreatedAt != nil {
duration := s.clock.Now().Sub(operation.GetAudit().CreatedAt.AsTime())
s.inflightOperationsDuration.WithLabelValues(
operation.GetContainerID(),
operation.GetDatabaseName(),
operation.GetType().String(),
operation.GetState().String(),
).Observe(duration.Seconds())
}
}

func (s *MetricsRegistryImpl) ReportOperationMetrics(operation types.Operation) {
if !types.IsActive(operation) {
if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil {
duration := operation.GetAudit().CompletedAt.AsTime().Sub(operation.GetAudit().CreatedAt.AsTime())
s.operationsDuration.WithLabelValues(
s.completedOperationsDuration.WithLabelValues(
operation.GetContainerID(),
operation.GetDatabaseName(),
operation.GetType().String(),
Expand Down Expand Up @@ -173,15 +186,15 @@ func (s *MetricsRegistryImpl) IncCompletedBackupsCount(containerId string, datab
}

if code == Ydb.StatusIds_SUCCESS {
s.backupsSucceededCount.WithLabelValues(containerId, database, scheduleIdLabel).Inc()
s.backupsSucceededCount.WithLabelValues(containerId, database, scheduleIdLabel).Set(1)
s.backupsFailedCount.WithLabelValues(containerId, database, scheduleIdLabel).Set(0)
} else {
s.backupsSucceededCount.WithLabelValues(containerId, database, scheduleIdLabel).Set(0)
s.backupsFailedCount.WithLabelValues(containerId, database, scheduleIdLabel).Inc()
s.backupsFailedCount.WithLabelValues(containerId, database, scheduleIdLabel).Set(1)
}
}

func (s *MetricsRegistryImpl) IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error) {
func (s *MetricsRegistryImpl) IncScheduleCounters(schedule *types.BackupSchedule, err error) {
if err != nil {
s.scheduleActionFailedCount.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Inc()
} else {
Expand All @@ -190,20 +203,21 @@ func (s *MetricsRegistryImpl) IncScheduleCounters(schedule *types.BackupSchedule
if schedule.RecoveryPoint != nil {
s.scheduleLastBackupTimestamp.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Set(float64(schedule.RecoveryPoint.Unix()))
}
info := schedule.GetBackupInfo(clock)
info := schedule.GetBackupInfo(s.clock)
if info != nil {
s.scheduleRPOMarginRatio.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Set(info.LastBackupRpoMarginRatio)
}
}

func InitializeMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.MetricsServerConfig) {
GlobalMetricsRegistry = newMetricsRegistry(ctx, wg, cfg)
func InitializeMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.MetricsServerConfig, clock clockwork.Clock) {
GlobalMetricsRegistry = newMetricsRegistry(ctx, wg, cfg, clock)
}

func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.MetricsServerConfig) *MetricsRegistryImpl {
func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.MetricsServerConfig, clock clockwork.Clock) *MetricsRegistryImpl {
s := &MetricsRegistryImpl{
reg: prometheus.NewRegistry(),
cfg: *cfg,
reg: prometheus.NewRegistry(),
cfg: *cfg,
clock: clock,
}

s.apiCallsCounter = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
Expand All @@ -224,13 +238,20 @@ func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met
Help: "Count of bytes deleted from storage",
}, []string{"container_id", "bucket", "database"})

s.operationsDuration = promauto.With(s.reg).NewHistogramVec(prometheus.HistogramOpts{
s.completedOperationsDuration = promauto.With(s.reg).NewHistogramVec(prometheus.HistogramOpts{
Subsystem: "operations",
Name: "duration_seconds",
Help: "Duration of operations in seconds",
Help: "Duration of completed operations in seconds",
Buckets: prometheus.ExponentialBuckets(10, 2, 8),
}, []string{"container_id", "database", "type", "status"})

s.inflightOperationsDuration = promauto.With(s.reg).NewHistogramVec(prometheus.HistogramOpts{
Subsystem: "operations",
Name: "inflight_duration_seconds",
Help: "Duration of running operations in seconds",
Buckets: prometheus.ExponentialBuckets(10, 2, 8),
}, []string{"container_id", "database", "type", "state"})

s.operationsStarted = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
Subsystem: "operations",
Name: "started_counter",
Expand Down
58 changes: 52 additions & 6 deletions internal/metrics/metrics_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,25 @@ package metrics
import (
"github.com/jonboulle/clockwork"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"sync"
"ydbcp/internal/types"
)

// MockMetricsRegistry records metric values in a plain in-memory map rather
// than in Prometheus collectors, so callers can read the values back directly.
type MockMetricsRegistry struct {
// mutex is locked by the recording methods before they touch metrics.
mutex sync.Mutex
// metrics holds the current value of each metric, keyed by metric name.
metrics map[string]float64
// clock is handed to schedule.GetBackupInfo by IncScheduleCounters.
clock clockwork.Clock
}

// IncOperationsStartedCounter adds one to the "operations_started_count" entry
// while holding the registry mutex. The operation argument is not inspected.
func (s *MockMetricsRegistry) IncOperationsStartedCounter(operation types.Operation) {
	const key = "operations_started_count"

	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.metrics[key]++
}

func (s *MockMetricsRegistry) IncCompletedBackupsCount(containerId string, database string, scheduleId *string, code Ydb.StatusIds_StatusCode) {
s.mutex.Lock()
defer s.mutex.Unlock()
if code == Ydb.StatusIds_SUCCESS {
s.metrics["backups_succeeded_count"]++
} else {
Expand All @@ -27,48 +34,72 @@ func (s *MockMetricsRegistry) GetMetrics() map[string]float64 {
}

// IncApiCallsCounter adds one to the "api_calls_count" entry while holding the
// registry mutex. serviceName, methodName and status are ignored: the mock
// keeps a single counter regardless of labels.
func (s *MockMetricsRegistry) IncApiCallsCounter(serviceName string, methodName string, status string) {
	const key = "api_calls_count"

	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.metrics[key]++
}

// IncBytesWrittenCounter adds bytes (converted to float64) to the
// "storage_bytes_written" entry while holding the registry mutex.
// containerId, bucket and database are ignored by the mock.
func (s *MockMetricsRegistry) IncBytesWrittenCounter(containerId string, bucket string, database string, bytes int64) {
	const key = "storage_bytes_written"

	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.metrics[key] += float64(bytes)
}

// IncBytesDeletedCounter adds bytes (converted to float64) to the
// "storage_bytes_deleted" entry while holding the registry mutex.
// containerId, bucket and database are ignored by the mock.
func (s *MockMetricsRegistry) IncBytesDeletedCounter(containerId string, bucket string, database string, bytes int64) {
	const key = "storage_bytes_deleted"

	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.metrics[key] += float64(bytes)
}

// ResetOperationsInflight sets the "operations_inflight" and
// "operations_inflight_duration_seconds" entries back to zero while holding
// the registry mutex.
func (s *MockMetricsRegistry) ResetOperationsInflight() {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	for _, key := range [...]string{
		"operations_inflight",
		"operations_inflight_duration_seconds",
	} {
		s.metrics[key] = 0
	}
}
// ReportOperationInflight adds one to both the "operations_inflight" and the
// "operations_inflight_duration_seconds" entries while holding the registry
// mutex. The operation argument is not inspected.
func (s *MockMetricsRegistry) ReportOperationInflight(operation types.Operation) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	for _, key := range [...]string{
		"operations_inflight",
		"operations_inflight_duration_seconds",
	} {
		s.metrics[key]++
	}
}

// ReportOperationMetrics records a finished operation: when
// types.IsActive(operation) is false it adds one to both
// "operations_duration_seconds" and "operations_finished_count".
// Operations reported as active leave the map untouched.
func (s *MockMetricsRegistry) ReportOperationMetrics(operation types.Operation) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if types.IsActive(operation) {
		return
	}

	s.metrics["operations_duration_seconds"]++
	s.metrics["operations_finished_count"]++
}

// IncHandlerRunsCount adds one to the
// "operation_processor_handler_runs_count" entry while holding the registry
// mutex. containerId and operationType are ignored by the mock.
func (s *MockMetricsRegistry) IncHandlerRunsCount(containerId string, operationType string) {
	const key = "operation_processor_handler_runs_count"

	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.metrics[key]++
}

// IncFailedHandlerRunsCount adds one to the
// "operation_processor_handler_runs_failed_count" entry while holding the
// registry mutex. containerId and operationType are ignored by the mock.
func (s *MockMetricsRegistry) IncFailedHandlerRunsCount(containerId string, operationType string) {
	const key = "operation_processor_handler_runs_failed_count"

	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.metrics[key]++
}

// IncSuccessfulHandlerRunsCount adds one to the
// "operation_processor_handler_runs_successful_count" entry while holding the
// registry mutex. containerId and operationType are ignored by the mock.
func (s *MockMetricsRegistry) IncSuccessfulHandlerRunsCount(containerId string, operationType string) {
	const key = "operation_processor_handler_runs_successful_count"

	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.metrics[key]++
}

// IncScheduledBackupsCount adds one to the
// "schedules_launched_take_backup_with_retry_count" entry while holding the
// registry mutex. The schedule argument is not inspected.
func (s *MockMetricsRegistry) IncScheduledBackupsCount(schedule *types.BackupSchedule) {
	const key = "schedules_launched_take_backup_with_retry_count"

	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.metrics[key]++
}

func (s *MockMetricsRegistry) IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error) {
func (s *MockMetricsRegistry) IncScheduleCounters(schedule *types.BackupSchedule, err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if err != nil {
s.metrics["schedules_failed_count"]++
} else {
Expand All @@ -77,22 +108,37 @@ func (s *MockMetricsRegistry) IncScheduleCounters(schedule *types.BackupSchedule
if schedule.RecoveryPoint != nil {
s.metrics["schedules_last_backup_timestamp"] = float64(schedule.RecoveryPoint.Unix())
if schedule.ScheduleSettings.RecoveryPointObjective != nil {
info := schedule.GetBackupInfo(clock)
info := schedule.GetBackupInfo(s.clock)
s.metrics["schedules_recovery_point_objective"] = info.LastBackupRpoMarginRatio
}
}
}

func InitializeMockMetricsRegistry() {
GlobalMetricsRegistry = newMockMetricsRegistry()
// Option is a function that receives a *MockMetricsRegistry and may modify it;
// it is the parameter type of the variadic options accepted when creating the mock.
type Option func(*MockMetricsRegistry)

// WithClock returns an Option that stores clock in the mock registry's clock
// field, replacing whatever clock the registry held before the option ran.
func WithClock(clock clockwork.Clock) Option {
	assignClock := func(registry *MockMetricsRegistry) {
		registry.clock = clock
	}
	return assignClock
}

// InitializeMockMetricsRegistry builds a mock registry from the given options
// and installs it as GlobalMetricsRegistry.
func InitializeMockMetricsRegistry(options ...Option) {
	mockRegistry := newMockMetricsRegistry(options...)
	GlobalMetricsRegistry = mockRegistry
}

// GetMetrics returns the metrics map of the mock registry currently stored in
// GlobalMetricsRegistry. The type assertion is unchecked, so this panics when
// GlobalMetricsRegistry does not hold a *MockMetricsRegistry.
func GetMetrics() map[string]float64 {
	mockRegistry := GlobalMetricsRegistry.(*MockMetricsRegistry)
	return mockRegistry.GetMetrics()
}

func newMockMetricsRegistry() *MockMetricsRegistry {
return &MockMetricsRegistry{
func newMockMetricsRegistry(options ...Option) *MockMetricsRegistry {
mock := &MockMetricsRegistry{
metrics: make(map[string]float64),
clock: clockwork.NewFakeClock(),
}

for _, option := range options {
option(mock)
}

return mock
}
3 changes: 2 additions & 1 deletion internal/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metrics
import (
"bytes"
"context"
"github.com/jonboulle/clockwork"
"io"
"net/http"
"strconv"
Expand All @@ -24,7 +25,7 @@ func TestMetricsCount(t *testing.T) {
BindPort: 8080,
BindAddress: "127.0.0.1",
}
InitializeMetricsRegistry(ctx, &wg, cfg)
InitializeMetricsRegistry(ctx, &wg, cfg, clockwork.NewFakeClock())

s := GlobalMetricsRegistry.(*MetricsRegistryImpl)
s.apiCallsCounter.WithLabelValues("test_service", "test_method", "test_status").Add(123)
Expand Down
Loading

0 comments on commit 4332f1c

Please sign in to comment.