Skip to content

Commit

Permalink
metrics for started/finished operations
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Dec 6, 2024
1 parent a0c5e18 commit 02a321e
Show file tree
Hide file tree
Showing 12 changed files with 84 additions and 38 deletions.
2 changes: 2 additions & 0 deletions cmd/integration/orm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"ydbcp/internal/config"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/metrics"
"ydbcp/internal/types"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
)
Expand Down Expand Up @@ -129,6 +130,7 @@ func TestTBWROperationORM(ctx context.Context, ydbConn *db.YdbConnector, operati
}

func main() {
metrics.InitializeMockMetricsRegistry()
ctx := context.Background()
conn := common.CreateGRPCClient(ydbcpEndpoint)
defer func(conn *grpc.ClientConn) {
Expand Down
6 changes: 6 additions & 0 deletions internal/connectors/db/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"
"ydbcp/internal/config"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/metrics"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"

Expand Down Expand Up @@ -268,6 +269,11 @@ func (d *YdbConnector) ExecuteUpsert(ctx context.Context, queryBuilder queries.W
xlog.Error(ctx, "Error executing query", zap.Error(err))
return err
}
if ops := queryBuilder.GetOperations(); len(ops) > 0 {
for _, op := range ops {
metrics.GlobalMetricsRegistry.IncOperationsStartedCounter(op)
}
}
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions internal/connectors/db/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"ydbcp/internal/metrics"

"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
Expand Down Expand Up @@ -248,6 +249,7 @@ func (c *MockDBConnector) ExecuteUpsert(_ context.Context, queryBuilder queries.
queryBuilderMock := queryBuilder.(*queries.WriteTableQueryMock)
if queryBuilderMock.Operation != nil {
c.operations[(*queryBuilderMock.Operation).GetID()] = *queryBuilderMock.Operation
metrics.GlobalMetricsRegistry.IncOperationsStartedCounter(*queryBuilderMock.Operation)
}
if queryBuilderMock.Backup != nil {
c.backups[queryBuilderMock.Backup.ID] = *queryBuilderMock.Backup
Expand Down
7 changes: 7 additions & 0 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

type WriteTableQuery interface {
FormatQuery(ctx context.Context) (*FormatQueryResult, error)
GetOperations() []types.Operation
WithCreateBackup(backup types.Backup) WriteTableQuery
WithCreateOperation(operation types.Operation) WriteTableQuery
WithCreateBackupSchedule(schedule types.BackupSchedule) WriteTableQuery
Expand All @@ -26,6 +27,7 @@ type WriteTableQuery interface {

// WriteTableQueryImpl is the builder returned by NewWriteTableQuery. It
// accumulates the single-table write queries added through its With* methods
// together with the operations that those queries create.
type WriteTableQueryImpl struct {
// tableQueries holds the queued single-table queries in the order they were added.
tableQueries []WriteSingleTableQueryImpl
// operations records every operation passed to WithCreateOperation; it is
// exposed to callers through GetOperations.
operations []types.Operation
}

type WriteSingleTableQueryImpl struct {
Expand Down Expand Up @@ -425,6 +427,10 @@ func NewWriteTableQuery() WriteTableQuery {
return &WriteTableQueryImpl{}
}

// GetOperations returns the operations recorded on this query builder.
// The builder's internal slice is returned as-is (not copied); it is nil when
// no operation has been recorded.
func (d *WriteTableQueryImpl) GetOperations() []types.Operation {
return d.operations
}

func (d *WriteTableQueryImpl) WithCreateBackup(backup types.Backup) WriteTableQuery {
index := len(d.tableQueries)
d.tableQueries = append(d.tableQueries, BuildCreateBackupQuery(backup, index))
Expand All @@ -446,6 +452,7 @@ func (d *WriteTableQueryImpl) WithUpdateOperation(operation types.Operation) Wri
// WithCreateOperation queues a create-operation query for the given operation
// and remembers the operation so that it can later be retrieved through
// GetOperations. It returns the receiver to allow call chaining.
func (d *WriteTableQueryImpl) WithCreateOperation(operation types.Operation) WriteTableQuery {
	// The query is built with the position it will occupy in tableQueries.
	createQuery := BuildCreateOperationQuery(operation, len(d.tableQueries))
	d.tableQueries = append(d.tableQueries, createQuery)
	d.operations = append(d.operations, operation)
	return d
}

Expand Down
8 changes: 8 additions & 0 deletions internal/connectors/db/yql/queries/write_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ func NewWriteTableQueryMock() WriteTableQuery {
return &WriteTableQueryMock{}
}

// GetOperations returns the operation stored on the mock as a one-element
// slice, or nil when the mock holds no operation.
func (w *WriteTableQueryMock) GetOperations() []types.Operation {
	// Guard clause: nothing to report when no operation was set.
	if w.Operation == nil {
		return nil
	}
	return []types.Operation{*w.Operation}
}

// FormatQuery is the mock implementation of query formatting: it ignores the
// context and always returns an empty FormatQueryResult with a nil error.
func (w *WriteTableQueryMock) FormatQuery(_ context.Context) (*FormatQueryResult, error) {
return &FormatQueryResult{}, nil
}
Expand Down
3 changes: 0 additions & 3 deletions internal/handlers/schedule_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/metrics"
"ydbcp/internal/types"
"ydbcp/internal/util/xlog"
pb "ydbcp/pkg/proto/ydbcp/v1alpha1"
Expand Down Expand Up @@ -81,8 +80,6 @@ func BackupScheduleHandler(
zap.String("TakeBackupWithRetryOperation", tbwr.Proto().String()),
)

metrics.GlobalMetricsRegistry.IncScheduledBackupsCount(schedule)

err = schedule.UpdateNextLaunch(clock.Now())
if err != nil {
return err
Expand Down
8 changes: 7 additions & 1 deletion internal/handlers/take_backup_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,16 @@ func TBWROperationHandler(
return err
}
if do != Error {
fields := []zap.Field{
zap.String("decision", do.String()),
}
if len(ops) > 0 {
fields = append(fields, zap.String("TBOperationID", ops[len(ops)-1].GetID()))
}
xlog.Info(
ctx,
"TBWROperationHandler",
zap.String("decision", do.String()),
fields...,
)
}
switch do {
Expand Down
8 changes: 4 additions & 4 deletions internal/handlers/take_backup_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func TestTBWRHandlerSuccess(t *testing.T) {
assert.NotEmpty(t, op)
assert.Equal(t, types.OperationStateDone, op.GetState())
assert.Equal(t, float64(1), metrics.GetMetrics()["operations_duration_seconds"])
assert.Equal(t, float64(1), metrics.GetMetrics()["schedule_finished_take_backup_with_retry_count"])
assert.Equal(t, float64(1), metrics.GetMetrics()["operations_finished_count"])
}

func TestTBWRHandlerSkipRunning(t *testing.T) {
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestTBWRHandlerSkipRunning(t *testing.T) {
assert.Empty(t, err)
assert.Equal(t, 3, len(operations))
assert.Equal(t, float64(0), metrics.GetMetrics()["operations_duration_seconds"])
assert.Equal(t, float64(0), metrics.GetMetrics()["schedule_finished_take_backup_with_retry_count"])
assert.Equal(t, float64(0), metrics.GetMetrics()["operations_finished_count"])
}

func TestTBWRHandlerSkipError(t *testing.T) {
Expand Down Expand Up @@ -441,7 +441,7 @@ func TestTBWRHandlerError(t *testing.T) {
assert.Equal(t, types.OperationStateError, op.GetState())
assert.Equal(t, fmt.Sprintf("retry attempts exceeded limit: 1. Launched operations %s", ops[0].GetID()), op.GetMessage())
assert.Equal(t, float64(1), metrics.GetMetrics()["operations_duration_seconds"])
assert.Equal(t, float64(1), metrics.GetMetrics()["schedule_finished_take_backup_with_retry_count"])
assert.Equal(t, float64(1), metrics.GetMetrics()["operations_finished_count"])

}

Expand Down Expand Up @@ -661,5 +661,5 @@ func TestTBWRHandlerFullCancel(t *testing.T) {
assert.Equal(t, types.OperationStateCancelled, tbwr.State)
assert.Equal(t, "Success", tbwr.Message)
assert.Equal(t, float64(1), metrics.GetMetrics()["operations_duration_seconds"])
assert.Equal(t, float64(1), metrics.GetMetrics()["schedule_finished_take_backup_with_retry_count"])
assert.Equal(t, float64(1), metrics.GetMetrics()["operations_finished_count"])
}
60 changes: 37 additions & 23 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ type MetricsRegistry interface {
IncApiCallsCounter(serviceName string, methodName string, status string)
IncBytesWrittenCounter(containerId string, bucket string, database string, bytes int64)
IncBytesDeletedCounter(containerId string, bucket string, database string, bytes int64)
IncOperationsStartedCounter(operation types.Operation)
ReportOperationMetrics(operation types.Operation)
IncHandlerRunsCount(containerId string, operationType string)
IncFailedHandlerRunsCount(containerId string, operationType string)
IncSuccessfulHandlerRunsCount(containerId string, operationType string)
IncCompletedBackupsCount(containerId string, database string, code Ydb.StatusIds_StatusCode)
IncScheduledBackupsCount(schedule *types.BackupSchedule)
IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error)
}

Expand All @@ -53,6 +53,8 @@ type MetricsRegistryImpl struct {

// operation metrics
operationsDuration *prometheus.HistogramVec
operationsStarted *prometheus.CounterVec
operationsFinished *prometheus.CounterVec

// operation processor metrics
handlerRunsCount *prometheus.CounterVec
Expand All @@ -66,8 +68,6 @@ type MetricsRegistryImpl struct {
// schedule metrics
scheduleActionFailedCount *prometheus.CounterVec
scheduleActionSucceededCount *prometheus.CounterVec
scheduleLaunchedTBWRCount *prometheus.CounterVec
scheduleFinishedTBWRCount *prometheus.CounterVec
scheduleLastBackupTimestamp *prometheus.GaugeVec
scheduleRPOMarginRatio *prometheus.GaugeVec
}
Expand All @@ -84,6 +84,22 @@ func (s *MetricsRegistryImpl) IncBytesDeletedCounter(containerId string, bucket
s.bytesDeletedCounter.WithLabelValues(containerId, bucket, database).Add(float64(bytes))
}

// IncOperationsStartedCounter increments the started-operations counter for
// the given operation. The counter is labelled, in order, with the operation's
// container ID, database name, operation type and schedule ID.
//
// The schedule ID label defaults to NO_SCHEDULE_ID_LABEL and is replaced only
// for TakeBackupWithRetry operations that carry a non-nil ScheduleID.
func (s *MetricsRegistryImpl) IncOperationsStartedCounter(operation types.Operation) {
	label := NO_SCHEDULE_ID_LABEL
	if operation.GetType() == types.OperationTypeTBWR {
		// Use the comma-ok form so that an operation reporting the TBWR type
		// without being a *TakeBackupWithRetryOperation keeps the default
		// label instead of panicking inside metrics reporting.
		if tbwr, ok := operation.(*types.TakeBackupWithRetryOperation); ok && tbwr.ScheduleID != nil {
			label = *tbwr.ScheduleID
		}
	}
	s.operationsStarted.WithLabelValues(
		operation.GetContainerID(),
		operation.GetDatabaseName(),
		operation.GetType().String(),
		label,
	).Inc()
}

func (s *MetricsRegistryImpl) ReportOperationMetrics(operation types.Operation) {
if !types.IsActive(operation) {
if operation.GetAudit() != nil && operation.GetAudit().CompletedAt != nil {
Expand All @@ -95,16 +111,18 @@ func (s *MetricsRegistryImpl) ReportOperationMetrics(operation types.Operation)
operation.GetState().String(),
).Observe(duration.Seconds())
}

label := NO_SCHEDULE_ID_LABEL
if operation.GetType() == types.OperationTypeTBWR {
tbwr := operation.(*types.TakeBackupWithRetryOperation)
label := NO_SCHEDULE_ID_LABEL
if tbwr.ScheduleID != nil {
label = *tbwr.ScheduleID
}
s.scheduleFinishedTBWRCount.WithLabelValues(
operation.GetContainerID(), operation.GetDatabaseName(), label, operation.GetState().String(),
).Inc()
}

s.operationsFinished.WithLabelValues(
operation.GetContainerID(), operation.GetDatabaseName(), operation.GetType().String(), operation.GetState().String(), label,
).Inc()
}
}

Expand All @@ -128,10 +146,6 @@ func (s *MetricsRegistryImpl) IncCompletedBackupsCount(containerId string, datab
}
}

// IncScheduledBackupsCount increments the scheduleLaunchedTBWRCount counter,
// labelled with the schedule's container ID, database name and schedule ID.
func (s *MetricsRegistryImpl) IncScheduledBackupsCount(schedule *types.BackupSchedule) {
s.scheduleLaunchedTBWRCount.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Inc()
}

func (s *MetricsRegistryImpl) IncScheduleCounters(schedule *types.BackupSchedule, clock clockwork.Clock, err error) {
if err != nil {
s.scheduleActionFailedCount.WithLabelValues(schedule.ContainerID, schedule.DatabaseName, schedule.ID).Inc()
Expand Down Expand Up @@ -182,6 +196,18 @@ func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met
Buckets: prometheus.ExponentialBuckets(10, 2, 8),
}, []string{"container_id", "database", "type", "status"})

s.operationsStarted = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
Subsystem: "operations",
Name: "started_counter",
Help: "Total count of started operations",
}, []string{"container_id", "database", "type", "schedule_id"})

s.operationsFinished = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
Subsystem: "operations",
Name: "finished_counter",
Help: "Total count of finished operations",
}, []string{"container_id", "database", "type", "status", "schedule_id"})

s.handlerRunsCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
Subsystem: "operation_processor",
Name: "handler_runs_count",
Expand Down Expand Up @@ -224,18 +250,6 @@ func newMetricsRegistry(ctx context.Context, wg *sync.WaitGroup, cfg *config.Met
Help: "Total count of successful scheduled backup runs",
}, []string{"container_id", "database", "schedule_id"})

s.scheduleFinishedTBWRCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
Subsystem: "schedules",
Name: "finished_take_backup_with_retry_count",
Help: "Total count of TakeBackupWithRetry operations finished for this schedule",
}, []string{"container_id", "database", "schedule_id", "status"})

s.scheduleLaunchedTBWRCount = promauto.With(s.reg).NewCounterVec(prometheus.CounterOpts{
Subsystem: "schedules",
Name: "launched_take_backup_with_retry_count",
Help: "Total count of TakeBackupWithRetry operations launched by this schedule",
}, []string{"container_id", "database", "schedule_id"})

s.scheduleLastBackupTimestamp = promauto.With(s.reg).NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "schedules",
Name: "last_backup_timestamp",
Expand Down
8 changes: 5 additions & 3 deletions internal/metrics/metrics_moc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ type MockMetricsRegistry struct {
metrics map[string]float64
}

// IncOperationsStartedCounter is the mock counterpart of the started-operations
// counter: it bumps the "operations_started_count" entry of the in-memory
// metrics map by one. The operation argument is not inspected.
func (s *MockMetricsRegistry) IncOperationsStartedCounter(operation types.Operation) {
s.metrics["operations_started_count"]++
}

func (s *MockMetricsRegistry) IncCompletedBackupsCount(containerId string, database string, code Ydb.StatusIds_StatusCode) {
if code == Ydb.StatusIds_SUCCESS {
s.metrics["backups_succeeded_count"]++
Expand Down Expand Up @@ -37,9 +41,7 @@ func (s *MockMetricsRegistry) IncBytesDeletedCounter(containerId string, bucket
func (s *MockMetricsRegistry) ReportOperationMetrics(operation types.Operation) {
if !types.IsActive(operation) {
s.metrics["operations_duration_seconds"]++
if operation.GetType() == types.OperationTypeTBWR {
s.metrics["schedule_finished_take_backup_with_retry_count"]++
}
s.metrics["operations_finished_count"]++
}
}

Expand Down
8 changes: 4 additions & 4 deletions internal/watchers/schedule_watcher/schedule_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestScheduleWatcherSimple(t *testing.T) {
assert.Equal(t, *schedules[0].NextLaunch, now.Add(time.Minute))

assert.Equal(t, float64(1), metrics.GetMetrics()["schedules_succeeded_count"])
assert.Equal(t, float64(1), metrics.GetMetrics()["schedules_launched_take_backup_with_retry_count"])
assert.Equal(t, float64(1), metrics.GetMetrics()["operations_started_count"])
}

func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) {
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestScheduleWatcherTwoSchedulesOneBackup(t *testing.T) {
assert.Equal(t, *schedules[0].NextLaunch, m[schedules[0].ID])
assert.Equal(t, *schedules[1].NextLaunch, m[schedules[1].ID])
assert.Equal(t, float64(2), metrics.GetMetrics()["schedules_succeeded_count"])
assert.Equal(t, float64(1), metrics.GetMetrics()["schedules_launched_take_backup_with_retry_count"])
assert.Equal(t, float64(1), metrics.GetMetrics()["operations_started_count"])
}

func TestScheduleWatcherTwoBackups(t *testing.T) {
Expand Down Expand Up @@ -360,7 +360,7 @@ func TestScheduleWatcherTwoBackups(t *testing.T) {
assert.Equal(t, *schedules[0].NextLaunch, m[schedules[0].ID])
assert.Equal(t, *schedules[1].NextLaunch, m[schedules[1].ID])
assert.Equal(t, float64(2), metrics.GetMetrics()["schedules_succeeded_count"])
assert.Equal(t, float64(2), metrics.GetMetrics()["schedules_launched_take_backup_with_retry_count"])
assert.Equal(t, float64(2), metrics.GetMetrics()["operations_started_count"])
}

func TestAllScheduleMetrics(t *testing.T) {
Expand Down Expand Up @@ -469,7 +469,7 @@ func TestAllScheduleMetrics(t *testing.T) {
assert.Equal(t, len(schedules), 1)
assert.Equal(t, m[schedules[0].ID], *schedules[0].NextLaunch)
assert.Equal(t, float64(1), metrics.GetMetrics()["schedules_succeeded_count"])
assert.Equal(t, float64(1), metrics.GetMetrics()["schedules_launched_take_backup_with_retry_count"])
assert.Equal(t, float64(1), metrics.GetMetrics()["operations_started_count"])
assert.Equal(t, float64(schedules[0].RecoveryPoint.Unix()), metrics.GetMetrics()["schedules_last_backup_timestamp"])
assert.Equal(t, 0.5166666666666667, metrics.GetMetrics()["schedules_recovery_point_objective"])
}
2 changes: 2 additions & 0 deletions internal/watchers/ttl_watcher/ttl_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"time"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/metrics"
"ydbcp/internal/types"
"ydbcp/internal/util/ticker"
"ydbcp/internal/watchers"
)

func TestTtlWatcher(t *testing.T) {
metrics.InitializeMockMetricsRegistry()
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit 02a321e

Please sign in to comment.