From e743fd4444bf15e223d15752c5ee96c16e41bc4d Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Thu, 25 Apr 2024 13:27:41 -0700 Subject: [PATCH] [add_session_metadata processor] Keep exited processes in the process DB (#39173) With the add_session_metadata processor, don't remove processes from the process db when the process has exited. The processor can be run on an fork/exec events after the process has actually exited, so the process must remain in the DB after it has exited, so the info can be used in enrichment of these events. Now that the process is kept in the DB, the exit code is also appended on exit events, so the exit code can be used in enrichment of the exit events. (cherry picked from commit 964958886a0416017d3720a4a38a7805e198b7c3) --- CHANGELOG.next.asciidoc | 1 + .../sessionmd/add_session_metadata.go | 10 +- .../processors/sessionmd/processdb/db.go | 32 +- .../sessionmd/processdb/entry_leader_test.go | 347 ++++++++++++++++++ .../processors/sessionmd/processdb/reaper.go | 109 ++++++ .../procfs_provider/procfs_provider.go | 2 +- .../processors/sessionmd/types/process.go | 2 +- 7 files changed, 495 insertions(+), 8 deletions(-) create mode 100644 x-pack/auditbeat/processors/sessionmd/processdb/reaper.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 14f18ada3e7..586c0ce05a1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -94,6 +94,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Auditbeat* - Set field types to correctly match ECS in sessionmd processor {issue}38955[38955] {pull}38994[38994] +- Keep process info on exited processes, to avoid failing to enrich events in sessionmd processor {pull}39173[39173] *Filebeat* diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go index 5b934980494..ff9fa54e556 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -57,7 +57,7 @@ func New(cfg *cfg.C) (beat.Processor, error) { } backfilledPIDs := db.ScrapeProcfs() - logger.Debugf("backfilled %d processes", len(backfilledPIDs)) + logger.Infof("backfilled %d processes", len(backfilledPIDs)) var p provider.Provider @@ -70,6 +70,9 @@ func New(cfg *cfg.C) (beat.Processor, error) { if err != nil { return nil, fmt.Errorf("failed to create provider: %w", err) } + logger.Info("backend=auto using procfs") + } else { + logger.Info("backend=auto using ebpf") } case "ebpf": p, err = ebpf_provider.NewProvider(ctx, logger, db) @@ -111,6 +114,11 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) { return result, nil } +func (p *addSessionMetadata) Close() error { + p.db.Close() + return nil +} + func (p *addSessionMetadata) String() string { return fmt.Sprintf("%v=[backend=%s, pid_field=%s]", processorName, p.config.Backend, p.config.PIDField) diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db.go b/x-pack/auditbeat/processors/sessionmd/processdb/db.go index f5ec71d1d4a..2c7c228e2c1 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/db.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db.go @@ -7,6 +7,7 @@ package processdb import ( + "container/heap" "encoding/base64" "errors" "fmt" @@ -82,6 +83,7 @@ type Process struct { Cwd string Env map[string]string Filename string + ExitCode int32 } var ( @@ -185,6 +187,8 @@ type DB struct { entryLeaders map[uint32]EntryType entryLeaderRelationships map[uint32]uint32 procfs procfs.Reader + stopChan chan struct{} + removalCandidates rcHeap } func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) { @@ -192,13 +196,17 @@ func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) { if initError != nil { return &DB{}, initError } - return &DB{ + db := DB{ logger: logp.NewLogger("processdb"), processes: make(map[uint32]Process), entryLeaders: make(map[uint32]EntryType), entryLeaderRelationships: make(map[uint32]uint32), procfs: reader, - }, nil + stopChan: make(chan struct{}), + removalCandidates: make(rcHeap, 0), + } + db.startReaper() + return &db, nil } func (db *DB) calculateEntityIDv1(pid uint32, startTime time.Time) string { @@ -406,9 +414,18 @@ func (db *DB) InsertExit(exit types.ProcessExitEvent) { defer db.mutex.Unlock() pid := exit.PIDs.Tgid - delete(db.processes, pid) - delete(db.entryLeaders, pid) - delete(db.entryLeaderRelationships, pid) + process, ok := db.processes[pid] + if !ok { + db.logger.Errorf("could not insert exit, pid %v not found in db", pid) + return + } + process.ExitCode = exit.ExitCode + db.processes[pid] = process + heap.Push(&db.removalCandidates, removalCandidate{ + pid: pid, + startTime: process.PIDs.StartTimeNS, + exitTime: time.Now(), + }) } func interactiveFromTTY(tty types.TTYDev) bool { @@ -437,6 +454,7 @@ func fullProcessFromDBProcess(p Process) types.Process { ret.Thread.Capabilities.Effective, _ = capabilities.FromUint64(p.Creds.CapEffective) ret.TTY.CharDevice.Major = p.CTTY.Major ret.TTY.CharDevice.Minor = p.CTTY.Minor + ret.ExitCode = p.ExitCode return ret } @@ -716,3 +734,7 @@ func (db *DB) scrapeAncestors(proc Process) { db.insertProcess(p) } } + +func (db *DB) Close() { + close(db.stopChan) +} diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go b/x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go index 15f98250f55..74140f47f6c 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/entry_leader_test.go @@ -23,6 +23,7 @@ const ( lsPath = "/usr/bin/ls" bashPath = "/usr/bin/bash" grepPath = "/usr/bin/grep" + wPath = "/usr/bin/w" ) // Entry evaluation tests @@ -1242,3 +1243,349 @@ func TestKernelThreads(t *testing.T) { requireProcess(t, db, rcuGpPID, rcuGpPath) requireParent(t, db, rcuGpPID, kthreaddPID) } + +// PIDs can be reused when the maximum PID is reached and the number rolls over. +// The DB should always have the current process representation when PIDs are reused. +// In the same session a process exits and the PID is reused for a new process. +func TestPIDReuseSameSession(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + sshd0PID := uint32(100) + sshd1PID := uint32(101) + bashPID := uint32(1000) + commandPID := uint32(1001) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd0PID, + Sid: sshd0PID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd1PID, + Sid: sshd1PID, + Ppid: sshd0PID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshd1PID, + Pgid: bashPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: lsPath, + PIDs: types.PIDInfo{ + Tgid: commandPID, + Sid: bashPID, + Ppid: bashPID, + Pgid: commandPID, + }, + }) + + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: commandPID, + Sid: bashPID, + Ppid: bashPID, + Pgid: commandPID, + }, + ExitCode: 0, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: grepPath, + PIDs: types.PIDInfo{ + Tgid: commandPID, + Sid: bashPID, + Ppid: bashPID, + Pgid: commandPID, + }, + }) + + // systemd + systemd, err := db.GetProcess(1) + require.Nil(t, err) + requireParentUnset(t, systemd) + requireEntryLeaderUnset(t, systemd) + + requireProcess(t, db, 1, systemdPath) + requireSessionLeader(t, db, 1, 1) + + // sshd0 + requireProcess(t, db, sshd0PID, sshdPath) + requireParent(t, db, sshd0PID, 1) + requireSessionLeader(t, db, sshd0PID, sshd0PID) + requireEntryLeader(t, db, sshd0PID, sshd0PID, Init) + + // sshd1 + requireProcess(t, db, sshd1PID, sshdPath) + requireParent(t, db, sshd1PID, sshd0PID) + requireSessionLeader(t, db, sshd1PID, sshd1PID) + requireEntryLeader(t, db, sshd1PID, sshd0PID, Init) + + // bash + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, sshd1PID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Sshd) + + // grep + requireProcess(t, db, commandPID, grepPath) + requireParent(t, db, commandPID, bashPID) + requireSessionLeader(t, db, commandPID, bashPID) + requireEntryLeader(t, db, commandPID, bashPID, Sshd) +} + +// A new session, where all PIDs have been previously used for other, now exited, processes +func TestPIDReuseNewSession(t *testing.T) { + reader := procfs.NewMockReader() + populateProcfsWithInit(reader) + db, err := NewDB(reader, *logger) + require.Nil(t, err) + db.ScrapeProcfs() + + sshd0PID := uint32(100) + sshd1PID := uint32(101) + bashPID := uint32(1000) + command0PID := uint32(1001) + command1PID := uint32(1002) + command2PID := uint32(1003) + + // 1st session + // sshd0 + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd0PID, + Sid: sshd0PID, + Ppid: 1, + }, + }) + + // sshd1 + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd1PID, + Sid: sshd1PID, + Ppid: sshd0PID, + }, + }) + + // bash + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshd1PID, + Pgid: bashPID, + }, + }) + + // command0 + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: lsPath, + PIDs: types.PIDInfo{ + Tgid: command0PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command0PID, + }, + }) + + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: command0PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command0PID, + }, + ExitCode: 0, + }) + + // command 1 & 2 in pg + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: grepPath, + PIDs: types.PIDInfo{ + Tgid: command1PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command1PID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: grepPath, + PIDs: types.PIDInfo{ + Tgid: command2PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command1PID, + }, + }) + + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: command2PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command1PID, + }, + ExitCode: 0, + }) + + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: command1PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command1PID, + }, + ExitCode: 0, + }) + + // exit bash + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshd1PID, + Pgid: bashPID, + }, + ExitCode: 0, + }) + + // exit sshd1 + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: sshd1PID, + Sid: sshd1PID, + Ppid: sshd0PID, + }, + ExitCode: 0, + }) + + // exit sshd0 + db.InsertExit(types.ProcessExitEvent{ + PIDs: types.PIDInfo{ + Tgid: sshd0PID, + Sid: sshd0PID, + Ppid: 1, + }, + ExitCode: 0, + }) + + //2nd session + x1 := bashPID + x2 := sshd0PID + sshd0PID = command0PID + sshd1PID = command1PID + bashPID = command2PID + command0PID = x1 + command1PID = x2 + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd0PID, + Sid: sshd0PID, + Ppid: 1, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: sshdPath, + PIDs: types.PIDInfo{ + Tgid: sshd1PID, + Sid: sshd1PID, + Ppid: sshd0PID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: bashPath, + PIDs: types.PIDInfo{ + Tgid: bashPID, + Sid: bashPID, + Ppid: sshd1PID, + Pgid: bashPID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: wPath, + PIDs: types.PIDInfo{ + Tgid: command0PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command0PID, + }, + }) + + insertForkAndExec(t, db, types.ProcessExecEvent{ + Filename: grepPath, + PIDs: types.PIDInfo{ + Tgid: command1PID, + Sid: bashPID, + Ppid: bashPID, + Pgid: command0PID, + }, + }) + + // systemd + systemd, err := db.GetProcess(1) + require.Nil(t, err) + requireParentUnset(t, systemd) + requireEntryLeaderUnset(t, systemd) + + requireProcess(t, db, 1, systemdPath) + requireSessionLeader(t, db, 1, 1) + + // sshd0 + requireProcess(t, db, sshd0PID, sshdPath) + requireParent(t, db, sshd0PID, 1) + requireSessionLeader(t, db, sshd0PID, sshd0PID) + requireEntryLeader(t, db, sshd0PID, sshd0PID, Init) + + // sshd1 + requireProcess(t, db, sshd1PID, sshdPath) + requireParent(t, db, sshd1PID, sshd0PID) + requireSessionLeader(t, db, sshd1PID, sshd1PID) + requireEntryLeader(t, db, sshd1PID, sshd0PID, Init) + + // bash + requireProcess(t, db, bashPID, bashPath) + requireParent(t, db, bashPID, sshd1PID) + requireSessionLeader(t, db, bashPID, bashPID) + requireEntryLeader(t, db, bashPID, bashPID, Sshd) + + // w + requireProcess(t, db, command0PID, wPath) + requireParent(t, db, command0PID, bashPID) + requireSessionLeader(t, db, command0PID, bashPID) + requireEntryLeader(t, db, command0PID, bashPID, Sshd) + requireGroupLeader(t, db, command0PID, command0PID) + + // grep + requireProcess(t, db, command1PID, grepPath) + requireParent(t, db, command1PID, bashPID) + requireSessionLeader(t, db, command1PID, bashPID) + requireEntryLeader(t, db, command1PID, bashPID, Sshd) + requireGroupLeader(t, db, command1PID, command0PID) +} diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go b/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go new file mode 100644 index 00000000000..12751bead9c --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/processdb/reaper.go @@ -0,0 +1,109 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux + +package processdb + +import ( + "container/heap" + "time" +) + +const ( + reaperInterval = 30 * time.Second // run the reaper process at this interval + removalTimeout = 10 * time.Second // remove processes that have been exited longer than this +) + +type removalCandidate struct { + pid uint32 + exitTime time.Time + startTime uint64 +} + +type rcHeap []removalCandidate + +func (h rcHeap) Len() int { + return len(h) +} + +func (h rcHeap) Less(i, j int) bool { + return h[i].exitTime.Sub(h[j].exitTime) < 0 +} + +func (h rcHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *rcHeap) Push(x any) { + v, ok := x.(removalCandidate) + if ok { + *h = append(*h, v) + } +} + +func (h *rcHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// The reaper will remove exited processes from the DB a short time after they have exited. +// Processes cannot be removed immediately when exiting, as the event enrichment will happen sometime +// afterwards, and will fail if the process is already removed from the DB. +// +// In Linux, exited processes cannot be session leader, process group leader or parent, so if a process has exited, +// it cannot have a relation with any other longer-lived processes. If this processor is ported to other OSs, this +// assumption will need to be revisited. +func (db *DB) startReaper() { + go func(db *DB) { + ticker := time.NewTicker(reaperInterval) + defer ticker.Stop() + + h := &db.removalCandidates + heap.Init(h) + for { + select { + case <-ticker.C: + db.mutex.Lock() + now := time.Now() + for { + if len(db.removalCandidates) == 0 { + break + } + v := heap.Pop(h) + c, ok := v.(removalCandidate) + if !ok { + db.logger.Debugf("unexpected item in removal queue: \"%v\"", v) + continue + } + if now.Sub(c.exitTime) < removalTimeout { + // this candidate hasn't reached its timeout, put it back on the heap + // everything else will have a later exit time, so end this run + heap.Push(h, c) + break + } + p, ok := db.processes[c.pid] + if !ok { + db.logger.Debugf("pid %v was candidate for removal, but was already removed", c.pid) + continue + } + if p.PIDs.StartTimeNS != c.startTime { + // this could happen if the PID has already rolled over and reached this PID again. + db.logger.Debugf("start times of removal candidate %v differs, not removing (PID had been reused?)", c.pid) + continue + } + delete(db.processes, c.pid) + delete(db.entryLeaders, c.pid) + delete(db.entryLeaderRelationships, c.pid) + } + db.mutex.Unlock() + case <-db.stopChan: + return + } + } + }(db) +} diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go index a0d11ad18e5..2f99dd72b1f 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go @@ -69,7 +69,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error { pe.Env = proc_info.Env pe.Filename = proc_info.Filename } else { - s.logger.Errorf("get process info from proc for pid %v: %w", pid, err) + s.logger.Warnf("couldn't get process info from proc for pid %v: %w", pid, err) // If process info couldn't be taken from procfs, populate with as much info as // possible from the event pe.PIDs.Tgid = uint32(pid) diff --git a/x-pack/auditbeat/processors/sessionmd/types/process.go b/x-pack/auditbeat/processors/sessionmd/types/process.go index 232dd348de8..daf989ef3cd 100644 --- a/x-pack/auditbeat/processors/sessionmd/types/process.go +++ b/x-pack/auditbeat/processors/sessionmd/types/process.go @@ -33,7 +33,7 @@ type Process struct { // The exit code of the process, if this is a termination event. // The field should be absent if there is no exit code for the event (e.g. process start). - ExitCode *int64 `json:"exit_code,omitempty"` + ExitCode int32 `json:"exit_code,omitempty"` // Whether the process is connected to an interactive shell. // Process interactivity is inferred from the processes file descriptors. If the character device for the controlling tty is the same as stdin and stderr for the process, the process is considered interactive.