Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for smmc fuzzing. #20

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ afl_outputs/
__pycache__/
.idea/
.vscode/
.DS_Store
31 changes: 18 additions & 13 deletions core/FuzzingManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,16 @@
import os
import core.fault

def start_afl(_ql: Qiling, user_data):
def start_afl(_ql: Qiling, fuzzing_manager):
"""
Callback from inside
"""

(varname, infile) = user_data

def place_input_callback_nvram(uc, _input, _, data):
def place_input_callback(uc, _input, _, data):
"""
Injects the mutated variable to the emulated NVRAM environment.
"""
_ql.env[varname] = _input
fuzzing_manager.place_input_callback(_input)

def validate_crash(uc, err, _input, persistent_round, user_data):
"""
Expand All @@ -37,14 +35,10 @@ def validate_crash(uc, err, _input, persistent_round, user_data):
crash = (_ql.internal_exception is not None) or (err.errno != UC_ERR_OK)
return crash

# Choose the function to inject the mutated input to the emulation environment,
# based on the fuzzing mode.
place_input_callback = place_input_callback_nvram

# We start our AFL forkserver or run once if AFL is not available.
# This will only return after the fuzzing stopped.
try:
if not _ql.uc.afl_fuzz(input_file=infile,
if not _ql.uc.afl_fuzz(input_file=fuzzing_manager.infile,
place_input_callback=place_input_callback,
exits=[_ql.os.exit_point],
always_validate=True,
Expand Down Expand Up @@ -81,8 +75,19 @@ def fuzz(self, end=None, timeout=0, **kwargs):
pe = pefile.PE(target, fast_load=True)
image_base = self.ql.loader.images[-1].base
entry_point = image_base + pe.OPTIONAL_HEADER.AddressOfEntryPoint

self.infile = kwargs['infile']
# We want AFL's forkserver to spawn new copies starting from the main module's entrypoint.
self.ql.hook_address(callback=start_afl, address=entry_point, user_data=(kwargs['varname'], kwargs['infile']))
self.ql.hook_address(callback=start_afl, address=entry_point, user_data=self)
super().run(end, timeout)

class NvRamFuzzingManager(FuzzingManager):
def fuzz(self, end=None, timeout=0, **kwargs):
super().fuzz(self, end, timeout, **kwargs)
self.var_name = kwargs['varname']

def place_input_callback(self, input):
self.ql.env[self.varname] = input

super().run(end, timeout)
class SmmcFuzzingManager(FuzzingManager):
def place_input_callback(self, input):
self.ql.os.smm.comm_buffer_fuzz_data = input
13 changes: 13 additions & 0 deletions core/callbacks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import capstone
from unicorn import *
from unicorn.x86_const import *

def after_module_execution_callback(ql, number_of_modules_left):
ret = False
Expand All @@ -7,9 +9,20 @@ def after_module_execution_callback(ql, number_of_modules_left):
ret = True
return ret

# We need to hook the `in` instruction to support ACPI Power Management Timer.
PM_TMR_PORT = 0x1808
def hook_in(uc, port, size, ql):
if port == PM_TMR_PORT:
ql.count += 10000
return ql.count
return 0

def init_callbacks(ql):
ql.os.after_module_execution_callbacks = []
ql.os.notify_after_module_execution = after_module_execution_callback

ql.count = 0
ql.uc.hook_add(UC_HOOK_INSN, hook_in, ql, 1, 0, UC_X86_INS_IN)

def end_of_execution_callback(ql):
after_module_execution_callback(ql, 0)
Expand Down
15 changes: 12 additions & 3 deletions efi_fuzz.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import sanitizers
import taint.tracker
from core.EmulationManager import EmulationManager
from core.FuzzingManager import FuzzingManager
from core.FuzzingManager import *

# for argparse
auto_int = functools.partial(int, base=0)
Expand Down Expand Up @@ -79,8 +79,13 @@ def run(args):
emu.run(args.end, args.timeout)

def fuzz(args):
emu = create_emulator(FuzzingManager, args)
emu.fuzz(args.end, args.timeout, varname=args.varname, infile=args.infile)
if args.mode == 'nvram':
emu = create_emulator(NvRamFuzzingManager, args)
elif args.mode == 'smmc':
emu = create_emulator(SmmcFuzzingManager, args)
else:
return
emu.fuzz(**vars(args))

def main(args):
if args.command == 'run':
Expand Down Expand Up @@ -115,5 +120,9 @@ def main(args):
nvram_subparser = subparsers.add_parser("nvram", help="Fuzz contents of NVRAM variables")
nvram_subparser.add_argument("varname", help="Name of the NVRAM variable to mutate")
nvram_subparser.add_argument("infile", help="Mutated input buffer. Set to @@ when running under afl-fuzz")

# SMMC sub-command
smmc_subparser = subparsers.add_parser("smmc", help="Fuzz CommunicationBuffer passed to SMIs")
smmc_subparser.add_argument("infile", help="Mutated input buffer. Set to @@ when running under afl-fuzz")

main(parser.parse_args())
8 changes: 7 additions & 1 deletion smm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def overlaps(self, address):
class SmmState(object):

PAGE_SIZE = 0x1000
UINTN_SIZE = 8

def __init__(self, ql):
self.swsmi_handlers = {}
Expand All @@ -51,6 +52,11 @@ def __init__(self, ql):
# A pointer to a collection of data in memory that will
# be conveyed from a non-MM environment into an MM environment.
self.comm_buffer = self.heap_alloc(self.PAGE_SIZE)
self.comm_buffer_size_ptr = self.comm_buffer - self.UINTN_SIZE
self.comm_buffer_size = self.PAGE_SIZE - self.UINTN_SIZE


self.comm_buffer_fuzz_data = 0

def heap_alloc(self, size):
# Prefer allocating from TSEG.
Expand Down Expand Up @@ -98,7 +104,7 @@ class SMM_READY_TO_LOCK_PROTOCOL(STRUCT):
"struct" : SMM_READY_TO_LOCK_PROTOCOL,
"fields" : (('Header', None),)
}
return ql.loader.smm_context.install_protocol(descriptor, 1)
return ql.loader.smm_context.install_protocol(descriptor, 1) or trigger_swsmi(ql)
return trigger_swsmi(ql)
return False

Expand Down
23 changes: 12 additions & 11 deletions smm/protocols/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .guids import *
from qiling.os.uefi.const import *
from qiling.os.uefi.utils import ptr_write64, ptr_read64
from ..swsmi import trigger_swsmi
from ..swsmi import trigger_swsmi, register_sw_smi, unregister_sw_smi
import ctypes
import random

Expand All @@ -22,18 +22,19 @@ def install(ql, in_smm=False):
install_EFI_SMM_ACCESS_PROTOCOL(ql)

def hook_mm_interrupt_register(ql, address, params):
smi_num = 0
params['RegisterContext'] = 0
params['DispatchFunction'] = params["Handler"]
DispatchHandle = random.getrandbits(64)
ql.os.smm.swsmi_handlers.append((DispatchHandle, smi_num, params))
ptr_write64(ql, params["DispatchHandle"], DispatchHandle)
return EFI_SUCCESS
dh = register_sw_smi(ql, params["Handler"], 0, 0)
if dh:
ptr_write64(ql, params["DispatchHandle"], dh)
return EFI_SUCCESS
else:
return EFI_OUT_OF_RESOURCES

def hook_efi_mm_interrupt_unregister(ql, address, params):
dh = ptr_read64(ql, params["DispatchHandle"])
ql.os.smm.swsmi_handlers[:] = [tup for tup in ql.os.smm.swsmi_handlers if tup[0] != dh]
return EFI_SUCCESS
return unregister_sw_smi(ql, params["DispatchHandle"])

ql.set_api("mm_interrupt_register", hook_mm_interrupt_register)
ql.set_api("efi_mm_interrupt_unregister", hook_efi_mm_interrupt_unregister)

# For now we don't have anyting diffrent to do for SMMCs, so let's use the same hook functions.
ql.set_api("SmiHandlerRegister", hook_mm_interrupt_register)
ql.set_api("SmiHandlerUnRegister", hook_efi_mm_interrupt_unregister)
28 changes: 7 additions & 21 deletions smm/protocols/smm_sw_dispatch2_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,33 +68,19 @@ def hook_SMM_SW_DISPATCH2_Register(ql, address, params):
# Currently we don't support it and always pass a zero value for it.
smm_sw_context.DataPort = 0

# Allocate a unique handle for this SMI.
dh = ql.os.heap.alloc(1)
ptr_write64(ql, params["DispatchHandle"], dh)

smi_params = {
"DispatchFunction": params["DispatchFunction"],
"RegisterContext": smm_sw_register_context,
"CommunicationBuffer": smm_sw_context,
}

# Let's save the dispatch params, so they can be triggered if needed.
ql.os.smm.swsmi_handlers[dh] = smi_params
return EFI_SUCCESS
dh = register_sw_smi(ql, params["DispatchFunction"], smm_sw_context)
if dh:
ptr_write64(ql, params["DispatchHandle"], dh)
return EFI_SUCCESS
else:
return EFI_OUT_OF_RESOURCES

@dxeapi(params={
"This": POINTER, #POINTER_T(struct__EFI_SMM_SW_DISPATCH2_PROTOCOL)
"DispatchHandle": POINTER, #POINTER_T(None)
})
def hook_SMM_SW_DISPATCH2_UnRegister(ql, address, params):
dh = params['DispatchHandle']
try:
del ql.os.smm.swsmi_handlers[dh]
ql.os.heap.free(dh)
except:
return EFI_INVALID_PARAMETER
else:
return EFI_SUCCESS
return unregister_sw_smi(ql, params["DispatchHandle"])

def install_EFI_SMM_SW_DISPATCH2_PROTOCOL(ql):
descriptor = {
Expand Down
32 changes: 9 additions & 23 deletions smm/protocols/smm_sw_dispatch_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from qiling.os.uefi.ProcessorBind import *
from qiling.os.uefi.UefiBaseType import *
from qiling.os.uefi.utils import ptr_write64, ptr_read64
import random
from ..swsmi import register_sw_smi, unregister_sw_smi

class EFI_SMM_SW_DISPATCH_CONTEXT(STRUCT):
EFI_SMM_SW_DISPATCH_CONTEXT = STRUCT
Expand Down Expand Up @@ -47,34 +47,20 @@ def hook_SMM_SW_DISPATCH_Register(ql, address, params):
# SMI# is too big.
return EFI_INVALID_PARAMETER

# Allocate a unique handle for this SMI.
dh = ql.os.heap.alloc(1)
ptr_write64(ql, params["DispatchHandle"], dh)

smi_params = {
"DispatchFunction": params["DispatchFunction"],
"RegisterContext": smm_sw_dispatch_context,
"CommunicationBuffer": None,
}

# Let's save the dispatch params, so they can be triggered if needed.
ql.os.smm.swsmi_handlers[dh] = smi_params
return EFI_SUCCESS
dh = register_sw_smi(ql, params["DispatchFunction"], smm_sw_dispatch_context)
if dh:
ptr_write64(ql, params["DispatchHandle"], dh)
return EFI_SUCCESS
else:
return EFI_OUT_OF_RESOURCES

@dxeapi(params={
"This": POINTER, #POINTER_T(struct__EFI_SMM_SW_DISPATCH2_PROTOCOL)
"DispatchHandle": POINTER, #POINTER_T(None)
})
def hook_SMM_SW_DISPATCH_UnRegister(ql, address, params):
dh = params['DispatchHandle']
try:
del ql.os.smm.swsmi_handlers[dh]
ql.os.heap.free(dh)
except:
return EFI_INVALID_PARAMETER
else:
return EFI_SUCCESS

return unregister_sw_smi(ql, params['DispatchHandle'])

def install_EFI_SMM_SW_DISPATCH_PROTOCOL(ql):
descriptor = {
'guid' : EFI_SMM_SW_DISPATCH_PROTOCOL_GUID,
Expand Down
46 changes: 40 additions & 6 deletions smm/swsmi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,28 @@
import ctypes
from qiling.os.uefi.utils import ptr_write64

def register_sw_smi(ql, DispatchFunction, RegisterContext, CommunicationBuffer):
# Allocate a unique handle for this SMI.
dh = ql.os.heap.alloc(1)

smi_params = {
"DispatchFunction": DispatchFunction,
"RegisterContext": RegisterContext,
"CommunicationBuffer": CommunicationBuffer,
}

# Let's save the dispatch params, so they can be triggered if needed.
ql.os.smm.swsmi_handlers[dh] = smi_params
return dh

def unregister_sw_smi(ql, DispatchHandle):
try:
del ql.os.smm.swsmi_handlers[DispatchHandle]
ql.os.heap.free(DispatchHandle)
except:
return EFI_INVALID_PARAMETER
else:
return EFI_SUCCESS

def trigger_next_smi_handler(ql):
(dispatch_handle, smi_params) = ql.os.smm.swsmi_handlers.popitem()
Expand All @@ -16,19 +38,31 @@ def trigger_next_smi_handler(ql):

# IN CONST VOID *Context OPTIONAL
register_context = smi_params['RegisterContext']
register_context.saveTo(ql, ql.os.smm.context_buffer)
if register_context:
register_context.saveTo(ql, ql.os.smm.context_buffer)
ql.reg.rdx = ql.os.smm.context_buffer

comm_buffer = smi_params['CommunicationBuffer']
comm_buffer = smi_params['CommunicationBuffer'] or ql.os.smm.comm_buffer_fuzz_data
if comm_buffer:
# IN OUT VOID *CommBuffer OPTIONAL
comm_buffer.saveTo(ql, ql.os.smm.comm_buffer)
if type(comm_buffer) == bytes:
if len(comm_buffer) > ql.os.smm.comm_buffer_size:
comm_buffer = comm_buffer[:ql.os.smm.comm_buffer_size]
ql.mem.write(ql.os.smm.comm_buffer, comm_buffer)
comm_buffer_size = len(comm_buffer)
if hasattr(ql, 'tainters') and 'smm' in ql.tainters:
ql.tainters['smm'].set_taint_range(ql.os.smm.comm_buffer, ql.os.smm.comm_buffer + comm_buffer_size, True)
else:
if comm_buffer.sizeof() > ql.os.smm.comm_buffer_size:
ql.log.error("Structure too big, can't write command buffer")
return False
comm_buffer.saveTo(ql, ql.os.smm.comm_buffer)
comm_buffer_size = comm_buffer.sizeof()
ql.reg.r8 = ql.os.smm.comm_buffer

# IN OUT UINTN *CommBufferSize OPTIONAL
size_ptr = ql.os.smm.comm_buffer + comm_buffer.sizeof()
ptr_write64(ql, size_ptr, comm_buffer.sizeof())
ql.reg.r9 = size_ptr
ptr_write64(ql, ql.os.smm.comm_buffer_size_ptr, comm_buffer_size)
ql.reg.r9 = ql.os.smm.comm_buffer_size_ptr

ql.reg.rip = smi_params["DispatchFunction"]
ql.stack_push(ql.loader.end_of_execution_ptr)
Expand Down