Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Format code with autopep8 #13

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 60 additions & 51 deletions plugins/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,35 @@

# inode types
# see https://elixir.bootlin.com/linux/latest/source/include/uapi/linux/stat.h
S_IFMT = 0o170000 # inode type mask
S_IFSOCK = 0o140000 # socket
S_IFLNK = 0o120000 # symbolic link
S_IFREG = 0o100000 # regular file
S_IFBLK = 0o60000 # block device
S_IFDIR = 0o40000 # directory
S_IFCHR = 0o20000 # character device
S_IFIFO = 0o10000 # fifo (pipe)
S_ISUID = 0o4000
S_ISGID = 0o2000
S_ISVTX = 0o1000
S_IFMT = 0o170000 # inode type mask
S_IFSOCK = 0o140000 # socket
S_IFLNK = 0o120000 # symbolic link
S_IFREG = 0o100000 # regular file
S_IFBLK = 0o60000 # block device
S_IFDIR = 0o40000 # directory
S_IFCHR = 0o20000 # character device
S_IFIFO = 0o10000 # fifo (pipe)
S_ISUID = 0o4000
S_ISGID = 0o2000
S_ISVTX = 0o1000

# user permissions
S_IRWXU = 0o700 # user permissions mask
S_IRUSR = 0o400 # user read
S_IWUSR = 0o200 # user write
S_IXUSR = 0o100 # user execute
S_IRWXU = 0o700 # user permissions mask
S_IRUSR = 0o400 # user read
S_IWUSR = 0o200 # user write
S_IXUSR = 0o100 # user execute

# group permissions
S_IRWXG = 0o070 # group permissions mask
S_IRGRP = 0o040 # group read
S_IWGRP = 0o020 # group write
S_IXGRP = 0o010 # group execute
S_IRWXG = 0o070 # group permissions mask
S_IRGRP = 0o040 # group read
S_IWGRP = 0o020 # group write
S_IXGRP = 0o010 # group execute

# other permissions
S_IRWXO = 0o007 # other permissions mask
S_IROTH = 0o004 # other read
S_IWOTH = 0o002 # other write
S_IXOTH = 0o001 # other execute
S_IRWXO = 0o007 # other permissions mask
S_IROTH = 0o004 # other read
S_IWOTH = 0o002 # other write
S_IXOTH = 0o001 # other execute


vollog = logging.getLogger(__name__)
Expand Down Expand Up @@ -83,7 +83,7 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]
description='Sort files by path',
optional=True)
]

@classmethod
def create_mount_filter(cls, mnt_list: List[int] = None) -> Callable[[Any], bool]:
"""Constructs a filter function for mount IDs.
Expand All @@ -104,7 +104,7 @@ def filter_func(mount):
return filter_func
else:
return lambda _: False

@classmethod
def create_path_filter(cls, path) -> Callable[[Any], bool]:
"""Constructs a filter function for file paths.
Expand All @@ -118,9 +118,9 @@ def create_path_filter(cls, path) -> Callable[[Any], bool]:

def filter_func(x):
return not x.startswith(path)

return filter_func

