Skip to content

Commit

Permalink
init commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Learner0x5a committed May 17, 2022
0 parents commit ee0e91a
Show file tree
Hide file tree
Showing 10 changed files with 1,291 additions and 0 deletions.
49 changes: 49 additions & 0 deletions IDAGenDFG.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import idaapi
import idautils
import idc
import ida_pro
import ida_auto
import os, sys
from libdataflow import ida_dataflow_analysis
from argparse import ArgumentParser

def main(OUTPUT_DIR:str) -> None:
    """Run the data-flow analysis on every non-library function of ``.text``.

    Args:
        OUTPUT_DIR: Directory that receives the generated files; it is
            created when missing.

    Functions flagged ``FUNC_LIB`` are reported and skipped. A failure while
    analysing one function is printed and does not stop the remaining ones.
    """
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Bounds of the first segment named ".text".
    text_bounds = None
    for seg in idautils.Segments():
        if idc.get_segm_name(seg) == ".text":
            text_bounds = (idc.get_segm_start(seg), idc.get_segm_end(seg))
            break

    if text_bounds is None:
        # Without this check the bounds stayed at 0/0 and the script finished
        # without analysing anything and without telling the user why.
        print('No ".text" segment found, nothing to analyse')
        return

    text_start_ea, text_end_ea = text_bounds
    for func in idautils.Functions(text_start_ea, text_end_ea):
        func_name = idc.get_func_name(func)

        # Ignore Library Code
        flags = idc.get_func_attr(func, idc.FUNCATTR_FLAGS)
        if flags & idc.FUNC_LIB:
            print(hex(func), "FUNC_LIB", func_name)
            continue

        try:
            ida_dataflow_analysis(func, func_name, OUTPUT_DIR, defuse_only=True)
        except Exception as e:
            # Deliberate best effort: one function that cannot be analysed
            # must not abort the whole batch run.
            print('Skip function {} due to dataflow analysis error: {}'.format(func_name, e))

if __name__ == '__main__':
    # Script arguments are read from idc.ARGV (element 0 is the script itself).
    if len(idc.ARGV) < 2:
        # ARGV may be empty when the script is not started through -S, so do
        # not index it unconditionally while building the usage text.
        script_name = idc.ARGV[0] if idc.ARGV else 'IDAGenDFG.py'
        print('\n\nGenerating DFG & Def-Use Graph with IDA Pro and MIASM')
        print('\tNeed to specify the output dir with -o option')
        print('\tUsage: /path/to/ida -A -Lida.log -S"{} -o <output_dir>" /path/to/binary\n\n'.format(script_name))
        ida_pro.qexit(1)

    parser = ArgumentParser(description="IDAPython script for generating dataflow graph of each function in the given binary")
    # `const` makes a bare "-o" fall back to the default directory instead of
    # producing None, which os.makedirs() in main() cannot handle.
    parser.add_argument("-o", "--output_dir", help="Output dir", default='./outputs', nargs='?', const='./outputs')
    # parser.add_argument("-s", "--symb", help="Symbolic execution mode",
    #                     action="store_true")

    # Parse the same argument vector that was checked above rather than
    # whatever sys.argv happens to contain inside IDA.
    args = parser.parse_args(idc.ARGV[1:])

    ida_auto.auto_wait()

    main(args.output_dir)

    ida_pro.qexit(0)
339 changes: 339 additions & 0 deletions LICENSE

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# ida-dfg

IDA Pro data-flow graph generator

Tested with IDA Pro 7.6 and miasm 7ee593d

## libdataflow.py

封装了两个核心接口给其他脚本用
- `ida_dataflow_analysis`: 面向IDA + MIASM的场景
- `miasm_dataflow_analysis`: 单独使用,不需要IDA Pro

## IDAGenDFG.py

IDAPython调用的脚本

`/path/to/ida -A -Lida.log -S"path/to/IDAGenDFG.py -o <output_dir>" /path/to/binary`

## deprecated/graph_dataflow.py

新版miasm支持的DFG/ReachingDefinition/DefUse分析

## deprecated/libdfg.py

代码升级 & debug工作停止,因为新版miasm自身支持dfg生成。

但是这部分代码的价值在于学习如何将miasm用到IDAPython里,详见`dataflow_analysis`函数。



