Skip to content

Commit

Permalink
Support artifact watching in Binja (#30)
Browse files Browse the repository at this point in the history
* Fix Binja interface

* Add support for Binja artifact watchers

* Catch and record comments

* Bump and update readme
  • Loading branch information
mahaloz authored Jan 29, 2024
1 parent e4353e8 commit 38977fc
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 152 deletions.
8 changes: 0 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,3 @@ for func_addr, light_func in deci.functions.items():

Notice, when using the `items` function the function is `light`, meaning it does not contain stack vars and other
info. This also means using `keys`, `values`, or `list` on an artifact dictionary will have the same affect.

### IDA
- [ ] G/S Comments

### Ghidra
- [ ] Change Callbacks
- [ ] Get/Set Comments

4 changes: 2 additions & 2 deletions examples/change_watcher_plugin/bs_change_watcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def create_plugin(*args, **kwargs):
gui_init_kwargs=kwargs
)
# create a function to print a string in the decompiler console
decompiler_printer = lambda *x, **y: deci.print(f"Changed {x}{y}")
decompiler_printer = lambda *x: deci.print(f"Changed {x}")
# register the callback for all the types we want to print
deci.artifact_write_callbacks = {
typ: [decompiler_printer] for typ in (FunctionHeader, StackVariable, Enum, Struct, GlobalVariable, Comment,)
Expand All @@ -32,7 +32,7 @@ def create_plugin(*args, **kwargs):
deci.gui_register_ctx_menu(
"StartArtifactChangeWatcher",
"Start watching artifact changes",
lambda: deci.start_artifact_watchers(),
lambda *x, **y: deci.start_artifact_watchers(),
category="ArtifactChangeWatcher"
)

Expand Down
2 changes: 1 addition & 1 deletion libbs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.21.1"
__version__ = "0.22.0"

import logging
logging.getLogger("libbs").addHandler(logging.NullHandler())
Expand Down
3 changes: 1 addition & 2 deletions libbs/artifacts/comment.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ class Comment(Artifact):

def __init__(self, addr, comment, func_addr=None, decompiled=False, last_change=None):
super(Comment, self).__init__(last_change=last_change)
if comment:
self.comment = self.linewrap_comment(comment)

self.comment = self.linewrap_comment(comment) if comment else ""
self.decompiled = decompiled
self.addr = addr # type: int
self.func_addr = func_addr
Expand Down
214 changes: 143 additions & 71 deletions libbs/decompilers/binja/hooks.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,16 @@
import re

from binaryninjaui import (
UIContext,
DockHandler,
DockContextHandler,
UIAction,
UIActionHandler,
Menu,
SidebarWidget,
SidebarWidgetType,
Sidebar,
)
from collections import defaultdict
from typing import Dict

import binaryninja
from binaryninja import PluginCommand
from binaryninja.types import StructureType, EnumerationType
from binaryninja import SymbolType
from binaryninja.binaryview import BinaryDataNotification

from collections import defaultdict
import logging

from .interface import BinjaInterface
from binsync.artifacts import (
Artifact,
Function, FunctionHeader, FunctionArgument, Comment, GlobalVariable, Enum, StructMember
from libbs.artifacts import (
FunctionHeader, FunctionArgument, GlobalVariable, StackVariable, Comment
)

l = logging.getLogger(__name__)
Expand All @@ -36,104 +23,189 @@
class DataMonitor(BinaryDataNotification):
def __init__(self, view, interface):
super().__init__()
self._view = view
self._interface = interface
self._func_addr_requested = None
self._func_before_change = None
self._bv = view
self._interface: BinjaInterface = interface
self._changing_func_addr = None
self._changing_func_pre_change = None
self._seen_comments = defaultdict(dict)

def function_updated(self, view, func_):
if self._interface.sync_lock.locked() or self._func_before_change is None:
# TODO: add support for creating functions here
return
# updates that occur without a service request are requests for comment changes
if self._changing_func_pre_change is None:
#
# comments
#

func_addr = func_.start
current_comments = dict(func_.comments)
if self._seen_comments[func_addr] != current_comments:
# comments changed
old_comments = self._seen_comments
new_comments = current_comments

for addr, old_comment in old_comments.items():
new_comment = new_comments.get(addr, None)
if new_comment == old_comment:
continue

self._interface.comment_changed(
self._interface.art_lifter.lift(
Comment(
addr, str(new_comment) if new_comment else "", decompiled=True, func_addr=func_addr
)
)
)

for addr, new_comment in new_comments.items():
if addr in old_comments:
continue

if new_comment:
self._interface.comment_changed(
self._interface.art_lifter.lift(
Comment(addr, str(new_comment), decompiled=True, func_addr=func_addr)
)
)

self._seen_comments[func_addr] = current_comments

# service requested function only
if self._func_addr_requested == func_.start:
l.debug(f"Update on {hex(self._func_addr_requested)} being processed...")
self._func_addr_requested = None
if self._changing_func_pre_change is not None and self._changing_func_addr == func_.start:
l.debug(f"Update on {hex(self._changing_func_addr)} being processed...")
self._changing_func_addr = None

# convert to libbs Function type for diffing
bn_func = view.get_function_at(func_.start)
bs_func = BinjaInterface.bn_func_to_bs(bn_func)
current_comments = dict(bn_func.comments)

#
# header
# NOTE: function name done inside symbol update hook
#

# check if the headers differ
if self._func_before_change.header.diff(bs_func.header):
self._interface.schedule_job(
self._interface.push_artifact,
bs_func.header
)

# NOTE: function name done inside symbol update hook
if self._changing_func_pre_change.header.diff(bs_func.header):
old_header: FunctionHeader = self._changing_func_pre_change.header
new_header: FunctionHeader = bs_func.header

old_args = old_header.args or {}
for off, old_arg in old_args.items():
new_arg = new_header.args.get(off, None)
if new_arg is None:
# TODO: support deleting args
continue

if old_arg == new_arg:
continue

diff_arg = FunctionArgument(off, None, None, None)
if old_arg.name != new_arg.name:
diff_arg.name = str(new_arg.name)

if old_arg.type != new_arg.type:
diff_arg.type = str(new_arg.type)

if old_arg.size != new_arg.size:
diff_arg.size = int(new_arg.size)

self._interface.function_header_changed(
self._interface.art_lifter.lift(
FunctionHeader(None, old_header.addr, args={off: diff_arg})
)
)

# new func args added to header
for off, new_arg in bs_func.args.items():
if off in old_args:
continue

self._interface.function_header_changed(
self._interface.art_lifter.lift(
FunctionHeader(None, old_header.addr, args={
off: FunctionArgument(off, str(new_arg.name), str(new_arg.type), int(new_arg.size))
}
)
)
)

#
# stack vars
#

for off, var in self._func_before_change.stack_vars.items():
if off in bs_func.stack_vars and var != bs_func.stack_vars[off]:
new_var = bs_func.stack_vars[off]
if re.match(r"var_\d+[_\d+]{0,1}", new_var.name) \
or new_var.name in {'__saved_rbp', '__return_addr',}:
header_args_names = set([arg.name for arg in bs_func.header.args.values()])
if self._changing_func_pre_change.stack_vars != bs_func.stack_vars:
old_svs: Dict[int, StackVariable] = self._changing_func_pre_change.stack_vars
new_svs: Dict[int, StackVariable] = bs_func.stack_vars

for off, old_sv in old_svs.items():
new_sv = new_svs.get(off, None)
if new_sv is None or new_sv.name in header_args_names:
continue

self._interface.schedule_job(
self._interface.push_artifact,
new_var
if old_sv == new_sv:
continue

diff_sv = StackVariable(off, None, None, old_sv.size, bs_func.addr)
if old_sv.name != new_sv.name:
diff_sv.name = str(new_sv.name)

if old_sv.type != new_sv.type:
diff_sv.type = str(new_sv.type)

self._interface.stack_variable_changed(
self._interface.art_lifter.lift(diff_sv)
)

for off, new_sv in new_svs.items():
if off in old_svs or new_sv.name in header_args_names:
continue

self._interface.stack_variable_changed(
self._interface.art_lifter.lift(
StackVariable(off, str(new_sv.name), str(new_sv.type), new_sv.size, bs_func.addr)
)
)

self._func_before_change = None
self._changing_func_pre_change = None

def function_update_requested(self, view, func):
if not self._interface.sync_lock.locked() and self._func_addr_requested is None:
if self._changing_func_addr is None:
l.debug(f"Update on {func} requested...")
self._func_addr_requested = func.start
self._func_before_change = BinjaInterface.bn_func_to_bs(func)
self._changing_func_addr = func.start
self._changing_func_pre_change = BinjaInterface.bn_func_to_bs(func)

def symbol_updated(self, view, sym):
if self._interface.sync_lock.locked():
return

l.debug(f"Symbol update Requested on {sym}...")
if sym.type == SymbolType.FunctionSymbol:
l.debug(f" -> Function Symbol")
func = view.get_function_at(sym.address)
bs_func = BinjaInterface.bn_func_to_bs(func)
self._interface.schedule_job(
self._interface.push_artifact,
FunctionHeader(sym.name, sym.address, type_=bs_func.header.type, args=bs_func.header.args)
self._interface.function_header_changed(
self._interface.art_lifter.lift(FunctionHeader(bs_func.name, bs_func.addr))
)
elif sym.type == SymbolType.DataSymbol:
l.debug(f" -> Data Symbol")
var: binaryninja.DataVariable = view.get_data_var_at(sym.address)

self._interface.schedule_job(
self._interface.push_artifact,
GlobalVariable(var.address, var.name, type_=str(var.type), size=var.type.width)
self._interface.global_variable_changed(
self._interface.art_lifter.lift(
GlobalVariable(int(sym.address), str(var.name), type_=str(var.type), size=int(var.type.width))
)
)
else:
l.debug(f" -> Other Symbol: {sym.type}")
print(f" -> Other Symbol: {sym.type}")
pass

def type_defined(self, view, name, type_):
l.debug(f"Type Defined: {name} {type_}")
name = str(name)
if self._interface.sync_lock.locked():
return

if isinstance(type_, StructureType):
bs_struct = BinjaInterface.bn_struct_to_bs(name, type_)
self._interface.schedule_job(
self._interface.push_artifact,
bs_struct
self._interface.struct_changed(
self._interface.art_lifter.lift(bs_struct)
)

elif isinstance(type_, EnumerationType):
bs_enum = BinjaInterface.bn_enum_to_bs(name, type_)
self._interface.schedule_job(self._interface.push_artifact, bs_enum)


def start_data_monitor(view, controller):
notification = DataMonitor(view, controller)
view.register_notification(notification)
self._interface.enum_changed(
self._interface.art_lifter.lift(bs_enum)
)
Loading

0 comments on commit 38977fc

Please sign in to comment.