Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LRU caches for performance improvements #24

Merged
merged 8 commits into from
Apr 19, 2022
73 changes: 30 additions & 43 deletions rainbow/rainbow.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@
# Copyright 2019 Victor Servant, Ledger SAS


import functools
import math
import os
import weakref
from typing import Callable, Tuple
import capstone as cs
import colorama
import lief
import unicorn as uc
from pygments import highlight
from pygments.formatters import TerminalFormatter as formatter
from pygments.lexers import NasmLexer

from rainbow.color_functions import color
from rainbow.loaders import load_selector
from .color_functions import color
from .loaders import load_selector
from .tracers import regs_hd_sum_trace, regs_hw_sum_trace


class HookWeakMethod:
Expand Down Expand Up @@ -270,12 +271,12 @@ def setup(self):
self.block_hook = self.emu.hook_add(uc.UC_HOOK_BLOCK,
HookWeakMethod(self.block_handler))
if self.sca_mode:
if (self.sca_HD):
if self.sca_HD:
self.ct_hook = self.emu.hook_add(uc.UC_HOOK_CODE,
HookWeakMethod(self.sca_code_traceHD))
regs_hd_sum_trace, self)
else:
self.ct_hook = self.emu.hook_add(uc.UC_HOOK_CODE,
HookWeakMethod(self.sca_code_trace))
regs_hw_sum_trace, self)
self.tm_hook = self.emu.hook_add(
uc.UC_HOOK_MEM_READ | uc.UC_HOOK_MEM_WRITE,
HookWeakMethod(self.sca_trace_mem))
Expand Down Expand Up @@ -330,15 +331,28 @@ def trace_mem(self, uci, access, address, size, value, user_data):
val = color("CYAN", f"{val:8x}")
print(f" {val} <- [{addr}]", end=" ")

def disassemble_single(self, addr, size):
""" Disassemble a single instruction at address """
instruction = self.emu.mem_read(addr, size)
return next(self.disasm.disasm_lite(bytes(instruction), addr, 1))
# Least-recently used cache for Capstone calls to disasm or disasm_lite
@staticmethod
@functools.lru_cache(maxsize=4096)
def _disassemble_cache(call: Callable, instruction: bytes, addr: int):
return next(call(instruction, addr, 1))
aiooss-ledger marked this conversation as resolved.
Show resolved Hide resolved

def disassemble_single_detailed(self, addr, size):
""" Disassemble a single instruction at addr """
instruction = self.emu.mem_read(addr, 2 * size)
return next(self.disasm.disasm(bytes(instruction), addr, 1))
def disassemble_single(self, addr: int, size: int) -> Tuple[int, int, str, str]:
"""Disassemble a single instruction using Capstone lite

This returns the address, size, mnemonic, and operands of the
instruction at the specified address and size (in bytes).

If you want more information, you should use disassemble_single_detailed
method, but is 30% slower according to Capstone documentation.
"""
insn = self.emu.mem_read(addr, size)
return self._disassemble_cache(self.disasm.disasm_lite, bytes(insn), addr)

def disassemble_single_detailed(self, addr: int, size: int) -> cs.CsInsn:
"""Disassemble a single instruction using Capstone"""
insn = self.emu.mem_read(addr, 2 * size)
return self._disassemble_cache(self.disasm.disasm, bytes(insn), addr)

def print_asmline(self, adr, ins, op_str):
""" Pretty-print assembly using pygments syntax highlighting """
Expand All @@ -349,33 +363,6 @@ def print_asmline(self, adr, ins, op_str):
)
print("\n" + color("YELLOW", f"{adr:8X} ") + line, end=";")

def sca_code_trace(self, uci, address, size, data):
from .tracers import regs_hw_sum_trace
regs_hw_sum_trace(self, address, size, data)

def sca_code_traceHD(self, uci, address, size, data):
"""
Hook that traces modified register values in side-channel mode.

Capstone 4's 'regs_access' method is used to find out which registers are explicitly modified by an instruction.
Once found, the information is stored in self.reg_leak to be stored at the next instruction, once the unicorn engine actually performed the current instruction.
"""
if self.trace:
if self.reg_leak is not None:
for x in self.reg_leak[1]:
if x not in self.TRACE_DISCARD:
self.sca_address_trace.append(self.reg_leak[0])
self.sca_values_trace.append(self.RegistersBackup[self.reg_map[x]] ^ uci.reg_read(self.reg_map[x]))
self.RegistersBackup[self.reg_map[x]] = uci.reg_read(self.reg_map[x])

self.reg_leak = None

ins = self.disassemble_single_detailed(address, size)
_regs_read, regs_written = ins.regs_access()
if len(regs_written) > 0:
self.reg_leak = (f"{address:8X} {ins.mnemonic:<6} {ins.op_str}",list(map(ins.reg_name, regs_written))
)

