Skip to content

Commit

Permalink
tetragon: Use CacheId to store ProcessInternal
Browse files Browse the repository at this point in the history
Signed-off-by: Jiri Olsa <[email protected]>
  • Loading branch information
olsajiri committed Oct 8, 2023
1 parent 603c1ac commit 9f5bc31
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 28 deletions.
5 changes: 2 additions & 3 deletions pkg/grpc/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func GetProcessExec(event *MsgExecveEventUnix, useCache bool) *tetragon.ProcessE
parentId := tetragonProcess.ParentExecId
processId := tetragonProcess.ExecId

parent, err := process.Get(parentId)
parent, err := process.Get(proc.GetParentCacheId())
if err == nil {
tetragonParent = parent.UnsafeGetProcess()
}
Expand Down Expand Up @@ -204,8 +204,7 @@ func (msg *MsgExecveEventUnix) Retry(internal *process.ProcessInternal, ev notif
// Check we have a parent with exception for pid 1, note we do this last because we want
// to ensure the podInfo and process are set before returning any errors.
if proc.Pid.Value > 1 && parent == nil {
parentId := proc.ParentExecId
parent, err := process.Get(parentId)
parent, err := process.Get(internal.GetParentCacheId())
if parent == nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/grpc/exec/exec_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, cancelWg *s
}

func GetProcessRefcntFromCache(t *testing.T, Pid uint32, Ktime uint64) uint32 {
procID := process.GetProcessID(Pid, Ktime)
proc, err := process.Get(procID)
cacheId := process.GetProcessCacheId(Pid, Ktime)
proc, err := process.Get(cacheId)
if err == nil {
return proc.RefGet()
}
Expand Down
19 changes: 9 additions & 10 deletions pkg/process/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync/atomic"
"time"

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/metrics/errormetrics"
"github.com/cilium/tetragon/pkg/metrics/mapmetrics"
Expand All @@ -22,7 +21,7 @@ type CacheId struct {
}

type Cache struct {
cache *lru.Cache[string, *ProcessInternal]
cache *lru.Cache[CacheId, *ProcessInternal]
size int
deleteChan chan *ProcessInternal
stopChan chan bool
Expand Down Expand Up @@ -81,7 +80,7 @@ func (pc *Cache) cacheGarbageCollector() {
}
if p.color == deleteReady {
p.color = deleted
pc.remove(p.process)
pc.remove(p.cacheId)
} else {
newQueue = append(newQueue, p)
p.color = deleteReady
Expand Down Expand Up @@ -137,7 +136,7 @@ func NewCache(
) (*Cache, error) {
lruCache, err := lru.NewWithEvict(
processCacheSize,
func(_ string, _ *ProcessInternal) {
func(_ CacheId, _ *ProcessInternal) {
mapmetrics.MapDropInc("processLru")
},
)
Expand All @@ -152,28 +151,28 @@ func NewCache(
return pm, nil
}

func (pc *Cache) get(processID string) (*ProcessInternal, error) {
func (pc *Cache) get(processID CacheId) (*ProcessInternal, error) {
process, ok := pc.cache.Get(processID)
if !ok {
logger.GetLogger().WithField("id in event", processID).Debug("process not found in cache")
errormetrics.ErrorTotalInc(errormetrics.ProcessCacheMissOnGet)
return nil, fmt.Errorf("invalid entry for process ID: %s", processID)
return nil, fmt.Errorf("invalid entry for process ID: %v", processID)
}
return process, nil
}

// Add a ProcessInternal structure to the cache. Must be called only from
// clone or execve events
func (pc *Cache) add(process *ProcessInternal) bool {
evicted := pc.cache.Add(process.process.ExecId, process)
func (pc *Cache) add(cacheId CacheId, process *ProcessInternal) bool {
evicted := pc.cache.Add(cacheId, process)
if evicted {
errormetrics.ErrorTotalInc(errormetrics.ProcessCacheEvicted)
}
return evicted
}

func (pc *Cache) remove(process *tetragon.Process) bool {
present := pc.cache.Remove(process.ExecId)
func (pc *Cache) remove(cacheId CacheId) bool {
present := pc.cache.Remove(cacheId)
if !present {
errormetrics.ErrorTotalInc(errormetrics.ProcessCacheMissOnRemove)
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/process/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,18 @@ func TestProcessCache(t *testing.T) {
},
},
}
cache.add(&proc)
cacheId := GetProcessCacheId(1234, 4321)
cache.add(cacheId, &proc)
assert.Equal(t, cache.len(), 1)

result, err := cache.get(proc.process.ExecId)
result, err := cache.get(cacheId)
assert.NoError(t, err)
assert.Equal(t, proc.process.ExecId, result.process.ExecId)
assert.Equal(t, proc.capabilities, result.capabilities)

// remove the entry from cache.
assert.True(t, cache.remove(proc.process))
assert.True(t, cache.remove(cacheId))
assert.Equal(t, cache.len(), 0)
_, err = cache.get(proc.process.ExecId)
_, err = cache.get(cacheId)
assert.Error(t, err)
}
19 changes: 10 additions & 9 deletions pkg/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,14 +424,14 @@ func GetParentProcessInternal(pid uint32, ktime uint64) (*ProcessInternal, *Proc
var parent, process *ProcessInternal
var err error

processID := GetProcessID(pid, ktime)
cacheId := GetProcessCacheId(pid, ktime)

if process, err = procCache.get(processID); err != nil {
logger.GetLogger().WithField("id in event", processID).WithField("pid", pid).WithField("ktime", ktime).Debug("process not found in cache")
if process, err = procCache.get(cacheId); err != nil {
logger.GetLogger().WithField("id in event", cacheId).WithField("pid", pid).WithField("ktime", ktime).Debug("process not found in cache")
return nil, nil
}

if parent, err = procCache.get(process.process.ParentExecId); err != nil {
if parent, err = procCache.get(process.parentCacheId); err != nil {
logger.GetLogger().WithField("id in event", process.process.ParentExecId).WithField("pid", pid).WithField("ktime", ktime).Debug("parent process not found in cache")
return process, nil
}
Expand All @@ -449,14 +449,15 @@ func AddExecEvent(event *tetragonAPI.MsgExecveEventUnix) *ProcessInternal {
proc = initProcessInternalExec(event, event.CleanupProcess)
}

procCache.add(proc)
procCache.add(proc.cacheId, proc)
return proc
}

// AddCloneEvent adds a new process into the cache from a CloneEvent
func AddCloneEvent(event *tetragonAPI.MsgCloneEvent) error {
parentExecId := GetProcessID(event.Parent.Pid, event.Parent.Ktime)
parent, err := Get(parentExecId)
parentCacheId := GetProcessCacheId(event.Parent.Pid, event.Parent.Ktime)
parent, err := Get(parentCacheId)
if err != nil {
logger.GetLogger().WithFields(logrus.Fields{
"event.name": "Clone",
Expand All @@ -472,12 +473,12 @@ func AddCloneEvent(event *tetragonAPI.MsgCloneEvent) error {
}

parent.RefInc()
procCache.add(proc)
procCache.add(proc.cacheId, proc)
return nil
}

func Get(execId string) (*ProcessInternal, error) {
return procCache.get(execId)
func Get(cacheId CacheId) (*ProcessInternal, error) {
return procCache.get(cacheId)
}

// GetK8s returns K8sResourceWatcher. You must call InitCache before calling this function to ensure
Expand Down

0 comments on commit 9f5bc31

Please sign in to comment.