sysdump: Add gops trace data
Add the ability to scrape trace data from gops.  See
https://pkg.go.dev/runtime/trace for more information about the Go
execution tracer.

The overhead of enabling the Go execution tracer, while usually low
and suitable for production use, may be higher than that of collecting
pprof data, so the option is disabled by default.
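
For reference, a minimal sketch of driving the tracer directly with the
standard runtime/trace API (illustrative only, not part of this change):

package main

import (
	"log"
	"os"
	"runtime/trace"
)

func main() {
	// The tracer streams events (goroutine scheduling, GC, syscalls, ...)
	// to an io.Writer until Stop is called.
	f, err := os.Create("out.trace")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	if err := trace.Start(f); err != nil {
		log.Fatal(err)
	}
	defer trace.Stop()

	// ... workload to be traced ...
}

The resulting file can be inspected with 'go tool trace out.trace'. Here
the tracer is driven remotely through gops running in the Cilium agent
container; assuming the default empty option prefix, collection would be
enabled with something like 'cilium sysdump --tracing' (hypothetical
invocation).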

Signed-off-by: Fabio Falzoi <[email protected]>
pippolo84 committed Oct 23, 2023
1 parent aebfffc commit 85540e0
Showing 4 changed files with 110 additions and 48 deletions.
3 changes: 3 additions & 0 deletions internal/cli/cmd/sysdump.go
@@ -98,6 +98,9 @@ func initSysdumpFlags(cmd *cobra.Command, options *sysdump.Options, optionPrefix
cmd.Flags().BoolVar(&options.Profiling,
optionPrefix+"profiling", sysdump.DefaultProfiling,
"Whether to enable scraping profiling data")
cmd.Flags().BoolVar(&options.Tracing,
optionPrefix+"tracing", sysdump.DefaultTracing,
"Whether to enable scraping tracing data")
cmd.Flags().StringArrayVar(&options.ExtraLabelSelectors,
optionPrefix+"extra-label-selectors", nil,
"Optional set of labels selectors used to target additional pods for log collection.")
1 change: 1 addition & 0 deletions sysdump/constants.go
@@ -150,6 +150,7 @@ var (
"pprof-heap",
"pprof-cpu",
}
gopsTrace = "trace"

// Gateway API resource group versions used for sysdumping these
gatewayClass = schema.GroupVersionResource{
1 change: 1 addition & 0 deletions sysdump/defaults.go
@@ -24,6 +24,7 @@ const (
DefaultCiliumSpireServerLabelSelector = "app=spire-server"
DefaultDebug = false
DefaultProfiling = true
DefaultTracing = false
DefaultHubbleLabelSelector = labelPrefix + "hubble"
DefaultHubbleFlowsCount = 10000
DefaultHubbleFlowsTimeout = 5 * time.Second
153 changes: 105 additions & 48 deletions sysdump/sysdump.go
@@ -67,6 +67,8 @@ type Options struct {
Debug bool
// Whether to enable scraping profiling data.
Profiling bool
// Whether to enable scraping tracing data.
Tracing bool
// The labels used to target additional pods
ExtraLabelSelectors []string
// The labels used to target Hubble pods.
@@ -1292,6 +1294,21 @@ func (c *Collector) Run() error {

if c.Options.CiliumNamespace != "" && c.Options.CiliumOperatorNamespace != "" {
tasks = append(tasks, ciliumTasks...)

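// Tracing is collected as a serial task rather than being appended to
// the shared task list: Run() executes each serial task in its own
// dedicated worker pool (see below).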
serialTasks = append(serialTasks, Task{
CreatesSubtasks: true,
Description: "Collecting tracing data from Cilium pods",
Quick: false,
Task: func(ctx context.Context) error {
if !c.Options.Tracing {
return nil
}
if err := c.SubmitTracingGopsSubtask(c.CiliumPods, ciliumAgentContainerName); err != nil {
return fmt.Errorf("failed to collect tracing data from Cilium pods: %w", err)
}
return nil
},
})
}

tetragonTasks := []Task{
@@ -1404,6 +1421,54 @@ func (c *Collector) Run() error {
tasks = append(tasks, c.getGatewayAPITasks()...)
}

// First, run each serial task in its own workerpool.
var r []workerpool.Task
for i, t := range serialTasks {
t := t
if c.shouldSkipTask(t) {
c.logDebug("Skipping %q", t.Description)
continue
}

var wg sync.WaitGroup
if t.CreatesSubtasks {
wg.Add(1)
}

// Adjust the worker count to make enough headroom for tasks that submit sub-tasks.
// This is necessary because 'Submit' is blocking.
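// (With a single worker, a task that itself called 'Submit' would wait
// forever for a free worker and deadlock the pool; hence the extra
// worker for tasks that create subtasks.)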
wc := 1
if t.CreatesSubtasks {
wc++
}
c.Pool = workerpool.New(wc)

// Add the serial task to the worker pool.
if err := c.Pool.Submit(fmt.Sprintf("[%d] %s", len(tasks)+i, t.Description), func(ctx context.Context) error {
if t.CreatesSubtasks {
defer wg.Done()
}
c.logTask(t.Description)
defer c.logDebug("Finished %q", t.Description)
return t.Task(ctx)
}); err != nil {
return fmt.Errorf("failed to submit task to the worker pool: %w", err)
}

// Wait for the subtasks to be submitted and then call 'Drain' to wait for them to finish.
wg.Wait()
results, err := c.Pool.Drain()
if err != nil {
return fmt.Errorf("failed to drain the serial tasks worker pool: %w", err)
}
// Close the worker Pool.
if err := c.Pool.Close(); err != nil {
return fmt.Errorf("failed to close the serial tasks worker pool: %w", err)
}

r = append(r, results...)
}

// Adjust the worker count to make enough headroom for tasks that submit sub-tasks.
// This is necessary because 'Submit' is blocking.
wc := 1
@@ -1443,62 +1508,16 @@ func (c *Collector) Run() error {

// Wait for all the subtasks to be submitted and then call 'Drain' to wait for everything to finish.
c.subtasksWg.Wait()
r, err := c.Pool.Drain()
results, err := c.Pool.Drain()
if err != nil {
return fmt.Errorf("failed to drain the worker pool: %w", err)
}
r = append(r, results...)
// Close the worker Pool.
if err := c.Pool.Close(); err != nil {
return fmt.Errorf("failed to close the worker pool: %w", err)
}

// Run each serial task in its own workerpool.
for i, t := range serialTasks {
t := t
if c.shouldSkipTask(t) {
c.logDebug("Skipping %q", t.Description)
continue
}

var wg sync.WaitGroup
if t.CreatesSubtasks {
wg.Add(1)
}

// Adjust the worker count to make enough headroom for tasks that submit sub-tasks.
// This is necessary because 'Submit' is blocking.
wc := 1
if t.CreatesSubtasks {
wc++
}
c.Pool = workerpool.New(wc)

// Add the serial task to the worker pool.
if err := c.Pool.Submit(fmt.Sprintf("[%d] %s", len(tasks)+i, t.Description), func(ctx context.Context) error {
if t.CreatesSubtasks {
defer wg.Done()
}
c.logTask(t.Description)
defer c.logDebug("Finished %q", t.Description)
return t.Task(ctx)
}); err != nil {
return fmt.Errorf("failed to submit task to the worker pool: %w", err)
}

// Wait for the subtasks to be submitted and then call 'Drain' to wait for them to finish.
wg.Wait()
results, err := c.Pool.Drain()
if err != nil {
return fmt.Errorf("failed to drain the serial tasks worker pool: %w", err)
}
// Close the worker Pool.
if err := c.Pool.Close(); err != nil {
return fmt.Errorf("failed to close the serial tasks worker pool: %w", err)
}

r = append(r, results...)
}

loggedStart := false
// Check if any errors occurred and warn the user.
for _, res := range r {
@@ -2116,6 +2135,44 @@ func (c *Collector) SubmitProfilingGopsSubtasks(pods []*corev1.Pod, containerNam
return nil
}

// SubmitTracingGopsSubtask submits a task to collect tracing data from pods.
func (c *Collector) SubmitTracingGopsSubtask(pods []*corev1.Pod, containerName string) error {
for _, p := range pods {
p := p
if err := c.Pool.Submit(fmt.Sprintf("gops-%s-%s", p.Name, gopsTrace), func(ctx context.Context) error {
agentPID, err := c.getGopsPID(ctx, p, containerName)
if err != nil {
return err
}
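// Run 'gops trace <pid>' in the target container. Upstream gops records
// a short execution trace (5 seconds at the time of writing) and prints
// the path of the resulting dump file.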
o, err := c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{
gopsCommand,
gopsTrace,
agentPID,
})
if err != nil {
return fmt.Errorf("failed to collect gops trace for %q (%q) in namespace %q: %w", p.Name, containerName, p.Namespace, err)
}
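// The dump file path is parsed out of the gops output, reusing the same
// helper as the pprof collection.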
filePath, err := extractGopsProfileData(o.String())
if err != nil {
return fmt.Errorf("failed to collect gops trace for %q (%q) in namespace %q: %w", p.Name, containerName, p.Namespace, err)
}
f := c.AbsoluteTempPath(fmt.Sprintf("%s-%s-<ts>.trace", p.Name, gopsTrace))
err = c.Client.CopyFromPod(ctx, p.Namespace, p.Name, containerName, filePath, f, c.Options.CopyRetryLimit)
if err != nil {
return fmt.Errorf("failed to collect gops trace output for %q: %w", p.Name, err)
}
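// Cleanup is best-effort: failing to remove the dump from the pod only
// logs a warning and does not fail the task.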
if _, err = c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{rmCommand, filePath}); err != nil {
c.logWarn("failed to delete trace output from pod %q in namespace %q: %v", p.Name, p.Namespace, err)
return nil
}
return nil
}); err != nil {
return fmt.Errorf("failed to submit %s gops task for %q: %w", gopsTrace, p.Name, err)
}
}
return nil
}

// SubmitLogsTasks submits tasks to collect kubernetes logs from pods.
func (c *Collector) SubmitLogsTasks(pods []*corev1.Pod, since time.Duration, limitBytes int64) error {
t := time.Now().Add(-since)
