Skip to content

Commit

Permalink
ci: extend test_file_integrity.py to test kprobes backend of file int…
Browse files Browse the repository at this point in the history
…egrity module
  • Loading branch information
pkoutsovasilis committed Jan 31, 2024
1 parent d82972f commit 46ef848
Showing 1 changed file with 147 additions and 39 deletions.
186 changes: 147 additions & 39 deletions auditbeat/tests/system/test_file_integrity.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
from auditbeat import *


if platform.system() == 'Linux':
fim_backends = ["fsnotify", "kprobes"]
else:
fim_backends = ["fsnotify"]


# Escapes a path to match what's printed in the logs
def escape_path(path):
return path.replace('\\', '\\\\')
Expand All @@ -13,7 +19,7 @@ def has_file(objs, path, sha1hash):
found = False
for obj in objs:
if 'file.path' in obj and 'file.hash.sha1' in obj \
and obj['file.path'].lower() == path.lower() and obj['file.hash.sha1'] == sha1hash:
and obj['file.path'].lower() == path.lower() and obj['file.hash.sha1'] == sha1hash:
found = True
break
assert found, "File '{0}' with sha1sum '{1}' not found".format(path, sha1hash)
Expand Down Expand Up @@ -52,19 +58,27 @@ class Test(BaseTest):

def wait_output(self, min_events):
self.wait_until(lambda: wrap_except(lambda: len(self.read_output()) >= min_events))
# wait for the number of lines in the file to stay constant for a second
# wait for the number of lines in the file to stay constant for 10 seconds
prev_lines = -1
while True:
num_lines = self.output_lines()
if prev_lines < num_lines:
prev_lines = num_lines
time.sleep(1)
time.sleep(10)
else:
break

@unittest.skipIf(os.getenv("CI") is not None and platform.system() == 'Darwin',
'Flaky test: https://github.com/elastic/beats/issues/24678')
def test_non_recursive(self):
def wait_startup(self, backend, dir):
if backend == "kprobes":
self.wait_log_contains("Started kprobes watcher", max_timeout=30, ignore_case=True)
else:
# wait until the directories to watch are printed in the logs
# this happens when the file_integrity module starts.
# Case must be ignored under windows as capitalisation of paths
# may differ
self.wait_log_contains(escape_path(dir), max_timeout=30, ignore_case=True)

def _test_non_recursive(self, backend):
"""
file_integrity monitors watched directories (non recursive).
"""
Expand All @@ -73,22 +87,21 @@ def test_non_recursive(self):
self.temp_dir("auditbeat_test")]

with PathCleanup(dirs):
extras = {
"paths": dirs,
"scan_at_start": False
}
if platform.system() == "Linux":
extras["force_backend"] = backend

self.render_config_template(
modules=[{
"name": "file_integrity",
"extras": {
"paths": dirs,
"scan_at_start": False
}
"extras": extras
}],
)
proc = self.start_beat()

# wait until the directories to watch are printed in the logs
# this happens when the file_integrity module starts.
# Case must be ignored under windows as capitalisation of paths
# may differ
self.wait_log_contains(escape_path(dirs[0]), max_timeout=30, ignore_case=True)
self.wait_startup(backend, dirs[0])

file1 = os.path.join(dirs[0], 'file.txt')
self.create_file(file1, "hello world!")
Expand All @@ -109,10 +122,12 @@ def test_non_recursive(self):

# log entries are JSON formatted, this value shows up as an escaped json string.
self.wait_log_contains("\\\"deleted\\\"")
self.wait_log_contains("\"path\":\"{0}\"".format(escape_path(subdir)), ignore_case=True)
self.wait_output(3)
self.wait_until(lambda: any(
'file.path' in obj and obj['file.path'].lower() == subdir.lower() for obj in self.read_output()))

if backend == "fsnotify" or backend == "kprobes":
self.wait_output(4)
else:
# ebpf backend doesn't catch directory creation
self.wait_output(3)

proc.check_kill_and_wait()
self.assert_no_logged_warnings()
Expand All @@ -126,39 +141,49 @@ def test_non_recursive(self):

has_file(objs, file1, "430ce34d020724ed75a196dfc2ad67c77772d169")
has_file(objs, file2, "d23be250530a24be33069572db67995f21244c51")
has_dir(objs, subdir)
if backend == "fsnotify" or backend == "kprobes":
has_dir(objs, subdir)

file_events(objs, file1, ['created', 'deleted'])
file_events(objs, file2, ['created'])

# assert file inside subdir is not reported
assert self.log_contains(file3) is False

@unittest.skipIf(os.getenv("BUILD_ID") is not None, "Skipped as flaky: https://github.com/elastic/beats/issues/7731")
def test_recursive(self):
@unittest.skipIf(os.getenv("CI") is not None and platform.system() == 'Darwin',
'Flaky test: https://github.com/elastic/beats/issues/24678')
def test_non_recursive_fsnotify(self):
self._test_non_recursive("fsnotify")

