diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 08b67f57..679a6acb 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -31,6 +31,8 @@ type MetricsRegistry interface { IncBytesWrittenCounter(containerId string, bucket string, database string, bytes int64) IncBytesDeletedCounter(containerId string, bucket string, database string, bytes int64) IncOperationsStartedCounter(operation types.Operation) + ResetOperationsInflight() + ReportOperationInflight(operation types.Operation) ReportOperationMetrics(operation types.Operation) IncHandlerRunsCount(containerId string, operationType string) IncFailedHandlerRunsCount(containerId string, operationType string) @@ -55,6 +57,7 @@ type MetricsRegistryImpl struct { operationsDuration *prometheus.HistogramVec operationsStarted *prometheus.CounterVec operationsFinished *prometheus.CounterVec + operationsInflight *prometheus.GaugeVec // operation processor metrics handlerRunsCount *prometheus.CounterVec @@ -100,6 +103,28 @@ func (s *MetricsRegistryImpl) IncOperationsStartedCounter(operation types.Operat ).Inc() } +func (s *MetricsRegistryImpl) ResetOperationsInflight() { + s.operationsInflight.Reset() +} + +func (s *MetricsRegistryImpl) ReportOperationInflight(operation types.Operation) { + label := NO_SCHEDULE_ID_LABEL + if operation.GetType() == types.OperationTypeTBWR { + tbwr := operation.(*types.TakeBackupWithRetryOperation) + if tbwr.ScheduleID != nil { + label = *tbwr.ScheduleID + } + } + + s.operationsInflight.WithLabelValues( + operation.GetContainerID(), + operation.GetDatabaseName(), + operation.GetType().String(), + operation.GetState().String(), + label, + ).Inc() +} + func (s *MetricsRegistryImpl) ReportOperationMetrics(operation types.Operation) { if !types.IsActive(operation) { if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil { @@ -123,6 +148,7 @@ func (s *MetricsRegistryImpl) ReportOperationMetrics(operation types.Operation) s.operationsFinished.WithLabelValues( operation.GetContainerID(), operation.GetDatabaseName(), operation.GetType().String(), operation.GetState().String(), label, ).Inc() + } } @@ -208,6 +234,12 @@ func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met Help: "Total count of finished operations", }, []string{"container_id", "database", "type", "status", "schedule_id"}) + s.operationsInflight = promauto.With(s.reg).NewGaugeVec(prometheus.GaugeOpts{ + Subsystem: "operations", + Name: "inflight", + Help: "Total count of active operations", + }, []string{"container_id", "database", "type", "status", "schedule_id"}) + s.handlerRunsCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ Subsystem: "operation_processor", Name: "handler_runs_count", diff --git a/internal/metrics/metrics_moc.go b/internal/metrics/metrics_mock.go similarity index 92% rename from internal/metrics/metrics_moc.go rename to internal/metrics/metrics_mock.go index af470d58..30684225 100644 --- a/internal/metrics/metrics_moc.go +++ b/internal/metrics/metrics_mock.go @@ -38,6 +38,13 @@ func (s *MockMetricsRegistry) IncBytesDeletedCounter(containerId string, bucket s.metrics["storage_bytes_deleted"] += float64(bytes) } +func (s *MockMetricsRegistry) ResetOperationsInflight() { + s.metrics["operations_inflight"] = 0 +} +func (s *MockMetricsRegistry) ReportOperationInflight(operation types.Operation) { + s.metrics["operations_inflight"]++ +} + func (s *MockMetricsRegistry) ReportOperationMetrics(operation types.Operation) { if !types.IsActive(operation) { s.metrics["operations_duration_seconds"]++ diff --git a/internal/processor/processor.go b/internal/processor/processor.go index ba3923ea..4b16282e 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -121,6 +121,7 @@ func (o *OperationProcessorImpl) processOperations() { xlog.Error(ctx, "cannot get Active Operations", zap.Error(err)) return } + metrics.GlobalMetricsRegistry.ResetOperationsInflight() for _, op := range operations { o.processOperation(op) } @@ -144,6 +145,8 @@ func (o *OperationProcessorImpl) processOperation(op types.Operation) { return } metrics.GlobalMetricsRegistry.IncHandlerRunsCount(op.GetContainerID(), op.GetType().String()) + metrics.GlobalMetricsRegistry.ReportOperationInflight(op) + o.runningOperations[op.GetID()] = true o.workersWaitGroup.Add(1) go func() { diff --git a/internal/processor/processor_test.go b/internal/processor/processor_test.go index 7a2b5641..8a221286 100644 --- a/internal/processor/processor_test.go +++ b/internal/processor/processor_test.go @@ -99,4 +99,7 @@ func TestProcessor(t *testing.T) { op, err := db.GetOperation(ctx, opID) assert.Empty(t, err) assert.Equal(t, op.GetState(), types.OperationStateDone, "operation state should be Done") + val, ok := metrics.GetMetrics()["operations_inflight"] + assert.True(t, ok) // to show that it has been incremented + assert.Equal(t, float64(0), val) //to show it has been reset to 0 }