Skip to content

Commit

Permalink
Refactor extension processing
Browse files Browse the repository at this point in the history
  • Loading branch information
dainnilsson committed Nov 17, 2024
1 parent 2deef9b commit c35f76f
Show file tree
Hide file tree
Showing 3 changed files with 356 additions and 127 deletions.
3 changes: 1 addition & 2 deletions examples/hmac_secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from fido2.client import Fido2Client, WindowsClient
from fido2.ctap2.extensions import HmacSecretExtension
from exampleutils import CliInteraction
from functools import partial
import ctypes
import sys
import os
Expand Down Expand Up @@ -74,7 +73,7 @@ def enumerate_devices():
user_interaction=CliInteraction(),
# By default only the PRF extension is allowed, we need to explicitly
# configure the client to allow hmac-secret
extension_types=[partial(HmacSecretExtension, allow_hmac_secret=True)],
extensions=[HmacSecretExtension(allow_hmac_secret=True)],
)
if "hmac-secret" in client.info.extensions:
break
Expand Down
90 changes: 45 additions & 45 deletions fido2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@
from .ctap1 import Ctap1, APDU, ApduError
from .ctap2 import Ctap2, AssertionResponse, Info
from .ctap2.pin import ClientPin, PinProtocol
from .ctap2.extensions import Ctap2Extension, ClientExtensionOutputs
from .ctap2.extensions import (
Ctap2Extension,
ClientExtensionOutputs,
ExtensionProcessor,
)
from .webauthn import (
Aaguid,
AttestationObject,
Expand Down Expand Up @@ -421,7 +425,7 @@ def __init__(
self,
client_data: CollectedClientData,
assertions: Sequence[AssertionResponse],
extensions: Sequence[Ctap2Extension],
extensions: Sequence[ExtensionProcessor],
pin_token: Optional[str],
pin_protocol: Optional[PinProtocol],
):
Expand All @@ -435,10 +439,10 @@ def _get_extension_results(self, assertion):
extension_outputs = {}
try:
for ext in self._extensions:
output = ext.process_get_output(
output = ext.prepare_outputs(
assertion, self._pin_token, self._pin_protocol
)
if output is not None:
if output:
extension_outputs.update(output)
except ValueError as e:
raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e)
Expand All @@ -456,13 +460,21 @@ def __init__(
self,
device: CtapDevice,
user_interaction: UserInteraction,
extensions: Sequence[Type[Ctap2Extension]],
extension_types: Sequence[Type[Ctap2Extension]],
extensions: Sequence[Ctap2Extension],
):
self.ctap2 = Ctap2(device)
self.info = self.ctap2.info
self.extensions = extensions
self._extension_types = extension_types
self._extensions = extensions
self.user_interaction = user_interaction

@property
def extensions(self) -> Sequence[Ctap2Extension]:
if self._extensions:
return self._extensions
return [ext(self.ctap2) for ext in self._extension_types]

def _filter_creds(
self, rp_id, cred_list, pin_protocol, pin_token, event, on_keepalive
):
Expand Down Expand Up @@ -629,7 +641,6 @@ def do_make_credential(
user = options.user
key_params = options.pub_key_cred_params
exclude_list = options.exclude_credentials
extensions = options.extensions
selection = options.authenticator_selection or AuthenticatorSelectionCriteria()
rk = selection.require_resident_key
user_verification = selection.user_verification
Expand All @@ -648,20 +659,19 @@ def do_make_credential(
# Vendor facilitated
enterprise_attestation = 1

# Gather up permissions
# Gather UV permissions
permissions = ClientPin.PERMISSION.MAKE_CREDENTIAL
if exclude_list:
# We need this for filtering the exclude_list
permissions |= ClientPin.PERMISSION.GET_ASSERTION

# Get extension permissions
extension_instances = [cls(self.ctap2) for cls in self.extensions]
# Initialize extensions and add extension permissions
used_extensions = []
client_inputs = extensions or {}
for ext in extension_instances:
# TODO: Move options to the constructor instead
ext._create_options = options
permissions |= ext.get_create_permissions(client_inputs)
for e in self.extensions:
ext = e.make_credential(self.ctap2, options)
if ext:
used_extensions.append(ext)
permissions |= ext.permissions

def _do_make():
# Handle auth
Expand All @@ -682,14 +692,10 @@ def _do_make():
# Process extensions
extension_inputs = {}
try:
for ext in extension_instances:
auth_input = ext.process_create_input(client_inputs)
if auth_input is not None:
used_extensions.append(ext)
extension_inputs[ext.NAME] = auth_input
elif ext._used:
# TODO: Make this cleaner
used_extensions.append(ext)
for ext in used_extensions:
auth_input = ext.prepare_inputs(None)
if auth_input:
extension_inputs.update(auth_input)
except ValueError as e:
raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e)

Expand Down Expand Up @@ -759,7 +765,7 @@ def _do_make():
extension_outputs = {}
try:
for ext in used_extensions:
output = ext.process_create_output(att_obj, pin_token, pin_protocol)
output = ext.prepare_outputs(att_obj, pin_token, pin_protocol)
if output is not None:
extension_outputs.update(output)
except ValueError as e:
Expand All @@ -779,21 +785,20 @@ def do_get_assertion(
):
rp_id = options.rp_id
allow_list = options.allow_credentials
extensions = options.extensions
user_verification = options.user_verification

on_keepalive = _user_keepalive(self.user_interaction)

# Gather up permissions
# Gather UV permissions
permissions = ClientPin.PERMISSION.GET_ASSERTION

# Get extension permissions
extension_instances = [cls(self.ctap2) for cls in self.extensions]
client_inputs = extensions or {}
for ext in extension_instances:
# TODO: Move options to get_get_permissions and process_get_input
ext._get_options = options
permissions |= ext.get_get_permissions(client_inputs)
# Initialize extensions and add extension permissions
used_extensions = []
for e in self.extensions:
ext = e.get_assertion(self.ctap2, options)
if ext:
used_extensions.append(ext)
permissions |= ext.permissions

def _do_auth():
# Handle auth
Expand All @@ -813,18 +818,11 @@ def _do_auth():

# Process extensions
extension_inputs = {}
used_extensions = []
try:
for ext in extension_instances:
# TODO: Move to process_get_input()
ext._selected = selected_cred
auth_input = ext.process_get_input(client_inputs)
if auth_input is not None:
used_extensions.append(ext)
extension_inputs[ext.NAME] = auth_input
elif ext._used:
# TODO: Make this cleaner
used_extensions.append(ext)
for ext in used_extensions:
inputs = ext.prepare_inputs(selected_cred)
if inputs:
extension_inputs.update(inputs)
except ValueError as e:
raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e)

Expand Down Expand Up @@ -905,8 +903,10 @@ def __init__(
device: CtapDevice,
origin: str,
verify: Callable[[str, str], bool] = verify_rp_id,
# TODO 2.0: Replace extension_types with extensions
extension_types: Sequence[Type[Ctap2Extension]] = _default_extensions(),
user_interaction: UserInteraction = UserInteraction(),
extensions: Sequence[Ctap2Extension] = [],
):
super().__init__(origin, verify)

Expand All @@ -915,7 +915,7 @@ def __init__(

try:
self._backend: _ClientBackend = _Ctap2ClientBackend(
device, user_interaction, extension_types
device, user_interaction, extension_types, extensions
)
except (ValueError, CtapError):
self._backend = _Ctap1ClientBackend(device, user_interaction)
Expand Down
Loading

0 comments on commit c35f76f

Please sign in to comment.