Skip to content

Commit

Permalink
[Auditbeat][add_session_metadata processor] Fix more potential enrich…
Browse files Browse the repository at this point in the history
…ment failures (#39243)

Fix two more cases that could cause unenriched processes in the add_session_metadata processor.

It was possible for auditd events to arrive before the ebpf event added processes to the process DB, now the enrichment will wait for the process to be inserted into the DB, if it's not already before enrichment is run on it. Also stop attempting to enrich failed syscall events, and modifying the DB based on these.

Changes:

With the ebpf backend, when an event is processed wait for a process to be added to the DB before enriching, if it's not already in the DB before the event is received.
Do not enrich failed syscall auditd events. Since failed syscalls don't actually cause a process to be created, they should not be enriched, or inserted to the process
Remove scrapeAncestors from DB. The intention of this was to fill in missed processes, but now processes should not be missed with epbf, and ineffective with procfs, as the process will most likely already be ended. This was causing DB inconsistancies when run on failed syscall events, and I haven't ever seen any cases where it's helpful now.
  • Loading branch information
mjwolf authored May 1, 2024
1 parent 02ea29d commit ffcd181
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 60 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Auditbeat*
- Set field types to correctly match ECS in sessionmd processor {issue}38955[38955] {pull}38994[38994]
- Keep process info on exited processes, to avoid failing to enrich events in sessionmd processor {pull}39173[39173]

- Fix failing to enrich process events in sessionmd processor {issue}38955[38955] {pull}39173[39173] {pull}39243[39243]
- Prevent scenario of losing children-related file events in a directory for recursive fsnotify backend of auditbeat file integrity module {pull}39133[39133]


Expand Down
19 changes: 16 additions & 3 deletions x-pack/auditbeat/processors/sessionmd/add_session_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,24 @@ func New(cfg *cfg.C) (beat.Processor, error) {
}

func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) {
_, err := ev.GetValue(p.config.PIDField)
pi, err := ev.GetValue(p.config.PIDField)
if err != nil {
// Do not attempt to enrich events without PID; it's not a supported event
return ev, nil //nolint:nilerr // Running on events without PID is expected
}

err = p.provider.UpdateDB(ev)
// Do not enrich failed syscalls, as there was no actual process change related to it
v, err := ev.GetValue("auditd.result")
if err == nil && v == "fail" {
return ev, nil
}

pid, err := pidToUInt32(pi)
if err != nil {
return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error
}

err = p.provider.UpdateDB(ev, pid)
if err != nil {
return ev, err
}
Expand Down Expand Up @@ -136,7 +147,9 @@ func (p *addSessionMetadata) enrich(ev *beat.Event) (*beat.Event, error) {

fullProcess, err := p.db.GetProcess(pid)
if err != nil {
return nil, fmt.Errorf("pid %v not found in db: %w", pid, err)
e := fmt.Errorf("pid %v not found in db: %w", pid, err)
p.logger.Errorf("%v", e)
return nil, e
}

processMap := fullProcess.ToMap()
Expand Down
39 changes: 8 additions & 31 deletions x-pack/auditbeat/processors/sessionmd/processdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func (db *DB) InsertFork(fork types.ProcessForkEvent) {

pid := fork.ChildPIDs.Tgid
ppid := fork.ParentPIDs.Tgid
db.scrapeAncestors(db.processes[pid])

if entry, ok := db.processes[ppid]; ok {
entry.PIDs = pidInfoFromProto(fork.ChildPIDs)
Expand Down Expand Up @@ -282,7 +281,6 @@ func (db *DB) InsertExec(exec types.ProcessExecEvent) {
}

db.processes[exec.PIDs.Tgid] = proc
db.scrapeAncestors(proc)
entryLeaderPID := db.evaluateEntryLeader(proc)
if entryLeaderPID != nil {
db.entryLeaderRelationships[exec.PIDs.Tgid] = *entryLeaderPID
Expand Down Expand Up @@ -568,6 +566,14 @@ func setSameAsProcess(process *types.Process) {
}
}

func (db *DB) HasProcess(pid uint32) bool {
db.mutex.RLock()
defer db.mutex.RUnlock()

_, ok := db.processes[pid]
return ok
}

func (db *DB) GetProcess(pid uint32) (types.Process, error) {
db.mutex.RLock()
defer db.mutex.RUnlock()
Expand All @@ -585,8 +591,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
fillParent(&ret, parent)
break
}
db.logger.Debugf("failed to find %d in DB (parent of %d), attempting to scrape", process.PIDs.Ppid, pid)
db.scrapeAncestors(process)
}
}

Expand All @@ -596,8 +600,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
fillGroupLeader(&ret, groupLeader)
break
}
db.logger.Debugf("failed to find %d in DB (group leader of %d), attempting to scrape", process.PIDs.Pgid, pid)
db.scrapeAncestors(process)
}
}

Expand All @@ -607,8 +609,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
fillSessionLeader(&ret, sessionLeader)
break
}
db.logger.Debugf("failed to find %d in DB (session leader of %d), attempting to scrape", process.PIDs.Sid, pid)
db.scrapeAncestors(process)
}
}

Expand Down Expand Up @@ -712,29 +712,6 @@ func getTTYType(major uint16, minor uint16) TTYType {
return TTYUnknown
}

