Skip to content

Commit

Permalink
Add api
Browse files Browse the repository at this point in the history
  • Loading branch information
mahaloz committed Jul 23, 2024
1 parent e6d896c commit a874aeb
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 17 deletions.
54 changes: 48 additions & 6 deletions libbs/api/decompiler_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def decompile(self, addr: int, map_lines=False, **kwargs) -> Optional[Decompilat

return decompilation

def xrefs_to(self, artifact: Artifact) -> List[Artifact]:
def xrefs_to(self, artifact: Artifact, decompile=False) -> List[Artifact]:
"""
Returns a list of artifacts that reference the provided artifact.
@param artifact: Artifact to find references to
Expand All @@ -325,12 +325,49 @@ def xrefs_to(self, artifact: Artifact) -> List[Artifact]:

return []

def get_dependencies(self, artifact: Artifact, decompile=True, max_resolves=50, **kwargs) -> List[Artifact]:
    """
    Collects the artifacts a function depends on: the global variables reported by xrefs_to
    and the user-defined types (structs/enums) used by its header, arguments, and stack
    variables, including types reachable through struct members.

    @param artifact: Function to collect dependencies for
    @param decompile: When True, a function without a dec_obj is re-fetched from self.functions
        first, and xrefs_to is asked to use the decompilation as well
    @param max_resolves: Maximum number of additional types that may be discovered through
        struct members before resolution stops with a warning
    @return: List of the defined types followed by the global variables
    @raises ValueError: if the artifact is not a Function
    """
    if not isinstance(artifact, Function):
        raise ValueError("Only functions are supported for get_dependencies")

    # the function was never decompiled, so swap it for the full version
    if decompile and artifact.dec_obj is None:
        artifact = self.functions[artifact.addr]

    # collect all xrefs to the function (for global variables)
    art_users = self.xrefs_to(artifact, decompile=decompile)
    gvars = [art for art in art_users if isinstance(art, GlobalVariable)]

    # gather every type string used directly by the function
    type_strs = []
    header = artifact.header
    if header is not None:
        type_strs.append(header.type)
        type_strs.extend(arg.type for arg in header.args.values())
    type_strs.extend(svar.type for svar in artifact.stack_vars.values())

    # keep only the ones that map to a user-defined struct/enum
    imported_types = set()
    for type_str in type_strs:
        defined_type = self.get_defined_type(type_str)
        if defined_type is not None:
            imported_types.add(defined_type)

    # Resolve dependencies in structs with a worklist: every struct is expanded exactly once,
    # and types that are already imported are never queued again, so mutually-referencing
    # structs terminate on their own. max_resolves only bounds how many new types are added.
    pending = [type_ for type_ in imported_types if isinstance(type_, Struct)]
    resolves = 0
    limit_reached = False
    while pending and not limit_reached:
        struct = pending.pop()
        for member in struct.members.values():
            new_type = self.get_defined_type(member.type)
            if new_type is None or new_type in imported_types:
                continue

            if resolves >= max_resolves:
                # there is still something left to resolve, so the result is truly partial
                limit_reached = True
                break

            imported_types.add(new_type)
            resolves += 1
            if isinstance(new_type, Struct):
                pending.append(new_type)

    if limit_reached:
        self.warning("Max dependency resolves reached, returning partial results")

    return list(imported_types) + gvars

def get_func_containing(self, addr: int) -> Optional[Function]:
    """
    Not implemented at this level: calling it always raises NotImplementedError.

    NOTE(review): judging by the name and signature, implementations presumably return the
    Function whose body contains `addr`, or None when there is none -- confirm against the
    decompiler-specific interfaces.

    @param addr: Address to look up
    @raises NotImplementedError: always
    """
    raise NotImplementedError
Expand Down Expand Up @@ -700,7 +737,7 @@ def get_identifiers(artifact: Artifact) -> Tuple:
elif isinstance(artifact, (Struct, Enum)):
return (artifact.name,)

def type_is_user_defined(self, type_str, state=None):
def get_defined_type(self, type_str) -> Optional[Artifact]:
if not type_str:
return None

Expand All @@ -714,7 +751,12 @@ def type_is_user_defined(self, type_str, state=None):
return None

base_type_str = type_.base_type.type
return base_type_str if base_type_str in self.structs.keys() else None
if base_type_str in self.structs:
return self.structs[base_type_str]
elif base_type_str in self.enums:
return self.enums[base_type_str]

return None

@staticmethod
def _find_global_in_call_frames(global_name, max_frames=10):
Expand Down
2 changes: 1 addition & 1 deletion libbs/artifacts/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __eq__(self, other):
def __hash__(self):
    """
    Hashes the artifact by concatenating the string form of every attribute named in
    self.slots and hashing the resulting string.

    Each value is passed through str() first, so attributes that are not strings
    (ints, None, dicts, ...) do not raise a TypeError during concatenation.
    """
    return hash("".join(str(getattr(self, attr)) for attr in self.slots))

Expand Down
93 changes: 85 additions & 8 deletions libbs/decompilers/ghidra/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from jfx_bridge.bridge import BridgedObject
from ghidra_bridge import GhidraBridge

from libbs.api import DecompilerInterface
from libbs.api import DecompilerInterface, CType
from libbs.api.decompiler_interface import requires_decompilation
from libbs.artifacts import (
Function, FunctionHeader, StackVariable, Comment, FunctionArgument, GlobalVariable, Struct, StructMember, Enum,
Decompilation, Context
Decompilation, Context, Artifact
)

from .artifact_lifter import GhidraArtifactLifter
Expand Down Expand Up @@ -288,6 +288,37 @@ def get_decompilation_object(self, function: Function, do_lower=True) -> Optiona
lowered_addr = self.art_lifter.lower_addr(function.addr) if do_lower else function.addr
return self._ghidra_decompile(self._get_nearest_function(lowered_addr))

def xrefs_to(self, artifact: Artifact, decompile=False) -> List[Artifact]:
    """
    Returns the xrefs reported by the base interface and, when `decompile` is set, also the
    memory-backed global symbols found in the decompilation of the provided function.

    @param artifact: Artifact to find references for; the decompile path reads `dec_obj` and
        `addr` from it, so it is expected to be a Function there
    @param decompile: When False, the base-interface xrefs are returned untouched. When True,
        the function is decompiled and every global symbol with memory storage is added as a
        GlobalVariable; the combined list is lifted before being returned
    @return: List of referencing artifacts
    """
    xrefs = super().xrefs_to(artifact)
    if not decompile:
        return xrefs

    # the function was never decompiled, so swap it for the full version
    if artifact.dec_obj is None:
        artifact = self.functions[artifact.addr]

    decompilation_results = self.get_decompilation_object(artifact, do_lower=True)
    # get_decompilation_object is Optional: no results means there is nothing to inspect
    if decompilation_results is None:
        return xrefs

    high_function = decompilation_results.getHighFunction()
    if high_function is None:
        return xrefs

    new_xrefs = []
    for global_sym in high_function.getGlobalSymbolMap().getSymbols():
        sym_storage = global_sym.getStorage()
        if not sym_storage.isMemoryStorage():
            continue

        data_type = global_sym.getDataType()
        new_xrefs.append(
            GlobalVariable(
                addr=int(sym_storage.getMinAddress().getOffset()),
                name=str(global_sym.getName()),
                type_=str(data_type) if data_type else None,
                size=int(global_sym.getSize()),
            )
        )

    return [self.art_lifter.lift(x) for x in xrefs + new_xrefs]

#
# Extra API
#
Expand Down Expand Up @@ -389,33 +420,42 @@ def _set_stack_variables(self, svars: List[StackVariable], **kwargs) -> bool:
func_addr = first_svar.addr
decompilation = kwargs.get('decompilation', None) or self._ghidra_decompile(self._get_function(func_addr))
ghidra_func = decompilation.getFunction() if decompilation else self._get_nearest_function(func_addr)
gstack_vars = self.__get_gstack_vars(ghidra_func)
gstack_vars = self.__get_decless_gstack_vars(ghidra_func) # this works because the func was already decompiled
#gstack_vars = self.__get_gstack_vars(decompilation.getHighFunction())
if not gstack_vars:
return changes

var_pairs = []
for svar in svars:
for gstack_var in gstack_vars:
#if svar.offset == gstack_var.storage.stackOffset:
if svar.offset == gstack_var.getStackOffset():
var_pairs.append((svar, gstack_var))
break

rename_pairs = []
retype_pairs = []
changes = False
#updates = {}
for svar, gstack_var in var_pairs:
if svar.name and svar.name != gstack_var.getName():
#update_data = [gstack_var.name, None]
if svar.name and svar.name != gstack_var.name:
changes |= True
rename_pairs.append((gstack_var, svar.name))
#update_data[0] = svar.name

if svar.type:
parsed_type = self.typestr_to_gtype(svar.type)
if parsed_type is not None and parsed_type != str(gstack_var.getDataType()):
changes |= True
retype_pairs.append((gstack_var, parsed_type))
#update_data[1] = parsed_type

#updates[gstack_var] = update_data

self.__set_sym_names(rename_pairs, SourceType.USER_DEFINED)
self.__set_sym_types(retype_pairs, SourceType.USER_DEFINED)
#changes = self._update_local_variable_symbols(updates)
return changes

def _get_stack_variable(self, addr: int, offset: int, **kwargs) -> Optional[StackVariable]:
Expand Down Expand Up @@ -496,6 +536,7 @@ def _set_struct(self, struct: Struct, header=True, members=True, **kwargs) -> bo
ghidra_struct.clearAtOffset(i)
ghidra_struct.replaceAtOffset(offset, gtype, member.size, member.name, "")
break
# TODO: normalize the size of the struct if it did not grow enough
try:
if old_ghidra_struct is not None:
data_manager.replaceDataType(old_ghidra_struct, ghidra_struct, True)
Expand Down Expand Up @@ -795,17 +836,40 @@ def _ghidra_decompile(self, func: "GhidraFunction") -> "DecompileResult":

def _get_gstack_var(self, func: "GhidraFunction", offset: int) -> Optional["LocalVariableDB"]:
    """
    Finds the stack variable of a Ghidra function that sits at the given stack offset, using
    the variables stored on the function itself (no HighFunction is involved).

    TODO: this needs to be updated so that when it is called we get the decompilation, and
        pass it to __get_gstack_vars

    @param func: Ghidra function whose stack variables are searched
    @param offset: Stack offset compared against each variable's getStackOffset()
    @return: The first variable at that offset, or None when there is none
    """
    gstack_vars = self.__get_decless_gstack_vars(func)
    return next((var for var in gstack_vars if var.getStackOffset() == offset), None)

def _headless_lookup_struct(self, typestr: str) -> Optional["DataType"]:
    """
    Looks a type up directly in the current program's DataTypeManager.

    This function is mostly a hack because getDataTypeManagerService does not have up-to-date
    datatypes in headless mode, so any structs you create don't get registered there.

    NOTE(review): only the base type name is looked up, so a pointer type string presumably
    resolves to the pointed-to type rather than to a pointer -- confirm.

    @param typestr: Type string to resolve
    @return: Whatever the DataTypeManager returns for "/<base type name>", or None when the
        string is empty, is not parseable, or parses to a type that is not flagged as unknown
    """
    parsed: CType = self.type_parser.parse_type(typestr) if typestr else None
    # nothing to do for unparseable strings, nor for types the parser already knows
    if not parsed or not parsed.is_unknown:
        return None

    base_name = parsed.base_type.type
    data_type_manager = self.currentProgram.getDataTypeManager()
    return data_type_manager.getDataType("/" + base_name)

def typestr_to_gtype(self, typestr: str) -> Optional["DataType"]:
"""
typestr should look something like:
Expand All @@ -825,8 +889,14 @@ def typestr_to_gtype(self, typestr: str) -> Optional["DataType"]:
try:
parsed_type = dt_parser.parse(typestr)
except Exception as e:
parsed_type = None

if self.headless and parsed_type is None:
# try again in headless mode only!
parsed_type = self._headless_lookup_struct(typestr)

if parsed_type is None:
_l.warning(f"Failed to parse type string: {typestr}")
return None

return parsed_type

Expand Down Expand Up @@ -891,9 +961,16 @@ def _get_local_variable_symbols(self, func: Function) -> List[Tuple[str, "HighSy
]

# Lists the stack variables stored on the Ghidra function object itself (getAllVariables()
# filtered by isStackVariable()), i.e. without going through a HighFunction.
# NOTE(review): documentation sits above the decorator and the body stays a single returned
# comprehension, in case ui_remote_eval depends on that shape -- confirm.
@ui_remote_eval
def __get_decless_gstack_vars(self, func: "GhidraFunction") -> List["LocalVariableDB"]:
    return [var for var in func.getAllVariables() if var.isStackVariable()]

# Returns every symbol in the HighFunction's local symbol map that has a storage object
# reporting isStackStorage().
# NOTE(review): the elements come from getLocalSymbolMap().getSymbols(), so the
# "LocalVariableDB" return annotation looks inaccurate (presumably high-level symbols) -- confirm.
# NOTE(review): documentation sits above the decorator and the body stays a single returned
# comprehension, in case ui_remote_eval depends on that shape -- confirm.
@ui_remote_eval
def __get_gstack_vars(self, high_func: "HighFunction") -> List["LocalVariableDB"]:
    return [
        var for var in high_func.getLocalSymbolMap().getSymbols()
        if var.storage and var.storage.isStackStorage()
    ]

@ui_remote_eval
def __enum_names(self) -> List[Tuple[str, "EnumDB"]]:
from .compat.imports import EnumDB
Expand All @@ -907,7 +984,7 @@ def __enum_names(self) -> List[Tuple[str, "EnumDB"]]:
# Describes every stack-stored local symbol of a decompiled function as a tuple of
# (stack offset, name, data type display name, size).
# The type uses the data type's displayName instead of str() of the data type object.
# NOTE(review): documentation sits above the decorator and the body stays a single returned
# comprehension, in case ui_remote_eval depends on that shape -- confirm.
@ui_remote_eval
def __stack_variables(self, decompilation) -> List[Tuple[int, str, str, int]]:
    return [
        (int(sym.getStorage().getStackOffset()), str(sym.getName()), sym.getDataType().displayName, int(sym.getSize()))
        for sym in decompilation.getHighFunction().getLocalSymbolMap().getSymbols()
        if sym.getStorage().isStackStorage()
    ]
Expand Down
1 change: 1 addition & 0 deletions libbs/decompilers/ida/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ def _get_func_info(code_view):
with IDAViewCTX(func_addr) as ida_code_view:
func = _get_func_info(ida_code_view)

func.dec_obj = ida_code_view.cfunc if ida_code_view is not None else None
return func


Expand Down
2 changes: 1 addition & 1 deletion libbs/decompilers/ida/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def decompiler_available(self) -> bool:

return self._decompiler_available

def xrefs_to(self, artifact: Artifact) -> List[Artifact]:
def xrefs_to(self, artifact: Artifact, decompile=False) -> List[Artifact]:
if not isinstance(artifact, Function):
_l.warning("xrefs_to is only implemented for functions.")
return []
Expand Down
81 changes: 80 additions & 1 deletion tests/test_decompilers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from libbs.api import DecompilerInterface
from libbs.artifacts import FunctionHeader, StackVariable, Struct, GlobalVariable, Enum, Comment, ArtifactFormat, \
Decompilation, Function
Decompilation, Function, StructMember
from libbs.decompilers import IDA_DECOMPILER, ANGR_DECOMPILER, BINJA_DECOMPILER, GHIDRA_DECOMPILER
from libbs.decompilers.ghidra.testing import HeadlessGhidraDecompiler

Expand Down Expand Up @@ -91,6 +91,85 @@ def test_getting_artifacts(self):

deci.shutdown()

def test_ghidra_artifact_dependency_resolving(self):
    """
    Checks that get_dependencies on fauxware's authenticate function (0x400664), taken from the
    light function listing, returns exactly one dependency: the global variable at 0x601048.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        project_name = "fauxware_ghidra"
        project_dir = Path(temp_dir)

        # NOTE(review): unlike the preceding test, nothing here calls deci.shutdown();
        # presumably the teardown uses self.deci for that -- confirm.
        deci = DecompilerInterface.discover(
            force_decompiler=GHIDRA_DECOMPILER,
            headless=True,
            binary_path=self.FAUXWARE_PATH,
            project_location=project_dir,
            project_name=project_name,
        )
        self.deci = deci

        light_funcs = dict(deci.functions.items())
        auth_func_addr = deci.art_lifter.lift_addr(0x400664)
        sneaky_gvar_addr = deci.art_lifter.lift_addr(0x601048)

        # Use the light function on purpose, to test that it gets decompiled on demand;
        # a normal use case would be to decompile it first.
        initial_deps = deci.get_dependencies(light_funcs[auth_func_addr])
        for dependency in initial_deps:
            assert dependency is not None
            assert dependency.dumps(fmt=ArtifactFormat.JSON) is not None

        assert len(initial_deps) == 1
        only_dep = initial_deps[0]
        assert isinstance(only_dep, GlobalVariable)
        assert only_dep.addr == sneaky_gvar_addr

        # TODO: right now in headless Ghidra you can't ever set structs to variable types.
        #   This is a limitation of the headless decompiler, not the API.
        #   Once that works, create two structs that reference each other:
        #
        # struct A {
        #   struct B *b;
        # };
        #
        # struct B {
        #   struct A *a;
        #   int size;
        # };
        #

        #struct_a = Struct(
        #    name="A",
        #    members={
        #        0: StructMember(name="b", type_="B*", offset=0, size=8)
        #    },
        #    size=8
        #)
        #struct_b = Struct(
        #    name="B",
        #    members={
        #        0: StructMember(name="a", type_="A*", offset=0, size=8),
        #        1: StructMember(name="size", type_="int", offset=8, size=4)
        #    },
        #    size=12
        #)

        ## first add the structs to the decompiler, empty, so both names can exist
        #deci.structs[struct_a.name] = Struct(name=struct_a.name, size=struct_a.size)
        #deci.structs[struct_b.name] = Struct(name=struct_b.name, size=struct_b.size)

        ## now add the members to the structs
        #deci.structs[struct_a.name] = struct_a
        #deci.structs[struct_b.name] = struct_b

        ## now change a stack variable to be of type A
        #auth_func = deci.functions[auth_func_addr]
        #auth_func.stack_vars[-24].type = "A*"
        #deci.functions[auth_func_addr] = auth_func
        ## refresh the decompilation
        #auth_func = deci.functions[auth_func_addr]

        ## now get the dependencies again
        #new_deps = deci.get_dependencies(auth_func)
        #assert len(new_deps) == 3


def test_ghidra_fauxware(self):
deci = DecompilerInterface.discover(
force_decompiler=GHIDRA_DECOMPILER,
Expand Down

0 comments on commit a874aeb

Please sign in to comment.