diff --git a/fido2/client.py b/fido2/client.py index d6ef301..35bd3a8 100644 --- a/fido2/client.py +++ b/fido2/client.py @@ -35,7 +35,7 @@ from .ctap2.extensions import ( Ctap2Extension, ClientExtensionOutputs, - ExtensionProcessor, + AuthenticationExtensionProcessor, ) from .webauthn import ( Aaguid, @@ -425,8 +425,8 @@ def __init__( self, client_data: CollectedClientData, assertions: Sequence[AssertionResponse], - extensions: Sequence[ExtensionProcessor], - pin_token: Optional[str], + extensions: Sequence[AuthenticationExtensionProcessor], + pin_token: Optional[bytes], pin_protocol: Optional[PinProtocol], ): super().__init__(client_data, assertions) @@ -693,7 +693,7 @@ def _do_make(): extension_inputs = {} try: for ext in used_extensions: - auth_input = ext.prepare_inputs(None) + auth_input = ext.prepare_inputs() if auth_input: extension_inputs.update(auth_input) except ValueError as e: diff --git a/fido2/ctap2/extensions.py b/fido2/ctap2/extensions.py index 2301e72..d7eb7fe 100644 --- a/fido2/ctap2/extensions.py +++ b/fido2/ctap2/extensions.py @@ -39,7 +39,7 @@ ) from enum import Enum, unique from dataclasses import dataclass -from typing import Dict, Tuple, Any, Optional, Mapping, Callable +from typing import Dict, Tuple, Any, Optional, Mapping import abc import warnings @@ -77,27 +77,58 @@ def __repr__(self): return repr(dict(self)) -@dataclass -class ExtensionProcessor: +class RegistrationExtensionProcessor: """Processing state for a CTAP2 extension, for single use. The ExtensionProcessor holds state and logic for client processing of an extension, - for either a MakeCredential or GetAssertion call. + for a registration (MakeCredential) call. - :param permissions: PinUvAuthToken permissions required for the extension. - :param prepare_inputs: A function which produces authenticator extensions inputs. - :param prepare_outputs: A function which produces the client extension outputs. + :param permissions: PinUvAuthToken permissions required by the extension. """ - permissions: ClientPin.PERMISSION = ClientPin.PERMISSION(0) - prepare_inputs: Callable[ - [Optional[PublicKeyCredentialDescriptor]], - Optional[Dict[str, Any]], - ] = lambda _: None - prepare_outputs: Callable[ - [AttestationResponse, Optional[str], Optional[PinProtocol]], - Optional[Dict[str, Any]], - ] = lambda r, t, p: None + def __init__(self, permissions: ClientPin.PERMISSION = ClientPin.PERMISSION(0)): + self.permissions = permissions + + def prepare_inputs(self) -> Optional[Dict[str, Any]]: + "Prepare authenticator extension inputs, to be passed to the Authenenticator." + return None + + def prepare_outputs( + self, + response: AttestationResponse, + pin_token: Optional[bytes], + pin_protocol: Optional[PinProtocol], + ) -> Optional[Dict[str, Any]]: + "Prepare client extension outputs, to be returned to the caller." + return None + + +class AuthenticationExtensionProcessor: + """Processing state for a CTAP2 extension, for single use. + + The ExtensionProcessor holds state and logic for client processing of an extension, + for an authentication (GetAssertion) call. + + :param permissions: PinUvAuthToken permissions required by the extension. + """ + + def __init__(self, permissions: ClientPin.PERMISSION = ClientPin.PERMISSION(0)): + self.permissions = permissions + + def prepare_inputs( + self, selected: Optional[PublicKeyCredentialDescriptor] + ) -> Optional[Dict[str, Any]]: + "Prepare authenticator extension inputs, to be passed to the Authenenticator." + return None + + def prepare_outputs( + self, + response: AssertionResponse, + pin_token: Optional[bytes], + pin_protocol: Optional[PinProtocol], + ) -> Optional[Dict[str, Any]]: + "Prepare client extension outputs, to be returned to the caller." + return None # TODO 2.0: Make changes as described below @@ -148,7 +179,7 @@ def is_supported(self, ctap: Optional[Ctap2] = None) -> bool: def make_credential( self, ctap: Ctap2, options: PublicKeyCredentialCreationOptions - ) -> Optional[ExtensionProcessor]: + ) -> Optional[RegistrationExtensionProcessor]: """Start client extension processing for registration.""" # This implementation is for LEGACY PURPOSES! # Subclasses should override this method instead of: @@ -158,29 +189,27 @@ def make_credential( DeprecationWarning, ) inputs = dict(options.extensions or {}) - - def prepare_inputs(_): - processed = self.process_create_input(inputs) - self._has_input = processed is not None - return {self.NAME: processed} if self._has_input else None - - def prepare_outputs(response, pin_token, pin_protocol): - if self._has_input: - processed = self.process_create_output( - response, pin_token, pin_protocol - ) - return processed - self._ctap = ctap - return ExtensionProcessor( - permissions=self.get_create_permissions(inputs), - prepare_inputs=prepare_inputs, - prepare_outputs=prepare_outputs, - ) + ext = self + + class Processor(RegistrationExtensionProcessor): + def prepare_inputs(self): + processed = ext.process_create_input(inputs) + self._has_input = processed is not None + return {ext.NAME: processed} if self._has_input else None + + def prepare_outputs(self, response, pin_token, pin_protocol): + if self._has_input: + processed = ext.process_create_output( + response, pin_token, pin_protocol + ) + return processed + + return Processor(self.get_create_permissions(inputs)) def get_assertion( self, ctap: Ctap2, options: PublicKeyCredentialRequestOptions - ) -> Optional[ExtensionProcessor]: + ) -> Optional[AuthenticationExtensionProcessor]: """Start client extension processing for authentication.""" # This implementation is for LEGACY PURPOSES! # Subclasses should override this method instead of: @@ -190,22 +219,22 @@ def get_assertion( DeprecationWarning, ) inputs = dict(options.extensions or {}) + self._ctap = ctap + ext = self - def prepare_inputs(selected): - processed = self.process_get_input(inputs) - self._has_input = processed is not None - return {self.NAME: processed} if self._has_input else None + class Processor(AuthenticationExtensionProcessor): + _has_input: bool - def prepare_outputs(response, pin_token, pin_protocol): - if self._has_input: - return self.process_get_output(response, pin_token, pin_protocol) + def prepare_inputs(self, selected): + processed = ext.process_get_input(inputs) + self._has_input = processed is not None + return {ext.NAME: processed} if self._has_input else None - self._ctap = ctap - return ExtensionProcessor( - permissions=self.get_get_permissions(inputs), - prepare_inputs=prepare_inputs, - prepare_outputs=prepare_outputs, - ) + def prepare_outputs(self, response, pin_token, pin_protocol): + if self._has_input: + return ext.process_get_output(response, pin_token, pin_protocol) + + return Processor(self.get_get_permissions(inputs)) def get_create_permissions(self, inputs: Dict[str, Any]) -> ClientPin.PERMISSION: return ClientPin.PERMISSION(0) @@ -228,7 +257,7 @@ def process_create_input_with_permissions( def process_create_output( self, attestation_response: AttestationResponse, - token: Optional[str], + token: Optional[bytes], pin_protocol: Optional[PinProtocol], ) -> Optional[Dict[str, Any]]: """Return client extension output given attestation_response, or None.""" @@ -254,7 +283,7 @@ def process_get_input_with_permissions( def process_get_output( self, assertion_response: AssertionResponse, - token: Optional[str], + token: Optional[bytes], pin_protocol: Optional[PinProtocol], ) -> Optional[Dict[str, Any]]: """Return client extension output given assertion_response, or None.""" @@ -314,102 +343,117 @@ def make_credential(self, ctap: Ctap2, options: PublicKeyCredentialCreationOptio hmac = self._allow_hmac_secret and inputs.get("hmacCreateSecret") is True if self.is_supported(ctap) and (prf or hmac): - def outputs(response, *args): - enabled = response.auth_data.extensions.get(self.NAME, False) - if prf: - return {"prf": _PrfOutputs(enabled=enabled)} - else: - return {"hmacCreateSecret": enabled} + class Processor(RegistrationExtensionProcessor): + def prepare_inputs(self): + return {HmacSecretExtension.NAME: True} - return ExtensionProcessor( - prepare_inputs=lambda _: {self.NAME: True}, - prepare_outputs=outputs, - ) + def prepare_outputs(self, response, pin_token, pin_protocol): + extensions = response.auth_data.extensions or {} + enabled = extensions.get(HmacSecretExtension.NAME, False) + if prf: + return {"prf": _PrfOutputs(enabled=enabled)} + else: + return {"hmacCreateSecret": enabled} + + return Processor() def get_assertion(self, ctap, options): inputs = options.extensions or {} prf = _PrfInputs.from_dict(inputs.get("prf")) - hmac = self._allow_hmac_secret and _HmacGetSecretInput.from_dict( - inputs.get("hmacGetSecret") + hmac = ( + _HmacGetSecretInput.from_dict(inputs.get("hmacGetSecret")) + if self._allow_hmac_secret + else None ) if self.is_supported(ctap) and (prf or hmac): client_pin = ClientPin(ctap, self.pin_protocol) key_agreement, shared_secret = client_pin._get_shared_secret() - pin_protocol = client_pin.protocol - - def prepare_inputs(selected): - if prf: - secrets = prf.eval - by_creds = prf.eval_by_credential - if by_creds: - # Make sure all keys are valid IDs from allow_credentials - allow_list = options.allow_credentials - if not allow_list: - raise ValueError( - "evalByCredentials requires allowCredentials" - ) - ids = {websafe_encode(c.id) for c in allow_list} - if not ids.issuperset(by_creds): - raise ValueError("evalByCredentials contains invalid key") - if selected: - key = websafe_encode(selected.id) - if key in by_creds: - secrets = by_creds[key] - - if not secrets: - return - - salts = ( - _prf_salt(secrets.first), - ( - _prf_salt(secrets.second) - if secrets.second is not None - else b"" - ), - ) - else: - salts = hmac.salt1, hmac.salt2 or b"" - - if not ( - len(salts[0]) == HmacSecretExtension.SALT_LEN - and (not salts[1] or len(salts[1]) == HmacSecretExtension.SALT_LEN) - ): - raise ValueError("Invalid salt length") - - salt_enc = pin_protocol.encrypt(shared_secret, salts[0] + salts[1]) - salt_auth = pin_protocol.authenticate(shared_secret, salt_enc) - - return { - self.NAME: { - 1: key_agreement, - 2: salt_enc, - 3: salt_auth, - 4: pin_protocol.VERSION, + + class Processing(AuthenticationExtensionProcessor): + def prepare_inputs(self, selected): + if prf: + secrets = prf.eval + by_creds = prf.eval_by_credential + if by_creds: + # Make sure all keys are valid IDs from allow_credentials + allow_list = options.allow_credentials + if not allow_list: + raise ValueError( + "evalByCredentials requires allowCredentials" + ) + ids = {websafe_encode(c.id) for c in allow_list} + if not ids.issuperset(by_creds): + raise ValueError( + "evalByCredentials contains invalid key" + ) + if selected: + key = websafe_encode(selected.id) + if key in by_creds: + secrets = by_creds[key] + + if not secrets: + return + + salts = ( + _prf_salt(secrets.first), + ( + _prf_salt(secrets.second) + if secrets.second is not None + else b"" + ), + ) + else: + assert hmac is not None # nosec + salts = hmac.salt1, hmac.salt2 or b"" + + if not ( + len(salts[0]) == HmacSecretExtension.SALT_LEN + and ( + not salts[1] + or len(salts[1]) == HmacSecretExtension.SALT_LEN + ) + ): + raise ValueError("Invalid salt length") + + pin_protocol = client_pin.protocol + salt_enc = pin_protocol.encrypt(shared_secret, salts[0] + salts[1]) + salt_auth = pin_protocol.authenticate(shared_secret, salt_enc) + + return { + HmacSecretExtension.NAME: { + 1: key_agreement, + 2: salt_enc, + 3: salt_auth, + 4: pin_protocol.VERSION, + } } - } - def prepare_outputs(response, *args): - value = response.auth_data.extensions.get(self.NAME) + def prepare_outputs(self, response, pin_token, pin_protocol): + extensions = response.auth_data.extensions or {} + value = extensions.get(HmacSecretExtension.NAME) - decrypted = pin_protocol.decrypt(shared_secret, value) - output1 = decrypted[: HmacSecretExtension.SALT_LEN] - output2 = decrypted[HmacSecretExtension.SALT_LEN :] or None + if value: + decrypted = client_pin.protocol.decrypt(shared_secret, value) + output1 = decrypted[: HmacSecretExtension.SALT_LEN] + output2 = decrypted[HmacSecretExtension.SALT_LEN :] or None + else: + return None - if prf: - return {"prf": _PrfOutputs(results=_PrfValues(output1, output2))} - else: - return {"hmacGetSecret": _HmacGetSecretOutput(output1, output2)} + if prf: + return { + "prf": _PrfOutputs(results=_PrfValues(output1, output2)) + } + else: + return {"hmacGetSecret": _HmacGetSecretOutput(output1, output2)} - return ExtensionProcessor( - prepare_inputs=prepare_inputs, prepare_outputs=prepare_outputs - ) + return Processing() def process_create_input(self, inputs): if self.is_supported() and inputs.get("hmacCreateSecret") is True: return True - def process_create_output(self, attestation_response, *args): + def process_create_output(self, attestation_response, *args, **kwargs): enabled = attestation_response.auth_data.extensions.get(self.NAME, False) return {"hmacCreateSecret": enabled} @@ -445,9 +489,10 @@ def process_get_input(self, inputs): 4: self.pin_protocol.VERSION, } - def process_get_output(self, assertion_response, *args): + def process_get_output(self, assertion_response, *args, **kwargs): value = assertion_response.auth_data.extensions.get(self.NAME) + assert self.pin_protocol is not None # nosec decrypted = self.pin_protocol.decrypt(self.shared_secret, value) output1 = decrypted[: HmacSecretExtension.SALT_LEN] output2 = decrypted[HmacSecretExtension.SALT_LEN :] or None @@ -477,7 +522,8 @@ class LargeBlobExtension(Ctap2Extension): def is_supported(self, ctap=None): ctap = ctap or self._ctap - return super().is_supported(ctap) and ctap.info.options.get("largeBlobs") + assert ctap is not None # nosec + return super().is_supported(ctap) and ctap.info.options.get("largeBlobs", False) def process_create_input(self, inputs): data = _LargeBlobInputs.from_dict(inputs.get("largeBlob")) @@ -488,7 +534,7 @@ def process_create_input(self, inputs): raise ValueError("Authenticator does not support large blob storage") return True - def process_create_output(self, attestation_response, *args): + def process_create_output(self, attestation_response, *args, **kwargs): return { "largeBlob": _LargeBlobOutputs( supported=attestation_response.large_blob_key is not None @@ -504,14 +550,18 @@ def make_credential(self, ctap, options): if data.support == "required" and not self.is_supported(ctap): raise ValueError("Authenticator does not support large blob storage") - return ExtensionProcessor( - prepare_inputs=lambda _: {self.NAME: True}, - prepare_outputs=lambda response, _, __: { - "largeBlob": _LargeBlobOutputs( - supported=response.large_blob_key is not None - ) - }, - ) + class Processor(RegistrationExtensionProcessor): + def prepare_inputs(self): + return {LargeBlobExtension.NAME: True} + + def prepare_outputs(self, response, pin_token, pin_protocol): + return { + "largeBlob": _LargeBlobOutputs( + supported=response.large_blob_key is not None + ) + } + + return Processor() def get_get_permissions(self, inputs): data = _LargeBlobInputs.from_dict(inputs.get("largeBlob")) @@ -534,15 +584,15 @@ def process_get_input(self, inputs): def process_get_output(self, assertion_response, token, pin_protocol): blob_key = assertion_response.large_blob_key - if self._action is True: # Read - large_blobs = LargeBlobs(self.ctap) - blob = large_blobs.get_blob(blob_key) - return {"largeBlob": _LargeBlobOutputs(blob=blob)} - - elif self._action: # Write - large_blobs = LargeBlobs(self.ctap, pin_protocol, token) - large_blobs.put_blob(blob_key, self._action) - return {"largeBlob": _LargeBlobOutputs(written=True)} + if blob_key: + if self._action is True: # Read + large_blobs = LargeBlobs(self.ctap) + blob = large_blobs.get_blob(blob_key) + return {"largeBlob": _LargeBlobOutputs(blob=blob)} + elif self._action: # Write + large_blobs = LargeBlobs(self.ctap, pin_protocol, token) + large_blobs.put_blob(blob_key, self._action) + return {"largeBlob": _LargeBlobOutputs(written=True)} def get_assertion(self, ctap, options): inputs = options.extensions or {} @@ -553,25 +603,26 @@ def get_assertion(self, ctap, options): if not self.is_supported(ctap): raise ValueError("Authenticator does not support large blob storage") - def outputs(response, pin_token, pin_protocol): - blob_key = response.large_blob_key - if data.read: - large_blobs = LargeBlobs(ctap) - blob = large_blobs.get_blob(blob_key) - return {"largeBlob": _LargeBlobOutputs(blob=blob)} - elif data.write: - large_blobs = LargeBlobs(ctap, pin_protocol, pin_token) - large_blobs.put_blob(blob_key, data.write) - return {"largeBlob": _LargeBlobOutputs(written=True)} - - return ExtensionProcessor( - permissions=( - ClientPin.PERMISSION.LARGE_BLOB_WRITE - if data.write - else ClientPin.PERMISSION(0) - ), - prepare_inputs=lambda _: {self.NAME: True}, - prepare_outputs=outputs, + class Processor(AuthenticationExtensionProcessor): + def prepare_inputs(self, selected): + return {LargeBlobExtension.NAME: True} + + def prepare_outputs(self, response, pin_token, pin_protocol): + blob_key = response.large_blob_key + if blob_key: + if data.read: + large_blobs = LargeBlobs(ctap) + blob = large_blobs.get_blob(blob_key) + return {"largeBlob": _LargeBlobOutputs(blob=blob)} + elif data.write: + large_blobs = LargeBlobs(ctap, pin_protocol, pin_token) + large_blobs.put_blob(blob_key, data.write) + return {"largeBlob": _LargeBlobOutputs(written=True)} + + return Processor( + ClientPin.PERMISSION.LARGE_BLOB_WRITE + if data.write + else ClientPin.PERMISSION(0) ) @@ -630,6 +681,7 @@ class MinPinLengthExtension(Ctap2Extension): def is_supported(self, ctap=None): # NB: There is no key in the extensions field. ctap = ctap or self._ctap + assert ctap is not None # nosec return "setMinPINLength" in ctap.info.options def process_create_input(self, inputs): @@ -639,7 +691,12 @@ def process_create_input(self, inputs): def make_credential(self, ctap, options): inputs = options.extensions or {} if self.is_supported(ctap) and inputs.get(self.NAME) is True: - return ExtensionProcessor(prepare_inputs=lambda _: {self.NAME: True}) + + class Processor(RegistrationExtensionProcessor): + def prepare_inputs(self): + return {MinPinLengthExtension.NAME: True} + + return Processor() @dataclass(eq=False, frozen=True) @@ -660,10 +717,14 @@ def is_supported(self, ctap=None): # NB: There is no key in the extensions fiel def make_credential(self, ctap, options): inputs = options.extensions or {} if inputs.get(self.NAME) is True: - selection = ( - options.authenticator_selection or AuthenticatorSelectionCriteria() - ) - rk = selection.require_resident_key - return ExtensionProcessor( - prepare_outputs=lambda *_: {self.NAME: _CredPropsOutputs(rk=rk)} - ) + + class Processor(RegistrationExtensionProcessor): + def prepare_outputs(self, response, pin_token, pin_protocol): + selection = ( + options.authenticator_selection + or AuthenticatorSelectionCriteria() + ) + rk = selection.require_resident_key + return {CredPropsExtension.NAME: _CredPropsOutputs(rk=rk)} + + return Processor()