diff --git a/examples/change_watcher_plugin/bs_change_watcher/__init__.py b/examples/change_watcher_plugin/bs_change_watcher/__init__.py index ea366f7a..041b122c 100644 --- a/examples/change_watcher_plugin/bs_change_watcher/__init__.py +++ b/examples/change_watcher_plugin/bs_change_watcher/__init__.py @@ -6,7 +6,7 @@ def create_plugin(*args, **kwargs): """ This is the entry point that all decompilers will call in various ways. To remain agnostic, - always pass the args and kwargs to the ui_init_args and ui_init_kwargs of DecompilerInterface, inited + always pass the args and kwargs to the gui_init_args and gui_init_kwargs of DecompilerInterface, inited through the discover api. """ @@ -18,8 +18,8 @@ def create_plugin(*args, **kwargs): deci = DecompilerInterface.discover( plugin_name="ArtifactChangeWatcher", init_plugin=True, - ui_init_args=args, - ui_init_kwargs=kwargs + gui_init_args=args, + gui_init_kwargs=kwargs ) # create a function to print a string in the decompiler console decompiler_printer = lambda *x, **y: deci.print(f"Changed {x}{y}") @@ -29,7 +29,7 @@ def create_plugin(*args, **kwargs): } # register a menu to open when you right click on the psuedocode view - deci.register_ctx_menu_item( + deci.gui_register_ctx_menu( "StartArtifactChangeWatcher", "Start watching artifact changes", lambda: deci.start_artifact_watchers(), diff --git a/libbs/__init__.py b/libbs/__init__.py index 1317d755..8a04062d 100644 --- a/libbs/__init__.py +++ b/libbs/__init__.py @@ -1 +1,8 @@ -__version__ = "0.18.0" +__version__ = "0.19.0" + +import logging +logging.getLogger("libbs").addHandler(logging.NullHandler()) +from libbs.logger import Loggers +loggers = Loggers() +del Loggers +del logging diff --git a/libbs/api/artifact_dict.py b/libbs/api/artifact_dict.py index 3fe27c24..e9e720eb 100644 --- a/libbs/api/artifact_dict.py +++ b/libbs/api/artifact_dict.py @@ -59,6 +59,12 @@ def __len__(self): return len(self._artifact_lister()) def __getitem__(self, item): + """ + Takes a lifted identifier as input and returns a lifted artifact + """ + if isinstance(item, int): + item = self._deci.art_lifter.lower_addr(item) + art = self._artifact_getter(item) if art is None: raise KeyError @@ -66,17 +72,23 @@ def __getitem__(self, item): return self._deci.art_lifter.lift(art) def __setitem__(self, key, value): + """ + Both key and value must be lifted artifacts + """ if not isinstance(value, self._artifact_class): raise ValueError(f"Attempting to set a value of type {type(value)} to a dict of type {self._artifact_class}") - if hasattr(value, "addr") and value.addr is None: - value.addr = key + if isinstance(key, int): + key = self._deci.art_lifter.lower_addr(key) art = self._deci.art_lifter.lower(value) if not self._artifact_setter(art) and self._error_on_duplicate: raise ValueError(f"Set value {value} is already present at key {key}") def __contains__(self, item): + if isinstance(item, int): + item = self._deci.art_lifter.lower_addr(item) + data = self._artifact_getter(item) return data is not None diff --git a/libbs/api/artifact_lifter.py b/libbs/api/artifact_lifter.py index 738bc80a..3c47a3c1 100644 --- a/libbs/api/artifact_lifter.py +++ b/libbs/api/artifact_lifter.py @@ -34,7 +34,7 @@ def lift_type(self, type_str: str) -> str: def lift_addr(self, addr: int) -> int: if addr < self.deci.binary_base_addr: - self.deci.warning(f"Lifting an address that appears already lifted: {addr}...") + self.deci.debug(f"Lifting an address that appears already lifted: {addr}...") return addr else: return addr - self.deci.binary_base_addr @@ -47,7 +47,7 @@ def lower_type(self, type_str: str) -> str: def lower_addr(self, addr: int) -> int: if addr >= self.deci.binary_base_addr: - self.deci.warning(f"Lowering an address that appears already lowered: {addr}...") + self.deci.debug(f"Lowering an address that appears already lowered: {addr}...") return addr else: return addr + self.deci.binary_base_addr @@ -60,7 +60,7 @@ def lower_stack_offset(self, offset: int, func_addr: int) -> int: # def _lift_or_lower_artifact(self, artifact, mode): - target_attrs = ("type", "offset", "addr") + target_attrs = ("type", "offset", "addr", "func_addr") if mode not in ("lower", "lift"): return None @@ -82,7 +82,8 @@ def _lift_or_lower_artifact(self, artifact, mode): lifting_func = getattr(self, f"{mode}_stack_offset") setattr(lifted_art, attr, lifting_func(curr_val, lifted_art.addr)) else: - lifting_func = getattr(self, f"{mode}_{attr}") + attr_func_name = attr if attr != "func_addr" else "addr" + lifting_func = getattr(self, f"{mode}_{attr_func_name}") setattr(lifted_art, attr, lifting_func(curr_val)) # recursively correct nested artifacts diff --git a/libbs/api/decompiler_interface.py b/libbs/api/decompiler_interface.py index 611b97ee..4a9e3bef 100644 --- a/libbs/api/decompiler_interface.py +++ b/libbs/api/decompiler_interface.py @@ -4,7 +4,7 @@ import threading from collections import defaultdict from functools import wraps -from typing import Dict, Optional, Union, Tuple, List, Callable, Type +from typing import Dict, Optional, Tuple, List, Callable, Type import libbs from libbs.api.artifact_lifter import ArtifactLifter @@ -14,7 +14,7 @@ Artifact, Function, FunctionHeader, StackVariable, Comment, GlobalVariable, Patch, - Enum, Struct + Enum, Struct, FunctionArgument ) from libbs.decompilers import SUPPORTED_DECOMPILERS, ANGR_DECOMPILER, \ BINJA_DECOMPILER, IDA_DECOMPILER, GHIDRA_DECOMPILER @@ -34,22 +34,6 @@ def _requires_decompilation(self, *args, **kwargs): return _requires_decompilation -def artifact_write_event(f): - @wraps(f) - def _artifact_set_event(self: "DecompilerInterface", *args, **kwargs): - return self.artifact_set_event_handler(f, *args, **kwargs) - - return _artifact_set_event - - -class DummyArtifactSetLock: - def __enter__(self): - pass - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - class DecompilerInterface: def __init__( self, @@ -66,8 +50,8 @@ def __init__( plugin_name: str = f"generic_libbs_plugin", # [category/name] = (action_string, callback_func) gui_ctx_menu_actions: Optional[dict] = None, - ui_init_args: Optional[Tuple] = None, - ui_init_kwargs: Optional[Dict] = None, + gui_init_args: Optional[Tuple] = None, + gui_init_kwargs: Optional[Dict] = None, # [artifact_class] = list(callback_func) artifact_write_callbacks: Optional[Dict[Type[Artifact], List[Callable]]] = None, ): @@ -94,47 +78,22 @@ def __init__( # callback functions, keyed by Artifact class self.artifact_write_callbacks = artifact_write_callbacks or defaultdict(list) - # artifact dict aliases + # artifact dict aliases: + # these are the public API for artifacts that are used by the decompiler interface self.functions = ArtifactDict(Function, self, error_on_duplicate=error_on_artifact_duplicates) self.comments = ArtifactDict(Comment, self, error_on_duplicate=error_on_artifact_duplicates) self.enums = ArtifactDict(Enum, self, error_on_duplicate=error_on_artifact_duplicates) self.structs = ArtifactDict(Struct, self, error_on_duplicate=error_on_artifact_duplicates) self.patches = ArtifactDict(Patch, self, error_on_duplicate=error_on_artifact_duplicates) - #self.stack_vars = ArtifactDict(StackVariable, self, error_on_duplicate=error_on_artifact_duplicates) + self.global_vars = ArtifactDict(GlobalVariable, self, error_on_duplicate=error_on_artifact_duplicates) self._decompiler_available = decompiler_available if not self.headless: - args = ui_init_args or [] - kwargs = ui_init_kwargs or {} - self._init_ui_components(*args, **kwargs) + args = gui_init_args or [] + kwargs = gui_init_kwargs or {} + self._init_gui_components(*args, **kwargs) - # - # Decompiler GUI API - # - - def start_artifact_watchers(self): - """ - Starts the artifact watchers for the decompiler. This is a special function that is called - by the decompiler interface when the decompiler is ready to start watching for changes in the - decompiler. This is useful for plugins that want to watch for changes in the decompiler and - react to them. - - @return: - """ - self.info("Starting BinSync artifact watchers...") - self._artifact_watchers_started = True - - def stop_artifact_watchers(self): - """ - Stops the artifact watchers for the decompiler. This is a special function that is called - by the decompiler interface when the decompiler is ready to stop watching for changes in the - decompiler. This is useful for plugins that want to watch for changes in the decompiler and - react to them. - """ - self.info("Stopping BinSync artifact watchers...") - self._artifact_watchers_started = False - - def _init_ui_components(self, *args, **kwargs): + def _init_gui_components(self, *args, **kwargs): from libbs.ui.version import set_ui_version set_ui_version(self.qt_version) @@ -158,12 +117,24 @@ def _init_ui_components(self, *args, **kwargs): # register all context menu actions for action in self._gui_ctx_menu_actions: category, name, action_string, callback_func = action - self.register_ctx_menu_item(name, action_string, callback_func, category=category) + self.gui_register_ctx_menu(name, action_string, callback_func, category=category) def _init_gui_plugin(self, *args, **kwargs): return None - def active_context(self) -> libbs.artifacts.Function: + # + # Public API: + # These functions are the main API for interacting with the decompiler. In general, every function that takes + # an Artifact (including addresses) should be in the lifted form. Additionally, every function that returns an + # Artifact should be in the lifted form. This is to ensure that the decompiler interface is always in sync with + # the lifter. For getting and setting artifacts, the ArtifactDicts defined in the init should be used. + # + + # + # GUI API + # + + def gui_active_context(self) -> libbs.artifacts.Function: """ Returns an libbs Function. Currently only functions are supported as current contexts. This function will be called very frequently, so its important that its implementation is fast @@ -171,7 +142,7 @@ def active_context(self) -> libbs.artifacts.Function: """ raise NotImplementedError - def goto_address(self, func_addr) -> None: + def gui_goto(self, func_addr) -> None: """ Relocates decompiler display to provided address @@ -180,21 +151,43 @@ def goto_address(self, func_addr) -> None: """ raise NotImplementedError - def register_ctx_menu_item(self, name, action_string, callback_func, category=None) -> bool: + def gui_register_ctx_menu(self, name, action_string, callback_func, category=None) -> bool: raise NotImplementedError def gui_ask_for_string(self, question, title="Plugin Question") -> str: + """ + Opens a GUI dialog box that asks the user for a string. If not overriden by the decompiler interface, + this will default to a Qt dialog box that is based on the decompilers Qt version. + """ from libbs.ui.utils import gui_ask_for_string return gui_ask_for_string(question, title=title) - # - # Override Mandatory API: - # These functions create a public API for things that hold a reference to the Controller from either another - # thread or object. This is most useful for use in the UI, which can use this API to make general requests from - # the decompiler regardless of internal decompiler API. + # Override Mandatory API # + def start_artifact_watchers(self): + """ + Starts the artifact watchers for the decompiler. This is a special function that is called + by the decompiler interface when the decompiler is ready to start watching for changes in the + decompiler. This is useful for plugins that want to watch for changes in the decompiler and + react to them. + + @return: + """ + self.info("Starting BinSync artifact watchers...") + self._artifact_watchers_started = True + + def stop_artifact_watchers(self): + """ + Stops the artifact watchers for the decompiler. This is a special function that is called + by the decompiler interface when the decompiler is ready to stop watching for changes in the + decompiler. This is useful for plugins that want to watch for changes in the decompiler and + react to them. + """ + self.info("Stopping BinSync artifact watchers...") + self._artifact_watchers_started = False + @property def binary_base_addr(self) -> int: """ @@ -240,29 +233,28 @@ def decompiler_available(self) -> bool: return True def decompile(self, addr: int) -> Optional[str]: + addr = self.art_lifter.lower_addr(addr) if not self.decompiler_available: _l.error("Decompiler is not available.") return None # TODO: make this a function call after transitioning decompiler artifacts to LiveState - for search_addr in (addr, self.art_lifter.lower_addr(addr)): - func_found = False - for func_addr, func in self._functions().items(): - if func.addr <= search_addr < (func.addr + func.size): - func_found = True - break - else: - func = None - - if func_found: + for func_addr, func in self._functions().items(): + if func.addr <= addr < (func.addr + func.size): + func_found = True break else: + func = None + + if func is None: + self.warning(f"Failed to find function for address {hex(addr)}") return None + func = self.art_lifter.lower(func) try: decompilation = self._decompile(func) except Exception as e: - _l.warning(f"Failed to decompile function at {hex(addr)}: {e}") + self.warning(f"Failed to decompile function at {hex(addr)}: {e}") decompilation = None return decompilation @@ -286,7 +278,6 @@ def get_decompilation_object(self, function: Function) -> Optional[object]: # # Override Optional API: - # These are API that provide extra introspection for plugins that may rely on LibBS Interface # def undo(self): @@ -315,10 +306,9 @@ def rename_local_variables_by_names(self, func: Function, name_map: Dict[str, st return False # - # Artifact API: - # These functions are the main API for interacting with the decompiler artifacts. Generally, these functions - # should all be implemented by the decompiler interface, but in the case that they are not, they should not - # crash the LibBS Interface. + # Private Artifact API: + # Unlike the public API, every function in this section should take and return artifacts in their native (lowered) + # form. # # functions @@ -450,42 +440,57 @@ def _set_function_header(self, fheader: FunctionHeader, **kwargs) -> bool: return False # - # special + # Change Callback API # - def global_artifacts(self): - """ - Returns a light version of all artifacts that are global (non function associated): - - structs, gvars, enums + def function_header_changed(self, fheader: FunctionHeader, **kwargs): + for callback_func in self.artifact_write_callbacks[FunctionHeader]: + threading.Thread(target=callback_func, args=(fheader,), kwargs=kwargs, daemon=True).start() - @return: - """ - g_artifacts = {} - for f in [self._structs, self._global_vars, self._enums]: - g_artifacts.update(f()) + def stack_variable_changed(self, svar: StackVariable, **kwargs): + for callback_func in self.artifact_write_callbacks[StackVariable]: + threading.Thread(target=callback_func, args=(svar,), kwargs=kwargs, daemon=True).start() - return g_artifacts + def comment_changed(self, comment: Comment, **kwargs): + for callback_func in self.artifact_write_callbacks[Comment]: + threading.Thread(target=callback_func, args=(comment,), kwargs=kwargs, daemon=True).start() - def global_artifact(self, lookup_item: Union[str, int]): - """ - Returns a live libbs.artifacts version of the Artifact located at the lookup_item location, which can - lookup any artifact supported in `global_artifacts` + def struct_changed(self, struct: Struct, deleted=False, **kwargs): + kwargs["deleted"] = deleted + for callback_func in self.artifact_write_callbacks[Struct]: + threading.Thread(target=callback_func, args=(struct,), kwargs=kwargs, daemon=True).start() - @param lookup_item: - @return: - """ + def enum_changed(self, enum: Enum, deleted=False, **kwargs): + kwargs["deleted"] = deleted + for callback_func in self.artifact_write_callbacks[Enum]: + threading.Thread(target=callback_func, args=(enum,), kwargs=kwargs, daemon=True).start() - if isinstance(lookup_item, int): - return self._get_global_var(lookup_item) - elif isinstance(lookup_item, str): - artifact = self._get_struct(lookup_item) - if artifact: - return artifact + def global_variable_changed(self, gvar: GlobalVariable, **kwargs): + for callback_func in self.artifact_write_callbacks[GlobalVariable]: + threading.Thread(target=callback_func, args=(gvar,), kwargs=kwargs, daemon=True).start() - artifact = self._get_enum(lookup_item) - return artifact + # + # Special Loggers and Printers + # - return None + def print(self, msg: str, **kwargs): + print(msg) + + def info(self, msg: str, **kwargs): + _l.info(msg) + + def debug(self, msg: str, **kwargs): + _l.debug(msg) + + def warning(self, msg: str, **kwargs): + _l.warning(msg) + + def error(self, msg: str, **kwargs): + _l.error(msg) + + # + # Utils + # def set_artifact(self, artifact: Artifact, lower=True, **kwargs) -> bool: """ @@ -523,96 +528,17 @@ def set_artifact(self, artifact: Artifact, lower=True, **kwargs) -> bool: return setter(artifact, **kwargs) - # - # Change Callback API - # TODO: all the code in this category on_* is experimental and not ready for production use - # all this code should be implemented in the other decompilers or moved to a different project - # - - def function_header_changed(self, fheader: FunctionHeader, **kwargs): - for callback_func in self.artifact_write_callbacks[FunctionHeader]: - callback_func(fheader, **kwargs) - - def stack_variable_changed(self, svar: StackVariable, **kwargs): - for callback_func in self.artifact_write_callbacks[StackVariable]: - callback_func(svar, **kwargs) - - def comment_changed(self, comment: Comment, **kwargs): - for callback_func in self.artifact_write_callbacks[Comment]: - callback_func(comment, **kwargs) - - def struct_changed(self, struct: Struct, deleted=False, **kwargs): - for callback_func in self.artifact_write_callbacks[Struct]: - callback_func(struct, deleted=deleted, **kwargs) - - def enum_changed(self, enum: Enum, deleted=False, **kwargs): - for callback_func in self.artifact_write_callbacks[Enum]: - callback_func(enum, deleted=deleted, **kwargs) - - def global_variable_changed(self, gvar: GlobalVariable, **kwargs): - for callback_func in self.artifact_write_callbacks[GlobalVariable]: - callback_func(gvar, **kwargs) - - # - # Fillers: - # A filler function is generally responsible for pulling down artifacts from a specific user state - # and reflecting those changes in decompiler view (like the text on the screen). Normally, these changes - # will also be accompanied by a Git commit to the master users state to save the changes from pull and - # fill into their BS database. In special cases, a filler may only update the decompiler UI but not directly - # cause a save of the BS state. - # - - def artifact_set_event_handler( - self, setter_func, artifact: Artifact, *args, **kwargs - ): - """ - This function handles any event which tries to set an Artifact into the decompiler. This handler does two - important tasks: - 1. Locks callback handlers, so you don't get infinite callbacks - 2. "Lowers" the artifact, so it's artifacts types match the decompilers - - Because of this, it's recommended that when overriding this function you always call super() at the end of - your override so it's set correctly in the decompiler. - - :param setter_func: - :param artifact: - :param args: - :param kwargs: - :return: - """ - - lowered_artifact = self.art_lifter.lower(artifact) - lock = self.artifact_write_lock if not self.artifact_write_lock.locked() else DummyArtifactSetLock() - with lock: - try: - had_changes = setter_func(lowered_artifact, **kwargs) - except ValueError: - had_changes = False - - return had_changes - - # - # Special Loggers and Printers - # - - def print(self, msg: str, **kwargs): - print(msg) - - def info(self, msg: str, **kwargs): - _l.info(msg) - - def debug(self, msg: str, **kwargs): - _l.debug(msg) - - def warning(self, msg: str, **kwargs): - _l.warning(msg) - - def error(self, msg: str, **kwargs): - _l.error(msg) - - # - # Utils - # + @staticmethod + def get_identifiers(artifact: Artifact) -> Tuple: + if isinstance(artifact, (Function, FunctionHeader, GlobalVariable, Patch, Comment)): + return (artifact.addr,) + elif isinstance(artifact, StackVariable): + return artifact.addr, artifact.offset + elif isinstance(artifact, FunctionArgument): + # TODO: add addr to function arguments + return (artifact.offset,) + elif isinstance(artifact, (Struct, Enum)): + return (artifact.name,) def type_is_user_defined(self, type_str, state=None): if not type_str: @@ -628,7 +554,7 @@ def type_is_user_defined(self, type_str, state=None): return None base_type_str = type_.base_type.type - return base_type_str if base_type_str in state._structs.keys() else None + return base_type_str if base_type_str in self.structs.keys() else None @staticmethod def _find_global_in_call_frames(global_name, max_frames=10): diff --git a/libbs/artifacts/artifact.py b/libbs/artifacts/artifact.py index cc5a9ca7..4cda31a5 100644 --- a/libbs/artifacts/artifact.py +++ b/libbs/artifacts/artifact.py @@ -125,6 +125,28 @@ def invert_diff(cls, diff_dict: Dict): return inverted_diff + def reset_last_change(self): + """ + Resets the change time of the Artifact. + In subclasses, this should also reset all artifacts with nested artifacts + """ + self.last_change = None + + def overwrite_merge(self, obj2: "Artifact", **kwargs): + """ + This function should really be overwritten by its subclass + """ + merge_obj = self.copy() + if not obj2 or merge_obj == obj2: + return merge_obj + + for attr in self.__slots__: + a2 = getattr(obj2, attr) + if a2 is not None: + setattr(merge_obj, attr, a2) + + return merge_obj + def nonconflict_merge(self, obj2: "Artifact", **kwargs): obj1 = self.copy() if not obj2 or obj1 == obj2: diff --git a/libbs/artifacts/func.py b/libbs/artifacts/func.py index 59f16b86..8ff03227 100644 --- a/libbs/artifacts/func.py +++ b/libbs/artifacts/func.py @@ -54,7 +54,7 @@ def __init__(self, name, addr, type_=None, args=None, last_change=None): self.name = name self.addr = addr self.type = type_ - self.args = args or {} + self.args: Dict[str, FunctionArgument] = args or {} def __str__(self): return f"" @@ -137,6 +137,34 @@ def copy(self): fh.args = {k: v.copy() for k, v in self.args.items()} return fh + def reset_last_change(self): + if self.args: + for arg in self.args.values(): + arg.reset_last_change() + + def overwrite_merge(self, obj2: "Artifact", **kwargs): + fh2: "FunctionHeader" = obj2 + merged_fh: "FunctionHeader" = self.copy() + if not fh2 or not isinstance(fh2, FunctionHeader) or self == fh2: + return merged_fh + + if fh2.name is not None: + merged_fh.name = fh2.name + if fh2.type is not None: + merged_fh.type = fh2.type + + # header args + for off, var in fh2.args.items(): + if var is not None: + if off in merged_fh.args: + merged_var = merged_fh.args[off].overwrite_merge(var) + else: + merged_var = var + + merged_fh.args[off] = merged_var + + return merged_fh + def nonconflict_merge(self, fh2: "FunctionHeader", **kwargs): fh1: "FunctionHeader" = self.copy() if not fh2 or not isinstance(fh2, FunctionHeader): @@ -304,6 +332,34 @@ def load(cls, func_toml): f.__setstate__(func_toml) return f + def reset_last_change(self): + if self.header: + self.header.reset_last_change() + + if self.stack_vars: + for sv in self.stack_vars.values(): + sv.reset_last_change() + + def overwrite_merge(self, obj2: "Artifact", **kwargs): + func2: "Function" = obj2 + merged_func: "Function" = self.copy() + if not func2 or self == func2: + return merged_func + + if func2.header is not None: + merged_func.header = merged_func.header.overwrite_merge(func2.header) + + for off, var in func2.stack_vars.items(): + if var is not None: + if off in merged_func.stack_vars: + merged_var = merged_func.stack_vars[off].overwrite_merge(var) + else: + merged_var = var + + merged_func.stack_vars[off] = merged_var + + return merged_func + def nonconflict_merge(self, func2: "Artifact", **kwargs): func1: "Function" = self.copy() diff --git a/libbs/decompilers/angr/compat.py b/libbs/decompilers/angr/compat.py index 87c1c5e1..67ad21bf 100644 --- a/libbs/decompilers/angr/compat.py +++ b/libbs/decompilers/angr/compat.py @@ -90,11 +90,13 @@ def build_context_menu_node(self, node): # pylint: disable=unused-argument def handle_stack_var_renamed(self, func, offset, old_name, new_name): + print("handle_stack_var_renamed") if func is None: return False decompilation = self.interface.decompile_function(func) stack_var = self.interface.find_stack_var_in_codegen(decompilation, offset) + print("handle_stack_var_renamed signal sent out to everyone") self.interface.stack_variable_changed(StackVariable(offset, new_name, None, stack_var.size, func.addr)) return True diff --git a/libbs/decompilers/angr/interface.py b/libbs/decompilers/angr/interface.py index 4fa7a940..bb32afe7 100644 --- a/libbs/decompilers/angr/interface.py +++ b/libbs/decompilers/angr/interface.py @@ -79,6 +79,7 @@ def binary_path(self) -> Optional[str]: return None def get_func_size(self, func_addr) -> int: + func_addr = self.art_lifter.lower_addr(func_addr) try: func = self.main_instance.project.kb.functions[func_addr] return func.size @@ -164,10 +165,10 @@ def _init_gui_plugin(self, *args, **kwargs): self.workspace.plugins.register_active_plugin(self._plugin_name, self.gui_plugin) return self.gui_plugin - def goto_address(self, func_addr): + def gui_goto(self, func_addr): self.workspace.jump_to(self.art_lifter.lower_addr(func_addr)) - def register_ctx_menu_item(self, name, action_string, callback_func, category=None) -> bool: + def gui_register_ctx_menu(self, name, action_string, callback_func, category=None) -> bool: if self.gui_plugin is None: l.critical("Cannot register context menu item without a GUI plugin.") return False @@ -176,7 +177,7 @@ def register_ctx_menu_item(self, name, action_string, callback_func, category=No self.gui_plugin.context_menu_items = self._ctx_menu_items return True - def active_context(self): + def gui_active_context(self): curr_view = self.workspace.view_manager.current_tab if not curr_view: return None @@ -189,8 +190,9 @@ def active_context(self): if func is None or func.am_obj is None: return None + func_addr = self.art_lifter.lift_addr(func.addr) return Function( - func.addr, 0, header=FunctionHeader(func.name, func.addr) + func_addr, func.size, header=FunctionHeader(func.name, func_addr) ) # @@ -268,26 +270,39 @@ def _set_function_header(self, fheader: FunctionHeader, decompilation=None, **kw if fheader.args: for i, arg in fheader.args.items(): + if not arg: + continue + if i >= len(decompilation.cfunc.arg_list): break - if decompilation.cfunc.arg_list[i].variable.name != arg.name: - decompilation.cfunc.arg_list[i].variable.name = arg.name + + dec_arg = decompilation.cfunc.arg_list[i].variable + # TODO: set the types of the args + if arg.name and arg.name != dec_arg.name: + dec_arg.name = arg.name changes = True return changes def _set_stack_variable(self, svar: StackVariable, decompilation=None, **kwargs) -> bool: changed = False - code_var = AngrInterface.find_stack_var_in_codegen(decompilation, svar.offset) - if code_var: - code_var.name = svar.name - code_var.renamed = True + if not svar or not decompilation: + return changed + + dec_svar = AngrInterface.find_stack_var_in_codegen(decompilation, svar.offset) + if dec_svar and svar.name and svar.name != dec_svar.name: + # TODO: set the types of the stack vars + dec_svar.name = svar.name + dec_svar.renamed = True changed = True return changed def _set_comment(self, comment: Comment, decompilation=None, **kwargs) -> bool: changed = False + if not comment or not comment.comment: + return changed + if comment.decompiled and comment.addr != comment.func_addr: try: pos = decompilation.map_addr_to_pos.get_nearest_pos(comment.addr) diff --git a/libbs/decompilers/binja/interface.py b/libbs/decompilers/binja/interface.py index e57f314e..a4011e08 100644 --- a/libbs/decompilers/binja/interface.py +++ b/libbs/decompilers/binja/interface.py @@ -201,7 +201,7 @@ def get_decompilation_object(self, function: Function) -> Optional[object]: def _init_gui_plugin(self, *args, **kwargs): return self - def active_context(self): + def gui_active_context(self): all_contexts = UIContext.allContexts() if not all_contexts: return None @@ -224,7 +224,7 @@ def gui_ask_for_string(self, question, title="Plugin Question") -> str: resp = binaryninja.get_text_line_input(question, title) return resp.decode() if resp else "" - def register_ctx_menu_item(self, name, action_string, callback_func, category=None) -> bool: + def gui_register_ctx_menu(self, name, action_string, callback_func, category=None) -> bool: # TODO: this needs to have a wrapper function that passes the bv to the current deci # correct name, category, and action_string for Binja action_string = action_string.replace("/", "\\") @@ -238,7 +238,7 @@ def register_ctx_menu_item(self, name, action_string, callback_func, category=No ) return True - def goto_address(self, func_addr) -> None: + def gui_goto(self, func_addr) -> None: self.bv.offset = func_addr # @@ -328,26 +328,27 @@ def _set_stack_variable(self, svar: StackVariable, bn_func=None, **kwargs) -> bo bn_offset = svar.offset if bn_offset in current_bn_vars: # name - if str(current_bn_vars[bn_offset].name) != svar.name: + if svar.name and svar.name != str(current_bn_vars[bn_offset].name): current_bn_vars[bn_offset].name = svar.name updates |= True # type - try: - type_, _ = self.bv.parse_type_string(svar.type) - except Exception: - type_ = None - - if type_ is not None: - if self.art_lifter.lift_type(str(current_bn_vars[bn_offset].type)) != type_: - current_bn_vars[bn_offset].type = type_ + if svar.type: try: - bn_func.create_user_stack_var(bn_offset, type_, svar.name) - bn_func.create_auto_stack_var(bn_offset, type_, svar.name) - except Exception as e: - l.warning(f"BinSync could not sync stack variable at offset {bn_offset}: {e}") + bs_svar_type, _ = self.bv.parse_type_string(svar.type) + except Exception: + bs_svar_type = None - updates |= True + if bs_svar_type is not None: + if self.art_lifter.lift_type(str(current_bn_vars[bn_offset].type)) != bs_svar_type: + current_bn_vars[bn_offset].type = bs_svar_type + try: + bn_func.create_user_stack_var(bn_offset, bs_svar_type, svar.name) + bn_func.create_auto_stack_var(bn_offset, bs_svar_type, svar.name) + except Exception as e: + l.warning(f"BinSync could not sync stack variable at offset {bn_offset}: {e}") + + updates |= True return updates diff --git a/libbs/decompilers/ghidra/interface.py b/libbs/decompilers/ghidra/interface.py index 3d1db487..da2b1ffa 100644 --- a/libbs/decompilers/ghidra/interface.py +++ b/libbs/decompilers/ghidra/interface.py @@ -46,51 +46,13 @@ def __init__(self, loop_on_plugin=True, **kwargs): self.loop_on_plugin = loop_on_plugin - # - # Controller API - # - - @property - def binary_base_addr(self) -> int: - return self.ghidra.currentProgram.getImageBase().getOffset() - - @property - def binary_hash(self) -> str: - return self.ghidra.currentProgram.executableMD5 - - @property - def binary_path(self) -> Optional[str]: - return self.ghidra.currentProgram.executablePath - - def get_func_size(self, func_addr) -> int: - gfunc = self._get_nearest_function(func_addr) - return int(gfunc.getBody().getNumAddresses()) - - def connect_ghidra_bridge(self): - self.ghidra = GhidraAPIWrapper(self) - return self.ghidra.connected - - def decompile(self, addr: int) -> Optional[str]: - # TODO: allow the super to do this again - function = self.art_lifter.lower(self.functions[addr]) - return self._decompile(function) - - def _decompile(self, function: Function) -> Optional[str]: - dec_obj = self.get_decompilation_object(function) - if dec_obj is None: - return None - - dec_func = dec_obj.getDecompiledFunction() - if dec_func is None: - return None - - return str(dec_func.getC()) - - def get_decompilation_object(self, function: Function) -> Optional[object]: - return self._ghidra_decompile(self._get_nearest_function(function.addr)) + def _init_gui_components(self, *args, **kwargs): + if not self.connect_ghidra_bridge(): + raise Exception("Failed to connect to remote Ghidra Bridge. Did you start it first?") + super()._init_gui_components(*args, **kwargs) # - # GUI API + # GUI # @property @@ -110,12 +72,7 @@ def gui_plugin(self): def gui_plugin(self, value): pass - def _init_ui_components(self, *args, **kwargs): - if not self.connect_ghidra_bridge(): - raise Exception("Failed to connect to remote Ghidra Bridge. Did you start it first?") - super()._init_ui_components(*args, **kwargs) - - def register_ctx_menu_item(self, name, action_string, callback_func, category=None) -> bool: + def gui_register_ctx_menu(self, name, action_string, callback_func, category=None) -> bool: ctx_menu_action = create_context_action(self.ghidra, name, action_string, callback_func, category or "LibBS") self.ghidra.getState().getTool().addAction(ctx_menu_action) return True @@ -126,7 +83,7 @@ def gui_ask_for_string(self, question, title="Plugin Question") -> str: ) return answer if answer else "" - def active_context(self): + def gui_active_context(self): active_addr = self.ghidra.currentLocation.getAddress().getOffset() if active_addr is None: return Function(0, 0) @@ -134,15 +91,59 @@ def active_context(self): if active_addr != self._last_addr: self._last_addr = active_addr self._last_func = self._gfunc_to_bsfunc(self._get_nearest_function(active_addr)) + self._last_func.addr = self.art_lifter.lower_addr(self._last_func.addr) return self._last_func - def goto_address(self, func_addr) -> None: + def gui_goto(self, func_addr) -> None: + func_addr = self.art_lifter.lower_addr(func_addr) self.ghidra.goTo(self.ghidra.toAddr(func_addr)) # - # Override Optional API: - # There are API that provide extra introspection for plugins that may rely on LibBS Interface + # Mandatory API + # + + @property + def binary_base_addr(self) -> int: + return self.ghidra.currentProgram.getImageBase().getOffset() + + @property + def binary_hash(self) -> str: + return self.ghidra.currentProgram.executableMD5 + + @property + def binary_path(self) -> Optional[str]: + return self.ghidra.currentProgram.executablePath + + def get_func_size(self, func_addr) -> int: + gfunc = self._get_nearest_function(func_addr) + return int(gfunc.getBody().getNumAddresses()) + + def connect_ghidra_bridge(self): + self.ghidra = GhidraAPIWrapper(self) + return self.ghidra.connected + + def decompile(self, addr: int) -> Optional[str]: + # TODO: allow the super to do this again + function = self.art_lifter.lower(self.functions[addr]) + return self._decompile(function) + + def _decompile(self, function: Function) -> Optional[str]: + dec_obj = self.get_decompilation_object(function) + if dec_obj is None: + return None + + dec_func = dec_obj.getDecompiledFunction() + if dec_func is None: + return None + + return str(dec_func.getC()) + + def get_decompilation_object(self, function: Function) -> Optional[object]: + return self._ghidra_decompile(self._get_nearest_function(function.addr)) + + # + # Extra API # def undo(self): @@ -165,11 +166,11 @@ def rename_local_variables_by_names(self, func: Function, name_map: Dict[str, st return self._update_local_variable_symbols(symbols_to_update) if symbols_to_update else False # - # Artifact API + # Private Artifact API # def _set_function(self, func: Function, **kwargs) -> bool: - func_addr = func.header.addr + func_addr = self.art_lifter.lower_addr(func.header.addr) decompilation = self._ghidra_decompile(self._get_nearest_function(func_addr)) changes = super()._set_function(func, decompilation=decompilation, **kwargs) return changes @@ -233,18 +234,23 @@ def _functions(self) -> Dict[int, Function]: @ghidra_transaction def _set_stack_variable(self, svar: StackVariable, **kwargs) -> bool: changes = False + if not svar: + return changes + decompilation = kwargs.get('decompilation', None) or self._ghidra_decompile(self._get_function(svar.addr)) ghidra_func = decompilation.getFunction() if decompilation else self._get_nearest_function(svar.addr) gstack_var = self._get_gstack_var(ghidra_func, svar.offset) if not gstack_var: - return False + return changes src_type = self.ghidra.import_module_object("ghidra.program.model.symbol", "SourceType") + # name if svar.name and svar.name != gstack_var.getName(): gstack_var.setName(svar.name, src_type.USER_DEFINED) changes = True + # type if svar.type: parsed_type = self.typestr_to_gtype(svar.type) if parsed_type is not None and parsed_type != str(gstack_var.getDataType()): @@ -403,13 +409,12 @@ def _enums(self) -> Dict[str, Enum]: ) return {name[1:]: Enum(name[1:], self._get_enum_members(name)) for name in names if name.count('/') == 1} if names else {} - # - # TODO: REMOVE ME THIS IS THE BINSYNC CODE - # Filler/Setter API - # - @ghidra_transaction def fill_global_var(self, var_addr, user=None, artifact=None, **kwargs): + """ + TODO: remove me and implement me properly as setters and getters + """ + changes = False global_var: GlobalVariable = artifact all_global_vars = self.global_vars() @@ -435,6 +440,9 @@ def fill_global_var(self, var_addr, user=None, artifact=None, **kwargs): return changes def global_var(self, addr) -> Optional[GlobalVariable]: + """ + TODO: remove me and implement me properly as setters and getters + """ light_global_vars = self.global_vars() for offset, global_var in light_global_vars.items(): if offset == addr: @@ -452,6 +460,9 @@ def global_var(self, addr) -> Optional[GlobalVariable]: return global_var def global_vars(self) -> Dict[int, GlobalVariable]: + """ + TODO: remove me and implement me properly as setters and getters + """ symbol_type = self.ghidra.import_module_object("ghidra.program.model.symbol", "SymbolType") symbol_table = self.ghidra.currentProgram.getSymbolTable() # optimize by grabbing all symbols at once diff --git a/libbs/decompilers/ida/artifact_lifter.py b/libbs/decompilers/ida/artifact_lifter.py index 86ba5299..30f2849d 100644 --- a/libbs/decompilers/ida/artifact_lifter.py +++ b/libbs/decompilers/ida/artifact_lifter.py @@ -29,4 +29,4 @@ def lower_type(self, type_str: str) -> str: return type_str def lower_stack_offset(self, offset: int, func_addr: int) -> int: - return offset #compat.ida_to_angr_stack_offset(func_addr, offset) + return abs(offset) #compat.ida_to_angr_stack_offset(func_addr, offset) diff --git a/libbs/decompilers/ida/compat.py b/libbs/decompilers/ida/compat.py index e750715f..db5ef6d4 100644 --- a/libbs/decompilers/ida/compat.py +++ b/libbs/decompilers/ida/compat.py @@ -121,8 +121,8 @@ def ida_to_bs_stack_offset(func_addr, ida_stack_off): frame_size = idc.get_struc_size(frame) last_member_size = idaapi.get_member_size(frame.get_member(frame.memqty - 1)) - angr_stack_off = ida_stack_off - frame_size + last_member_size - return angr_stack_off + bs_soff = ida_stack_off - frame_size + last_member_size + return bs_soff @execute_write @@ -175,7 +175,7 @@ def get_func_name(ea) -> typing.Optional[str]: def get_func_size(ea): func = idaapi.get_func(ea) if not func: - return 0 + raise ValueError("Unable to find function!") return func.size() @@ -294,7 +294,7 @@ def function_header(ida_code_view) -> FunctionHeader: @execute_write @requires_decompilation -def set_function_header(libbs_header: libbs.artifacts.FunctionHeader, exit_on_bad_type=False, ida_code_view=None): +def set_function_header(bs_header: libbs.artifacts.FunctionHeader, exit_on_bad_type=False, ida_code_view=None): data_changed = False func_addr = ida_code_view.cfunc.entry_ea cur_ida_func = function_header(ida_code_view) @@ -303,8 +303,8 @@ def set_function_header(libbs_header: libbs.artifacts.FunctionHeader, exit_on_ba # FUNCTION NAME # - if libbs_header.name and libbs_header.name != cur_ida_func.name: - set_ida_func_name(func_addr, libbs_header.name) + if bs_header.name and bs_header.name != cur_ida_func.name: + set_ida_func_name(func_addr, bs_header.name) # # FUNCTION RET TYPE @@ -312,9 +312,9 @@ def set_function_header(libbs_header: libbs.artifacts.FunctionHeader, exit_on_ba func_name = get_func_name(func_addr) cur_ret_type_str = str(ida_code_view.cfunc.type.get_rettype()) - if libbs_header.type and libbs_header.type != cur_ret_type_str: + if bs_header.type and bs_header.type != cur_ret_type_str: old_prototype = str(ida_code_view.cfunc.type).replace("(", f" {func_name}(", 1) - new_prototype = old_prototype.replace(cur_ret_type_str, libbs_header.type, 1) + new_prototype = old_prototype.replace(cur_ret_type_str, bs_header.type, 1) success = bool( ida_typeinf.apply_tinfo(func_addr, convert_type_str_to_ida_type(new_prototype), ida_typeinf.TINFO_DEFINITE) ) @@ -331,21 +331,24 @@ def set_function_header(libbs_header: libbs.artifacts.FunctionHeader, exit_on_ba # types_to_change = {} - for idx, libbs_arg in libbs_header.args.items(): + for idx, bs_arg in bs_header.args.items(): + if not bs_arg: + continue + if idx >= len(cur_ida_func.args): break cur_ida_arg = cur_ida_func.args[idx] + # record the type to change + if bs_arg.type and bs_arg.type != cur_ida_arg.type: + types_to_change[idx] = (cur_ida_arg.type, bs_arg.type) + # change the name - if libbs_arg.name and libbs_arg.name != cur_ida_arg.name: - success = ida_code_view.rename_lvar(ida_code_view.cfunc.arguments[idx], libbs_arg.name, 1) + if bs_arg.name and bs_arg.name != cur_ida_arg.name: + success = ida_code_view.rename_lvar(ida_code_view.cfunc.arguments[idx], bs_arg.name, 1) data_changed |= success - # record the type to change - if libbs_arg.type and libbs_arg.type != cur_ida_arg.type: - types_to_change[idx] = (cur_ida_arg.type, libbs_arg.type) - # crazy prototype parsing func_prototype = str(ida_code_view.cfunc.type).replace("(", f" {func_name}(", 1) proto_split = func_prototype.split("(", maxsplit=1) @@ -475,9 +478,6 @@ def set_stack_variable(svar: StackVariable, decompiler_available=True, **kwargs) l.warning(f"Function {svar.addr:x} does not have an associated function frame. Stopping sync here!") return False - if svar.name and ida_struct.set_member_name(frame, svar.offset, svar.name): - changes |= True - if svar.type: ida_type = convert_type_str_to_ida_type(svar.type) if ida_type is None: @@ -485,6 +485,12 @@ def set_stack_variable(svar: StackVariable, decompiler_available=True, **kwargs) return changes changes |= set_stack_vars_types({svar.offset: ida_type}, ida_code_view) + if changes: + ida_code_view.cfunc.refresh_func_ctext() + + frame = idaapi.get_frame(svar.addr) + if svar.name and ida_struct.set_member_name(frame, svar.offset, svar.name): + changes |= True return changes @@ -500,21 +506,27 @@ def set_ida_comment(addr, cmt, decompiled=False): return False rpt = 1 + ida_code_view = None + if decompiled: + try: + ida_code_view = acquire_pseudocode_vdui(func.start_ea) + except Exception: + pass # function comment if addr == func.start_ea: idc.set_func_cmt(addr, cmt, rpt) + if ida_code_view: + ida_code_view.refresh_view(True) return True # a comment in decompilation elif decompiled: - try: - cfunc = idaapi.decompile(addr) - except Exception: + if ida_code_view is None: ida_bytes.set_cmt(addr, cmt, rpt) return True - eamap = cfunc.get_eamap() + eamap = ida_code_view.cfunc.get_eamap() decomp_obj_addr = eamap[addr][0].ea tl = idaapi.treeloc_t() @@ -523,18 +535,17 @@ def set_ida_comment(addr, cmt, decompiled=False): tl.ea = a for itp in range(idaapi.ITP_SEMI, idaapi.ITP_COLON): tl.itp = itp - cfunc.set_user_cmt(tl, cmt) - cfunc.save_user_cmts() - cfunc.refresh_func_ctext() + ida_code_view.cfunc.set_user_cmt(tl, cmt) + ida_code_view.cfunc.save_user_cmts() + ida_code_view.cfunc.refresh_func_ctext() # attempt to set until it does not fail (orphan itself) - if not cfunc.has_orphan_cmts(): - cfunc.save_user_cmts() + if not ida_code_view.cfunc.has_orphan_cmts(): + ida_code_view.cfunc.save_user_cmts() + ida_code_view.refresh_view(True) return True - cfunc.del_orphan_cmts() - + ida_code_view.cfunc.del_orphan_cmts() return False - # a comment in disassembly else: ida_bytes.set_cmt(addr, cmt, rpt) @@ -609,16 +620,19 @@ def set_stack_vars_types(var_type_dict, ida_code_view) -> bool: data_changed = False fixed_point = False + func_addr = ida_code_view.cfunc.entry_ea while not fixed_point: fixed_point = True for lvar in ida_code_view.cfunc.lvars: - cur_off = lvar.location.stkoff() - ida_code_view.cfunc.get_stkoff_delta() - if lvar.is_stk_var() and cur_off in var_type_dict: - if str(lvar.type()) != str(var_type_dict[cur_off]): - data_changed |= ida_code_view.set_lvar_type(lvar, var_type_dict.pop(cur_off)) - fixed_point = False - # make sure to break, in case the size of lvars array has now changed - break + if lvar.is_stk_var(): + # TODO: this algorithm may need be corrected for programs with func args on the stack + cur_off = abs(ida_to_bs_stack_offset(func_addr, lvar.location.stkoff())) + if cur_off in var_type_dict: + if str(lvar.type()) != str(var_type_dict[cur_off]): + data_changed |= ida_code_view.set_lvar_type(lvar, var_type_dict.pop(cur_off)) + fixed_point = False + # make sure to break, in case the size of lvars array has now changed + break return data_changed @@ -852,6 +866,11 @@ def set_enum(bs_enum: Enum): # IDA GUI r/w # +@execute_write +def get_image_base(): + return idaapi.get_imagebase() + + @execute_write def acquire_pseudocode_vdui(addr): """ @@ -1009,7 +1028,7 @@ def __init__(self, *args, name=None, comment=None, interface=None, **kwargs): self.interface: "IDAInterface" = interface def init(self): - self.interface._init_ui_hooks() + self.interface._init_gui_hooks() return idaapi.PLUGIN_KEEP def run(self, arg): diff --git a/libbs/decompilers/ida/hooks.py b/libbs/decompilers/ida/hooks.py index f4bd4a2d..3fb6f02a 100644 --- a/libbs/decompilers/ida/hooks.py +++ b/libbs/decompilers/ida/hooks.py @@ -21,11 +21,8 @@ # # ---------------------------------------------------------------------------- -import threading -from functools import wraps import logging from typing import TYPE_CHECKING -import time from PyQt5 import QtCore from PyQt5.QtGui import QKeyEvent @@ -43,7 +40,7 @@ from . import compat from libbs.artifacts import ( - FunctionHeader, FunctionArgument, StackVariable, + FunctionHeader, StackVariable, Comment, GlobalVariable, Enum, Struct ) @@ -60,19 +57,6 @@ IDA_CMT_TYPES = {IDA_CMT_CMT, IDA_EXTRA_CMT, IDA_RANGE_CMT} -# -# Decorators -# - -def disable_while_writing(f): - @wraps(f) - def _disable_while_writing(self, *args, **kwargs): - if not self.interface.artifact_write_lock.locked(): - return f(self, *args, **kwargs) - - return _disable_while_writing - - # # IDA GUI Hooks # @@ -141,11 +125,9 @@ def __init__(self, interface): self.interface: "IDAInterface" = interface self._seen_function_prototypes = {} - @disable_while_writing def local_types_changed(self): return 0 - @disable_while_writing def ti_changed(self, ea, type_, fields): pfn = ida_funcs.get_func(ea) # only record return type changes @@ -175,19 +157,16 @@ def ida_enum_changed(self, enum_id, new_name=None, deleted=False): self.interface.enum_changed(_enum, deleted=deleted) - @disable_while_writing def enum_created(self, enum): self.ida_enum_changed(enum) return 0 # XXX - use enum_deleted(self, id) instead? - @disable_while_writing def deleting_enum(self, id): self.ida_enum_changed(id, deleted=True) return 0 # XXX - use enum_renamed(self, id) instead? - @disable_while_writing def renaming_enum(self, id, is_enum, newname): enum_id = id if not is_enum: @@ -199,21 +178,17 @@ def renaming_enum(self, id, is_enum, newname): self.ida_enum_changed(enum_id, new_name=newname) return 0 - @disable_while_writing def enum_bf_changed(self, id): return 0 - @disable_while_writing def enum_cmt_changed(self, tid, repeatable_cmt): return 0 - @disable_while_writing def enum_member_created(self, id, cid): self.ida_enum_changed(id) return 0 # XXX - use enum_member_deleted(self, id, cid) instead? - @disable_while_writing def deleting_enum_member(self, id, cid): self.ida_enum_changed(id) return 0 @@ -270,7 +245,6 @@ def ida_stack_var_changed(self, sptr, mptr): StackVariable(bs_offset, new_name, type_str, size, func_addr) ) - @disable_while_writing def struc_created(self, tid): sptr = ida_struct.get_struc(tid) if not sptr.is_frame(): @@ -279,14 +253,12 @@ def struc_created(self, tid): return 0 # XXX - use struc_deleted(self, struc_id) instead? - @disable_while_writing def deleting_struc(self, sptr): if not sptr.is_frame(): self.ida_struct_changed(sptr.id, deleted=True) return 0 - @disable_while_writing def struc_align_changed(self, sptr): if not sptr.is_frame(): self.ida_struct_changed(sptr.id) @@ -294,7 +266,6 @@ def struc_align_changed(self, sptr): return 0 # XXX - use struc_renamed(self, sptr) instead? - @disable_while_writing def renaming_struc(self, id, oldname, newname): sptr = ida_struct.get_struc(id) if not sptr.is_frame(): @@ -304,28 +275,24 @@ def renaming_struc(self, id, oldname, newname): self.ida_struct_changed(id, new_name=newname) return 0 - @disable_while_writing def struc_expanded(self, sptr): if not sptr.is_frame(): self.ida_struct_changed(sptr.id) return 0 - @disable_while_writing def struc_member_created(self, sptr, mptr): if not sptr.is_frame(): self.ida_struct_changed(sptr.id) return 0 - @disable_while_writing def struc_member_deleted(self, sptr, off1, off2): if not sptr.is_frame(): self.ida_struct_changed(sptr.id) return 0 - @disable_while_writing def struc_member_renamed(self, sptr, mptr): if sptr.is_frame(): self.ida_stack_var_changed(sptr, mptr) @@ -334,7 +301,6 @@ def struc_member_renamed(self, sptr, mptr): return 0 - @disable_while_writing def struc_member_changed(self, sptr, mptr): if sptr.is_frame(): self.ida_stack_var_changed(sptr, mptr) @@ -343,7 +309,6 @@ def struc_member_changed(self, sptr, mptr): return 0 - @disable_while_writing def renamed(self, ea, new_name, local_name): # ignore any changes landing here for structs and stack vars if ida_struct.is_member_id(ea) or ida_struct.get_struc(ea) or ida_enum.get_enum_name(ea): @@ -383,7 +348,6 @@ def ida_comment_changed(self, comment: str, address: int, cmt_type: str): return 0 - @disable_while_writing def cmt_changed(self, ea, repeatable_cmt): if repeatable_cmt: cmt = ida_bytes.get_cmt(ea, repeatable_cmt) @@ -391,14 +355,12 @@ def cmt_changed(self, ea, repeatable_cmt): self.ida_comment_changed(cmt, ea, IDA_CMT_CMT) return 0 - @disable_while_writing def range_cmt_changed(self, kind, a, cmt, repeatable): cmt = idc.get_func_cmt(a.start_ea, repeatable) if cmt: self.ida_comment_changed(cmt, a.start_ea, IDA_RANGE_CMT) return 0 - @disable_while_writing def extra_cmt_changed(self, ea, line_idx, cmt): cmt = ida_bytes.get_cmt(ea, 0) if cmt: @@ -409,7 +371,6 @@ def extra_cmt_changed(self, ea, line_idx, cmt): # Unused handlers, to be implemented eventually # - @disable_while_writing def struc_cmt_changed(self, id, repeatable_cmt): """ fullname = ida_struct.get_struc_name(id) @@ -422,11 +383,9 @@ def struc_cmt_changed(self, id, repeatable_cmt): """ return 0 - @disable_while_writing def sgr_changed(self, start_ea, end_ea, regnum, value, old_value, tag): return 0 - @disable_while_writing def byte_patched(self, ea, old_value): return 0 diff --git a/libbs/decompilers/ida/interface.py b/libbs/decompilers/ida/interface.py index 2596913b..4e101c2b 100755 --- a/libbs/decompilers/ida/interface.py +++ b/libbs/decompilers/ida/interface.py @@ -1,15 +1,13 @@ -import threading import logging from typing import Dict, Optional, List from collections import OrderedDict, defaultdict -from functools import wraps import idc import idaapi import ida_hexrays import libbs -from libbs.api.decompiler_interface import DecompilerInterface, artifact_write_event +from libbs.api.decompiler_interface import DecompilerInterface from libbs.artifacts import ( StackVariable, Function, FunctionHeader, Struct, Comment, GlobalVariable, Enum, Patch, Artifact ) @@ -43,13 +41,56 @@ def __init__(self, **kwargs): # GUI properties self._updated_ctx = None + def _init_gui_hooks(self): + """ + This function can only be called from inside the compat.GenericIDAPlugin and is meant for IDA code which + should be run as a plugin. + """ + self._ui_hooks = [ + ScreenHook(self), + ContextMenuHooks(self, menu_strs=self._ctx_menu_names), + IDPHooks(self), + ] + for hook in self._ui_hooks: + hook.hook() + + def _init_gui_plugin(self, *args, **kwargs): + return compat.GenericIDAPlugin(*args, name=self._plugin_name, interface=self, **kwargs) + + # + # GUI + # + + def gui_ask_for_string(self, question, title="Plugin Question") -> str: + resp = idaapi.ask_str("", 0, question) + return resp if resp else "" + + def gui_register_ctx_menu(self, name, action_string, callback_func, category=None) -> bool: + # Function explaining action + explain_action = idaapi.action_desc_t( + name, + action_string, + compat.GenericAction(name, callback_func), + "", + action_string, + 199 + ) + idaapi.register_action(explain_action) + idaapi.attach_action_to_menu( + f"Edit/{category}/{name}" if category else f"Edit/{name}", + name, + idaapi.SETMENU_APP + ) + self._ctx_menu_names.append((name, category or "")) + return True + # - # Controller Interaction + # Mandatory API # @property def binary_base_addr(self) -> int: - return idaapi.get_imagebase() + return compat.get_image_base() @property def binary_hash(self) -> str: @@ -60,6 +101,7 @@ def binary_path(self) -> Optional[str]: return compat.get_binary_path() def get_func_size(self, func_addr) -> int: + func_addr = self.art_lifter.lower_addr(func_addr) return compat.get_func_size(func_addr) @property @@ -90,6 +132,7 @@ def xrefs_to(self, artifact: Artifact) -> List[Artifact]: return xrefs def get_decompilation_object(self, function: Function) -> Optional[object]: + function = self.art_lifter.lower(function) dec = idaapi.decompile(function.addr) if dec is None: return None @@ -123,68 +166,19 @@ def stop_artifact_watchers(self): for hook in self._artifact_watcher_hooks: hook.unhook() - def gui_ask_for_string(self, question, title="Plugin Question") -> str: - resp = idaapi.ask_str("", 0, question) - return resp if resp else "" - - def _init_ui_hooks(self): - """ - This function can only be called from inside the compat.GenericIDAPlugin and is meant for IDA code which - should be run as a plugin. - """ - self._ui_hooks = [ - ScreenHook(self), - ContextMenuHooks(self, menu_strs=self._ctx_menu_names), - IDPHooks(self), - ] - for hook in self._ui_hooks: - hook.hook() - - def _init_gui_plugin(self, *args, **kwargs): - return compat.GenericIDAPlugin(*args, name=self._plugin_name, interface=self, **kwargs) - - def register_ctx_menu_item(self, name, action_string, callback_func, category=None) -> bool: - # Function explaining action - explain_action = idaapi.action_desc_t( - name, - action_string, - compat.GenericAction(name, callback_func), - "", - action_string, - 199 - ) - idaapi.register_action(explain_action) - idaapi.attach_action_to_menu( - f"Edit/{category}/{name}" if category else f"Edit/{name}", - name, - idaapi.SETMENU_APP - ) - self._ctx_menu_names.append((name, category or "")) - return True - - def _ea_to_func(self, addr): - if not addr or addr == idaapi.BADADDR: - return None - - func_addr = compat.ida_func_addr(addr) - if func_addr is None: - return None - - func = libbs.artifacts.Function( - func_addr, 0, header=FunctionHeader(compat.get_func_name(func_addr), func_addr) - ) - return func - - def active_context(self): + def gui_active_context(self): if not self._init_plugin: - return self._ea_to_func(compat.get_screen_ea()) + bs_func = self._ea_to_func(compat.get_screen_ea()) + if bs_func is None: + return None - return self._updated_ctx + bs_func.addr = self.art_lifter.lift_addr(bs_func.addr) + return bs_func - def update_active_context(self, addr): - self._updated_ctx = self._ea_to_func(addr) + return self._updated_ctx - def goto_address(self, func_addr) -> None: + def gui_goto(self, func_addr) -> None: + func_addr = self.art_lifter.lower_addr(func_addr) compat.jumpto(func_addr) # @@ -201,6 +195,7 @@ def local_variable_names(self, func: Function) -> List[str]: @requires_decompilation def rename_local_variables_by_names(self, func: Function, name_map: Dict[str, str]) -> bool: + func = self.art_lifter.lower(func) return compat.rename_local_variables_by_names(func, name_map) # @@ -329,6 +324,28 @@ def _set_function_header(self, fheader: FunctionHeader, **kwargs) -> bool: # utils # + def update_active_context(self, addr): + bs_func = self._ea_to_func(addr) + if bs_func is None: + return + + bs_func.addr = self.art_lifter.lift_addr(bs_func.addr) + self._updated_ctx = bs_func + + @staticmethod + def _ea_to_func(addr): + if not addr or addr == idaapi.BADADDR: + return None + + func_addr = compat.ida_func_addr(addr) + if func_addr is None: + return None + + func = libbs.artifacts.Function( + func_addr, 0, header=FunctionHeader(compat.get_func_name(func_addr), func_addr) + ) + return func + @staticmethod def _collect_continuous_patches(min_addr=None, max_addr=None, stop_after_first=False) -> Dict[int, Patch]: patches = {} diff --git a/libbs/plugin_installer.py b/libbs/plugin_installer.py index 8b09e7f7..16ba3daf 100644 --- a/libbs/plugin_installer.py +++ b/libbs/plugin_installer.py @@ -46,7 +46,7 @@ class PluginInstaller: ) def __init__(self, targets=None, target_install_paths=None): - self.targets = targets or self.DECOMPILERS+self.DEBUGGERS + self.targets = targets if targets is not None else self.DECOMPILERS+self.DEBUGGERS self._home = Path(os.getenv("HOME") or "~/").expanduser().absolute() self.target_install_paths = target_install_paths or {} #or self._populate_installs_from_config() self._successful_installs = {} @@ -245,33 +245,14 @@ def install_gdb(self, path=None, interactive=True): class LibBSPluginInstaller(PluginInstaller): def __init__(self, targets=None, target_install_paths=None): - super().__init__(targets=targets or PluginInstaller.DECOMPILERS, target_install_paths=target_install_paths) + targets = targets or PluginInstaller.DECOMPILERS + super().__init__(targets=targets, target_install_paths=target_install_paths) self._libbs_plugins_path = self.find_pkg_files("libbs").joinpath("decompiler_stubs") def display_prologue(self): print(textwrap.dedent(""" Now installing LibBS plugins for all supported decompilers...""")) - def install_ida(self, path=None, interactive=True): - ida_plugin_path = super().install_ida(path=path, interactive=interactive) - if ida_plugin_path is None: - return None - - src_ida_libbs_py = self._libbs_plugins_path.joinpath("ida_libbs.py") - dst_ida_libbs_py = ida_plugin_path.joinpath("ida_libbs.py") - self.link_or_copy(src_ida_libbs_py, dst_ida_libbs_py) - return ida_plugin_path - - def install_angr(self, path=None, interactive=True): - angr_plugin_path = super().install_angr(path=path, interactive=interactive) - if angr_plugin_path is None: - return None - - src_angr_libbs_pkg = self._libbs_plugins_path.joinpath("angr_libbs") - dst_angr_libbs_pkg = angr_plugin_path.joinpath("angr_libbs") - self.link_or_copy(src_angr_libbs_pkg, dst_angr_libbs_pkg, is_dir=True) - return angr_plugin_path - def install_ghidra(self, path=None, interactive=True): ghidra_path = super().install_ghidra(path=path, interactive=interactive) if ghidra_path is None: @@ -290,13 +271,3 @@ def install_ghidra(self, path=None, interactive=True): self.link_or_copy(src_script, dst_ghidra_script) self.link_or_copy(src_script_shutdown, dst_script_shutdown) return ghidra_path - - def install_binja(self, path=None, interactive=True): - binja_plugin_path = super().install_binja(path=path, interactive=interactive) - if binja_plugin_path is None: - return None - - src_path = self._libbs_plugins_path.joinpath("binja_libbs") - dst_path = binja_plugin_path.joinpath("binja_libbs") - self.link_or_copy(src_path, dst_path, is_dir=True) - return binja_plugin_path