def code_trace(self, uci, address, size, data):
"""
Hook that traces modified register values in side-channel mode.
Expand All @@ -391,7 +378,7 @@ def code_trace(self, uci, address, size, data):
while True:
s = input("Press Enter to continue, or Input an address and a size to display an address: ")

if s is '':
if s == '':
break
try:
address = eval("0x"+s.split(" ")[0])
Expand Down
123 changes: 105 additions & 18 deletions rainbow/tracers.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,115 @@
# This file is part of rainbow
#
# rainbow is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# Copyright 2020 Victor Servant, Ledger SAS

import functools
from typing import List, Tuple
import capstone as cs
import unicorn as uc

from .utils import hw

def regs_hw_sum_trace(rbw, address, size, data):

# Least-recently used cache for register access extraction
@functools.lru_cache(maxsize=4096)
def registers_accessed_by_instruction(insn: cs.CsInsn) -> Tuple[List[int], List[int]]:
"""Return read and written registers by a single instruction

Registers are represented with Capstone identifiers which mostly maps to
Unicorn identifiers.
"""
return insn.regs_access()


def regs_hw_sum_trace(uci: uc.Uc, address: int, size: int, rbw):
"""Trace written registers Hamming weight

For each instruction, this tracer sums the Hamming weight of all written
registers.

This tracer is hooked by default if sca_mode=True and sca_HD=False.
You may hook it with Unicorn as an `uc.UC_HOOK_CODE` hook.
"""
ins = rbw.reg_leak
if ins is not None:
_, regs_written = ins.regs_access()
v = sum(hw(rbw.emu.reg_read(rbw.reg_map[ins.reg_name(i)])) for i in regs_written)
# Find out which registers are modified
_, regs_written = registers_accessed_by_instruction(ins)
v = sum(hw(uci.reg_read(r)) for r in regs_written)

rbw.sca_address_trace.append( f"{address:8X} {ins.mnemonic:<6} {ins.op_str}" )
rbw.sca_values_trace.append(v)
rbw.sca_address_trace.append(f"{ins.address:8X} {ins.mnemonic:<6} {ins.op_str}")
rbw.sca_values_trace.append(v)

# Information is stored to be used at the next instruction,
# once the unicorn engine actually performed the current instruction.
rbw.reg_leak = rbw.disassemble_single_detailed(address, size)

def wb_regs_trace(rbw, address, size, data):
"""One point per register value, and filter out uninteresting register accesses"""
if rbw.reg_leak:
ins = rbw.reg_leak[0]
for reg in map(ins.reg_name, rbw.reg_leak[1]):
if reg not in rbw.TRACE_DISCARD:
rbw.sca_address_trace.append(ins)
rbw.sca_values_trace.append(rbw.emu.reg_read(rbw.reg_map[reg]))

rbw.reg_leak = None
def regs_hd_sum_trace(uci: uc.Uc, address: int, size: int, rbw):
"""Trace written registers Hamming distance

For each instruction, this tracer sums the Hamming distance of all written
registers with their last value.

You may filter out uninteresting register accesses using TRACE_DISCARD
attribute.

This tracer is hooked by default if sca_mode=True and sca_HD=True.
You may hook it with Unicorn as an `uc.UC_HOOK_CODE` hook.
"""
ins = rbw.reg_leak
if ins is not None:
# Find out which registers are modified
_, regs_written = registers_accessed_by_instruction(ins)

v = 0
for r in regs_written:
if r in rbw.TRACE_DISCARD:
continue
v += hw(rbw.RegistersBackup[r] ^ uci.reg_read(r))
rbw.RegistersBackup[r] = uci.reg_read(r)

rbw.sca_address_trace.append(f"{ins.address:8X} {ins.mnemonic:<6} {ins.op_str}")
rbw.sca_values_trace.append(v)

# Information is stored to be used at the next instruction,
# once the unicorn engine actually performed the current instruction.
rbw.reg_leak = rbw.disassemble_single_detailed(address, size)


def wb_regs_trace(uci: uc.Uc, address: int, size: int, rbw):
"""Trace written registers value

For each instruction, output one point per written register value.

ins = rbw.disassemble_single_detailed(address, size)
_regs_read, regs_written = ins.regs_access()
if len(regs_written) > 0:
rbw.reg_leak = (ins, regs_written)
You may filter out uninteresting register accesses using TRACE_DISCARD
attribute.
"""
ins = rbw.reg_leak
if ins is not None:
# Find out which registers are modified
_, regs_written = registers_accessed_by_instruction(ins)

for r in regs_written:
if r in rbw.TRACE_DISCARD:
continue

rbw.sca_address_trace.append(ins)
rbw.sca_values_trace.append(uci.reg_read(r))

# Information is stored to be used at the next instruction,
# once the unicorn engine actually performed the current instruction.
rbw.reg_leak = rbw.disassemble_single_detailed(address, size)