From 4775eae4a47486af1c225509fc578f96b544d213 Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Mon, 22 Apr 2024 11:32:18 -0700 Subject: [PATCH 1/9] Add logging on missed processes --- .../sessionmd/add_session_metadata.go | 3 ++- .../processors/sessionmd/processdb/db.go | 17 ++++++++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go index 5b934980494..da8ae46b608 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -57,7 +57,7 @@ func New(cfg *cfg.C) (beat.Processor, error) { } backfilledPIDs := db.ScrapeProcfs() - logger.Debugf("backfilled %d processes", len(backfilledPIDs)) + logger.Errorf("backfilled %d processes", len(backfilledPIDs)) var p provider.Provider @@ -106,6 +106,7 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) { result, err := p.enrich(ev) if err != nil { + p.logger.Errorf("enriching event: %w", err) return ev, fmt.Errorf("enriching event: %w", err) } return result, nil diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db.go b/x-pack/auditbeat/processors/sessionmd/processdb/db.go index f5ec71d1d4a..178231132d4 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/db.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db.go @@ -556,7 +556,22 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) { process, ok := db.processes[pid] if !ok { - return types.Process{}, errors.New("process not found") + procInfo, err := db.procfs.GetProcess(pid) + if err != nil { + return types.Process{}, errors.New("process not found in db (scraping from proc failed)") + } + process := Process{ + PIDs: pidInfoFromProto(procInfo.PIDs), + Creds: credInfoFromProto(procInfo.Creds), + CTTY: ttyDevFromProto(procInfo.CTTY), + Argv: procInfo.Argv, + Cwd: procInfo.Cwd, + Env: procInfo.Env, + Filename: procInfo.Filename, + } + db.insertProcess(process) + + process = db.processes[pid] } ret := fullProcessFromDBProcess(process) From 8f94732f5dc3324dcfb30697b76cc509d861b417 Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Tue, 23 Apr 2024 21:59:27 -0700 Subject: [PATCH 2/9] Keep exit event process in DB With the add_session_metadata processor, don't remove processes from the process db when the process has exited. The processor can be run on an fork/exec events after the process has actually exited, so the process must remain in the DB after it has exited, so the info can be used in enrichment of these events. Now the process is kept in the DB, and the exit code is appended, so the exit code is also now properly enriched for exit events. --- .../sessionmd/add_session_metadata.go | 3 ++ .../processors/sessionmd/processdb/db.go | 28 ++++++++----------- .../processors/sessionmd/types/process.go | 2 +- 3 files changed, 15 insertions(+), 18 deletions(-) diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go index da8ae46b608..312ce68496e 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -70,6 +70,9 @@ func New(cfg *cfg.C) (beat.Processor, error) { if err != nil { return nil, fmt.Errorf("failed to create provider: %w", err) } + logger.Info("backend=auto using procfs") + } else { + logger.Info("backend=auto using ebpf") } case "ebpf": p, err = ebpf_provider.NewProvider(ctx, logger, db) diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db.go b/x-pack/auditbeat/processors/sessionmd/processdb/db.go index 178231132d4..d683359ec20 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/db.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db.go @@ -82,6 +82,7 @@ type Process struct { Cwd string Env map[string]string Filename string + ExitCode int32 } var ( @@ -406,7 +407,14 @@ func (db *DB) InsertExit(exit types.ProcessExitEvent) { defer db.mutex.Unlock() pid := exit.PIDs.Tgid - delete(db.processes, pid) + process, ok := db.processes[pid] + if !ok { + db.logger.Errorf("could not insert exit, pid %v not found in db", pid) + } + + process.ExitCode = exit.ExitCode + db.processes[pid] = process + delete(db.entryLeaders, pid) delete(db.entryLeaderRelationships, pid) } @@ -437,6 +445,7 @@ func fullProcessFromDBProcess(p Process) types.Process { ret.Thread.Capabilities.Effective, _ = capabilities.FromUint64(p.Creds.CapEffective) ret.TTY.CharDevice.Major = p.CTTY.Major ret.TTY.CharDevice.Minor = p.CTTY.Minor + ret.ExitCode = p.ExitCode return ret } @@ -556,22 +565,7 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) { process, ok := db.processes[pid] if !ok { - procInfo, err := db.procfs.GetProcess(pid) - if err != nil { - return types.Process{}, errors.New("process not found in db (scraping from proc failed)") - } - process := Process{ - PIDs: pidInfoFromProto(procInfo.PIDs), - Creds: credInfoFromProto(procInfo.Creds), - CTTY: ttyDevFromProto(procInfo.CTTY), - Argv: procInfo.Argv, - Cwd: procInfo.Cwd, - Env: procInfo.Env, - Filename: procInfo.Filename, - } - db.insertProcess(process) - - process = db.processes[pid] + return types.Process{}, errors.New("process not found") } ret := fullProcessFromDBProcess(process) diff --git a/x-pack/auditbeat/processors/sessionmd/types/process.go b/x-pack/auditbeat/processors/sessionmd/types/process.go index 232dd348de8..daf989ef3cd 100644 --- a/x-pack/auditbeat/processors/sessionmd/types/process.go +++ b/x-pack/auditbeat/processors/sessionmd/types/process.go @@ -33,7 +33,7 @@ type Process struct { // The exit code of the process, if this is a termination event. // The field should be absent if there is no exit code for the event (e.g. process start). - ExitCode *int64 `json:"exit_code,omitempty"` + ExitCode int32 `json:"exit_code,omitempty"` // Whether the process is connected to an interactive shell. // Process interactivity is inferred from the processes file descriptors. If the character device for the controlling tty is the same as stdin and stderr for the process, the process is considered interactive. From 9eb01ea54cac5100279699bdc26862c2548a9d3b Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Tue, 23 Apr 2024 22:31:27 -0700 Subject: [PATCH 3/9] update changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3ee1edbffa3..63614c0527e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -95,6 +95,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Auditbeat* - Set field types to correctly match ECS in sessionmd processor {issue}38955[38955] {pull}38994[38994] +- Keep process info on exited processes, to avoid failing to enrich events in sessionmd processor {pull}39173[39173] *Filebeat* From d074b3edaa1b19823982d80276b041e065eb5bcb Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Wed, 24 Apr 2024 15:49:26 -0700 Subject: [PATCH 4/9] Add process db reaper --- .../sessionmd/add_session_metadata.go | 5 + .../processors/sessionmd/processdb/db.go | 74 +++- .../sessionmd/processdb/entry_leader_test.go | 347 ++++++++++++++++++ 3 files changed, 422 insertions(+), 4 deletions(-) diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go index 76cee009424..ff9fa54e556 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -114,6 +114,11 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) { return result, nil } +func (p *addSessionMetadata) Close() error { + p.db.Close() + return nil +} + func (p *addSessionMetadata) String() string { return fmt.Sprintf("%v=[backend=%s, pid_field=%s]", processorName, p.config.Backend, p.config.PIDField) diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db.go b/x-pack/auditbeat/processors/sessionmd/processdb/db.go index 7c7e86b4419..3c8496f67be 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/db.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db.go @@ -26,6 +26,11 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) +const ( + reaperInterval = 15 * time.Second // run the reaper process at this interval + removalTime = 10 * time.Second // remove processes that have been exited longer than this +) + type TTYType int const ( @@ -186,6 +191,8 @@ type DB struct { entryLeaders map[uint32]EntryType entryLeaderRelationships map[uint32]uint32 procfs procfs.Reader + stopChan chan struct{} + removalCandidates map[uint32]removalCandidate } func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) { @@ -193,13 +200,17 @@ func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) { if initError != nil { return &DB{}, initError } - return &DB{ + db := DB{ logger: logp.NewLogger("processdb"), processes: make(map[uint32]Process), entryLeaders: make(map[uint32]EntryType), entryLeaderRelationships: make(map[uint32]uint32), procfs: reader, - }, nil + stopChan: make(chan struct{}), + removalCandidates: make(map[uint32]removalCandidate), + } + db.startReaper() + return &db, nil } func (db *DB) calculateEntityIDv1(pid uint32, startTime time.Time) string { @@ -407,8 +418,6 @@ func (db *DB) InsertExit(exit types.ProcessExitEvent) { defer db.mutex.Unlock() pid := exit.PIDs.Tgid - delete(db.entryLeaders, pid) - delete(db.entryLeaderRelationships, pid) process, ok := db.processes[pid] if !ok { db.logger.Errorf("could not insert exit, pid %v not found in db", pid) @@ -416,6 +425,10 @@ func (db *DB) InsertExit(exit types.ProcessExitEvent) { } process.ExitCode = exit.ExitCode db.processes[pid] = process + db.removalCandidates[pid] = removalCandidate{ + startTime: process.PIDs.StartTimeNS, + exitTime: time.Now(), + } } func interactiveFromTTY(tty types.TTYDev) bool { @@ -724,3 +737,56 @@ func (db *DB) scrapeAncestors(proc Process) { db.insertProcess(p) } } + +func (db *DB) Close() { + close(db.stopChan) +} + +type removalCandidate struct { + exitTime time.Time + startTime uint64 +} + +// The reaper will remove exited processes from the DB a short time after they have exited. +// Processes cannot be removed immediately when exiting, as the event enrichment will happen sometime +// afterwards, and will fail if the process is already removed from the DB. +// +// In Linux, exited processes cannot be session leader, process group leader or parent, so if a process has exited, +// it cannot have a relation with any other longer-lived processes. If this processor is ported to other OSs, this +// assumption will need to be revisited. +func (db *DB) startReaper() { + ticker := time.NewTicker(reaperInterval) + defer ticker.Stop() + now := time.Now() + + go func() { + for { + select { + case <-ticker.C: + db.mutex.Lock() + for pid, c := range db.removalCandidates { + p, ok := db.processes[pid] + if !ok { + db.logger.Debugf("pid %v was candidate for removal, but was already removed", pid) + delete(db.removalCandidates, pid) + continue + } + if p.PIDs.StartTimeNS != c.startTime { + db.logger.Debugf("start times of removal candidate %v differs, not removing (PID had been reused?)", pid) + delete(db.removalCandidates, pid) + continue + } + if now.Sub(c.exitTime) > removalTime { + delete(db.processes, pid) + delete(db.entryLeaders, pid) + delete(db.entryLeaderRelationships, pid) + delete(db.removalCandidates, pid) + } + } + db.mutex.Unlock() + case <-db.stopChan: + return + } + } + }() +} diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go b/x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go index 15f98250f55..74140f47f6c 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go @@ -23,6 +23,7 @@ const ( lsPath = "/usr/bin/ls" bashPath = "/usr/bin/bash" grepPath = "/usr/bin/grep" + wPath = "/usr/bin/w" ) // Entry evaluation tests @@ -1242,3 +1243,349 @@ func TestKernelThreads(t *testing.T) { requireProcess(t, db, rcuGpPID, rcuGpPath) requireParent(t, db, rcuGpPID, kthreaddPID) } + +// PIDs can be reused when the maximum PID is reached and the number rolls over. +// The DB should always have the current process representation when PIDs are reused. +// In the same session a process exits and the PID is reused for a new process. +func TestPIDReuseSameSession(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + sshd0PID := uint32(100) + sshd1PID := uint32(101) + bashPID := uint32(1000) + commandPID := uint32(1001) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd0PID, + Sid: sshd0PID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd1PID, + Sid: sshd1PID, + Ppid: sshd0PID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshd1PID, + Pgid: bashPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: lsPath, + PIDs: types.PIDInfo{ + Tgid: commandPID, + Sid: bashPID, + Ppid: bashPID, + Pgid: commandPID, + }, + }) + + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: commandPID, + Sid: bashPID, + Ppid: bashPID, + Pgid: commandPID, + }, + ExitCode: 0, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: grepPath, + PIDs: types.PIDInfo{ + Tgid: commandPID, + Sid: bashPID, + Ppid: bashPID, + Pgid: commandPID, + }, + }) + + // systemd + systemd, err := db.GetProcess(1) + require.Nil(t, err) + requireParentUnset(t, systemd) + requireEntryLeaderUnset(t, systemd) + + requireProcess(t, db, 1, systemdPath) + requireSessionLeader(t, db, 1, 1) + + // sshd0 + requireProcess(t, db, sshd0PID, sshdPath) + requireParent(t, db, sshd0PID, 1) + requireSessionLeader(t, db, sshd0PID, sshd0PID) + requireEntryLeader(t, db, sshd0PID, sshd0PID, Init) + + // sshd1 + requireProcess(t, db, sshd1PID, sshdPath) + requireParent(t, db, sshd1PID, sshd0PID) + requireSessionLeader(t, db, sshd1PID, sshd1PID) + requireEntryLeader(t, db, sshd1PID, sshd0PID, Init) + + // bash + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, sshd1PID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Sshd) + + // grep + requireProcess(t, db, commandPID, grepPath) + requireParent(t, db, commandPID, bashPID) + requireSessionLeader(t, db, commandPID, bashPID) + requireEntryLeader(t, db, commandPID, bashPID, Sshd) +} + +// A new session, where all PIDs have been previously used for other, now exited, processes +func TestPIDReuseNewSession(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + sshd0PID := uint32(100) + sshd1PID := uint32(101) + bashPID := uint32(1000) + command0PID := uint32(1001) + command1PID := uint32(1002) + command2PID := uint32(1003) + + // 1st session + // sshd0 + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd0PID, + Sid: sshd0PID, + Ppid: 1, + }, + }) + + // sshd1 + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd1PID, + Sid: sshd1PID, + Ppid: sshd0PID, + }, + }) + + // bash + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshd1PID, + Pgid: bashPID, + }, + }) + + // command0 + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: lsPath, + PIDs: types.PIDInfo{ + Tgid: command0PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command0PID, + }, + }) + + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: command0PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command0PID, + }, + ExitCode: 0, + }) + + // command 1 & 2 in pg + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: grepPath, + PIDs: types.PIDInfo{ + Tgid: command1PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command1PID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: grepPath, + PIDs: types.PIDInfo{ + Tgid: command2PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command1PID, + }, + }) + + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: command2PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command1PID, + }, + ExitCode: 0, + }) + + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: command1PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command1PID, + }, + ExitCode: 0, + }) + + // exit bash + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshd1PID, + Pgid: bashPID, + }, + ExitCode: 0, + }) + + // exit sshd1 + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: sshd1PID, + Sid: sshd1PID, + Ppid: sshd0PID, + }, + ExitCode: 0, + }) + + // exit sshd0 + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: sshd0PID, + Sid: sshd0PID, + Ppid: 1, + }, + ExitCode: 0, + }) + + //2nd session + x1 := bashPID + x2 := sshd0PID + sshd0PID = command0PID + sshd1PID = command1PID + bashPID = command2PID + command0PID = x1 + command1PID = x2 + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd0PID, + Sid: sshd0PID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd1PID, + Sid: sshd1PID, + Ppid: sshd0PID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshd1PID, + Pgid: bashPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: wPath, + PIDs: types.PIDInfo{ + Tgid: command0PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command0PID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: grepPath, + PIDs: types.PIDInfo{ + Tgid: command1PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command0PID, + }, + }) + + // systemd + systemd, err := db.GetProcess(1) + require.Nil(t, err) + requireParentUnset(t, systemd) + requireEntryLeaderUnset(t, systemd) + + requireProcess(t, db, 1, systemdPath) + requireSessionLeader(t, db, 1, 1) + + // sshd0 + requireProcess(t, db, sshd0PID, sshdPath) + requireParent(t, db, sshd0PID, 1) + requireSessionLeader(t, db, sshd0PID, sshd0PID) + requireEntryLeader(t, db, sshd0PID, sshd0PID, Init) + + // sshd1 + requireProcess(t, db, sshd1PID, sshdPath) + requireParent(t, db, sshd1PID, sshd0PID) + requireSessionLeader(t, db, sshd1PID, sshd1PID) + requireEntryLeader(t, db, sshd1PID, sshd0PID, Init) + + // bash + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, sshd1PID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Sshd) + + // w + requireProcess(t, db, command0PID, wPath) + requireParent(t, db, command0PID, bashPID) + requireSessionLeader(t, db, command0PID, bashPID) + requireEntryLeader(t, db, command0PID, bashPID, Sshd) + requireGroupLeader(t, db, command0PID, command0PID) + + // grep + requireProcess(t, db, command1PID, grepPath) + requireParent(t, db, command1PID, bashPID) + requireSessionLeader(t, db, command1PID, bashPID) + requireEntryLeader(t, db, command1PID, bashPID, Sshd) + requireGroupLeader(t, db, command1PID, command0PID) +} From bcb9a696ede215d37e35d1c949096766115299d6 Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Wed, 24 Apr 2024 17:20:32 -0700 Subject: [PATCH 5/9] Use heap for removal queue --- .../processors/sessionmd/processdb/db.go | 99 +++++++++++++------ 1 file changed, 71 insertions(+), 28 deletions(-) diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db.go b/x-pack/auditbeat/processors/sessionmd/processdb/db.go index 3c8496f67be..a87f42faf88 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/db.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db.go @@ -7,6 +7,7 @@ package processdb import ( + "container/heap" "encoding/base64" "errors" "fmt" @@ -26,11 +27,6 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -const ( - reaperInterval = 15 * time.Second // run the reaper process at this interval - removalTime = 10 * time.Second // remove processes that have been exited longer than this -) - type TTYType int const ( @@ -192,7 +188,7 @@ type DB struct { entryLeaderRelationships map[uint32]uint32 procfs procfs.Reader stopChan chan struct{} - removalCandidates map[uint32]removalCandidate + removalCandidates rcHeap } func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) { @@ -206,8 +202,8 @@ func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) { entryLeaders: make(map[uint32]EntryType), entryLeaderRelationships: make(map[uint32]uint32), procfs: reader, - stopChan: make(chan struct{}), - removalCandidates: make(map[uint32]removalCandidate), + stopChan: make(chan struct{}), + removalCandidates: make(rcHeap, 0), } db.startReaper() return &db, nil @@ -425,10 +421,11 @@ func (db *DB) InsertExit(exit types.ProcessExitEvent) { } process.ExitCode = exit.ExitCode db.processes[pid] = process - db.removalCandidates[pid] = removalCandidate{ + heap.Push(&db.removalCandidates, removalCandidate{ + pid: pid, startTime: process.PIDs.StartTimeNS, - exitTime: time.Now(), - } + exitTime: time.Now(), + }) } func interactiveFromTTY(tty types.TTYDev) bool { @@ -742,11 +739,43 @@ func (db *DB) Close() { close(db.stopChan) } +const ( + reaperInterval = 10 * time.Second // run the reaper process at this interval + removalTime = 5 * time.Second // remove processes that have been exited longer than this +) + type removalCandidate struct { + pid uint32 exitTime time.Time startTime uint64 } +type rcHeap []removalCandidate + +func (h rcHeap) Len() int { + return len(h) +} + +func (h rcHeap) Less(i, j int) bool { + return h[i].exitTime.Sub(h[j].exitTime) < 0 +} + +func (h rcHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *rcHeap) Push(x any) { + *h = append(*h, x.(removalCandidate)) +} + +func (h *rcHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + // The reaper will remove exited processes from the DB a short time after they have exited. // Processes cannot be removed immediately when exiting, as the event enrichment will happen sometime // afterwards, and will fail if the process is already removed from the DB. @@ -755,38 +784,52 @@ type removalCandidate struct { // it cannot have a relation with any other longer-lived processes. If this processor is ported to other OSs, this // assumption will need to be revisited. func (db *DB) startReaper() { - ticker := time.NewTicker(reaperInterval) - defer ticker.Stop() - now := time.Now() + go func(db *DB) { + ticker := time.NewTicker(reaperInterval) + defer ticker.Stop() - go func() { + h := &db.removalCandidates + heap.Init(h) for { select { case <-ticker.C: db.mutex.Lock() - for pid, c := range db.removalCandidates { - p, ok := db.processes[pid] + now := time.Now() + for { + if len(db.removalCandidates) == 0 { + break + } + v := heap.Pop(h) + c, ok := v.(removalCandidate) if !ok { - db.logger.Debugf("pid %v was candidate for removal, but was already removed", pid) - delete(db.removalCandidates, pid) + db.logger.Errorf("unexpected item in removal queue: \"%v\"", v) continue } - if p.PIDs.StartTimeNS != c.startTime { - db.logger.Debugf("start times of removal candidate %v differs, not removing (PID had been reused?)", pid) - delete(db.removalCandidates, pid) + if now.Sub(c.exitTime) < removalTime { + // this candidate hasn't reached its timeout, put it back on the heap + // everything else will have a later exit time, so end this run + heap.Push(h, c) + break + } + p, ok := db.processes[c.pid] + if !ok { + db.logger.Errorf("pid %v was candidate for removal, but was already removed", c.pid) continue } - if now.Sub(c.exitTime) > removalTime { - delete(db.processes, pid) - delete(db.entryLeaders, pid) - delete(db.entryLeaderRelationships, pid) - delete(db.removalCandidates, pid) + if p.PIDs.StartTimeNS != c.startTime { + // this could happen if the PID has already rolled over and reached this PID again. + db.logger.Debugf("start times of removal candidate %v differs, not removing (PID had been reused?)", c.pid) + continue } + db.logger.Errorf("Removing pid %v", c.pid) + delete(db.processes, c.pid) + delete(db.entryLeaders, c.pid) + delete(db.entryLeaderRelationships, c.pid) } db.mutex.Unlock() case <-db.stopChan: return } } - }() + }(db) } From a4f8a76bda0a10e1cc27299e53438f9f9d9319ee Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Wed, 24 Apr 2024 17:27:35 -0700 Subject: [PATCH 6/9] Move the process reaper into its own file --- .../processors/sessionmd/processdb/db.go | 95 ---------------- .../processors/sessionmd/processdb/reaper.go | 104 ++++++++++++++++++ 2 files changed, 104 insertions(+), 95 deletions(-) create mode 100644 x-pack/auditbeat/processors/sessionmd/processdb/reaper.go diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db.go b/x-pack/auditbeat/processors/sessionmd/processdb/db.go index a87f42faf88..2c7c228e2c1 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/db.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db.go @@ -738,98 +738,3 @@ func (db *DB) scrapeAncestors(proc Process) { func (db *DB) Close() { close(db.stopChan) } - -const ( - reaperInterval = 10 * time.Second // run the reaper process at this interval - removalTime = 5 * time.Second // remove processes that have been exited longer than this -) - -type removalCandidate struct { - pid uint32 - exitTime time.Time - startTime uint64 -} - -type rcHeap []removalCandidate - -func (h rcHeap) Len() int { - return len(h) -} - -func (h rcHeap) Less(i, j int) bool { - return h[i].exitTime.Sub(h[j].exitTime) < 0 -} - -func (h rcHeap) Swap(i, j int) { - h[i], h[j] = h[j], h[i] -} - -func (h *rcHeap) Push(x any) { - *h = append(*h, x.(removalCandidate)) -} - -func (h *rcHeap) Pop() any { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} - -// The reaper will remove exited processes from the DB a short time after they have exited. -// Processes cannot be removed immediately when exiting, as the event enrichment will happen sometime -// afterwards, and will fail if the process is already removed from the DB. -// -// In Linux, exited processes cannot be session leader, process group leader or parent, so if a process has exited, -// it cannot have a relation with any other longer-lived processes. If this processor is ported to other OSs, this -// assumption will need to be revisited. -func (db *DB) startReaper() { - go func(db *DB) { - ticker := time.NewTicker(reaperInterval) - defer ticker.Stop() - - h := &db.removalCandidates - heap.Init(h) - for { - select { - case <-ticker.C: - db.mutex.Lock() - now := time.Now() - for { - if len(db.removalCandidates) == 0 { - break - } - v := heap.Pop(h) - c, ok := v.(removalCandidate) - if !ok { - db.logger.Errorf("unexpected item in removal queue: \"%v\"", v) - continue - } - if now.Sub(c.exitTime) < removalTime { - // this candidate hasn't reached its timeout, put it back on the heap - // everything else will have a later exit time, so end this run - heap.Push(h, c) - break - } - p, ok := db.processes[c.pid] - if !ok { - db.logger.Errorf("pid %v was candidate for removal, but was already removed", c.pid) - continue - } - if p.PIDs.StartTimeNS != c.startTime { - // this could happen if the PID has already rolled over and reached this PID again. - db.logger.Debugf("start times of removal candidate %v differs, not removing (PID had been reused?)", c.pid) - continue - } - db.logger.Errorf("Removing pid %v", c.pid) - delete(db.processes, c.pid) - delete(db.entryLeaders, c.pid) - delete(db.entryLeaderRelationships, c.pid) - } - db.mutex.Unlock() - case <-db.stopChan: - return - } - } - }(db) -} diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go b/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go new file mode 100644 index 00000000000..17b65ef2aaa --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go @@ -0,0 +1,104 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package processdb + +import ( + "container/heap" + "time" +) + +const ( + reaperInterval = 30 * time.Second // run the reaper process at this interval + removalTime = 10 * time.Second // remove processes that have been exited longer than this +) + +type removalCandidate struct { + pid uint32 + exitTime time.Time + startTime uint64 +} + +type rcHeap []removalCandidate + +func (h rcHeap) Len() int { + return len(h) +} + +func (h rcHeap) Less(i, j int) bool { + return h[i].exitTime.Sub(h[j].exitTime) < 0 +} + +func (h rcHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *rcHeap) Push(x any) { + *h = append(*h, x.(removalCandidate)) +} + +func (h *rcHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// The reaper will remove exited processes from the DB a short time after they have exited. +// Processes cannot be removed immediately when exiting, as the event enrichment will happen sometime +// afterwards, and will fail if the process is already removed from the DB. +// +// In Linux, exited processes cannot be session leader, process group leader or parent, so if a process has exited, +// it cannot have a relation with any other longer-lived processes. If this processor is ported to other OSs, this +// assumption will need to be revisited. +func (db *DB) startReaper() { + go func(db *DB) { + ticker := time.NewTicker(reaperInterval) + defer ticker.Stop() + + h := &db.removalCandidates + heap.Init(h) + for { + select { + case <-ticker.C: + db.mutex.Lock() + now := time.Now() + for { + if len(db.removalCandidates) == 0 { + break + } + v := heap.Pop(h) + c, ok := v.(removalCandidate) + if !ok { + db.logger.Debugf("unexpected item in removal queue: \"%v\"", v) + continue + } + if now.Sub(c.exitTime) < removalTime { + // this candidate hasn't reached its timeout, put it back on the heap + // everything else will have a later exit time, so end this run + heap.Push(h, c) + break + } + p, ok := db.processes[c.pid] + if !ok { + db.logger.Debugf("pid %v was candidate for removal, but was already removed", c.pid) + continue + } + if p.PIDs.StartTimeNS != c.startTime { + // this could happen if the PID has already rolled over and reached this PID again. + db.logger.Debugf("start times of removal candidate %v differs, not removing (PID had been reused?)", c.pid) + continue + } + delete(db.processes, c.pid) + delete(db.entryLeaders, c.pid) + delete(db.entryLeaderRelationships, c.pid) + } + db.mutex.Unlock() + case <-db.stopChan: + return + } + } + }(db) +} From e9e8095fdc2b1e36e67712c4a2bd4c0bff7e695f Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Wed, 24 Apr 2024 17:38:22 -0700 Subject: [PATCH 7/9] Add linux build directive --- x-pack/auditbeat/processors/sessionmd/processdb/reaper.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go b/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go index 17b65ef2aaa..ad6fd67a8a0 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go @@ -2,6 +2,8 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. +//go:build linux + package processdb import ( @@ -11,7 +13,7 @@ import ( const ( reaperInterval = 30 * time.Second // run the reaper process at this interval - removalTime = 10 * time.Second // remove processes that have been exited longer than this + removalTimeout = 10 * time.Second // remove processes that have been exited longer than this ) type removalCandidate struct { @@ -75,7 +77,7 @@ func (db *DB) startReaper() { db.logger.Debugf("unexpected item in removal queue: \"%v\"", v) continue } - if now.Sub(c.exitTime) < removalTime { + if now.Sub(c.exitTime) < removalTimeout { // this candidate hasn't reached its timeout, put it back on the heap // everything else will have a later exit time, so end this run heap.Push(h, c) From bfa804c8670f930bf1f907bcea2602cac16560e2 Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Wed, 24 Apr 2024 17:42:09 -0700 Subject: [PATCH 8/9] Refuse to push invalid items to heap --- x-pack/auditbeat/processors/sessionmd/processdb/reaper.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go b/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go index ad6fd67a8a0..12751bead9c 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go @@ -13,7 +13,7 @@ import ( const ( reaperInterval = 30 * time.Second // run the reaper process at this interval - removalTimeout = 10 * time.Second // remove processes that have been exited longer than this + removalTimeout = 10 * time.Second // remove processes that have been exited longer than this ) type removalCandidate struct { @@ -37,7 +37,10 @@ func (h rcHeap) Swap(i, j int) { } func (h *rcHeap) Push(x any) { - *h = append(*h, x.(removalCandidate)) + v, ok := x.(removalCandidate) + if ok { + *h = append(*h, v) + } } func (h *rcHeap) Pop() any { From e5b7e0f7b21508a5b34617e6f829920ce53ffa40 Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Thu, 25 Apr 2024 12:30:28 -0700 Subject: [PATCH 9/9] Change log level when procfs misses process --- .../sessionmd/provider/procfs_provider/procfs_provider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go index a0d11ad18e5..2f99dd72b1f 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go @@ -69,7 +69,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error { pe.Env = proc_info.Env pe.Filename = proc_info.Filename } else { - s.logger.Errorf("get process info from proc for pid %v: %w", pid, err) + s.logger.Warnf("couldn't get process info from proc for pid %v: %w", pid, err) // If process info couldn't be taken from procfs, populate with as much info as // possible from the event pe.PIDs.Tgid = uint32(pid)