func (db *DB) scrapeAncestors(proc Process) {
for _, pid := range []uint32{proc.PIDs.Pgid, proc.PIDs.Ppid, proc.PIDs.Sid} {
if _, exists := db.processes[pid]; pid == 0 || exists {
continue
}
procInfo, err := db.procfs.GetProcess(pid)
if err != nil {
db.logger.Debugf("couldn't get %v from procfs: %w", pid, err)
continue
}
p := Process{
PIDs: pidInfoFromProto(procInfo.PIDs),
Creds: credInfoFromProto(procInfo.Creds),
CTTY: ttyDevFromProto(procInfo.CTTY),
Argv: procInfo.Argv,
Cwd: procInfo.Cwd,
Env: procInfo.Env,
Filename: procInfo.Filename,
}
db.insertProcess(p)
}
}

func (db *DB) Close() {
close(db.stopChan)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package ebpf_provider
import (
"context"
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/ebpf"
Expand Down Expand Up @@ -151,7 +152,80 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB) (pr
return &p, nil
}

func (s prvdr) UpdateDB(ev *beat.Event) error {
// no-op for ebpf, DB is updated from pushed ebpf events
return nil
const (
maxWaitLimit = 200 * time.Millisecond // Maximum time UpdateDB will wait for process
combinedWaitLimit = 2 * time.Second // Multiple UpdateDB calls will wait up to this amount within resetDuration
backoffDuration = 10 * time.Second // UpdateDB will stop waiting for processes for this time
resetDuration = 5 * time.Second // After this amount of times with no backoffs, the combinedWait will be reset
)

var (
combinedWait = 0 * time.Millisecond
inBackoff = false
backoffStart = time.Now()
since = time.Now()
backoffSkipped = 0
)

// With ebpf, process events are pushed to the DB by the above goroutine, so this doesn't actually update the DB.
// It does to try sync the processor and ebpf events, so that the process is in the process db before continuing.
//
// It's possible that the event to enrich arrives before the process is inserted into the DB. In that case, this
// will block continuing the enrichment until the process is seen (or the timeout is reached).
//
// If for some reason a lot of time has been spent waiting for missing processes, this also has a backoff timer during
// which it will continue without waiting for missing events to arrive, so the processor doesn't become overly backed-up
// waiting for these processes, at the cost of possibly not enriching some processes.
func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
if s.db.HasProcess(pid) {
return nil
}

now := time.Now()
if inBackoff {
if now.Sub(backoffStart) > backoffDuration {
s.logger.Warnf("ended backoff, skipped %d processes", backoffSkipped)
inBackoff = false
combinedWait = 0 * time.Millisecond
} else {
backoffSkipped += 1
return nil
}
} else {
if combinedWait > combinedWaitLimit {
s.logger.Warn("starting backoff")
inBackoff = true
backoffStart = now
backoffSkipped = 0
return nil
}
// maintain a moving window of time for the delays we track
if now.Sub(since) > resetDuration {
since = now
combinedWait = 0 * time.Millisecond
}
}

start := now
nextWait := 5 * time.Millisecond
for {
waited := time.Since(start)
if s.db.HasProcess(pid) {
s.logger.Debugf("got process that was missing after %v", waited)
combinedWait = combinedWait + waited
return nil
}
if waited >= maxWaitLimit {
e := fmt.Errorf("process %v was not seen after %v", pid, waited)
s.logger.Warnf("%w", e)
combinedWait = combinedWait + waited
return e
}
time.Sleep(nextWait)
if nextWait*2+waited > maxWaitLimit {
nextWait = maxWaitLimit - waited
} else {
nextWait = nextWait * 2
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,7 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB, rea
}

// UpdateDB will update the process DB with process info from procfs or the event itself
func (s prvdr) UpdateDB(ev *beat.Event) error {
pi, err := ev.Fields.GetValue(s.pidField)
if err != nil {
return fmt.Errorf("event not supported, no pid")
}
pid, ok := pi.(int)
if !ok {
return fmt.Errorf("pid field not int")
}

func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
syscall, err := ev.GetValue(syscallField)
if err != nil {
return fmt.Errorf("event not supported, no syscall data")
Expand All @@ -59,7 +50,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error {
switch syscall {
case "execveat", "execve":
pe := types.ProcessExecEvent{}
proc_info, err := s.reader.GetProcess(uint32(pid))
proc_info, err := s.reader.GetProcess(pid)
if err == nil {
pe.PIDs = proc_info.PIDs
pe.Creds = proc_info.Creds
Expand All @@ -72,7 +63,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error {
s.logger.Warnf("couldn't get process info from proc for pid %v: %w", pid, err)
// If process info couldn't be taken from procfs, populate with as much info as
// possible from the event
pe.PIDs.Tgid = uint32(pid)
pe.PIDs.Tgid = pid
var intr interface{}
var i int
var ok bool
Expand Down Expand Up @@ -106,7 +97,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error {
case "exit_group":
pe := types.ProcessExitEvent{
PIDs: types.PIDInfo{
Tgid: uint32(pid),
Tgid: pid,
},
}
s.db.InsertExit(pe)
Expand All @@ -122,8 +113,8 @@ func (s prvdr) UpdateDB(ev *beat.Event) error {
if result == "success" {
setsid_ev := types.ProcessSetsidEvent{
PIDs: types.PIDInfo{
Tgid: uint32(pid),
Sid: uint32(pid),
Tgid: pid,
Sid: pid,
},
}
s.db.InsertSetsid(setsid_ev)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestExecveEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event)
err = provider.UpdateDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestExecveatEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event)
err = provider.UpdateDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestSetSidEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event)
err = provider.UpdateDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestSetSidEventFailed(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event)
err = provider.UpdateDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestSetSidSessionLeaderNotScraped(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event)
err = provider.UpdateDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/auditbeat/processors/sessionmd/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ import (
)

type Provider interface {
UpdateDB(*beat.Event) error
UpdateDB(*beat.Event, uint32) error
}

0 comments on commit ffcd181

Please sign in to comment.