## miasm的一些核心概念:
- machine类: 定义架构、反汇编引擎、lifter
- LocationDB类:各类数据结构的loc_key(unique id),例如AsmBlock, IRBlock的loc_key;以及定义了offset和loc_key相互转换的函数
- Instruction类:可以在miasm.core.cpu内查看其成员函数、变量
- AsmCFG类、AsmBlock类:汇编控制流图、基本块
- IRBlock类、AssignBlock类:AsmBlock经Lifter翻译得到IRBlock,每一个IRBlock有若干个AssignBlock
* 每个AssignBlock对应一条IR赋值语句(src -> dst),同时也可以对应回一条汇编指令(assignblk.instr)

## miasm的局限性

- 反汇编较慢
- 无法处理80bit浮点数
276 changes: 276 additions & 0 deletions libdataflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
import os
from future.utils import viewitems, viewvalues
from utils import guess_machine

from miasm.analysis.binary import Container
from miasm.analysis.machine import Machine
from miasm.expression.expression import get_expr_mem
from miasm.analysis.data_analysis import inter_block_flow #, intra_block_flow_raw
from miasm.core.graph import DiGraph
from miasm.ir.symbexec import SymbolicExecutionEngine
from miasm.analysis.data_flow import DeadRemoval, ReachingDefinitions, DiGraphDefUse
from miasm.core.locationdb import LocationDB
from miasm.core.bin_stream_ida import bin_stream_ida

def intra_block_flow_symb(lifter, _, flow_graph, irblock, in_nodes, out_nodes):
    """
    Create data flow for an IR block using symbolic execution.

    The block is executed with a SymbolicExecutionEngine seeded with a copy
    of ``lifter.arch.regs.regs_init``; the block is also printed to stdout.

    @lifter: lifter whose architecture provides the initial register symbols
    @_: unused (keeps the signature compatible with intra_block_flow_raw)
    @flow_graph: graph that receives the nodes and edges
    @irblock: IR block to analyse
    @in_nodes: dict updated in place, read expression -> graph node
    @out_nodes: dict updated in place, written expression -> graph node

    Graph nodes are tuples ``(irblock.loc_key, index, expression)``.
    """
    symbols_init = lifter.arch.regs.regs_init.copy()
    sb = SymbolicExecutionEngine(lifter, symbols_init)
    sb.eval_updt_irblock(irblock)
    print('*' * 40)
    print(irblock)


    out = sb.modified(mems=False)
    # Nothing in this function ever stores into current_nodes, so every
    # "n_r in current_nodes" test below takes the else branch.
    current_nodes = {}
    # Gen mem arg to mem node links
    for dst, src in out:
        # The unpacked src is replaced by the symbolic value of dst.
        src = sb.eval_expr(dst)
        for n in [dst, src]:

            all_mems = set()
            all_mems.update(get_expr_mem(n))

        for n in all_mems:
            node_n_w = (irblock.loc_key, 0, n)
            # Only a memory expression equal to the whole source is linked.
            if not n == src:
                continue
            # Expressions read by the pointer of the memory access.
            o_r = n.ptr.get_r(mem_read=False, cst_read=True)
            for i, n_r in enumerate(o_r):
                if n_r in current_nodes:
                    node_n_r = current_nodes[n_r]
                else:
                    node_n_r = (irblock.loc_key, i, n_r)
                if not n_r in in_nodes:
                    in_nodes[n_r] = node_n_r
                flow_graph.add_uniq_edge(node_n_r, node_n_w)

    # Gen data flow links
    # NOTE(review): `out` was already iterated above and is unpacked there as
    # (dst, src) pairs, while here each item is bound to `dst` as a whole. If
    # sb.modified() returns a one-shot iterator this loop sees no items -
    # TODO confirm against the miasm version in use.
    for dst in out:
        src = sb.eval_expr(dst)
        nodes_r = src.get_r(mem_read=False, cst_read=True)
        nodes_w = set([dst])
        for n_r in nodes_r:
            if n_r in current_nodes:
                node_n_r = current_nodes[n_r]
            else:
                node_n_r = (irblock.loc_key, 0, n_r)
            if not n_r in in_nodes:
                in_nodes[n_r] = node_n_r

            flow_graph.add_node(node_n_r)
            for n_w in nodes_w:
                # Written nodes use index 1, read nodes index 0.
                node_n_w = (irblock.loc_key, 1, n_w)
                out_nodes[n_w] = node_n_w

                flow_graph.add_node(node_n_w)
                flow_graph.add_uniq_edge(node_n_r, node_n_w)



