Skip to content

Commit

Permalink
lint and reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
tuturu-tech committed Jun 12, 2024
1 parent 2fcbcc7 commit 5e447bb
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 52 deletions.
139 changes: 94 additions & 45 deletions fuzz_utils/modify_corpus/CorpusModifier.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
"""The CorpusModifier class handles modification of corpus call sequences based on fuzzer config"""
import os
import yaml
import hashlib
import json
import shutil
import copy
import datetime
import yaml

from slither import Slither
from fuzz_utils.utils.slither_utils import get_target_contract
from fuzz_utils.utils.error_handler import handle_exit
from fuzz_utils.utils.crytic_print import CryticPrint

# pylint: disable=too-many-instance-attributes
class CorpusModifier:
"""
Handles modifying the corpus based on the fuzzer configuration.
"""
fuzzer_fields: dict = {"echidna": ["maxTimeDelay", "maxBlockDelay", "filterFunctions"], "medusa": ["blockTimestampDelayMax", "blockNumberDelayMax"]}
corpora_format: dict = {"echidna": {"coverage": [], "reproducers": []}, "medusa": {"call_sequences": {"immutable": [], "mutable": []}, "test_results": []}}

fuzzer_fields: dict = {
"echidna": ["maxTimeDelay", "maxBlockDelay", "filterFunctions"],
"medusa": ["blockTimestampDelayMax", "blockNumberDelayMax"],
}
corpora_format: dict = {
"echidna": {"coverage": [], "reproducers": []},
"medusa": {"call_sequences": {"immutable": [], "mutable": []}, "test_results": []},
}
valid_modes: list = ["delete_sequence", "delete_calls"]
fuzzer_config: dict | None = None
def __init__(
self,
config: dict,
slither: Slither | None
) -> None:

