From a874aebe0977ec4c8618c1dee17d403e150056c8 Mon Sep 17 00:00:00 2001 From: mahaloz Date: Mon, 22 Jul 2024 19:08:58 -0700 Subject: [PATCH] Add api --- libbs/api/decompiler_interface.py | 54 ++++++++++++++-- libbs/artifacts/artifact.py | 2 +- libbs/decompilers/ghidra/interface.py | 93 ++++++++++++++++++++++++--- libbs/decompilers/ida/compat.py | 1 + libbs/decompilers/ida/interface.py | 2 +- tests/test_decompilers.py | 81 ++++++++++++++++++++++- 6 files changed, 216 insertions(+), 17 deletions(-) diff --git a/libbs/api/decompiler_interface.py b/libbs/api/decompiler_interface.py index f4cea62..538d8f6 100644 --- a/libbs/api/decompiler_interface.py +++ b/libbs/api/decompiler_interface.py @@ -314,7 +314,7 @@ def decompile(self, addr: int, map_lines=False, **kwargs) -> Optional[Decompilat return decompilation - def xrefs_to(self, artifact: Artifact) -> List[Artifact]: + def xrefs_to(self, artifact: Artifact, decompile=False) -> List[Artifact]: """ Returns a list of artifacts that reference the provided artifact. @param artifact: Artifact to find references to @@ -325,12 +325,49 @@ def xrefs_to(self, artifact: Artifact) -> List[Artifact]: return [] - def get_dependencies(self, artifact: Artifact, decompile=True, **kwargs) -> List[Artifact]: + def get_dependencies(self, artifact: Artifact, decompile=True, max_resolves=50, **kwargs) -> List[Artifact]: if not isinstance(artifact, Function): raise ValueError("Only functions are supported for get_dependencies") - # TODO: finish me - art_users = self.xrefs_to(artifact) + # collect all xrefs to the function (for global variables) + if decompile: + # the function was never decompiled + if artifact.dec_obj is None: + artifact = self.functions[artifact.addr] + + art_users = self.xrefs_to(artifact, decompile=decompile) + gvars = [art for art in art_users if isinstance(art, GlobalVariable)] + + # collect all structs/enums used in the function types + imported_types = set() + imported_types.add(self.get_defined_type(artifact.header.type)) + for arg in artifact.header.args.values(): + imported_types.add(self.get_defined_type(arg.type)) + for svar in artifact.stack_vars.values(): + imported_types.add(self.get_defined_type(svar.type)) + + # start resolving dependencies in structs + for _ in range(max_resolves): + new_imports = False + for imported_type in list(imported_types): + if isinstance(imported_type, Struct): + for member in imported_type.members.values(): + new_type = self.get_defined_type(member.type) + if new_type is not None and new_type not in imported_types: + imported_types.add(new_type) + new_imports = True + break + + if new_imports: + break + + if not new_imports: + break + else: + self.warning("Max dependency resolves reached, returning partial results") + + all_deps = [art for art in list(imported_types) + gvars if art is not None] + return all_deps def get_func_containing(self, addr: int) -> Optional[Function]: raise NotImplementedError @@ -700,7 +737,7 @@ def get_identifiers(artifact: Artifact) -> Tuple: elif isinstance(artifact, (Struct, Enum)): return (artifact.name,) - def type_is_user_defined(self, type_str, state=None): + def get_defined_type(self, type_str) -> Optional[Artifact]: if not type_str: return None @@ -714,7 +751,12 @@ def type_is_user_defined(self, type_str, state=None): return None base_type_str = type_.base_type.type - return base_type_str if base_type_str in self.structs.keys() else None + if base_type_str in self.structs: + return self.structs[base_type_str] + elif base_type_str in self.enums: + return self.enums[base_type_str] + + return None @staticmethod def _find_global_in_call_frames(global_name, max_frames=10): diff --git a/libbs/artifacts/artifact.py b/libbs/artifacts/artifact.py index 3ccd8fc..f064bb5 100644 --- a/libbs/artifacts/artifact.py +++ b/libbs/artifacts/artifact.py @@ -51,7 +51,7 @@ def __eq__(self, other): def __hash__(self): long_str = "" for attr in self.slots: - long_str += getattr(self, attr) + long_str += str(getattr(self, attr)) return hash(long_str) diff --git a/libbs/decompilers/ghidra/interface.py b/libbs/decompilers/ghidra/interface.py index 0a5fd2a..6b4cc3d 100644 --- a/libbs/decompilers/ghidra/interface.py +++ b/libbs/decompilers/ghidra/interface.py @@ -9,11 +9,11 @@ from jfx_bridge.bridge import BridgedObject from ghidra_bridge import GhidraBridge -from libbs.api import DecompilerInterface +from libbs.api import DecompilerInterface, CType from libbs.api.decompiler_interface import requires_decompilation from libbs.artifacts import ( Function, FunctionHeader, StackVariable, Comment, FunctionArgument, GlobalVariable, Struct, StructMember, Enum, - Decompilation, Context + Decompilation, Context, Artifact ) from .artifact_lifter import GhidraArtifactLifter @@ -288,6 +288,37 @@ def get_decompilation_object(self, function: Function, do_lower=True) -> Optiona lowered_addr = self.art_lifter.lower_addr(function.addr) if do_lower else function.addr return self._ghidra_decompile(self._get_nearest_function(lowered_addr)) + def xrefs_to(self, artifact: Artifact, decompile=False) -> List[Artifact]: + xrefs = super().xrefs_to(artifact) + if not decompile: + return xrefs + + artifact: Function + if artifact.dec_obj is None: + artifact = self.functions[artifact.addr] + decompilation_results = self.get_decompilation_object(artifact, do_lower=True) + + high_function = decompilation_results.getHighFunction() + if high_function is None: + return xrefs + + new_xrefs = [] + for global_sym in high_function.getGlobalSymbolMap().getSymbols(): + sym_storage = global_sym.getStorage() + if not sym_storage.isMemoryStorage(): + continue + + gvar = GlobalVariable( + addr=int(sym_storage.getMinAddress().getOffset()), + name=str(global_sym.getName()), + type_=str(global_sym.getDataType()) if global_sym.getDataType() else None, + size=int(global_sym.getSize()), + ) + new_xrefs.append(gvar) + + lifted_xrefs = [self.art_lifter.lift(x) for x in xrefs + new_xrefs] + return lifted_xrefs + # # Extra API # @@ -389,13 +420,15 @@ def _set_stack_variables(self, svars: List[StackVariable], **kwargs) -> bool: func_addr = first_svar.addr decompilation = kwargs.get('decompilation', None) or self._ghidra_decompile(self._get_function(func_addr)) ghidra_func = decompilation.getFunction() if decompilation else self._get_nearest_function(func_addr) - gstack_vars = self.__get_gstack_vars(ghidra_func) + gstack_vars = self.__get_decless_gstack_vars(ghidra_func) # this works because the func was already decompiled + #gstack_vars = self.__get_gstack_vars(decompilation.getHighFunction()) if not gstack_vars: return changes var_pairs = [] for svar in svars: for gstack_var in gstack_vars: + #if svar.offset == gstack_var.storage.stackOffset: if svar.offset == gstack_var.getStackOffset(): var_pairs.append((svar, gstack_var)) break @@ -403,19 +436,26 @@ def _set_stack_variables(self, svars: List[StackVariable], **kwargs) -> bool: rename_pairs = [] retype_pairs = [] changes = False + #updates = {} for svar, gstack_var in var_pairs: - if svar.name and svar.name != gstack_var.getName(): + #update_data = [gstack_var.name, None] + if svar.name and svar.name != gstack_var.name: changes |= True rename_pairs.append((gstack_var, svar.name)) + #update_data[0] = svar.name if svar.type: parsed_type = self.typestr_to_gtype(svar.type) if parsed_type is not None and parsed_type != str(gstack_var.getDataType()): changes |= True retype_pairs.append((gstack_var, parsed_type)) + #update_data[1] = parsed_type + + #updates[gstack_var] = update_data self.__set_sym_names(rename_pairs, SourceType.USER_DEFINED) self.__set_sym_types(retype_pairs, SourceType.USER_DEFINED) + #changes = self._update_local_variable_symbols(updates) return changes def _get_stack_variable(self, addr: int, offset: int, **kwargs) -> Optional[StackVariable]: @@ -496,6 +536,7 @@ def _set_struct(self, struct: Struct, header=True, members=True, **kwargs) -> bo ghidra_struct.clearAtOffset(i) ghidra_struct.replaceAtOffset(offset, gtype, member.size, member.name, "") break + # TODO: normalize the size of the struct if it did not grow enough try: if old_ghidra_struct is not None: data_manager.replaceDataType(old_ghidra_struct, ghidra_struct, True) @@ -795,17 +836,40 @@ def _ghidra_decompile(self, func: "GhidraFunction") -> "DecompileResult": def _get_gstack_var(self, func: "GhidraFunction", offset: int) -> Optional["LocalVariableDB"]: """ + TODO: this needs to be updated that when its called we get decomilation, and pass it to + __get_gstack_vars + @param func: @param offset: @return: """ - gstack_vars = self.__get_gstack_vars(func) + gstack_vars = self.__get_decless_gstack_vars(func) for var in gstack_vars: if var.getStackOffset() == offset: return var return None + def _headless_lookup_struct(self, typestr: str) -> Optional["DataType"]: + """ + This function is mostly a hack because getDataTypeManagerService does not have up to date + datatypes in headless mode, so any structs you create dont get registerd + """ + if not typestr: + return None + + type_: CType = self.type_parser.parse_type(typestr) + if not type_: + # it was not parseable + return None + + # type is known and parseable + if not type_.is_unknown: + return None + + base_type_str = type_.base_type.type + return self.currentProgram.getDataTypeManager().getDataType("/" + base_type_str) + def typestr_to_gtype(self, typestr: str) -> Optional["DataType"]: """ typestr should look something like: @@ -825,8 +889,14 @@ def typestr_to_gtype(self, typestr: str) -> Optional["DataType"]: try: parsed_type = dt_parser.parse(typestr) except Exception as e: + parsed_type = None + + if self.headless and parsed_type is None: + # try again in headless mode only! + parsed_type = self._headless_lookup_struct(typestr) + + if parsed_type is None: _l.warning(f"Failed to parse type string: {typestr}") - return None return parsed_type @@ -891,9 +961,16 @@ def _get_local_variable_symbols(self, func: Function) -> List[Tuple[str, "HighSy ] @ui_remote_eval - def __get_gstack_vars(self, func: "GhidraFunction") -> List["LocalVariableDB"]: + def __get_decless_gstack_vars(self, func: "GhidraFunction") -> List["LocalVariableDB"]: return [var for var in func.getAllVariables() if var.isStackVariable()] + @ui_remote_eval + def __get_gstack_vars(self, high_func: "HighFunction") -> List["LocalVariableDB"]: + return [ + var for var in high_func.getLocalSymbolMap().getSymbols() + if var.storage and var.storage.isStackStorage() + ] + @ui_remote_eval def __enum_names(self) -> List[Tuple[str, "EnumDB"]]: from .compat.imports import EnumDB @@ -907,7 +984,7 @@ def __enum_names(self) -> List[Tuple[str, "EnumDB"]]: @ui_remote_eval def __stack_variables(self, decompilation) -> List[Tuple[int, str, str, int]]: return [ - (int(sym.getStorage().getStackOffset()), str(sym.getName()), str(sym.getDataType()), int(sym.getSize())) + (int(sym.getStorage().getStackOffset()), str(sym.getName()), sym.getDataType().displayName, int(sym.getSize())) for sym in decompilation.getHighFunction().getLocalSymbolMap().getSymbols() if sym.getStorage().isStackStorage() ] diff --git a/libbs/decompilers/ida/compat.py b/libbs/decompilers/ida/compat.py index 84d0df1..df1fa7a 100644 --- a/libbs/decompilers/ida/compat.py +++ b/libbs/decompilers/ida/compat.py @@ -288,6 +288,7 @@ def _get_func_info(code_view): with IDAViewCTX(func_addr) as ida_code_view: func = _get_func_info(ida_code_view) + func.dec_obj = ida_code_view.cfunc if ida_code_view is not None else None return func diff --git a/libbs/decompilers/ida/interface.py b/libbs/decompilers/ida/interface.py index 388387e..8f4a5b9 100755 --- a/libbs/decompilers/ida/interface.py +++ b/libbs/decompilers/ida/interface.py @@ -116,7 +116,7 @@ def decompiler_available(self) -> bool: return self._decompiler_available - def xrefs_to(self, artifact: Artifact) -> List[Artifact]: + def xrefs_to(self, artifact: Artifact, decompile=False) -> List[Artifact]: if not isinstance(artifact, Function): _l.warning("xrefs_to is only implemented for functions.") return [] diff --git a/tests/test_decompilers.py b/tests/test_decompilers.py index 118c059..40d9b08 100644 --- a/tests/test_decompilers.py +++ b/tests/test_decompilers.py @@ -9,7 +9,7 @@ from libbs.api import DecompilerInterface from libbs.artifacts import FunctionHeader, StackVariable, Struct, GlobalVariable, Enum, Comment, ArtifactFormat, \ - Decompilation, Function + Decompilation, Function, StructMember from libbs.decompilers import IDA_DECOMPILER, ANGR_DECOMPILER, BINJA_DECOMPILER, GHIDRA_DECOMPILER from libbs.decompilers.ghidra.testing import HeadlessGhidraDecompiler @@ -91,6 +91,85 @@ def test_getting_artifacts(self): deci.shutdown() + def test_ghidra_artifact_dependency_resolving(self): + with tempfile.TemporaryDirectory() as temp_dir: + proj_name = "fauxware_ghidra" + + deci = DecompilerInterface.discover( + force_decompiler=GHIDRA_DECOMPILER, + headless=True, + binary_path=self.FAUXWARE_PATH, + project_location=Path(temp_dir), + project_name=proj_name, + ) + self.deci = deci + light_funcs = {addr: func for addr, func in deci.functions.items()} + auth_func_addr = deci.art_lifter.lift_addr(0x400664) + sneaky_gvar = deci.art_lifter.lift_addr(0x601048) + + # dont decompile the function to test it is decompiled on demand, however + # a normal use case would be to decompile it first + auth_func = light_funcs[auth_func_addr] + initial_deps = deci.get_dependencies(auth_func) + for art in initial_deps: + assert art is not None + assert art.dumps(fmt=ArtifactFormat.JSON) is not None + + assert len(initial_deps) == 1 + dep = initial_deps[0] + assert isinstance(dep, GlobalVariable) + assert dep.addr == sneaky_gvar + + # TODO: right now in headless Ghidra you cant ever set structs to variable types. + # This is a limitation of the headless decompiler, not the API. + # now create two structs that reference each other + # + # struct A { + # struct B *b; + # }; + # + # struct B { + # struct A *a; + # int size; + # }; + # + + #struct_a = Struct( + # name="A", + # members={ + # 0: StructMember(name="b", type_="B*", offset=0, size=8) + # }, + # size=8 + #) + #struct_b = Struct( + # name="B", + # members={ + # 0: StructMember(name="a", type_="A*", offset=0, size=8), + # 1: StructMember(name="size", type_="int", offset=8, size=4) + # }, + # size=12 + #) + + ## first add the structs to the decompiler, empty, so both names can exist + #deci.structs[struct_a.name] = Struct(name=struct_a.name, size=struct_a.size) + #deci.structs[struct_b.name] = Struct(name=struct_b.name, size=struct_b.size) + + ## now add the members to the structs + #deci.structs[struct_a.name] = struct_a + #deci.structs[struct_b.name] = struct_b + + ## now change a stack variable to be of type A + #auth_func = deci.functions[auth_func_addr] + #auth_func.stack_vars[-24].type = "A*" + #deci.functions[auth_func_addr] = auth_func + ## refresh the decompilation + #auth_func = deci.functions[auth_func_addr] + + ## now get the dependencies again + #new_deps = deci.get_dependencies(auth_func) + #assert len(new_deps) == 3 + + def test_ghidra_fauxware(self): deci = DecompilerInterface.discover( force_decompiler=GHIDRA_DECOMPILER,