Create separate worker usage data collection and move hardware emit there #1293

Open: wants to merge 22 commits into master
2 changes: 2 additions & 0 deletions internal/common/metrics/constants.go
@@ -121,4 +121,6 @@ const (
ServerSideHistorySize = CadenceMetricsPrefix + "server-side-history-size"
ConcurrentTaskQuota = CadenceMetricsPrefix + "concurrent-task-quota"
PollerRequestBufferUsage = CadenceMetricsPrefix + "poller-request-buffer-usage"

WorkerUsageCollectorPanic = CadenceMetricsPrefix + "worker-metrics-collector-panic"
)
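The new WorkerUsageCollectorPanic counter is presumably emitted from a recover() guard around the new collector's goroutine, mirroring the guard this PR removes from emitHardwareUsage further down. A minimal sketch of that pattern, assuming illustrative names and import paths; this is not the PR's actual code:

import (
	"fmt"

	"github.com/uber-go/tally"
	"go.uber.org/zap"

	"go.uber.org/cadence/internal/common/metrics" // assumed path for the constants above
)

// runWithPanicGuard wraps a collector body so a panic is counted and logged
// instead of crashing the worker process.
func runWithPanicGuard(scope tally.Scope, logger *zap.Logger, body func()) {
	defer func() {
		if p := recover(); p != nil {
			scope.Counter(metrics.WorkerUsageCollectorPanic).Inc(1)
			logger.Error("Unhandled panic in worker usage collector.",
				zap.String("PanicError", fmt.Sprintf("%v", p)))
		}
	}()
	body()
}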
9 changes: 6 additions & 3 deletions internal/internal_worker.go
@@ -281,7 +281,8 @@ func newWorkflowTaskWorkerInternal(
taskWorker: poller,
identity: params.Identity,
workerType: "DecisionWorker",
shutdownTimeout: params.WorkerStopTimeout},
shutdownTimeout: params.WorkerStopTimeout,
sync: &params.Sync},
params.Logger,
params.MetricsScope,
nil,
@@ -304,7 +305,8 @@
taskWorker: localActivityTaskPoller,
identity: params.Identity,
workerType: "LocalActivityWorker",
shutdownTimeout: params.WorkerStopTimeout},
shutdownTimeout: params.WorkerStopTimeout,
sync: &params.Sync},
params.Logger,
params.MetricsScope,
nil,
@@ -482,7 +484,8 @@ func newActivityTaskWorker(
identity: workerParams.Identity,
workerType: workerType,
shutdownTimeout: workerParams.WorkerStopTimeout,
userContextCancel: workerParams.UserContextCancel},
userContextCancel: workerParams.UserContextCancel,
sync: &workerParams.Sync},
workerParams.Logger,
workerParams.MetricsScope,
sessionTokenBucket,
118 changes: 47 additions & 71 deletions internal/internal_worker_base.go
@@ -28,12 +28,10 @@
"errors"
"fmt"
"os"
"runtime"
"sync"
"syscall"
"time"

"github.com/shirou/gopsutil/cpu"
"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -57,7 +55,7 @@

var errShutdown = errors.New("worker shutting down")

var collectHardwareUsageOnce sync.Once
var emitOnce sync.Once

type (
// resultHandler that returns result
@@ -127,6 +125,7 @@
shutdownTimeout time.Duration
userContextCancel context.CancelFunc
host string
sync *oncePerHost
}

// baseWorker that wraps worker activities.
@@ -143,15 +142,20 @@
logger *zap.Logger
metricsScope tally.Scope

pollerRequestCh chan struct{}
pollerAutoScaler *pollerAutoScaler
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
pollerRequestCh chan struct{}
pollerAutoScaler *pollerAutoScaler
workerUsageCollector *workerUsageCollector
taskQueueCh chan interface{}
sessionTokenBucket *sessionTokenBucket
}

polledTask struct {
task interface{}
}

oncePerHost interface {
Do(func())
}
)

func createPollRetryPolicy() backoff.RetryPolicy {
@@ -177,16 +181,36 @@
)
}

var once oncePerHost
if options.sync == nil {
once = &emitOnce

Codecov warning: added line #L186 in internal/internal_worker_base.go was not covered by tests.
} else {
once = *options.sync
}

// For now this is enabled by default.
workerUC := newWorkerUsageCollector(
workerUsageCollectorOptions{
Enabled: true,
Cooldown: 30 * time.Second,
MetricsScope: metricsScope,
WorkerType: options.workerType,
EmitOnce: once,
},
logger,
)

bw := &baseWorker{
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
pollerAutoScaler: pollerAS,
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.
options: options,
shutdownCh: make(chan struct{}),
taskLimiter: rate.NewLimiter(rate.Limit(options.maxTaskPerSecond), 1),
retrier: backoff.NewConcurrentRetrier(pollOperationRetryPolicy),
logger: logger.With(zapcore.Field{Key: tagWorkerType, Type: zapcore.StringType, String: options.workerType}),
metricsScope: tagScope(metricsScope, tagWorkerType, options.workerType),
pollerRequestCh: make(chan struct{}, options.maxConcurrentTask),
pollerAutoScaler: pollerAS,
workerUsageCollector: workerUC,
taskQueueCh: make(chan interface{}), // no buffer, so poller only able to poll new task after previous is dispatched.

limiterContext: ctx,
limiterContextCancel: cancel,
@@ -210,6 +234,10 @@
bw.pollerAutoScaler.Start()
}

if bw.workerUsageCollector != nil {
bw.workerUsageCollector.Start()
}

for i := 0; i < bw.options.pollerCount; i++ {
bw.shutdownWG.Add(1)
go bw.runPoller()
@@ -218,11 +246,6 @@
bw.shutdownWG.Add(1)
go bw.runTaskDispatcher()

// We want the emit function run once per host instead of run once per worker
// since the emit function is host level metric.
bw.shutdownWG.Add(1)
go bw.emitHardwareUsage()

bw.isWorkerStarted = true
traceLog(func() {
bw.logger.Info("Started Worker",
@@ -406,6 +429,9 @@
if bw.pollerAutoScaler != nil {
bw.pollerAutoScaler.Stop()
}
if bw.workerUsageCollector != nil {
bw.workerUsageCollector.Stop()
}

if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
traceLog(func() {
@@ -419,53 +445,3 @@
}
return
}

func (bw *baseWorker) emitHardwareUsage() {
defer func() {
if p := recover(); p != nil {
bw.metricsScope.Counter(metrics.WorkerPanicCounter).Inc(1)
topLine := fmt.Sprintf("base worker for %s [panic]:", bw.options.workerType)
st := getStackTraceRaw(topLine, 7, 0)
bw.logger.Error("Unhandled panic in hardware emitting.",
zap.String(tagPanicError, fmt.Sprintf("%v", p)),
zap.String(tagPanicStack, st))
}
}()
defer bw.shutdownWG.Done()
collectHardwareUsageOnce.Do(
func() {
ticker := time.NewTicker(hardwareMetricsCollectInterval)
for {
select {
case <-bw.shutdownCh:
ticker.Stop()
return
case <-ticker.C:
host := bw.options.host
scope := bw.metricsScope.Tagged(map[string]string{clientHostTag: host})

cpuPercent, err := cpu.Percent(0, false)
if err != nil {
bw.logger.Warn("Failed to get cpu percent", zap.Error(err))
return
}
cpuCores, err := cpu.Counts(false)
if err != nil {
bw.logger.Warn("Failed to get number of cpu cores", zap.Error(err))
return
}
scope.Gauge(metrics.NumCPUCores).Update(float64(cpuCores))
scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])

var memStats runtime.MemStats
runtime.ReadMemStats(&memStats)

scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
}
}
})

}
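The workerUsageCollector type itself is added in a file that is not part of this excerpt. Based on the constructor options shown above (Enabled, Cooldown, MetricsScope, WorkerType, EmitOnce), the Start/Stop calls, and the emitHardwareUsage body removed here, a rough sketch of what it presumably does follows; every field and method name below is illustrative, not the PR's actual implementation:

import (
	"runtime"
	"sync"
	"time"

	"github.com/shirou/gopsutil/cpu"
	"github.com/uber-go/tally"
	"go.uber.org/zap"

	"go.uber.org/cadence/internal/common/metrics" // assumed path
)

// Sketch only: the real collector is defined elsewhere in this PR.
type workerUsageCollector struct {
	enabled  bool
	cooldown time.Duration
	scope    tally.Scope
	logger   *zap.Logger
	emitOnce oncePerHost   // shared per host in production, a pass-through fake in tests
	shutdown chan struct{} // assumed to be initialized by newWorkerUsageCollector
	wg       sync.WaitGroup
}

func (w *workerUsageCollector) Start() {
	if !w.enabled {
		return
	}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		// Hardware metrics are host-level, so only one worker per host runs
		// the emit loop -- the same contract emitHardwareUsage had.
		w.emitOnce.Do(func() {
			ticker := time.NewTicker(w.cooldown)
			defer ticker.Stop()
			for {
				select {
				case <-w.shutdown:
					return
				case <-ticker.C:
					w.emitHardwareUsage()
				}
			}
		})
	}()
}

func (w *workerUsageCollector) Stop() {
	close(w.shutdown)
	w.wg.Wait()
}

// emitHardwareUsage reports the same gauges the removed function did
// (CPU core count and the host tag are omitted here for brevity).
func (w *workerUsageCollector) emitHardwareUsage() {
	cpuPercent, err := cpu.Percent(0, false)
	if err != nil {
		w.logger.Warn("Failed to get cpu percent", zap.Error(err))
		return
	}
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	w.scope.Gauge(metrics.CPUPercentage).Update(cpuPercent[0])
	w.scope.Gauge(metrics.NumGoRoutines).Update(float64(runtime.NumGoroutine()))
	w.scope.Gauge(metrics.TotalMemory).Update(float64(memStats.Sys))
	w.scope.Gauge(metrics.MemoryUsedHeap).Update(float64(memStats.HeapInuse))
	w.scope.Gauge(metrics.MemoryUsedStack).Update(float64(memStats.StackInuse))
}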
18 changes: 15 additions & 3 deletions internal/internal_worker_interfaces_test.go
@@ -56,8 +56,19 @@ type (
mockCtrl *gomock.Controller
service *workflowservicetest.MockClient
}

// fakeSyncOnce is a fake implementation of the oncePerHost interface
// that does NOT ensure the function runs only once per host
fakeSyncOnce struct {
}
)

var fakeSyncOnceValue fakeSyncOnce

func (m *fakeSyncOnce) Do(f func()) {
f()
}

func helloWorldWorkflowFunc(ctx Context, input []byte) error {
queryResult := startingQueryValue
SetQueryHandler(ctx, queryType, func() (string, error) {
@@ -179,12 +190,12 @@ func (s *InterfacesTestSuite) TestInterface() {
domain := "testDomain"
// Workflow execution parameters.
workflowExecutionParameters := workerExecutionParameters{
TaskList: "testTaskList",
WorkerOptions: WorkerOptions{
MaxConcurrentActivityTaskPollers: 4,
MaxConcurrentDecisionTaskPollers: 4,
Logger: zaptest.NewLogger(s.T()),
Tracer: opentracing.NoopTracer{}},
Tracer: opentracing.NoopTracer{},
Sync: &fakeSyncOnce{}},
}

domainStatus := m.DomainStatusRegistered
@@ -216,7 +227,8 @@
MaxConcurrentActivityTaskPollers: 10,
MaxConcurrentDecisionTaskPollers: 10,
Logger: zaptest.NewLogger(s.T()),
Tracer: opentracing.NoopTracer{}},
Tracer: opentracing.NoopTracer{},
Sync: &fakeSyncOnce{}},
}

// Register activity instances and launch the worker.
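The fake matters because the production guard is the package-level emitOnce (a sync.Once): with the real guard, only the first worker in the process would ever run the emit function, so the tests inject fakeSyncOnce to make every worker under test exercise the collector. A small self-contained illustration of the difference, separate from the PR's code:

package main

import (
	"fmt"
	"sync"
)

// Same shape as the oncePerHost interface in internal_worker_base.go.
type oncePerHost interface {
	Do(func())
}

// fakeSyncOnce mirrors the test helper above: it runs f on every call.
type fakeSyncOnce struct{}

func (*fakeSyncOnce) Do(f func()) { f() }

func main() {
	var perHost oncePerHost = &sync.Once{} // *sync.Once already satisfies Do(func())
	var perWorker oncePerHost = &fakeSyncOnce{}

	for i := 0; i < 3; i++ {
		perHost.Do(func() { fmt.Println("real once: runs only on the first iteration") })
		perWorker.Do(func() { fmt.Println("fake once: runs on every iteration") })
	}
}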
10 changes: 6 additions & 4 deletions internal/internal_worker_test.go
@@ -367,6 +367,7 @@ func createShadowWorker(
return createWorkerWithThrottle(t, service, 0, WorkerOptions{
EnableShadowWorker: true,
ShadowOptions: *shadowOptions,
Sync: &fakeSyncOnce{},
})
}

@@ -409,6 +410,7 @@ func createWorkerWithThrottle(
workerOptions.TaskListActivitiesPerSecond = activitiesPerSecond
workerOptions.Logger = zaptest.NewLogger(t)
workerOptions.EnableSessionWorker = true
workerOptions.Sync = &fakeSyncOnce{}

// Start Worker.
worker := NewWorker(
@@ -423,14 +425,14 @@ func createWorkerWithDataConverter(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter()})
return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter(), Sync: &fakeSyncOnce{}})
}

func createWorkerWithAutoscaler(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}})
return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}, Sync: &fakeSyncOnce{}})
}

func createWorkerWithStrictNonDeterminismDisabled(
@@ -444,7 +446,7 @@ func createWorkerWithHost(
t *testing.T,
service *workflowservicetest.MockClient,
) *aggregatedWorker {
return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host"})
return createWorkerWithThrottle(t, service, 0, WorkerOptions{Host: "test_host", Sync: &fakeSyncOnce{}})
}

func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) {
@@ -1031,7 +1033,7 @@ func TestActivityNilArgs(t *testing.T) {
func TestWorkerOptionDefaults(t *testing.T) {
domain := "worker-options-test"
taskList := "worker-options-tl"
aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{})
aggWorker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{Sync: &fakeSyncOnce{}})
decisionWorker := aggWorker.workflowWorker
require.True(t, decisionWorker.executionParameters.Identity != "")
require.NotNil(t, decisionWorker.executionParameters.Logger)