@classmethod
def create_uid_filter(cls, uid_list: List[int] = None) -> Callable[[Any], bool]:
"""Constructs a filter function for owner UIDs.
Expand All @@ -137,13 +137,13 @@ def create_uid_filter(cls, uid_list: List[int] = None) -> Callable[[Any], bool]:

def filter_func(uid):
return uid not in filter_list

return filter_func
else:
return lambda _: False

@classmethod
def _mode_to_str(cls, mode:int) -> str:
def _mode_to_str(cls, mode: int) -> str:
"""Calculate the mode string (see http://cvsweb.netbsd.org/bsdweb.cgi/src/lib/libc/string/strmode.c?rev=1.16&content-type=text/x-cvsweb-markup)"""
string = ''

Expand All @@ -165,7 +165,7 @@ def _mode_to_str(cls, mode:int) -> str:
string += 'p'
else:
string += '?'

# get user permissions
string += 'r' if mode & S_IRUSR else '-'
string += 'w' if mode & S_IWUSR else '-'
Expand All @@ -178,7 +178,7 @@ def _mode_to_str(cls, mode:int) -> str:
string += 'S'
elif user_execute == S_IXUSR | S_ISUID:
string += 's'

# get group permissions
string += 'r' if mode & S_IRGRP else '-'
string += 'w' if mode & S_IWGRP else '-'
Expand Down Expand Up @@ -219,7 +219,8 @@ def get_file_info(cls,
A mount and task are required for path calculation.
"""
# get file path
path = symbols.linux.LinuxUtilities.prepend_path(dentry, mount, task.fs.root)
path = symbols.linux.LinuxUtilities.prepend_path(
dentry, mount, task.fs.root)
if path is None:
path = ''

Expand All @@ -236,7 +237,7 @@ def get_file_info(cls,
inode_id = -1
inode_addr = 0
inode = None

# get file info
mode = ''
uid = -1
Expand All @@ -262,7 +263,7 @@ def get_file_info(cls,
accessed = inode.i_atime.tv_sec

return mnt_id, inode_id, inode_addr, mode, uid, gid, size, created, modified, accessed, path

@classmethod
def _walk_dentry(cls,
context: interfaces.context.ContextInterface,
Expand All @@ -289,8 +290,9 @@ def _walk_dentry(cls,
# walk subdirs linked list
for subdir_dentry in dentry.d_subdirs.to_list(symbol_table + constants.BANG + 'dentry', 'd_child'):
# walk subdir dentry
cls._walk_dentry(context, vmlinux_module_name, dentry_set, subdir_dentry)

cls._walk_dentry(context, vmlinux_module_name,
dentry_set, subdir_dentry)

@classmethod
def get_dentries(cls,
context: interfaces.context.ContextInterface,
Expand All @@ -308,46 +310,51 @@ def get_dentries(cls,
if pid_filter is None:
pid_filter = pslist.PsList.create_pid_filter([1])

non_filtered_mounts = mount_plugin.Mount.get_mounts(context, vmlinux_module_name, pid_filter)

non_filtered_mounts = mount_plugin.Mount.get_mounts(
context, vmlinux_module_name, pid_filter)

# filter out mounts
mounts = [(task, mount) for task, mount in non_filtered_mounts if not mnt_filter(mount)]
mounts = [(task, mount)
for task, mount in non_filtered_mounts if not mnt_filter(mount)]
num_mounts = len(mounts)

for i, (task, mount) in enumerate(mounts):
vollog.info(f'[{i}/{num_mounts}] listing files for mount ID {mount.mnt_id}')

vollog.info(
f'[{i}/{num_mounts}] listing files for mount ID {mount.mnt_id}')

# set of dentry addresses for this mount
mount_dentries = set()

# get the root dentry of this mount
root_dentry = mount.get_mnt_root().dereference()

# walk root dentry and extract all dentries recursively
cls._walk_dentry(context, vmlinux_module_name, mount_dentries, root_dentry)
cls._walk_dentry(context, vmlinux_module_name,
mount_dentries, root_dentry)

# add dentries for this mount to global list
for dentry_ptr in mount_dentries:
dentry = vmlinux.object(object_type='dentry', offset=dentry_ptr, absolute=True)
dentry = vmlinux.object(
object_type='dentry', offset=dentry_ptr, absolute=True)
dentries.append((task, mount, dentry))

return dentries

def _generator(self):
# create path and UID filters
path_filter = self.create_path_filter(self.config.get('path', None))
uid_filter = self.create_uid_filter(self.config.get('uid', None))

# get requested PIDs
pids = self.config.get('pid')

# if a mount list was specified but PID list wasn't, extract mounts from all PIDs
if self.config.get('mount') and not pids:
# get PIDs of all tasks
pids = []
for task in pslist.PsList.list_tasks(self.context, self.config['kernel']):
pids.append(task.pid)

# build PID filter
if pids:
pid_filter = pslist.PsList.create_pid_filter(pids)
Expand All @@ -367,7 +374,8 @@ def _generator(self):
for i, (task, mount, dentry) in enumerate(dentries):
# print info message every 1000 files
if i % 1000 == 0:
vollog.info(f'[{i}/{num_dentries}] extracting file info and filtering paths')
vollog.info(
f'[{i}/{num_dentries}] extracting file info and filtering paths')

info = self.get_file_info(task, mount, dentry)
# info could not be extracted
Expand All @@ -378,16 +386,17 @@ def _generator(self):
# apply path and UID filters
if not path_filter(file_path) and not uid_filter(uid):
files[file_path] = mnt_id, inode_id, inode_addr, mode, uid, gid, size, created, modified, accessed, file_path

paths = list(files.keys())
# sort files by path
if self.config.get('sort', None):
vollog.info('sorting files')
paths.sort()
vollog.info('done sorting')
for path in paths:
mnt_id, inode_id, inode_addr, mode, uid, gid, size, created, modified, accessed, file_path = files[path]
mnt_id, inode_id, inode_addr, mode, uid, gid, size, created, modified, accessed, file_path = files[
path]
yield (0, (mnt_id, inode_id, format_hints.Hex(inode_addr), mode, uid, gid, size, created, modified, accessed, file_path))

def run(self):
return renderers.TreeGrid([('Mount ID', int), ('Inode ID', int), ('Inode Address', format_hints.Hex), ('Mode', str), ('UID', int), ('GID', int), ('Size', int), ('Created', int), ('Modified', int), ('Accessed', int), ('File Path', str)], self._generator())
Loading