@unittest.skipIf(platform.system() != 'Linux', 'Non linux, skipping.')
def test_non_recursive_kprobes(self):
self._test_non_recursive("kprobes")

def _test_recursive(self, backend):
"""
file_integrity monitors watched directories (recursive).
"""

dirs = [self.temp_dir("auditbeat_test")]

with PathCleanup(dirs):
extras = {
"paths": dirs,
"scan_at_start": False,
"recursive": True
}

if platform.system() == "Linux":
extras["force_backend"] = backend

self.render_config_template(
modules=[{
"name": "file_integrity",
"extras": {
"paths": dirs,
"scan_at_start": False,
"recursive": True
}
"extras": extras
}],
)
proc = self.start_beat()

# wait until the directories to watch are printed in the logs
# this happens when the file_integrity module starts
self.wait_log_contains(escape_path(dirs[0]), max_timeout=30, ignore_case=True)
self.wait_log_contains("\"recursive\":true")
self.wait_startup(backend, dirs[0])

# auditbeat_test/subdir/
subdir = os.path.join(dirs[0], "subdir")
Expand All @@ -174,10 +199,13 @@ def test_recursive(self):
file2 = os.path.join(subdir2, "more.txt")
self.create_file(file2, "")

self.wait_log_contains("\"path\":\"{0}\"".format(escape_path(file2)), ignore_case=True)
self.wait_output(4)
self.wait_until(lambda: any(
'file.path' in obj and obj['file.path'].lower() == subdir2.lower() for obj in self.read_output()))
if backend == "fsnotify" or backend == "kprobes":
self.wait_output(4)
self.wait_until(lambda: any(
'file.path' in obj and obj['file.path'].lower() == subdir2.lower() for obj in self.read_output()))
else:
# ebpf backend doesn't catch directory creation
self.wait_output(2)

proc.check_kill_and_wait()
self.assert_no_logged_warnings()
Expand All @@ -191,8 +219,88 @@ def test_recursive(self):

has_file(objs, file1, "430ce34d020724ed75a196dfc2ad67c77772d169")
has_file(objs, file2, "da39a3ee5e6b4b0d3255bfef95601890afd80709")
has_dir(objs, subdir)
has_dir(objs, subdir2)
if backend == "fsnotify" or backend == "kprobes":
has_dir(objs, subdir)
has_dir(objs, subdir2)

file_events(objs, file1, ['created'])
file_events(objs, file2, ['created'])

def test_recursive_fsnotify(self):
self._test_recursive("fsnotify")

@unittest.skipIf(platform.system() != 'Linux', 'Non linux, skipping.')
def test_recursive_kprobes(self):
self._test_recursive("kprobes")

def _test_file_modified(self, backend):
"""
file_integrity tests for file modifications (chmod, chown, write, truncate, xattrs).
"""

dirs = [self.temp_dir("auditbeat_test")]

with PathCleanup(dirs):

self.render_config_template(
modules=[{
"name": "file_integrity",
"extras": {
"paths": dirs,
"scan_at_start": False,
"recursive": False,
"force_backend": backend
}
}],
)
proc = self.start_beat()
self.wait_startup(backend, dirs[0])

# Event 1: file create
f = os.path.join(dirs[0], f'file_{backend}.txt')
self.create_file(f, "hello world!")

if backend == "fsnotify" or backend == "kprobes":
# FSNotify can't catch the events if operations happens too fast
time.sleep(1)

# Event 2: chmod
os.chmod(f, 0o777)

if backend == "fsnotify" or backend == "kprobes":
# FSNotify can't catch the events if operations happens too fast
time.sleep(1)

with open(f, "w") as fd:
# Event 3: write
fd.write("data")

if backend == "fsnotify" or backend == "kprobes":
# FSNotify can't catch the events if operations happens too fast
time.sleep(1)

# Event 4: truncate
fd.truncate(0)

if backend == "fsnotify" or backend == "kprobes":
# FSNotify can't catch the events if operations happens too fast
time.sleep(1)

# Wait N events
self.wait_output(4)

proc.check_kill_and_wait()
self.assert_no_logged_warnings()

# Ensure all Beater stages are used.
assert self.log_contains("Setup Beat: auditbeat")
assert self.log_contains("auditbeat start running")
assert self.log_contains("auditbeat stopped")

@unittest.skipIf(platform.system() != 'Linux', 'Non linux, skipping.')
def test_file_modified_fsnotify(self):
self._test_file_modified("fsnotify")

@unittest.skipIf(platform.system() != 'Linux', 'Non linux, skipping.')
def test_file_modified_kprobes(self):
self._test_file_modified("kprobes")

0 comments on commit 46ef848

Please sign in to comment.