diff --git a/README.md b/README.md index 6fc956e2..b486b967 100644 --- a/README.md +++ b/README.md @@ -65,11 +65,3 @@ for func_addr, light_func in deci.functions.items(): Notice, when using the `items` function the function is `light`, meaning it does not contain stack vars and other info. This also means using `keys`, `values`, or `list` on an artifact dictionary will have the same affect. - -### IDA -- [ ] G/S Comments - -### Ghidra -- [ ] Change Callbacks -- [ ] Get/Set Comments - diff --git a/examples/change_watcher_plugin/bs_change_watcher/__init__.py b/examples/change_watcher_plugin/bs_change_watcher/__init__.py index 041b122c..e1fb86aa 100644 --- a/examples/change_watcher_plugin/bs_change_watcher/__init__.py +++ b/examples/change_watcher_plugin/bs_change_watcher/__init__.py @@ -22,7 +22,7 @@ def create_plugin(*args, **kwargs): gui_init_kwargs=kwargs ) # create a function to print a string in the decompiler console - decompiler_printer = lambda *x, **y: deci.print(f"Changed {x}{y}") + decompiler_printer = lambda *x: deci.print(f"Changed {x}") # register the callback for all the types we want to print deci.artifact_write_callbacks = { typ: [decompiler_printer] for typ in (FunctionHeader, StackVariable, Enum, Struct, GlobalVariable, Comment,) @@ -32,7 +32,7 @@ def create_plugin(*args, **kwargs): deci.gui_register_ctx_menu( "StartArtifactChangeWatcher", "Start watching artifact changes", - lambda: deci.start_artifact_watchers(), + lambda *x, **y: deci.start_artifact_watchers(), category="ArtifactChangeWatcher" ) diff --git a/libbs/__init__.py b/libbs/__init__.py index c413a693..a97367c0 100644 --- a/libbs/__init__.py +++ b/libbs/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.21.1" +__version__ = "0.22.0" import logging logging.getLogger("libbs").addHandler(logging.NullHandler()) diff --git a/libbs/artifacts/comment.py b/libbs/artifacts/comment.py index 4e6f0401..1ddea895 100644 --- a/libbs/artifacts/comment.py +++ b/libbs/artifacts/comment.py @@ -14,9 +14,8 @@ class Comment(Artifact): def __init__(self, addr, comment, func_addr=None, decompiled=False, last_change=None): super(Comment, self).__init__(last_change=last_change) - if comment: - self.comment = self.linewrap_comment(comment) + self.comment = self.linewrap_comment(comment) if comment else "" self.decompiled = decompiled self.addr = addr # type: int self.func_addr = func_addr diff --git a/libbs/decompilers/binja/hooks.py b/libbs/decompilers/binja/hooks.py index a613a487..e8fa0947 100644 --- a/libbs/decompilers/binja/hooks.py +++ b/libbs/decompilers/binja/hooks.py @@ -1,29 +1,16 @@ -import re - -from binaryninjaui import ( - UIContext, - DockHandler, - DockContextHandler, - UIAction, - UIActionHandler, - Menu, - SidebarWidget, - SidebarWidgetType, - Sidebar, -) +from collections import defaultdict +from typing import Dict + import binaryninja -from binaryninja import PluginCommand from binaryninja.types import StructureType, EnumerationType from binaryninja import SymbolType from binaryninja.binaryview import BinaryDataNotification -from collections import defaultdict import logging from .interface import BinjaInterface -from binsync.artifacts import ( - Artifact, - Function, FunctionHeader, FunctionArgument, Comment, GlobalVariable, Enum, StructMember +from libbs.artifacts import ( + FunctionHeader, FunctionArgument, GlobalVariable, StackVariable, Comment ) l = logging.getLogger(__name__) @@ -36,104 +23,189 @@ class DataMonitor(BinaryDataNotification): def __init__(self, view, interface): super().__init__() - self._view = view - self._interface = interface - self._func_addr_requested = None - self._func_before_change = None + self._bv = view + self._interface: BinjaInterface = interface + self._changing_func_addr = None + self._changing_func_pre_change = None + self._seen_comments = defaultdict(dict) def function_updated(self, view, func_): - if self._interface.sync_lock.locked() or self._func_before_change is None: - # TODO: add support for creating functions here - return + # updates that occur without a service request are requests for comment changes + if self._changing_func_pre_change is None: + # + # comments + # + + func_addr = func_.start + current_comments = dict(func_.comments) + if self._seen_comments[func_addr] != current_comments: + # comments changed + old_comments = self._seen_comments + new_comments = current_comments + + for addr, old_comment in old_comments.items(): + new_comment = new_comments.get(addr, None) + if new_comment == old_comment: + continue + + self._interface.comment_changed( + self._interface.art_lifter.lift( + Comment( + addr, str(new_comment) if new_comment else "", decompiled=True, func_addr=func_addr + ) + ) + ) + + for addr, new_comment in new_comments.items(): + if addr in old_comments: + continue + + if new_comment: + self._interface.comment_changed( + self._interface.art_lifter.lift( + Comment(addr, str(new_comment), decompiled=True, func_addr=func_addr) + ) + ) + + self._seen_comments[func_addr] = current_comments # service requested function only - if self._func_addr_requested == func_.start: - l.debug(f"Update on {hex(self._func_addr_requested)} being processed...") - self._func_addr_requested = None + if self._changing_func_pre_change is not None and self._changing_func_addr == func_.start: + l.debug(f"Update on {hex(self._changing_func_addr)} being processed...") + self._changing_func_addr = None # convert to libbs Function type for diffing bn_func = view.get_function_at(func_.start) bs_func = BinjaInterface.bn_func_to_bs(bn_func) + current_comments = dict(bn_func.comments) # # header - # NOTE: function name done inside symbol update hook # # check if the headers differ - if self._func_before_change.header.diff(bs_func.header): - self._interface.schedule_job( - self._interface.push_artifact, - bs_func.header - ) - + # NOTE: function name done inside symbol update hook + if self._changing_func_pre_change.header.diff(bs_func.header): + old_header: FunctionHeader = self._changing_func_pre_change.header + new_header: FunctionHeader = bs_func.header + + old_args = old_header.args or {} + for off, old_arg in old_args.items(): + new_arg = new_header.args.get(off, None) + if new_arg is None: + # TODO: support deleting args + continue + + if old_arg == new_arg: + continue + + diff_arg = FunctionArgument(off, None, None, None) + if old_arg.name != new_arg.name: + diff_arg.name = str(new_arg.name) + + if old_arg.type != new_arg.type: + diff_arg.type = str(new_arg.type) + + if old_arg.size != new_arg.size: + diff_arg.size = int(new_arg.size) + + self._interface.function_header_changed( + self._interface.art_lifter.lift( + FunctionHeader(None, old_header.addr, args={off: diff_arg}) + ) + ) + + # new func args added to header + for off, new_arg in bs_func.args.items(): + if off in old_args: + continue + + self._interface.function_header_changed( + self._interface.art_lifter.lift( + FunctionHeader(None, old_header.addr, args={ + off: FunctionArgument(off, str(new_arg.name), str(new_arg.type), int(new_arg.size)) + } + ) + ) + ) + # # stack vars # - for off, var in self._func_before_change.stack_vars.items(): - if off in bs_func.stack_vars and var != bs_func.stack_vars[off]: - new_var = bs_func.stack_vars[off] - if re.match(r"var_\d+[_\d+]{0,1}", new_var.name) \ - or new_var.name in {'__saved_rbp', '__return_addr',}: + header_args_names = set([arg.name for arg in bs_func.header.args.values()]) + if self._changing_func_pre_change.stack_vars != bs_func.stack_vars: + old_svs: Dict[int, StackVariable] = self._changing_func_pre_change.stack_vars + new_svs: Dict[int, StackVariable] = bs_func.stack_vars + + for off, old_sv in old_svs.items(): + new_sv = new_svs.get(off, None) + if new_sv is None or new_sv.name in header_args_names: continue - self._interface.schedule_job( - self._interface.push_artifact, - new_var + if old_sv == new_sv: + continue + + diff_sv = StackVariable(off, None, None, old_sv.size, bs_func.addr) + if old_sv.name != new_sv.name: + diff_sv.name = str(new_sv.name) + + if old_sv.type != new_sv.type: + diff_sv.type = str(new_sv.type) + + self._interface.stack_variable_changed( + self._interface.art_lifter.lift(diff_sv) + ) + + for off, new_sv in new_svs.items(): + if off in old_svs or new_sv.name in header_args_names: + continue + + self._interface.stack_variable_changed( + self._interface.art_lifter.lift( + StackVariable(off, str(new_sv.name), str(new_sv.type), new_sv.size, bs_func.addr) + ) ) - self._func_before_change = None + self._changing_func_pre_change = None def function_update_requested(self, view, func): - if not self._interface.sync_lock.locked() and self._func_addr_requested is None: + if self._changing_func_addr is None: l.debug(f"Update on {func} requested...") - self._func_addr_requested = func.start - self._func_before_change = BinjaInterface.bn_func_to_bs(func) + self._changing_func_addr = func.start + self._changing_func_pre_change = BinjaInterface.bn_func_to_bs(func) def symbol_updated(self, view, sym): - if self._interface.sync_lock.locked(): - return - l.debug(f"Symbol update Requested on {sym}...") if sym.type == SymbolType.FunctionSymbol: l.debug(f" -> Function Symbol") func = view.get_function_at(sym.address) bs_func = BinjaInterface.bn_func_to_bs(func) - self._interface.schedule_job( - self._interface.push_artifact, - FunctionHeader(sym.name, sym.address, type_=bs_func.header.type, args=bs_func.header.args) + self._interface.function_header_changed( + self._interface.art_lifter.lift(FunctionHeader(bs_func.name, bs_func.addr)) ) elif sym.type == SymbolType.DataSymbol: l.debug(f" -> Data Symbol") var: binaryninja.DataVariable = view.get_data_var_at(sym.address) - - self._interface.schedule_job( - self._interface.push_artifact, - GlobalVariable(var.address, var.name, type_=str(var.type), size=var.type.width) + self._interface.global_variable_changed( + self._interface.art_lifter.lift( + GlobalVariable(int(sym.address), str(var.name), type_=str(var.type), size=int(var.type.width)) + ) ) else: - l.debug(f" -> Other Symbol: {sym.type}") + print(f" -> Other Symbol: {sym.type}") pass def type_defined(self, view, name, type_): l.debug(f"Type Defined: {name} {type_}") name = str(name) - if self._interface.sync_lock.locked(): - return - if isinstance(type_, StructureType): bs_struct = BinjaInterface.bn_struct_to_bs(name, type_) - self._interface.schedule_job( - self._interface.push_artifact, - bs_struct + self._interface.struct_changed( + self._interface.art_lifter.lift(bs_struct) ) - elif isinstance(type_, EnumerationType): bs_enum = BinjaInterface.bn_enum_to_bs(name, type_) - self._interface.schedule_job(self._interface.push_artifact, bs_enum) - - -def start_data_monitor(view, controller): - notification = DataMonitor(view, controller) - view.register_notification(notification) + self._interface.enum_changed( + self._interface.art_lifter.lift(bs_enum) + ) diff --git a/libbs/decompilers/binja/interface.py b/libbs/decompilers/binja/interface.py index a4011e08..0d888189 100644 --- a/libbs/decompilers/binja/interface.py +++ b/libbs/decompilers/binja/interface.py @@ -1,23 +1,22 @@ import threading import functools -from typing import Dict, Tuple, Optional, Iterable, Any, List +from typing import Dict, Optional, Any, List import hashlib import logging -from binaryninja import SymbolType -from binaryninjaui import ( - UIContext, - DockContextHandler, - UIActionHandler, - Menu, -) -import binaryninja -from binaryninja import PluginCommand -from binaryninja import lineardisassembly -from binaryninja.function import DisassemblySettings -from binaryninja.enums import DisassemblyOption, LinearDisassemblyLineType, InstructionTextTokenType -from binaryninja.enums import VariableSourceType -from binaryninja.types import StructureType, EnumerationType +BN_AVAILABLE = True +try: + import binaryninja +except ImportError: + BN_AVAILABLE = False + +if BN_AVAILABLE: + from binaryninja import SymbolType, PluginCommand, lineardisassembly + from binaryninjaui import UIContext + from binaryninja.function import DisassemblySettings + from binaryninja.enums import DisassemblyOption, LinearDisassemblyLineType, InstructionTextTokenType + from binaryninja.enums import VariableSourceType + from binaryninja.types import StructureType, EnumerationType from libbs.api.decompiler_interface import DecompilerInterface import libbs @@ -53,15 +52,83 @@ def thunk(): return wrapper -# -# Controller -# - class BinjaInterface(DecompilerInterface): def __init__(self, bv=None, **kwargs): - self.bv: binaryninja.BinaryView = bv + self.bv: "binaryninja.BinaryView" = bv + self._data_monitor = None super(BinjaInterface, self).__init__(name="binja", artifact_lifter=BinjaArtifactLifter(self), **kwargs) + def _init_headless_components(self, *args, check_dec_path=True, **kwargs): + super()._init_headless_components(*args, check_dec_path=False, **kwargs) + if not BN_AVAILABLE: + raise ImportError("Unable to import binaryninja module. Are you sure you have it installed with an enterprise license?") + + self.bv = binaryninja.load(str(self._binary_path)) + + def _init_gui_components(self, *args, **kwargs): + if binaryninja.core_ui_enabled(): + super()._init_gui_components(*args, **kwargs) + return True + else: + return False + + def _init_gui_plugin(self, *args, **kwargs): + return self + + def __del__(self): + if self.headless and BN_AVAILABLE: + self.bv.file.close() + + # + # GUI + # + + def gui_active_context(self): + all_contexts = UIContext.allContexts() + if not all_contexts: + return None + + ctx = all_contexts[0] + handler = ctx.contentActionHandler() + if handler is None: + return None + + actionContext = handler.actionContext() + func = actionContext.function + if func is None: + return None + + func_addr = self.art_lifter.lift_addr(func.start) + return libbs.artifacts.Function( + func_addr, 0, header=FunctionHeader(func.name, func_addr) + ) + + def gui_goto(self, func_addr) -> None: + func_addr = self.art_lifter.lower_addr(func_addr) + self.bv.offset = func_addr + + def gui_register_ctx_menu(self, name, action_string, callback_func, category=None) -> bool: + # TODO: this needs to have a wrapper function that passes the bv to the current deci + # correct name, category, and action_string for Binja + action_string = action_string.replace("/", "\\") + category = category.replace("/", "\\") if category else "" + + PluginCommand.register_for_address( + f"{category}\\{action_string}", + action_string, + callback_func, + is_valid=self.is_bn_func + ) + return True + + def gui_ask_for_string(self, question, title="Plugin Question") -> str: + resp = binaryninja.get_text_line_input(question, title) + return resp.decode() if resp else "" + + # + # Public API + # + @property def binary_base_addr(self) -> int: return self.bv.start @@ -84,6 +151,7 @@ def binary_path(self) -> Optional[str]: return None def get_func_size(self, func_addr) -> int: + func_addr = self.art_lifter.lower_addr(func_addr) func = self.bv.get_function_at(func_addr) if not func: return 0 @@ -110,6 +178,7 @@ def xrefs_to(self, artifact: Artifact) -> List[Artifact]: return xrefs def get_func_containing(self, addr: int) -> Optional[Function]: + addr = self.art_lifter.lower_addr(addr) funcs = self.bv.get_functions_containing(addr) if not funcs: return None @@ -159,7 +228,7 @@ def _decompile(self, function: Function) -> Optional[str]: return decomp def local_variable_names(self, func: Function) -> List[str]: - bn_func = self.addr_to_bn_func(self.bv, func.addr) + bn_func = self.addr_to_bn_func(self.bv, self.art_lifter.lower_addr(func.addr)) if bn_func is None: return [] @@ -167,7 +236,7 @@ def local_variable_names(self, func: Function) -> List[str]: @background_and_wait def rename_local_variables_by_names(self, func: Function, name_map: Dict[str, str]) -> bool: - bn_func = self.addr_to_bn_func(self.bv, func.addr) + bn_func = self.addr_to_bn_func(self.bv, self.art_lifter.lower_addr(func.addr)) if bn_func is None: return False @@ -194,52 +263,21 @@ def get_decompilation_object(self, function: Function) -> Optional[object]: """ return None - # - # GUI API - # - - def _init_gui_plugin(self, *args, **kwargs): - return self - - def gui_active_context(self): - all_contexts = UIContext.allContexts() - if not all_contexts: - return None - - ctx = all_contexts[0] - handler = ctx.contentActionHandler() - if handler is None: - return None - - actionContext = handler.actionContext() - func = actionContext.function - if func is None: - return None - - return libbs.artifacts.Function( - func.start, 0, header=FunctionHeader(func.name, func.start) - ) - - def gui_ask_for_string(self, question, title="Plugin Question") -> str: - resp = binaryninja.get_text_line_input(question, title) - return resp.decode() if resp else "" - - def gui_register_ctx_menu(self, name, action_string, callback_func, category=None) -> bool: - # TODO: this needs to have a wrapper function that passes the bv to the current deci - # correct name, category, and action_string for Binja - action_string = action_string.replace("/", "\\") - category = category.replace("/", "\\") if category else "" - - PluginCommand.register_for_address( - f"{category}\\{action_string}", - action_string, - callback_func, - is_valid=self.is_bn_func - ) - return True - - def gui_goto(self, func_addr) -> None: - self.bv.offset = func_addr + def start_artifact_watchers(self): + if not self._artifact_watchers_started: + from .hooks import DataMonitor + if self.bv is None: + raise RuntimeError("Cannot start artifact watchers without a BinaryView.") + + self._data_monitor = DataMonitor(self.bv, self) + self.bv.register_notification(self._data_monitor) + super().start_artifact_watchers() + + def stop_artifact_watchers(self): + if self._artifact_watchers_started: + self.bv.unregister_notification(self._data_monitor) + self._data_monitor = None + super().stop_artifact_watchers() # # Artifact API diff --git a/libbs/plugin_installer.py b/libbs/plugin_installer.py index 16ba3daf..678295a7 100644 --- a/libbs/plugin_installer.py +++ b/libbs/plugin_installer.py @@ -82,6 +82,8 @@ def install(self, interactive=True, paths_by_target=None): self.install_targets(interactive=interactive) except Exception as e: print(f"Stopping Install... because: {e}") + except KeyboardInterrupt: + print("Goodbye...") self.display_epilogue()