From 85540e0747c501724c7c58618f7c71e3140dd1c6 Mon Sep 17 00:00:00 2001 From: Fabio Falzoi Date: Tue, 17 Oct 2023 18:17:45 +0200 Subject: [PATCH] sysdump: Add gops trace data Add the ability to scrape trace data from gops. See https://pkg.go.dev/runtime/trace for more information about the Go execution tracer. The overhead derived from enabling the Go execution tracer, while usually low and suitable for production use, might be higher than collecting pprof data, so the option is disabled by default. Signed-off-by: Fabio Falzoi --- internal/cli/cmd/sysdump.go | 3 + sysdump/constants.go | 1 + sysdump/defaults.go | 1 + sysdump/sysdump.go | 153 +++++++++++++++++++++++++----------- 4 files changed, 110 insertions(+), 48 deletions(-) diff --git a/internal/cli/cmd/sysdump.go b/internal/cli/cmd/sysdump.go index 966d047ad9..16c5d2fd22 100644 --- a/internal/cli/cmd/sysdump.go +++ b/internal/cli/cmd/sysdump.go @@ -98,6 +98,9 @@ func initSysdumpFlags(cmd *cobra.Command, options *sysdump.Options, optionPrefix cmd.Flags().BoolVar(&options.Profiling, optionPrefix+"profiling", sysdump.DefaultProfiling, "Whether to enable scraping profiling data") + cmd.Flags().BoolVar(&options.Tracing, + optionPrefix+"tracing", sysdump.DefaultTracing, + "Whether to enable scraping tracing data") cmd.Flags().StringArrayVar(&options.ExtraLabelSelectors, optionPrefix+"extra-label-selectors", nil, "Optional set of labels selectors used to target additional pods for log collection.") diff --git a/sysdump/constants.go b/sysdump/constants.go index 6e6d087c19..ccd69b50e3 100644 --- a/sysdump/constants.go +++ b/sysdump/constants.go @@ -150,6 +150,7 @@ var ( "pprof-heap", "pprof-cpu", } + gopsTrace = "trace" // Gateway API resource group versions used for sysdumping these gatewayClass = schema.GroupVersionResource{ diff --git a/sysdump/defaults.go b/sysdump/defaults.go index 84d2586372..0452cba082 100644 --- a/sysdump/defaults.go +++ b/sysdump/defaults.go @@ -24,6 +24,7 @@ const ( DefaultCiliumSpireServerLabelSelector = "app=spire-server" DefaultDebug = false DefaultProfiling = true + DefaultTracing = false DefaultHubbleLabelSelector = labelPrefix + "hubble" DefaultHubbleFlowsCount = 10000 DefaultHubbleFlowsTimeout = 5 * time.Second diff --git a/sysdump/sysdump.go b/sysdump/sysdump.go index 33a297e20f..279708bcee 100644 --- a/sysdump/sysdump.go +++ b/sysdump/sysdump.go @@ -67,6 +67,8 @@ type Options struct { Debug bool // Whether to enable scraping profiling data. Profiling bool + // Whether to enable scraping tracing data. + Tracing bool // The labels used to target additional pods ExtraLabelSelectors []string // The labels used to target Hubble pods. @@ -1292,6 +1294,21 @@ func (c *Collector) Run() error { if c.Options.CiliumNamespace != "" && c.Options.CiliumOperatorNamespace != "" { tasks = append(tasks, ciliumTasks...) + + serialTasks = append(serialTasks, Task{ + CreatesSubtasks: true, + Description: "Collecting tracing data from Cilium pods", + Quick: false, + Task: func(ctx context.Context) error { + if !c.Options.Tracing { + return nil + } + if err := c.SubmitTracingGopsSubtask(c.CiliumPods, ciliumAgentContainerName); err != nil { + return fmt.Errorf("failed to collect tracing data from Cilium pods: %w", err) + } + return nil + }, + }) } tetragonTasks := []Task{ @@ -1404,6 +1421,54 @@ func (c *Collector) Run() error { tasks = append(tasks, c.getGatewayAPITasks()...) } + // First, run each serial task in its own workerpool. + var r []workerpool.Task + for i, t := range serialTasks { + t := t + if c.shouldSkipTask(t) { + c.logDebug("Skipping %q", t.Description) + continue + } + + var wg sync.WaitGroup + if t.CreatesSubtasks { + wg.Add(1) + } + + // Adjust the worker count to make enough headroom for tasks that submit sub-tasks. + // This is necessary because 'Submit' is blocking. + wc := 1 + if t.CreatesSubtasks { + wc++ + } + c.Pool = workerpool.New(wc) + + // Add the serial task to the worker pool. + if err := c.Pool.Submit(fmt.Sprintf("[%d] %s", len(tasks)+i, t.Description), func(ctx context.Context) error { + if t.CreatesSubtasks { + defer wg.Done() + } + c.logTask(t.Description) + defer c.logDebug("Finished %q", t.Description) + return t.Task(ctx) + }); err != nil { + return fmt.Errorf("failed to submit task to the worker pool: %w", err) + } + + // Wait for the subtasks to be submitted and then call 'Drain' to wait for them to finish. + wg.Wait() + results, err := c.Pool.Drain() + if err != nil { + return fmt.Errorf("failed to drain the serial tasks worker pool: %w", err) + } + // Close the worker Pool. + if err := c.Pool.Close(); err != nil { + return fmt.Errorf("failed to close the serial tasks worker pool: %w", err) + } + + r = append(r, results...) + } + // Adjust the worker count to make enough headroom for tasks that submit sub-tasks. // This is necessary because 'Submit' is blocking. wc := 1 @@ -1443,62 +1508,16 @@ func (c *Collector) Run() error { // Wait for the all subtasks to be submitted and then call 'Drain' to wait for everything to finish. c.subtasksWg.Wait() - r, err := c.Pool.Drain() + results, err := c.Pool.Drain() if err != nil { return fmt.Errorf("failed to drain the worker pool: %w", err) } + r = append(r, results...) // Close the worker Pool. if err := c.Pool.Close(); err != nil { return fmt.Errorf("failed to close the worker pool: %w", err) } - // Run each serial task in its own workerpool. - for i, t := range serialTasks { - t := t - if c.shouldSkipTask(t) { - c.logDebug("Skipping %q", t.Description) - continue - } - - var wg sync.WaitGroup - if t.CreatesSubtasks { - wg.Add(1) - } - - // Adjust the worker count to make enough headroom for tasks that submit sub-tasks. - // This is necessary because 'Submit' is blocking. - wc := 1 - if t.CreatesSubtasks { - wc++ - } - c.Pool = workerpool.New(wc) - - // Add the serial task to the worker pool. - if err := c.Pool.Submit(fmt.Sprintf("[%d] %s", len(tasks)+i, t.Description), func(ctx context.Context) error { - if t.CreatesSubtasks { - defer wg.Done() - } - c.logTask(t.Description) - defer c.logDebug("Finished %q", t.Description) - return t.Task(ctx) - }); err != nil { - return fmt.Errorf("failed to submit task to the worker pool: %w", err) - } - - // Wait for the subtasks to be submitted and then call 'Drain' to wait for them to finish. - wg.Wait() - results, err := c.Pool.Drain() - if err != nil { - return fmt.Errorf("failed to drain the serial tasks worker pool: %w", err) - } - // Close the worker Pool. - if err := c.Pool.Close(); err != nil { - return fmt.Errorf("failed to close the serial tasks worker pool: %w", err) - } - - r = append(r, results...) - } - loggedStart := false // Check if any errors occurred and warn the user. for _, res := range r { @@ -2116,6 +2135,44 @@ func (c *Collector) SubmitProfilingGopsSubtasks(pods []*corev1.Pod, containerNam return nil } +// SubmitTracingGopsSubtask submits task to collect tracing data from pods. +func (c *Collector) SubmitTracingGopsSubtask(pods []*corev1.Pod, containerName string) error { + for _, p := range pods { + p := p + if err := c.Pool.Submit(fmt.Sprintf("gops-%s-%s", p.Name, gopsTrace), func(ctx context.Context) error { + agentPID, err := c.getGopsPID(ctx, p, containerName) + if err != nil { + return err + } + o, err := c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{ + gopsCommand, + gopsTrace, + agentPID, + }) + if err != nil { + return fmt.Errorf("failed to collect gops trace for %q (%q) in namespace %q: %w", p.Name, containerName, p.Namespace, err) + } + filePath, err := extractGopsProfileData(o.String()) + if err != nil { + return fmt.Errorf("failed to collect gops trace for %q (%q) in namespace %q: %w", p.Name, containerName, p.Namespace, err) + } + f := c.AbsoluteTempPath(fmt.Sprintf("%s-%s-.trace", p.Name, gopsTrace)) + err = c.Client.CopyFromPod(ctx, p.Namespace, p.Name, containerName, filePath, f, c.Options.CopyRetryLimit) + if err != nil { + return fmt.Errorf("failed to collect gops trace output for %q: %w", p.Name, err) + } + if _, err = c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{rmCommand, filePath}); err != nil { + c.logWarn("failed to delete trace output from pod %q in namespace %q: %w", p.Name, p.Namespace, err) + return nil + } + return nil + }); err != nil { + return fmt.Errorf("failed to submit %s gops task for %q: %w", gopsTrace, p.Name, err) + } + } + return nil +} + // SubmitLogsTasks submits tasks to collect kubernetes logs from pods. func (c *Collector) SubmitLogsTasks(pods []*corev1.Pod, since time.Duration, limitBytes int64) error { t := time.Now().Add(-since)