From f02ecae60a6a57afb00f40675087b50e1c4e2730 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Tue, 14 Nov 2023 15:38:21 -0800 Subject: [PATCH 01/16] Move hardware emit to separate component. --- internal/internal_worker_base.go | 107 ++++++----------- internal/internal_worker_usage_collector.go | 124 ++++++++++++++++++++ 2 files changed, 157 insertions(+), 74 deletions(-) create mode 100644 internal/internal_worker_usage_collector.go diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 04398926f..b2601ec9b 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -28,12 +28,10 @@ import ( "errors" "fmt" "os" - "runtime" "sync" "syscall" "time" - "github.com/shirou/gopsutil/cpu" "github.com/uber-go/tally" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -140,10 +138,11 @@ type ( logger *zap.Logger metricsScope tally.Scope - pollerRequestCh chan struct{} - pollerAutoScaler *pollerAutoScaler - taskQueueCh chan interface{} - sessionTokenBucket *sessionTokenBucket + pollerRequestCh chan struct{} + pollerAutoScaler *pollerAutoScaler + workerUsageCollector *workerUsageCollector + taskQueueCh chan interface{} + sessionTokenBucket *sessionTokenBucket } polledTask struct { @@ -173,17 +172,29 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t logger, ) } + // for now it's default to be enabled + var workerUC *workerUsageCollector + workerUC = newWorkerUsageCollector( + workerUsageCollectorOptions{ + Enabled: true, + Cooldown: 30 * time.Second, + Host: options.host, + MetricsScope: metricsScope, + }, + logger, + ) bw := &baseWorker{ - options: options, - shutdownCh: make(chan struct{}), - taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1), - retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy), - logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}), - metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType), - pollerRequestCh: make(chan struct{}, options.maxConcurrentTask), - pollerAutoScaler: pollerAS, - taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched. + options: options, + shutdownCh: make(chan struct{}), + taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1), + retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy), + logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}), + metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType), + pollerRequestCh: make(chan struct{}, options.maxConcurrentTask), + pollerAutoScaler: pollerAS, + workerUsageCollector: workerUC, + taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched. 
limiterContext: ctx, limiterContextCancel: cancel, @@ -207,6 +218,10 @@ func (bw *baseWorker) Start() { bw.pollerAutoScaler.Start() } + if bw.workerUsageCollector != nil { + bw.workerUsageCollector.Start() + } + for i := 0; i < bw.options.pollerCount; i++ { bw.shutdownWG.Add(1) go bw.runPoller() @@ -215,15 +230,6 @@ func (bw *baseWorker) Start() { bw.shutdownWG.Add(1) go bw.runTaskDispatcher() - // We want the emit function run once per host instead of run once per worker - //collectHardwareUsageOnce.Do(func() { - // bw.shutdownWG.Add(1) - // go bw.emitHardwareUsage() - //}) - - bw.shutdownWG.Add(1) - go bw.emitHardwareUsage() - bw.isWorkerStarted = true traceLog(func() { bw.logger.Info("Started Worker", @@ -407,6 +413,9 @@ func (bw *baseWorker) Stop() { if bw.pollerAutoScaler != nil { bw.pollerAutoScaler.Stop() } + if bw.workerUsageCollector != nil { + bw.workerUsageCollector.Stop() + } if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success { traceLog(func() { @@ -420,53 +429,3 @@ func (bw *baseWorker) Stop() { } return } - -func (bw *baseWorker) emitHardwareUsage() { - defer func() { - if p := recover(); p != nil { - bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1) - topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType) - st := getStackTraceRaw(topLine, 7, 0) - bw.logger.Error("Unhandled panic in hardware emitting.", - zap.String(tagPanicError, fmt.Sprintf("%v", p)), - zap.String(tagPanicStack, st)) - } - }() - defer bw.shutdownWG.Done() - collectHardwareUsageOnce.Do( - func() { - ticker := time.NewTicker(hardwareMetricsCollectInterval) - for { - select { - case <-bw.shutdownCh: - ticker.Stop() - return - case <-ticker.C: - host := bw.options.host - scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host}) - - cpuPercent, err := cpu.Percent(0, false) - if err != nil { - bw.logger.Warn("Failed to get cpu percent", zap.Error(err)) - return - } - cpuCores, err := cpu.Counts(false) - if err != nil { - bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err)) - return - } - scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores)) - scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0]) - - var memStats runtime.MemStats - runtime.ReadMemStats(&memStats) - - scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine())) - scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys)) - scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse)) - scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse)) - } - } - }) - -} diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go new file mode 100644 index 000000000..3286526a4 --- /dev/null +++ b/internal/internal_worker_usage_collector.go @@ -0,0 +1,124 @@ +package internal + +import ( + "context" + "github.com/shirou/gopsutil/cpu" + "github.com/uber-go/tally" + "go.uber.org/cadence/internal/common/metrics" + "go.uber.org/zap" + "runtime" + "sync" + "time" +) + +type ( + workerUsageCollector struct { + cooldownTime time.Duration + logger *zap.Logger + ctx context.Context + wg *sync.WaitGroup // graceful stop + cancel context.CancelFunc + metricsScope tally.Scope + host string + } + + workerUsageCollectorOptions struct { + Enabled bool + Cooldown time.Duration + Host string + MetricsScope tally.Scope + } + + hardwareUsage struct { + NumCPUCores int + CPUPercent float64 + NumGoRoutines int + TotalMemory float64 + MemoryUsedHeap float64 + MemoryUsedStack float64 + } +) + 
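
Aside (not part of the patch): the collector declared above is meant to be driven the same way internal_worker_base.go drives it in this commit — construct once per worker, Start on worker start, Stop on shutdown. Below is a minimal sketch of that lifecycle inside the internal package, assuming imports "time", "github.com/uber-go/tally" and "go.uber.org/zap"; tally.NoopScope, zap.NewNop(), the "example-host" tag and the wrapper function name are illustrative stand-ins, not part of the change.

func exampleCollectorLifecycle() {
	// Mirrors the wiring in newBaseWorker: enabled by default, 30s cooldown.
	uc := newWorkerUsageCollector(
		workerUsageCollectorOptions{
			Enabled:      true,
			Cooldown:     30 * time.Second,
			Host:         "example-host", // illustrative host tag
			MetricsScope: tally.NoopScope,
		},
		zap.NewNop(),
	)
	if uc == nil { // newWorkerUsageCollector returns nil when Enabled is false
		return
	}
	uc.Start()      // spawns the goroutine that samples hardware usage on each tick
	defer uc.Stop() // cancels the context and waits for the goroutine to exit
}
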
+func newWorkerUsageCollector( + options workerUsageCollectorOptions, + logger *zap.Logger, +) *workerUsageCollector { + if !options.Enabled { + return nil + } + ctx, cancel := context.WithCancel(context.Background()) + return &workerUsageCollector{ + cooldownTime: options.Cooldown, + host: options.Host, + metricsScope: options.MetricsScope, + logger: logger, + ctx: ctx, + cancel: cancel, + wg: &sync.WaitGroup{}, + } +} + +func (w *workerUsageCollector) Start() { + w.wg.Add(1) + go func() { + defer func() { + if p := recover(); p != nil { + w.logger.Error("Unhandled panic in workerUsageCollector.") + } + }() + defer w.wg.Done() + collectHardwareUsageOnce.Do( + func() { + ticker := time.NewTicker(w.cooldownTime) + for { + select { + case <-w.ctx.Done(): + return + case <-ticker.C: + hardwareUsageData := w.collectHardwareUsage() + w.emitHardwareUsage(hardwareUsageData) + + } + } + }) + }() + return +} + +func (w *workerUsageCollector) Stop() { + w.cancel() + w.wg.Wait() +} + +func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage { + cpuPercent, err := cpu.Percent(0, false) + if err != nil { + w.logger.Warn("Failed to get cpu percent", zap.Error(err)) + } + cpuCores, err := cpu.Counts(false) + if err != nil { + w.logger.Warn("Failed to get number of cpu cores", zap.Error(err)) + } + + var memStats runtime.MemStats + runtime.ReadMemStats(&memStats) + return hardwareUsage{ + NumCPUCores: cpuCores, + CPUPercent: cpuPercent[0], + NumGoRoutines: runtime.NumGoroutine(), + TotalMemory: float64(memStats.Sys), + MemoryUsedHeap: float64(memStats.HeapAlloc), + MemoryUsedStack: float64(memStats.StackInuse), + } +} + +// emitHardwareUsage emits collected hardware usage metrics to metrics scope +func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) { + scope := w.metricsScope.Tagged(map[string]string{clientHostTag: w.host}) + scope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores)) + scope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent) + scope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines)) + scope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory)) + scope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap)) + scope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack)) +} From e8abca3a9b4cc375b49215db5d06ab39ad9d6c54 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 15 Nov 2023 10:33:40 -0800 Subject: [PATCH 02/16] add npe check --- internal/internal_worker_usage_collector.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index 3286526a4..c0f68f7a3 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -64,6 +64,7 @@ func (w *workerUsageCollector) Start() { defer func() { if p := recover(); p != nil { w.logger.Error("Unhandled panic in workerUsageCollector.") + w.logger.Error(p.(error).Error()) } }() defer w.wg.Done() @@ -76,8 +77,9 @@ func (w *workerUsageCollector) Start() { return case <-ticker.C: hardwareUsageData := w.collectHardwareUsage() - w.emitHardwareUsage(hardwareUsageData) - + if w.metricsScope != nil { + w.emitHardwareUsage(hardwareUsageData) + } } } }) From 51cb9885948dd8d06e612b52e62a41cf3a39909f Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 15 Nov 2023 20:42:08 -0800 Subject: [PATCH 03/16] remove Sync.Once --- internal/internal_worker_base.go | 2 -- internal/internal_worker_usage_collector.go | 25 
+++++++++------------ 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index b2601ec9b..e3884c778 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -55,8 +55,6 @@ var ( var errShutdown = errors.New("worker shutting down") -var collectHardwareUsageOnce sync.Once - type ( // resultHandler that returns result resultHandler func(result []byte, err error) diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index c0f68f7a3..3716c4cb3 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -68,21 +68,18 @@ func (w *workerUsageCollector) Start() { } }() defer w.wg.Done() - collectHardwareUsageOnce.Do( - func() { - ticker := time.NewTicker(w.cooldownTime) - for { - select { - case <-w.ctx.Done(): - return - case <-ticker.C: - hardwareUsageData := w.collectHardwareUsage() - if w.metricsScope != nil { - w.emitHardwareUsage(hardwareUsageData) - } - } + ticker := time.NewTicker(w.cooldownTime) + for { + select { + case <-w.ctx.Done(): + return + case <-ticker.C: + hardwareUsageData := w.collectHardwareUsage() + if w.metricsScope != nil { + w.emitHardwareUsage(hardwareUsageData) } - }) + } + } }() return } From 4d28be19bcd95dc2d3daf5fec4a3842cffa082e1 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 16 Nov 2023 09:36:25 -0800 Subject: [PATCH 04/16] remove Sync.Once and add workertype field --- internal/internal_worker_base.go | 2 +- internal/internal_worker_usage_collector.go | 29 ++++++++++++--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index e3884c778..78d2eb873 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -176,8 +176,8 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t workerUsageCollectorOptions{ Enabled: true, Cooldown: 30 * time.Second, - Host: options.host, MetricsScope: metricsScope, + WorkerType: options.workerType, }, logger, ) diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index 3716c4cb3..759dc1027 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -13,20 +13,20 @@ import ( type ( workerUsageCollector struct { + workerType string cooldownTime time.Duration logger *zap.Logger ctx context.Context wg *sync.WaitGroup // graceful stop cancel context.CancelFunc metricsScope tally.Scope - host string } workerUsageCollectorOptions struct { Enabled bool Cooldown time.Duration - Host string MetricsScope tally.Scope + WorkerType string } hardwareUsage struct { @@ -48,8 +48,8 @@ func newWorkerUsageCollector( } ctx, cancel := context.WithCancel(context.Background()) return &workerUsageCollector{ + workerType: options.WorkerType, cooldownTime: options.Cooldown, - host: options.Host, metricsScope: options.MetricsScope, logger: logger, ctx: ctx, @@ -74,9 +74,13 @@ func (w *workerUsageCollector) Start() { case <-w.ctx.Done(): return case <-ticker.C: - hardwareUsageData := w.collectHardwareUsage() - if w.metricsScope != nil { - w.emitHardwareUsage(hardwareUsageData) + // Given that decision worker and activity worker are running in the same host, we only need to collect + // hardware usage from one of them. 
+ if w.workerType == "DecisionWorker" { + hardwareUsageData := w.collectHardwareUsage() + if w.metricsScope != nil { + w.emitHardwareUsage(hardwareUsageData) + } } } } @@ -113,11 +117,10 @@ func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage { // emitHardwareUsage emits collected hardware usage metrics to metrics scope func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) { - scope := w.metricsScope.Tagged(map[string]string{clientHostTag: w.host}) - scope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores)) - scope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent) - scope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines)) - scope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory)) - scope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap)) - scope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack)) + w.metricsScope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores)) + w.metricsScope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent) + w.metricsScope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines)) + w.metricsScope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory)) + w.metricsScope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap)) + w.metricsScope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack)) } From ffd2d75ca94e3bf3c0927a1469b3f037910631c8 Mon Sep 17 00:00:00 2001 From: Tim Li <47233368+timl3136@users.noreply.github.com> Date: Wed, 15 Nov 2023 10:26:38 -0800 Subject: [PATCH 05/16] Calculate workflow history size and count and expose that to client (#1270) Enable client side estimated history size exposure via API --- internal/common/metrics/constants.go | 2 + internal/compatibility/proto/response.go | 1 + internal/compatibility/thrift/response.go | 1 + internal/internal_event_handlers.go | 7 +- internal/internal_event_handlers_test.go | 70 +++++++++++ internal/internal_task_handlers.go | 4 + internal/internal_task_pollers.go | 9 +- internal/internal_utils.go | 139 ++++++++++++++++++++++ internal/workflow.go | 15 +++ workflow/workflow.go | 10 ++ 10 files changed, 255 insertions(+), 3 deletions(-) diff --git a/internal/common/metrics/constants.go b/internal/common/metrics/constants.go index 2cb0e1a7c..31b78f189 100644 --- a/internal/common/metrics/constants.go +++ b/internal/common/metrics/constants.go @@ -115,6 +115,8 @@ const ( MemoryUsedStack = CadenceMetricsPrefix + "memory-used-stack" NumGoRoutines = CadenceMetricsPrefix + "num-go-routines" + EstimatedHistorySize = CadenceMetricsPrefix + "estimated-history-size" + ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size" ConcurrentTaskQuota = CadenceMetricsPrefix + "concurrent-task-quota" PollerRequestBufferUsage = CadenceMetricsPrefix + "poller-request-buffer-usage" ) diff --git a/internal/compatibility/proto/response.go b/internal/compatibility/proto/response.go index 2fc755e96..c57e473a9 100644 --- a/internal/compatibility/proto/response.go +++ b/internal/compatibility/proto/response.go @@ -223,6 +223,7 @@ func PollForDecisionTaskResponse(t *shared.PollForDecisionTaskResponse) *apiv1.P StartedTime: unixNanoToTime(t.StartedTimestamp), Queries: WorkflowQueryMap(t.Queries), NextEventId: t.GetNextEventId(), + TotalHistoryBytes: t.GetTotalHistoryBytes(), } } diff --git a/internal/compatibility/thrift/response.go b/internal/compatibility/thrift/response.go index 365ebd3ab..1247618b6 100644 --- a/internal/compatibility/thrift/response.go +++ 
b/internal/compatibility/thrift/response.go @@ -223,6 +223,7 @@ func PollForDecisionTaskResponse(t *apiv1.PollForDecisionTaskResponse) *shared.P StartedTimestamp: timeToUnixNano(t.StartedTime), Queries: WorkflowQueryMap(t.Queries), NextEventId: &t.NextEventId, + TotalHistoryBytes: &t.TotalHistoryBytes, } } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index c9e32f607..37c9b05a6 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -29,6 +29,7 @@ import ( "fmt" "reflect" "sync" + "sync/atomic" "time" "github.com/opentracing/opentracing-go" @@ -43,7 +44,8 @@ import ( ) const ( - queryResultSizeLimit = 2000000 // 2MB + queryResultSizeLimit = 2000000 // 2MB + historySizeEstimationBuffer = 400 // 400B for common fields in history event ) // Make sure that interfaces are implemented @@ -940,6 +942,9 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( return err } + historySum := estimateHistorySize(weh.logger, event) + atomic.AddInt64(&weh.workflowInfo.TotalHistoryBytes, int64(historySum)) + // When replaying histories to get stack trace or current state the last event might be not // decision started. So always call OnDecisionTaskStarted on the last event. // Don't call for EventType_DecisionTaskStarted as it was already called when handling it. diff --git a/internal/internal_event_handlers_test.go b/internal/internal_event_handlers_test.go index 2a97272f3..628d4cb37 100644 --- a/internal/internal_event_handlers_test.go +++ b/internal/internal_event_handlers_test.go @@ -22,6 +22,7 @@ package internal import ( "encoding/json" + "go.uber.org/cadence/internal/common" "testing" "github.com/stretchr/testify/assert" @@ -312,3 +313,72 @@ func Test_CreateSearchAttributesForChangeVersion(t *testing.T) { require.True(t, ok, "Remember to update related key on server side") require.Equal(t, []string{"cid-1"}, val) } + +func TestHistoryEstimationforSmallEvents(t *testing.T) { + taskList := "tasklist" + testEvents := []*s.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskScheduled(2, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskStarted(3), + { + EventId: common.Int64Ptr(4), + EventType: common.EventTypePtr(s.EventTypeDecisionTaskFailed), + }, + { + EventId: common.Int64Ptr(5), + EventType: common.EventTypePtr(s.EventTypeWorkflowExecutionSignaled), + }, + createTestEventDecisionTaskScheduled(6, &s.DecisionTaskScheduledEventAttributes{TaskList: &s.TaskList{Name: &taskList}}), + createTestEventDecisionTaskStarted(7), + } + core, _ := observer.New(zapcore.InfoLevel) + logger := zap.New(core, zap.Development()) + w := workflowExecutionEventHandlerImpl{ + workflowEnvironmentImpl: &workflowEnvironmentImpl{logger: logger}, + } + + w.logger = logger + historySizeSum := 0 + for _, event := range testEvents { + sum := estimateHistorySize(logger, event) + historySizeSum += sum + } + trueSize := len(testEvents) * historySizeEstimationBuffer + + assert.Equal(t, trueSize, historySizeSum) +} + +func TestHistoryEstimationforPackedEvents(t *testing.T) { + // create an array of bytes for testing + var byteArray []byte + byteArray = append(byteArray, 100) + taskList := "tasklist" + testEvents := []*s.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &s.WorkflowExecutionStartedEventAttributes{ + TaskList: &s.TaskList{Name: &taskList}, + Input: 
byteArray, + ContinuedFailureDetails: byteArray}), + createTestEventWorkflowExecutionStarted(2, &s.WorkflowExecutionStartedEventAttributes{ + TaskList: &s.TaskList{Name: &taskList}, + Input: byteArray, + ContinuedFailureDetails: byteArray}), + createTestEventWorkflowExecutionStarted(3, &s.WorkflowExecutionStartedEventAttributes{ + TaskList: &s.TaskList{Name: &taskList}, + Input: byteArray, + ContinuedFailureDetails: byteArray}), + } + core, _ := observer.New(zapcore.InfoLevel) + logger := zap.New(core, zap.Development()) + w := workflowExecutionEventHandlerImpl{ + workflowEnvironmentImpl: &workflowEnvironmentImpl{logger: logger}, + } + + w.logger = logger + historySizeSum := 0 + for _, event := range testEvents { + sum := estimateHistorySize(logger, event) + historySizeSum += sum + } + trueSize := len(testEvents)*historySizeEstimationBuffer + len(byteArray)*2*len(testEvents) + assert.Equal(t, trueSize, historySizeSum) +} diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 430caee3f..c872b87e0 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -843,6 +843,8 @@ process_Workflow_Loop: func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) { task := workflowTask.task historyIterator := workflowTask.historyIterator + w.workflowInfo.HistoryBytesServer = task.GetTotalHistoryBytes() + w.workflowInfo.HistoryCount = task.GetNextEventId() - 1 if err := w.ResetIfStale(task, historyIterator); err != nil { return nil, err } @@ -866,6 +868,8 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo ProcessEvents: for { reorderedEvents, markers, binaryChecksum, err := reorderedHistory.NextDecisionEvents() + w.wth.metricsScope.GetTaggedScope("workflowtype", w.workflowInfo.WorkflowType.Name).Gauge(metrics.EstimatedHistorySize).Update(float64(w.workflowInfo.TotalHistoryBytes)) + w.wth.metricsScope.GetTaggedScope("workflowtype", w.workflowInfo.WorkflowType.Name).Gauge(metrics.ServerSideHistorySize).Update(float64(w.workflowInfo.HistoryBytesServer)) if err != nil { return nil, err } diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index c709765ad..37c2f3f54 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -117,7 +117,7 @@ type ( service workflowserviceclient.Interface metricsScope tally.Scope startedEventID int64 - maxEventID int64 + maxEventID int64 // Equivalent to History Count featureFlags FeatureFlags } @@ -330,7 +330,6 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { }) return nil } - doneCh := make(chan struct{}) laResultCh := make(chan *localActivityResult) // close doneCh so local activity worker won't get blocked forever when trying to send back result to laResultCh. @@ -341,6 +340,7 @@ func (wtp *workflowTaskPoller) processWorkflowTask(task *workflowTask) error { startTime := time.Now() task.doneCh = doneCh task.laResultCh = laResultCh + // Process the task. 
completedRequest, err := wtp.taskHandler.ProcessWorkflowTask( task, func(response interface{}, startTime time.Time) (*workflowTask, error) { @@ -897,6 +897,11 @@ func (h *historyIteratorImpl) HasNextPage() bool { return h.nextPageToken != nil } +// GetHistoryCount returns History Event Count of current history (aka maxEventID) +func (h *historyIteratorImpl) GetHistoryCount() int64 { + return h.maxEventID +} + func newGetHistoryPageFunc( ctx context.Context, service workflowserviceclient.Interface, diff --git a/internal/internal_utils.go b/internal/internal_utils.go index d6b45b227..d12746c6f 100644 --- a/internal/internal_utils.go +++ b/internal/internal_utils.go @@ -26,6 +26,7 @@ import ( "context" "encoding/json" "fmt" + "go.uber.org/zap" "os" "os/signal" "strings" @@ -354,3 +355,141 @@ func getTimeoutTypeFromErrReason(reason string) (s.TimeoutType, error) { } return timeoutType, nil } + +func estimateHistorySize(logger *zap.Logger, event *s.HistoryEvent) int { + sum := historySizeEstimationBuffer + switch event.GetEventType() { + case s.EventTypeWorkflowExecutionStarted: + if event.WorkflowExecutionStartedEventAttributes != nil { + sum += len(event.WorkflowExecutionStartedEventAttributes.Input) + sum += len(event.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails) + sum += len(event.WorkflowExecutionStartedEventAttributes.LastCompletionResult) + sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.Memo.GetFields()) + sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.Header.GetFields()) + sum += sizeOf(event.WorkflowExecutionStartedEventAttributes.SearchAttributes.GetIndexedFields()) + } + case s.EventTypeWorkflowExecutionCompleted: + if event.WorkflowExecutionCompletedEventAttributes != nil { + sum += len(event.WorkflowExecutionCompletedEventAttributes.Result) + } + case s.EventTypeWorkflowExecutionSignaled: + if event.WorkflowExecutionSignaledEventAttributes != nil { + sum += len(event.WorkflowExecutionSignaledEventAttributes.Input) + } + case s.EventTypeWorkflowExecutionFailed: + if event.WorkflowExecutionFailedEventAttributes != nil { + sum += len(event.WorkflowExecutionFailedEventAttributes.Details) + } + case s.EventTypeDecisionTaskStarted: + if event.DecisionTaskStartedEventAttributes != nil { + sum += getLengthOfStringPointer(event.DecisionTaskStartedEventAttributes.Identity) + } + case s.EventTypeDecisionTaskCompleted: + if event.DecisionTaskCompletedEventAttributes != nil { + sum += len(event.DecisionTaskCompletedEventAttributes.ExecutionContext) + sum += getLengthOfStringPointer(event.DecisionTaskCompletedEventAttributes.Identity) + sum += getLengthOfStringPointer(event.DecisionTaskCompletedEventAttributes.BinaryChecksum) + } + case s.EventTypeDecisionTaskFailed: + if event.DecisionTaskFailedEventAttributes != nil { + sum += len(event.DecisionTaskFailedEventAttributes.Details) + } + case s.EventTypeActivityTaskScheduled: + if event.ActivityTaskScheduledEventAttributes != nil { + sum += len(event.ActivityTaskScheduledEventAttributes.Input) + sum += sizeOf(event.ActivityTaskScheduledEventAttributes.Header.GetFields()) + } + case s.EventTypeActivityTaskStarted: + if event.ActivityTaskStartedEventAttributes != nil { + sum += len(event.ActivityTaskStartedEventAttributes.LastFailureDetails) + } + case s.EventTypeActivityTaskCompleted: + if event.ActivityTaskCompletedEventAttributes != nil { + sum += len(event.ActivityTaskCompletedEventAttributes.Result) + sum += getLengthOfStringPointer(event.ActivityTaskCompletedEventAttributes.Identity) + } + case 
s.EventTypeActivityTaskFailed: + if event.ActivityTaskFailedEventAttributes != nil { + sum += len(event.ActivityTaskFailedEventAttributes.Details) + } + case s.EventTypeActivityTaskTimedOut: + if event.ActivityTaskTimedOutEventAttributes != nil { + sum += len(event.ActivityTaskTimedOutEventAttributes.Details) + sum += len(event.ActivityTaskTimedOutEventAttributes.LastFailureDetails) + } + case s.EventTypeActivityTaskCanceled: + if event.ActivityTaskCanceledEventAttributes != nil { + sum += len(event.ActivityTaskCanceledEventAttributes.Details) + } + case s.EventTypeMarkerRecorded: + if event.MarkerRecordedEventAttributes != nil { + sum += len(event.MarkerRecordedEventAttributes.Details) + } + case s.EventTypeWorkflowExecutionTerminated: + if event.WorkflowExecutionTerminatedEventAttributes != nil { + sum += len(event.WorkflowExecutionTerminatedEventAttributes.Details) + } + case s.EventTypeWorkflowExecutionCanceled: + if event.WorkflowExecutionCanceledEventAttributes != nil { + sum += len(event.WorkflowExecutionCanceledEventAttributes.Details) + } + case s.EventTypeWorkflowExecutionContinuedAsNew: + if event.WorkflowExecutionContinuedAsNewEventAttributes != nil { + sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.Input) + sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.FailureDetails) + sum += len(event.WorkflowExecutionContinuedAsNewEventAttributes.LastCompletionResult) + sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.Memo.GetFields()) + sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.Header.GetFields()) + sum += sizeOf(event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.GetIndexedFields()) + } + case s.EventTypeStartChildWorkflowExecutionInitiated: + if event.StartChildWorkflowExecutionInitiatedEventAttributes != nil { + sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Input) + sum += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Control) + sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.GetFields()) + sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.Header.GetFields()) + sum += sizeOf(event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.GetIndexedFields()) + } + case s.EventTypeChildWorkflowExecutionCompleted: + if event.ChildWorkflowExecutionCompletedEventAttributes != nil { + sum += len(event.ChildWorkflowExecutionCompletedEventAttributes.Result) + } + case s.EventTypeChildWorkflowExecutionFailed: + if event.ChildWorkflowExecutionFailedEventAttributes != nil { + sum += len(event.ChildWorkflowExecutionFailedEventAttributes.Details) + sum += getLengthOfStringPointer(event.ChildWorkflowExecutionFailedEventAttributes.Reason) + } + case s.EventTypeChildWorkflowExecutionCanceled: + if event.ChildWorkflowExecutionCanceledEventAttributes != nil { + sum += len(event.ChildWorkflowExecutionCanceledEventAttributes.Details) + } + case s.EventTypeSignalExternalWorkflowExecutionInitiated: + if event.SignalExternalWorkflowExecutionInitiatedEventAttributes != nil { + sum += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control) + sum += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input) + } + + default: + logger.Warn("unknown event type", zap.String("Event Type", event.GetEventType().String())) + // Do not fail to be forward compatible with new events + } + + return sum +} + +// simple function to estimate the size of a map[string][]byte +func sizeOf(o 
map[string][]byte) int { + sum := 0 + for k, v := range o { + sum += len(k) + len(v) + } + return sum +} + +// simple function to estimate the size of a string pointer +func getLengthOfStringPointer(s *string) int { + if s == nil { + return 0 + } + return len(*s) +} diff --git a/internal/workflow.go b/internal/workflow.go index 9f6fa747e..5568e1608 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1111,6 +1111,9 @@ type WorkflowInfo struct { BinaryChecksum *string // The identifier(generated by md5sum by default) of worker code that is making the current decision(can be used for auto-reset feature) DecisionStartedEventID int64 // the eventID of DecisionStarted that is making the current decision(can be used for reset API) RetryPolicy *s.RetryPolicy + TotalHistoryBytes int64 + HistoryBytesServer int64 + HistoryCount int64 } // GetBinaryChecksum returns the binary checksum(identifier) of this worker @@ -1158,6 +1161,18 @@ func (wc *workflowEnvironmentInterceptor) GetMetricsScope(ctx Context) tally.Sco return wc.env.GetMetricsScope() } +// GetTotalEstimatedHistoryBytes returns the current history size of that workflow +func GetTotalEstimatedHistoryBytes(ctx Context) int64 { + i := getWorkflowInterceptor(ctx) + return i.GetWorkflowInfo(ctx).TotalHistoryBytes +} + +// GetHistoryCount returns the current number of history events of that workflow +func GetHistoryCount(ctx Context) int64 { + i := getWorkflowInterceptor(ctx) + return i.GetWorkflowInfo(ctx).HistoryCount +} + // Now returns the current time in UTC. It corresponds to the time when the decision task is started or replayed. // Workflow needs to use this method to get the wall clock time instead of the one from the golang library. func Now(ctx Context) time.Time { diff --git a/workflow/workflow.go b/workflow/workflow.go index fe27cdbbb..b1df0af10 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -217,6 +217,16 @@ func GetMetricsScope(ctx Context) tally.Scope { return internal.GetMetricsScope(ctx) } +// GetTotalHistoryBytes returns the current history size of that workflow +func GetTotalHistoryBytes(ctx Context) int64 { + return internal.GetTotalEstimatedHistoryBytes(ctx) +} + +// GetHistoryCount returns the current number of history event of that workflow +func GetHistoryCount(ctx Context) int64 { + return internal.GetHistoryCount(ctx) +} + // RequestCancelExternalWorkflow can be used to request cancellation of an external workflow. // Input workflowID is the workflow ID of target workflow. // Input runID indicates the instance of a workflow. Input runID is optional (default is ""). 
When runID is not specified, From 51f7207bf70dcd47f673f4544991144da670dd7d Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 16 Nov 2023 11:09:25 -0800 Subject: [PATCH 06/16] Resolve comments and add a new workerUsageCollectorPanic metric --- internal/common/metrics/constants.go | 6 ++++-- internal/internal_worker_base.go | 3 +-- internal/internal_worker_usage_collector.go | 10 ++++++++-- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/internal/common/metrics/constants.go b/internal/common/metrics/constants.go index 31b78f189..fcfa227ca 100644 --- a/internal/common/metrics/constants.go +++ b/internal/common/metrics/constants.go @@ -115,8 +115,10 @@ const ( MemoryUsedStack = CadenceMetricsPrefix + "memory-used-stack" NumGoRoutines = CadenceMetricsPrefix + "num-go-routines" - EstimatedHistorySize = CadenceMetricsPrefix + "estimated-history-size" - ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size" + EstimatedHistorySize = CadenceMetricsPrefix + "estimated-history-size" + ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size" ConcurrentTaskQuota = CadenceMetricsPrefix + "concurrent-task-quota" PollerRequestBufferUsage = CadenceMetricsPrefix + "poller-request-buffer-usage" + + WorkerUsageCollectorPanic = CadenceMetricsPrefix + "worker-metrics-collector-panic" ) diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 78d2eb873..cc88f9e47 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -171,8 +171,7 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t ) } // for now it's default to be enabled - var workerUC *workerUsageCollector - workerUC = newWorkerUsageCollector( + workerUC := newWorkerUsageCollector( workerUsageCollectorOptions{ Enabled: true, Cooldown: 30 * time.Second, diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index 759dc1027..76e5a32b8 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -2,6 +2,7 @@ package internal import ( "context" + "fmt" "github.com/shirou/gopsutil/cpu" "github.com/uber-go/tally" "go.uber.org/cadence/internal/common/metrics" @@ -63,12 +64,17 @@ func (w *workerUsageCollector) Start() { go func() { defer func() { if p := recover(); p != nil { - w.logger.Error("Unhandled panic in workerUsageCollector.") - w.logger.Error(p.(error).Error()) + w.metricsScope.Counter(metrics.WorkerUsageCollectorPanic).Inc(1) + topLine := fmt.Sprintf("WorkerUsageCollector panic for workertype: %v", w.workerType) + st := getStackTraceRaw(topLine, 7, 0) + w.logger.Error("WorkerUsageCollector panic.", + zap.String(tagPanicError, fmt.Sprintf("%v", p)), + zap.String(tagPanicStack, st)) } }() defer w.wg.Done() ticker := time.NewTicker(w.cooldownTime) + defer ticker.Stop() for { select { case <-w.ctx.Done(): From e3040349c8dcfafd2e6d19e64943a9bc9dfaf625 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Fri, 17 Nov 2023 13:46:42 -0800 Subject: [PATCH 07/16] Add Sync.once back and change test so that it won't block testing --- internal/internal_worker_base.go | 1 + internal/internal_worker_interfaces_test.go | 11 +++++++++++ internal/internal_worker_usage_collector.go | 21 +++++++++++++-------- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index cc88f9e47..ace6ac194 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go 
@@ -180,6 +180,7 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t }, logger, ) + collectHardwareUsageOnce = &sync.Once{} bw := &baseWorker{ options: options, diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index 827abe0e0..185de32ba 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -56,8 +56,17 @@ type ( mockCtrl *gomock.Controller service *workflowservicetest.MockClient } + + // fakeSyncOnce is a fake implementation of oncePerHost interface + // that DOES NOT ensure run only once per host + fakeSyncOnce struct { + } ) +func (m *fakeSyncOnce) Do(f func()) { + f() +} + func helloWorldWorkflowFunc(ctx Context, input []byte) error { queryResult := startingQueryValue SetQueryHandler(ctx, queryType, func() (string, error) { @@ -195,6 +204,8 @@ func (s *InterfacesTestSuite) TestInterface() { }, } + collectHardwareUsageOnce = &fakeSyncOnce{} + // mocks s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), callOptions()...).Return(domainDesc, nil).AnyTimes() s.service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), callOptionsWithIsolationGroupHeader()...).Return(&m.PollForActivityTaskResponse{}, nil).AnyTimes() diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index 76e5a32b8..2ecaaf72c 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -38,8 +38,14 @@ type ( MemoryUsedHeap float64 MemoryUsedStack float64 } + + oncePerHost interface { + Do(func()) + } ) +var collectHardwareUsageOnce oncePerHost + func newWorkerUsageCollector( options workerUsageCollectorOptions, logger *zap.Logger, @@ -80,14 +86,13 @@ func (w *workerUsageCollector) Start() { case <-w.ctx.Done(): return case <-ticker.C: - // Given that decision worker and activity worker are running in the same host, we only need to collect - // hardware usage from one of them. 
- if w.workerType == "DecisionWorker" { - hardwareUsageData := w.collectHardwareUsage() - if w.metricsScope != nil { - w.emitHardwareUsage(hardwareUsageData) - } - } + collectHardwareUsageOnce.Do( + func() { + hardwareUsageData := w.collectHardwareUsage() + if w.metricsScope != nil { + w.emitHardwareUsage(hardwareUsageData) + } + }) } } }() From 24b4a847fdc5b71ed52bcda4b620b2c0ce61c991 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Sun, 19 Nov 2023 12:44:12 -0800 Subject: [PATCH 08/16] further testing --- internal/internal_worker_base.go | 4 +++- internal/internal_worker_interfaces_test.go | 3 +-- internal/internal_worker_usage_collector.go | 7 ++++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index ace6ac194..98ce6b7af 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -55,6 +55,8 @@ var ( var errShutdown = errors.New("worker shutting down") +var emitOnce sync.Once + type ( // resultHandler that returns result resultHandler func(result []byte, err error) @@ -177,10 +179,10 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t Cooldown: 30 * time.Second, MetricsScope: metricsScope, WorkerType: options.workerType, + EmitOnce: &emitOnce, }, logger, ) - collectHardwareUsageOnce = &sync.Once{} bw := &baseWorker{ options: options, diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index 185de32ba..01a33390c 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -204,8 +204,6 @@ func (s *InterfacesTestSuite) TestInterface() { }, } - collectHardwareUsageOnce = &fakeSyncOnce{} - // mocks s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), callOptions()...).Return(domainDesc, nil).AnyTimes() s.service.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), callOptionsWithIsolationGroupHeader()...).Return(&m.PollForActivityTaskResponse{}, nil).AnyTimes() @@ -232,6 +230,7 @@ func (s *InterfacesTestSuite) TestInterface() { // Register activity instances and launch the worker. 
activityWorker := newActivityWorker(s.service, domain, activityExecutionParameters, nil, registry, nil) + //activityWorker.worker.workerUsageCollector.emitOnce = &fakeSyncOnce{} defer activityWorker.Stop() activityWorker.Start() diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index 2ecaaf72c..45eb8eaa8 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -21,6 +21,7 @@ type ( wg *sync.WaitGroup // graceful stop cancel context.CancelFunc metricsScope tally.Scope + emitOnce oncePerHost } workerUsageCollectorOptions struct { @@ -28,6 +29,7 @@ type ( Cooldown time.Duration MetricsScope tally.Scope WorkerType string + EmitOnce oncePerHost } hardwareUsage struct { @@ -44,8 +46,6 @@ type ( } ) -var collectHardwareUsageOnce oncePerHost - func newWorkerUsageCollector( options workerUsageCollectorOptions, logger *zap.Logger, @@ -62,6 +62,7 @@ func newWorkerUsageCollector( ctx: ctx, cancel: cancel, wg: &sync.WaitGroup{}, + emitOnce: options.EmitOnce, } } @@ -86,7 +87,7 @@ func (w *workerUsageCollector) Start() { case <-w.ctx.Done(): return case <-ticker.C: - collectHardwareUsageOnce.Do( + emitOnce.Do( func() { hardwareUsageData := w.collectHardwareUsage() if w.metricsScope != nil { From a89107fb9c3928d91e9053b72b90dc96132395e4 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Sun, 19 Nov 2023 13:14:22 -0800 Subject: [PATCH 09/16] more --- internal/internal_worker_interfaces_test.go | 1 - internal/internal_worker_usage_collector.go | 57 ++++++++++----------- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index 01a33390c..a09ccf66e 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -230,7 +230,6 @@ func (s *InterfacesTestSuite) TestInterface() { // Register activity instances and launch the worker. 
activityWorker := newActivityWorker(s.service, domain, activityExecutionParameters, nil, registry, nil) - //activityWorker.worker.workerUsageCollector.emitOnce = &fakeSyncOnce{} defer activityWorker.Stop() activityWorker.Start() diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index 45eb8eaa8..b68e045b1 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -14,14 +14,14 @@ import ( type ( workerUsageCollector struct { - workerType string - cooldownTime time.Duration - logger *zap.Logger - ctx context.Context - wg *sync.WaitGroup // graceful stop - cancel context.CancelFunc - metricsScope tally.Scope - emitOnce oncePerHost + workerType string + cooldownTime time.Duration + logger *zap.Logger + ctx context.Context + wg *sync.WaitGroup // graceful stop + cancel context.CancelFunc + metricsScope tally.Scope + emitOncePerHost oncePerHost } workerUsageCollectorOptions struct { @@ -55,14 +55,14 @@ func newWorkerUsageCollector( } ctx, cancel := context.WithCancel(context.Background()) return &workerUsageCollector{ - workerType: options.WorkerType, - cooldownTime: options.Cooldown, - metricsScope: options.MetricsScope, - logger: logger, - ctx: ctx, - cancel: cancel, - wg: &sync.WaitGroup{}, - emitOnce: options.EmitOnce, + workerType: options.WorkerType, + cooldownTime: options.Cooldown, + metricsScope: options.MetricsScope, + logger: logger, + ctx: ctx, + cancel: cancel, + wg: &sync.WaitGroup{}, + emitOncePerHost: options.EmitOnce, } } @@ -87,13 +87,10 @@ func (w *workerUsageCollector) Start() { case <-w.ctx.Done(): return case <-ticker.C: - emitOnce.Do( - func() { - hardwareUsageData := w.collectHardwareUsage() - if w.metricsScope != nil { - w.emitHardwareUsage(hardwareUsageData) - } - }) + hardwareUsageData := w.collectHardwareUsage() + if w.metricsScope != nil { + w.emitHardwareUsage(hardwareUsageData) + } } } }() @@ -129,10 +126,12 @@ func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage { // emitHardwareUsage emits collected hardware usage metrics to metrics scope func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) { - w.metricsScope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores)) - w.metricsScope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent) - w.metricsScope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines)) - w.metricsScope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory)) - w.metricsScope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap)) - w.metricsScope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack)) + emitOnce.Do(func() { + w.metricsScope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores)) + w.metricsScope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent) + w.metricsScope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines)) + w.metricsScope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory)) + w.metricsScope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap)) + w.metricsScope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack)) + }) } From a9c526f78d5ce82d647a8ac03c77385b63863797 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Sun, 19 Nov 2023 14:36:37 -0800 Subject: [PATCH 10/16] Change to shutdownCh instead of ctx.cancel --- internal/internal_worker_usage_collector.go | 48 ++++++++++++--------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git 
a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index b68e045b1..7ff8cbb53 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -18,8 +18,8 @@ type ( cooldownTime time.Duration logger *zap.Logger ctx context.Context + shutdownCh chan struct{} wg *sync.WaitGroup // graceful stop - cancel context.CancelFunc metricsScope tally.Scope emitOncePerHost oncePerHost } @@ -53,16 +53,16 @@ func newWorkerUsageCollector( if !options.Enabled { return nil } - ctx, cancel := context.WithCancel(context.Background()) + ctx, _ := context.WithCancel(context.Background()) return &workerUsageCollector{ workerType: options.WorkerType, cooldownTime: options.Cooldown, metricsScope: options.MetricsScope, logger: logger, ctx: ctx, - cancel: cancel, wg: &sync.WaitGroup{}, emitOncePerHost: options.EmitOnce, + shutdownCh: make(chan struct{}), } } @@ -82,24 +82,34 @@ func (w *workerUsageCollector) Start() { defer w.wg.Done() ticker := time.NewTicker(w.cooldownTime) defer ticker.Stop() + + w.wg.Add(1) + go w.runHardwareCollector(ticker) + + }() + return +} + +func (w *workerUsageCollector) Stop() { + close(w.shutdownCh) + w.wg.Wait() +} + +func (w *workerUsageCollector) runHardwareCollector(tick *time.Ticker) { + defer w.wg.Done() + w.emitOncePerHost.Do(func() { for { select { - case <-w.ctx.Done(): + case <-w.shutdownCh: return - case <-ticker.C: + case <-tick.C: hardwareUsageData := w.collectHardwareUsage() if w.metricsScope != nil { w.emitHardwareUsage(hardwareUsageData) } } } - }() - return -} - -func (w *workerUsageCollector) Stop() { - w.cancel() - w.wg.Wait() + }) } func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage { @@ -126,12 +136,10 @@ func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage { // emitHardwareUsage emits collected hardware usage metrics to metrics scope func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) { - emitOnce.Do(func() { - w.metricsScope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores)) - w.metricsScope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent) - w.metricsScope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines)) - w.metricsScope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory)) - w.metricsScope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap)) - w.metricsScope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack)) - }) + w.metricsScope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores)) + w.metricsScope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent) + w.metricsScope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines)) + w.metricsScope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory)) + w.metricsScope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap)) + w.metricsScope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack)) } From fa1e1901f952c2b1eacd9b9c626d3d614a7d6b85 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Sun, 19 Nov 2023 14:57:08 -0800 Subject: [PATCH 11/16] add ctx.canel back --- internal/internal_worker_usage_collector.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index 7ff8cbb53..ed72e25b5 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -20,6 +20,7 @@ type ( ctx context.Context shutdownCh chan struct{} 
wg *sync.WaitGroup // graceful stop + cancel context.CancelFunc metricsScope tally.Scope emitOncePerHost oncePerHost } @@ -53,13 +54,14 @@ func newWorkerUsageCollector( if !options.Enabled { return nil } - ctx, _ := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) return &workerUsageCollector{ workerType: options.WorkerType, cooldownTime: options.Cooldown, metricsScope: options.MetricsScope, logger: logger, ctx: ctx, + cancel: cancel, wg: &sync.WaitGroup{}, emitOncePerHost: options.EmitOnce, shutdownCh: make(chan struct{}), @@ -91,6 +93,7 @@ func (w *workerUsageCollector) Start() { } func (w *workerUsageCollector) Stop() { + w.cancel() close(w.shutdownCh) w.wg.Wait() } From 3628eb93c1927cd9e2b4f6bf6dfc24d158e8a315 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Sun, 19 Nov 2023 18:36:41 -0800 Subject: [PATCH 12/16] remove cancel and add logger --- internal/internal_worker_interfaces_test.go | 2 ++ internal/internal_worker_usage_collector.go | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index a09ccf66e..4068e3a37 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -215,6 +215,7 @@ func (s *InterfacesTestSuite) TestInterface() { registry := newRegistry() // Launch worker. workflowWorker := newWorkflowWorker(s.service, domain, workflowExecutionParameters, nil, registry, nil) + workflowWorker.worker.workerUsageCollector.emitOncePerHost = &fakeSyncOnce{} defer workflowWorker.Stop() workflowWorker.Start() @@ -230,6 +231,7 @@ func (s *InterfacesTestSuite) TestInterface() { // Register activity instances and launch the worker. activityWorker := newActivityWorker(s.service, domain, activityExecutionParameters, nil, registry, nil) + activityWorker.worker.workerUsageCollector.emitOncePerHost = &fakeSyncOnce{} defer activityWorker.Stop() activityWorker.Start() diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index ed72e25b5..3a04c6940 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -86,6 +86,7 @@ func (w *workerUsageCollector) Start() { defer ticker.Stop() w.wg.Add(1) + w.logger.Info(fmt.Sprintf("Going to start hardware collector for workertype: %v", w.workerType)) go w.runHardwareCollector(ticker) }() @@ -93,14 +94,16 @@ func (w *workerUsageCollector) Start() { } func (w *workerUsageCollector) Stop() { - w.cancel() close(w.shutdownCh) w.wg.Wait() + w.cancel() + } func (w *workerUsageCollector) runHardwareCollector(tick *time.Ticker) { defer w.wg.Done() w.emitOncePerHost.Do(func() { + w.logger.Info(fmt.Sprintf("Started worker usage collector for workertype: %v", w.workerType)) for { select { case <-w.shutdownCh: From 96e426765d15637c61f0360f142cda2e902f1567 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Sun, 19 Nov 2023 19:26:28 -0800 Subject: [PATCH 13/16] move ticker --- internal/internal_worker_usage_collector.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index 3a04c6940..3c110996e 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -82,12 +82,10 @@ func (w *workerUsageCollector) Start() { } }() defer w.wg.Done() - ticker := time.NewTicker(w.cooldownTime) - defer ticker.Stop() w.wg.Add(1) 
w.logger.Info(fmt.Sprintf("Going to start hardware collector for workertype: %v", w.workerType)) - go w.runHardwareCollector(ticker) + go w.runHardwareCollector() }() return @@ -100,15 +98,17 @@ func (w *workerUsageCollector) Stop() { } -func (w *workerUsageCollector) runHardwareCollector(tick *time.Ticker) { +func (w *workerUsageCollector) runHardwareCollector() { defer w.wg.Done() + ticker := time.NewTicker(w.cooldownTime) + defer ticker.Stop() w.emitOncePerHost.Do(func() { w.logger.Info(fmt.Sprintf("Started worker usage collector for workertype: %v", w.workerType)) for { select { case <-w.shutdownCh: return - case <-tick.C: + case <-ticker.C: hardwareUsageData := w.collectHardwareUsage() if w.metricsScope != nil { w.emitHardwareUsage(hardwareUsageData) From 50926068608611b2e99d2eb42df9198fb63a619c Mon Sep 17 00:00:00 2001 From: Tim Li Date: Tue, 21 Nov 2023 20:50:05 -0800 Subject: [PATCH 14/16] add sync.Once into a worker option so that test code will not be blocked by sync.Once --- internal/internal_worker.go | 9 ++-- internal/internal_worker_base.go | 15 +++++- internal/internal_worker_interfaces_test.go | 11 +++-- internal/internal_worker_test.go | 10 ++-- internal/internal_worker_usage_collector.go | 52 +++++++-------------- internal/internal_workers_test.go | 15 ++++-- internal/worker.go | 6 +++ internal/workflow_shadower_worker_test.go | 18 ++++--- 8 files changed, 80 insertions(+), 56 deletions(-) diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 75beddda3..0f60a7640 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -281,7 +281,8 @@ func newWorkflowTaskWorkerInternal( taskWorker: poller, identity: params.Identity, workerType: "DecisionWorker", - shutdownTimeout: params.WorkerStopTimeout}, + shutdownTimeout: params.WorkerStopTimeout, + sync: ¶ms.Sync}, params.Logger, params.MetricsScope, nil, @@ -304,7 +305,8 @@ func newWorkflowTaskWorkerInternal( taskWorker: localActivityTaskPoller, identity: params.Identity, workerType: "LocalActivityWorker", - shutdownTimeout: params.WorkerStopTimeout}, + shutdownTimeout: params.WorkerStopTimeout, + sync: ¶ms.Sync}, params.Logger, params.MetricsScope, nil, @@ -482,7 +484,8 @@ func newActivityTaskWorker( identity: workerParams.Identity, workerType: workerType, shutdownTimeout: workerParams.WorkerStopTimeout, - userContextCancel: workerParams.UserContextCancel}, + userContextCancel: workerParams.UserContextCancel, + sync: &workerParams.Sync}, workerParams.Logger, workerParams.MetricsScope, sessionTokenBucket, diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 98ce6b7af..c390b8be9 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -122,6 +122,7 @@ type ( shutdownTimeout time.Duration userContextCancel context.CancelFunc host string + sync *oncePerHost } // baseWorker that wraps worker activities. 
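
Aside (not part of the patch): the new `sync *oncePerHost` field added above exists so the "emit hardware metrics once per host" guard can be injected. In production it defaults to the package-level emitOnce (*sync.Once satisfies the interface), while tests pass a fake whose Do always runs the body, so no test is blocked by a Once already consumed by another worker. A condensed sketch of that seam, reusing the names this patch series introduces (oncePerHost, emitOnce, fakeSyncOnce); treat it as a reading aid, not new API.

type oncePerHost interface {
	Do(func())
}

var emitOnce sync.Once // production default: one emit per host/process

// Test double from the _test files: never skips, so repeated worker starts still run.
type fakeSyncOnce struct{}

func (f *fakeSyncOnce) Do(fn func()) { fn() }

// newBaseWorker then picks one or the other:
//   if options.sync == nil { once = &emitOnce } else { once = *options.sync }
// and tests opt in with WorkerOptions{..., Sync: &fakeSyncOnce{}}.
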
@@ -148,6 +149,10 @@ type ( polledTask struct { task interface{} } + + oncePerHost interface { + Do(func()) + } ) func createPollRetryPolicy() backoff.RetryPolicy { @@ -172,6 +177,14 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t logger, ) } + + var once oncePerHost + if options.sync == nil { + once = &emitOnce + } else { + once = *options.sync + } + // for now it's default to be enabled workerUC := newWorkerUsageCollector( workerUsageCollectorOptions{ @@ -179,7 +192,7 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t Cooldown: 30 * time.Second, MetricsScope: metricsScope, WorkerType: options.workerType, - EmitOnce: &emitOnce, + EmitOnce: once, }, logger, ) diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index 4068e3a37..52258e024 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -63,6 +63,8 @@ type ( } ) +var fakeSyncOnceValue fakeSyncOnce + func (m *fakeSyncOnce) Do(f func()) { f() } @@ -188,12 +190,12 @@ func (s *InterfacesTestSuite) TestInterface() { domain := "testDomain" // Workflow execution parameters. workflowExecutionParameters := workerExecutionParameters{ - TaskList: "testTaskList", WorkerOptions: WorkerOptions{ MaxConcurrentActivityTaskPollers: 4, MaxConcurrentDecisionTaskPollers: 4, Logger: zaptest.NewLogger(s.T()), - Tracer: opentracing.NoopTracer{}}, + Tracer: opentracing.NoopTracer{}, + Sync: &fakeSyncOnce{}}, } domainStatus := m.DomainStatusRegistered @@ -215,7 +217,6 @@ func (s *InterfacesTestSuite) TestInterface() { registry := newRegistry() // Launch worker. workflowWorker := newWorkflowWorker(s.service, domain, workflowExecutionParameters, nil, registry, nil) - workflowWorker.worker.workerUsageCollector.emitOncePerHost = &fakeSyncOnce{} defer workflowWorker.Stop() workflowWorker.Start() @@ -226,12 +227,12 @@ func (s *InterfacesTestSuite) TestInterface() { MaxConcurrentActivityTaskPollers: 10, MaxConcurrentDecisionTaskPollers: 10, Logger: zaptest.NewLogger(s.T()), - Tracer: opentracing.NoopTracer{}}, + Tracer: opentracing.NoopTracer{}, + Sync: &fakeSyncOnce{}}, } // Register activity instances and launch the worker. activityWorker := newActivityWorker(s.service, domain, activityExecutionParameters, nil, registry, nil) - activityWorker.worker.workerUsageCollector.emitOncePerHost = &fakeSyncOnce{} defer activityWorker.Stop() activityWorker.Start() diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 764efd111..805881b92 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -367,6 +367,7 @@ func createShadowWorker( return createWorkerWithThrottle(t, service, 0, WorkerOptions{ EnableShadowWorker: true, ShadowOptions: *shadowOptions, + Sync: &fakeSyncOnce{}, }) } @@ -409,6 +410,7 @@ func createWorkerWithThrottle( workerOptions.TaskListActivitiesPerSecond = activitiesPerSecond workerOptions.Logger = zaptest.NewLogger(t) workerOptions.EnableSessionWorker = true + workerOptions.Sync = &fakeSyncOnce{} // Start Worker. 
worker := NewWorker( @@ -423,14 +425,14 @@ func createWorkerWithDataConverter( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter()}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter(), Sync: &fakeSyncOnce{}}) } func createWorkerWithAutoscaler( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}, Sync: &fakeSyncOnce{}}) } func createWorkerWithStrictNonDeterminismDisabled( @@ -444,7 +446,7 @@ func createWorkerWithHost( t *testing.T, service *workflowservicetest.MockClient, ) *aggregatedWorker { - return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host"}) + return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host", Sync: &fakeSyncOnce{}}) } func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) { @@ -1031,7 +1033,7 @@ func TestActivityNilArgs(t *testing.T) { func TestWorkerOptionDefaults(t *testing.T) { domain := "worker-options-test" taskList := "worker-options-tl" - aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{}) + aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{Sync: &fakeSyncOnce{}}) decisionWorker := aggWorker.workflowWorker require.True(t, decisionWorker.executionParameters.Identity != "") require.NotNil(t, decisionWorker.executionParameters.Logger) diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index 3c110996e..89a6e889b 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -41,10 +41,6 @@ type ( MemoryUsedHeap float64 MemoryUsedStack float64 } - - oncePerHost interface { - Do(func()) - } ) func newWorkerUsageCollector( @@ -69,26 +65,16 @@ func newWorkerUsageCollector( } func (w *workerUsageCollector) Start() { - w.wg.Add(1) - go func() { - defer func() { - if p := recover(); p != nil { - w.metricsScope.Counter(metrics.WorkerUsageCollectorPanic).Inc(1) - topLine := fmt.Sprintf("WorkerUsageCollector panic for workertype: %v", w.workerType) - st := getStackTraceRaw(topLine, 7, 0) - w.logger.Error("WorkerUsageCollector panic.", - zap.String(tagPanicError, fmt.Sprintf("%v", p)), - zap.String(tagPanicStack, st)) - } - }() - defer w.wg.Done() - w.wg.Add(1) - w.logger.Info(fmt.Sprintf("Going to start hardware collector for workertype: %v", w.workerType)) - go w.runHardwareCollector() + if w.emitOncePerHost != nil { + w.emitOncePerHost.Do( + func() { + w.wg.Add(1) + w.logger.Info(fmt.Sprintf("Going to start hardware collector for workertype: %v", w.workerType)) + go w.runHardwareCollector() + }) + } - }() - return } func (w *workerUsageCollector) Stop() { @@ -102,20 +88,18 @@ func (w *workerUsageCollector) runHardwareCollector() { defer w.wg.Done() ticker := time.NewTicker(w.cooldownTime) defer ticker.Stop() - w.emitOncePerHost.Do(func() { - w.logger.Info(fmt.Sprintf("Started worker usage collector for workertype: %v", w.workerType)) - for { - select { - case <-w.shutdownCh: - return - case <-ticker.C: - hardwareUsageData := w.collectHardwareUsage() - if w.metricsScope != nil { - w.emitHardwareUsage(hardwareUsageData) - } + 
w.logger.Info(fmt.Sprintf("Started worker usage collector for workertype: %v", w.workerType)) + for { + select { + case <-w.shutdownCh: + return + case <-ticker.C: + hardwareUsageData := w.collectHardwareUsage() + if w.metricsScope != nil { + w.emitHardwareUsage(hardwareUsageData) } } - }) + } } func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage { diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index 1ca2a2020..a87471fa5 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -95,7 +95,8 @@ func (s *WorkersTestSuite) TestWorkflowWorker() { TaskList: "testTaskList", WorkerOptions: WorkerOptions{ MaxConcurrentDecisionTaskPollers: 5, - Logger: logger}, + Logger: logger, + Sync: &fakeSyncOnce{}}, UserContext: ctx, UserContextCancel: cancel, } @@ -127,7 +128,8 @@ func (s *WorkersTestSuite) testActivityWorker(useLocallyDispatched bool) { TaskList: "testTaskList", WorkerOptions: WorkerOptions{ MaxConcurrentActivityTaskPollers: 5, - Logger: zaptest.NewLogger(s.T())}, + Logger: zaptest.NewLogger(s.T()), + Sync: &fakeSyncOnce{}}, } overrides := &workerOverrides{activityTaskHandler: newSampleActivityTaskHandler(), useLocallyDispatchedActivityPoller: useLocallyDispatched} a := &greeterActivity{} @@ -174,6 +176,7 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() { MaxConcurrentActivityTaskPollers: 5, MaxConcurrentActivityExecutionSize: 2, Logger: zaptest.NewLogger(s.T()), + Sync: &fakeSyncOnce{}, }, ), UserContext: ctx, @@ -212,7 +215,8 @@ func (s *WorkersTestSuite) TestPollForDecisionTask_InternalServiceError() { TaskList: "testDecisionTaskList", WorkerOptions: WorkerOptions{ MaxConcurrentDecisionTaskPollers: 5, - Logger: zaptest.NewLogger(s.T())}, + Logger: zaptest.NewLogger(s.T()), + Sync: &fakeSyncOnce{}}, } overrides := &workerOverrides{workflowTaskHandler: newSampleWorkflowTaskHandler()} workflowWorker := newWorkflowWorkerInternal( @@ -339,6 +343,7 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() { Logger: zaptest.NewLogger(s.T()), DisableActivityWorker: true, Identity: "test-worker-identity", + Sync: &fakeSyncOnce{}, } worker := newAggregatedWorker(s.service, domain, taskList, options) worker.RegisterWorkflowWithOptions( @@ -514,6 +519,7 @@ func (s *WorkersTestSuite) TestQueryTask_WorkflowCacheEvicted() { // and we can force clear the cache when polling the query task. // See the mock function for the second PollForDecisionTask call above. 
MaxConcurrentDecisionTaskExecutionSize: 1, + Sync: &fakeSyncOnce{}, } worker := newAggregatedWorker(s.service, domain, taskList, options) worker.RegisterWorkflowWithOptions( @@ -637,6 +643,7 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() { Logger: zaptest.NewLogger(s.T()), DisableActivityWorker: true, Identity: "test-worker-identity", + Sync: &fakeSyncOnce{}, } worker := newAggregatedWorker(s.service, domain, taskList, options) worker.RegisterWorkflowWithOptions( @@ -747,6 +754,7 @@ func (s *WorkersTestSuite) TestLocallyDispatchedActivity() { options := WorkerOptions{ Logger: zaptest.NewLogger(s.T()), Identity: "test-worker-identity", + Sync: &fakeSyncOnce{}, } worker := newAggregatedWorker(s.service, domain, taskList, options) worker.RegisterWorkflowWithOptions( @@ -812,6 +820,7 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() { options := WorkerOptions{ Logger: zaptest.NewLogger(s.T()), Identity: "test-worker-identity", + Sync: &fakeSyncOnce{}, } s.service.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), callOptions()...).Return(nil, nil).AnyTimes() diff --git a/internal/worker.go b/internal/worker.go index abef02a5b..15a3fe91a 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -271,6 +271,12 @@ type ( // // Deprecated: All bugports are always deprecated and may be removed at any time. WorkerBugPorts WorkerBugPorts + + // Optional: This implementation ensures that a specific function is executed only once per instance. + // The mechanism can be overridden by other interfaces that implement the 'Do()' method. + // + // default: nil, that would ensure some functions are executed only once + Sync oncePerHost } // WorkerBugPorts allows opt-in enabling of older, possibly buggy behavior, primarily intended to allow temporarily diff --git a/internal/workflow_shadower_worker_test.go b/internal/workflow_shadower_worker_test.go index 29b38983c..30a89f4a2 100644 --- a/internal/workflow_shadower_worker_test.go +++ b/internal/workflow_shadower_worker_test.go @@ -70,7 +70,8 @@ func (s *shadowWorkerSuite) TestNewShadowWorker() { workerExecutionParameters{ TaskList: testTaskList, WorkerOptions: WorkerOptions{ - Logger: zaptest.NewLogger(s.T())}, + Logger: zaptest.NewLogger(s.T()), + Sync: &fakeSyncOnce{}}, }, registry, ) @@ -102,7 +103,8 @@ func (s *shadowWorkerSuite) TestStartShadowWorker_Failed_InvalidShadowOption() { workerExecutionParameters{ TaskList: testTaskList, WorkerOptions: WorkerOptions{ - Logger: zaptest.NewLogger(s.T())}, + Logger: zaptest.NewLogger(s.T()), + Sync: &fakeSyncOnce{}}, }, newRegistry(), ) @@ -122,7 +124,8 @@ func (s *shadowWorkerSuite) TestStartShadowWorker_Failed_DomainNotExist() { workerExecutionParameters{ TaskList: testTaskList, WorkerOptions: WorkerOptions{ - Logger: zaptest.NewLogger(s.T())}, + Logger: zaptest.NewLogger(s.T()), + Sync: &fakeSyncOnce{}}, }, newRegistry(), ) @@ -141,7 +144,8 @@ func (s *shadowWorkerSuite) TestStartShadowWorker_Failed_TaskListNotSpecified() ShadowOptions{}, workerExecutionParameters{ WorkerOptions: WorkerOptions{ - Logger: zaptest.NewLogger(s.T())}, + Logger: zaptest.NewLogger(s.T()), + Sync: &fakeSyncOnce{}}, }, newRegistry(), ) @@ -165,7 +169,8 @@ func (s *shadowWorkerSuite) TestStartShadowWorker_Failed_StartWorkflowError() { workerExecutionParameters{ TaskList: testTaskList, WorkerOptions: WorkerOptions{ - Logger: zaptest.NewLogger(s.T())}, + Logger: zaptest.NewLogger(s.T()), + Sync: &fakeSyncOnce{}}, }, newRegistry(), ) @@ -209,7 +214,8 @@ func (s *shadowWorkerSuite) 
TestStartShadowWorker_Succeed() { workerExecutionParameters{ TaskList: testTaskList, WorkerOptions: WorkerOptions{ - Logger: zaptest.NewLogger(s.T())}, + Logger: zaptest.NewLogger(s.T()), + Sync: &fakeSyncOnce{}}, }, newRegistry(), ) From 1a52b180853f4ca5b05da433d2244f9f7556f0c2 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 22 Nov 2023 12:36:49 -0800 Subject: [PATCH 15/16] minor change --- internal/internal_worker_usage_collector.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index 89a6e889b..997fb08a9 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -66,14 +66,12 @@ func newWorkerUsageCollector( func (w *workerUsageCollector) Start() { - if w.emitOncePerHost != nil { - w.emitOncePerHost.Do( - func() { - w.wg.Add(1) - w.logger.Info(fmt.Sprintf("Going to start hardware collector for workertype: %v", w.workerType)) - go w.runHardwareCollector() - }) - } + w.emitOncePerHost.Do( + func() { + w.wg.Add(1) + w.logger.Info(fmt.Sprintf("Going to start hardware collector for workertype: %v", w.workerType)) + go w.runHardwareCollector() + }) } From 7f8a1654588bbba4b514fc1fae6b2e03e05c407a Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 22 Nov 2023 12:38:13 -0800 Subject: [PATCH 16/16] minor change --- internal/internal_worker_usage_collector.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/internal_worker_usage_collector.go b/internal/internal_worker_usage_collector.go index 997fb08a9..357f4b5bc 100644 --- a/internal/internal_worker_usage_collector.go +++ b/internal/internal_worker_usage_collector.go @@ -65,6 +65,7 @@ func newWorkerUsageCollector( } func (w *workerUsageCollector) Start() { + w.logger.Info("Starting worker usage collector", zap.String("workerType", w.workerType)) w.emitOncePerHost.Do( func() {
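
[Editor's note] The last two patches simplify Start: patch 15 drops the nil check around emitOncePerHost (the constructor now always supplies a guard), and patch 16 adds a log line that fires on every Start call, before the Do. A rough sketch of the resulting start/stop lifecycle follows; usageCollector and its fields are simplified stand-ins for the real struct, not the actual implementation.

package main

import (
	"fmt"
	"sync"
	"time"
)

// usageCollector is an illustrative stand-in; field names are simplified.
type usageCollector struct {
	once       interface{ Do(func()) } // *sync.Once in production, a fake in tests
	wg         sync.WaitGroup
	shutdownCh chan struct{}
}

func (c *usageCollector) Start() {
	fmt.Println("Starting worker usage collector") // logged on every Start call
	c.once.Do(func() {
		c.wg.Add(1) // Add before launching so Stop cannot race past the goroutine
		go c.run()
	})
}

func (c *usageCollector) run() {
	defer c.wg.Done()
	// Collection loop elided; see the ticker sketch earlier in this series.
	<-c.shutdownCh
}

func (c *usageCollector) Stop() {
	close(c.shutdownCh)
	c.wg.Wait() // graceful stop: wait for the collector goroutine to exit
}

func main() {
	c := &usageCollector{once: &sync.Once{}, shutdownCh: make(chan struct{})}
	c.Start()
	c.Start() // logs again, but the guarded goroutine is launched only once
	time.Sleep(10 * time.Millisecond)
	c.Stop()
}
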