Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate metric storages #549

Merged
merged 4 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/hook/controller/kubernetes_bindings_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ func (c *kubernetesBindingsController) UnlockEvents() {
// UnlockEventsFor enables the kube event callback on the monitor registered
// under monitorID, so the monitor emits events after Synchronization.
// If no monitor is found for monitorID, a warning is logged and nothing else happens.
func (c *kubernetesBindingsController) UnlockEventsFor(monitorID string) {
	if monitor := c.kubeEventsManager.GetMonitor(monitorID); monitor != nil {
		monitor.EnableKubeEventCb()
		return
	}

	log.Warnf("monitor %q was not found", monitorID)
}

Expand Down
25 changes: 12 additions & 13 deletions pkg/shell-operator/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ func Init() (*ShellOperator, error) {
return nil, err
}

err = op.AssembleCommonOperator(app.ListenAddress, app.ListenPort)
err = op.AssembleCommonOperator(app.ListenAddress, app.ListenPort, map[string]string{
"hook": "",
"binding": "",
"queue": "",
})
if err != nil {
log.Errorf("Fatal: %s", err)
return nil, err
Expand All @@ -68,19 +72,14 @@ func Init() (*ShellOperator, error) {
// AssembleCommonOperator instantiates common dependencies. These dependencies
// may be used by shell-operator derivatives, like addon-operator.
// It requires listenAddress and listenPort to run the HTTP server for operator APIs,
// and kubeEventsManagerLabels, which is passed on to setupMetricStorage.
func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string) (err error) {
func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string, kubeEventsManagerLabels map[string]string) (err error) {
op.APIServer = newBaseHTTPServer(listenAddress, listenPort)

op.MetricStorage = defaultMetricStorage(op.ctx)
// built-in metrics
op.setupMetricStorage(kubeEventsManagerLabels)

// metrics from user's hooks
op.setupHookMetricStorage()
if err != nil {
return fmt.Errorf("start HTTP server for hook metrics: %s", err)
}
// Set to common metric storage if separate port is not set.
if op.HookMetricStorage == nil {
op.HookMetricStorage = op.MetricStorage
}

// 'main' Kubernetes client.
op.KubeClient, err = initDefaultMainKubeClient(op.MetricStorage)
Expand Down Expand Up @@ -112,14 +111,14 @@ func (op *ShellOperator) AssembleCommonOperator(listenAddress, listenPort string
// - kubernetes events manager
// - schedule manager
func (op *ShellOperator) assembleShellOperator(hooksDir string, tempDir string, debugServer *debug.Server, runtimeConfig *config.Config) (err error) {
registerDefaultRoutes(op)
registerRootRoute(op)
// for shell-operator only
registerHookMetrics(op.HookMetricStorage)

op.RegisterDebugQueueRoutes(debugServer)
op.RegisterDebugHookRoutes(debugServer)
op.RegisterDebugConfigRoutes(debugServer, runtimeConfig)

registerShellOperatorMetrics(op.MetricStorage)

// Create webhookManagers with dependencies.
op.setupHookManagers(hooksDir, tempDir)

Expand Down
12 changes: 3 additions & 9 deletions pkg/shell-operator/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (bhs *baseHTTPServer) Start(ctx context.Context) {
srv := &http.Server{
Addr: bhs.address + ":" + bhs.port,
Handler: bhs.router,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
ReadTimeout: 90 * time.Second,
WriteTimeout: 90 * time.Second,
}

go func() {
Expand Down Expand Up @@ -109,7 +109,7 @@ func newBaseHTTPServer(address, port string) *baseHTTPServer {
return srv
}

func registerDefaultRoutes(op *ShellOperator) {
func registerRootRoute(op *ShellOperator) {
op.APIServer.RegisterRoute(http.MethodGet, "/", func(writer http.ResponseWriter, request *http.Request) {
_, _ = fmt.Fprintf(writer, `<html>
<head><title>Shell operator</title></head>
Expand All @@ -125,10 +125,4 @@ func registerDefaultRoutes(op *ShellOperator) {
</body>
</html>`, app.ListenPort)
})

op.APIServer.RegisterRoute(http.MethodGet, "/metrics", func(writer http.ResponseWriter, request *http.Request) {
if op.MetricStorage != nil {
op.MetricStorage.Handler().ServeHTTP(writer, request)
}
})
}
65 changes: 64 additions & 1 deletion pkg/shell-operator/metrics_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,68 @@ func (op *ShellOperator) setupHookMetricStorage() {
op.APIServer.RegisterRoute(http.MethodGet, "/metrics/hooks", metricStorage.Handler().ServeHTTP)
// create new metric storage for hooks
// register scrape handler
op.MetricStorage = metricStorage
op.HookMetricStorage = metricStorage
}

// registerHookMetrics registers the metrics specific to the shell-operator
// HookManager in metricStorage: the "enable kubernetes bindings" metrics
// (labelled by hook) and the hook execution metrics (labelled by hook,
// binding and queue).
func registerHookMetrics(metricStorage *metric_storage.MetricStorage) {
	// Metrics for enable kubernetes bindings.
	metricStorage.RegisterGauge("{PREFIX}hook_enable_kubernetes_bindings_seconds", map[string]string{"hook": ""})
	metricStorage.RegisterCounter("{PREFIX}hook_enable_kubernetes_bindings_errors_total", map[string]string{"hook": ""})
	metricStorage.RegisterGauge("{PREFIX}hook_enable_kubernetes_bindings_success", map[string]string{"hook": ""})

	// Metrics for hook executions.
	labels := map[string]string{
		"hook":    "",
		"binding": "",
		"queue":   "",
	}

	// The three histograms use the same bucket layout. They are registered in
	// the original order, and every registration receives its own freshly
	// built bucket slice.
	for _, name := range []string{
		"{PREFIX}hook_run_seconds",          // Duration of hook execution.
		"{PREFIX}hook_run_user_cpu_seconds", // User CPU usage.
		"{PREFIX}hook_run_sys_cpu_seconds",  // System CPU usage.
	} {
		metricStorage.RegisterHistogram(
			name,
			labels,
			[]float64{
				0.0,
				0.02, 0.05, // 20,50 milliseconds
				0.1, 0.2, 0.5, // 100,200,500 milliseconds
				1, 2, 5, // 1,2,5 seconds
				10, 20, 50, // 10,20,50 seconds
				100, 200, 500, // 100,200,500 seconds
			},
		)
	}

	// Max RSS in bytes.
	metricStorage.RegisterGauge("{PREFIX}hook_run_max_rss_bytes", labels)

	// Hook run outcome counters.
	metricStorage.RegisterCounter("{PREFIX}hook_run_errors_total", labels)
	metricStorage.RegisterCounter("{PREFIX}hook_run_allowed_errors_total", labels)
	metricStorage.RegisterCounter("{PREFIX}hook_run_success_total", labels)
	// hook_run task waiting time
	metricStorage.RegisterCounter("{PREFIX}task_wait_in_queue_seconds_total", labels)
}
102 changes: 18 additions & 84 deletions pkg/shell-operator/metrics_operator.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,35 @@
package shell_operator

import (
"context"
"net/http"

"github.com/flant/shell-operator/pkg/app"
"github.com/flant/shell-operator/pkg/metric_storage"
)

func defaultMetricStorage(ctx context.Context) *metric_storage.MetricStorage {
metricStorage := metric_storage.NewMetricStorage(ctx, app.PrometheusMetricsPrefix, false)
return metricStorage
}
// setupMetricStorage creates and initializes metrics storage for built-in operator metrics
func (op *ShellOperator) setupMetricStorage(kubeEventsManagerLabels map[string]string) {
metricStorage := metric_storage.NewMetricStorage(op.ctx, app.PrometheusMetricsPrefix, false)

registerCommonMetrics(metricStorage)
registerTaskQueueMetrics(metricStorage)
registerKubeEventsManagerMetrics(metricStorage, kubeEventsManagerLabels)

// registerShellOperatorMetrics register all metrics needed for the ShellOperator.
func registerShellOperatorMetrics(metricStorage *metric_storage.MetricStorage) {
RegisterCommonMetrics(metricStorage)
RegisterTaskQueueMetrics(metricStorage)
RegisterKubeEventsManagerMetrics(metricStorage, map[string]string{
"hook": "",
"binding": "",
"queue": "",
})
registerHookMetrics(metricStorage)
op.APIServer.RegisterRoute(http.MethodGet, "/metrics", metricStorage.Handler().ServeHTTP)
// the scrape handler for built-in metrics is registered above;
// keep the storage on the operator
op.MetricStorage = metricStorage
}

// RegisterCommonMetrics register base metric
// registerCommonMetrics register base metric
// This function is used in the addon-operator
func RegisterCommonMetrics(metricStorage *metric_storage.MetricStorage) {
func registerCommonMetrics(metricStorage *metric_storage.MetricStorage) {
metricStorage.RegisterCounter("{PREFIX}live_ticks", map[string]string{})
}

// RegisterTaskQueueMetrics
// registerTaskQueueMetrics
// This function is used in the addon-operator
func RegisterTaskQueueMetrics(metricStorage *metric_storage.MetricStorage) {
func registerTaskQueueMetrics(metricStorage *metric_storage.MetricStorage) {
metricStorage.RegisterHistogram(
"{PREFIX}tasks_queue_action_duration_seconds",
map[string]string{
Expand All @@ -51,9 +48,9 @@ func RegisterTaskQueueMetrics(metricStorage *metric_storage.MetricStorage) {
metricStorage.RegisterGauge("{PREFIX}tasks_queue_length", map[string]string{"queue": ""})
}

// RegisterKubeEventsManagerMetrics registers metrics for kube_event_manager
// registerKubeEventsManagerMetrics registers metrics for kube_event_manager
// This function is used in the addon-operator
func RegisterKubeEventsManagerMetrics(metricStorage *metric_storage.MetricStorage, labels map[string]string) {
func registerKubeEventsManagerMetrics(metricStorage *metric_storage.MetricStorage, labels map[string]string) {
// Count of objects in snapshot for one kubernetes binding.
metricStorage.RegisterGauge("{PREFIX}kube_snapshot_objects", labels)
// Duration of jqFilter applying.
Expand Down Expand Up @@ -84,66 +81,3 @@ func RegisterKubeEventsManagerMetrics(metricStorage *metric_storage.MetricStorag
// Count of watch errors.
metricStorage.RegisterCounter("{PREFIX}kubernetes_client_watch_errors_total", map[string]string{"error_type": ""})
}

// registerHookMetrics registers shell-operator specific metrics for HookManager
// in the given metricStorage.
func registerHookMetrics(metricStorage *metric_storage.MetricStorage) {
// Metrics for enable kubernetes bindings.
metricStorage.RegisterGauge("{PREFIX}hook_enable_kubernetes_bindings_seconds", map[string]string{"hook": ""})
metricStorage.RegisterCounter("{PREFIX}hook_enable_kubernetes_bindings_errors_total", map[string]string{"hook": ""})
metricStorage.RegisterGauge("{PREFIX}hook_enable_kubernetes_bindings_success", map[string]string{"hook": ""})

// Metrics for hook executions; all of them share the hook/binding/queue label set.
labels := map[string]string{
"hook": "",
"binding": "",
"queue": "",
}
// Duration of hook execution.
metricStorage.RegisterHistogram(
"{PREFIX}hook_run_seconds",
labels,
[]float64{
0.0,
0.02, 0.05, // 20,50 milliseconds
0.1, 0.2, 0.5, // 100,200,500 milliseconds
1, 2, 5, // 1,2,5 seconds
10, 20, 50, // 10,20,50 seconds
100, 200, 500, // 100,200,500 seconds
},
)

// User CPU usage.
metricStorage.RegisterHistogram(
"{PREFIX}hook_run_user_cpu_seconds",
labels,
[]float64{
0.0,
0.02, 0.05, // 20,50 milliseconds
0.1, 0.2, 0.5, // 100,200,500 milliseconds
1, 2, 5, // 1,2,5 seconds
10, 20, 50, // 10,20,50 seconds
100, 200, 500, // 100,200,500 seconds
},
)
// System CPU usage.
metricStorage.RegisterHistogram(
"{PREFIX}hook_run_sys_cpu_seconds",
labels,
[]float64{
0.0,
0.02, 0.05, // 20,50 milliseconds
0.1, 0.2, 0.5, // 100,200,500 milliseconds
1, 2, 5, // 1,2,5 seconds
10, 20, 50, // 10,20,50 seconds
100, 200, 500, // 100,200,500 seconds
},
)
// Max RSS in bytes.
metricStorage.RegisterGauge("{PREFIX}hook_run_max_rss_bytes", labels)

metricStorage.RegisterCounter("{PREFIX}hook_run_errors_total", labels)
metricStorage.RegisterCounter("{PREFIX}hook_run_allowed_errors_total", labels)
metricStorage.RegisterCounter("{PREFIX}hook_run_success_total", labels)
// hook_run task waiting time
metricStorage.RegisterCounter("{PREFIX}task_wait_in_queue_seconds_total", labels)
}
3 changes: 2 additions & 1 deletion pkg/shell-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type ShellOperator struct {
// APIServer common http server for liveness and metrics endpoints
APIServer *baseHTTPServer

// MetricStorage collects and stores metrics for built-in operator primitives, hook execution
MetricStorage *metric_storage.MetricStorage
// separate metric storage for hook metrics if separate listen port is configured
// HookMetricStorage separate metric storage for metrics, which are returned by user hooks
HookMetricStorage *metric_storage.MetricStorage
KubeClient *klient.Client
ObjectPatcher *object_patch.ObjectPatcher
Expand Down
Loading