Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Format code with autopep8 #17

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 60 additions & 51 deletions plugins/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,35 @@

# inode types
# see https://elixir.bootlin.com/linux/latest/source/include/uapi/linux/stat.h
S_IFMT = 0o170000 # inode type mask
S_IFSOCK = 0o140000 # socket
S_IFLNK = 0o120000 # symbolic link
S_IFREG = 0o100000 # regular file
S_IFBLK = 0o60000 # block device
S_IFDIR = 0o40000 # directory
S_IFCHR = 0o20000 # character device
S_IFIFO = 0o10000 # fifo (pipe)
S_ISUID = 0o4000
S_ISGID = 0o2000
S_ISVTX = 0o1000
S_IFMT = 0o170000 # inode type mask
S_IFSOCK = 0o140000 # socket
S_IFLNK = 0o120000 # symbolic link
S_IFREG = 0o100000 # regular file
S_IFBLK = 0o60000 # block device
S_IFDIR = 0o40000 # directory
S_IFCHR = 0o20000 # character device
S_IFIFO = 0o10000 # fifo (pipe)
S_ISUID = 0o4000
S_ISGID = 0o2000
S_ISVTX = 0o1000

# user permissions
S_IRWXU = 0o700 # user permissions mask
S_IRUSR = 0o400 # user read
S_IWUSR = 0o200 # user write
S_IXUSR = 0o100 # user execute
S_IRWXU = 0o700 # user permissions mask
S_IRUSR = 0o400 # user read
S_IWUSR = 0o200 # user write
S_IXUSR = 0o100 # user execute

# group permissions
S_IRWXG = 0o070 # group permissions mask
S_IRGRP = 0o040 # group read
S_IWGRP = 0o020 # group write
S_IXGRP = 0o010 # group execute
S_IRWXG = 0o070 # group permissions mask
S_IRGRP = 0o040 # group read
S_IWGRP = 0o020 # group write
S_IXGRP = 0o010 # group execute

# other permissions
S_IRWXO = 0o007 # other permissions mask
S_IROTH = 0o004 # other read
S_IWOTH = 0o002 # other write
S_IXOTH = 0o001 # other execute
S_IRWXO = 0o007 # other permissions mask
S_IROTH = 0o004 # other read
S_IWOTH = 0o002 # other write
S_IXOTH = 0o001 # other execute


vollog = logging.getLogger(__name__)
Expand Down Expand Up @@ -83,7 +83,7 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]
description='Sort files by path',
optional=True)
]

@classmethod
def create_mount_filter(cls, mnt_list: List[int] = None) -> Callable[[Any], bool]:
"""Constructs a filter function for mount IDs.
Expand All @@ -104,7 +104,7 @@ def filter_func(mount):
return filter_func
else:
return lambda _: False

@classmethod
def create_path_filter(cls, path) -> Callable[[Any], bool]:
"""Constructs a filter function for file paths.
Expand All @@ -118,9 +118,9 @@ def create_path_filter(cls, path) -> Callable[[Any], bool]:

def filter_func(x):
return not x.startswith(path)

return filter_func

