Skip to content

Commit

Permalink
tetragon: Load initial sensor via sensor manager
Browse files Browse the repository at this point in the history
Loading initial sensor via sensor manager so it stores all the
loaded sensors/programs. So far we kept it separated but having
it in one place is handy for metrics requesting this data.

Moving the load before the server is started so we don't need to
"wait" for that. This way we can remove the sensorMgWait channel.

Signed-off-by: Jiri Olsa <[email protected]>
  • Loading branch information
olsajiri committed Oct 31, 2024
1 parent 4ee1a05 commit f520d68
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 57 deletions.
56 changes: 23 additions & 33 deletions cmd/tetragon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,18 @@ func deleteOldBpfDir(path string) {
log.Infof("Removed bpf instance: %s", path)
}

func loadInitialSensor(ctx context.Context) error {
base.ConfigCgroupRate(&option.Config.CgroupRate)

mgr := observer.GetSensorManager()
initialSensor := base.GetInitialSensor()

if err := mgr.AddSensor(ctx, initialSensor.Name, initialSensor); err != nil {
return err
}
return mgr.EnableSensor(ctx, initialSensor.Name)
}

func tetragonExecute() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -359,20 +371,7 @@ func tetragonExecute() error {
cancel()
}()

// start sensor manager, and have it wait on sensorMgWait until we load
// the base sensor. note that this means that calling methods on the
// manager will block so they will have to either be executed in a
// goroutine or after we close the sensorMgWait channel to avoid
// deadlock.
sensorMgWait := make(chan struct{})
defer func() {
// if we fail before closing the channel, close it so that
// the sensor manager routine is unblocked.
if sensorMgWait != nil {
close(sensorMgWait)
}
}()
if err := obs.InitSensorManager(sensorMgWait); err != nil {
if err := obs.InitSensorManager(); err != nil {
return err
}

Expand Down Expand Up @@ -447,6 +446,16 @@ func tetragonExecute() error {
return err
}

// Load initial sensor before we start the server,
// so it's there before we allow to load policies.
if err = loadInitialSensor(ctx); err != nil {
return err
}
observer.GetSensorManager().LogSensorsAndProbes(ctx)
defer func() {
observer.RemoveSensors(ctx)
}()

pm, err := tetragonGrpc.NewProcessManager(
ctx,
&cleanupWg,
Expand Down Expand Up @@ -477,28 +486,9 @@ func tetragonExecute() error {

obs.LogPinnedBpf(observerDir)

base.ConfigCgroupRate(&option.Config.CgroupRate)

// load base sensor
initialSensor := base.GetInitialSensor()
if err := initialSensor.Load(observerDir); err != nil {
return err
}
defer func() {
initialSensor.Unload()
}()

cgrouprate.NewCgroupRate(ctx, pm, base.CgroupRateMap, &option.Config.CgroupRate)
cgrouprate.Config(base.CgroupRateOptionsMap)

// now that the base sensor was loaded, we can start the sensor manager
close(sensorMgWait)
sensorMgWait = nil
observer.GetSensorManager().LogSensorsAndProbes(ctx)
defer func() {
observer.RemoveSensors(ctx)
}()

err = loadTpFromDir(ctx, option.Config.TracingPolicyDir)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/bench/bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func runTetragon(ctx context.Context, configFile string, args *Arguments, summar
option.Config.BpfDir = bpf.MapPrefixPath()
obs := observer.NewObserver()

if err := obs.InitSensorManager(nil); err != nil {
if err := obs.InitSensorManager(); err != nil {
logger.GetLogger().Fatalf("InitSensorManager failed: %v", err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ func (k *Observer) Start(ctx context.Context) error {
}

// InitSensorManager starts the sensor controller
func (k *Observer) InitSensorManager(waitChan chan struct{}) error {
mgr, err := sensors.StartSensorManager(option.Config.BpfDir, waitChan)
func (k *Observer) InitSensorManager() error {
mgr, err := sensors.StartSensorManager(option.Config.BpfDir)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/observer/observertesthelper/observer_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func loadExporter(tb testing.TB, ctx context.Context, obs *observer.Observer, op
processCacheSize := 32768
dataCacheSize := 1024

if err := obs.InitSensorManager(nil); err != nil {
if err := obs.InitSensorManager(); err != nil {
return err
}

Expand Down
8 changes: 1 addition & 7 deletions pkg/sensors/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,18 @@ type SensorStatus struct {
// The purpose of this goroutine is to serialize loading and unloading of
// sensors as requested from different goroutines (e.g., different GRPC
// clients).
//
// if waitChan is not nil, the serving of sensor requests will block until
// something is received. The intention of this is to allow the main function
// to first load the base sensor before the sensor manager starts loading other sensors.
func StartSensorManager(
bpfDir string,
waitChan chan struct{},
) (*Manager, error) {
pfState, err := policyfilter.GetState()
if err != nil {
return nil, fmt.Errorf("failed to initialize policy filter state: %w", err)
}
return StartSensorManagerWithPF(bpfDir, waitChan, pfState)
return StartSensorManagerWithPF(bpfDir, pfState)
}

func StartSensorManagerWithPF(
bpfDir string,
waitChan chan struct{},
pfState policyfilter.State,
) (*Manager, error) {
colMap := newCollectionMap()
Expand Down
20 changes: 10 additions & 10 deletions pkg/sensors/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestAddPolicy(t *testing.T) {
})

policy := v1alpha1.TracingPolicy{}
mgr, err := StartSensorManager("", nil)
mgr, err := StartSensorManager("")
assert.NoError(t, err)
t.Cleanup(func() {
if err := mgr.StopSensorManager(ctx); err != nil {
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestAddPolicies(t *testing.T) {
})

policy := v1alpha1.TracingPolicy{}
mgr, err := StartSensorManager("", nil)
mgr, err := StartSensorManager("")
assert.NoError(t, err)
t.Cleanup(func() {
if err := mgr.StopSensorManager(ctx); err != nil {
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestAddPolicySpecError(t *testing.T) {
})

policy := v1alpha1.TracingPolicy{}
mgr, err := StartSensorManager("", nil)
mgr, err := StartSensorManager("")
assert.NoError(t, err)
t.Cleanup(func() {
if err := mgr.StopSensorManager(ctx); err != nil {
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestAddPolicyLoadError(t *testing.T) {
})

policy := v1alpha1.TracingPolicy{}
mgr, err := StartSensorManager("", nil)
mgr, err := StartSensorManager("")
assert.NoError(t, err)
t.Cleanup(func() {
if err := mgr.StopSensorManager(ctx); err != nil {
Expand All @@ -157,7 +157,7 @@ func TestPolicyFilterDisabled(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

mgr, err := StartSensorManagerWithPF("", nil, policyfilter.DisabledState())
mgr, err := StartSensorManagerWithPF("", policyfilter.DisabledState())
assert.NoError(t, err)
defer mgr.StopSensorManager(ctx)

Expand Down Expand Up @@ -209,7 +209,7 @@ func TestPolicyStates(t *testing.T) {
})

policy := v1alpha1.TracingPolicy{}
mgr, err := StartSensorManager("", nil)
mgr, err := StartSensorManager("")
require.NoError(t, err)
t.Cleanup(func() {
if err := mgr.StopSensorManager(ctx); err != nil {
Expand All @@ -234,7 +234,7 @@ func TestPolicyStates(t *testing.T) {
})

policy := v1alpha1.TracingPolicy{}
mgr, err := StartSensorManager("", nil)
mgr, err := StartSensorManager("")
require.NoError(t, err)
t.Cleanup(func() {
if err := mgr.StopSensorManager(ctx); err != nil {
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestPolicyLoadErrorOverride(t *testing.T) {
})

policy := v1alpha1.TracingPolicy{}
mgr, err := StartSensorManager("", nil)
mgr, err := StartSensorManager("")
require.NoError(t, err)
t.Cleanup(func() {
if err := mgr.StopSensorManager(ctx); err != nil {
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestPolicyListingWhileLoadUnload(t *testing.T) {
polName := "test-policy"
testSensor := makeTestDelayedSensor(t)

mgr, err := StartSensorManager("", nil)
mgr, err := StartSensorManager("")
require.NoError(t, err)
t.Cleanup(func() {
if err := mgr.StopSensorManager(ctx); err != nil {
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestPolicyKernelMemoryBytes(t *testing.T) {
})

policy := v1alpha1.TracingPolicy{}
mgr, err := StartSensorManager("", nil)
mgr, err := StartSensorManager("")
require.NoError(t, err)
t.Cleanup(func() {
if err := mgr.StopSensorManager(ctx); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sensors/tracing/generickprobe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func Test_DisableEnablePolicy_Kprobe(t *testing.T) {

tus.LoadSensor(t, base.GetInitialSensor())
path := bpf.MapPrefixPath()
mgr, err := sensors.StartSensorManager(path, nil)
mgr, err := sensors.StartSensorManager(path)
assert.NoError(t, err)
t.Cleanup(func() {
if err := mgr.StopSensorManager(ctx); err != nil {
Expand Down Expand Up @@ -190,7 +190,7 @@ func Test_DisableEnablePolicy_KernelMemoryBytes(t *testing.T) {

tus.LoadSensor(t, base.GetInitialSensor())
path := bpf.MapPrefixPath()
mgr, err := sensors.StartSensorManager(path, nil)
mgr, err := sensors.StartSensorManager(path)
require.NoError(t, err)
t.Cleanup(func() {
if err := mgr.StopSensorManager(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/testutils/sensors/sensors.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func getTestSensorManager(ctx context.Context, t *testing.T, pfState policyfilte
}

path := bpf.MapPrefixPath()
mgr, err = sensors.StartSensorManagerWithPF(path, nil, pfState)
mgr, err = sensors.StartSensorManagerWithPF(path, pfState)
if err != nil {
t.Fatalf("StartSensorManagerWithPF failed: %s", err)
}
Expand Down

0 comments on commit f520d68

Please sign in to comment.