diff --git a/internal/cli/cmd/sysdump.go b/internal/cli/cmd/sysdump.go
index 966d047ad9..16c5d2fd22 100644
--- a/internal/cli/cmd/sysdump.go
+++ b/internal/cli/cmd/sysdump.go
@@ -98,6 +98,9 @@ func initSysdumpFlags(cmd *cobra.Command, options *sysdump.Options, optionPrefix
 	cmd.Flags().BoolVar(&options.Profiling,
 		optionPrefix+"profiling", sysdump.DefaultProfiling,
 		"Whether to enable scraping profiling data")
+	cmd.Flags().BoolVar(&options.Tracing,
+		optionPrefix+"tracing", sysdump.DefaultTracing,
+		"Whether to enable scraping tracing data")
 	cmd.Flags().StringArrayVar(&options.ExtraLabelSelectors,
 		optionPrefix+"extra-label-selectors", nil,
 		"Optional set of labels selectors used to target additional pods for log collection.")
diff --git a/sysdump/constants.go b/sysdump/constants.go
index 6e6d087c19..ccd69b50e3 100644
--- a/sysdump/constants.go
+++ b/sysdump/constants.go
@@ -150,6 +150,7 @@ var (
 		"pprof-heap",
 		"pprof-cpu",
 	}
+	gopsTrace = "trace"
 
 	// Gateway API resource group versions used for sysdumping these
 	gatewayClass = schema.GroupVersionResource{
diff --git a/sysdump/defaults.go b/sysdump/defaults.go
index 84d2586372..0452cba082 100644
--- a/sysdump/defaults.go
+++ b/sysdump/defaults.go
@@ -24,6 +24,7 @@ const (
 	DefaultCiliumSpireServerLabelSelector = "app=spire-server"
 	DefaultDebug                          = false
 	DefaultProfiling                      = true
+	DefaultTracing                        = false
 	DefaultHubbleLabelSelector            = labelPrefix + "hubble"
 	DefaultHubbleFlowsCount               = 10000
 	DefaultHubbleFlowsTimeout             = 5 * time.Second
diff --git a/sysdump/sysdump.go b/sysdump/sysdump.go
index 9fb57bdcf9..28aab21b46 100644
--- a/sysdump/sysdump.go
+++ b/sysdump/sysdump.go
@@ -66,6 +66,8 @@ type Options struct {
 	Debug bool
 	// Whether to enable scraping profiling data.
 	Profiling bool
+	// Whether to enable scraping tracing data.
+	Tracing bool
 	// The labels used to target additional pods
 	ExtraLabelSelectors []string
 	// The labels used to target Hubble pods.
@@ -489,6 +491,9 @@ func (c *Collector) Run() error {
 		},
 	}
 
+	// Tasks that need to be executed "serially" (i.e., not concurrently with other tasks).
+	var serialTasks []Task
+
 	ciliumTasks := []Task{
 		{
 			Description: "Collecting Cilium network policies",
@@ -1260,6 +1265,21 @@ func (c *Collector) Run() error {
 
 	if c.Options.CiliumNamespace != "" && c.Options.CiliumOperatorNamespace != "" {
 		tasks = append(tasks, ciliumTasks...)
+
+		serialTasks = append(serialTasks, Task{
+			CreatesSubtasks: true,
+			Description:     "Collecting tracing data from Cilium pods",
+			Quick:           false,
+			Task: func(ctx context.Context) error {
+				if !c.Options.Tracing {
+					return nil
+				}
+				if err := c.SubmitTracingGopsSubtask(c.CiliumPods, ciliumAgentContainerName); err != nil {
+					return fmt.Errorf("failed to collect tracing data from Cilium pods: %w", err)
+				}
+				return nil
+			},
+		})
 	}
 
 	tetragonTasks := []Task{
@@ -1375,6 +1395,54 @@ func (c *Collector) Run() error {
 		tasks = append(tasks, c.getEnvoyConfigTasks()...)
 	}
 
+	// First, run each serial task in its own worker pool.
+	var r []workerpool.Task
+	for i, t := range serialTasks {
+		t := t
+		if c.shouldSkipTask(t) {
+			c.logDebug("Skipping %q", t.Description)
+			continue
+		}
+
+		var wg sync.WaitGroup
+		if t.CreatesSubtasks {
+			wg.Add(1)
+		}
+
+		// Adjust the worker count to make enough headroom for tasks that submit sub-tasks.
+		// This is necessary because 'Submit' is blocking.
+		wc := 1
+		if t.CreatesSubtasks {
+			wc++
+		}
+		c.Pool = workerpool.New(wc)
+
+		// Add the serial task to the worker pool.
+		if err := c.Pool.Submit(fmt.Sprintf("[%d] %s", len(tasks)+i, t.Description), func(ctx context.Context) error {
+			if t.CreatesSubtasks {
+				defer wg.Done()
+			}
+			c.logTask(t.Description)
+			defer c.logDebug("Finished %q", t.Description)
+			return t.Task(ctx)
+		}); err != nil {
+			return fmt.Errorf("failed to submit task to the worker pool: %w", err)
+		}
+
+		// Wait for the subtasks to be submitted and then call 'Drain' to wait for them to finish.
+		wg.Wait()
+		results, err := c.Pool.Drain()
+		if err != nil {
+			return fmt.Errorf("failed to drain the serial tasks worker pool: %w", err)
+		}
+		// Close the worker Pool.
+		if err := c.Pool.Close(); err != nil {
+			return fmt.Errorf("failed to close the serial tasks worker pool: %w", err)
+		}
+
+		r = append(r, results...)
+	}
+
 	// Adjust the worker count to make enough headroom for tasks that submit sub-tasks.
 	// This is necessary because 'Submit' is blocking.
 	wc := 1
@@ -1414,10 +1482,11 @@ func (c *Collector) Run() error {
 	// Wait for the all subtasks to be submitted and then call 'Drain' to wait for everything to finish.
 	c.subtasksWg.Wait()
-	r, err := c.Pool.Drain()
+	results, err := c.Pool.Drain()
 	if err != nil {
 		return fmt.Errorf("failed to drain the worker pool: %w", err)
 	}
+	r = append(r, results...)
 	// Close the worker Pool.
 	if err := c.Pool.Close(); err != nil {
 		return fmt.Errorf("failed to close the worker pool: %w", err)
 	}
@@ -2073,6 +2142,44 @@ func (c *Collector) SubmitProfilingGopsSubtasks(pods []*corev1.Pod, containerNam
 	return nil
 }
 
+// SubmitTracingGopsSubtask submits a task to collect tracing data from pods.
+func (c *Collector) SubmitTracingGopsSubtask(pods []*corev1.Pod, containerName string) error {
+	for _, p := range pods {
+		p := p
+		if err := c.Pool.Submit(fmt.Sprintf("gops-%s-%s", p.Name, gopsTrace), func(ctx context.Context) error {
+			agentPID, err := c.getGopsPID(ctx, p, containerName)
+			if err != nil {
+				return err
+			}
+			o, err := c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{
+				gopsCommand,
+				gopsTrace,
+				agentPID,
+			})
+			if err != nil {
+				return fmt.Errorf("failed to collect gops trace for %q (%q) in namespace %q: %w", p.Name, containerName, p.Namespace, err)
+			}
+			filePath, err := extractGopsProfileData(o.String())
+			if err != nil {
+				return fmt.Errorf("failed to collect gops trace for %q (%q) in namespace %q: %w", p.Name, containerName, p.Namespace, err)
+			}
+			f := c.AbsoluteTempPath(fmt.Sprintf("%s-%s-<ts>.trace", p.Name, gopsTrace))
+			err = c.Client.CopyFromPod(ctx, p.Namespace, p.Name, containerName, filePath, f, c.Options.CopyRetryLimit)
+			if err != nil {
+				return fmt.Errorf("failed to collect gops trace output for %q: %w", p.Name, err)
+			}
+			if _, err = c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{rmCommand, filePath}); err != nil {
+				c.logWarn("failed to delete trace output from pod %q in namespace %q: %v", p.Name, p.Namespace, err)
+				return nil
+			}
+			return nil
+		}); err != nil {
+			return fmt.Errorf("failed to submit %s gops task for %q: %w", gopsTrace, p.Name, err)
+		}
+	}
+	return nil
+}
+
 // SubmitLogsTasks submits tasks to collect kubernetes logs from pods.
 func (c *Collector) SubmitLogsTasks(pods []*corev1.Pod, since time.Duration, limitBytes int64) error {
 	t := time.Now().Add(-since)
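For reference, below is a minimal, self-contained sketch of the serial-task pattern the Run() changes rely on, using github.com/cilium/workerpool (the library the diff already imports). The task names "parent" and "subtask" are hypothetical and the program is illustrative only, not part of the change: it shows why the pool needs one worker of headroom when a task submits subtasks (Submit blocks until a worker is free) and why a sync.WaitGroup must gate the Drain call (submissions are rejected once a drain is in progress).

package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/cilium/workerpool"
)

func main() {
	// One worker for the parent task plus one worker of headroom for its
	// subtasks; with a single worker, the inner Submit below would block forever.
	pool := workerpool.New(2)

	var wg sync.WaitGroup
	wg.Add(1)
	if err := pool.Submit("parent", func(ctx context.Context) error {
		// Done fires only after the subtask has been submitted, so the main
		// goroutine cannot start draining too early.
		defer wg.Done()
		return pool.Submit("subtask", func(ctx context.Context) error {
			fmt.Println("collecting data")
			return nil
		})
	}); err != nil {
		panic(err)
	}

	// Wait until all subtasks are submitted, then drain: Drain blocks until
	// every submitted task has finished and returns their results.
	wg.Wait()
	results, err := pool.Drain()
	if err != nil {
		panic(err)
	}
	for _, t := range results {
		fmt.Printf("%s: err=%v\n", t, t.Err())
	}
	if err := pool.Close(); err != nil {
		panic(err)
	}
}

Since DefaultTracing is false, trace collection stays opt-in; assuming the top-level sysdump command registers the flag with an empty optionPrefix, it would be enabled via a flag named "tracing" (e.g. cilium sysdump --tracing), with the exact invocation depending on how the command is wired.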