@classmethod
def create_uid_filter(cls, uid_list: List[int] = None) -> Callable[[Any], bool]:
"""Constructs a filter function for owner UIDs.
Expand All @@ -137,13 +137,13 @@ def create_uid_filter(cls, uid_list: List[int] = None) -> Callable[[Any], bool]:

def filter_func(uid):
return uid not in filter_list

return filter_func
else:
return lambda _: False

@classmethod
def _mode_to_str(cls, mode:int) -> str:
def _mode_to_str(cls, mode: int) -> str:
"""Calculate the mode string (see http://cvsweb.netbsd.org/bsdweb.cgi/src/lib/libc/string/strmode.c?rev=1.16&content-type=text/x-cvsweb-markup)"""
string = ''

Expand All @@ -165,7 +165,7 @@ def _mode_to_str(cls, mode:int) -> str:
string += 'p'
else:
string += '?'

# get user permissions
string += 'r' if mode & S_IRUSR else '-'
string += 'w' if mode & S_IWUSR else '-'
Expand All @@ -178,7 +178,7 @@ def _mode_to_str(cls, mode:int) -> str:
string += 'S'
elif user_execute == S_IXUSR | S_ISUID:
string += 's'

# get group permissions
string += 'r' if mode & S_IRGRP else '-'
string += 'w' if mode & S_IWGRP else '-'
Expand Down Expand Up @@ -220,7 +220,8 @@ def get_file_info(cls,
"""
# get file path
try:
path = symbols.linux.LinuxUtilities.prepend_path(dentry, mount, task.fs.root)
path = symbols.linux.LinuxUtilities.prepend_path(
dentry, mount, task.fs.root)
except exceptions.PagedInvalidAddressException:
path = ''
else:
Expand All @@ -240,7 +241,7 @@ def get_file_info(cls,
inode_id = -1
inode_addr = 0
inode = None

# get file info
mode = ''
uid = -1
Expand Down Expand Up @@ -272,7 +273,7 @@ def get_file_info(cls,
accessed = inode.i_atime.tv_sec

return mnt_id, inode_id, inode_addr, mode, uid, gid, size, created, modified, accessed, path

@classmethod
def _walk_dentry(cls,
context: interfaces.context.ContextInterface,
Expand Down Expand Up @@ -300,8 +301,9 @@ def _walk_dentry(cls,
field_name = 'd_child' if dentry.has_member('d_child') else 'd_u'
for subdir_dentry in dentry.d_subdirs.to_list(symbol_table + constants.BANG + 'dentry', field_name):
# walk subdir dentry
cls._walk_dentry(context, vmlinux_module_name, dentry_set, subdir_dentry)

cls._walk_dentry(context, vmlinux_module_name,
dentry_set, subdir_dentry)

@classmethod
def get_dentries(cls,
context: interfaces.context.ContextInterface,
Expand All @@ -319,46 +321,51 @@ def get_dentries(cls,
if pid_filter is None:
pid_filter = pslist.PsList.create_pid_filter([1])

non_filtered_mounts = mount_plugin.Mount.get_mounts(context, vmlinux_module_name, pid_filter)

non_filtered_mounts = mount_plugin.Mount.get_mounts(
context, vmlinux_module_name, pid_filter)

# filter out mounts
mounts = [(task, mount) for task, mount in non_filtered_mounts if not mnt_filter(mount)]
mounts = [(task, mount)
for task, mount in non_filtered_mounts if not mnt_filter(mount)]
num_mounts = len(mounts)

for i, (task, mount) in enumerate(mounts):
vollog.info(f'[{i}/{num_mounts}] listing files for mount ID {mount.mnt_id}')

vollog.info(
f'[{i}/{num_mounts}] listing files for mount ID {mount.mnt_id}')

# set of dentry addresses for this mount
mount_dentries = set()

# get the root dentry of this mount
root_dentry = mount.get_mnt_root().dereference()

# walk root dentry and extract all dentries recursively
cls._walk_dentry(context, vmlinux_module_name, mount_dentries, root_dentry)
cls._walk_dentry(context, vmlinux_module_name,
mount_dentries, root_dentry)

# add dentries for this mount to global list
for dentry_ptr in mount_dentries:
dentry = vmlinux.object(object_type='dentry', offset=dentry_ptr, absolute=True)
dentry = vmlinux.object(
object_type='dentry', offset=dentry_ptr, absolute=True)
dentries.append((task, mount, dentry))

return dentries

def _generator(self):
# create path and UID filters
path_filter = self.create_path_filter(self.config.get('path', None))
uid_filter = self.create_uid_filter(self.config.get('uid', None))

# get requested PIDs
pids = self.config.get('pid')

# if a mount list was specified but PID list wasn't, extract mounts from all PIDs
if self.config.get('mount') and not pids:
# get PIDs of all tasks
pids = []
for task in pslist.PsList.list_tasks(self.context, self.config['kernel']):
pids.append(task.pid)

# build PID filter
if pids:
pid_filter = pslist.PsList.create_pid_filter(pids)
Expand All @@ -378,7 +385,8 @@ def _generator(self):
for i, (task, mount, dentry) in enumerate(dentries):
# print info message every 1000 files
if i % 1000 == 0:
vollog.info(f'[{i}/{num_dentries}] extracting file info and filtering paths')
vollog.info(
f'[{i}/{num_dentries}] extracting file info and filtering paths')

info = self.get_file_info(task, mount, dentry)
# info could not be extracted
Expand All @@ -389,16 +397,17 @@ def _generator(self):
# apply path and UID filters
if not path_filter(file_path) and not uid_filter(uid):
files[file_path] = mnt_id, inode_id, inode_addr, mode, uid, gid, size, created, modified, accessed, file_path

paths = list(files.keys())
# sort files by path
if self.config.get('sort', None):
vollog.info('sorting files')
paths.sort()
vollog.info('done sorting')
for path in paths:
mnt_id, inode_id, inode_addr, mode, uid, gid, size, created, modified, accessed, file_path = files[path]
mnt_id, inode_id, inode_addr, mode, uid, gid, size, created, modified, accessed, file_path = files[
path]
yield (0, (mnt_id, inode_id, format_hints.Hex(inode_addr), mode, uid, gid, size, created, modified, accessed, file_path))

def run(self):
return renderers.TreeGrid([('Mount ID', int), ('Inode ID', int), ('Inode Address', format_hints.Hex), ('Mode', str), ('UID', int), ('GID', int), ('Size', int), ('Created', int), ('Modified', int), ('Accessed', int), ('File Path', str)], self._generator())
41 changes: 24 additions & 17 deletions plugins/ifconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]
]

@classmethod
def _get_devs_namespaces(cls,
def _get_devs_namespaces(cls,
context: interfaces.context.ContextInterface,
vmlinux_module_name: str) -> Iterable[Tuple[int, symbols.linux.extensions.net_device]]:
"""Walk the list of net namespaces and extract all net devices from them (kernel >= 2.6.24)."""
vmlinux = context.modules[vmlinux_module_name]
symbol_table = vmlinux.symbol_table_name

net_namespace_list = vmlinux.object_from_symbol(symbol_name='net_namespace_list')

# enumerate each network namespace (struct net) in memory and pass the first one
net_namespace_list = vmlinux.object_from_symbol(
symbol_name='net_namespace_list')

# enumerate each network namespace (struct net) in memory and pass the first one
for net_ns in net_namespace_list.to_list(symbol_table + constants.BANG + 'net', 'list', sentinel=True):
try:
ns_num = net_ns.get_inum()
Expand All @@ -45,15 +46,16 @@ def _get_devs_namespaces(cls,
# for each net namespace, walk the list of net devices
for net_dev in net_ns.dev_base_head.to_list(symbol_table + constants.BANG + 'net_device', 'dev_list', sentinel=True):
yield ns_num, net_dev

@classmethod
def _get_devs_base(cls,
context: interfaces.context.ContextInterface,
vmlinux_module_name: str) -> Iterable[Tuple[int, symbols.linux.extensions.net_device]]:
context: interfaces.context.ContextInterface,
vmlinux_module_name: str) -> Iterable[Tuple[int, symbols.linux.extensions.net_device]]:
"""Walk the list of net devices headed by dev_base (kernel < 2.6.22)."""
vmlinux = context.modules[vmlinux_module_name]

first_net_device = vmlinux.object_from_symbol(symbol_name='dev_base').dereference()
first_net_device = vmlinux.object_from_symbol(
symbol_name='dev_base').dereference()

for net_dev in symbols.linux.LinuxUtilities.walk_internal_list(vmlinux, 'net_device', 'next', first_net_device):
# no network namespace, so yield -1 instead of namespace number
Expand All @@ -74,13 +76,15 @@ def get_net_devs(cls,
func = cls._get_devs_base
# kernel 2.6.22 and 2.6.23
elif vmlinux.has_symbol('dev_name_head'):
vollog.error('Cannot extract net devices from kernel versions 2.6.22 - 2.6.23')
vollog.error(
'Cannot extract net devices from kernel versions 2.6.22 - 2.6.23')
return
# other unsupported kernels
else:
vollog.error("Unable to determine ifconfig information. Probably because it's an old kernel")
vollog.error(
"Unable to determine ifconfig information. Probably because it's an old kernel")
return

# yield net devices
for net_ns, dev in func(context, vmlinux_module_name):
yield net_ns, dev
Expand All @@ -101,7 +105,8 @@ def get_net_dev_info(cls,
# get MAC address
mac_addr = ''
for netdev_hw_addr in net_dev.dev_addrs.list.to_list(symbol_table + constants.BANG + 'netdev_hw_addr', 'list', sentinel=True):
mac_addr = ':'.join(['{0:02x}'.format(x) for x in netdev_hw_addr.addr][:6])
mac_addr = ':'.join(['{0:02x}'.format(x)
for x in netdev_hw_addr.addr][:6])
# use only first address
break

Expand All @@ -113,14 +118,15 @@ def get_net_dev_info(cls,
except exceptions.PagedInvalidAddressException:
ipv4_addr = ''
ipv4_prefixlen = 0

# get IPv6 info
ipv6_addr = ''
ipv6_prefixlen = 0
try:
inet6_dev = net_dev.ip6_ptr.dereference()
for inet6_ifaddr in inet6_dev.addr_list.to_list(symbol_table + constants.BANG + 'inet6_ifaddr', 'if_list', sentinel=True):
ipv6_addr = conversion.convert_ipv6(inet6_ifaddr.addr.in6_u.u6_addr32)
ipv6_addr = conversion.convert_ipv6(
inet6_ifaddr.addr.in6_u.u6_addr32)
ipv6_prefixlen = inet6_ifaddr.prefix_len
# use only first address
break
Expand All @@ -132,13 +138,14 @@ def get_net_dev_info(cls,

return name, mac_addr, ipv4_addr, ipv4_prefixlen, ipv6_addr, ipv6_prefixlen, promisc

def _generator(self):
def _generator(self):
# get all network devices
for _, net_dev in self.get_net_devs(self.context, self.config['kernel']):
# extract information from each device
info = self.get_net_dev_info(self.context, self.config['kernel'], net_dev)
info = self.get_net_dev_info(
self.context, self.config['kernel'], net_dev)
name, mac_addr, ipv4_addr, ipv4_prefixlen, ipv6_addr, ipv6_prefixlen, promisc = info

# convert to CIDR notation
if ipv4_addr:
ipv4 = ipv4_addr + '/' + str(ipv4_prefixlen)
Expand Down
Loading