Skip to content

Commit

Permalink
Feature: support fine-grained Context callbacks (#88)
Browse files Browse the repository at this point in the history
* Add support for context callbacks

* Feat: Support fine-grained contexts

* Make it work for BinSync again

* Everything working in BinSync

* bump
  • Loading branch information
mahaloz authored Jul 21, 2024
1 parent 6d6fa82 commit 36a5665
Show file tree
Hide file tree
Showing 14 changed files with 224 additions and 77 deletions.
10 changes: 6 additions & 4 deletions examples/change_watcher_plugin/bs_change_watcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def create_plugin(*args, **kwargs):

from libbs.api import DecompilerInterface
from libbs.artifacts import (
FunctionHeader, StackVariable, Enum, Struct, GlobalVariable, Comment
FunctionHeader, StackVariable, Enum, Struct, GlobalVariable, Comment, Context
)

deci = DecompilerInterface.discover(
Expand All @@ -24,10 +24,12 @@ def create_plugin(*args, **kwargs):
gui_init_kwargs=kwargs
)
# create a function to print a string in the decompiler console
decompiler_printer = lambda *x: deci.print(f"Changed {x}")
decompiler_printer = lambda *x, **y: deci.print(f"Changed {x}")
# register the callback for all the types we want to print
deci.artifact_write_callbacks = {
typ: [decompiler_printer] for typ in (FunctionHeader, StackVariable, Enum, Struct, GlobalVariable, Comment,)
deci.artifact_change_callbacks = {
typ: [decompiler_printer] for typ in (
FunctionHeader, StackVariable, Enum, Struct, GlobalVariable, Comment, Context
)
}

# register a menu to open when you right click on the psuedocode view
Expand Down
2 changes: 1 addition & 1 deletion libbs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "1.13.0"
__version__ = "1.14.0"


import logging
Expand Down
2 changes: 1 addition & 1 deletion libbs/api/artifact_lifter.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _lift_or_lower_artifact(self, artifact, mode):
for attr in target_attrs:
if hasattr(lifted_art, attr):
curr_val = getattr(lifted_art, attr)
if not curr_val:
if curr_val is None:
continue

# special handling for stack variables
Expand Down
51 changes: 36 additions & 15 deletions libbs/api/decompiler_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
Artifact,
Function, FunctionHeader, StackVariable,
Comment, GlobalVariable, Patch,
Enum, Struct, FunctionArgument, Decompilation
Enum, Struct, FunctionArgument, Context, Decompilation
)
from libbs.decompilers import SUPPORTED_DECOMPILERS, ANGR_DECOMPILER, \
BINJA_DECOMPILER, IDA_DECOMPILER, GHIDRA_DECOMPILER
Expand Down Expand Up @@ -59,7 +59,7 @@ def __init__(
gui_init_args: Optional[Tuple] = None,
gui_init_kwargs: Optional[Dict] = None,
# [artifact_class] = list(callback_func)
artifact_write_callbacks: Optional[Dict[Type[Artifact], List[Callable]]] = None,
artifact_change_callbacks: Optional[Dict[Type[Artifact], List[Callable]]] = None,
thread_artifact_callbacks: bool = True,
):
self.name = name
Expand All @@ -69,11 +69,9 @@ def __init__(
self.qt_version = qt_version
self._error_on_artifact_duplicates = error_on_artifact_duplicates

# GUI things
self.headless = headless
self._headless_dec_path = Path(headless_dec_path) if headless_dec_path else None
self._binary_path = Path(binary_path) if binary_path else None

self._init_plugin = init_plugin
self._unparsed_gui_ctx_actions = gui_ctx_menu_actions or {}
# (category, name, action_string, callback_func)
Expand All @@ -86,7 +84,7 @@ def __init__(
self.artifact_write_lock = threading.Lock()

# callback functions, keyed by Artifact class
self.artifact_write_callbacks = artifact_write_callbacks or defaultdict(list)
self.artifact_change_callbacks = artifact_change_callbacks or defaultdict(list)
self._thread_artifact_callbacks = thread_artifact_callbacks

# artifact dict aliases:
Expand Down Expand Up @@ -165,11 +163,11 @@ def shutdown(self):
# GUI API
#

def gui_active_context(self) -> libbs.artifacts.Function:
def gui_active_context(self) -> Optional[libbs.artifacts.Context]:
"""
Returns a libbs Function. Currently only functions are supported as current contexts.
This function will be called very frequently, so its important that its implementation is fast
and can be done many times in the decompiler.
Returns the active location that the user is currently _clicked_ on in the decompiler.
This is returned as a Context object, which can address and screen naming information dependent
on the decompilers exposed data.
"""
raise NotImplementedError

Expand Down Expand Up @@ -255,6 +253,18 @@ def binary_path(self) -> Optional[str]:
"""
return self._binary_path

def fast_get_function(self, func_addr) -> Optional[Function]:
"""
Attempts to get a light version of the Function at func_addr.
This function implements special logic to be faster than grabbing all light-functions, or grabbing
a decompiled function. Use this API in the case where you may need to get a single functions info
many times in a loop.
@param func_addr:
@return:
"""
raise NotImplementedError

def get_func_size(self, func_addr) -> int:
"""
Returns the size of a function
Expand Down Expand Up @@ -529,9 +539,20 @@ def _set_function_header(self, fheader: FunctionHeader, **kwargs) -> bool:
# lift it ONCE inside this function. Each one will return the lifted form, for easier overriding.
#

def gui_context_changed(self, ctx: Context, **kwargs) -> libbs.artifacts.Context:
# XXX: should this be lifted?
for callback_func in self.artifact_change_callbacks[Context]:
args = (ctx,)
if self._thread_artifact_callbacks:
threading.Thread(target=callback_func, args=args, kwargs=kwargs, daemon=True).start()
else:
callback_func(*args, **kwargs)

return ctx

def function_header_changed(self, fheader: FunctionHeader, **kwargs) -> FunctionHeader:
lifted_fheader = self.art_lifter.lift(fheader)
for callback_func in self.artifact_write_callbacks[FunctionHeader]:
for callback_func in self.artifact_change_callbacks[FunctionHeader]:
args = (lifted_fheader,)
if self._thread_artifact_callbacks:
threading.Thread(target=callback_func, args=args, kwargs=kwargs, daemon=True).start()
Expand All @@ -542,7 +563,7 @@ def function_header_changed(self, fheader: FunctionHeader, **kwargs) -> Function

def stack_variable_changed(self, svar: StackVariable, **kwargs) -> StackVariable:
lifted_svar = self.art_lifter.lift(svar)
for callback_func in self.artifact_write_callbacks[StackVariable]:
for callback_func in self.artifact_change_callbacks[StackVariable]:
args = (lifted_svar,)
if self._thread_artifact_callbacks:
threading.Thread(target=callback_func, args=args, kwargs=kwargs, daemon=True).start()
Expand All @@ -554,7 +575,7 @@ def stack_variable_changed(self, svar: StackVariable, **kwargs) -> StackVariable
def comment_changed(self, comment: Comment, deleted=False, **kwargs) -> Comment:
kwargs["deleted"] = deleted
lifted_cmt = self.art_lifter.lift(comment)
for callback_func in self.artifact_write_callbacks[Comment]:
for callback_func in self.artifact_change_callbacks[Comment]:
args = (lifted_cmt,)
if self._thread_artifact_callbacks:
threading.Thread(target=callback_func, args=args, kwargs=kwargs, daemon=True).start()
Expand All @@ -566,7 +587,7 @@ def comment_changed(self, comment: Comment, deleted=False, **kwargs) -> Comment:
def struct_changed(self, struct: Struct, deleted=False, **kwargs) -> Struct:
kwargs["deleted"] = deleted
lifted_struct = self.art_lifter.lift(struct)
for callback_func in self.artifact_write_callbacks[Struct]:
for callback_func in self.artifact_change_callbacks[Struct]:
args = (lifted_struct,)
if self._thread_artifact_callbacks:
threading.Thread(target=callback_func, args=args, kwargs=kwargs, daemon=True).start()
Expand All @@ -578,7 +599,7 @@ def struct_changed(self, struct: Struct, deleted=False, **kwargs) -> Struct:
def enum_changed(self, enum: Enum, deleted=False, **kwargs) -> Enum:
kwargs["deleted"] = deleted
lifted_enum = self.art_lifter.lift(enum)
for callback_func in self.artifact_write_callbacks[Enum]:
for callback_func in self.artifact_change_callbacks[Enum]:
args = (lifted_enum,)
if self._thread_artifact_callbacks:
threading.Thread(target=callback_func, args=args, kwargs=kwargs, daemon=True).start()
Expand All @@ -589,7 +610,7 @@ def enum_changed(self, enum: Enum, deleted=False, **kwargs) -> Enum:

def global_variable_changed(self, gvar: GlobalVariable, **kwargs) -> GlobalVariable:
lifted_gvar = self.art_lifter.lift(gvar)
for callback_func in self.artifact_write_callbacks[GlobalVariable]:
for callback_func in self.artifact_change_callbacks[GlobalVariable]:
args = (lifted_gvar,)
if self._thread_artifact_callbacks:
threading.Thread(target=callback_func, args=args, kwargs=kwargs, daemon=True).start()
Expand Down
1 change: 1 addition & 0 deletions libbs/artifacts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .patch import Patch
from .stack_variable import StackVariable
from .struct import Struct, StructMember
from .context import Context

ART_NAME_TO_CLS = {
Function.__name__: Function,
Expand Down
26 changes: 26 additions & 0 deletions libbs/artifacts/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Optional

from .artifact import Artifact


class Context(Artifact):
__slots__ = Artifact.__slots__ + (
"addr",
"func_addr",
"screen_name"
)

def __init__(self, addr: int = None, func_addr: Optional[int] = None, screen_name: str = None, **kwargs):
self.addr: Optional[int] = addr
self.func_addr: Optional[int] = func_addr
self.screen_name: str = screen_name
super().__init__(**kwargs)

def __str__(self):
post_text = f" screen={self.screen_name}" if self.screen_name else ""
if self.func_addr is not None:
post_text = f"@{hex(self.func_addr)}" + post_text
if self.addr is not None:
post_text = hex(self.addr) + post_text

return f"<Context {post_text}>"
24 changes: 18 additions & 6 deletions libbs/decompilers/angr/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
DecompilerInterface,
)
from libbs.artifacts import (
Function, FunctionHeader, Comment, StackVariable, FunctionArgument, Artifact, Decompilation
Function, FunctionHeader, Comment, StackVariable, FunctionArgument, Artifact, Decompilation, Context
)
from .artifact_lifter import AngrArtifactLifter

Expand Down Expand Up @@ -75,6 +75,18 @@ def binary_path(self) -> Optional[str]:
except Exception:
return None

def fast_get_function(self, func_addr) -> Optional[Function]:
lowered_addr = self.art_lifter.lower_addr(func_addr)
try:
_func = self.main_instance.project.kb.functions[lowered_addr]
except KeyError:
self.warning(f"Function at {hex(func_addr)} not found.")
return None

func = Function(addr=_func.addr, size=_func.size, name=_func.name)
func.header.type = _func.prototype.returnty.c_repr() if _func.prototype.returnty else None
return self.art_lifter.lift(func)

def get_func_size(self, func_addr) -> int:
func_addr = self.art_lifter.lower_addr(func_addr)
try:
Expand Down Expand Up @@ -186,7 +198,7 @@ def gui_register_ctx_menu(self, name, action_string, callback_func, category=Non
self.gui_plugin.context_menu_items = self._ctx_menu_items
return True

def gui_active_context(self):
def gui_active_context(self) -> Optional[Context]:
curr_view = self.workspace.view_manager.current_tab
if not curr_view:
return None
Expand All @@ -196,13 +208,13 @@ def gui_active_context(self):
except NotImplementedError:
return None

# TODO: support addr and screen_name for Context
if func is None or func.am_obj is None:
return None

func_addr = self.art_lifter.lift_addr(func.addr)
return Function(
func_addr, func.size, header=FunctionHeader(func.name, func_addr)
)
context = Context(addr=None, func_addr=func.addr)
return self.art_lifter.lift(context)


#
# Artifact API
Expand Down
27 changes: 18 additions & 9 deletions libbs/decompilers/binja/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from libbs.artifacts import (
Function, FunctionHeader, StackVariable,
Comment, GlobalVariable, Patch, StructMember, FunctionArgument,
Enum, Struct, Artifact, Decompilation
Enum, Struct, Artifact, Decompilation, Context
)

from .artifact_lifter import BinjaArtifactLifter
Expand Down Expand Up @@ -92,7 +92,7 @@ def __del__(self):
# GUI
#

def gui_active_context(self):
def gui_active_context(self) -> Optional[Context]:
all_contexts = UIContext.allContexts()
if not all_contexts:
return None
Expand All @@ -103,14 +103,14 @@ def gui_active_context(self):
return None

actionContext = handler.actionContext()
func = actionContext.function
if func is None:
if actionContext is None:
return None

func_addr = self.art_lifter.lift_addr(func.start)
return libbs.artifacts.Function(
func_addr, 0, header=FunctionHeader(func.name, func_addr)
)
func_addr = actionContext.function.start if actionContext.function is not None else None
addr = actionContext.address if actionContext.address is not None else None
# TODO: support screen_name
context = Context(addr=addr, func_addr=func_addr)
return self.art_lifter.lift(context)

def gui_goto(self, func_addr) -> None:
func_addr = self.art_lifter.lower_addr(func_addr)
Expand Down Expand Up @@ -163,6 +163,14 @@ def binary_path(self) -> Optional[str]:
except Exception:
return None

def fast_get_function(self, func_addr) -> Optional[Function]:
func_addr = self.art_lifter.lower_addr(func_addr)
func = self.bv.get_function_at(func_addr)
if not func:
return None

return self.art_lifter.lift(self.bn_func_to_bs(func))

def get_func_size(self, func_addr) -> int:
func_addr = self.art_lifter.lower_addr(func_addr)
func = self.bv.get_function_at(func_addr)
Expand Down Expand Up @@ -283,7 +291,7 @@ def rename_local_variables_by_names(self, func: Function, name_map: Dict[str, st

return update

def get_decompilation_object(self, function: Function) -> Optional[object]:
def get_decompilation_object(self, function: Function, **kwargs) -> Optional[object]:
"""
Binary Ninja has no internal object that needs to be refreshed.
"""
Expand Down Expand Up @@ -372,6 +380,7 @@ def _set_function_header(self, fheader: FunctionHeader, bn_func=None, **kwargs)
if bs_var.type and bs_var.type != self.art_lifter.lift_type(str(bn_var.type)):
bn_var.type = bs_var.type
updates |= True
# refresh
bn_var = bn_func.parameter_vars[i]

# name
Expand Down
Loading

0 comments on commit 36a5665

Please sign in to comment.