diff --git a/fuzz_utils/modify_corpus/CorpusModifier.py b/fuzz_utils/modify_corpus/CorpusModifier.py index 05b1a77..d4cb3d7 100644 --- a/fuzz_utils/modify_corpus/CorpusModifier.py +++ b/fuzz_utils/modify_corpus/CorpusModifier.py @@ -1,28 +1,34 @@ """The CorpusModifier class handles modification of corpus call sequences based on fuzzer config""" import os -import yaml import hashlib import json import shutil -import copy import datetime +import yaml + from slither import Slither from fuzz_utils.utils.slither_utils import get_target_contract from fuzz_utils.utils.error_handler import handle_exit +from fuzz_utils.utils.crytic_print import CryticPrint +# pylint: disable=too-many-instance-attributes class CorpusModifier: """ Handles modifying the corpus based on the fuzzer configuration. """ - fuzzer_fields: dict = {"echidna": ["maxTimeDelay", "maxBlockDelay", "filterFunctions"], "medusa": ["blockTimestampDelayMax", "blockNumberDelayMax"]} - corpora_format: dict = {"echidna": {"coverage": [], "reproducers": []}, "medusa": {"call_sequences": {"immutable": [], "mutable": []}, "test_results": []}} + + fuzzer_fields: dict = { + "echidna": ["maxTimeDelay", "maxBlockDelay", "filterFunctions"], + "medusa": ["blockTimestampDelayMax", "blockNumberDelayMax"], + } + corpora_format: dict = { + "echidna": {"coverage": [], "reproducers": []}, + "medusa": {"call_sequences": {"immutable": [], "mutable": []}, "test_results": []}, + } valid_modes: list = ["delete_sequence", "delete_calls"] fuzzer_config: dict | None = None - def __init__( - self, - config: dict, - slither: Slither | None - ) -> None: + + def __init__(self, config: dict, slither: Slither | None) -> None: if config: self.modifier_config = config if slither: @@ -38,15 +44,20 @@ def __init__( self.mode = config["mode"] if "targetContract" in config: self.target = get_target_contract(self.slither, config["targetContract"]) - self.corpus_copy = {} - self.rules_list = [self._is_incorrect_delay, self._is_blacklisted_function, self._is_nonexistent_function] + self.corpus_copy: list = [] + self.rules_list = [ + self._is_incorrect_delay, + self._is_blacklisted_function, + self._is_nonexistent_function, + ] self.modifications_list = [self._modify_invalid_caller] - def modify_corpus(self) -> None: """Modifies the current corpus and saves the new version""" # 1. Open the corpus and parse all the files - self.corpus_copy = self._copy_fuzzer_corpus(self.corpora_format[self.fuzzer], self.corpus_path) + self.corpus_copy = self._copy_fuzzer_corpus( + self.corpora_format[self.fuzzer], self.corpus_path + ) # 2. a) Copy current corpus somewhere in case something goes wrong? self.save_corpus_to_history() # 4. Apply the rules @@ -56,13 +67,17 @@ def modify_corpus(self) -> None: def dry_run(self) -> None: """Prints the calls that would be modified, without modifying them""" - self.corpus_copy = self._copy_fuzzer_corpus(self.corpora_format[self.fuzzer.lower()], self.corpus_path) - new_corpus = self._filter_corpus(self.mode, self.rules_list, self.modifications_list) + self.corpus_copy = self._copy_fuzzer_corpus( + self.corpora_format[self.fuzzer.lower()], self.corpus_path + ) + _ = self._filter_corpus(self.mode, self.rules_list, self.modifications_list) def save_corpus_to_history(self) -> None: """Saves the current corpus directory to the corpus history""" # 1. Fetch current corpus - corpus_to_save = self._copy_fuzzer_corpus(self.corpora_format[self.fuzzer.lower()], self.corpus_path) + corpus_to_save = self._copy_fuzzer_corpus( + self.corpora_format[self.fuzzer.lower()], self.corpus_path + ) # 2. Check if .fuzz_utils folder already exists, create it if not directory = self._create_or_fetch_hidden_directory() # 3. Convert into a string @@ -86,12 +101,19 @@ def save_corpus_to_history(self) -> None: # 6. Add to history timestamp = datetime.datetime.now().isoformat() comment = input("Enter a comment for this save: ") - history[corpus_hash] = {"path": self.corpus_path, "timestamp": timestamp, "content": corpus_to_save, "comment": comment} + history[corpus_hash] = { + "path": self.corpus_path, + "timestamp": timestamp, + "content": corpus_to_save, + "comment": comment, + } # 7. Save history # TODO check if this can fail with open(history_path, "w", encoding="utf-8") as f: json.dump(history, f) + CryticPrint().print_information(f"Corpus saved to history with hash: {corpus_hash}") + def restore_corpus_from_history(self, corpus_hash: str) -> None: """Overrides the current corpus directory with a historical version""" directory = self._create_or_fetch_hidden_directory() @@ -120,11 +142,11 @@ def list_historic_corpora(self) -> None: for key, value in history.items(): print(f'{key}: {value["comment"]} (saved at {value["timestamp"]})') else: - print("No historic corpora were found.") - - + print("No historic corpora found.") - def _override_corpus_directory(self, directory_path: str, contents_list: list) -> None: + def _override_corpus_directory( # pylint: disable=no-self-use + self, directory_path: str, contents_list: list + ) -> None: for root, dirs, files in os.walk(directory_path): for file in files: file_path = os.path.join(root, file) @@ -143,10 +165,11 @@ def _override_corpus_directory(self, directory_path: str, contents_list: list) - os.makedirs(directory, exist_ok=True) # Write the file contents - with open(file_path, "w", encoding="utf-8") as file: - json.dump(file_content, file, indent=4) + # TODO fix the mypy errors later on + with open(file_path, "w", encoding="utf-8") as file: # type: ignore[assignment] + json.dump(file_content, file, indent=4) # type: ignore[arg-type] - def _create_or_fetch_hidden_directory(self) -> str: + def _create_or_fetch_hidden_directory(self) -> str: # pylint: disable=no-self-use current_directory = os.getcwd() directory_name = os.path.join(current_directory, ".fuzz_utils") os.makedirs(directory_name, exist_ok=True) @@ -162,18 +185,32 @@ def _is_incorrect_delay(self, call_object: dict) -> bool: block_delay: int if self.fuzzer == "echidna": - maxTimeDelay = self.fuzzer_config["maxTimeDelay"] if "maxTimeDelay" in self.fuzzer_config else None - maxBlockDelay = self.fuzzer_config["maxBlockDelay"] if "maxBlockDelay" in self.fuzzer_config else None + maxTimeDelay = ( + self.fuzzer_config["maxTimeDelay"] if "maxTimeDelay" in self.fuzzer_config else None + ) + maxBlockDelay = ( + self.fuzzer_config["maxBlockDelay"] + if "maxBlockDelay" in self.fuzzer_config + else None + ) time_delay = int(call_object["delay"][0], 16) block_delay = int(call_object["delay"][1], 16) elif self.fuzzer == "medusa": - maxTimeDelay = self.fuzzer_config["fuzzing"]["blockTimestampDelayMax"] if "blockTimestampDelayMax" in self.fuzzer_config["fuzzing"] else None - maxBlockDelay = self.fuzzer_config["fuzzing"]["blockNumberDelayMax"] if "blockNumberDelayMax" in self.fuzzer_config["fuzzing"] else None + maxTimeDelay = ( + self.fuzzer_config["fuzzing"]["blockTimestampDelayMax"] + if "blockTimestampDelayMax" in self.fuzzer_config["fuzzing"] + else None + ) + maxBlockDelay = ( + self.fuzzer_config["fuzzing"]["blockNumberDelayMax"] + if "blockNumberDelayMax" in self.fuzzer_config["fuzzing"] + else None + ) time_delay = call_object["blockTimestampDelay"] block_delay = call_object["blockNumberDelay"] else: raise ValueError(f"Invalid fuzzer: {self.fuzzer}") - + if maxTimeDelay: if time_delay > maxTimeDelay: return True @@ -188,8 +225,8 @@ def _is_nonexistent_function(self, call_object: dict) -> bool: return False function_name: str - #contracts = self.slither.contracts_derived - #functions = [y.name for x in contracts for y in x.functions_entry_points] + # contracts = self.slither.contracts_derived + # functions = [y.name for x in contracts for y in x.functions_entry_points] # TODO enable multiple targets later functions = [x.name for x in self.target.functions_entry_points] @@ -212,12 +249,18 @@ def _is_blacklisted_function(self, call_object: dict) -> bool: blacklisted_functions: list | None if self.fuzzer == "echidna": function_name = call_object["call"]["contents"][0] - blacklisted_functions: list | None = self.fuzzer_config["filterFunctions"] if "filterFunctions" in self.fuzzer_config else None + blacklisted_functions = ( + self.fuzzer_config["filterFunctions"] + if "filterFunctions" in self.fuzzer_config + else None + ) else: raise ValueError("Function blacklisting is only available in Echidna.") if blacklisted_functions: - if function_name in blacklisted_functions: # pylint: disable=unsupported-membership-test + if ( + function_name in blacklisted_functions + ): # pylint: disable=unsupported-membership-test return True return False @@ -235,17 +278,23 @@ def _filter_corpus(self, mode: str, rules_list: list, modification_list: list) - new_corpus: list = [] for idx, value in enumerate(self.corpus_copy): # A list of files with call sequences in them - resulting_sequence = self._filter_call_sequence(mode, rules_list, modification_list, value["content"]) + resulting_sequence = self._filter_call_sequence( + mode, rules_list, modification_list, value["content"] + ) if resulting_sequence: - new_corpus.append({"path": self.corpus_copy[idx]["path"], "content": resulting_sequence}) + new_corpus.append( + {"path": self.corpus_copy[idx]["path"], "content": resulting_sequence} + ) return new_corpus - def _filter_call_sequence(self, mode: str, rules_list: list, modification_list: list, call_sequence: list) -> dict | None: - def should_skip(call): + def _filter_call_sequence( + self, mode: str, rules_list: list, modification_list: list, call_sequence: list + ) -> list | None: + def should_skip(call: dict) -> bool: return any(rule_fn(call) for rule_fn in rules_list) - def replace_fields(call) -> dict: + def replace_fields(call: dict) -> dict: for modify_fn in modification_list: call = modify_fn(call) return call @@ -266,9 +315,8 @@ def replace_fields(call) -> dict: return resulting_sequence if resulting_sequence else None - - def _copy_fuzzer_corpus(self, corpus: dict, current_path: str) -> list | None: - temp_corpus = [] + def _copy_fuzzer_corpus(self, corpus: dict, current_path: str) -> list: + temp_corpus: list = [] for key in corpus.keys(): subdir_path = os.path.join(current_path, key) if isinstance(corpus[key], dict): @@ -276,7 +324,7 @@ def _copy_fuzzer_corpus(self, corpus: dict, current_path: str) -> list | None: else: temp_corpus.extend(self._fetch_directory_files(subdir_path)) - return temp_corpus if temp_corpus else None + return temp_corpus def _fetch_directory_files(self, directory: str) -> list: file_list: list = [] @@ -288,7 +336,9 @@ def _fetch_directory_files(self, directory: str) -> list: try: with open(full_path, "r", encoding="utf-8") as file: if self.fuzzer.lower() == "echidna": - file_list.append({"path": full_path, "content": yaml.safe_load(file)}) + file_list.append( + {"path": full_path, "content": yaml.safe_load(file)} + ) else: file_list.append({"path": full_path, "content": json.load(file)}) except Exception: # pylint: disable=broad-except @@ -308,7 +358,7 @@ def _fetch_fuzzer_config(self, fields: list[str]) -> dict: complete_config = yaml.safe_load(file) else: complete_config = json.load(file)["fuzzing"] - except Exception: # pylint: disable=broad-except + except Exception: # pylint: disable=broad-except handle_exit(f"Failed to find the fuzzer configuration file at {self.config_path}") for key, value in complete_config.items(): @@ -316,4 +366,3 @@ def _fetch_fuzzer_config(self, fields: list[str]) -> dict: filtered_config[key] = value return filtered_config - diff --git a/fuzz_utils/parsing/commands/modify_corpus.py b/fuzz_utils/parsing/commands/modify_corpus.py index 3fd600a..2dc95d0 100644 --- a/fuzz_utils/parsing/commands/modify_corpus.py +++ b/fuzz_utils/parsing/commands/modify_corpus.py @@ -4,7 +4,7 @@ from fuzz_utils.utils.crytic_print import CryticPrint from fuzz_utils.modify_corpus.CorpusModifier import CorpusModifier from fuzz_utils.utils.error_handler import handle_exit -from fuzz_utils.parsing.parser_util import check_config_and_set_default_values, open_config +from fuzz_utils.parsing.parser_util import check_config_and_set_default_values COMMAND: str = "modify-corpus" @@ -81,14 +81,16 @@ def modify_command(args: Namespace) -> None: config["modifySenders"] = {} # Process list for item in args.modify_senders: - assert("=" in item) + assert "=" in item senders = item.split("=") config["modifySenders"][senders[0]] = senders[1] if args.filter_functions: config["filterFunctions"] = args.filter_functions if args.dry_run: config["dryRun"] = args.dry_run - CryticPrint().print_error("The --dry-run command isn't implemented yet, come back in a bit!") + CryticPrint().print_error( + "The --dry-run command isn't implemented yet, come back in a bit!" + ) check_config_and_set_default_values( config, @@ -103,8 +105,8 @@ def modify_command(args: Namespace) -> None: if config["fuzzer"] not in {"echidna", "medusa"}: handle_exit( - f"\n* The requested fuzzer {config['fuzzer']} is not supported. Supported fuzzers: echidna, medusa." - ) + f"\n* The requested fuzzer {config['fuzzer']} is not supported. Supported fuzzers: echidna, medusa." + ) corpus_modifier = CorpusModifier(config, slither) corpus_modifier.modify_corpus() diff --git a/fuzz_utils/parsing/commands/restore.py b/fuzz_utils/parsing/commands/restore.py index adea575..9d55d45 100644 --- a/fuzz_utils/parsing/commands/restore.py +++ b/fuzz_utils/parsing/commands/restore.py @@ -10,7 +10,10 @@ def restore_flags(parser: ArgumentParser) -> None: """The unit test generation parser flags""" parser.add_argument( - "-ch", "--hash", dest="corpus_hash", help="Hash of the historic corpus that will be restored." + "-ch", + "--hash", + dest="corpus_hash", + help="Hash of the historic corpus that will be restored.", ) parser.add_argument( "-lh", @@ -33,6 +36,7 @@ def restore_command(args: Namespace) -> None: corpus_modifier.list_historic_corpora() else: if args.corpus_hash: + CryticPrint().print_information(f"Restoring corpus with hash: {args.corpus_hash}") corpus_modifier.restore_corpus_from_history(args.corpus_hash) else: - handle_exit("No hash was provided!") \ No newline at end of file + handle_exit("No hash was provided!") diff --git a/fuzz_utils/parsing/commands/snapshot.py b/fuzz_utils/parsing/commands/snapshot.py index 7c0ef50..f369412 100644 --- a/fuzz_utils/parsing/commands/snapshot.py +++ b/fuzz_utils/parsing/commands/snapshot.py @@ -40,4 +40,5 @@ def snapshot_command(args: Namespace) -> None: ) corpus_modifier = CorpusModifier(config, None) + CryticPrint().print_information(f"Saving corpus {args.corpus_dir} to history...") corpus_modifier.save_corpus_to_history()