Skip to content

Commit

Permalink
Lift and lower artifacts on setting and getting (#21)
Browse files Browse the repository at this point in the history
* Lift and lower artifacts on setting and getting

* remove some ghidra cruff

* Make things more explicit in ArtifactDict

* Fix a bug in binja setting

* Fix Ghidra setter
  • Loading branch information
mahaloz authored Jan 8, 2024
1 parent c8f2568 commit c7b721b
Show file tree
Hide file tree
Showing 16 changed files with 131 additions and 224 deletions.
2 changes: 1 addition & 1 deletion libbs/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.14.0"
__version__ = "0.15.0"
7 changes: 0 additions & 7 deletions libbs/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,13 @@ def main():
parser.add_argument(
"--single-decompiler-install", nargs=2, metavar=('decompiler', 'path'), help="Install DAILA into a single decompiler. Decompiler must be one of: ida, ghidra, binja, angr."
)
parser.add_argument(
"--run-ghidra-ui", action="store_true", help="""
Execute the Ghidra file selector UI for running LibBS scripts.
"""
)
args = parser.parse_args()

if args.single_decompiler_install:
decompiler, path = args.single_decompiler_install
LibBSPluginInstaller().install(interactive=False, paths_by_target={decompiler: path})
elif args.install:
install()
elif args.run_ghidra_ui:
return run_ghidra_ui()


if __name__ == "__main__":
Expand Down
57 changes: 43 additions & 14 deletions libbs/api/artifact_dict.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,51 @@
from typing import Type
import typing
import logging

from libbs.artifacts import (
Artifact, Comment, Enum, FunctionHeader, Function, FunctionArgument,
GlobalVariable, Patch, StackVariable, Struct, StructMember
)

if typing.TYPE_CHECKING:
from libbs.api import DecompilerInterface

_l = logging.getLogger(__name__)


class ArtifactDict(dict):
def __init__(self, artifact_cls, decompiler_interface: "DecompilerInterface", error_on_duplicate=False):
"""
The ArtifactDict is a Dictionary wrapper around the getting/setting/listing of artifacts in the decompiler. This
allows for a more pythonic interface to the decompiler artifacts. For example, instead of doing:
deci._set_function(func)
You can do:
>>> deci.functions[func.addr] = func
This class is not meant to be instantiated directly, but rather through the DecompilerInterface class.
There is currently some interesting affects and caveats to using this class:
- When you list artifacts, by calling list(), you will get a light copy of the artifacts. This means that if you
modify the artifacts in the list, they will not be reflected in the decompiler. You also do need get current
data in the decompiler, only an acknowledgement that the artifact exists.
- You must reassign the artifact to the dictionary to update the decompiler.
- When assigning something to the dictionary, it must always be in its lifted form. You will also only get lifted
artifacts back from the dictionary.
- For convience, you can access functions by their lowered address
"""

def __init__(self, artifact_cls, deci: "DecompilerInterface", error_on_duplicate=False):
super().__init__()

self._di = decompiler_interface
self._deci = deci
self._error_on_duplicate = error_on_duplicate
self._art_function = {
# ArtifactType: (setter, getter, lister)
Function: (self._di._set_function, self._di._get_function, self._di._functions),
StackVariable: (self._di._set_stack_variable, self._di._get_stack_variable, self._di._stack_variables),
GlobalVariable: (self._di._set_global_variable, self._di._get_global_var, self._di._global_vars),
Struct: (self._di._set_struct, self._di._get_struct, self._di._structs),
Enum: (self._di._set_enum, self._di._get_enum, self._di._enums),
Comment: (self._di._set_comment, self._di._get_comment, self._di._comments),
Patch: (self._di._set_patch, self._di._get_patch, self._di._patches)
Function: (self._deci._set_function, self._deci._get_function, self._deci._functions),
StackVariable: (self._deci._set_stack_variable, self._deci._get_stack_variable, self._deci._stack_variables),
GlobalVariable: (self._deci._set_global_variable, self._deci._get_global_var, self._deci._global_vars),
Struct: (self._deci._set_struct, self._deci._get_struct, self._deci._structs),
Enum: (self._deci._set_enum, self._deci._get_enum, self._deci._enums),
Comment: (self._deci._set_comment, self._deci._get_comment, self._deci._comments),
Patch: (self._deci._set_patch, self._deci._get_patch, self._deci._patches)
}

functions = self._art_function.get(artifact_cls, None)
Expand All @@ -37,14 +59,21 @@ def __len__(self):
return len(self._artifact_lister())

def __getitem__(self, item):
data = self._artifact_getter(item)
if data is None:
art = self._artifact_getter(item)
if art is None:
raise KeyError

return data
return self._deci.art_lifter.lift(art)

def __setitem__(self, key, value):
if not self._artifact_setter(value) and self._error_on_duplicate:
if not isinstance(value, self._artifact_class):
raise ValueError(f"Attempting to set a value of type {type(value)} to a dict of type {self._artifact_class}")

if hasattr(value, "addr") and value.addr is None:
value.addr = key

art = self._deci.art_lifter.lower(value)
if not self._artifact_setter(art) and self._error_on_duplicate:
raise ValueError(f"Set value {value} is already present at key {key}")

def __contains__(self, item):
Expand Down
20 changes: 16 additions & 4 deletions libbs/api/artifact_lifter.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import logging
import typing

from libbs.artifacts import StackVariable, Artifact
from libbs.api.type_parser import CTypeParser

if typing.TYPE_CHECKING:
from libbs.api import DecompilerInterface

_l = logging.getLogger(name=__name__)


class ArtifactLifter:
def __init__(self, controller, types=None):
self.controller = controller
def __init__(self, deci: "DecompilerInterface", types=None):
self.deci = deci
self.type_parser = CTypeParser(extra_types=types)

#
Expand All @@ -29,7 +33,11 @@ def lift_type(self, type_str: str) -> str:
pass

def lift_addr(self, addr: int) -> int:
pass
if addr < self.deci.binary_base_addr:
self.deci.warning(f"Lifting an address that appears already lifted: {addr}...")
return addr
else:
return addr - self.deci.binary_base_addr

def lift_stack_offset(self, offset: int, func_addr: int) -> int:
pass
Expand All @@ -38,7 +46,11 @@ def lower_type(self, type_str: str) -> str:
pass

def lower_addr(self, addr: int) -> int:
pass
if addr >= self.deci.binary_base_addr:
self.deci.warning(f"Lifting an address that appears already lifted: {addr}...")
return addr
else:
return addr + self.deci.binary_base_addr

def lower_stack_offset(self, offset: int, func_addr: int) -> int:
pass
Expand Down
30 changes: 14 additions & 16 deletions libbs/api/decompiler_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(
artifact_write_callbacks: Optional[Dict[Type[Artifact], List[Callable]]] = None,
):
self.name = name
self.artifact_lifer = artifact_lifter
self.art_lifter = artifact_lifter
self.type_parser = CTypeParser()
self.supports_undo = supports_undo
self.qt_version = qt_version
Expand Down Expand Up @@ -195,6 +195,14 @@ def gui_ask_for_string(self, question, title="Plugin Question") -> str:
# the decompiler regardless of internal decompiler API.
#

@property
def binary_base_addr(self) -> int:
"""
Returns the base address of the binary in the decompiler. This is useful for calculating offsets
in the binary. Also mandatory for using the lifting and lowering API.
"""
raise NotImplementedError

@property
def binary_hash(self) -> str:
"""
Expand Down Expand Up @@ -237,7 +245,7 @@ def decompile(self, addr: int) -> Optional[str]:
return None

# TODO: make this a function call after transitioning decompiler artifacts to LiveState
for search_addr in (addr, self.artifact_lifer.lower_addr(addr)):
for search_addr in (addr, self.art_lifter.lower_addr(addr)):
func_found = False
for func_addr, func in self._functions().items():
if func.addr <= search_addr < (func.addr + func.size):
Expand Down Expand Up @@ -487,7 +495,7 @@ def set_artifact(self, artifact: Artifact, lower=True, **kwargs) -> bool:
>>> func = Function(0xdeadbeef, 0x800)
>>> func.name = "main"
>>> controller.set_artifact(func)
>>> deci.set_artifact(func)
@param artifact:
@param lower: Wether to convert the Artifacts types and offset into the local decompilers format
Expand All @@ -506,7 +514,7 @@ def set_artifact(self, artifact: Artifact, lower=True, **kwargs) -> bool:
}

if lower:
artifact = self.lower_artifact(artifact)
artifact = self.art_lifter.lower(artifact)

setter = set_map.get(type(artifact), None)
if setter is None:
Expand Down Expand Up @@ -545,16 +553,6 @@ def global_variable_changed(self, gvar: GlobalVariable, **kwargs):
for callback_func in self.artifact_write_callbacks[GlobalVariable]:
callback_func(gvar, **kwargs)

#
# Client API & Shortcuts
#

def lift_artifact(self, artifact: Artifact) -> Artifact:
return self.artifact_lifer.lift(artifact)

def lower_artifact(self, artifact: Artifact) -> Artifact:
return self.artifact_lifer.lower(artifact)

#
# Fillers:
# A filler function is generally responsible for pulling down artifacts from a specific user state
Expand Down Expand Up @@ -583,7 +581,7 @@ def artifact_set_event_handler(
:return:
"""

lowered_artifact = self.lower_artifact(artifact)
lowered_artifact = self.art_lifter.lower(artifact)
lock = self.artifact_write_lock if not self.artifact_write_lock.locked() else DummyArtifactSetLock()
with lock:
try:
Expand Down Expand Up @@ -697,7 +695,7 @@ def discover(
) -> Optional["DecompilerInterface"]:
"""
This function is a special API helper that will attempt to detect the decompiler it is running in and
return the valid BSController for that decompiler. You may also force the chosen controller.
return the valid BSController for that decompiler. You may also force the chosen deci.
@param force_decompiler: The optional string used to force a specific decompiler interface
@param interface_overrides: The optional dict used to override the class of a decompiler interface
Expand Down
7 changes: 0 additions & 7 deletions libbs/decompiler_stubs/ghidra_libbs/ghidra_libbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,5 @@
import subprocess
from libbs_vendored.ghidra_bridge_server import GhidraBridgeServer


def start_libbs_selector():
subprocess.Popen("libbs --run-ghidra-ui".split(" "))


if __name__ == "__main__":
GhidraBridgeServer.run_server(background=True)
# TODO: put the selector back
#start_libbs_selector()
6 changes: 0 additions & 6 deletions libbs/decompilers/angr/artifact_lifter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,12 @@ class AngrArtifactLifter(ArtifactLifter):
def __init__(self, interface):
super(AngrArtifactLifter, self).__init__(interface)

def lift_addr(self, addr: int) -> int:
return self.controller.rebase_addr(addr)

def lift_type(self, type_str: str) -> str:
return type_str

def lift_stack_offset(self, offset: int, func_addr: int) -> int:
return offset

def lower_addr(self, addr: int) -> int:
return self.controller.rebase_addr(addr, up=True)

def lower_type(self, type_str: str) -> str:
return type_str

Expand Down
25 changes: 7 additions & 18 deletions libbs/decompilers/angr/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def _init_headless_components(self):
# Decompiler API
#

@property
def binary_base_addr(self) -> int:
return self.main_instance.project.loader.main_object.mapped_base

@property
def binary_hash(self) -> str:
return self.main_instance.project.loader.main_object.md5.hex()
Expand All @@ -81,26 +85,12 @@ def get_func_size(self, func_addr) -> int:
except KeyError:
return 0

def rebase_addr(self, addr, up=False):
is_pie = self.main_instance.project.loader.main_object.pic
if not is_pie:
return addr

base_addr = self.main_instance.project.loader.main_object.mapped_base
rebased_addr = addr
if up and addr < base_addr:
rebased_addr = addr + base_addr
elif not up and addr > base_addr:
rebased_addr = addr - base_addr

return rebased_addr

def xrefs_to(self, artifact: Artifact) -> List[Artifact]:
if not isinstance(artifact, Function):
l.warning("xrefs_to is only implemented for functions.")
return []

function: Function = self.lower_artifact(artifact)
function: Function = self.art_lifter.lower(artifact)
program_cfg = self.main_instance.kb.cfgs.get_most_accurate()
if program_cfg is None:
return []
Expand Down Expand Up @@ -170,7 +160,7 @@ def _init_gui_plugin(self, *args, **kwargs):
return self.gui_plugin

def goto_address(self, func_addr):
self.workspace.jump_to(self.rebase_addr(func_addr, up=True))
self.workspace.jump_to(self.art_lifter.lower_addr(func_addr))

def register_ctx_menu_item(self, name, action_string, callback_func, category=None) -> bool:
if self.gui_plugin is None:
Expand All @@ -194,8 +184,7 @@ def active_context(self):
if func is None or func.am_obj is None:
return None

func_addr = self.rebase_addr(func.addr)

func_addr = self.art_lifter.lift_addr(func.addr)
return Function(
func_addr, 0, header=FunctionHeader(func.name, func_addr)
)
Expand Down
10 changes: 2 additions & 8 deletions libbs/decompilers/binja/artifact_lifter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ class BinjaArtifactLifter(ArtifactLifter):
"uint8_t": "unsigned char",
}

def __init__(self, controller):
super(BinjaArtifactLifter, self).__init__(controller)

def lift_addr(self, addr: int) -> int:
return addr
def __init__(self, deci):
super(BinjaArtifactLifter, self).__init__(deci)

def lift_type(self, type_str: str) -> str:
for bn_t, bs_t in self.lift_map.items():
Expand All @@ -28,9 +25,6 @@ def lift_type(self, type_str: str) -> str:
def lift_stack_offset(self, offset: int, func_addr: int) -> int:
return offset

def lower_addr(self, addr: int) -> int:
return addr

def lower_type(self, type_str: str) -> str:
return type_str

Expand Down
Loading

0 comments on commit c7b721b

Please sign in to comment.