diff --git a/.gitignore b/.gitignore index 3e782f7..89f08dc 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ afl_outputs/ __pycache__/ .idea/ .vscode/ +.DS_Store \ No newline at end of file diff --git a/core/FuzzingManager.py b/core/FuzzingManager.py index f18d15f..cc7cb9c 100644 --- a/core/FuzzingManager.py +++ b/core/FuzzingManager.py @@ -11,18 +11,16 @@ import os import core.fault -def start_afl(_ql: Qiling, user_data): +def start_afl(_ql: Qiling, fuzzing_manager): """ Callback from inside """ - (varname, infile) = user_data - - def place_input_callback_nvram(uc, _input, _, data): + def place_input_callback(uc, _input, _, data): """ Injects the mutated variable to the emulated NVRAM environment. """ - _ql.env[varname] = _input + fuzzing_manager.place_input_callback(_input) def validate_crash(uc, err, _input, persistent_round, user_data): """ @@ -37,14 +35,10 @@ def validate_crash(uc, err, _input, persistent_round, user_data): crash = (_ql.internal_exception is not None) or (err.errno != UC_ERR_OK) return crash - # Choose the function to inject the mutated input to the emulation environment, - # based on the fuzzing mode. - place_input_callback = place_input_callback_nvram - # We start our AFL forkserver or run once if AFL is not available. # This will only return after the fuzzing stopped. try: - if not _ql.uc.afl_fuzz(input_file=infile, + if not _ql.uc.afl_fuzz(input_file=fuzzing_manager.infile, place_input_callback=place_input_callback, exits=[_ql.os.exit_point], always_validate=True, @@ -81,8 +75,19 @@ def fuzz(self, end=None, timeout=0, **kwargs): pe = pefile.PE(target, fast_load=True) image_base = self.ql.loader.images[-1].base entry_point = image_base + pe.OPTIONAL_HEADER.AddressOfEntryPoint - + self.infile = kwargs['infile'] # We want AFL's forkserver to spawn new copies starting from the main module's entrypoint. - self.ql.hook_address(callback=start_afl, address=entry_point, user_data=(kwargs['varname'], kwargs['infile'])) + self.ql.hook_address(callback=start_afl, address=entry_point, user_data=self) + super().run(end, timeout) + +class NvRamFuzzingManager(FuzzingManager): + def fuzz(self, end=None, timeout=0, **kwargs): + super().fuzz(self, end, timeout, **kwargs) + self.var_name = kwargs['varname'] + + def place_input_callback(self, input): + self.ql.env[self.varname] = input - super().run(end, timeout) \ No newline at end of file +class SmmcFuzzingManager(FuzzingManager): + def place_input_callback(self, input): + self.ql.os.smm.comm_buffer_fuzz_data = input \ No newline at end of file diff --git a/core/callbacks.py b/core/callbacks.py index aea473c..13e8cc9 100644 --- a/core/callbacks.py +++ b/core/callbacks.py @@ -1,4 +1,6 @@ import capstone +from unicorn import * +from unicorn.x86_const import * def after_module_execution_callback(ql, number_of_modules_left): ret = False @@ -7,9 +9,20 @@ def after_module_execution_callback(ql, number_of_modules_left): ret = True return ret +# We need to hook the `in` instruction to support ACPI Power Management Timer. +PM_TMR_PORT = 0x1808 +def hook_in(uc, port, size, ql): + if port == PM_TMR_PORT: + ql.count += 10000 + return ql.count + return 0 + def init_callbacks(ql): ql.os.after_module_execution_callbacks = [] ql.os.notify_after_module_execution = after_module_execution_callback + + ql.count = 0 + ql.uc.hook_add(UC_HOOK_INSN, hook_in, ql, 1, 0, UC_X86_INS_IN) def end_of_execution_callback(ql): after_module_execution_callback(ql, 0) diff --git a/efi_fuzz.py b/efi_fuzz.py index aac7f34..4d10a67 100644 --- a/efi_fuzz.py +++ b/efi_fuzz.py @@ -36,7 +36,7 @@ import sanitizers import taint.tracker from core.EmulationManager import EmulationManager -from core.FuzzingManager import FuzzingManager +from core.FuzzingManager import * # for argparse auto_int = functools.partial(int, base=0) @@ -79,8 +79,13 @@ def run(args): emu.run(args.end, args.timeout) def fuzz(args): - emu = create_emulator(FuzzingManager, args) - emu.fuzz(args.end, args.timeout, varname=args.varname, infile=args.infile) + if args.mode == 'nvram': + emu = create_emulator(NvRamFuzzingManager, args) + elif args.mode == 'smmc': + emu = create_emulator(SmmcFuzzingManager, args) + else: + return + emu.fuzz(**vars(args)) def main(args): if args.command == 'run': @@ -115,5 +120,9 @@ def main(args): nvram_subparser = subparsers.add_parser("nvram", help="Fuzz contents of NVRAM variables") nvram_subparser.add_argument("varname", help="Name of the NVRAM variable to mutate") nvram_subparser.add_argument("infile", help="Mutated input buffer. Set to @@ when running under afl-fuzz") + + # SMMC sub-command + smmc_subparser = subparsers.add_parser("smmc", help="Fuzz CommunicationBuffer passed to SMIs") + smmc_subparser.add_argument("infile", help="Mutated input buffer. Set to @@ when running under afl-fuzz") main(parser.parse_args()) diff --git a/smm/__init__.py b/smm/__init__.py index c5d7587..fd11c42 100644 --- a/smm/__init__.py +++ b/smm/__init__.py @@ -34,6 +34,7 @@ def overlaps(self, address): class SmmState(object): PAGE_SIZE = 0x1000 + UINTN_SIZE = 8 def __init__(self, ql): self.swsmi_handlers = {} @@ -51,6 +52,11 @@ def __init__(self, ql): # A pointer to a collection of data in memory that will # be conveyed from a non-MM environment into an MM environment. self.comm_buffer = self.heap_alloc(self.PAGE_SIZE) + self.comm_buffer_size_ptr = self.comm_buffer - self.UINTN_SIZE + self.comm_buffer_size = self.PAGE_SIZE - self.UINTN_SIZE + + + self.comm_buffer_fuzz_data = 0 def heap_alloc(self, size): # Prefer allocating from TSEG. @@ -98,7 +104,7 @@ class SMM_READY_TO_LOCK_PROTOCOL(STRUCT): "struct" : SMM_READY_TO_LOCK_PROTOCOL, "fields" : (('Header', None),) } - return ql.loader.smm_context.install_protocol(descriptor, 1) + return ql.loader.smm_context.install_protocol(descriptor, 1) or trigger_swsmi(ql) return trigger_swsmi(ql) return False diff --git a/smm/protocols/__init__.py b/smm/protocols/__init__.py index 10c33c2..4e38e66 100644 --- a/smm/protocols/__init__.py +++ b/smm/protocols/__init__.py @@ -8,7 +8,7 @@ from .guids import * from qiling.os.uefi.const import * from qiling.os.uefi.utils import ptr_write64, ptr_read64 -from ..swsmi import trigger_swsmi +from ..swsmi import trigger_swsmi, register_sw_smi, unregister_sw_smi import ctypes import random @@ -22,18 +22,19 @@ def install(ql, in_smm=False): install_EFI_SMM_ACCESS_PROTOCOL(ql) def hook_mm_interrupt_register(ql, address, params): - smi_num = 0 - params['RegisterContext'] = 0 - params['DispatchFunction'] = params["Handler"] - DispatchHandle = random.getrandbits(64) - ql.os.smm.swsmi_handlers.append((DispatchHandle, smi_num, params)) - ptr_write64(ql, params["DispatchHandle"], DispatchHandle) - return EFI_SUCCESS + dh = register_sw_smi(ql, params["Handler"], 0, 0) + if dh: + ptr_write64(ql, params["DispatchHandle"], dh) + return EFI_SUCCESS + else: + return EFI_OUT_OF_RESOURCES def hook_efi_mm_interrupt_unregister(ql, address, params): - dh = ptr_read64(ql, params["DispatchHandle"]) - ql.os.smm.swsmi_handlers[:] = [tup for tup in ql.os.smm.swsmi_handlers if tup[0] != dh] - return EFI_SUCCESS + return unregister_sw_smi(ql, params["DispatchHandle"]) ql.set_api("mm_interrupt_register", hook_mm_interrupt_register) ql.set_api("efi_mm_interrupt_unregister", hook_efi_mm_interrupt_unregister) + + # For now we don't have anyting diffrent to do for SMMCs, so let's use the same hook functions. + ql.set_api("SmiHandlerRegister", hook_mm_interrupt_register) + ql.set_api("SmiHandlerUnRegister", hook_efi_mm_interrupt_unregister) \ No newline at end of file diff --git a/smm/protocols/smm_sw_dispatch2_protocol.py b/smm/protocols/smm_sw_dispatch2_protocol.py index e58e7e4..14ec891 100644 --- a/smm/protocols/smm_sw_dispatch2_protocol.py +++ b/smm/protocols/smm_sw_dispatch2_protocol.py @@ -68,33 +68,19 @@ def hook_SMM_SW_DISPATCH2_Register(ql, address, params): # Currently we don't support it and always pass a zero value for it. smm_sw_context.DataPort = 0 - # Allocate a unique handle for this SMI. - dh = ql.os.heap.alloc(1) - ptr_write64(ql, params["DispatchHandle"], dh) - - smi_params = { - "DispatchFunction": params["DispatchFunction"], - "RegisterContext": smm_sw_register_context, - "CommunicationBuffer": smm_sw_context, - } - - # Let's save the dispatch params, so they can be triggered if needed. - ql.os.smm.swsmi_handlers[dh] = smi_params - return EFI_SUCCESS + dh = register_sw_smi(ql, params["DispatchFunction"], smm_sw_context) + if dh: + ptr_write64(ql, params["DispatchHandle"], dh) + return EFI_SUCCESS + else: + return EFI_OUT_OF_RESOURCES @dxeapi(params={ "This": POINTER, #POINTER_T(struct__EFI_SMM_SW_DISPATCH2_PROTOCOL) "DispatchHandle": POINTER, #POINTER_T(None) }) def hook_SMM_SW_DISPATCH2_UnRegister(ql, address, params): - dh = params['DispatchHandle'] - try: - del ql.os.smm.swsmi_handlers[dh] - ql.os.heap.free(dh) - except: - return EFI_INVALID_PARAMETER - else: - return EFI_SUCCESS + return unregister_sw_smi(ql, params["DispatchHandle"]) def install_EFI_SMM_SW_DISPATCH2_PROTOCOL(ql): descriptor = { diff --git a/smm/protocols/smm_sw_dispatch_protocol.py b/smm/protocols/smm_sw_dispatch_protocol.py index 2c26c54..340eafe 100644 --- a/smm/protocols/smm_sw_dispatch_protocol.py +++ b/smm/protocols/smm_sw_dispatch_protocol.py @@ -6,7 +6,7 @@ from qiling.os.uefi.ProcessorBind import * from qiling.os.uefi.UefiBaseType import * from qiling.os.uefi.utils import ptr_write64, ptr_read64 -import random +from ..swsmi import register_sw_smi, unregister_sw_smi class EFI_SMM_SW_DISPATCH_CONTEXT(STRUCT): EFI_SMM_SW_DISPATCH_CONTEXT = STRUCT @@ -47,34 +47,20 @@ def hook_SMM_SW_DISPATCH_Register(ql, address, params): # SMI# is too big. return EFI_INVALID_PARAMETER - # Allocate a unique handle for this SMI. - dh = ql.os.heap.alloc(1) - ptr_write64(ql, params["DispatchHandle"], dh) - - smi_params = { - "DispatchFunction": params["DispatchFunction"], - "RegisterContext": smm_sw_dispatch_context, - "CommunicationBuffer": None, - } - - # Let's save the dispatch params, so they can be triggered if needed. - ql.os.smm.swsmi_handlers[dh] = smi_params - return EFI_SUCCESS + dh = register_sw_smi(ql, params["DispatchFunction"], smm_sw_dispatch_context) + if dh: + ptr_write64(ql, params["DispatchHandle"], dh) + return EFI_SUCCESS + else: + return EFI_OUT_OF_RESOURCES @dxeapi(params={ "This": POINTER, #POINTER_T(struct__EFI_SMM_SW_DISPATCH2_PROTOCOL) "DispatchHandle": POINTER, #POINTER_T(None) }) def hook_SMM_SW_DISPATCH_UnRegister(ql, address, params): - dh = params['DispatchHandle'] - try: - del ql.os.smm.swsmi_handlers[dh] - ql.os.heap.free(dh) - except: - return EFI_INVALID_PARAMETER - else: - return EFI_SUCCESS - + return unregister_sw_smi(ql, params['DispatchHandle']) + def install_EFI_SMM_SW_DISPATCH_PROTOCOL(ql): descriptor = { 'guid' : EFI_SMM_SW_DISPATCH_PROTOCOL_GUID, diff --git a/smm/swsmi.py b/smm/swsmi.py index 5abe996..78a9dc7 100644 --- a/smm/swsmi.py +++ b/smm/swsmi.py @@ -6,6 +6,28 @@ import ctypes from qiling.os.uefi.utils import ptr_write64 +def register_sw_smi(ql, DispatchFunction, RegisterContext, CommunicationBuffer): + # Allocate a unique handle for this SMI. + dh = ql.os.heap.alloc(1) + + smi_params = { + "DispatchFunction": DispatchFunction, + "RegisterContext": RegisterContext, + "CommunicationBuffer": CommunicationBuffer, + } + + # Let's save the dispatch params, so they can be triggered if needed. + ql.os.smm.swsmi_handlers[dh] = smi_params + return dh + +def unregister_sw_smi(ql, DispatchHandle): + try: + del ql.os.smm.swsmi_handlers[DispatchHandle] + ql.os.heap.free(DispatchHandle) + except: + return EFI_INVALID_PARAMETER + else: + return EFI_SUCCESS def trigger_next_smi_handler(ql): (dispatch_handle, smi_params) = ql.os.smm.swsmi_handlers.popitem() @@ -16,19 +38,31 @@ def trigger_next_smi_handler(ql): # IN CONST VOID *Context OPTIONAL register_context = smi_params['RegisterContext'] - register_context.saveTo(ql, ql.os.smm.context_buffer) + if register_context: + register_context.saveTo(ql, ql.os.smm.context_buffer) ql.reg.rdx = ql.os.smm.context_buffer - comm_buffer = smi_params['CommunicationBuffer'] + comm_buffer = smi_params['CommunicationBuffer'] or ql.os.smm.comm_buffer_fuzz_data if comm_buffer: # IN OUT VOID *CommBuffer OPTIONAL - comm_buffer.saveTo(ql, ql.os.smm.comm_buffer) + if type(comm_buffer) == bytes: + if len(comm_buffer) > ql.os.smm.comm_buffer_size: + comm_buffer = comm_buffer[:ql.os.smm.comm_buffer_size] + ql.mem.write(ql.os.smm.comm_buffer, comm_buffer) + comm_buffer_size = len(comm_buffer) + if hasattr(ql, 'tainters') and 'smm' in ql.tainters: + ql.tainters['smm'].set_taint_range(ql.os.smm.comm_buffer, ql.os.smm.comm_buffer + comm_buffer_size, True) + else: + if comm_buffer.sizeof() > ql.os.smm.comm_buffer_size: + ql.log.error("Structure too big, can't write command buffer") + return False + comm_buffer.saveTo(ql, ql.os.smm.comm_buffer) + comm_buffer_size = comm_buffer.sizeof() ql.reg.r8 = ql.os.smm.comm_buffer # IN OUT UINTN *CommBufferSize OPTIONAL - size_ptr = ql.os.smm.comm_buffer + comm_buffer.sizeof() - ptr_write64(ql, size_ptr, comm_buffer.sizeof()) - ql.reg.r9 = size_ptr + ptr_write64(ql, ql.os.smm.comm_buffer_size_ptr, comm_buffer_size) + ql.reg.r9 = ql.os.smm.comm_buffer_size_ptr ql.reg.rip = smi_params["DispatchFunction"] ql.stack_push(ql.loader.end_of_execution_ptr)