Skip to content

Commit

Permalink
sysdump: Add support for serial tasks
Browse files Browse the repository at this point in the history
Add support for "serial" tasks in the sysdump subcommand.  Serial tasks
are the ones that cannot be executed concurrently with other tasks.

One example of such task is the collection of the data from the Go
execution tracer, that should not be done while the agent is potentially
busy fulfilling requests from other tasks (e.g: collecting and sending
profiling data).

Signed-off-by: Fabio Falzoi <[email protected]>
  • Loading branch information
pippolo84 committed Oct 18, 2023
1 parent d509f1a commit 34b3fb3
Showing 1 changed file with 50 additions and 1 deletion.
51 changes: 50 additions & 1 deletion sysdump/sysdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"sync"
"time"

"github.com/cilium/cilium/pkg/versioncheck"
"github.com/cilium/workerpool"
"github.com/mholt/archiver/v3"
corev1 "k8s.io/api/core/v1"
Expand All @@ -26,6 +25,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/cli-runtime/pkg/genericclioptions"

"github.com/cilium/cilium/pkg/versioncheck"

"github.com/cilium/cilium-cli/defaults"
"github.com/cilium/cilium-cli/internal/utils"
"github.com/cilium/cilium-cli/k8s"
Expand Down Expand Up @@ -489,6 +490,9 @@ func (c *Collector) Run() error {
},
}

// task that needs to be executed "serially" (i.e: not concurrently with other tasks).
var serialTasks []Task

ciliumTasks := []Task{
{
Description: "Collecting Cilium network policies",
Expand Down Expand Up @@ -1448,6 +1452,51 @@ func (c *Collector) Run() error {
return fmt.Errorf("failed to close the worker pool: %w", err)
}

// Run each serial task in its own workerpool.
for i, t := range serialTasks {
t := t
if c.shouldSkipTask(t) {
c.logDebug("Skipping %q", t.Description)
continue
}
if t.CreatesSubtasks {
c.subtasksWg.Add(1)
}

// Adjust the worker count to make enough headroom for tasks that submit sub-tasks.
// This is necessary because 'Submit' is blocking.
wc := 1
if t.CreatesSubtasks {
wc++
}
c.Pool = workerpool.New(wc)

// Add the serial task to the worker pool.
if err := c.Pool.Submit(fmt.Sprintf("[%d] %s", len(tasks)+i, t.Description), func(ctx context.Context) error {
if t.CreatesSubtasks {
defer c.subtasksWg.Done()
}
c.logTask(t.Description)
defer c.logDebug("Finished %q", t.Description)
return t.Task(ctx)
}); err != nil {
return fmt.Errorf("failed to submit task to the worker pool: %w", err)
}

// Wait for the subtasks to be submitted and then call 'Drain' to wait for them to finish.
c.subtasksWg.Wait()
results, err := c.Pool.Drain()
if err != nil {
return fmt.Errorf("failed to drain the serial tasks worker pool: %w", err)
}
// Close the worker Pool.
if err := c.Pool.Close(); err != nil {
return fmt.Errorf("failed to close the serial tasks worker pool: %w", err)
}

r = append(r, results...)
}

loggedStart := false
// Check if any errors occurred and warn the user.
for _, res := range r {
Expand Down

0 comments on commit 34b3fb3

Please sign in to comment.