Skip to content

Commit

Permalink
Fix a regression in Ghidra backend and installer (#33)
Browse files Browse the repository at this point in the history
* Fix a reintroduced bug in struct setting for Ghidra

* move where the bridge is inited

* Fix bad lifting

* bump
  • Loading branch information
mahaloz authored Feb 21, 2024
1 parent 38977fc commit 4cc47ae
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 24 deletions.
2 changes: 1 addition & 1 deletion libbs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.22.0"
__version__ = "0.22.1"

import logging
logging.getLogger("libbs").addHandler(logging.NullHandler())
Expand Down
2 changes: 0 additions & 2 deletions libbs/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from .decompiler_interface import DecompilerInterface
from .type_parser import CTypeParser, CType

import logging

from .decompiler_interface import (
DecompilerInterface
)
Expand Down
5 changes: 4 additions & 1 deletion libbs/api/decompiler_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ def _init_gui_components(self, *args, **kwargs):
def _init_gui_plugin(self, *args, **kwargs):
return None

def shutdown(self):
if self._artifact_watchers_started:
self.stop_artifact_watchers()

#
# Public API:
# These functions are the main API for interacting with the decompiler. In general, every function that takes
Expand Down Expand Up @@ -255,7 +259,6 @@ def decompile(self, addr: int) -> Optional[str]:
# TODO: make this a function call after transitioning decompiler artifacts to LiveState
for func_addr, func in self._functions().items():
if func.addr <= addr < (func.addr + func.size):
func_found = True
break
else:
func = None
Expand Down
39 changes: 19 additions & 20 deletions libbs/decompilers/ghidra/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ def __init__(self, loop_on_plugin=True, **kwargs):
self.ghidra: Optional[GhidraAPIWrapper] = None
super().__init__(name="ghidra", artifact_lifter=GhidraArtifactLifter(self), supports_undo=True, **kwargs)

# Connect to the remote bridge, assumes Ghidra is already running!
def _init_gui_components(self, *args, **kwargs):
if not self.connect_ghidra_bridge():
raise Exception("Failed to connect to remote Ghidra Bridge. Did you start it first?")
raise Exception("Failed to connect to the Ghidra Bridge. Check the Ghidra GUI for failures!")
super()._init_gui_components(*args, **kwargs)

def _init_headless_components(self, *args, **kwargs):
if self._headless_dec_path is None:
Expand All @@ -85,6 +86,10 @@ def _init_headless_components(self, *args, **kwargs):
self._headless_script_name
])

time.sleep(1)
if not self.connect_ghidra_bridge():
raise Exception(f"Failed to connect to the Ghidra Bridge. Check if the {self._headless_dec_path} binary was ever started.")

def _find_headless_proc(self):
for proc in psutil.process_iter():
try:
Expand All @@ -103,11 +108,10 @@ def _find_headless_proc(self):
return proc

def shutdown(self):
super().shutdown()
self.ghidra.bridge.remote_shutdown()
# Wait until headless binary gets shutdown
while True:
if not self._find_headless_proc():
break
while self._find_headless_proc():
time.sleep(1)
self._headless_g_project.cleanup()

Expand Down Expand Up @@ -151,7 +155,7 @@ def gui_active_context(self):
if active_addr != self._last_addr:
self._last_addr = active_addr
self._last_func = self._gfunc_to_bsfunc(self._get_nearest_function(active_addr))
self._last_func.addr = self.art_lifter.lower_addr(self._last_func.addr)
self._last_func.addr = self.art_lifter.lift_addr(self._last_func.addr)

return self._last_func

Expand Down Expand Up @@ -183,11 +187,6 @@ def connect_ghidra_bridge(self):
self.ghidra = GhidraAPIWrapper(self, connection_timeout=25)
return self.ghidra.connected

def decompile(self, addr: int) -> Optional[str]:
# TODO: allow the super to do this again
function = self.art_lifter.lower(self.functions[addr])
return self._decompile(function)

def _decompile(self, function: Function) -> Optional[str]:
dec_obj = self.get_decompilation_object(function)
if dec_obj is None:
Expand All @@ -200,7 +199,7 @@ def _decompile(self, function: Function) -> Optional[str]:
return str(dec_func.getC())

def get_decompilation_object(self, function: Function) -> Optional[object]:
return self._ghidra_decompile(self._get_nearest_function(function.addr))
return self._ghidra_decompile(self._get_nearest_function(self.art_lifter.lower_addr(function.addr)))

#
# Extra API
Expand Down Expand Up @@ -359,13 +358,13 @@ def _set_struct(self, struct: Struct, header=True, members=True, **kwargs) -> bo
struct: Struct = struct
old_ghidra_struct = self._get_struct_by_name('/' + struct.name)
data_manager = self.ghidra.currentProgram.getDataTypeManager()
handler = self.ghidra.import_module_object("ghidra.program.model.artifacts", "DataTypeConflictHandler")
structType = self.ghidra.import_module_object("ghidra.program.model.artifacts", "StructureDataType")
byteType = self.ghidra.import_module_object("ghidra.program.model.artifacts", "ByteDataType")
handler = self.ghidra.import_module_object("ghidra.program.model.data", "DataTypeConflictHandler")
structType = self.ghidra.import_module_object("ghidra.program.model.data", "StructureDataType")
byteType = self.ghidra.import_module_object("ghidra.program.model.data", "ByteDataType")
ghidra_struct = structType(struct.name, 0)
for offset in struct.members:
member = struct.members[offset]
ghidra_struct.add(byteType.artifactsType, 1, member.name, "")
ghidra_struct.add(byteType.dataType, 1, member.name, "")
ghidra_struct.growStructure(member.size - 1)
for dtc in ghidra_struct.getComponents():
if dtc.getFieldName() == member.name:
Expand Down Expand Up @@ -441,9 +440,9 @@ def _set_enum(self, enum: Enum, **kwargs) -> bool:
corrected_enum_name = "/" + enum.name
old_ghidra_enum = self.ghidra.currentProgram.getDataTypeManager().getDataType(corrected_enum_name)
data_manager = self.ghidra.currentProgram.getDataTypeManager()
handler = self.ghidra.import_module_object("ghidra.program.model.artifacts", "DataTypeConflictHandler")
enumType = self.ghidra.import_module_object("ghidra.program.model.artifacts", "EnumDataType")
categoryPath = self.ghidra.import_module_object("ghidra.program.model.artifacts", "CategoryPath")
handler = self.ghidra.import_module_object("ghidra.program.model.data", "DataTypeConflictHandler")
enumType = self.ghidra.import_module_object("ghidra.program.model.data", "EnumDataType")
categoryPath = self.ghidra.import_module_object("ghidra.program.model.data", "CategoryPath")
ghidra_enum = enumType(categoryPath('/'), enum.name, 4)
for m_name, m_val in enum.members.items():
ghidra_enum.add(m_name, m_val)
Expand All @@ -466,7 +465,7 @@ def _enums(self) -> Dict[str, Enum]:
names: Optional[List[str]] = self.ghidra.bridge.remote_eval(
"[dType.getPathName() "
"for dType in currentProgram.getDataTypeManager().getAllDataTypes()"
"if str(type(dType)) == \"<type 'ghidra.program.artifactsbase.artifacts.EnumDB'>\"]"
"if str(type(dType)) == \"<type 'ghidra.program.database.data.EnumDB'>\"]"
)
return {name[1:]: Enum(name[1:], self._get_enum_members(name)) for name in names if
name.count('/') == 1} if names else {}
Expand Down

0 comments on commit 4cc47ae

Please sign in to comment.