From 603c1ac1b3ada73a23b421074f44da614ac0448c Mon Sep 17 00:00:00 2001 From: Jiri Olsa Date: Sun, 8 Oct 2023 18:16:43 +0000 Subject: [PATCH 1/2] tetragon: Add CacheId object to ProcessInternal Signed-off-by: Jiri Olsa --- pkg/process/cache.go | 6 ++++++ pkg/process/process.go | 20 ++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/pkg/process/cache.go b/pkg/process/cache.go index 190a052a3ff..5cee870c4b1 100644 --- a/pkg/process/cache.go +++ b/pkg/process/cache.go @@ -15,6 +15,12 @@ import ( lru "github.com/hashicorp/golang-lru/v2" ) +type CacheId struct { + pid uint32 + ktime uint64 + nodeName string +} + type Cache struct { cache *lru.Cache[string, *ProcessInternal] size int diff --git a/pkg/process/process.go b/pkg/process/process.go index ed0c4da9639..a85ddbf2284 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -52,6 +52,10 @@ type ProcessInternal struct { // garbage collector metadata color int // Writes should happen only inside gc select channel refcnt uint32 + + // cache id values for process/parent + cacheId CacheId + parentCacheId CacheId } var ( @@ -65,6 +69,10 @@ var ( ErrProcessInfoMissing = errors.New("failed process info missing") ) +func GetProcessCacheId(pid uint32, ktime uint64) CacheId { + return CacheId{pid, ktime, nodeName} +} + func InitCache(w watcher.K8sResourceWatcher, size int) error { var err error @@ -90,6 +98,14 @@ func FreeCache() { procCache = nil } +func (pi *ProcessInternal) GetProcessCacheId() CacheId { + return pi.cacheId +} + +func (pi *ProcessInternal) GetParentCacheId() CacheId { + return pi.parentCacheId +} + // GetProcessCopy() duplicates tetragon.Process and returns it func (pi *ProcessInternal) GetProcessCopy() *tetragon.Process { if pi.process == nil { @@ -114,6 +130,7 @@ func (pi *ProcessInternal) cloneInternalProcessCopy() *ProcessInternal { apiBinaryProp: pi.apiBinaryProp, namespaces: pi.namespaces, refcnt: 1, // Explicitly initialize refcnt to 1 + parentCacheId: pi.cacheId, } } @@ -339,6 +356,8 @@ func initProcessInternalExec( apiBinaryProp: apiBinaryProp, namespaces: apiNs, refcnt: 1, + cacheId: GetProcessCacheId(process.PID, process.Ktime), + parentCacheId: GetProcessCacheId(parent.Pid, parent.Ktime), } } @@ -357,6 +376,7 @@ func initProcessInternalClone(event *tetragonAPI.MsgCloneEvent, return nil, err } + pi.cacheId = GetProcessCacheId(event.PID, event.Ktime) pi.process.ParentExecId = parentExecId pi.process.ExecId = GetProcessID(event.PID, event.Ktime) pi.process.Pid = &wrapperspb.UInt32Value{Value: event.PID} From 9f5bc316c3aaca0c5c0cd6676f45231a141303bd Mon Sep 17 00:00:00 2001 From: Jiri Olsa Date: Sun, 8 Oct 2023 19:34:31 +0000 Subject: [PATCH 2/2] tetragon: Use CacheId to store ProcessInternal Signed-off-by: Jiri Olsa --- pkg/grpc/exec/exec.go | 5 ++--- pkg/grpc/exec/exec_test_helper.go | 4 ++-- pkg/process/cache.go | 19 +++++++++---------- pkg/process/cache_test.go | 9 +++++---- pkg/process/process.go | 19 ++++++++++--------- 5 files changed, 28 insertions(+), 28 deletions(-) diff --git a/pkg/grpc/exec/exec.go b/pkg/grpc/exec/exec.go index ba2f1473874..8b8e1a16e19 100644 --- a/pkg/grpc/exec/exec.go +++ b/pkg/grpc/exec/exec.go @@ -52,7 +52,7 @@ func GetProcessExec(event *MsgExecveEventUnix, useCache bool) *tetragon.ProcessE parentId := tetragonProcess.ParentExecId processId := tetragonProcess.ExecId - parent, err := process.Get(parentId) + parent, err := process.Get(proc.GetParentCacheId()) if err == nil { tetragonParent = parent.UnsafeGetProcess() } @@ -204,8 +204,7 @@ func (msg *MsgExecveEventUnix) Retry(internal *process.ProcessInternal, ev notif // Check we have a parent with exception for pid 1, note we do this last because we want // to ensure the podInfo and process are set before returning any errors. if proc.Pid.Value > 1 && parent == nil { - parentId := proc.ParentExecId - parent, err := process.Get(parentId) + parent, err := process.Get(internal.GetParentCacheId()) if parent == nil { return err } diff --git a/pkg/grpc/exec/exec_test_helper.go b/pkg/grpc/exec/exec_test_helper.go index 7c6abfeeb9f..050b3d03f1e 100644 --- a/pkg/grpc/exec/exec_test_helper.go +++ b/pkg/grpc/exec/exec_test_helper.go @@ -289,8 +289,8 @@ func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, cancelWg *s } func GetProcessRefcntFromCache(t *testing.T, Pid uint32, Ktime uint64) uint32 { - procID := process.GetProcessID(Pid, Ktime) - proc, err := process.Get(procID) + cacheId := process.GetProcessCacheId(Pid, Ktime) + proc, err := process.Get(cacheId) if err == nil { return proc.RefGet() } diff --git a/pkg/process/cache.go b/pkg/process/cache.go index 5cee870c4b1..1746c3f2cc3 100644 --- a/pkg/process/cache.go +++ b/pkg/process/cache.go @@ -8,7 +8,6 @@ import ( "sync/atomic" "time" - "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/logger" "github.com/cilium/tetragon/pkg/metrics/errormetrics" "github.com/cilium/tetragon/pkg/metrics/mapmetrics" @@ -22,7 +21,7 @@ type CacheId struct { } type Cache struct { - cache *lru.Cache[string, *ProcessInternal] + cache *lru.Cache[CacheId, *ProcessInternal] size int deleteChan chan *ProcessInternal stopChan chan bool @@ -81,7 +80,7 @@ func (pc *Cache) cacheGarbageCollector() { } if p.color == deleteReady { p.color = deleted - pc.remove(p.process) + pc.remove(p.cacheId) } else { newQueue = append(newQueue, p) p.color = deleteReady @@ -137,7 +136,7 @@ func NewCache( ) (*Cache, error) { lruCache, err := lru.NewWithEvict( processCacheSize, - func(_ string, _ *ProcessInternal) { + func(_ CacheId, _ *ProcessInternal) { mapmetrics.MapDropInc("processLru") }, ) @@ -152,28 +151,28 @@ func NewCache( return pm, nil } -func (pc *Cache) get(processID string) (*ProcessInternal, error) { +func (pc *Cache) get(processID CacheId) (*ProcessInternal, error) { process, ok := pc.cache.Get(processID) if !ok { logger.GetLogger().WithField("id in event", processID).Debug("process not found in cache") errormetrics.ErrorTotalInc(errormetrics.ProcessCacheMissOnGet) - return nil, fmt.Errorf("invalid entry for process ID: %s", processID) + return nil, fmt.Errorf("invalid entry for process ID: %v", processID) } return process, nil } // Add a ProcessInternal structure to the cache. Must be called only from // clone or execve events -func (pc *Cache) add(process *ProcessInternal) bool { - evicted := pc.cache.Add(process.process.ExecId, process) +func (pc *Cache) add(cacheId CacheId, process *ProcessInternal) bool { + evicted := pc.cache.Add(cacheId, process) if evicted { errormetrics.ErrorTotalInc(errormetrics.ProcessCacheEvicted) } return evicted } -func (pc *Cache) remove(process *tetragon.Process) bool { - present := pc.cache.Remove(process.ExecId) +func (pc *Cache) remove(cacheId CacheId) bool { + present := pc.cache.Remove(cacheId) if !present { errormetrics.ErrorTotalInc(errormetrics.ProcessCacheMissOnRemove) } diff --git a/pkg/process/cache_test.go b/pkg/process/cache_test.go index c4630daaad2..300c1faabd1 100644 --- a/pkg/process/cache_test.go +++ b/pkg/process/cache_test.go @@ -30,17 +30,18 @@ func TestProcessCache(t *testing.T) { }, }, } - cache.add(&proc) + cacheId := GetProcessCacheId(1234, 4321) + cache.add(cacheId, &proc) assert.Equal(t, cache.len(), 1) - result, err := cache.get(proc.process.ExecId) + result, err := cache.get(cacheId) assert.NoError(t, err) assert.Equal(t, proc.process.ExecId, result.process.ExecId) assert.Equal(t, proc.capabilities, result.capabilities) // remove the entry from cache. - assert.True(t, cache.remove(proc.process)) + assert.True(t, cache.remove(cacheId)) assert.Equal(t, cache.len(), 0) - _, err = cache.get(proc.process.ExecId) + _, err = cache.get(cacheId) assert.Error(t, err) } diff --git a/pkg/process/process.go b/pkg/process/process.go index a85ddbf2284..1d8242deb72 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -424,14 +424,14 @@ func GetParentProcessInternal(pid uint32, ktime uint64) (*ProcessInternal, *Proc var parent, process *ProcessInternal var err error - processID := GetProcessID(pid, ktime) + cacheId := GetProcessCacheId(pid, ktime) - if process, err = procCache.get(processID); err != nil { - logger.GetLogger().WithField("id in event", processID).WithField("pid", pid).WithField("ktime", ktime).Debug("process not found in cache") + if process, err = procCache.get(cacheId); err != nil { + logger.GetLogger().WithField("id in event", cacheId).WithField("pid", pid).WithField("ktime", ktime).Debug("process not found in cache") return nil, nil } - if parent, err = procCache.get(process.process.ParentExecId); err != nil { + if parent, err = procCache.get(process.parentCacheId); err != nil { logger.GetLogger().WithField("id in event", process.process.ParentExecId).WithField("pid", pid).WithField("ktime", ktime).Debug("parent process not found in cache") return process, nil } @@ -449,14 +449,15 @@ func AddExecEvent(event *tetragonAPI.MsgExecveEventUnix) *ProcessInternal { proc = initProcessInternalExec(event, event.CleanupProcess) } - procCache.add(proc) + procCache.add(proc.cacheId, proc) return proc } // AddCloneEvent adds a new process into the cache from a CloneEvent func AddCloneEvent(event *tetragonAPI.MsgCloneEvent) error { parentExecId := GetProcessID(event.Parent.Pid, event.Parent.Ktime) - parent, err := Get(parentExecId) + parentCacheId := GetProcessCacheId(event.Parent.Pid, event.Parent.Ktime) + parent, err := Get(parentCacheId) if err != nil { logger.GetLogger().WithFields(logrus.Fields{ "event.name": "Clone", @@ -472,12 +473,12 @@ func AddCloneEvent(event *tetragonAPI.MsgCloneEvent) error { } parent.RefInc() - procCache.add(proc) + procCache.add(proc.cacheId, proc) return nil } -func Get(execId string) (*ProcessInternal, error) { - return procCache.get(execId) +func Get(cacheId CacheId) (*ProcessInternal, error) { + return procCache.get(cacheId) } // GetK8s returns K8sResourceWatcher. You must call InitCache before calling this function to ensure