def intra_block_flow_raw(lifter, ircfg, flow_graph, irb, in_nodes, out_nodes):
    """
    Create data flow for an irbloc using raw IR expressions

    @lifter: unused by this function
    @ircfg: unused by this function
    @flow_graph: graph that receives the nodes and edges
    @irb: IR block, iterated as a sequence of assignment blocks
    @in_nodes: dict updated in place, read expression -> graph node
    @out_nodes: dict updated in place, written expression -> graph node

    Graph nodes are tuples ``(hex(assignblk.instr.offset), index, expression)``
    so each node carries the offset of the originating instruction.
    """
    current_nodes = {}
    for i, assignblk in enumerate(irb):
        # Maps each written expression to the set of expressions it reads.
        dict_rw = assignblk.get_rw(cst_read=True)
        # Make values written by earlier assignment blocks visible to reads.
        current_nodes.update(out_nodes)

        # gen mem arg to mem node links
        all_mems = set()
        for node_w, nodes_r in viewitems(dict_rw):
            for n in nodes_r.union([node_w]):
                all_mems.update(get_expr_mem(n))
            if not all_mems:
                continue

            for n in all_mems:
                node_n_w = (hex(assignblk.instr.offset), i, n)
                # Only memory expressions that are read get pointer links.
                if not n in nodes_r:
                    continue
                # Expressions read by the pointer of the memory access.
                o_r = n.ptr.get_r(mem_read=False, cst_read=True)
                for n_r in o_r:
                    if n_r in current_nodes:
                        node_n_r = current_nodes[n_r]
                    else:
                        # First use in this block: it is a block input.
                        node_n_r = (hex(assignblk.instr.offset), i, n_r)
                        current_nodes[n_r] = node_n_r
                        in_nodes[n_r] = node_n_r
                    flow_graph.add_uniq_edge(node_n_r, node_n_w)

        # gen data flow links
        for node_w, nodes_r in viewitems(dict_rw):
            for n_r in nodes_r:
                if n_r in current_nodes:
                    node_n_r = current_nodes[n_r]
                else:
                    # First use in this block: it is a block input.
                    node_n_r = (hex(assignblk.instr.offset), i, n_r)
                    current_nodes[n_r] = node_n_r
                    in_nodes[n_r] = node_n_r

                flow_graph.add_node(node_n_r)

                # The written node uses index i + 1, the nodes read use i.
                node_n_w = (hex(assignblk.instr.offset), i + 1, node_w)
                out_nodes[node_w] = node_n_w

                flow_graph.add_node(node_n_w)
                flow_graph.add_uniq_edge(node_n_r, node_n_w)



def node2str(node):
    """Return the DOT label text for a three-element graph node tuple."""
    label_template = "%s,%s\\l\\\n%s"
    return label_template % node


def gen_function_data_flow_graph(lifter, ircfg, ad, block_flow_cb) -> DiGraph:
    """
    Build the data flow graph of the function whose entry block is at `ad`.

    `block_flow_cb` is called once per IR block of `ircfg` to add the
    intra-block flow, then `inter_block_flow` links the blocks starting from
    the entry block. An AssertionError is raised when no block of `ircfg`
    has the offset `ad`.
    """
    # Entry block: the first block whose location offset equals `ad`.
    entry_block = next(
        (
            candidate
            for candidate in viewvalues(ircfg.blocks)
            if ircfg.loc_db.get_location_offset(candidate.loc_key) == ad
        ),
        None,
    )
    assert entry_block is not None

    graph = DiGraph()
    graph.node2str = node2str

    # One private in/out dictionary per block, filled by the callback.
    in_nodes_by_block = {key: {} for key in ircfg.blocks}
    out_nodes_by_block = {key: {} for key in ircfg.blocks}

    for key, block in viewitems(ircfg.blocks):
        block_flow_cb(
            lifter,
            ircfg,
            graph,
            block,
            in_nodes_by_block[key],
            out_nodes_by_block[key],
        )

    inter_block_flow(
        lifter,
        ircfg,
        graph,
        entry_block.loc_key,
        in_nodes_by_block,
        out_nodes_by_block,
    )

    return graph


