sysdump: Add "serial" tasks support to enable trace data collection #2052

Merged · 2 commits · Nov 8, 2023
3 changes: 3 additions & 0 deletions internal/cli/cmd/sysdump.go
@@ -98,6 +98,9 @@ func initSysdumpFlags(cmd *cobra.Command, options *sysdump.Options, optionPrefix
cmd.Flags().BoolVar(&options.Profiling,
optionPrefix+"profiling", sysdump.DefaultProfiling,
"Whether to enable scraping profiling data")
cmd.Flags().BoolVar(&options.Tracing,
optionPrefix+"tracing", sysdump.DefaultTracing,
"Whether to enable scraping tracing data")
cmd.Flags().StringArrayVar(&options.ExtraLabelSelectors,
optionPrefix+"extra-label-selectors", nil,
"Optional set of labels selectors used to target additional pods for log collection.")
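For reference, a minimal, self-contained sketch of the cobra BoolVar pattern used by the new flag above (the standalone command wiring here is illustrative; only the flag name, default, and help text come from the diff):

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	var tracing bool
	cmd := &cobra.Command{
		Use: "sysdump",
		RunE: func(cmd *cobra.Command, args []string) error {
			fmt.Println("tracing enabled:", tracing)
			return nil
		},
	}
	// Same shape as the diff: a boolean flag bound to an options field,
	// with a package-level default and a help string.
	cmd.Flags().BoolVar(&tracing, "tracing", false, "Whether to enable scraping tracing data")
	if err := cmd.Execute(); err != nil {
		fmt.Println(err)
	}
}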
1 change: 1 addition & 0 deletions sysdump/constants.go
@@ -150,6 +150,7 @@ var (
"pprof-heap",
"pprof-cpu",
}
gopsTrace = "trace"

// Gateway API resource group versions used for sysdumping these
gatewayClass = schema.GroupVersionResource{
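The gopsTrace constant added above names the gops subcommand that the collector runs inside each pod. As a rough local equivalent, assuming a gops-instrumented process is running (the PID below is a placeholder):

package main

import (
	"fmt"
	"os/exec"
)

func main() {
	// Run the gops CLI's trace subcommand against a local process,
	// mirroring what the sysdump collector executes inside a pod.
	out, err := exec.Command("gops", "trace", "1234").CombinedOutput()
	if err != nil {
		fmt.Printf("gops trace failed: %v\n%s", err, out)
		return
	}
	// gops reports where it saved the runtime trace file.
	fmt.Print(string(out))
}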
1 change: 1 addition & 0 deletions sysdump/defaults.go
@@ -24,6 +24,7 @@ const (
DefaultCiliumSpireServerLabelSelector = "app=spire-server"
DefaultDebug = false
DefaultProfiling = true
DefaultTracing = false
DefaultHubbleLabelSelector = labelPrefix + "hubble"
DefaultHubbleFlowsCount = 10000
DefaultHubbleFlowsTimeout = 5 * time.Second
109 changes: 108 additions & 1 deletion sysdump/sysdump.go
@@ -66,6 +66,8 @@ type Options struct {
Debug bool
// Whether to enable scraping profiling data.
Profiling bool
// Whether to enable scraping tracing data.
Tracing bool
// The labels used to target additional pods
ExtraLabelSelectors []string
// The labels used to target Hubble pods.
@@ -489,6 +491,9 @@ func (c *Collector) Run() error {
},
}

// Tasks that need to be executed "serially" (i.e., not concurrently with other tasks).
var serialTasks []Task

ciliumTasks := []Task{
{
Description: "Collecting Cilium network policies",
@@ -1260,6 +1265,21 @@ func (c *Collector) Run() error {

if c.Options.CiliumNamespace != "" && c.Options.CiliumOperatorNamespace != "" {
tasks = append(tasks, ciliumTasks...)

serialTasks = append(serialTasks, Task{
CreatesSubtasks: true,
Description: "Collecting tracing data from Cilium pods",
Quick: false,
Task: func(ctx context.Context) error {
if !c.Options.Tracing {
return nil
}
if err := c.SubmitTracingGopsSubtask(c.CiliumPods, ciliumAgentContainerName); err != nil {
return fmt.Errorf("failed to collect tracing data from Cilium pods: %w", err)
}
return nil
},
})
}
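The fields used to build the serial task above (CreatesSubtasks, Description, Quick, Task) are all visible in the diff. Here is a condensed, standalone sketch of the pattern of registering a task unconditionally and gating it on an option at run time — the Task type below is a stand-in for the real sysdump type, and tracingEnabled stands in for c.Options.Tracing:

package main

import (
	"context"
	"fmt"
)

// Task is a stand-in mirroring the fields the diff relies on.
type Task struct {
	CreatesSubtasks bool
	Description     string
	Quick           bool
	Task            func(ctx context.Context) error
}

func main() {
	tracingEnabled := false // stands in for c.Options.Tracing

	var serialTasks []Task
	serialTasks = append(serialTasks, Task{
		CreatesSubtasks: true,
		Description:     "Collecting tracing data from Cilium pods",
		Task: func(ctx context.Context) error {
			// The task is always registered; the option is checked when the
			// task runs, so a disabled task is a cheap no-op.
			if !tracingEnabled {
				return nil
			}
			return nil // the real task submits one subtask per pod here
		},
	})

	for _, t := range serialTasks {
		fmt.Println(t.Description, "->", t.Task(context.Background()))
	}
}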

tetragonTasks := []Task{
@@ -1375,6 +1395,54 @@ func (c *Collector) Run() error {
tasks = append(tasks, c.getEnvoyConfigTasks()...)
}

// First, run each serial task in its own workerpool.
var r []workerpool.Task
for i, t := range serialTasks {
t := t
if c.shouldSkipTask(t) {
c.logDebug("Skipping %q", t.Description)
continue
}

var wg sync.WaitGroup
if t.CreatesSubtasks {
wg.Add(1)
}

// Adjust the worker count to make enough headroom for tasks that submit sub-tasks.
// This is necessary because 'Submit' is blocking.
wc := 1
if t.CreatesSubtasks {
wc++
}
c.Pool = workerpool.New(wc)

// Add the serial task to the worker pool.
if err := c.Pool.Submit(fmt.Sprintf("[%d] %s", len(tasks)+i, t.Description), func(ctx context.Context) error {
if t.CreatesSubtasks {
defer wg.Done()
}
c.logTask(t.Description)
defer c.logDebug("Finished %q", t.Description)
return t.Task(ctx)
}); err != nil {
return fmt.Errorf("failed to submit task to the worker pool: %w", err)
}

// Wait for the subtasks to be submitted and then call 'Drain' to wait for them to finish.
wg.Wait()
results, err := c.Pool.Drain()
if err != nil {
return fmt.Errorf("failed to drain the serial tasks worker pool: %w", err)
}
// Close the worker Pool.
if err := c.Pool.Close(); err != nil {
return fmt.Errorf("failed to close the serial tasks worker pool: %w", err)
}

r = append(r, results...)
}
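To make the loop above concrete: a minimal standalone sketch of the serial-execution pattern using github.com/cilium/workerpool, where each task gets its own short-lived pool that is drained and closed before the next task starts (the task bodies are illustrative):

package main

import (
	"context"
	"fmt"

	"github.com/cilium/workerpool"
)

func main() {
	serial := []func(ctx context.Context) error{
		func(ctx context.Context) error { fmt.Println("task 1"); return nil },
		func(ctx context.Context) error { fmt.Println("task 2"); return nil },
	}

	for i, task := range serial {
		// One pool per task: with a single worker, nothing runs
		// concurrently with the task.
		pool := workerpool.New(1)
		if err := pool.Submit(fmt.Sprintf("serial-%d", i), task); err != nil {
			fmt.Println("submit failed:", err)
			return
		}
		// Drain blocks until every submitted task has finished.
		results, err := pool.Drain()
		if err != nil {
			fmt.Println("drain failed:", err)
			return
		}
		for _, r := range results {
			fmt.Println(r.String(), "->", r.Err())
		}
		if err := pool.Close(); err != nil {
			fmt.Println("close failed:", err)
			return
		}
	}
}

Note that the diff sizes the pool at two workers when a task creates subtasks: Submit blocks while all workers are busy, so the extra worker leaves headroom for the parent task to enqueue its children without deadlocking.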

// Adjust the worker count to make enough headroom for tasks that submit sub-tasks.
// This is necessary because 'Submit' is blocking.
wc := 1
@@ -1414,10 +1482,11 @@

// Wait for all the subtasks to be submitted and then call 'Drain' to wait for everything to finish.
c.subtasksWg.Wait()
r, err := c.Pool.Drain()
results, err := c.Pool.Drain()
if err != nil {
return fmt.Errorf("failed to drain the worker pool: %w", err)
}
r = append(r, results...)
// Close the worker Pool.
if err := c.Pool.Close(); err != nil {
return fmt.Errorf("failed to close the worker pool: %w", err)
@@ -2073,6 +2142,44 @@ func (c *Collector) SubmitProfilingGopsSubtasks(pods []*corev1.Pod, containerNam
return nil
}

// SubmitTracingGopsSubtask submits a task to collect tracing data from pods.
func (c *Collector) SubmitTracingGopsSubtask(pods []*corev1.Pod, containerName string) error {
for _, p := range pods {
p := p
if err := c.Pool.Submit(fmt.Sprintf("gops-%s-%s", p.Name, gopsTrace), func(ctx context.Context) error {
agentPID, err := c.getGopsPID(ctx, p, containerName)
if err != nil {
return err
}
o, err := c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{
gopsCommand,
gopsTrace,
agentPID,
})
if err != nil {
return fmt.Errorf("failed to collect gops trace for %q (%q) in namespace %q: %w", p.Name, containerName, p.Namespace, err)
}
filePath, err := extractGopsProfileData(o.String())
if err != nil {
return fmt.Errorf("failed to collect gops trace for %q (%q) in namespace %q: %w", p.Name, containerName, p.Namespace, err)
}
f := c.AbsoluteTempPath(fmt.Sprintf("%s-%s-<ts>.trace", p.Name, gopsTrace))
err = c.Client.CopyFromPod(ctx, p.Namespace, p.Name, containerName, filePath, f, c.Options.CopyRetryLimit)
if err != nil {
return fmt.Errorf("failed to collect gops trace output for %q: %w", p.Name, err)
}
if _, err = c.Client.ExecInPod(ctx, p.Namespace, p.Name, containerName, []string{rmCommand, filePath}); err != nil {
c.logWarn("failed to delete trace output from pod %q in namespace %q: %v", p.Name, p.Namespace, err)
return nil
}
return nil
}); err != nil {
return fmt.Errorf("failed to submit %s gops task for %q: %w", gopsTrace, p.Name, err)
}
}
return nil
}

// SubmitLogsTasks submits tasks to collect kubernetes logs from pods.
func (c *Collector) SubmitLogsTasks(pods []*corev1.Pod, since time.Duration, limitBytes int64) error {
t := time.Now().Add(-since)
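As an aside on extractGopsProfileData, used above to locate the dump file: a sketch of the kind of parsing such a helper needs to do. The output format assumed here ("... saved to: <path>") is an assumption about what gops prints, not taken from the PR; the real helper may differ.

package main

import (
	"fmt"
	"strings"
)

// extractTracePath pulls a file path out of gops output, assuming a line of
// the form "Trace dump saved to: /tmp/trace.out". This format is an
// assumption for illustration; the real extractGopsProfileData may differ.
func extractTracePath(output string) (string, error) {
	const marker = "saved to: "
	for _, line := range strings.Split(output, "\n") {
		if idx := strings.Index(line, marker); idx != -1 {
			return strings.TrimSpace(line[idx+len(marker):]), nil
		}
	}
	return "", fmt.Errorf("no dump path found in gops output: %q", output)
}

func main() {
	out := "Tracing now, will take 5 secs...\nTrace dump saved to: /tmp/trace.out"
	path, err := extractTracePath(out)
	fmt.Println(path, err)
}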