Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proc id change #1567

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/grpc/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func GetProcessExec(event *MsgExecveEventUnix, useCache bool) *tetragon.ProcessE
parentId := tetragonProcess.ParentExecId
processId := tetragonProcess.ExecId

parent, err := process.Get(parentId)
parent, err := process.Get(proc.GetParentCacheId())
if err == nil {
tetragonParent = parent.UnsafeGetProcess()
}
Expand Down Expand Up @@ -204,8 +204,7 @@ func (msg *MsgExecveEventUnix) Retry(internal *process.ProcessInternal, ev notif
// Check we have a parent with exception for pid 1, note we do this last because we want
// to ensure the podInfo and process are set before returning any errors.
if proc.Pid.Value > 1 && parent == nil {
parentId := proc.ParentExecId
parent, err := process.Get(parentId)
parent, err := process.Get(internal.GetParentCacheId())
if parent == nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/grpc/exec/exec_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ func InitEnv[EXEC notify.Message, EXIT notify.Message](t *testing.T, cancelWg *s
}

func GetProcessRefcntFromCache(t *testing.T, Pid uint32, Ktime uint64) uint32 {
procID := process.GetProcessID(Pid, Ktime)
proc, err := process.Get(procID)
cacheId := process.GetProcessCacheId(Pid, Ktime)
proc, err := process.Get(cacheId)
if err == nil {
return proc.RefGet()
}
Expand Down
25 changes: 15 additions & 10 deletions pkg/process/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,20 @@ import (
"sync/atomic"
"time"

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/metrics/errormetrics"
"github.com/cilium/tetragon/pkg/metrics/mapmetrics"
lru "github.com/hashicorp/golang-lru/v2"
)

type CacheId struct {
pid uint32
ktime uint64
nodeName string
}

type Cache struct {
cache *lru.Cache[string, *ProcessInternal]
cache *lru.Cache[CacheId, *ProcessInternal]
size int
deleteChan chan *ProcessInternal
stopChan chan bool
Expand Down Expand Up @@ -75,7 +80,7 @@ func (pc *Cache) cacheGarbageCollector() {
}
if p.color == deleteReady {
p.color = deleted
pc.remove(p.process)
pc.remove(p.cacheId)
} else {
newQueue = append(newQueue, p)
p.color = deleteReady
Expand Down Expand Up @@ -131,7 +136,7 @@ func NewCache(
) (*Cache, error) {
lruCache, err := lru.NewWithEvict(
processCacheSize,
func(_ string, _ *ProcessInternal) {
func(_ CacheId, _ *ProcessInternal) {
mapmetrics.MapDropInc("processLru")
},
)
Expand All @@ -146,28 +151,28 @@ func NewCache(
return pm, nil
}

func (pc *Cache) get(processID string) (*ProcessInternal, error) {
func (pc *Cache) get(processID CacheId) (*ProcessInternal, error) {
process, ok := pc.cache.Get(processID)
if !ok {
logger.GetLogger().WithField("id in event", processID).Debug("process not found in cache")
errormetrics.ErrorTotalInc(errormetrics.ProcessCacheMissOnGet)
return nil, fmt.Errorf("invalid entry for process ID: %s", processID)
return nil, fmt.Errorf("invalid entry for process ID: %v", processID)
}
return process, nil
}

// Add a ProcessInternal structure to the cache. Must be called only from
// clone or execve events
func (pc *Cache) add(process *ProcessInternal) bool {
evicted := pc.cache.Add(process.process.ExecId, process)
func (pc *Cache) add(cacheId CacheId, process *ProcessInternal) bool {
evicted := pc.cache.Add(cacheId, process)
if evicted {
errormetrics.ErrorTotalInc(errormetrics.ProcessCacheEvicted)
}
return evicted
}

func (pc *Cache) remove(process *tetragon.Process) bool {
present := pc.cache.Remove(process.ExecId)
func (pc *Cache) remove(cacheId CacheId) bool {
present := pc.cache.Remove(cacheId)
if !present {
errormetrics.ErrorTotalInc(errormetrics.ProcessCacheMissOnRemove)
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/process/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,18 @@ func TestProcessCache(t *testing.T) {
},
},
}
cache.add(&proc)
cacheId := GetProcessCacheId(1234, 4321)
cache.add(cacheId, &proc)
assert.Equal(t, cache.len(), 1)

result, err := cache.get(proc.process.ExecId)
result, err := cache.get(cacheId)
assert.NoError(t, err)
assert.Equal(t, proc.process.ExecId, result.process.ExecId)
assert.Equal(t, proc.capabilities, result.capabilities)

// remove the entry from cache.
assert.True(t, cache.remove(proc.process))
assert.True(t, cache.remove(cacheId))
assert.Equal(t, cache.len(), 0)
_, err = cache.get(proc.process.ExecId)
_, err = cache.get(cacheId)
assert.Error(t, err)
}
39 changes: 30 additions & 9 deletions pkg/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type ProcessInternal struct {
// garbage collector metadata
color int // Writes should happen only inside gc select channel
refcnt uint32

// cache id values for process/parent
cacheId CacheId
parentCacheId CacheId
}

var (
Expand All @@ -65,6 +69,10 @@ var (
ErrProcessInfoMissing = errors.New("failed process info missing")
)

func GetProcessCacheId(pid uint32, ktime uint64) CacheId {
return CacheId{pid, ktime, nodeName}
}

func InitCache(w watcher.K8sResourceWatcher, size int) error {
var err error

Expand All @@ -90,6 +98,14 @@ func FreeCache() {
procCache = nil
}

func (pi *ProcessInternal) GetProcessCacheId() CacheId {
return pi.cacheId
}

func (pi *ProcessInternal) GetParentCacheId() CacheId {
return pi.parentCacheId
}

// GetProcessCopy() duplicates tetragon.Process and returns it
func (pi *ProcessInternal) GetProcessCopy() *tetragon.Process {
if pi.process == nil {
Expand All @@ -114,6 +130,7 @@ func (pi *ProcessInternal) cloneInternalProcessCopy() *ProcessInternal {
apiBinaryProp: pi.apiBinaryProp,
namespaces: pi.namespaces,
refcnt: 1, // Explicitly initialize refcnt to 1
parentCacheId: pi.cacheId,
}
}

Expand Down Expand Up @@ -339,6 +356,8 @@ func initProcessInternalExec(
apiBinaryProp: apiBinaryProp,
namespaces: apiNs,
refcnt: 1,
cacheId: GetProcessCacheId(process.PID, process.Ktime),
parentCacheId: GetProcessCacheId(parent.Pid, parent.Ktime),
}
}

Expand All @@ -357,6 +376,7 @@ func initProcessInternalClone(event *tetragonAPI.MsgCloneEvent,
return nil, err
}

pi.cacheId = GetProcessCacheId(event.PID, event.Ktime)
pi.process.ParentExecId = parentExecId
pi.process.ExecId = GetProcessID(event.PID, event.Ktime)
pi.process.Pid = &wrapperspb.UInt32Value{Value: event.PID}
Expand Down Expand Up @@ -404,14 +424,14 @@ func GetParentProcessInternal(pid uint32, ktime uint64) (*ProcessInternal, *Proc
var parent, process *ProcessInternal
var err error

processID := GetProcessID(pid, ktime)
cacheId := GetProcessCacheId(pid, ktime)

if process, err = procCache.get(processID); err != nil {
logger.GetLogger().WithField("id in event", processID).WithField("pid", pid).WithField("ktime", ktime).Debug("process not found in cache")
if process, err = procCache.get(cacheId); err != nil {
logger.GetLogger().WithField("id in event", cacheId).WithField("pid", pid).WithField("ktime", ktime).Debug("process not found in cache")
return nil, nil
}

if parent, err = procCache.get(process.process.ParentExecId); err != nil {
if parent, err = procCache.get(process.parentCacheId); err != nil {
logger.GetLogger().WithField("id in event", process.process.ParentExecId).WithField("pid", pid).WithField("ktime", ktime).Debug("parent process not found in cache")
return process, nil
}
Expand All @@ -429,14 +449,15 @@ func AddExecEvent(event *tetragonAPI.MsgExecveEventUnix) *ProcessInternal {
proc = initProcessInternalExec(event, event.CleanupProcess)
}

procCache.add(proc)
procCache.add(proc.cacheId, proc)
return proc
}

// AddCloneEvent adds a new process into the cache from a CloneEvent
func AddCloneEvent(event *tetragonAPI.MsgCloneEvent) error {
parentExecId := GetProcessID(event.Parent.Pid, event.Parent.Ktime)
parent, err := Get(parentExecId)
parentCacheId := GetProcessCacheId(event.Parent.Pid, event.Parent.Ktime)
parent, err := Get(parentCacheId)
if err != nil {
logger.GetLogger().WithFields(logrus.Fields{
"event.name": "Clone",
Expand All @@ -452,12 +473,12 @@ func AddCloneEvent(event *tetragonAPI.MsgCloneEvent) error {
}

parent.RefInc()
procCache.add(proc)
procCache.add(proc.cacheId, proc)
return nil
}

func Get(execId string) (*ProcessInternal, error) {
return procCache.get(execId)
func Get(cacheId CacheId) (*ProcessInternal, error) {
return procCache.get(cacheId)
}

// GetK8s returns K8sResourceWatcher. You must call InitCache before calling this function to ensure
Expand Down
Loading