Skip to content

Commit

Permalink
add operations inflight metric
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Dec 10, 2024
1 parent 02a321e commit e14d0d6
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 0 deletions.
32 changes: 32 additions & 0 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type MetricsRegistry interface {
IncBytesWrittenCounter(containerId string, bucket string, database string, bytes int64)
IncBytesDeletedCounter(containerId string, bucket string, database string, bytes int64)
IncOperationsStartedCounter(operation types.Operation)
ResetOperationsInflight()
ReportOperationInflight(operation types.Operation)
ReportOperationMetrics(operation types.Operation)
IncHandlerRunsCount(containerId string, operationType string)
IncFailedHandlerRunsCount(containerId string, operationType string)
Expand All @@ -55,6 +57,7 @@ type MetricsRegistryImpl struct {
operationsDuration *prometheus.HistogramVec
operationsStarted *prometheus.CounterVec
operationsFinished *prometheus.CounterVec
operationsInflight *prometheus.GaugeVec

// operation processor metrics
handlerRunsCount *prometheus.CounterVec
Expand Down Expand Up @@ -100,6 +103,28 @@ func (s *MetricsRegistryImpl) IncOperationsStartedCounter(operation types.Operat
).Inc()
}

func (s *MetricsRegistryImpl) ResetOperationsInflight() {
s.operationsInflight.Reset()
}

func (s *MetricsRegistryImpl) ReportOperationInflight(operation types.Operation) {
label := NO_SCHEDULE_ID_LABEL
if operation.GetType() == types.OperationTypeTBWR {
tbwr := operation.(*types.TakeBackupWithRetryOperation)
if tbwr.ScheduleID != nil {
label = *tbwr.ScheduleID
}
}

s.operationsInflight.WithLabelValues(
operation.GetContainerID(),
operation.GetDatabaseName(),
operation.GetType().String(),
operation.GetState().String(),
label,
).Inc()
}

func (s *MetricsRegistryImpl) ReportOperationMetrics(operation types.Operation) {
if !types.IsActive(operation) {
if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil {
Expand All @@ -123,6 +148,7 @@ func (s *MetricsRegistryImpl) ReportOperationMetrics(operation types.Operation)
s.operationsFinished.WithLabelValues(
operation.GetContainerID(), operation.GetDatabaseName(), operation.GetType().String(), operation.GetState().String(), label,
).Inc()

}
}

Expand Down Expand Up @@ -208,6 +234,12 @@ func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met
Help: "Total count of finished operations",
}, []string{"container_id", "database", "type", "status", "schedule_id"})

s.operationsInflight = promauto.With(s.reg).NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "operations",
Name: "inflight",
Help: "Total count of active operations",
}, []string{"container_id", "database", "type", "status", "schedule_id"})

s.handlerRunsCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
Subsystem: "operation_processor",
Name: "handler_runs_count",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ func (s *MockMetricsRegistry) IncBytesDeletedCounter(containerId string, bucket
s.metrics["storage_bytes_deleted"] += float64(bytes)
}

func (s *MockMetricsRegistry) ResetOperationsInflight() {
s.metrics["operations_inflight"] = 0
}
func (s *MockMetricsRegistry) ReportOperationInflight(operation types.Operation) {
s.metrics["operations_inflight"]++
}

func (s *MockMetricsRegistry) ReportOperationMetrics(operation types.Operation) {
if !types.IsActive(operation) {
s.metrics["operations_duration_seconds"]++
Expand Down
3 changes: 3 additions & 0 deletions internal/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (o *OperationProcessorImpl) processOperations() {
xlog.Error(ctx, "cannot get Active Operations", zap.Error(err))
return
}
metrics.GlobalMetricsRegistry.ResetOperationsInflight()
for _, op := range operations {
o.processOperation(op)
}
Expand All @@ -144,6 +145,8 @@ func (o *OperationProcessorImpl) processOperation(op types.Operation) {
return
}
metrics.GlobalMetricsRegistry.IncHandlerRunsCount(op.GetContainerID(), op.GetType().String())
metrics.GlobalMetricsRegistry.ReportOperationInflight(op)

o.runningOperations[op.GetID()] = true
o.workersWaitGroup.Add(1)
go func() {
Expand Down
3 changes: 3 additions & 0 deletions internal/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,7 @@ func TestProcessor(t *testing.T) {
op, err := db.GetOperation(ctx, opID)
assert.Empty(t, err)
assert.Equal(t, op.GetState(), types.OperationStateDone, "operation state should be Done")
val, ok := metrics.GetMetrics()["operations_inflight"]
assert.True(t, ok) // to show that it has been incremented
assert.Equal(t, float64(0), val) //to show it has been reset to 0
}

0 comments on commit e14d0d6

Please sign in to comment.