def __init__(self, config: dict, slither: Slither | None) -> None:
if config:
self.modifier_config = config
if slither:
Expand All @@ -38,15 +44,20 @@ def __init__(
self.mode = config["mode"]
if "targetContract" in config:
self.target = get_target_contract(self.slither, config["targetContract"])
self.corpus_copy = {}
self.rules_list = [self._is_incorrect_delay, self._is_blacklisted_function, self._is_nonexistent_function]
self.corpus_copy: list = []
self.rules_list = [
self._is_incorrect_delay,
self._is_blacklisted_function,
self._is_nonexistent_function,
]
self.modifications_list = [self._modify_invalid_caller]


def modify_corpus(self) -> None:
"""Modifies the current corpus and saves the new version"""
# 1. Open the corpus and parse all the files
self.corpus_copy = self._copy_fuzzer_corpus(self.corpora_format[self.fuzzer], self.corpus_path)
self.corpus_copy = self._copy_fuzzer_corpus(
self.corpora_format[self.fuzzer], self.corpus_path
)
# 2. a) Copy current corpus somewhere in case something goes wrong?
self.save_corpus_to_history()
# 4. Apply the rules
Expand All @@ -56,13 +67,17 @@ def modify_corpus(self) -> None:

def dry_run(self) -> None:
"""Prints the calls that would be modified, without modifying them"""
self.corpus_copy = self._copy_fuzzer_corpus(self.corpora_format[self.fuzzer.lower()], self.corpus_path)
new_corpus = self._filter_corpus(self.mode, self.rules_list, self.modifications_list)
self.corpus_copy = self._copy_fuzzer_corpus(
self.corpora_format[self.fuzzer.lower()], self.corpus_path
)
_ = self._filter_corpus(self.mode, self.rules_list, self.modifications_list)

def save_corpus_to_history(self) -> None:
"""Saves the current corpus directory to the corpus history"""
# 1. Fetch current corpus
corpus_to_save = self._copy_fuzzer_corpus(self.corpora_format[self.fuzzer.lower()], self.corpus_path)
corpus_to_save = self._copy_fuzzer_corpus(
self.corpora_format[self.fuzzer.lower()], self.corpus_path
)
# 2. Check if .fuzz_utils folder already exists, create it if not
directory = self._create_or_fetch_hidden_directory()
# 3. Convert into a string
Expand All @@ -86,12 +101,19 @@ def save_corpus_to_history(self) -> None:
# 6. Add to history
timestamp = datetime.datetime.now().isoformat()
comment = input("Enter a comment for this save: ")
history[corpus_hash] = {"path": self.corpus_path, "timestamp": timestamp, "content": corpus_to_save, "comment": comment}
history[corpus_hash] = {
"path": self.corpus_path,
"timestamp": timestamp,
"content": corpus_to_save,
"comment": comment,
}
# 7. Save history
# TODO check if this can fail
with open(history_path, "w", encoding="utf-8") as f:
json.dump(history, f)

CryticPrint().print_information(f"Corpus saved to history with hash: {corpus_hash}")

def restore_corpus_from_history(self, corpus_hash: str) -> None:
"""Overrides the current corpus directory with a historical version"""
directory = self._create_or_fetch_hidden_directory()
Expand Down Expand Up @@ -120,11 +142,11 @@ def list_historic_corpora(self) -> None:
for key, value in history.items():
print(f'{key}: {value["comment"]} (saved at {value["timestamp"]})')
else:
print("No historic corpora were found.")


print("No historic corpora found.")

def _override_corpus_directory(self, directory_path: str, contents_list: list) -> None:
def _override_corpus_directory( # pylint: disable=no-self-use
self, directory_path: str, contents_list: list
) -> None:
for root, dirs, files in os.walk(directory_path):
for file in files:
file_path = os.path.join(root, file)
Expand All @@ -143,10 +165,11 @@ def _override_corpus_directory(self, directory_path: str, contents_list: list) -
os.makedirs(directory, exist_ok=True)

# Write the file contents
with open(file_path, "w", encoding="utf-8") as file:
json.dump(file_content, file, indent=4)
# TODO fix the mypy errors later on
with open(file_path, "w", encoding="utf-8") as file: # type: ignore[assignment]
json.dump(file_content, file, indent=4) # type: ignore[arg-type]

def _create_or_fetch_hidden_directory(self) -> str:
def _create_or_fetch_hidden_directory(self) -> str: # pylint: disable=no-self-use
current_directory = os.getcwd()
directory_name = os.path.join(current_directory, ".fuzz_utils")
os.makedirs(directory_name, exist_ok=True)
Expand All @@ -162,18 +185,32 @@ def _is_incorrect_delay(self, call_object: dict) -> bool:
block_delay: int

if self.fuzzer == "echidna":
maxTimeDelay = self.fuzzer_config["maxTimeDelay"] if "maxTimeDelay" in self.fuzzer_config else None
maxBlockDelay = self.fuzzer_config["maxBlockDelay"] if "maxBlockDelay" in self.fuzzer_config else None
maxTimeDelay = (
self.fuzzer_config["maxTimeDelay"] if "maxTimeDelay" in self.fuzzer_config else None
)
maxBlockDelay = (
self.fuzzer_config["maxBlockDelay"]
if "maxBlockDelay" in self.fuzzer_config
else None
)
time_delay = int(call_object["delay"][0], 16)
block_delay = int(call_object["delay"][1], 16)
elif self.fuzzer == "medusa":
maxTimeDelay = self.fuzzer_config["fuzzing"]["blockTimestampDelayMax"] if "blockTimestampDelayMax" in self.fuzzer_config["fuzzing"] else None
maxBlockDelay = self.fuzzer_config["fuzzing"]["blockNumberDelayMax"] if "blockNumberDelayMax" in self.fuzzer_config["fuzzing"] else None
maxTimeDelay = (
self.fuzzer_config["fuzzing"]["blockTimestampDelayMax"]
if "blockTimestampDelayMax" in self.fuzzer_config["fuzzing"]
else None
)
maxBlockDelay = (
self.fuzzer_config["fuzzing"]["blockNumberDelayMax"]
if "blockNumberDelayMax" in self.fuzzer_config["fuzzing"]
else None
)
time_delay = call_object["blockTimestampDelay"]
block_delay = call_object["blockNumberDelay"]
else:
raise ValueError(f"Invalid fuzzer: {self.fuzzer}")

if maxTimeDelay:
if time_delay > maxTimeDelay:
return True
Expand All @@ -188,8 +225,8 @@ def _is_nonexistent_function(self, call_object: dict) -> bool:
return False

function_name: str
#contracts = self.slither.contracts_derived
#functions = [y.name for x in contracts for y in x.functions_entry_points]
# contracts = self.slither.contracts_derived
# functions = [y.name for x in contracts for y in x.functions_entry_points]
# TODO enable multiple targets later
functions = [x.name for x in self.target.functions_entry_points]

Expand All @@ -212,12 +249,18 @@ def _is_blacklisted_function(self, call_object: dict) -> bool:
blacklisted_functions: list | None
if self.fuzzer == "echidna":
function_name = call_object["call"]["contents"][0]
blacklisted_functions: list | None = self.fuzzer_config["filterFunctions"] if "filterFunctions" in self.fuzzer_config else None
blacklisted_functions = (
self.fuzzer_config["filterFunctions"]
if "filterFunctions" in self.fuzzer_config
else None
)
else:
raise ValueError("Function blacklisting is only available in Echidna.")

if blacklisted_functions:
if function_name in blacklisted_functions: # pylint: disable=unsupported-membership-test
if (
function_name in blacklisted_functions
): # pylint: disable=unsupported-membership-test
return True
return False

Expand All @@ -235,17 +278,23 @@ def _filter_corpus(self, mode: str, rules_list: list, modification_list: list) -
new_corpus: list = []
for idx, value in enumerate(self.corpus_copy):
# A list of files with call sequences in them
resulting_sequence = self._filter_call_sequence(mode, rules_list, modification_list, value["content"])
resulting_sequence = self._filter_call_sequence(
mode, rules_list, modification_list, value["content"]
)

if resulting_sequence:
new_corpus.append({"path": self.corpus_copy[idx]["path"], "content": resulting_sequence})
new_corpus.append(
{"path": self.corpus_copy[idx]["path"], "content": resulting_sequence}
)
return new_corpus

def _filter_call_sequence(self, mode: str, rules_list: list, modification_list: list, call_sequence: list) -> dict | None:
def should_skip(call):
def _filter_call_sequence(
self, mode: str, rules_list: list, modification_list: list, call_sequence: list
) -> list | None:
def should_skip(call: dict) -> bool:
return any(rule_fn(call) for rule_fn in rules_list)

def replace_fields(call) -> dict:
def replace_fields(call: dict) -> dict:
for modify_fn in modification_list:
call = modify_fn(call)
return call
Expand All @@ -266,17 +315,16 @@ def replace_fields(call) -> dict:

return resulting_sequence if resulting_sequence else None


def _copy_fuzzer_corpus(self, corpus: dict, current_path: str) -> list | None:
temp_corpus = []
def _copy_fuzzer_corpus(self, corpus: dict, current_path: str) -> list:
temp_corpus: list = []
for key in corpus.keys():
subdir_path = os.path.join(current_path, key)
if isinstance(corpus[key], dict):
temp_corpus[key] = self._copy_fuzzer_corpus(corpus[key], subdir_path)
else:
temp_corpus.extend(self._fetch_directory_files(subdir_path))

return temp_corpus if temp_corpus else None
return temp_corpus

def _fetch_directory_files(self, directory: str) -> list:
file_list: list = []
Expand All @@ -288,7 +336,9 @@ def _fetch_directory_files(self, directory: str) -> list:
try:
with open(full_path, "r", encoding="utf-8") as file:
if self.fuzzer.lower() == "echidna":
file_list.append({"path": full_path, "content": yaml.safe_load(file)})
file_list.append(
{"path": full_path, "content": yaml.safe_load(file)}
)
else:
file_list.append({"path": full_path, "content": json.load(file)})
except Exception: # pylint: disable=broad-except
Expand All @@ -308,12 +358,11 @@ def _fetch_fuzzer_config(self, fields: list[str]) -> dict:
complete_config = yaml.safe_load(file)
else:
complete_config = json.load(file)["fuzzing"]
except Exception: # pylint: disable=broad-except
except Exception: # pylint: disable=broad-except
handle_exit(f"Failed to find the fuzzer configuration file at {self.config_path}")

for key, value in complete_config.items():
if key in fields:
filtered_config[key] = value

return filtered_config

12 changes: 7 additions & 5 deletions fuzz_utils/parsing/commands/modify_corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from fuzz_utils.utils.crytic_print import CryticPrint
from fuzz_utils.modify_corpus.CorpusModifier import CorpusModifier
from fuzz_utils.utils.error_handler import handle_exit
from fuzz_utils.parsing.parser_util import check_config_and_set_default_values, open_config
from fuzz_utils.parsing.parser_util import check_config_and_set_default_values

COMMAND: str = "modify-corpus"

Expand Down Expand Up @@ -81,14 +81,16 @@ def modify_command(args: Namespace) -> None:
config["modifySenders"] = {}
# Process list
for item in args.modify_senders:
assert("=" in item)
assert "=" in item
senders = item.split("=")
config["modifySenders"][senders[0]] = senders[1]
if args.filter_functions:
config["filterFunctions"] = args.filter_functions
if args.dry_run:
config["dryRun"] = args.dry_run
CryticPrint().print_error("The --dry-run command isn't implemented yet, come back in a bit!")
CryticPrint().print_error(
"The --dry-run command isn't implemented yet, come back in a bit!"
)

check_config_and_set_default_values(
config,
Expand All @@ -103,8 +105,8 @@ def modify_command(args: Namespace) -> None:

if config["fuzzer"] not in {"echidna", "medusa"}:
handle_exit(
f"\n* The requested fuzzer {config['fuzzer']} is not supported. Supported fuzzers: echidna, medusa."
)
f"\n* The requested fuzzer {config['fuzzer']} is not supported. Supported fuzzers: echidna, medusa."
)

corpus_modifier = CorpusModifier(config, slither)
corpus_modifier.modify_corpus()
Expand Down
8 changes: 6 additions & 2 deletions fuzz_utils/parsing/commands/restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
def restore_flags(parser: ArgumentParser) -> None:
"""The unit test generation parser flags"""
parser.add_argument(
"-ch", "--hash", dest="corpus_hash", help="Hash of the historic corpus that will be restored."
"-ch",
"--hash",
dest="corpus_hash",
help="Hash of the historic corpus that will be restored.",
)
parser.add_argument(
"-lh",
Expand All @@ -33,6 +36,7 @@ def restore_command(args: Namespace) -> None:
corpus_modifier.list_historic_corpora()
else:
if args.corpus_hash:
CryticPrint().print_information(f"Restoring corpus with hash: {args.corpus_hash}")
corpus_modifier.restore_corpus_from_history(args.corpus_hash)
else:
handle_exit("No hash was provided!")
handle_exit("No hash was provided!")
1 change: 1 addition & 0 deletions fuzz_utils/parsing/commands/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ def snapshot_command(args: Namespace) -> None:
)

corpus_modifier = CorpusModifier(config, None)
CryticPrint().print_information(f"Saving corpus {args.corpus_dir} to history...")
corpus_modifier.save_corpus_to_history()

0 comments on commit 5e447bb

Please sign in to comment.