def ida_dataflow_analysis(function_addr:int, function_name:str, output_dir:str, defuse_only: bool = False) -> None:
    """Analyse one function of the database open in IDA and dump the results.

    Args:
        function_addr: Address at which disassembly of the function starts.
        function_name: Name used as prefix of every generated file.
        output_dir: Directory receiving the files; created when missing.
        defuse_only: When True the data-flow graph (``_dfg.dot``) is skipped
            and only the def-use outputs are produced.

    Files written to ``output_dir``:
        ``<name>.asm2ir``: one line per IR assignment with the offset and
            text of the originating instruction.
        ``<name>_dfg.dot``: data-flow graph (unless ``defuse_only``).
        ``<name>_defuse.dot``: def-use graph.
        ``<name>_LocKeyIdx2InstrOffset.map``: ``<loc_key>_<index>:<offset>``
            lines mapping each assignment block to its instruction offset.
    """
    os.makedirs(output_dir, exist_ok=True)

    loc_db = LocationDB()

    ###################### IDA specific #######################
    machine = guess_machine()
    bin_stream = bin_stream_ida()

    # Populate symbols with ida names
    import idautils
    for ad, name in idautils.Names():
        if name is None:
            continue
        loc_db.add_location(name, ad)

    ###################### Reverse-tool-independent ######################

    mdis = machine.dis_engine(bin_stream, loc_db=loc_db, dont_dis_nulstart_bloc=True)
    mdis.follow_call = True
    lifter = machine.lifter_model_call(loc_db=loc_db)

    print('disassembling function: {}:{}'.format(hex(function_addr), function_name))
    asmcfg = mdis.dis_multiblock(function_addr)

    print('generating IR...')
    ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)
    # TODO: DeadRemoval(lifter)(ircfg) is deliberately not applied here; it
    # deletes part of the IR and its effect still has to be investigated.

    with open(os.path.join(output_dir, '{}.asm2ir'.format(function_name)), 'w') as f:
        f.write('\tOFFSET\t| ASM\t| SRC -> DST\n')
        for irblock in ircfg.blocks.values():
            for assignblk in irblock:
                for dst, src in assignblk.iteritems():
                    f.write('\t{}\t| {}\t| {} -> {}\n'.format(hex(assignblk.instr.offset), assignblk.instr, src, dst))

    if not defuse_only:
        block_flow_cb = intra_block_flow_raw  # if args.symb else intra_block_flow_symb

        dfg = gen_function_data_flow_graph(lifter, ircfg, function_addr, block_flow_cb)
        # Context managers close the files even when dot() or write() fails.
        with open(os.path.join(output_dir, '{}_dfg.dot'.format(function_name)), 'w') as f:
            f.write(dfg.dot())

    reaching_defs = ReachingDefinitions(ircfg)
    defuse = DiGraphDefUse(reaching_defs)
    with open(os.path.join(output_dir, '{}_defuse.dot'.format(function_name)), 'w') as f:
        f.write(defuse.dot())

    # The instruction offset can be derived from block loc_key + assignblk
    # index, so the def-use graph can be mapped back to instructions.
    loc_key_idx_to_offset = {}
    for block in viewvalues(reaching_defs.ircfg.blocks):
        for index, assignblk in enumerate(block):
            loc_key_idx_to_offset['{}_{}'.format(block.loc_key, index)] = hex(assignblk.instr.offset)

    with open(os.path.join(output_dir, '{}_LocKeyIdx2InstrOffset.map'.format(function_name)), 'w') as f:
        f.write('\n'.join('{}:{}'.format(key, offset) for key, offset in loc_key_idx_to_offset.items()))


