diff --git a/examples/hmac_secret.py b/examples/hmac_secret.py index 5937bf2..28bcac8 100644 --- a/examples/hmac_secret.py +++ b/examples/hmac_secret.py @@ -39,7 +39,6 @@ from fido2.client import Fido2Client, WindowsClient from fido2.ctap2.extensions import HmacSecretExtension from exampleutils import CliInteraction -from functools import partial import ctypes import sys import os @@ -74,7 +73,7 @@ def enumerate_devices(): user_interaction=CliInteraction(), # By default only the PRF extension is allowed, we need to explicitly # configure the client to allow hmac-secret - extension_types=[partial(HmacSecretExtension, allow_hmac_secret=True)], + extensions=[HmacSecretExtension(allow_hmac_secret=True)], ) if "hmac-secret" in client.info.extensions: break diff --git a/fido2/client.py b/fido2/client.py index 36ad6cd..d6ef301 100644 --- a/fido2/client.py +++ b/fido2/client.py @@ -32,7 +32,11 @@ from .ctap1 import Ctap1, APDU, ApduError from .ctap2 import Ctap2, AssertionResponse, Info from .ctap2.pin import ClientPin, PinProtocol -from .ctap2.extensions import Ctap2Extension, ClientExtensionOutputs +from .ctap2.extensions import ( + Ctap2Extension, + ClientExtensionOutputs, + ExtensionProcessor, +) from .webauthn import ( Aaguid, AttestationObject, @@ -421,7 +425,7 @@ def __init__( self, client_data: CollectedClientData, assertions: Sequence[AssertionResponse], - extensions: Sequence[Ctap2Extension], + extensions: Sequence[ExtensionProcessor], pin_token: Optional[str], pin_protocol: Optional[PinProtocol], ): @@ -435,10 +439,10 @@ def _get_extension_results(self, assertion): extension_outputs = {} try: for ext in self._extensions: - output = ext.process_get_output( + output = ext.prepare_outputs( assertion, self._pin_token, self._pin_protocol ) - if output is not None: + if output: extension_outputs.update(output) except ValueError as e: raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e) @@ -456,13 +460,21 @@ def __init__( self, device: CtapDevice, user_interaction: UserInteraction, - extensions: Sequence[Type[Ctap2Extension]], + extension_types: Sequence[Type[Ctap2Extension]], + extensions: Sequence[Ctap2Extension], ): self.ctap2 = Ctap2(device) self.info = self.ctap2.info - self.extensions = extensions + self._extension_types = extension_types + self._extensions = extensions self.user_interaction = user_interaction + @property + def extensions(self) -> Sequence[Ctap2Extension]: + if self._extensions: + return self._extensions + return [ext(self.ctap2) for ext in self._extension_types] + def _filter_creds( self, rp_id, cred_list, pin_protocol, pin_token, event, on_keepalive ): @@ -629,7 +641,6 @@ def do_make_credential( user = options.user key_params = options.pub_key_cred_params exclude_list = options.exclude_credentials - extensions = options.extensions selection = options.authenticator_selection or AuthenticatorSelectionCriteria() rk = selection.require_resident_key user_verification = selection.user_verification @@ -648,20 +659,19 @@ def do_make_credential( # Vendor facilitated enterprise_attestation = 1 - # Gather up permissions + # Gather UV permissions permissions = ClientPin.PERMISSION.MAKE_CREDENTIAL if exclude_list: # We need this for filtering the exclude_list permissions |= ClientPin.PERMISSION.GET_ASSERTION - # Get extension permissions - extension_instances = [cls(self.ctap2) for cls in self.extensions] + # Initialize extensions and add extension permissions used_extensions = [] - client_inputs = extensions or {} - for ext in extension_instances: - # TODO: Move options to the constructor instead - ext._create_options = options - permissions |= ext.get_create_permissions(client_inputs) + for e in self.extensions: + ext = e.make_credential(self.ctap2, options) + if ext: + used_extensions.append(ext) + permissions |= ext.permissions def _do_make(): # Handle auth @@ -682,14 +692,10 @@ def _do_make(): # Process extensions extension_inputs = {} try: - for ext in extension_instances: - auth_input = ext.process_create_input(client_inputs) - if auth_input is not None: - used_extensions.append(ext) - extension_inputs[ext.NAME] = auth_input - elif ext._used: - # TODO: Make this cleaner - used_extensions.append(ext) + for ext in used_extensions: + auth_input = ext.prepare_inputs(None) + if auth_input: + extension_inputs.update(auth_input) except ValueError as e: raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e) @@ -759,7 +765,7 @@ def _do_make(): extension_outputs = {} try: for ext in used_extensions: - output = ext.process_create_output(att_obj, pin_token, pin_protocol) + output = ext.prepare_outputs(att_obj, pin_token, pin_protocol) if output is not None: extension_outputs.update(output) except ValueError as e: @@ -779,21 +785,20 @@ def do_get_assertion( ): rp_id = options.rp_id allow_list = options.allow_credentials - extensions = options.extensions user_verification = options.user_verification on_keepalive = _user_keepalive(self.user_interaction) - # Gather up permissions + # Gather UV permissions permissions = ClientPin.PERMISSION.GET_ASSERTION - # Get extension permissions - extension_instances = [cls(self.ctap2) for cls in self.extensions] - client_inputs = extensions or {} - for ext in extension_instances: - # TODO: Move options to get_get_permissions and process_get_input - ext._get_options = options - permissions |= ext.get_get_permissions(client_inputs) + # Initialize extensions and add extension permissions + used_extensions = [] + for e in self.extensions: + ext = e.get_assertion(self.ctap2, options) + if ext: + used_extensions.append(ext) + permissions |= ext.permissions def _do_auth(): # Handle auth @@ -813,18 +818,11 @@ def _do_auth(): # Process extensions extension_inputs = {} - used_extensions = [] try: - for ext in extension_instances: - # TODO: Move to process_get_input() - ext._selected = selected_cred - auth_input = ext.process_get_input(client_inputs) - if auth_input is not None: - used_extensions.append(ext) - extension_inputs[ext.NAME] = auth_input - elif ext._used: - # TODO: Make this cleaner - used_extensions.append(ext) + for ext in used_extensions: + inputs = ext.prepare_inputs(selected_cred) + if inputs: + extension_inputs.update(inputs) except ValueError as e: raise ClientError.ERR.CONFIGURATION_UNSUPPORTED(e) @@ -905,8 +903,10 @@ def __init__( device: CtapDevice, origin: str, verify: Callable[[str, str], bool] = verify_rp_id, + # TODO 2.0: Replace extension_types with extensions extension_types: Sequence[Type[Ctap2Extension]] = _default_extensions(), user_interaction: UserInteraction = UserInteraction(), + extensions: Sequence[Ctap2Extension] = [], ): super().__init__(origin, verify) @@ -915,7 +915,7 @@ def __init__( try: self._backend: _ClientBackend = _Ctap2ClientBackend( - device, user_interaction, extension_types + device, user_interaction, extension_types, extensions ) except (ValueError, CtapError): self._backend = _Ctap1ClientBackend(device, user_interaction) diff --git a/fido2/ctap2/extensions.py b/fido2/ctap2/extensions.py index b2df7ca..2301e72 100644 --- a/fido2/ctap2/extensions.py +++ b/fido2/ctap2/extensions.py @@ -39,12 +39,20 @@ ) from enum import Enum, unique from dataclasses import dataclass -from typing import Dict, Tuple, Any, Optional, Mapping +from typing import Dict, Tuple, Any, Optional, Mapping, Callable import abc import warnings class ClientExtensionOutputs(Mapping[str, Any]): + """Holds extension output from a call to MakeCredential or GetAssertion. + + When accessed as a dict, all bytes values will be serialized to base64url encoding, + capable of being serialized to JSON. + + When accessed using attributes, richer types will instead be returned. + """ + def __init__(self, outputs: Mapping[str, Any]): self._members = {k: v for k, v in outputs.items() if v is not None} @@ -69,25 +77,135 @@ def __repr__(self): return repr(dict(self)) +@dataclass +class ExtensionProcessor: + """Processing state for a CTAP2 extension, for single use. + + The ExtensionProcessor holds state and logic for client processing of an extension, + for either a MakeCredential or GetAssertion call. + + :param permissions: PinUvAuthToken permissions required for the extension. + :param prepare_inputs: A function which produces authenticator extensions inputs. + :param prepare_outputs: A function which produces the client extension outputs. + """ + + permissions: ClientPin.PERMISSION = ClientPin.PERMISSION(0) + prepare_inputs: Callable[ + [Optional[PublicKeyCredentialDescriptor]], + Optional[Dict[str, Any]], + ] = lambda _: None + prepare_outputs: Callable[ + [AttestationResponse, Optional[str], Optional[PinProtocol]], + Optional[Dict[str, Any]], + ] = lambda r, t, p: None + + +# TODO 2.0: Make changes as described below class Ctap2Extension(abc.ABC): - """Base class for Ctap2 extensions. + """Base class for CTAP2 extensions. + + As of python-fido2 1.2 these instances can be used for multiple requests and + should be invoked via the make_credential and get_assertion methods. Subclasses are instantiated for a single request, if the Authenticator supports the extension. + + From python-fido2 2.0 the following methods will be fully removed: + get_create_permissions, process_create_input, process_create_output, + process_create_input_with_permissions, + get_get_permissions, process_get_input, process_get_output, + process_get_input_with_permissions. + + The following changes will also be made: + __init__() will no longer allow passing a ctap2 instance. + is_supported() will require a ctap2 instance to be passed. """ NAME: str = None # type: ignore - def __init__(self, ctap: Ctap2): - self.ctap = ctap - # TODO: Pass options and selected to the various methods that need them instead - self._create_options: PublicKeyCredentialCreationOptions - self._get_options: PublicKeyCredentialRequestOptions - self._selected: Optional[PublicKeyCredentialDescriptor] - self._used = False + def __init__(self, ctap: Optional[Ctap2] = None): + self._ctap = ctap + + @property + def ctap(self) -> Ctap2: + ctap = self._ctap + if not ctap: + raise ValueError( + "Accessed self.ctap when no ctap instance has been passed to __init__" + ) + return ctap - def is_supported(self) -> bool: + def is_supported(self, ctap: Optional[Ctap2] = None) -> bool: """Whether or not the extension is supported by the authenticator.""" - return self.NAME in self.ctap.info.extensions + if not ctap: + warnings.warn( + "Calling is_supported without a Ctap2 instance is deprecated.", + DeprecationWarning, + ) + ctap = ctap or self._ctap + if not ctap: + raise ValueError("No Ctap2 instance available") + return self.NAME in ctap.info.extensions + + def make_credential( + self, ctap: Ctap2, options: PublicKeyCredentialCreationOptions + ) -> Optional[ExtensionProcessor]: + """Start client extension processing for registration.""" + # This implementation is for LEGACY PURPOSES! + # Subclasses should override this method instead of: + # process_create_input, process_create_output, and get_create_permissions + warnings.warn( + "This extension does not override make_credential, which is deprecated.", + DeprecationWarning, + ) + inputs = dict(options.extensions or {}) + + def prepare_inputs(_): + processed = self.process_create_input(inputs) + self._has_input = processed is not None + return {self.NAME: processed} if self._has_input else None + + def prepare_outputs(response, pin_token, pin_protocol): + if self._has_input: + processed = self.process_create_output( + response, pin_token, pin_protocol + ) + return processed + + self._ctap = ctap + return ExtensionProcessor( + permissions=self.get_create_permissions(inputs), + prepare_inputs=prepare_inputs, + prepare_outputs=prepare_outputs, + ) + + def get_assertion( + self, ctap: Ctap2, options: PublicKeyCredentialRequestOptions + ) -> Optional[ExtensionProcessor]: + """Start client extension processing for authentication.""" + # This implementation is for LEGACY PURPOSES! + # Subclasses should override this method instead of: + # process_get_input, process_get_output, and get_get_permissions + warnings.warn( + "This extension does not override get_assertion, which is deprecated.", + DeprecationWarning, + ) + inputs = dict(options.extensions or {}) + + def prepare_inputs(selected): + processed = self.process_get_input(inputs) + self._has_input = processed is not None + return {self.NAME: processed} if self._has_input else None + + def prepare_outputs(response, pin_token, pin_protocol): + if self._has_input: + return self.process_get_output(response, pin_token, pin_protocol) + + self._ctap = ctap + return ExtensionProcessor( + permissions=self.get_get_permissions(inputs), + prepare_inputs=prepare_inputs, + prepare_outputs=prepare_outputs, + ) def get_create_permissions(self, inputs: Dict[str, Any]) -> ClientPin.PERMISSION: return ClientPin.PERMISSION(0) @@ -102,7 +220,7 @@ def process_create_input_with_permissions( self, inputs: Dict[str, Any] ) -> Tuple[Any, ClientPin.PERMISSION]: warnings.warn( - "This method is deprecated, use get_create_permissions.", DeprecationWarning + "This method is deprecated, use make_credential().", DeprecationWarning ) return self.process_create_input(inputs), self.get_create_permissions(inputs) @@ -129,7 +247,7 @@ def process_get_input_with_permissions( self, inputs: Dict[str, Any] ) -> Tuple[Any, ClientPin.PERMISSION]: warnings.warn( - "This method is deprecated, use get_get_permissions.", DeprecationWarning + "This method is deprecated, use get_assertion().", DeprecationWarning ) return self.process_get_input(inputs), self.get_get_permissions(inputs) @@ -185,63 +303,124 @@ class HmacSecretExtension(Ctap2Extension): NAME = "hmac-secret" SALT_LEN = 32 - def __init__(self, ctap, pin_protocol=None, allow_hmac_secret=False): + def __init__(self, ctap=None, pin_protocol=None, allow_hmac_secret=False): super().__init__(ctap) self.pin_protocol = pin_protocol self._allow_hmac_secret = allow_hmac_secret + def make_credential(self, ctap: Ctap2, options: PublicKeyCredentialCreationOptions): + inputs = options.extensions or {} + prf = inputs.get("prf") is not None + hmac = self._allow_hmac_secret and inputs.get("hmacCreateSecret") is True + if self.is_supported(ctap) and (prf or hmac): + + def outputs(response, *args): + enabled = response.auth_data.extensions.get(self.NAME, False) + if prf: + return {"prf": _PrfOutputs(enabled=enabled)} + else: + return {"hmacCreateSecret": enabled} + + return ExtensionProcessor( + prepare_inputs=lambda _: {self.NAME: True}, + prepare_outputs=outputs, + ) + + def get_assertion(self, ctap, options): + inputs = options.extensions or {} + prf = _PrfInputs.from_dict(inputs.get("prf")) + hmac = self._allow_hmac_secret and _HmacGetSecretInput.from_dict( + inputs.get("hmacGetSecret") + ) + + if self.is_supported(ctap) and (prf or hmac): + client_pin = ClientPin(ctap, self.pin_protocol) + key_agreement, shared_secret = client_pin._get_shared_secret() + pin_protocol = client_pin.protocol + + def prepare_inputs(selected): + if prf: + secrets = prf.eval + by_creds = prf.eval_by_credential + if by_creds: + # Make sure all keys are valid IDs from allow_credentials + allow_list = options.allow_credentials + if not allow_list: + raise ValueError( + "evalByCredentials requires allowCredentials" + ) + ids = {websafe_encode(c.id) for c in allow_list} + if not ids.issuperset(by_creds): + raise ValueError("evalByCredentials contains invalid key") + if selected: + key = websafe_encode(selected.id) + if key in by_creds: + secrets = by_creds[key] + + if not secrets: + return + + salts = ( + _prf_salt(secrets.first), + ( + _prf_salt(secrets.second) + if secrets.second is not None + else b"" + ), + ) + else: + salts = hmac.salt1, hmac.salt2 or b"" + + if not ( + len(salts[0]) == HmacSecretExtension.SALT_LEN + and (not salts[1] or len(salts[1]) == HmacSecretExtension.SALT_LEN) + ): + raise ValueError("Invalid salt length") + + salt_enc = pin_protocol.encrypt(shared_secret, salts[0] + salts[1]) + salt_auth = pin_protocol.authenticate(shared_secret, salt_enc) + + return { + self.NAME: { + 1: key_agreement, + 2: salt_enc, + 3: salt_auth, + 4: pin_protocol.VERSION, + } + } + + def prepare_outputs(response, *args): + value = response.auth_data.extensions.get(self.NAME) + + decrypted = pin_protocol.decrypt(shared_secret, value) + output1 = decrypted[: HmacSecretExtension.SALT_LEN] + output2 = decrypted[HmacSecretExtension.SALT_LEN :] or None + + if prf: + return {"prf": _PrfOutputs(results=_PrfValues(output1, output2))} + else: + return {"hmacGetSecret": _HmacGetSecretOutput(output1, output2)} + + return ExtensionProcessor( + prepare_inputs=prepare_inputs, prepare_outputs=prepare_outputs + ) + def process_create_input(self, inputs): - if self.is_supported(): - if inputs.get("hmacCreateSecret") is True and self._allow_hmac_secret: - self.prf = False - return True - elif inputs.get("prf") is not None: - self.prf = True - return True + if self.is_supported() and inputs.get("hmacCreateSecret") is True: + return True def process_create_output(self, attestation_response, *args): enabled = attestation_response.auth_data.extensions.get(self.NAME, False) - if self.prf: - return {"prf": _PrfOutputs(enabled=enabled)} - - else: - return {"hmacCreateSecret": enabled} + return {"hmacCreateSecret": enabled} def process_get_input(self, inputs): if not self.is_supported(): return - prf = _PrfInputs.from_dict(inputs.get("prf")) - if prf: - secrets = prf.eval - by_creds = prf.eval_by_credential - if by_creds: - # Make sure all keys are valid IDs from allow_credentials - allow_list = self._get_options.allow_credentials - if not allow_list: - raise ValueError("evalByCredentials requires allowCredentials") - ids = {websafe_encode(c.id) for c in allow_list} - if not ids.issuperset(by_creds): - raise ValueError("evalByCredentials contains invalid key") - if self._selected: - key = websafe_encode(self._selected.id) - if key in by_creds: - secrets = by_creds[key] - - if not secrets: - return - - salts = ( - _prf_salt(secrets.first), - _prf_salt(secrets.second) if secrets.second is not None else b"", - ) - self.prf = True - else: - get_secret = _HmacGetSecretInput.from_dict(inputs.get("hmacGetSecret")) - if not get_secret or not self._allow_hmac_secret: - return - salts = get_secret.salt1, get_secret.salt2 or b"" - self.prf = False + get_secret = _HmacGetSecretInput.from_dict(inputs.get("hmacGetSecret")) + if not get_secret: + return + salts = get_secret.salt1, get_secret.salt2 or b"" if not ( len(salts[0]) == HmacSecretExtension.SALT_LEN @@ -249,7 +428,9 @@ def process_get_input(self, inputs): ): raise ValueError("Invalid salt length") - client_pin = ClientPin(self.ctap, self.pin_protocol) + if not self._ctap: + raise ValueError("No Ctap2 instance available") + client_pin = ClientPin(self._ctap, self.pin_protocol) key_agreement, self.shared_secret = client_pin._get_shared_secret() if self.pin_protocol is None: self.pin_protocol = client_pin.protocol @@ -270,11 +451,7 @@ def process_get_output(self, assertion_response, *args): decrypted = self.pin_protocol.decrypt(self.shared_secret, value) output1 = decrypted[: HmacSecretExtension.SALT_LEN] output2 = decrypted[HmacSecretExtension.SALT_LEN :] or None - - if self.prf: - return {"prf": _PrfOutputs(results=_PrfValues(output1, output2))} - else: - return {"hmacGetSecret": _HmacGetSecretOutput(output1, output2)} + return {"hmacGetSecret": _HmacGetSecretOutput(output1, output2)} @dataclass(eq=False, frozen=True) @@ -298,8 +475,9 @@ class LargeBlobExtension(Ctap2Extension): NAME = "largeBlobKey" - def is_supported(self): - return super().is_supported() and self.ctap.info.options.get("largeBlobs") + def is_supported(self, ctap=None): + ctap = ctap or self._ctap + return super().is_supported(ctap) and ctap.info.options.get("largeBlobs") def process_create_input(self, inputs): data = _LargeBlobInputs.from_dict(inputs.get("largeBlob")) @@ -317,6 +495,24 @@ def process_create_output(self, attestation_response, *args): ) } + def make_credential(self, ctap, options): + inputs = options.extensions or {} + data = _LargeBlobInputs.from_dict(inputs.get("largeBlob")) + if data: + if data.read or data.write: + raise ValueError("Invalid set of parameters") + if data.support == "required" and not self.is_supported(ctap): + raise ValueError("Authenticator does not support large blob storage") + + return ExtensionProcessor( + prepare_inputs=lambda _: {self.NAME: True}, + prepare_outputs=lambda response, _, __: { + "largeBlob": _LargeBlobOutputs( + supported=response.large_blob_key is not None + ) + }, + ) + def get_get_permissions(self, inputs): data = _LargeBlobInputs.from_dict(inputs.get("largeBlob")) if data and data.write: @@ -348,6 +544,36 @@ def process_get_output(self, assertion_response, token, pin_protocol): large_blobs.put_blob(blob_key, self._action) return {"largeBlob": _LargeBlobOutputs(written=True)} + def get_assertion(self, ctap, options): + inputs = options.extensions or {} + data = _LargeBlobInputs.from_dict(inputs.get("largeBlob")) + if data: + if data.support or (data.read and data.write): + raise ValueError("Invalid set of parameters") + if not self.is_supported(ctap): + raise ValueError("Authenticator does not support large blob storage") + + def outputs(response, pin_token, pin_protocol): + blob_key = response.large_blob_key + if data.read: + large_blobs = LargeBlobs(ctap) + blob = large_blobs.get_blob(blob_key) + return {"largeBlob": _LargeBlobOutputs(blob=blob)} + elif data.write: + large_blobs = LargeBlobs(ctap, pin_protocol, pin_token) + large_blobs.put_blob(blob_key, data.write) + return {"largeBlob": _LargeBlobOutputs(written=True)} + + return ExtensionProcessor( + permissions=( + ClientPin.PERMISSION.LARGE_BLOB_WRITE + if data.write + else ClientPin.PERMISSION(0) + ), + prepare_inputs=lambda _: {self.NAME: True}, + prepare_outputs=outputs, + ) + class CredBlobExtension(Ctap2Extension): """ @@ -401,13 +627,20 @@ class MinPinLengthExtension(Ctap2Extension): NAME = "minPinLength" - def is_supported(self): # NB: There is no key in the extensions field. - return "setMinPINLength" in self.ctap.info.options + def is_supported(self, ctap=None): + # NB: There is no key in the extensions field. + ctap = ctap or self._ctap + return "setMinPINLength" in ctap.info.options def process_create_input(self, inputs): if self.is_supported() and inputs.get(self.NAME) is True: return True + def make_credential(self, ctap, options): + inputs = options.extensions or {} + if self.is_supported(ctap) and inputs.get(self.NAME) is True: + return ExtensionProcessor(prepare_inputs=lambda _: {self.NAME: True}) + @dataclass(eq=False, frozen=True) class _CredPropsOutputs(_JsonDataObject): @@ -421,19 +654,16 @@ class CredPropsExtension(Ctap2Extension): NAME = "credProps" - def is_supported(self): # NB: There is no key in the extensions field. + def is_supported(self, ctap=None): # NB: There is no key in the extensions field. return True - def process_create_input(self, inputs): + def make_credential(self, ctap, options): + inputs = options.extensions or {} if inputs.get(self.NAME) is True: - # This extension doesn't provide any input to the authenticator, - # but still needs to add output. - self._used = True - - def process_create_output(self, attestation_response, *args): - selection = ( - self._create_options.authenticator_selection - or AuthenticatorSelectionCriteria() - ) - rk = selection.require_resident_key - return {"credProps": _CredPropsOutputs(rk=rk)} + selection = ( + options.authenticator_selection or AuthenticatorSelectionCriteria() + ) + rk = selection.require_resident_key + return ExtensionProcessor( + prepare_outputs=lambda *_: {self.NAME: _CredPropsOutputs(rk=rk)} + )