Move hardware emit to separate component.
timl3136 committed Nov 14, 2023
1 parent a16cf09 commit 8973aec
Showing 2 changed files with 157 additions and 70 deletions.
103 changes: 33 additions & 70 deletions internal/internal_worker_base.go
@@ -28,12 +28,10 @@ import (
"errors"
"fmt"
"os"
"runtime"
"sync"
"syscall"
"time"

"github.com/shirou/gopsutil/cpu"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -140,10 +138,11 @@ type (
logger *zap.Logger
metricsScope tally.Scope

pollerRequestCh chan struct{}
pollerAutoScaler *pollerAutoScaler
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
pollerRequestCh chan struct{}
pollerAutoScaler *pollerAutoScaler
workerUsageCollector *workerUsageCollector
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
}

polledTask struct {
@@ -173,17 +172,29 @@ func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope t
logger,
)
}
// For now, the worker usage collector defaults to enabled.
workerUC := newWorkerUsageCollector(
workerUsageCollectorOptions{
Enabled: true,
Cooldown: 30 * time.Second,
Host: options.host,
MetricsScope: metricsScope,
},
logger,
)

bw := &baseWorker{
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
pollerAutoScaler: pollerAS,
taskQueueCh: make(chan interface{}), // unbuffered, so the poller can only poll a new task after the previous one has been dispatched.
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
pollerAutoScaler: pollerAS,
workerUsageCollector: workerUC,
taskQueueCh: make(chan interface{}), // unbuffered, so the poller can only poll a new task after the previous one has been dispatched.

limiterContext: ctx,
limiterContextCancel: cancel,
@@ -207,6 +218,10 @@ func (bw *baseWorker) Start() {
bw.pollerAutoScaler.Start()
}

if bw.workerUsageCollector != nil {
bw.workerUsageCollector.Start()
}

for i := 0; i < bw.options.pollerCount; i++ {
bw.shutdownWG.Add(1)
go bw.runPoller()
@@ -215,11 +230,6 @@
bw.shutdownWG.Add(1)
go bw.runTaskDispatcher()

// We want the emit function to run once per host instead of once per worker,
// since it emits host-level metrics.
bw.shutdownWG.Add(1)
go bw.emitHardwareUsage()

bw.isWorkerStarted = true
traceLog(func() {
bw.logger.Info("Started Worker",
@@ -401,6 +411,9 @@ func (bw *baseWorker) Stop() {
if bw.pollerAutoScaler != nil {
bw.pollerAutoScaler.Stop()
}
if bw.workerUsageCollector != nil {
bw.workerUsageCollector.Stop()
}

if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
traceLog(func() {
@@ -414,53 +427,3 @@ func (bw *baseWorker) Stop() {
}
return
}

func (bw *baseWorker) emitHardwareUsage() {
defer func() {
if p := recover(); p != nil {
bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1)
topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
st := getStackTraceRaw(topLine, 7, 0)
bw.logger.Error("Unhandled panic in hardware emitting.",
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
zap.String(tagPanicStack, st))
}
}()
defer bw.shutdownWG.Done()
collectHardwareUsageOnce.Do(
func() {
ticker := time.NewTicker(hardwareMetricsCollectInterval)
for {
select {
case <-bw.shutdownCh:
ticker.Stop()
return
case <-ticker.C:
host := bw.options.host
scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host})

cpuPercent, err := cpu.Percent(0, false)
if err != nil {
bw.logger.Warn("Failed to get cpu percent", zap.Error(err))
return
}
cpuCores, err := cpu.Counts(false)
if err != nil {
bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
return
}
scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores))
scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])

var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)

scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
}
}
})

}
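
Read together, the hunks above reduce to one pattern: the base worker now owns an optional collector and guards its lifecycle with nil-checks. The sketch below condenses that pattern for orientation only; it is not part of the diff, it assumes the declarations already present in internal_worker_base.go (baseWorker, baseWorkerOptions, and the collector types added in the new file below), and the helper name wireUsageCollector is hypothetical.

// wireUsageCollector is a hypothetical helper that condenses the wiring added above:
// build one collector per base worker, then start/stop it only if it is non-nil
// (newWorkerUsageCollector returns nil when collection is disabled).
func wireUsageCollector(options baseWorkerOptions, metricsScope tally.Scope, logger *zap.Logger) *workerUsageCollector {
    return newWorkerUsageCollector(
        workerUsageCollectorOptions{
            Enabled:      true, // enabled by default for now
            Cooldown:     30 * time.Second,
            Host:         options.host,
            MetricsScope: metricsScope,
        },
        logger,
    )
}

// In (*baseWorker).Start and (*baseWorker).Stop the collector is then driven as:
//
//    if bw.workerUsageCollector != nil {
//        bw.workerUsageCollector.Start()
//    }
//    ...
//    if bw.workerUsageCollector != nil {
//        bw.workerUsageCollector.Stop()
//    }
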
124 changes: 124 additions & 0 deletions internal/internal_worker_usage_collector.go
@@ -0,0 +1,124 @@
package internal

import (
"context"
"runtime"
"sync"
"time"

"github.com/shirou/gopsutil/cpu"
"github.com/uber-go/tally"
"go.uber.org/cadence/internal/common/metrics"
"go.uber.org/zap"
)

type (
workerUsageCollector struct {
cooldownTime time.Duration
logger *zap.Logger
ctx context.Context
wg *sync.WaitGroup // graceful stop
cancel context.CancelFunc
metricsScope tally.Scope
host string
}

workerUsageCollectorOptions struct {
Enabled bool
Cooldown time.Duration
Host string
MetricsScope tally.Scope
}

hardwareUsage struct {
NumCPUCores int
CPUPercent float64
NumGoRoutines int
TotalMemory float64
MemoryUsedHeap float64
MemoryUsedStack float64
}
)

func newWorkerUsageCollector(
options workerUsageCollectorOptions,
logger *zap.Logger,
) *workerUsageCollector {
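// When collection is disabled we return nil, and callers are expected to
// nil-check before calling Start/Stop (as the base worker above does).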
if !options.Enabled {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
return &workerUsageCollector{
cooldownTime: options.Cooldown,
host: options.Host,
metricsScope: options.MetricsScope,
logger: logger,
ctx: ctx,
cancel: cancel,
wg: &sync.WaitGroup{},
}
}

func (w *workerUsageCollector) Start() {
w.wg.Add(1)
go func() {
defer func() {
if p := recover(); p != nil {
w.logger.Error("Unhandled panic in workerUsageCollector.")
}
}()
defer w.wg.Done()
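// collectHardwareUsageOnce keeps collection host-level: although every worker
// constructs its own collector, only the first Start actually runs the loop below.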
collectHardwareUsageOnce.Do(
func() {
ticker := time.NewTicker(w.cooldownTime)
defer ticker.Stop()
for {
select {
case <-w.ctx.Done():
return
case <-ticker.C:
hardwareUsageData := w.collectHardwareUsage()
w.emitHardwareUsage(hardwareUsageData)

}
}
})
}()
return
}

func (w *workerUsageCollector) Stop() {
w.cancel()
w.wg.Wait()
}

func (w *workerUsageCollector) collectHardwareUsage() hardwareUsage {
cpuPercent, err := cpu.Percent(0, false)
if err != nil || len(cpuPercent) == 0 {
w.logger.Warn("Failed to get cpu percent", zap.Error(err))
cpuPercent = []float64{0} // avoid indexing an empty slice below
}
cpuCores, err := cpu.Counts(false)
if err != nil {
w.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
}

var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)
return hardwareUsage{
NumCPUCores: cpuCores,
CPUPercent: cpuPercent[0],
NumGoRoutines: runtime.NumGoroutine(),
TotalMemory: float64(memStats.Sys),
MemoryUsedHeap: float64(memStats.HeapAlloc),
MemoryUsedStack: float64(memStats.StackInuse),
}
}

// emitHardwareUsage emits collected hardware usage metrics to metrics scope
func (w *workerUsageCollector) emitHardwareUsage(usage hardwareUsage) {
scope := w.metricsScope.Tagged(map[string]string{clientHostTag: w.host})
scope.Gauge(metrics.NumCPUCores).Update(float64(usage.NumCPUCores))
scope.Gauge(metrics.CPUPercentage).Update(usage.CPUPercent)
scope.Gauge(metrics.NumGoRoutines).Update(float64(usage.NumGoRoutines))
scope.Gauge(metrics.TotalMemory).Update(float64(usage.TotalMemory))
scope.Gauge(metrics.MemoryUsedHeap).Update(float64(usage.MemoryUsedHeap))
scope.Gauge(metrics.MemoryUsedStack).Update(float64(usage.MemoryUsedStack))
}
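
Taken on its own, the new component is driven as sketched below. This is illustrative only and not part of the commit: it assumes code living in the same internal package (the types are unexported), and exampleCollectorLifecycle, "example-host", zap.NewNop, and tally.NoopScope are stand-ins chosen for brevity; the 30-second cooldown simply mirrors what the base worker passes in.

package internal

import (
    "time"

    "github.com/uber-go/tally"
    "go.uber.org/zap"
)

// exampleCollectorLifecycle (assumed, not in the commit) drives the collector directly.
func exampleCollectorLifecycle() {
    collector := newWorkerUsageCollector(
        workerUsageCollectorOptions{
            Enabled:      true,
            Cooldown:     30 * time.Second, // same cooldown the base worker configures
            Host:         "example-host",
            MetricsScope: tally.NoopScope,
        },
        zap.NewNop(),
    )
    if collector == nil {
        return // Enabled was false; nothing to start or stop
    }
    collector.Start()      // spawns the collection loop on its own goroutine
    defer collector.Stop() // cancels the context and waits for the loop to exit

    time.Sleep(time.Minute) // let the loop tick a couple of times while work runs
}
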
