From 02a321e98e4ee6eb878207b96e8a186601f798b6 Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Fri, 6 Dec 2024 17:04:26 +0100 Subject: [PATCH] metrics for started/finished operations --- cmd/integration/orm/main.go | 2 + internal/connectors/db/connector.go | 6 ++ internal/connectors/db/mock.go | 2 + internal/connectors/db/yql/queries/write.go | 7 +++ .../connectors/db/yql/queries/write_mock.go | 8 +++ internal/handlers/schedule_backup.go | 3 - internal/handlers/take_backup_retry.go | 8 ++- internal/handlers/take_backup_retry_test.go | 8 +-- internal/metrics/metrics.go | 60 ++++++++++++------- internal/metrics/metrics_moc.go | 8 ++- .../schedule_watcher/schedule_watcher_test.go | 8 +-- .../watchers/ttl_watcher/ttl_watcher_test.go | 2 + 12 files changed, 84 insertions(+), 38 deletions(-) diff --git a/cmd/integration/orm/main.go b/cmd/integration/orm/main.go index 366e2a54..0254b297 100644 --- a/cmd/integration/orm/main.go +++ b/cmd/integration/orm/main.go @@ -13,6 +13,7 @@ import ( "ydbcp/internal/config" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/metrics" "ydbcp/internal/types" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" ) @@ -129,6 +130,7 @@ func TestTBWROperationORM(ctx context.Context, ydbConn *db.YdbConnector, operati } func main() { + metrics.InitializeMockMetricsRegistry() ctx := context.Background() conn := common.CreateGRPCClient(ydbcpEndpoint) defer func(conn *grpc.ClientConn) { diff --git a/internal/connectors/db/connector.go b/internal/connectors/db/connector.go index d491458e..5b67cf9c 100644 --- a/internal/connectors/db/connector.go +++ b/internal/connectors/db/connector.go @@ -7,6 +7,7 @@ import ( "time" "ydbcp/internal/config" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/metrics" "ydbcp/internal/types" "ydbcp/internal/util/xlog" @@ -268,6 +269,11 @@ func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.W xlog.Error(ctx, "Error executing query", zap.Error(err)) return err } + if ops := queryBuilder.GetOperations(); len(ops) > 0 { + for _, op := range ops { + metrics.GlobalMetricsRegistry.IncOperationsStartedCounter(op) + } + } return nil } diff --git a/internal/connectors/db/mock.go b/internal/connectors/db/mock.go index 451cbf18..112965cd 100644 --- a/internal/connectors/db/mock.go +++ b/internal/connectors/db/mock.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "ydbcp/internal/metrics" "ydbcp/internal/connectors/db/yql/queries" "ydbcp/internal/types" @@ -248,6 +249,7 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries. queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock) if queryBuilderMock.Operation != nil { c.operations[(*queryBuilderMock.Operation).GetID()] = *queryBuilderMock.Operation + metrics.GlobalMetricsRegistry.IncOperationsStartedCounter(*queryBuilderMock.Operation) } if queryBuilderMock.Backup != nil { c.backups[queryBuilderMock.Backup.ID] = *queryBuilderMock.Backup diff --git a/internal/connectors/db/yql/queries/write.go b/internal/connectors/db/yql/queries/write.go index 5e3b2b6f..62adaf8f 100644 --- a/internal/connectors/db/yql/queries/write.go +++ b/internal/connectors/db/yql/queries/write.go @@ -16,6 +16,7 @@ import ( type WriteTableQuery interface { FormatQuery(ctx context.Context) (*FormatQueryResult, error) + GetOperations() []types.Operation WithCreateBackup(backup types.Backup) WriteTableQuery WithCreateOperation(operation types.Operation) WriteTableQuery WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery @@ -26,6 +27,7 @@ type WriteTableQuery interface { type WriteTableQueryImpl struct { tableQueries []WriteSingleTableQueryImpl + operations []types.Operation } type WriteSingleTableQueryImpl struct { @@ -425,6 +427,10 @@ func NewWriteTableQuery() WriteTableQuery { return &WriteTableQueryImpl{} } +func (d *WriteTableQueryImpl) GetOperations() []types.Operation { + return d.operations +} + func (d *WriteTableQueryImpl) WithCreateBackup(backup types.Backup) WriteTableQuery { index := len(d.tableQueries) d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index)) @@ -446,6 +452,7 @@ func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation) Wri func (d *WriteTableQueryImpl) WithCreateOperation(operation types.Operation) WriteTableQuery { index := len(d.tableQueries) d.tableQueries = append(d.tableQueries, BuildCreateOperationQuery(operation, index)) + d.operations = append(d.operations, operation) return d } diff --git a/internal/connectors/db/yql/queries/write_mock.go b/internal/connectors/db/yql/queries/write_mock.go index 14f0c3c5..2797e374 100644 --- a/internal/connectors/db/yql/queries/write_mock.go +++ b/internal/connectors/db/yql/queries/write_mock.go @@ -18,6 +18,14 @@ func NewWriteTableQueryMock() WriteTableQuery { return &WriteTableQueryMock{} } +func (w *WriteTableQueryMock) GetOperations() []types.Operation { + if w.Operation == nil { + return nil + } else { + return []types.Operation{*w.Operation} + } +} + func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult, error) { return &FormatQueryResult{}, nil } diff --git a/internal/handlers/schedule_backup.go b/internal/handlers/schedule_backup.go index bba32b6b..50047f8f 100644 --- a/internal/handlers/schedule_backup.go +++ b/internal/handlers/schedule_backup.go @@ -9,7 +9,6 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" - "ydbcp/internal/metrics" "ydbcp/internal/types" "ydbcp/internal/util/xlog" pb "ydbcp/pkg/proto/ydbcp/v1alpha1" @@ -81,8 +80,6 @@ func BackupScheduleHandler( zap.String("TakeBackupWithRetryOperation", tbwr.Proto().String()), ) - metrics.GlobalMetricsRegistry.IncScheduledBackupsCount(schedule) - err = schedule.UpdateNextLaunch(clock.Now()) if err != nil { return err diff --git a/internal/handlers/take_backup_retry.go b/internal/handlers/take_backup_retry.go index a94b9096..08a37b66 100644 --- a/internal/handlers/take_backup_retry.go +++ b/internal/handlers/take_backup_retry.go @@ -199,10 +199,16 @@ func TBWROperationHandler( return err } if do != Error { + fields := []zap.Field{ + zap.String("decision", do.String()), + } + if len(ops) > 0 { + fields = append(fields, zap.String("TBOperationID", ops[len(ops)-1].GetID())) + } xlog.Info( ctx, "TBWROperationHandler", - zap.String("decision", do.String()), + fields..., ) } switch do { diff --git a/internal/handlers/take_backup_retry_test.go b/internal/handlers/take_backup_retry_test.go index f3833fe4..7d11690f 100644 --- a/internal/handlers/take_backup_retry_test.go +++ b/internal/handlers/take_backup_retry_test.go @@ -254,7 +254,7 @@ func TestTBWRHandlerSuccess(t *testing.T) { assert.NotEmpty(t, op) assert.Equal(t, types.OperationStateDone, op.GetState()) assert.Equal(t, float64(1), metrics.GetMetrics()["operations_duration_seconds"]) - assert.Equal(t, float64(1), metrics.GetMetrics()["schedule_finished_take_backup_with_retry_count"]) + assert.Equal(t, float64(1), metrics.GetMetrics()["operations_finished_count"]) } func TestTBWRHandlerSkipRunning(t *testing.T) { @@ -321,7 +321,7 @@ func TestTBWRHandlerSkipRunning(t *testing.T) { assert.Empty(t, err) assert.Equal(t, 3, len(operations)) assert.Equal(t, float64(0), metrics.GetMetrics()["operations_duration_seconds"]) - assert.Equal(t, float64(0), metrics.GetMetrics()["schedule_finished_take_backup_with_retry_count"]) + assert.Equal(t, float64(0), metrics.GetMetrics()["operations_finished_count"]) } func TestTBWRHandlerSkipError(t *testing.T) { @@ -441,7 +441,7 @@ func TestTBWRHandlerError(t *testing.T) { assert.Equal(t, types.OperationStateError, op.GetState()) assert.Equal(t, fmt.Sprintf("retry attempts exceeded limit: 1. Launched operations %s", ops[0].GetID()), op.GetMessage()) assert.Equal(t, float64(1), metrics.GetMetrics()["operations_duration_seconds"]) - assert.Equal(t, float64(1), metrics.GetMetrics()["schedule_finished_take_backup_with_retry_count"]) + assert.Equal(t, float64(1), metrics.GetMetrics()["operations_finished_count"]) } @@ -661,5 +661,5 @@ func TestTBWRHandlerFullCancel(t *testing.T) { assert.Equal(t, types.OperationStateCancelled, tbwr.State) assert.Equal(t, "Success", tbwr.Message) assert.Equal(t, float64(1), metrics.GetMetrics()["operations_duration_seconds"]) - assert.Equal(t, float64(1), metrics.GetMetrics()["schedule_finished_take_backup_with_retry_count"]) + assert.Equal(t, float64(1), metrics.GetMetrics()["operations_finished_count"]) } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 4a225748..08b67f57 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -30,12 +30,12 @@ type MetricsRegistry interface { IncApiCallsCounter(serviceName string, methodName string, status string) IncBytesWrittenCounter(containerId string, bucket string, database string, bytes int64) IncBytesDeletedCounter(containerId string, bucket string, database string, bytes int64) + IncOperationsStartedCounter(operation types.Operation) ReportOperationMetrics(operation types.Operation) IncHandlerRunsCount(containerId string, operationType string) IncFailedHandlerRunsCount(containerId string, operationType string) IncSuccessfulHandlerRunsCount(containerId string, operationType string) IncCompletedBackupsCount(containerId string, database string, code Ydb.StatusIds_StatusCode) - IncScheduledBackupsCount(schedule *types.BackupSchedule) IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error) } @@ -53,6 +53,8 @@ type MetricsRegistryImpl struct { // operation metrics operationsDuration *prometheus.HistogramVec + operationsStarted *prometheus.CounterVec + operationsFinished *prometheus.CounterVec // operation processor metrics handlerRunsCount *prometheus.CounterVec @@ -66,8 +68,6 @@ type MetricsRegistryImpl struct { // schedule metrics scheduleActionFailedCount *prometheus.CounterVec scheduleActionSucceededCount *prometheus.CounterVec - scheduleLaunchedTBWRCount *prometheus.CounterVec - scheduleFinishedTBWRCount *prometheus.CounterVec scheduleLastBackupTimestamp *prometheus.GaugeVec scheduleRPOMarginRatio *prometheus.GaugeVec } @@ -84,6 +84,22 @@ func (s *MetricsRegistryImpl) IncBytesDeletedCounter(containerId string, bucket s.bytesDeletedCounter.WithLabelValues(containerId, bucket, database).Add(float64(bytes)) } +func (s *MetricsRegistryImpl) IncOperationsStartedCounter(operation types.Operation) { + label := NO_SCHEDULE_ID_LABEL + if operation.GetType() == types.OperationTypeTBWR { + tbwr := operation.(*types.TakeBackupWithRetryOperation) + if tbwr.ScheduleID != nil { + label = *tbwr.ScheduleID + } + } + s.operationsStarted.WithLabelValues( + operation.GetContainerID(), + operation.GetDatabaseName(), + operation.GetType().String(), + label, + ).Inc() +} + func (s *MetricsRegistryImpl) ReportOperationMetrics(operation types.Operation) { if !types.IsActive(operation) { if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil { @@ -95,16 +111,18 @@ func (s *MetricsRegistryImpl) ReportOperationMetrics(operation types.Operation) operation.GetState().String(), ).Observe(duration.Seconds()) } + + label := NO_SCHEDULE_ID_LABEL if operation.GetType() == types.OperationTypeTBWR { tbwr := operation.(*types.TakeBackupWithRetryOperation) - label := NO_SCHEDULE_ID_LABEL if tbwr.ScheduleID != nil { label = *tbwr.ScheduleID } - s.scheduleFinishedTBWRCount.WithLabelValues( - operation.GetContainerID(), operation.GetDatabaseName(), label, operation.GetState().String(), - ).Inc() } + + s.operationsFinished.WithLabelValues( + operation.GetContainerID(), operation.GetDatabaseName(), operation.GetType().String(), operation.GetState().String(), label, + ).Inc() } } @@ -128,10 +146,6 @@ func (s *MetricsRegistryImpl) IncCompletedBackupsCount(containerId string, datab } } -func (s *MetricsRegistryImpl) IncScheduledBackupsCount(schedule *types.BackupSchedule) { - s.scheduleLaunchedTBWRCount.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Inc() -} - func (s *MetricsRegistryImpl) IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error) { if err != nil { s.scheduleActionFailedCount.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Inc() @@ -182,6 +196,18 @@ func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met Buckets: prometheus.ExponentialBuckets(10, 2, 8), }, []string{"container_id", "database", "type", "status"}) + s.operationsStarted = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ + Subsystem: "operations", + Name: "started_counter", + Help: "Total count of started operations", + }, []string{"container_id", "database", "type", "schedule_id"}) + + s.operationsFinished = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ + Subsystem: "operations", + Name: "finished_counter", + Help: "Total count of finished operations", + }, []string{"container_id", "database", "type", "status", "schedule_id"}) + s.handlerRunsCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ Subsystem: "operation_processor", Name: "handler_runs_count", @@ -224,18 +250,6 @@ func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met Help: "Total count of successful scheduled backup runs", }, []string{"container_id", "database", "schedule_id"}) - s.scheduleFinishedTBWRCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ - Subsystem: "schedules", - Name: "finished_take_backup_with_retry_count", - Help: "Total count of TakeBackupWithRetry operations finished for this schedule", - }, []string{"container_id", "database", "schedule_id", "status"}) - - s.scheduleLaunchedTBWRCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{ - Subsystem: "schedules", - Name: "launched_take_backup_with_retry_count", - Help: "Total count of TakeBackupWithRetry operations launched by this schedule", - }, []string{"container_id", "database", "schedule_id"}) - s.scheduleLastBackupTimestamp = promauto.With(s.reg).NewGaugeVec(prometheus.GaugeOpts{ Subsystem: "schedules", Name: "last_backup_timestamp", diff --git a/internal/metrics/metrics_moc.go b/internal/metrics/metrics_moc.go index 38d36054..af470d58 100644 --- a/internal/metrics/metrics_moc.go +++ b/internal/metrics/metrics_moc.go @@ -10,6 +10,10 @@ type MockMetricsRegistry struct { metrics map[string]float64 } +func (s *MockMetricsRegistry) IncOperationsStartedCounter(operation types.Operation) { + s.metrics["operations_started_count"]++ +} + func (s *MockMetricsRegistry) IncCompletedBackupsCount(containerId string, database string, code Ydb.StatusIds_StatusCode) { if code == Ydb.StatusIds_SUCCESS { s.metrics["backups_succeeded_count"]++ @@ -37,9 +41,7 @@ func (s *MockMetricsRegistry) IncBytesDeletedCounter(containerId string, bucket func (s *MockMetricsRegistry) ReportOperationMetrics(operation types.Operation) { if !types.IsActive(operation) { s.metrics["operations_duration_seconds"]++ - if operation.GetType() == types.OperationTypeTBWR { - s.metrics["schedule_finished_take_backup_with_retry_count"]++ - } + s.metrics["operations_finished_count"]++ } } diff --git a/internal/watchers/schedule_watcher/schedule_watcher_test.go b/internal/watchers/schedule_watcher/schedule_watcher_test.go index 4c11d675..41f4a5ee 100644 --- a/internal/watchers/schedule_watcher/schedule_watcher_test.go +++ b/internal/watchers/schedule_watcher/schedule_watcher_test.go @@ -118,7 +118,7 @@ func TestScheduleWatcherSimple(t *testing.T) { assert.Equal(t, *schedules[0].NextLaunch, now.Add(time.Minute)) assert.Equal(t, float64(1), metrics.GetMetrics()["schedules_succeeded_count"]) - assert.Equal(t, float64(1), metrics.GetMetrics()["schedules_launched_take_backup_with_retry_count"]) + assert.Equal(t, float64(1), metrics.GetMetrics()["operations_started_count"]) } func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { @@ -237,7 +237,7 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) { assert.Equal(t, *schedules[0].NextLaunch, m[schedules[0].ID]) assert.Equal(t, *schedules[1].NextLaunch, m[schedules[1].ID]) assert.Equal(t, float64(2), metrics.GetMetrics()["schedules_succeeded_count"]) - assert.Equal(t, float64(1), metrics.GetMetrics()["schedules_launched_take_backup_with_retry_count"]) + assert.Equal(t, float64(1), metrics.GetMetrics()["operations_started_count"]) } func TestScheduleWatcherTwoBackups(t *testing.T) { @@ -360,7 +360,7 @@ func TestScheduleWatcherTwoBackups(t *testing.T) { assert.Equal(t, *schedules[0].NextLaunch, m[schedules[0].ID]) assert.Equal(t, *schedules[1].NextLaunch, m[schedules[1].ID]) assert.Equal(t, float64(2), metrics.GetMetrics()["schedules_succeeded_count"]) - assert.Equal(t, float64(2), metrics.GetMetrics()["schedules_launched_take_backup_with_retry_count"]) + assert.Equal(t, float64(2), metrics.GetMetrics()["operations_started_count"]) } func TestAllScheduleMetrics(t *testing.T) { @@ -469,7 +469,7 @@ func TestAllScheduleMetrics(t *testing.T) { assert.Equal(t, len(schedules), 1) assert.Equal(t, m[schedules[0].ID], *schedules[0].NextLaunch) assert.Equal(t, float64(1), metrics.GetMetrics()["schedules_succeeded_count"]) - assert.Equal(t, float64(1), metrics.GetMetrics()["schedules_launched_take_backup_with_retry_count"]) + assert.Equal(t, float64(1), metrics.GetMetrics()["operations_started_count"]) assert.Equal(t, float64(schedules[0].RecoveryPoint.Unix()), metrics.GetMetrics()["schedules_last_backup_timestamp"]) assert.Equal(t, 0.5166666666666667, metrics.GetMetrics()["schedules_recovery_point_objective"]) } diff --git a/internal/watchers/ttl_watcher/ttl_watcher_test.go b/internal/watchers/ttl_watcher/ttl_watcher_test.go index 33f83531..89f96ce3 100644 --- a/internal/watchers/ttl_watcher/ttl_watcher_test.go +++ b/internal/watchers/ttl_watcher/ttl_watcher_test.go @@ -9,12 +9,14 @@ import ( "time" "ydbcp/internal/connectors/db" "ydbcp/internal/connectors/db/yql/queries" + "ydbcp/internal/metrics" "ydbcp/internal/types" "ydbcp/internal/util/ticker" "ydbcp/internal/watchers" ) func TestTtlWatcher(t *testing.T) { + metrics.InitializeMockMetricsRegistry() var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) defer cancel()