From 2c412d9c543dcdeba584ee6c25969f613f8780c0 Mon Sep 17 00:00:00 2001 From: Nathan Oorloff Date: Thu, 3 Oct 2024 19:31:55 -0400 Subject: [PATCH 1/2] basic batch metrics structure --- common/metrics/batchmetrics.go | 91 ++++++++++++++++++++ service/history/workflow/metrics.go | 60 +++++++------ service/history/workflow/transaction_impl.go | 12 ++- 3 files changed, 131 insertions(+), 32 deletions(-) create mode 100644 common/metrics/batchmetrics.go diff --git a/common/metrics/batchmetrics.go b/common/metrics/batchmetrics.go new file mode 100644 index 00000000000..46e807ad37c --- /dev/null +++ b/common/metrics/batchmetrics.go @@ -0,0 +1,91 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination batchmetrics_mock.go + +package metrics + +import "time" + +// The BatchMetrics interfaces provide a way to emit "wide events" within the codebase. If a wide event +// implementation is not provided we default to the implementations provided below which will maintain +// backwards compatibility by emitting each field as an individual metric. Fields can be labeled using +// the metricDefinition.Name() accessor in the respective With() methods. + +type ( + BatchMetricsHandler interface { + CreateBatch(string, ...Tag) BatchMetric + } + + BatchMetric interface { + WithHistogram(histogramDefinition, int64) BatchMetric + WithTimer(timerDefinition, int64) BatchMetric + WithCounter(counterDefinition, int64) BatchMetric + WithGauge(gaugeDefinition, int64) BatchMetric + Emit() + } + + BatchHandlerImpl struct { + metricsHandler Handler + } + BatchMetricImpl struct { + emitter Handler + tags []Tag + } +) + +func NewBatchMetricHandler(metricHandler Handler) *BatchHandlerImpl { + return &BatchHandlerImpl{metricHandler} +} + +// CreateBatch will emit a BatchMetric that will emit +func (bh *BatchHandlerImpl) CreateBatch(_ string, tags ...Tag) *BatchMetricImpl { + return &BatchMetricImpl{ + emitter: bh.metricsHandler.WithTags(tags...), + tags: tags, + } +} + +func (bm *BatchMetricImpl) WithHistogram(def histogramDefinition, value int64) *BatchMetricImpl { + def.With(bm.emitter).Record(value) + return bm +} + +func (bm *BatchMetricImpl) WithTimer(def timerDefinition, value time.Duration) *BatchMetricImpl { + def.With(bm.emitter).Record(value) + return bm +} + +func (bm *BatchMetricImpl) WithCounter(def counterDefinition, value int64) *BatchMetricImpl { + def.With(bm.emitter).Record(value) + return bm +} + +func (bm *BatchMetricImpl) WithGauge(def gaugeDefinition, value float64) *BatchMetricImpl { + def.With(bm.emitter).Record(value) + return bm +} + +// Emit is a no-op since there's no actual wide event being sent in this implementation. +func (bm *BatchMetricImpl) Emit() {} diff --git a/service/history/workflow/metrics.go b/service/history/workflow/metrics.go index 7e39759dda8..4dd2005e3f7 100644 --- a/service/history/workflow/metrics.go +++ b/service/history/workflow/metrics.go @@ -54,44 +54,48 @@ func emitWorkflowHistoryStats( } func emitMutableStateStatus( - metricsHandler metrics.Handler, stats *persistence.MutableStateStatistics, + metricsHandler metrics.Handler, + batchMetricHandler metrics.BatchMetricsHandler, + tags ...metrics.Tag, ) { if stats == nil { return } - metrics.MutableStateSize.With(metricsHandler).Record(int64(stats.TotalSize)) - metrics.ExecutionInfoSize.With(metricsHandler).Record(int64(stats.ExecutionInfoSize)) - metrics.ExecutionStateSize.With(metricsHandler).Record(int64(stats.ExecutionStateSize)) - metrics.ActivityInfoSize.With(metricsHandler).Record(int64(stats.ActivityInfoSize)) - metrics.ActivityInfoCount.With(metricsHandler).Record(int64(stats.ActivityInfoCount)) - metrics.TotalActivityCount.With(metricsHandler).Record(stats.TotalActivityCount) - metrics.TimerInfoSize.With(metricsHandler).Record(int64(stats.TimerInfoSize)) - metrics.TimerInfoCount.With(metricsHandler).Record(int64(stats.TimerInfoCount)) - metrics.TotalUserTimerCount.With(metricsHandler).Record(stats.TotalUserTimerCount) - metrics.ChildInfoSize.With(metricsHandler).Record(int64(stats.ChildInfoSize)) - metrics.ChildInfoCount.With(metricsHandler).Record(int64(stats.ChildInfoCount)) - metrics.TotalChildExecutionCount.With(metricsHandler).Record(stats.TotalChildExecutionCount) - metrics.RequestCancelInfoSize.With(metricsHandler).Record(int64(stats.RequestCancelInfoSize)) - metrics.RequestCancelInfoCount.With(metricsHandler).Record(int64(stats.RequestCancelInfoCount)) - metrics.TotalRequestCancelExternalCount.With(metricsHandler).Record(stats.TotalRequestCancelExternalCount) - metrics.SignalInfoSize.With(metricsHandler).Record(int64(stats.SignalInfoSize)) - metrics.SignalInfoCount.With(metricsHandler).Record(int64(stats.SignalInfoCount)) - metrics.TotalSignalExternalCount.With(metricsHandler).Record(stats.TotalSignalExternalCount) - metrics.SignalRequestIDSize.With(metricsHandler).Record(int64(stats.SignalRequestIDSize)) - metrics.SignalRequestIDCount.With(metricsHandler).Record(int64(stats.SignalRequestIDCount)) - metrics.TotalSignalCount.With(metricsHandler).Record(stats.TotalSignalCount) - metrics.BufferedEventsSize.With(metricsHandler).Record(int64(stats.BufferedEventsSize)) - metrics.BufferedEventsCount.With(metricsHandler).Record(int64(stats.BufferedEventsCount)) + batch := batchMetricHandler.CreateBatch("mutable_state_status", tags...) + batch.WithHistogram(metrics.MutableStateSize, int64(stats.TotalSize)) + batch.WithHistogram(metrics.ExecutionInfoSize, int64(stats.ExecutionInfoSize)) + batch.WithHistogram(metrics.ExecutionStateSize, int64(stats.ExecutionStateSize)) + batch.WithHistogram(metrics.ActivityInfoSize, int64(stats.ActivityInfoSize)) + batch.WithHistogram(metrics.ActivityInfoCount, int64(stats.ActivityInfoCount)) + batch.WithHistogram(metrics.TotalActivityCount, stats.TotalActivityCount) + batch.WithHistogram(metrics.TimerInfoSize, int64(stats.TimerInfoSize)) + batch.WithHistogram(metrics.TimerInfoCount, int64(stats.TimerInfoCount)) + batch.WithHistogram(metrics.TotalUserTimerCount, stats.TotalUserTimerCount) + batch.WithHistogram(metrics.ChildInfoSize, int64(stats.ChildInfoSize)) + batch.WithHistogram(metrics.ChildInfoCount, int64(stats.ChildInfoCount)) + batch.WithHistogram(metrics.TotalChildExecutionCount, stats.TotalChildExecutionCount) + batch.WithHistogram(metrics.RequestCancelInfoSize, int64(stats.RequestCancelInfoSize)) + batch.WithHistogram(metrics.RequestCancelInfoCount, int64(stats.RequestCancelInfoCount)) + batch.WithHistogram(metrics.TotalRequestCancelExternalCount, stats.TotalRequestCancelExternalCount) + batch.WithHistogram(metrics.SignalInfoSize, int64(stats.SignalInfoSize)) + batch.WithHistogram(metrics.SignalInfoCount, int64(stats.SignalInfoCount)) + batch.WithHistogram(metrics.TotalSignalExternalCount, stats.TotalSignalExternalCount) + batch.WithHistogram(metrics.SignalRequestIDSize, int64(stats.SignalRequestIDSize)) + batch.WithHistogram(metrics.SignalRequestIDCount, int64(stats.SignalRequestIDCount)) + batch.WithHistogram(metrics.TotalSignalCount, stats.TotalSignalCount) + batch.WithHistogram(metrics.BufferedEventsSize, int64(stats.BufferedEventsSize)) + batch.WithHistogram(metrics.BufferedEventsCount, int64(stats.BufferedEventsCount)) if stats.HistoryStatistics != nil { - metrics.HistorySize.With(metricsHandler).Record(int64(stats.HistoryStatistics.SizeDiff)) - metrics.HistoryCount.With(metricsHandler).Record(int64(stats.HistoryStatistics.CountDiff)) + batch.WithHistogram(metrics.HistorySize, int64(stats.HistoryStatistics.SizeDiff)) + batch.WithHistogram(metrics.HistoryCount, int64(stats.HistoryStatistics.CountDiff)) } + batch.Emit() + metricsHandler = metricsHandler.WithTags(tags...) for category, taskCount := range stats.TaskCountByCategory { - metrics.TaskCount.With(metricsHandler). - Record(int64(taskCount), metrics.TaskCategoryTag(category)) + metrics.TaskCount.With(metricsHandler).Record(int64(taskCount), metrics.TaskCategoryTag(category)) } } diff --git a/service/history/workflow/transaction_impl.go b/service/history/workflow/transaction_impl.go index a9399317c1f..862c31343aa 100644 --- a/service/history/workflow/transaction_impl.go +++ b/service/history/workflow/transaction_impl.go @@ -668,12 +668,14 @@ func emitMutationMetrics( namespace *namespace.Namespace, stats ...*persistence.MutableStateStatistics, ) { - metricsHandler := shard.GetMetricsHandler() namespaceName := namespace.Name() for _, stat := range stats { emitMutableStateStatus( - metricsHandler.WithTags(metrics.OperationTag(metrics.SessionStatsScope), metrics.NamespaceTag(namespaceName.String())), stat, + shard.GetMetricsHandler(), + shard.GetBatchMetricsHandler(), + metrics.OperationTag(metrics.SessionStatsScope), + metrics.NamespaceTag(namespaceName.String()), ) } } @@ -683,12 +685,14 @@ func emitGetMetrics( namespace *namespace.Namespace, stats ...*persistence.MutableStateStatistics, ) { - metricsHandler := shard.GetMetricsHandler() namespaceName := namespace.Name() for _, stat := range stats { emitMutableStateStatus( - metricsHandler.WithTags(metrics.OperationTag(metrics.ExecutionStatsScope), metrics.NamespaceTag(namespaceName.String())), stat, + shard.GetMetricsHandler(), + shard.GetBatchMetricsHandler(), + metrics.OperationTag(metrics.ExecutionStatsScope), + metrics.NamespaceTag(namespaceName.String()), ) } } From 5f9d1f1b4799ddbc7df22ebf762b87fa05ed3d04 Mon Sep 17 00:00:00 2001 From: Nathan Oorloff Date: Fri, 4 Oct 2024 14:11:02 -0400 Subject: [PATCH 2/2] Fix implementation, plumb through server --- common/metrics/batchmetrics.go | 35 ++--- common/metrics/batchmetrics_mock.go | 174 ++++++++++++++++++++++ service/history/history_engine.go | 2 + service/history/shard/context.go | 1 + service/history/shard/context_factory.go | 2 + service/history/shard/context_impl.go | 7 + service/history/shard/context_mock.go | 28 ++++ service/history/shard/context_testutil.go | 1 + service/history/shard/controller_test.go | 1 + temporal/fx.go | 13 ++ temporal/server_option.go | 10 +- temporal/server_options.go | 1 + 12 files changed, 257 insertions(+), 18 deletions(-) create mode 100644 common/metrics/batchmetrics_mock.go diff --git a/common/metrics/batchmetrics.go b/common/metrics/batchmetrics.go index 46e807ad37c..965a79c927b 100644 --- a/common/metrics/batchmetrics.go +++ b/common/metrics/batchmetrics.go @@ -29,9 +29,9 @@ package metrics import "time" // The BatchMetrics interfaces provide a way to emit "wide events" within the codebase. If a wide event -// implementation is not provided we default to the implementations provided below which will maintain -// backwards compatibility by emitting each field as an individual metric. Fields can be labeled using -// the metricDefinition.Name() accessor in the respective With() methods. +// implementation is not provided we default to the implementation provided below which will maintain +// backwards compatibility by emitting each field as an individual metric. In custom implementations, fields +// can be labeled using the metricDefinition.Name() accessor within the respective With() methods. type ( BatchMetricsHandler interface { @@ -40,9 +40,9 @@ type ( BatchMetric interface { WithHistogram(histogramDefinition, int64) BatchMetric - WithTimer(timerDefinition, int64) BatchMetric + WithTimer(timerDefinition, time.Duration) BatchMetric WithCounter(counterDefinition, int64) BatchMetric - WithGauge(gaugeDefinition, int64) BatchMetric + WithGauge(gaugeDefinition, float64) BatchMetric Emit() } @@ -51,41 +51,42 @@ type ( } BatchMetricImpl struct { emitter Handler - tags []Tag } ) -func NewBatchMetricHandler(metricHandler Handler) *BatchHandlerImpl { - return &BatchHandlerImpl{metricHandler} +func NewBatchMetricsHandler(metricHandler Handler) BatchHandlerImpl { + return BatchHandlerImpl{metricHandler} } -// CreateBatch will emit a BatchMetric that will emit -func (bh *BatchHandlerImpl) CreateBatch(_ string, tags ...Tag) *BatchMetricImpl { +func (bh BatchHandlerImpl) CreateBatch(_ string, tags ...Tag) BatchMetric { + emitter := bh.metricsHandler + if len(tags) > 0 { + emitter = emitter.WithTags(tags...) + } return &BatchMetricImpl{ - emitter: bh.metricsHandler.WithTags(tags...), - tags: tags, + emitter: emitter, } } -func (bm *BatchMetricImpl) WithHistogram(def histogramDefinition, value int64) *BatchMetricImpl { +func (bm *BatchMetricImpl) WithHistogram(def histogramDefinition, value int64) BatchMetric { def.With(bm.emitter).Record(value) return bm } -func (bm *BatchMetricImpl) WithTimer(def timerDefinition, value time.Duration) *BatchMetricImpl { +func (bm *BatchMetricImpl) WithTimer(def timerDefinition, value time.Duration) BatchMetric { def.With(bm.emitter).Record(value) return bm } -func (bm *BatchMetricImpl) WithCounter(def counterDefinition, value int64) *BatchMetricImpl { +func (bm *BatchMetricImpl) WithCounter(def counterDefinition, value int64) BatchMetric { def.With(bm.emitter).Record(value) return bm } -func (bm *BatchMetricImpl) WithGauge(def gaugeDefinition, value float64) *BatchMetricImpl { +func (bm *BatchMetricImpl) WithGauge(def gaugeDefinition, value float64) BatchMetric { def.With(bm.emitter).Record(value) return bm } -// Emit is a no-op since there's no actual wide event being sent in this implementation. +// Emit is a no-op in this implementation since we don't actually send wide events. func (bm *BatchMetricImpl) Emit() {} diff --git a/common/metrics/batchmetrics_mock.go b/common/metrics/batchmetrics_mock.go new file mode 100644 index 00000000000..50d64e13826 --- /dev/null +++ b/common/metrics/batchmetrics_mock.go @@ -0,0 +1,174 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: batchmetrics.go +// +// Generated by this command: +// +// mockgen -copyright_file ../../LICENSE -package metrics -source batchmetrics.go -destination batchmetrics_mock.go +// + +// Package metrics is a generated GoMock package. +package metrics + +import ( + reflect "reflect" + time "time" + + gomock "go.uber.org/mock/gomock" +) + +// MockBatchMetricsHandler is a mock of BatchMetricsHandler interface. +type MockBatchMetricsHandler struct { + ctrl *gomock.Controller + recorder *MockBatchMetricsHandlerMockRecorder +} + +// MockBatchMetricsHandlerMockRecorder is the mock recorder for MockBatchMetricsHandler. +type MockBatchMetricsHandlerMockRecorder struct { + mock *MockBatchMetricsHandler +} + +// NewMockBatchMetricsHandler creates a new mock instance. +func NewMockBatchMetricsHandler(ctrl *gomock.Controller) *MockBatchMetricsHandler { + mock := &MockBatchMetricsHandler{ctrl: ctrl} + mock.recorder = &MockBatchMetricsHandlerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBatchMetricsHandler) EXPECT() *MockBatchMetricsHandlerMockRecorder { + return m.recorder +} + +// CreateBatch mocks base method. +func (m *MockBatchMetricsHandler) CreateBatch(arg0 string, arg1 ...Tag) BatchMetric { + m.ctrl.T.Helper() + varargs := []any{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CreateBatch", varargs...) + ret0, _ := ret[0].(BatchMetric) + return ret0 +} + +// CreateBatch indicates an expected call of CreateBatch. +func (mr *MockBatchMetricsHandlerMockRecorder) CreateBatch(arg0 any, arg1 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateBatch", reflect.TypeOf((*MockBatchMetricsHandler)(nil).CreateBatch), varargs...) +} + +// MockBatchMetric is a mock of BatchMetric interface. +type MockBatchMetric struct { + ctrl *gomock.Controller + recorder *MockBatchMetricMockRecorder +} + +// MockBatchMetricMockRecorder is the mock recorder for MockBatchMetric. +type MockBatchMetricMockRecorder struct { + mock *MockBatchMetric +} + +// NewMockBatchMetric creates a new mock instance. +func NewMockBatchMetric(ctrl *gomock.Controller) *MockBatchMetric { + mock := &MockBatchMetric{ctrl: ctrl} + mock.recorder = &MockBatchMetricMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBatchMetric) EXPECT() *MockBatchMetricMockRecorder { + return m.recorder +} + +// Emit mocks base method. +func (m *MockBatchMetric) Emit() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Emit") +} + +// Emit indicates an expected call of Emit. +func (mr *MockBatchMetricMockRecorder) Emit() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Emit", reflect.TypeOf((*MockBatchMetric)(nil).Emit)) +} + +// WithCounter mocks base method. +func (m *MockBatchMetric) WithCounter(arg0 counterDefinition, arg1 int64) BatchMetric { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WithCounter", arg0, arg1) + ret0, _ := ret[0].(BatchMetric) + return ret0 +} + +// WithCounter indicates an expected call of WithCounter. +func (mr *MockBatchMetricMockRecorder) WithCounter(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WithCounter", reflect.TypeOf((*MockBatchMetric)(nil).WithCounter), arg0, arg1) +} + +// WithGauge mocks base method. +func (m *MockBatchMetric) WithGauge(arg0 gaugeDefinition, arg1 float64) BatchMetric { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WithGauge", arg0, arg1) + ret0, _ := ret[0].(BatchMetric) + return ret0 +} + +// WithGauge indicates an expected call of WithGauge. +func (mr *MockBatchMetricMockRecorder) WithGauge(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WithGauge", reflect.TypeOf((*MockBatchMetric)(nil).WithGauge), arg0, arg1) +} + +// WithHistogram mocks base method. +func (m *MockBatchMetric) WithHistogram(arg0 histogramDefinition, arg1 int64) BatchMetric { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WithHistogram", arg0, arg1) + ret0, _ := ret[0].(BatchMetric) + return ret0 +} + +// WithHistogram indicates an expected call of WithHistogram. +func (mr *MockBatchMetricMockRecorder) WithHistogram(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WithHistogram", reflect.TypeOf((*MockBatchMetric)(nil).WithHistogram), arg0, arg1) +} + +// WithTimer mocks base method. +func (m *MockBatchMetric) WithTimer(arg0 timerDefinition, arg1 time.Duration) BatchMetric { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WithTimer", arg0, arg1) + ret0, _ := ret[0].(BatchMetric) + return ret0 +} + +// WithTimer indicates an expected call of WithTimer. +func (mr *MockBatchMetricMockRecorder) WithTimer(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WithTimer", reflect.TypeOf((*MockBatchMetric)(nil).WithTimer), arg0, arg1) +} diff --git a/service/history/history_engine.go b/service/history/history_engine.go index 5cf332264dd..79ea76e0432 100644 --- a/service/history/history_engine.go +++ b/service/history/history_engine.go @@ -128,6 +128,7 @@ type ( eventNotifier events.Notifier tokenSerializer common.TaskTokenSerializer metricsHandler metrics.Handler + batchMetricsHandler metrics.BatchMetricsHandler logger log.Logger throttledLogger log.Logger config *configs.Config @@ -200,6 +201,7 @@ func NewEngineWithShardContext( logger: log.With(logger, tag.ComponentHistoryEngine), throttledLogger: log.With(shard.GetThrottledLogger(), tag.ComponentHistoryEngine), metricsHandler: shard.GetMetricsHandler(), + batchMetricsHandler: shard.GetBatchMetricsHandler(), eventNotifier: eventNotifier, config: config, sdkClientFactory: sdkClientFactory, diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 27992d834c6..6663604c4ae 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -67,6 +67,7 @@ type ( GetLogger() log.Logger GetThrottledLogger() log.Logger GetMetricsHandler() metrics.Handler + GetBatchMetricsHandler() metrics.BatchMetricsHandler GetTimeSource() clock.TimeSource GetRemoteAdminClient(string) (adminservice.AdminServiceClient, error) diff --git a/service/history/shard/context_factory.go b/service/history/shard/context_factory.go index fd08c25c4e1..587e95d8287 100644 --- a/service/history/shard/context_factory.go +++ b/service/history/shard/context_factory.go @@ -68,6 +68,7 @@ type ( HostInfoProvider membership.HostInfoProvider Logger log.Logger MetricsHandler metrics.Handler + BatchMetricsHandler metrics.BatchMetricsHandler NamespaceRegistry namespace.Registry PayloadSerializer serialization.Serializer PersistenceExecutionManager persistence.ExecutionManager @@ -110,6 +111,7 @@ func (c *contextFactoryImpl) CreateContext( c.ClientBean, c.HistoryClient, c.MetricsHandler, + c.BatchMetricsHandler, c.PayloadSerializer, c.TimeSource, c.NamespaceRegistry, diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 66b6f8822dc..11eeb95761c 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -110,6 +110,7 @@ type ( stringRepr string executionManager persistence.ExecutionManager metricsHandler metrics.Handler + batchMetricsHandler metrics.BatchMetricsHandler eventsCache events.Cache closeCallback CloseCallback config *configs.Config @@ -2063,6 +2064,7 @@ func newContext( clientBean client.Bean, historyClient historyservice.HistoryServiceClient, metricsHandler metrics.Handler, + batchMetricsHandler metrics.BatchMetricsHandler, payloadSerializer serialization.Serializer, timeSource cclock.TimeSource, namespaceRegistry namespace.Registry, @@ -2097,6 +2099,7 @@ func newContext( stringRepr: fmt.Sprintf("Shard(%d)", shardID), executionManager: persistenceExecutionManager, metricsHandler: metricsHandler, + batchMetricsHandler: batchMetricsHandler, closeCallback: closeCallback, config: historyConfig, finalizer: finalizer.New(taggedLogger, metricsHandler), @@ -2196,6 +2199,10 @@ func (s *ContextImpl) GetMetricsHandler() metrics.Handler { return s.metricsHandler } +func (s *ContextImpl) GetBatchMetricsHandler() metrics.BatchMetricsHandler { + return s.batchMetricsHandler +} + func (s *ContextImpl) GetTimeSource() cclock.TimeSource { return s.timeSource } diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go index 5f69f9d50f8..6a8af4baa62 100644 --- a/service/history/shard/context_mock.go +++ b/service/history/shard/context_mock.go @@ -244,6 +244,20 @@ func (mr *MockContextMockRecorder) GetArchivalMetadata() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetArchivalMetadata", reflect.TypeOf((*MockContext)(nil).GetArchivalMetadata)) } +// GetBatchMetricsHandler mocks base method. +func (m *MockContext) GetBatchMetricsHandler() metrics.BatchMetricsHandler { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetBatchMetricsHandler") + ret0, _ := ret[0].(metrics.BatchMetricsHandler) + return ret0 +} + +// GetBatchMetricsHandler indicates an expected call of GetBatchMetricsHandler. +func (mr *MockContextMockRecorder) GetBatchMetricsHandler() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBatchMetricsHandler", reflect.TypeOf((*MockContext)(nil).GetBatchMetricsHandler)) +} + // GetClusterMetadata mocks base method. func (m *MockContext) GetClusterMetadata() cluster.Metadata { m.ctrl.T.Helper() @@ -972,6 +986,20 @@ func (mr *MockControllableContextMockRecorder) GetArchivalMetadata() *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetArchivalMetadata", reflect.TypeOf((*MockControllableContext)(nil).GetArchivalMetadata)) } +// GetBatchMetricsHandler mocks base method. +func (m *MockControllableContext) GetBatchMetricsHandler() metrics.BatchMetricsHandler { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetBatchMetricsHandler") + ret0, _ := ret[0].(metrics.BatchMetricsHandler) + return ret0 +} + +// GetBatchMetricsHandler indicates an expected call of GetBatchMetricsHandler. +func (mr *MockControllableContextMockRecorder) GetBatchMetricsHandler() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBatchMetricsHandler", reflect.TypeOf((*MockControllableContext)(nil).GetBatchMetricsHandler)) +} + // GetClusterMetadata mocks base method. func (m *MockControllableContext) GetClusterMetadata() cluster.Metadata { m.ctrl.T.Helper() diff --git a/service/history/shard/context_testutil.go b/service/history/shard/context_testutil.go index 0f9b6e406d6..dfb85cf8daf 100644 --- a/service/history/shard/context_testutil.go +++ b/service/history/shard/context_testutil.go @@ -153,6 +153,7 @@ func newTestContext(t *resourcetest.Test, eventsCache events.Cache, config Conte stringRepr: fmt.Sprintf("Shard(%d)", config.ShardInfo.GetShardId()), executionManager: executionManager, metricsHandler: t.MetricsHandler, + batchMetricsHandler: metrics.NewBatchMetricsHandler(t.MetricsHandler), eventsCache: eventsCache, config: config.Config, contextTaggedLogger: t.GetLogger(), diff --git a/service/history/shard/controller_test.go b/service/history/shard/controller_test.go index 88a36caa968..2915591634a 100644 --- a/service/history/shard/controller_test.go +++ b/service/history/shard/controller_test.go @@ -102,6 +102,7 @@ func NewTestController( HostInfoProvider: hostInfoProvider, Logger: resource.GetLogger(), MetricsHandler: metricsTestHandler, + BatchMetricsHandler: metrics.NewBatchMetricsHandler(metricsTestHandler), NamespaceRegistry: resource.GetNamespaceRegistry(), PayloadSerializer: resource.GetPayloadSerializer(), PersistenceExecutionManager: resource.GetExecutionManager(), diff --git a/temporal/fx.go b/temporal/fx.go index 75498ec89cd..a7b8dbb6d64 100644 --- a/temporal/fx.go +++ b/temporal/fx.go @@ -140,6 +140,7 @@ type ( EsConfig *esclient.Config EsClient esclient.Client MetricsHandler metrics.Handler + BatchMetricsHandler metrics.BatchMetricsHandler } ) @@ -217,6 +218,13 @@ func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error) { } } + // BatchMetricsHandler + batchMetricsHandler := so.batchMetricsHandler + if batchMetricsHandler == nil { + // If no custom handler is provided we fall back to one backed by the metrics handler. + batchMetricsHandler = metrics.NewBatchMetricsHandler(metricHandler) + } + // DynamicConfigClient dcClient := so.dynamicConfigClient if dcClient == nil { @@ -312,6 +320,7 @@ func ServerOptionsProvider(opts []ServerOption) (serverOptionsProvider, error) { EsConfig: esConfig, EsClient: esClient, MetricsHandler: metricHandler, + BatchMetricsHandler: batchMetricsHandler, }, nil } @@ -357,6 +366,7 @@ type ( NamespaceLogger resource.NamespaceLogger DynamicConfigClient dynamicconfig.Client MetricsHandler metrics.Handler + BatchMetricsHandler metrics.BatchMetricsHandler EsConfig *esclient.Config EsClient esclient.Client TlsConfigProvider encryption.TLSConfigProvider @@ -440,6 +450,9 @@ func (params ServiceProviderParamsCommon) GetCommonServiceOptions(serviceName pr func() metrics.Handler { return params.MetricsHandler.WithTags(metrics.ServiceNameTag(serviceName)) }, + func() metrics.BatchMetricsHandler { + return params.BatchMetricsHandler + }, func() esclient.Client { return params.EsClient }, diff --git a/temporal/server_option.go b/temporal/server_option.go index 0c520219f21..52a215e5202 100644 --- a/temporal/server_option.go +++ b/temporal/server_option.go @@ -198,10 +198,18 @@ func WithChainedFrontendGrpcInterceptors( }) } -// WithCustomerMetricsProvider sets a custom implementation of the metrics.MetricsHandler interface +// WithCustomMetricsHandler sets a custom implementation of the metrics.MetricsHandler interface // metrics.MetricsHandler is the base interface for publishing metric events func WithCustomMetricsHandler(provider metrics.Handler) ServerOption { return applyFunc(func(s *serverOptions) { s.metricHandler = provider }) } + +// WithCustomBatchMetricsHandler sets a custom implementation of the metrics.BatchMetricsHandler interface +// to emit wide events in place of individual metric events where appropriate. +func WithCustomBatchMetricsHandler(provider metrics.BatchMetricsHandler) ServerOption { + return applyFunc(func(s *serverOptions) { + s.batchMetricsHandler = provider + }) +} diff --git a/temporal/server_options.go b/temporal/server_options.go index 55a281eab8c..e20fbb3e3d9 100644 --- a/temporal/server_options.go +++ b/temporal/server_options.go @@ -77,6 +77,7 @@ type ( searchAttributesMapper searchattribute.Mapper customFrontendInterceptors []grpc.UnaryServerInterceptor metricHandler metrics.Handler + batchMetricsHandler metrics.BatchMetricsHandler } )