def miasm_dataflow_analysis(function_addr:int, function_name:str, output_dir:str, filepath:str, arch:str = "X86_64", defuse_only: bool = False) -> None:
    """Analyse one function of a binary file with miasm only (no IDA needed).

    Args:
        function_addr: Address at which disassembly of the function starts.
        function_name: Name used as prefix of every generated file.
        output_dir: Directory receiving the files; created when missing.
        filepath: Path of the binary to load.
        arch: miasm machine name; matched case-insensitively.
        defuse_only: When True the data-flow graph (``_dfg.dot``) is skipped
            and only the def-use outputs are produced.

    Files written to ``output_dir``:
        ``<name>.asm2ir``: one line per IR assignment with the offset and
            text of the originating instruction.
        ``<name>_dfg.dot``: data-flow graph (unless ``defuse_only``).
        ``<name>_defuse.dot``: def-use graph.
        ``<name>_LocKeyIdx2InstrOffset.map``: ``<loc_key>_<index>:<offset>``
            lines mapping each assignment block to its instruction offset.
    """
    os.makedirs(output_dir, exist_ok=True)

    # The location database must exist before the container is parsed; it was
    # previously created after its first use, raising UnboundLocalError.
    loc_db = LocationDB()
    with open(filepath, 'rb') as binary:
        bin_stream = Container.from_stream(binary, loc_db).bin_stream

    # miasm machine names are lower-case (e.g. "x86_64"), so the documented
    # default "X86_64" is normalised before the lookup.
    machine = Machine(arch.lower())

    mdis = machine.dis_engine(bin_stream, loc_db=loc_db, dont_dis_nulstart_bloc=True)
    mdis.follow_call = True
    lifter = machine.lifter_model_call(loc_db=loc_db)

    print('disassembling function: {}:{}'.format(hex(function_addr), function_name))
    asmcfg = mdis.dis_multiblock(function_addr)

    print('generating IR...')
    ircfg = lifter.new_ircfg_from_asmcfg(asmcfg)
    # TODO: DeadRemoval(lifter)(ircfg) is deliberately not applied here; it
    # deletes part of the IR and its effect still has to be investigated.

    with open(os.path.join(output_dir, '{}.asm2ir'.format(function_name)), 'w') as f:
        f.write('\tOFFSET\t| ASM\t| SRC -> DST\n')
        for irblock in ircfg.blocks.values():
            for assignblk in irblock:
                for dst, src in assignblk.iteritems():
                    f.write('\t{}\t| {}\t| {} -> {}\n'.format(hex(assignblk.instr.offset), assignblk.instr, src, dst))

    if not defuse_only:
        block_flow_cb = intra_block_flow_raw  # if args.symb else intra_block_flow_symb

        dfg = gen_function_data_flow_graph(lifter, ircfg, function_addr, block_flow_cb)
        # Context managers close the files even when dot() or write() fails.
        with open(os.path.join(output_dir, '{}_dfg.dot'.format(function_name)), 'w') as f:
            f.write(dfg.dot())

    reaching_defs = ReachingDefinitions(ircfg)
    defuse = DiGraphDefUse(reaching_defs)
    with open(os.path.join(output_dir, '{}_defuse.dot'.format(function_name)), 'w') as f:
        f.write(defuse.dot())

    # The instruction offset can be derived from block loc_key + assignblk
    # index, so the def-use graph can be mapped back to instructions.
    loc_key_idx_to_offset = {}
    for block in viewvalues(reaching_defs.ircfg.blocks):
        for index, assignblk in enumerate(block):
            loc_key_idx_to_offset['{}_{}'.format(block.loc_key, index)] = hex(assignblk.instr.offset)

    with open(os.path.join(output_dir, '{}_LocKeyIdx2InstrOffset.map'.format(function_name)), 'w') as f:
        f.write('\n'.join('{}:{}'.format(key, offset) for key, offset in loc_key_idx_to_offset.items()))


Binary file added testcase/test
Binary file not shown.
17 changes: 17 additions & 0 deletions testcase/test-idapython.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import idc
import idautils
import idaapi
import ida_pro
import ida_auto
# Let ida_auto.auto_wait() return before the function list is read.
ida_auto.auto_wait()

# Print "<hex address> : <name>" for every function IDA knows about.
for function_ea in idautils.Functions():
    print(hex(function_ea), ':', idc.get_func_name(function_ea))

# Terminate IDA with exit code 0 once the listing is done.
ida_pro.qexit(0)
Loading

0 comments on commit ee0e91a

Please sign in to comment.