From 4cc47ae35e0b1a05a9f1d5566fe94f7860d0fd4d Mon Sep 17 00:00:00 2001 From: Zion Leonahenahe Basque Date: Wed, 21 Feb 2024 14:29:33 -0700 Subject: [PATCH] Fix a regression in Ghidra backend and installer (#33) * Fix a reintroduced bug in struct setting for Ghidra * move where the bridge is inited * Fix bad lifting * bump --- libbs/__init__.py | 2 +- libbs/api/__init__.py | 2 -- libbs/api/decompiler_interface.py | 5 +++- libbs/decompilers/ghidra/interface.py | 39 +++++++++++++-------------- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/libbs/__init__.py b/libbs/__init__.py index a97367c0..392df32c 100644 --- a/libbs/__init__.py +++ b/libbs/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.22.0" +__version__ = "0.22.1" import logging logging.getLogger("libbs").addHandler(logging.NullHandler()) diff --git a/libbs/api/__init__.py b/libbs/api/__init__.py index 4feff4e3..495ecfc1 100644 --- a/libbs/api/__init__.py +++ b/libbs/api/__init__.py @@ -2,8 +2,6 @@ from .decompiler_interface import DecompilerInterface from .type_parser import CTypeParser, CType -import logging - from .decompiler_interface import ( DecompilerInterface ) diff --git a/libbs/api/decompiler_interface.py b/libbs/api/decompiler_interface.py index 0ebfc6a5..e83e4cf2 100644 --- a/libbs/api/decompiler_interface.py +++ b/libbs/api/decompiler_interface.py @@ -136,6 +136,10 @@ def _init_gui_components(self, *args, **kwargs): def _init_gui_plugin(self, *args, **kwargs): return None + def shutdown(self): + if self._artifact_watchers_started: + self.stop_artifact_watchers() + # # Public API: # These functions are the main API for interacting with the decompiler. In general, every function that takes @@ -255,7 +259,6 @@ def decompile(self, addr: int) -> Optional[str]: # TODO: make this a function call after transitioning decompiler artifacts to LiveState for func_addr, func in self._functions().items(): if func.addr <= addr < (func.addr + func.size): - func_found = True break else: func = None diff --git a/libbs/decompilers/ghidra/interface.py b/libbs/decompilers/ghidra/interface.py index 9ce023dd..94abc961 100644 --- a/libbs/decompilers/ghidra/interface.py +++ b/libbs/decompilers/ghidra/interface.py @@ -56,9 +56,10 @@ def __init__(self, loop_on_plugin=True, **kwargs): self.ghidra: Optional[GhidraAPIWrapper] = None super().__init__(name="ghidra", artifact_lifter=GhidraArtifactLifter(self), supports_undo=True, **kwargs) - # Connect to the remote bridge, assumes Ghidra is already running! + def _init_gui_components(self, *args, **kwargs): if not self.connect_ghidra_bridge(): - raise Exception("Failed to connect to remote Ghidra Bridge. Did you start it first?") + raise Exception("Failed to connect to the Ghidra Bridge. Check the Ghidra GUI for failures!") + super()._init_gui_components(*args, **kwargs) def _init_headless_components(self, *args, **kwargs): if self._headless_dec_path is None: @@ -85,6 +86,10 @@ def _init_headless_components(self, *args, **kwargs): self._headless_script_name ]) + time.sleep(1) + if not self.connect_ghidra_bridge(): + raise Exception(f"Failed to connect to the Ghidra Bridge. Check if the {self._headless_dec_path} binary was ever started.") + def _find_headless_proc(self): for proc in psutil.process_iter(): try: @@ -103,11 +108,10 @@ def _find_headless_proc(self): return proc def shutdown(self): + super().shutdown() self.ghidra.bridge.remote_shutdown() # Wait until headless binary gets shutdown - while True: - if not self._find_headless_proc(): - break + while self._find_headless_proc(): time.sleep(1) self._headless_g_project.cleanup() @@ -151,7 +155,7 @@ def gui_active_context(self): if active_addr != self._last_addr: self._last_addr = active_addr self._last_func = self._gfunc_to_bsfunc(self._get_nearest_function(active_addr)) - self._last_func.addr = self.art_lifter.lower_addr(self._last_func.addr) + self._last_func.addr = self.art_lifter.lift_addr(self._last_func.addr) return self._last_func @@ -183,11 +187,6 @@ def connect_ghidra_bridge(self): self.ghidra = GhidraAPIWrapper(self, connection_timeout=25) return self.ghidra.connected - def decompile(self, addr: int) -> Optional[str]: - # TODO: allow the super to do this again - function = self.art_lifter.lower(self.functions[addr]) - return self._decompile(function) - def _decompile(self, function: Function) -> Optional[str]: dec_obj = self.get_decompilation_object(function) if dec_obj is None: @@ -200,7 +199,7 @@ def _decompile(self, function: Function) -> Optional[str]: return str(dec_func.getC()) def get_decompilation_object(self, function: Function) -> Optional[object]: - return self._ghidra_decompile(self._get_nearest_function(function.addr)) + return self._ghidra_decompile(self._get_nearest_function(self.art_lifter.lower_addr(function.addr))) # # Extra API @@ -359,13 +358,13 @@ def _set_struct(self, struct: Struct, header=True, members=True, **kwargs) -> bo struct: Struct = struct old_ghidra_struct = self._get_struct_by_name('/' + struct.name) data_manager = self.ghidra.currentProgram.getDataTypeManager() - handler = self.ghidra.import_module_object("ghidra.program.model.artifacts", "DataTypeConflictHandler") - structType = self.ghidra.import_module_object("ghidra.program.model.artifacts", "StructureDataType") - byteType = self.ghidra.import_module_object("ghidra.program.model.artifacts", "ByteDataType") + handler = self.ghidra.import_module_object("ghidra.program.model.data", "DataTypeConflictHandler") + structType = self.ghidra.import_module_object("ghidra.program.model.data", "StructureDataType") + byteType = self.ghidra.import_module_object("ghidra.program.model.data", "ByteDataType") ghidra_struct = structType(struct.name, 0) for offset in struct.members: member = struct.members[offset] - ghidra_struct.add(byteType.artifactsType, 1, member.name, "") + ghidra_struct.add(byteType.dataType, 1, member.name, "") ghidra_struct.growStructure(member.size - 1) for dtc in ghidra_struct.getComponents(): if dtc.getFieldName() == member.name: @@ -441,9 +440,9 @@ def _set_enum(self, enum: Enum, **kwargs) -> bool: corrected_enum_name = "/" + enum.name old_ghidra_enum = self.ghidra.currentProgram.getDataTypeManager().getDataType(corrected_enum_name) data_manager = self.ghidra.currentProgram.getDataTypeManager() - handler = self.ghidra.import_module_object("ghidra.program.model.artifacts", "DataTypeConflictHandler") - enumType = self.ghidra.import_module_object("ghidra.program.model.artifacts", "EnumDataType") - categoryPath = self.ghidra.import_module_object("ghidra.program.model.artifacts", "CategoryPath") + handler = self.ghidra.import_module_object("ghidra.program.model.data", "DataTypeConflictHandler") + enumType = self.ghidra.import_module_object("ghidra.program.model.data", "EnumDataType") + categoryPath = self.ghidra.import_module_object("ghidra.program.model.data", "CategoryPath") ghidra_enum = enumType(categoryPath('/'), enum.name, 4) for m_name, m_val in enum.members.items(): ghidra_enum.add(m_name, m_val) @@ -466,7 +465,7 @@ def _enums(self) -> Dict[str, Enum]: names: Optional[List[str]] = self.ghidra.bridge.remote_eval( "[dType.getPathName() " "for dType in currentProgram.getDataTypeManager().getAllDataTypes()" - "if str(type(dType)) == \"\"]" + "if str(type(dType)) == \"\"]" ) return {name[1:]: Enum(name[1:], self._get_enum_members(name)) for name in names if name.count('/') == 1} if names else {}