diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index f850256..c691997 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -48,3 +48,13 @@ jobs: with: linters: mypy run: mypy + + black: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: psf/black@stable + with: + options: "--check --verbose" + src: "./signify" + version: "~= 23.7" diff --git a/pyproject.toml b/pyproject.toml index 88a6b8b..454fd7c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,8 @@ +[tool.black] +line-length = 88 +target-version = ['py38', 'py39', 'py310', 'py311'] +preview = true + [tool.mypy] files = "signify" python_version = "3.8" diff --git a/signify/__init__.py b/signify/__init__.py index 7e663ce..a35c6db 100644 --- a/signify/__init__.py +++ b/signify/__init__.py @@ -8,7 +8,7 @@ def _print_type(t: Any) -> str: return "" elif isinstance(t, tuple): return ".".join(map(str, t)) - elif callable(t) and hasattr(t(), 'name'): + elif callable(t) and hasattr(t(), "name"): return cast(str, t().name) # used by hashlib elif hasattr(t, "__name__"): return cast(str, t.__name__) diff --git a/signify/asn1/__init__.py b/signify/asn1/__init__.py index 7b3ca83..22a144a 100755 --- a/signify/asn1/__init__.py +++ b/signify/asn1/__init__.py @@ -32,7 +32,9 @@ def guarded_ber_decode(data: Any, asn1_spec: _T | None = None) -> Asn1Type | _T: except Exception as e: raise ParseError("Error while parsing %s BER: %s" % (_print_type(asn1_spec), e)) if rest: - raise ParseError("Extra information after parsing %s BER" % _print_type(asn1_spec)) + raise ParseError( + "Extra information after parsing %s BER" % _print_type(asn1_spec) + ) return result @@ -56,5 +58,7 @@ def guarded_der_decode(data: Any, asn1_spec: _T | None = None) -> Asn1Type | _T: except Exception as e: raise ParseError("Error while parsing %s DER: %s" % (_print_type(asn1_spec), e)) if rest: - raise ParseError("Extra information after parsing %s DER" % _print_type(asn1_spec)) + raise ParseError( + "Extra information after parsing %s DER" % _print_type(asn1_spec) + ) return result diff --git a/signify/asn1/ctl.py b/signify/asn1/ctl.py index febee29..0b05b22 100644 --- a/signify/asn1/ctl.py +++ b/signify/asn1/ctl.py @@ -6,9 +6,7 @@ class CTLVersion(univ.Integer): # type: ignore[misc] - namedValues = namedval.NamedValues( - ('v1', 0) - ) + namedValues = namedval.NamedValues(("v1", 0)) class SubjectUsage(rfc5280.ExtKeyUsageSyntax): # type: ignore[misc] @@ -25,8 +23,8 @@ class SubjectIdentifier(univ.OctetString): # type: ignore[misc] class TrustedSubject(univ.Sequence): # type: ignore[misc] componentType = namedtype.NamedTypes( - namedtype.NamedType('subjectIdentifier', SubjectIdentifier()), - namedtype.OptionalNamedType('subjectAttributes', rfc2315.Attributes()), + namedtype.NamedType("subjectIdentifier", SubjectIdentifier()), + namedtype.OptionalNamedType("subjectAttributes", rfc2315.Attributes()), ) @@ -36,16 +34,20 @@ class TrustedSubjects(univ.SequenceOf): # type: ignore[misc] class CertificateTrustList(univ.Sequence): # type: ignore[misc] componentType = namedtype.NamedTypes( - namedtype.DefaultedNamedType('version', CTLVersion('v1')), - namedtype.NamedType('subjectUsage', SubjectUsage()), - namedtype.OptionalNamedType('listIdentifier', ListIdentifier()), - namedtype.OptionalNamedType('sequenceNumber', univ.Integer()), - namedtype.NamedType('ctlThisUpdate', rfc5280.Time()), - namedtype.OptionalNamedType('ctlNextUpdate', rfc5280.Time()), - namedtype.NamedType('subjectAlgorithm', 
rfc5280.AlgorithmIdentifier()), - namedtype.OptionalNamedType('trustedSubjects', TrustedSubjects()), - namedtype.OptionalNamedType('ctlExtensions', rfc5280.Extensions().subtype( - explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))) + namedtype.DefaultedNamedType("version", CTLVersion("v1")), + namedtype.NamedType("subjectUsage", SubjectUsage()), + namedtype.OptionalNamedType("listIdentifier", ListIdentifier()), + namedtype.OptionalNamedType("sequenceNumber", univ.Integer()), + namedtype.NamedType("ctlThisUpdate", rfc5280.Time()), + namedtype.OptionalNamedType("ctlNextUpdate", rfc5280.Time()), + namedtype.NamedType("subjectAlgorithm", rfc5280.AlgorithmIdentifier()), + namedtype.OptionalNamedType("trustedSubjects", TrustedSubjects()), + namedtype.OptionalNamedType( + "ctlExtensions", + rfc5280.Extensions().subtype( + explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0) + ), + ), ) diff --git a/signify/asn1/hashing.py b/signify/asn1/hashing.py index 9ea48fd..fb63efa 100644 --- a/signify/asn1/hashing.py +++ b/signify/asn1/hashing.py @@ -12,10 +12,18 @@ from signify.exceptions import ParseError # this list must be in the order of worst to best -ACCEPTED_DIGEST_ALGORITHMS = (hashlib.md5, hashlib.sha1, hashlib.sha256, hashlib.sha384, hashlib.sha512) - - -def _verify_empty_algorithm_parameters(algorithm: rfc5280.AlgorithmIdentifier, location: str) -> None: +ACCEPTED_DIGEST_ALGORITHMS = ( + hashlib.md5, + hashlib.sha1, + hashlib.sha256, + hashlib.sha384, + hashlib.sha512, +) + + +def _verify_empty_algorithm_parameters( + algorithm: rfc5280.AlgorithmIdentifier, location: str +) -> None: if "parameters" in algorithm and algorithm["parameters"].isValue: parameters = guarded_ber_decode(algorithm["parameters"]) if not isinstance(parameters, univ.Null): @@ -30,7 +38,8 @@ def _get_digest_algorithm( result = asn1.oids.get(algorithm["algorithm"], asn1.oids.OID_TO_HASH) if isinstance(result, tuple) or result not in acceptable: raise ParseError( - "%s must be one of %s, not %s" % (location, [x().name for x in acceptable], _print_type(result)) + "%s must be one of %s, not %s" + % (location, [x().name for x in acceptable], _print_type(result)) ) _verify_empty_algorithm_parameters(algorithm, location) @@ -41,7 +50,10 @@ def _get_digest_algorithm( def _get_encryption_algorithm(algorithm: univ.Sequence, location: str) -> str: result = asn1.oids.OID_TO_PUBKEY.get(algorithm["algorithm"]) if result is None: - raise ParseError("%s: %s is not acceptable as encryption algorithm" % (location, algorithm["algorithm"])) + raise ParseError( + "%s: %s is not acceptable as encryption algorithm" + % (location, algorithm["algorithm"]) + ) _verify_empty_algorithm_parameters(algorithm, location) return result diff --git a/signify/asn1/helpers.py b/signify/asn1/helpers.py index 1b4a529..e8f31f9 100644 --- a/signify/asn1/helpers.py +++ b/signify/asn1/helpers.py @@ -9,28 +9,28 @@ def time_to_python(time: GeneralizedTime | UTCTime) -> datetime.datetime | None: - if 'utcTime' in time: - return cast(datetime.datetime, time['utcTime'].asDateTime) - elif 'generalTime' in time: - return cast(datetime.datetime, time['generalTime'].asDateTime) + if "utcTime" in time: + return cast(datetime.datetime, time["utcTime"].asDateTime) + elif "generalTime" in time: + return cast(datetime.datetime, time["generalTime"].asDateTime) else: return None def accuracy_to_python(accuracy: rfc3161.Accuracy) -> datetime.timedelta: delta = datetime.timedelta() - if 'seconds' in accuracy and accuracy['seconds'].isValue: - delta += 
datetime.timedelta(seconds=int(accuracy['seconds'])) - if 'millis' in accuracy and accuracy['millis'].isValue: - delta += datetime.timedelta(milliseconds=int(accuracy['millis'])) - if 'micros' in accuracy and accuracy['micros'].isValue: - delta += datetime.timedelta(microseconds=int(accuracy['micros'])) + if "seconds" in accuracy and accuracy["seconds"].isValue: + delta += datetime.timedelta(seconds=int(accuracy["seconds"])) + if "millis" in accuracy and accuracy["millis"].isValue: + delta += datetime.timedelta(milliseconds=int(accuracy["millis"])) + if "micros" in accuracy and accuracy["micros"].isValue: + delta += datetime.timedelta(microseconds=int(accuracy["micros"])) return delta def bitstring_to_bytes(s: str) -> bytes: # based on https://stackoverflow.com/questions/32675679/convert-binary-string-to-bytearray-in-python-3 - return int(str(s), 2).to_bytes((len(s) + 7) // 8, byteorder='big') + return int(str(s), 2).to_bytes((len(s) + 7) // 8, byteorder="big") @contextlib.contextmanager @@ -45,25 +45,50 @@ def patch_rfc5652_signeddata() -> Iterator[rfc5652.SignedData]: original_component_type = CertificateChoices.componentType # first allow changing values on the object - del CertificateChoices._readOnly['componentType'] + del CertificateChoices._readOnly["componentType"] CertificateChoices.componentType = rfc5652.namedtype.NamedTypes( - rfc5652.namedtype.NamedType('certificate', rfc5652.rfc5280.Certificate()), - rfc5652.namedtype.NamedType('extendedCertificate', rfc5652.ExtendedCertificate().subtype( - implicitTag=rfc5652.tag.Tag(rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatConstructed, 0))), - # The following line is the only one changed to reflect that tag 1 is also used for v2AttrCerts. - # Note that we do not update the actual name in the scheme to prevent naming com - rfc5652.namedtype.NamedType('v1AttrCert', rfc5652.AttributeCertificateV2().subtype( - implicitTag=rfc5652.tag.Tag(rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatSimple, 1))), - rfc5652.namedtype.NamedType('v2AttrCert', rfc5652.AttributeCertificateV2().subtype( - implicitTag=rfc5652.tag.Tag(rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatSimple, 2))), - rfc5652.namedtype.NamedType('other', rfc5652.OtherCertificateFormat().subtype( - implicitTag=rfc5652.tag.Tag(rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatConstructed, 3))) + rfc5652.namedtype.NamedType("certificate", rfc5652.rfc5280.Certificate()), + rfc5652.namedtype.NamedType( + "extendedCertificate", + rfc5652.ExtendedCertificate().subtype( + implicitTag=rfc5652.tag.Tag( + rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatConstructed, 0 + ) + ), + ), + # The following line is the only one changed to reflect that tag 1 is + # also used for v2AttrCerts. 
+        # Note that we do not update the actual name in the scheme to prevent naming com
+        rfc5652.namedtype.NamedType(
+            "v1AttrCert",
+            rfc5652.AttributeCertificateV2().subtype(
+                implicitTag=rfc5652.tag.Tag(
+                    rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatSimple, 1
+                )
+            ),
+        ),
+        rfc5652.namedtype.NamedType(
+            "v2AttrCert",
+            rfc5652.AttributeCertificateV2().subtype(
+                implicitTag=rfc5652.tag.Tag(
+                    rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatSimple, 2
+                )
+            ),
+        ),
+        rfc5652.namedtype.NamedType(
+            "other",
+            rfc5652.OtherCertificateFormat().subtype(
+                implicitTag=rfc5652.tag.Tag(
+                    rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatConstructed, 3
+                )
+            ),
+        ),
     )
-    CertificateChoices._readOnly['componentType'] = CertificateChoices.componentType
+    CertificateChoices._readOnly["componentType"] = CertificateChoices.componentType

     try:
         yield SignedData()
     finally:
-        del CertificateChoices._readOnly['componentType']
+        del CertificateChoices._readOnly["componentType"]
         CertificateChoices.componentType = original_component_type
-        CertificateChoices._readOnly['componentType'] = CertificateChoices.componentType
+        CertificateChoices._readOnly["componentType"] = CertificateChoices.componentType
diff --git a/signify/asn1/preserving_der.py b/signify/asn1/preserving_der.py
index 2c1b70f..a8b6a0b 100644
--- a/signify/asn1/preserving_der.py
+++ b/signify/asn1/preserving_der.py
@@ -3,22 +3,21 @@
 from pyasn1.compat.octets import null, str2octs
 from pyasn1.type import univ

-__all__ = ['encode']
+__all__ = ["encode"]


 class SetOfEncoder(cer_encoder.SetOfEncoder):  # type: ignore[misc]
-    """This class is identical to the one of the CER encoder, except that the sorting has been removed. """
+    """This class is identical to the one of the CER encoder, except that the sorting
+    has been removed.
+ """ def encodeValue(self, value, asn1Spec, encodeFun, **options): # type: ignore[no-untyped-def] - chunks = self._encodeComponents( - value, asn1Spec, encodeFun, **options) + chunks = self._encodeComponents(value, asn1Spec, encodeFun, **options) if len(chunks) > 1: - zero = str2octs('\x00') + zero = str2octs("\x00") maxLen = max(map(len, chunks)) - paddedChunks = [ - (x.ljust(maxLen, zero), x) for x in chunks - ] + paddedChunks = [(x.ljust(maxLen, zero), x) for x in chunks] chunks = [x[1] for x in paddedChunks] @@ -26,14 +25,10 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options): # type: ignore[no tagMap = encoder.tagMap.copy() -tagMap.update({ - univ.SetOf.tagSet: SetOfEncoder() -}) +tagMap.update({univ.SetOf.tagSet: SetOfEncoder()}) typeMap = encoder.typeMap.copy() -typeMap.update({ - univ.SetOf.typeId: SetOfEncoder() -}) +typeMap.update({univ.SetOf.typeId: SetOfEncoder()}) class Encoder(encoder.Encoder): # type: ignore[misc] diff --git a/signify/asn1/spc.py b/signify/asn1/spc.py index cd92ccd..ec8a250 100755 --- a/signify/asn1/spc.py +++ b/signify/asn1/spc.py @@ -33,15 +33,16 @@ class SpcAttributeTypeAndOptionalValue(univ.Sequence): # type: ignore[misc] componentType = namedtype.NamedTypes( - namedtype.NamedType('type', rfc2459.AttributeType()), - namedtype.OptionalNamedType('value', rfc2459.AttributeValue()) + namedtype.NamedType("type", rfc2459.AttributeType()), + namedtype.OptionalNamedType("value", rfc2459.AttributeValue()), ) class SpcIndirectDataContent(univ.Sequence): # type: ignore[misc] componentType = namedtype.NamedTypes( - namedtype.NamedType('data', SpcAttributeTypeAndOptionalValue()), - namedtype.NamedType('messageDigest', rfc2315.DigestInfo())) + namedtype.NamedType("data", SpcAttributeTypeAndOptionalValue()), + namedtype.NamedType("messageDigest", rfc2315.DigestInfo()), + ) class SpcUuid(univ.OctetString): # type: ignore[misc] @@ -50,61 +51,84 @@ class SpcUuid(univ.OctetString): # type: ignore[misc] class SpcSerializedObject(univ.Sequence): # type: ignore[misc] componentType = namedtype.NamedTypes( - namedtype.NamedType('classId', SpcUuid()), - namedtype.NamedType('serializedData', univ.OctetString())) + namedtype.NamedType("classId", SpcUuid()), + namedtype.NamedType("serializedData", univ.OctetString()), + ) class SpcString(univ.Choice): # type: ignore[misc] componentType = namedtype.NamedTypes( - namedtype.NamedType('unicode', char.BMPString().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0) - )), - namedtype.NamedType('ascii', char.IA5String().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1) - )) + namedtype.NamedType( + "unicode", + char.BMPString().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0) + ), + ), + namedtype.NamedType( + "ascii", + char.IA5String().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1) + ), + ), ) def to_python(self) -> str | None: - if 'unicode' in self: - return str(self['unicode']) - elif 'ascii' in self: - return str(self['ascii']) + if "unicode" in self: + return str(self["unicode"]) + elif "ascii" in self: + return str(self["ascii"]) return None class SpcLink(univ.Choice): # type: ignore[misc] """According to Authenticode specification.""" + componentType = namedtype.NamedTypes( - namedtype.NamedType('url', char.IA5String().subtype( - implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0) - )), - namedtype.NamedType('moniker', SpcSerializedObject().subtype( - 
implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1) - )), - namedtype.NamedType('file', SpcString().subtype( - explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2) - )) + namedtype.NamedType( + "url", + char.IA5String().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0) + ), + ), + namedtype.NamedType( + "moniker", + SpcSerializedObject().subtype( + implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1) + ), + ), + namedtype.NamedType( + "file", + SpcString().subtype( + explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 2) + ), + ), ) def to_python(self) -> str | None: - if 'url' in self: - return str(self['url']) - elif 'moniker' in self: + if "url" in self: + return str(self["url"]) + elif "moniker" in self: return None # TODO - elif 'file' in self: - return cast(SpcString, self['file']).to_python() + elif "file" in self: + return cast(SpcString, self["file"]).to_python() else: return None class SpcSpOpusInfo(univ.Sequence): # type: ignore[misc] componentType = namedtype.NamedTypes( - namedtype.OptionalNamedType('programName', SpcString().subtype( - explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0) - )), - namedtype.OptionalNamedType('moreInfo', SpcLink().subtype( - explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1) - )) + namedtype.OptionalNamedType( + "programName", + SpcString().subtype( + explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0) + ), + ), + namedtype.OptionalNamedType( + "moreInfo", + SpcLink().subtype( + explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1) + ), + ), ) diff --git a/signify/authenticode/__init__.py b/signify/authenticode/__init__.py index 320fe93..c20a96d 100644 --- a/signify/authenticode/__init__.py +++ b/signify/authenticode/__init__.py @@ -1,11 +1,33 @@ -from .structures import CERTIFICATE_LOCATION, TRUSTED_CERTIFICATE_STORE_NO_CTL, TRUSTED_CERTIFICATE_STORE, \ - AuthenticodeVerificationResult, AuthenticodeCounterSignerInfo, AuthenticodeSignerInfo, SpcInfo, \ - AuthenticodeSignedData, RFC3161SignerInfo, TSTInfo, RFC3161SignedData +from .structures import ( + CERTIFICATE_LOCATION, + TRUSTED_CERTIFICATE_STORE_NO_CTL, + TRUSTED_CERTIFICATE_STORE, + AuthenticodeVerificationResult, + AuthenticodeCounterSignerInfo, + AuthenticodeSignerInfo, + SpcInfo, + AuthenticodeSignedData, + RFC3161SignerInfo, + TSTInfo, + RFC3161SignedData, +) from .signed_pe import SignedPEFile -from .authroot import AUTHROOTSTL_PATH, CertificateTrustList, \ - CertificateTrustSubject +from .authroot import AUTHROOTSTL_PATH, CertificateTrustList, CertificateTrustSubject -__all__ = ["CERTIFICATE_LOCATION", "TRUSTED_CERTIFICATE_STORE_NO_CTL", "TRUSTED_CERTIFICATE_STORE", - "AuthenticodeVerificationResult", "AuthenticodeCounterSignerInfo", "AuthenticodeSignerInfo", "SpcInfo", - "AuthenticodeSignedData", "RFC3161SignerInfo", "TSTInfo", "RFC3161SignedData", "SignedPEFile", - "AUTHROOTSTL_PATH", "CertificateTrustList", "CertificateTrustSubject"] +__all__ = [ + "CERTIFICATE_LOCATION", + "TRUSTED_CERTIFICATE_STORE_NO_CTL", + "TRUSTED_CERTIFICATE_STORE", + "AuthenticodeVerificationResult", + "AuthenticodeCounterSignerInfo", + "AuthenticodeSignerInfo", + "SpcInfo", + "AuthenticodeSignedData", + "RFC3161SignerInfo", + "TSTInfo", + "RFC3161SignedData", + "SignedPEFile", + "AUTHROOTSTL_PATH", + "CertificateTrustList", + "CertificateTrustSubject", +] diff --git a/signify/authenticode/authroot.py b/signify/authenticode/authroot.py index 
047bae2..99cf8e4 100644 --- a/signify/authenticode/authroot.py +++ b/signify/authenticode/authroot.py @@ -17,7 +17,10 @@ from signify._typing import HashFunction, OidTuple from signify.asn1 import guarded_ber_decode from signify.asn1.helpers import time_to_python -from signify.exceptions import CertificateTrustListParseError, CTLCertificateVerificationError +from signify.exceptions import ( + CertificateTrustListParseError, + CTLCertificateVerificationError, +) from signify.pkcs7.signeddata import SignedData from signify.asn1.hashing import _get_digest_algorithm from signify.x509 import certificates, context @@ -25,9 +28,12 @@ AUTHROOTSTL_PATH = pathlib.Path(mscerts.where(stl=True)) -def _lookup_ekus(extended_key_usages: Iterable[str] | None = None) -> Iterator[OidTuple]: - """Normally we would be able to use certvalidator for this, but we simply can't now we have done this - all to ourselves. So we convert the arguments passed to the function to a list of all object-ID tuples. +def _lookup_ekus( + extended_key_usages: Iterable[str] | None = None, +) -> Iterator[OidTuple]: + """Normally we would be able to use certvalidator for this, but we simply can't + now we have done this all to ourselves. So we convert the arguments passed to the + function to a list of all object-ID tuples. """ if not extended_key_usages: @@ -36,7 +42,9 @@ def _lookup_ekus(extended_key_usages: Iterable[str] | None = None) -> Iterator[O # create an inverted map for the fancy names that are supported from asn1crypto.x509 import KeyPurposeId - inverted_map = {v: tuple(map(int, k.split("."))) for k, v in KeyPurposeId._map.items()} + inverted_map = { + v: tuple(map(int, k.split("."))) for k, v in KeyPurposeId._map.items() + } # now look for all values for eku in extended_key_usages: @@ -47,8 +55,8 @@ def _lookup_ekus(extended_key_usages: Iterable[str] | None = None) -> Iterator[O class CertificateTrustList(SignedData): - """A subclass of :class:`signify.pkcs7.SignedData`, containing a list of trusted root certificates. It is based - on the following ASN.1 structure:: + """A subclass of :class:`signify.pkcs7.SignedData`, containing a list of trusted + root certificates. It is based on the following ASN.1 structure:: CertificateTrustList ::= SEQUENCE { version CTLVersion DEFAULT v1, @@ -118,15 +126,23 @@ def _parse(self) -> None: super()._parse() self.subject_usage = self.content["subjectUsage"][0] - self.list_identifier = bytes(self.content["listIdentifier"]) if self.content["listIdentifier"].isValue else None + self.list_identifier = ( + bytes(self.content["listIdentifier"]) + if self.content["listIdentifier"].isValue + else None + ) self.sequence_number = self.content["sequenceNumber"] self.this_update = time_to_python(self.content["ctlThisUpdate"]) self.next_update = time_to_python(self.content["ctlNextUpdate"]) self.subject_algorithm = _get_digest_algorithm( - self.content["subjectAlgorithm"], location="CertificateTrustList.subjectAlgorithm" + self.content["subjectAlgorithm"], + location="CertificateTrustList.subjectAlgorithm", ) self._subjects = {} - for subj in (CertificateTrustSubject(subject) for subject in self.content["trustedSubjects"]): + for subj in ( + CertificateTrustSubject(subject) + for subject in self.content["trustedSubjects"] + ): self._subjects[subj.identifier.hex().lower()] = subj # TODO: extensions?? 
@@ -136,9 +152,11 @@ def subjects(self) -> Iterable[CertificateTrustSubject]: return self._subjects.values() - def verify_trust(self, chain: list[certificates.Certificate], *args: Any, **kwargs: Any) -> bool: - """Checks whether the specified certificate is valid in the given conditions according to this Certificate Trust - List. + def verify_trust( + self, chain: list[certificates.Certificate], *args: Any, **kwargs: Any + ) -> bool: + """Checks whether the specified certificate is valid in the given conditions + according to this Certificate Trust List. :param List[Certificate] chain: The certificate chain to verify """ @@ -146,11 +164,16 @@ def verify_trust(self, chain: list[certificates.Certificate], *args: Any, **kwar # Find the subject belonging to this certificate subject = self.find_subject(chain[0]) if not subject: - raise CTLCertificateVerificationError("The root %s is not in the certificate trust list" % chain[0]) + raise CTLCertificateVerificationError( + "The root %s is not in the certificate trust list" % chain[0] + ) return subject.verify_trust(chain, *args, **kwargs) - def find_subject(self, certificate: certificates.Certificate) -> CertificateTrustSubject | None: - """Finds the :class:`CertificateTrustSubject` belonging to the provided :class:`signify.x509.Certificate`. + def find_subject( + self, certificate: certificates.Certificate + ) -> CertificateTrustSubject | None: + """Finds the :class:`CertificateTrustSubject` belonging to the provided + :class:`signify.x509.Certificate`. :param signify.x509.Certificate certificate: The certificate to look for. :rtype: CertificateTrustSubject @@ -161,7 +184,9 @@ def find_subject(self, certificate: certificates.Certificate) -> CertificateTrus elif self.subject_algorithm == hashlib.sha256: identifier = certificate.sha256_fingerprint else: - raise CertificateTrustListParseError("The specified subject algorithm is not yet supported.") + raise CertificateTrustListParseError( + "The specified subject algorithm is not yet supported." + ) return self._subjects.get(identifier) @@ -176,7 +201,9 @@ def from_stl_file(cls, path: pathlib.Path = AUTHROOTSTL_PATH) -> Self: # debug.setLogger(debug.Debug('all')) if asn1.oids.get(content["contentType"]) is not rfc2315.SignedData: - raise CertificateTrustListParseError("ContentInfo does not contain SignedData") + raise CertificateTrustListParseError( + "ContentInfo does not contain SignedData" + ) data = guarded_ber_decode(content["content"], asn1_spec=rfc2315.SignedData()) @@ -186,9 +213,10 @@ def from_stl_file(cls, path: pathlib.Path = AUTHROOTSTL_PATH) -> Self: class CertificateTrustSubject: - """A subject listed in a :class:`CertificateTrustList`. The structure in this object has mostly been - reverse-engineered using Windows tooling such as ``certutil -dump``. We do not pretend to have a complete picture - of all the edge-cases that are considered. + """A subject listed in a :class:`CertificateTrustList`. The structure in this object + has mostly been reverse-engineered using Windows tooling such as ``certutil -dump``. + We do not pretend to have a complete picture of all the edge-cases that are + considered. .. attribute:: data @@ -202,7 +230,8 @@ class CertificateTrustSubject: .. attribute:: extended_key_usages - Defines the EKU's the certificate is valid for. It may be empty, which we take as 'all is acceptable'. + Defines the EKU's the certificate is valid for. It may be empty, which we take + as 'all is acceptable'. .. 
attribute:: friendly_name @@ -222,8 +251,9 @@ class CertificateTrustSubject: .. attribute:: disallowed_filetime - The time since when a certificate has been disabled. Digital signatures with a timestamp prior to this date - continue to be valid, but use cases after this date are prohibited. It may be used in conjunction with + The time since when a certificate has been disabled. Digital signatures with a + timestamp prior to this date continue to be valid, but use cases after this date + are prohibited. It may be used in conjunction with :attr:`disallowed_extended_key_usages` to define specific EKU's to be disabled. .. attribute:: root_program_chain_policies @@ -232,24 +262,28 @@ class CertificateTrustSubject: .. attribute:: disallowed_extended_key_usages - Defines the EKU's the certificate is not valid for. When used in combination with :attr:`disallowed_filetime`, - the disabled EKU's are only disabled from that date onwards, otherwise, it means since the beginning of time. + Defines the EKU's the certificate is not valid for. When used in combination with + :attr:`disallowed_filetime`, the disabled EKU's are only disabled from that date + onwards, otherwise, it means since the beginning of time. .. attribute:: not_before_filetime - The time since when new certificates from this CA are not trusted. Certificates from prior the date will continue - to validate. When used in conjunction with :attr:`not_before_extended_key_usages`, this only concerns - certificates issued after this date for the defined EKU's. + The time since when new certificates from this CA are not trusted. Certificates + from prior the date will continue to validate. When used in conjunction with + :attr:`not_before_extended_key_usages`, this only concerns certificates issued + after this date for the defined EKU's. .. attribute:: not_before_extended_key_usages - Defines the EKU's for which the :attr:`not_before_filetime` is considered. If that attribute is not defined, - we assume that it means since the beginning of time. + Defines the EKU's for which the :attr:`not_before_filetime` is considered. If + that attribute is not defined, we assume that it means since the beginning of + time. .. warning:: - The interpretation of the various attributes and their implications has been reverse-engineered. Though we seem - to have a fairly solid understanding, various edge-cases may not have been considered. + The interpretation of the various attributes and their implications has been + reverse-engineered. Though we seem to have a fairly solid understanding, various + edge-cases may not have been considered. 
""" @@ -274,44 +308,68 @@ def _parse(self) -> None: self.extended_key_usages = None if asn1.ctl.EnhkeyUsage in self.attributes: - self.extended_key_usages = [tuple(x) for x in self.attributes[asn1.ctl.EnhkeyUsage][0]] + self.extended_key_usages = [ + tuple(x) for x in self.attributes[asn1.ctl.EnhkeyUsage][0] + ] self.friendly_name = None if asn1.ctl.FriendlyName in self.attributes: - self.friendly_name = bytes(self.attributes[asn1.ctl.FriendlyName][0]).decode("utf-16") + self.friendly_name = bytes( + self.attributes[asn1.ctl.FriendlyName][0] + ).decode("utf-16") - self.key_identifier = bytes(self.attributes.get(asn1.ctl.KeyIdentifier, [b""])[0]) - self.subject_name_md5 = bytes(self.attributes.get(asn1.ctl.SubjectNameMd5Hash, [b""])[0]) + self.key_identifier = bytes( + self.attributes.get(asn1.ctl.KeyIdentifier, [b""])[0] + ) + self.subject_name_md5 = bytes( + self.attributes.get(asn1.ctl.SubjectNameMd5Hash, [b""])[0] + ) # TODO: RootProgramCertPolicies not implemented - self.auth_root_sha256 = bytes(self.attributes.get(asn1.ctl.AuthRootSha256Hash, [b""])[0]) + self.auth_root_sha256 = bytes( + self.attributes.get(asn1.ctl.AuthRootSha256Hash, [b""])[0] + ) self.disallowed_filetime = None if asn1.ctl.DisallowedFiletime in self.attributes: - self.disallowed_filetime = self._filetime_to_datetime(self.attributes[asn1.ctl.DisallowedFiletime][0]) + self.disallowed_filetime = self._filetime_to_datetime( + self.attributes[asn1.ctl.DisallowedFiletime][0] + ) self.root_program_chain_policies = None if asn1.ctl.RootProgramChainPolicies in self.attributes: - self.root_program_chain_policies = [tuple(x) for x in self.attributes[asn1.ctl.RootProgramChainPolicies][0]] + self.root_program_chain_policies = [ + tuple(x) for x in self.attributes[asn1.ctl.RootProgramChainPolicies][0] + ] self.disallowed_extended_key_usages = None if asn1.ctl.DisallowedEnhkeyUsage in self.attributes: - self.disallowed_extended_key_usages = [tuple(x) for x in self.attributes[asn1.ctl.DisallowedEnhkeyUsage][0]] + self.disallowed_extended_key_usages = [ + tuple(x) for x in self.attributes[asn1.ctl.DisallowedEnhkeyUsage][0] + ] self.not_before_filetime = None if asn1.ctl.NotBeforeFiletime in self.attributes: - self.not_before_filetime = self._filetime_to_datetime(self.attributes[asn1.ctl.NotBeforeFiletime][0]) + self.not_before_filetime = self._filetime_to_datetime( + self.attributes[asn1.ctl.NotBeforeFiletime][0] + ) self.not_before_extended_key_usages = None if asn1.ctl.NotBeforeEnhkeyUsage in self.attributes: - self.not_before_extended_key_usages = [tuple(x) for x in self.attributes[asn1.ctl.NotBeforeEnhkeyUsage][0]] - - def verify_trust(self, chain: list[certificates.Certificate], context: context.VerificationContext) -> bool: - """Checks whether the specified certificate is valid in the given conditions according to this Certificate Trust - List. + self.not_before_extended_key_usages = [ + tuple(x) for x in self.attributes[asn1.ctl.NotBeforeEnhkeyUsage][0] + ] + + def verify_trust( + self, + chain: list[certificates.Certificate], + context: context.VerificationContext, + ) -> bool: + """Checks whether the specified certificate is valid in the given conditions + according to this Certificate Trust List. :param List[Certificate] chain: The certificate chain to verify. - :param VerificationContext context: The context to verify with. Mainly the timestamp and extended_key_usages - are used. + :param VerificationContext context: The context to verify with. Mainly the + timestamp and extended_key_usages are used. 
""" timestamp = context.timestamp @@ -325,28 +383,42 @@ def verify_trust(self, chain: list[certificates.Certificate], context: context.V requested_extended_key_usages = set(_lookup_ekus(extended_key_usages)) # Now check each of the properties - if self.extended_key_usages and (requested_extended_key_usages - set(self.extended_key_usages)): + if self.extended_key_usages and ( + requested_extended_key_usages - set(self.extended_key_usages) + ): raise CTLCertificateVerificationError( "The root %s lists its extended key usages, but %s are not present" - % (self.friendly_name, requested_extended_key_usages - set(self.extended_key_usages)) + % ( + self.friendly_name, + requested_extended_key_usages - set(self.extended_key_usages), + ) ) - # The notBefore time does concern the validity of the certificate that is being validated. It must have a - # notBefore of before the timestamp + # The notBefore time does concern the validity of the certificate that is being + # validated. It must have a notBefore of before the timestamp if self.not_before_filetime is not None: to_verify_timestamp = chain[-1].valid_from if to_verify_timestamp >= self.not_before_filetime: - # If there is a notBefore time, and there is no NotBeforeEnhkeyUsage, then the validity concerns the - # entire certificate. + # If there is a notBefore time, and there is no NotBeforeEnhkeyUsage, + # then the validity concerns the entire certificate. if self.not_before_extended_key_usages is None: raise CTLCertificateVerificationError( - "The root %s is disallowed for certificate issued after %s (certificate is %s)" - % (self.friendly_name, self.not_before_filetime, to_verify_timestamp) + "The root %s is disallowed for certificate issued after %s" + " (certificate is %s)" + % ( + self.friendly_name, + self.not_before_filetime, + to_verify_timestamp, + ) ) - elif any(eku in self.not_before_extended_key_usages for eku in requested_extended_key_usages): + elif any( + eku in self.not_before_extended_key_usages + for eku in requested_extended_key_usages + ): raise CTLCertificateVerificationError( - "The root %s disallows requested EKU's %s to certificates issued after %s (certificate is %s)" + "The root %s disallows requested EKU's %s to certificates" + " issued after %s (certificate is %s)" % ( self.friendly_name, requested_extended_key_usages, @@ -355,25 +427,33 @@ def verify_trust(self, chain: list[certificates.Certificate], context: context.V ) ) elif self.not_before_extended_key_usages is not None and any( - eku in self.not_before_extended_key_usages for eku in requested_extended_key_usages + eku in self.not_before_extended_key_usages + for eku in requested_extended_key_usages ): raise CTLCertificateVerificationError( - "The root %s disallows requested EKU's %s" % (self.friendly_name, requested_extended_key_usages) + "The root %s disallows requested EKU's %s" + % (self.friendly_name, requested_extended_key_usages) ) - # The DisallowedFiletime time does concern the timestamp of the signature being verified. + # The DisallowedFiletime time does concern the timestamp of the signature + # being verified. if self.disallowed_filetime is not None: if timestamp >= self.disallowed_filetime: - # If there is a DisallowedFiletime, and there is no DisallowedEnhkeyUsage, then the validity - # concerns the entire certificate. + # If there is a DisallowedFiletime, and there is no + # DisallowedEnhkeyUsage, then the validity concerns the entire + # certificate. 
if self.disallowed_extended_key_usages is None: raise CTLCertificateVerificationError( "The root %s is disallowed since %s (requested %s)" % (self.friendly_name, self.disallowed_filetime, timestamp) ) - elif any(eku in self.disallowed_extended_key_usages for eku in requested_extended_key_usages): + elif any( + eku in self.disallowed_extended_key_usages + for eku in requested_extended_key_usages + ): raise CTLCertificateVerificationError( - "The root %s is disallowed for EKU's %s since %s (requested %s at %s)" + "The root %s is disallowed for EKU's %s since %s (requested %s" + " at %s)" % ( self.friendly_name, self.disallowed_extended_key_usages, @@ -383,16 +463,20 @@ def verify_trust(self, chain: list[certificates.Certificate], context: context.V ) ) elif self.disallowed_extended_key_usages is not None and any( - eku in self.disallowed_extended_key_usages for eku in requested_extended_key_usages + eku in self.disallowed_extended_key_usages + for eku in requested_extended_key_usages ): raise CTLCertificateVerificationError( - "The root %s disallows requested EKU's %s" % (self.friendly_name, requested_extended_key_usages) + "The root %s disallows requested EKU's %s" + % (self.friendly_name, requested_extended_key_usages) ) return True @classmethod - def _parse_attributes(cls, data: rfc2315.Attributes) -> dict[OidTuple | Type[Asn1Type], list[Any]]: + def _parse_attributes( + cls, data: rfc2315.Attributes + ) -> dict[OidTuple | Type[Asn1Type], list[Any]]: """Given a set of Attributes, parses them and returns them as a dict :param data: The attributes to process @@ -404,8 +488,8 @@ def _parse_attributes(cls, data: rfc2315.Attributes) -> dict[OidTuple | Type[Asn values = [] for value in attr["values"]: if not isinstance(typ, tuple): - # This should transparently handle when the data is encapsulated in an OctetString but we are - # not expecting an OctetString + # This should transparently handle when the data is encapsulated in + # an OctetString but we are not expecting an OctetString try: if not isinstance(type, univ.OctetString): _, v = ber_decoder.decode(value, recursiveFlag=0) @@ -420,7 +504,9 @@ def _parse_attributes(cls, data: rfc2315.Attributes) -> dict[OidTuple | Type[Asn return result @classmethod - def _filetime_to_datetime(cls, filetime: univ.OctetString) -> datetime.datetime | None: + def _filetime_to_datetime( + cls, filetime: univ.OctetString + ) -> datetime.datetime | None: if not filetime: return None diff --git a/signify/authenticode/signed_pe.py b/signify/authenticode/signed_pe.py index fa97c51..7785588 100644 --- a/signify/authenticode/signed_pe.py +++ b/signify/authenticode/signed_pe.py @@ -47,7 +47,9 @@ logger = logging.getLogger(__name__) RelRange = collections.namedtuple("RelRange", "start length") -ParsedCertTable = TypedDict("ParsedCertTable", {"revision": int, "type": int, "certificate": bytes}) +ParsedCertTable = TypedDict( + "ParsedCertTable", {"revision": int, "type": int, "certificate": bytes} +) class SignedPEFile: @@ -81,7 +83,11 @@ def get_authenticode_omit_sections(self) -> dict[str, RelRange] | None: locations = self._parse_pe_header_locations() except (SignedPEParseError, struct.error): return None - return {k: v for k, v in locations.items() if k in ["checksum", "datadir_certtable", "certtable"]} + return { + k: v + for k, v in locations.items() + if k in ["checksum", "datadir_certtable", "certtable"] + } def _parse_pe_header_locations(self) -> dict[str, RelRange]: """Parses a PE file to find the sections to exclude from the AuthentiCode hash. 
@@ -101,7 +107,8 @@ def _parse_pe_header_locations(self) -> dict[str, RelRange]: pe_offset = struct.unpack("= self._filelength: raise SignedPEParseError( - "PE header location is beyond file boundaries (%d >= %d)" % (pe_offset, self._filelength) + "PE header location is beyond file boundaries (%d >= %d)" + % (pe_offset, self._filelength) ) # Check if the PE header is PE @@ -125,7 +132,9 @@ def _parse_pe_header_locations(self) -> dict[str, RelRange]: # We can't do authenticode-style hashing. If this is a valid binary, # which it can be, the header still does not even contain a checksum. raise SignedPEParseError( - "The optional header size is %d < 68, which is insufficient for authenticode", optional_header_size + "The optional header size is %d < 68, which is insufficient for" + " authenticode", + optional_header_size, ) # The optional header contains the signature of the image @@ -139,29 +148,33 @@ def _parse_pe_header_locations(self) -> dict[str, RelRange]: cert_base = optional_header_offset + 144 # Certificate Table else: # A ROM image or such, not in the PE/COFF specs. Not sure what to do. - raise SignedPEParseError("The PE Optional Header signature is %x, which is unknown", signature) + raise SignedPEParseError( + "The PE Optional Header signature is %x, which is unknown", signature + ) # According to the specification, the checksum should not be hashed. location["checksum"] = RelRange(optional_header_offset + 64, 4) # Read the RVA if optional_header_offset + optional_header_size < rva_base + 4: - logger.debug("The PE Optional Header size can not accommodate for the NumberOfRvaAndSizes field") + logger.debug( + "The PE Optional Header size can not accommodate for the" + " NumberOfRvaAndSizes field" + ) return location self.file.seek(rva_base, os.SEEK_SET) number_of_rva = struct.unpack(" dict[str, RelRange]: logger.debug("The Certificate Table is empty") return location - if address < optional_header_size + optional_header_offset or address + size > self._filelength: + if ( + address < optional_header_size + optional_header_offset + or address + size > self._filelength + ): logger.debug( - ( - "The location of the Certificate Table in the binary makes no sense and is either beyond the " - "boundaries of the file, or in the middle of the PE header; " - "VirtualAddress: %x, Size: %x" - ), + "The location of the Certificate Table in the binary makes no sense and" + " is either beyond the boundaries of the file, or in the middle of the" + " PE header; VirtualAddress: %x, Size: %x", address, size, ) @@ -196,13 +210,17 @@ def _parse_cert_table(self) -> Iterator[ParsedCertTable]: locations = self.get_authenticode_omit_sections() if not locations or "certtable" not in locations: - raise SignedPEParseError("The PE file does not contain a certificate table.") + raise SignedPEParseError( + "The PE file does not contain a certificate table." 
+ ) position = locations["certtable"].start while position < sum(locations["certtable"]): # check if this position is viable, we need at least 8 bytes for our header if position + 8 > self._filelength: - raise SignedPEParseError("Position of certificate table is beyond length of file") + raise SignedPEParseError( + "Position of certificate table is beyond length of file" + ) self.file.seek(position, os.SEEK_SET) length = struct.unpack(" Iterator[ParsedCertTable]: raise SignedPEParseError("Invalid length in certificate table header") certificate = self.file.read(length - 8) - yield {"revision": revision, "type": certificate_type, "certificate": certificate} + yield { + "revision": revision, + "type": certificate_type, + "certificate": certificate, + } position += length + (8 - length % 8) % 8 def get_fingerprinter(self) -> fingerprinter.AuthenticodeFingerprinter: @@ -231,12 +253,17 @@ def signed_datas(self) -> Iterator[structures.AuthenticodeSignedData]: yield from self.iter_signed_datas() - def iter_signed_datas(self, include_nested: bool = True) -> Iterator[structures.AuthenticodeSignedData]: - """Returns an iterator over :class:`AuthenticodeSignedData` objects relevant for this PE file. + def iter_signed_datas( + self, include_nested: bool = True + ) -> Iterator[structures.AuthenticodeSignedData]: + """Returns an iterator over :class:`AuthenticodeSignedData` objects relevant + for this PE file. - :param include_nested: Boolean, if True, will also iterate over all nested SignedData structures + :param include_nested: Boolean, if True, will also iterate over all nested + SignedData structures :raises SignedPEParseError: For parse errors in the PEFile - :raises signify.authenticode.AuthenticodeParseError: For parse errors in the SignedData + :raises signify.authenticode.AuthenticodeParseError: For parse errors in the + SignedData :return: iterator of signify.authenticode.SignedData """ @@ -251,27 +278,35 @@ def recursive_nested( found = False for certificate in self._parse_cert_table(): if certificate["revision"] != 0x200: - raise SignedPEParseError(f"Unknown certificate revision {certificate['revision']!r}") + raise SignedPEParseError( + f"Unknown certificate revision {certificate['revision']!r}" + ) if certificate["type"] == 2: yield from recursive_nested( - structures.AuthenticodeSignedData.from_envelope(certificate["certificate"], pefile=self) + structures.AuthenticodeSignedData.from_envelope( + certificate["certificate"], pefile=self + ) ) found = True if not found: - raise SignedPEParseError("A SignedData structure was not found in the PE file's Certificate Table") + raise SignedPEParseError( + "A SignedData structure was not found in the PE file's Certificate" + " Table" + ) def _calculate_expected_hashes( self, signed_datas: Iterable[structures.AuthenticodeSignedData], expected_hashes: dict[str, bytes] | None = None, ) -> dict[str, bytes]: - """Calculates the expected hashes that are needed for verification. This provides a small speed-up - by pre-calculating all hashes, so that not each individual SignerInfo object is responsible for calculating - their own hash. + """Calculates the expected hashes that are needed for verification. This + provides a small speed-up by pre-calculating all hashes, so that not each + individual SignerInfo object is responsible for calculating their own hash. - :param signed_datas: The signed datas of this object. Provided to allow :meth:`verify` to prefetch these + :param signed_datas: The signed datas of this object. 
Provided to allow + :meth:`verify` to prefetch these :param expected_hashes: Hashes provided by the caller of :meth:`verify` :return: All required hashes """ @@ -303,20 +338,22 @@ def verify( expected_hashes: dict[str, bytes] | None = None, **kwargs: Any, ) -> bool: - """Verifies the SignedData structures. This is a little bit more efficient than calling all verify-methods - separately. + """Verifies the SignedData structures. This is a little bit more efficient than + calling all verify-methods separately. - :param expected_hashes: When provided, should be a mapping of hash names to digests. This could speed up the - verification process. - :param multi_verify_mode: Indicates how to verify when there are multiple :cls:`AuthenticodeSignedData` objects - in this PE file. Can be: + :param expected_hashes: When provided, should be a mapping of hash names to + digests. This could speed up the verification process. + :param multi_verify_mode: Indicates how to verify when there are multiple + :cls:`AuthenticodeSignedData` objects in this PE file. Can be: - * 'any' (default) to indicate that any of the signatures must validate correctly. + * 'any' (default) to indicate that any of the signatures must validate + correctly. * 'first' to indicate that the first signature must verify correctly (the default of tools such as sigcheck.exe) * 'all' to indicate that all signatures must verify - * 'best' to indicate that the signature using the best hashing algorithm must verify (e.g. if both SHA-1 - and SHA-256 are present, only SHA-256 is checked); if multiple signatures exist with the same algorithm, + * 'best' to indicate that the signature using the best hashing algorithm + must verify (e.g. if both SHA-1 and SHA-256 are present, only SHA-256 + is checked); if multiple signatures exist with the same algorithm, any may verify This argument has no effect when only one signature is present. 
@@ -330,15 +367,19 @@ def verify( if not signed_datas: raise AuthenticodeNotSignedError("No valid SignedData structure was found.") - # only consider the first signed_data; by selecting it here we prevent calculating more hashes than needed + # only consider the first signed_data; by selecting it here we prevent + # calculating more hashes than needed if multi_verify_mode == "first": signed_datas = [signed_datas[0]] elif multi_verify_mode == "best": # ACCEPTED_DIGEST_ALGORITHMS contains the algorithms in worst to best order best_algorithm = max( - (sd.digest_algorithm for sd in signed_datas), key=lambda alg: ACCEPTED_DIGEST_ALGORITHMS.index(alg) + (sd.digest_algorithm for sd in signed_datas), + key=lambda alg: ACCEPTED_DIGEST_ALGORITHMS.index(alg), ) - signed_datas = [sd for sd in signed_datas if sd.digest_algorithm == best_algorithm] + signed_datas = [ + sd for sd in signed_datas if sd.digest_algorithm == best_algorithm + ] expected_hashes = self._calculate_expected_hashes(signed_datas, expected_hashes) @@ -347,10 +388,13 @@ def verify( assert signed_datas for signed_data in signed_datas: try: - signed_data.verify(expected_hash=expected_hashes[signed_data.digest_algorithm().name], **kwargs) + signed_data.verify( + expected_hash=expected_hashes[signed_data.digest_algorithm().name], + **kwargs, + ) except Exception as e: - # best and any are interpreted as any; first doesn't matter either way, but raising where it is raised - # is a little bit clearer + # best and any are interpreted as any; first doesn't matter either way, + # but raising where it is raised is a little bit clearer if multi_verify_mode in ("all", "first"): raise last_error = e @@ -364,12 +408,15 @@ def verify( def explain_verify( self, *args: Any, **kwargs: Any ) -> tuple[structures.AuthenticodeVerificationResult, Exception | None]: - """This will return a value indicating the signature status of this PE file. This will not raise an error - when the verification fails, but rather indicate this through the resulting enum + """This will return a value indicating the signature status of this PE file. 
+        This will not raise an error when the verification fails, but rather
+        indicate this through the resulting enum

         :rtype: (signify.authenticode.AuthenticodeVerificationResult, Exception)
         :returns: The verification result, and the exception containing more details
             (if available or None)
         """

-        return structures.AuthenticodeVerificationResult.call(self.verify, *args, **kwargs)
+        return structures.AuthenticodeVerificationResult.call(
+            self.verify, *args, **kwargs
+        )
diff --git a/signify/authenticode/structures.py b/signify/authenticode/structures.py
index 9af4454..3d6f2af 100644
--- a/signify/authenticode/structures.py
+++ b/signify/authenticode/structures.py
@@ -44,7 +44,13 @@
 from signify.authenticode.authroot import CertificateTrustList
 from signify.asn1 import guarded_ber_decode, pkcs7, spc
 from signify.asn1.helpers import accuracy_to_python, patch_rfc5652_signeddata
-from signify.x509 import CertificateName, VerificationContext, FileSystemCertificateStore, CertificateStore, Certificate
+from signify.x509 import (
+    CertificateName,
+    VerificationContext,
+    FileSystemCertificateStore,
+    CertificateStore,
+    Certificate,
+)
 from signify.exceptions import (
     AuthenticodeParseError,
     ParseError,
@@ -63,9 +69,13 @@
 logger = logging.getLogger(__name__)

 CERTIFICATE_LOCATION = pathlib.Path(mscerts.where(stl=False))
-TRUSTED_CERTIFICATE_STORE_NO_CTL = FileSystemCertificateStore(location=CERTIFICATE_LOCATION, trusted=True)
+TRUSTED_CERTIFICATE_STORE_NO_CTL = FileSystemCertificateStore(
+    location=CERTIFICATE_LOCATION, trusted=True
+)
 TRUSTED_CERTIFICATE_STORE = FileSystemCertificateStore(
-    location=CERTIFICATE_LOCATION, trusted=True, ctl=CertificateTrustList.from_stl_file()
+    location=CERTIFICATE_LOCATION,
+    trusted=True,
+    ctl=CertificateTrustList.from_stl_file(),
 )


@@ -73,9 +83,10 @@


 class AuthenticodeVerificationResult(enum.Enum):
-    """This represents the result of an Authenticode verification. If everything is OK, it will equal to
-    ``AuthenticodeVerificationResult.OK``, otherwise one of the other enum items will be returned. Remember that only
-    the first exception is processed - there may be more wrong.
+    """This represents the result of an Authenticode verification. If everything is OK,
+    it will equal to ``AuthenticodeVerificationResult.OK``, otherwise one of the
+    other enum items will be returned. Remember that only the first exception is
+    processed - there may be more wrong.
     """

     OK = enum.auto()
@@ -85,20 +96,22 @@
     PARSE_ERROR = enum.auto()
     """The Authenticode signature could not be parsed."""
     VERIFY_ERROR = enum.auto()
-    """The Authenticode signature could not be verified. This is a more generic error than other possible
-    statuses and is used as a catch-all.
+    """The Authenticode signature could not be verified. This is a more generic error
+    than other possible statuses and is used as a catch-all.
     """
     UNKNOWN_ERROR = enum.auto()
     """An unknown error occurred during parsing or verifying."""
     CERTIFICATE_ERROR = enum.auto()
-    """An error occurred during the processing of a certificate (e.g. during chain building), or when verifying the
-    certificate's signature.
+    """An error occurred during the processing of a certificate (e.g. during chain
+    building), or when verifying the certificate's signature.
""" INCONSISTENT_DIGEST_ALGORITHM = enum.auto() - """A highly specific error raised when different digest algorithms are used in SignedData, SpcInfo or SignerInfo.""" + """A highly specific error raised when different digest algorithms are used in + SignedData, SpcInfo or SignerInfo. + """ INVALID_DIGEST = enum.auto() - """The verified digest does not match the calculated digest of the file. This is a tell-tale sign that the file - may have been tampered with. + """The verified digest does not match the calculated digest of the file. This is a + tell-tale sign that the file may have been tampered with. """ COUNTERSIGNER_ERROR = enum.auto() """Something went wrong when verifying the countersignature.""" @@ -130,36 +143,45 @@ def call( class AuthenticodeCounterSignerInfo(CounterSignerInfo): - """Subclass of :class:`CounterSignerInfo` that is used to contain the countersignerinfo for Authenticode.""" + """Subclass of :class:`CounterSignerInfo` that is used to contain the + countersignerinfo for Authenticode. + """ - _required_authenticated_attributes = (rfc2315.ContentType, rfc5652.SigningTime, rfc2315.Digest) + _required_authenticated_attributes = ( + rfc2315.ContentType, + rfc5652.SigningTime, + rfc2315.Digest, + ) class AuthenticodeSignerInfo(SignerInfo): - """Subclass of :class:`SignerInfo` that is used by the verification of Authenticode. Note that this will contain - the same attributes as :class:`SignerInfo`, and additionally the following: + """Subclass of :class:`SignerInfo` that is used by the verification of Authenticode. + Note that this will contain the same attributes as :class:`SignerInfo`, and + additionally the following: .. attribute:: program_name more_info - This information is extracted from the SpcSpOpusInfo authenticated attribute, containing the program's name and - an URL with more information. + This information is extracted from the SpcSpOpusInfo authenticated attribute, + containing the program's name and an URL with more information. .. attribute:: nested_signed_datas - It is possible for Authenticode SignerInfo objects to contain nested :class:`signify.pkcs7.SignedData` - objects. This is similar to including multiple SignedData structures in the - :class:`signify.authenticode.SignedPEFile`. This field is extracted from the unauthenticated attributes. + It is possible for Authenticode SignerInfo objects to contain nested + :class:`signify.pkcs7.SignedData` objects. This is similar to including + multiple SignedData structures in the :class:`signify.authenticode.SignedPEFile`. + This field is extracted from the unauthenticated attributes. - The :attr:`countersigner` attribute can hold the same as in the normal :class:`SignerInfo`, but may also contain a - :class:`RFC3161SignedData` class: + The :attr:`countersigner` attribute can hold the same as in the normal + :class:`SignerInfo`, but may also contain a :class:`RFC3161SignedData` class: .. attribute:: countersigner - Authenticode may use a different countersigning mechanism, rather than using a nested - :class:`AuthenticodeCounterSignerInfo`, it may use a nested RFC-3161 response, which is a nested - :class:`signify.pkcs7.SignedData` structure (of type :class:`RFC3161SignedData`). This is also assigned - to the countersigner attribute if this is available. 
+ Authenticode may use a different countersigning mechanism, rather than using a + nested :class:`AuthenticodeCounterSignerInfo`, it may use a nested RFC-3161 + response, which is a nested :class:`signify.pkcs7.SignedData` structure + (of type :class:`RFC3161SignedData`). This is also assigned to the countersigner + attribute if this is available. """ @@ -179,61 +201,85 @@ class AuthenticodeSignerInfo(SignerInfo): def _parse(self) -> None: super()._parse() - # - Retrieve object from SpcSpOpusInfo from the authenticated attributes (for normal signer) + # - Retrieve object from SpcSpOpusInfo from the authenticated attributes + # (for normal signer) self.program_name = self.more_info = None if asn1.spc.SpcSpOpusInfo in self.authenticated_attributes: if len(self.authenticated_attributes[asn1.spc.SpcSpOpusInfo]) != 1: - raise AuthenticodeParseError("Only one SpcSpOpusInfo expected in SignerInfo.authenticatedAttributes") + raise AuthenticodeParseError( + "Only one SpcSpOpusInfo expected in" + " SignerInfo.authenticatedAttributes" + ) - self.program_name = self.authenticated_attributes[asn1.spc.SpcSpOpusInfo][0]["programName"].to_python() - self.more_info = self.authenticated_attributes[asn1.spc.SpcSpOpusInfo][0]["moreInfo"].to_python() + self.program_name = self.authenticated_attributes[asn1.spc.SpcSpOpusInfo][ + 0 + ]["programName"].to_python() + self.more_info = self.authenticated_attributes[asn1.spc.SpcSpOpusInfo][0][ + "moreInfo" + ].to_python() # - Authenticode can use nested signatures through OID 1.3.6.1.4.1.311.2.4.1 self.nested_signed_datas = [] if asn1.spc.SpcNestedSignature in self.unauthenticated_attributes: - for sig_data in self.unauthenticated_attributes[asn1.spc.SpcNestedSignature]: + for sig_data in self.unauthenticated_attributes[ + asn1.spc.SpcNestedSignature + ]: content_type = asn1.oids.get(sig_data["contentType"]) if content_type is not rfc2315.SignedData: - raise AuthenticodeParseError("Nested signature is not a SignedData structure") + raise AuthenticodeParseError( + "Nested signature is not a SignedData structure" + ) signed_data: rfc2315.SignedData = guarded_ber_decode( sig_data["content"], asn1_spec=content_type(), # type: ignore[operator] ) - self.nested_signed_datas.append(AuthenticodeSignedData(signed_data, pefile=self.parent.pefile)) + self.nested_signed_datas.append( + AuthenticodeSignedData(signed_data, pefile=self.parent.pefile) + ) - # - Authenticode can be signed using a RFC-3161 timestamp, so we discover this possibility here + # - Authenticode can be signed using a RFC-3161 timestamp, so we discover this + # possibility here if ( pkcs7.Countersignature in self.unauthenticated_attributes and asn1.spc.SpcRfc3161Timestamp in self.unauthenticated_attributes ): raise AuthenticodeParseError( - "Countersignature and RFC-3161 timestamp present in SignerInfo.unauthenticatedAttributes" + "Countersignature and RFC-3161 timestamp present in" + " SignerInfo.unauthenticatedAttributes" ) if asn1.spc.SpcRfc3161Timestamp in self.unauthenticated_attributes: if len(self.unauthenticated_attributes[asn1.spc.SpcRfc3161Timestamp]) != 1: raise AuthenticodeParseError( - "Only one RFC-3161 timestamp expected in SignerInfo.unauthenticatedAttributes" + "Only one RFC-3161 timestamp expected in" + " SignerInfo.unauthenticatedAttributes" ) ts_data = self.unauthenticated_attributes[asn1.spc.SpcRfc3161Timestamp][0] content_type = asn1.oids.get(ts_data["contentType"]) if content_type is not rfc2315.SignedData: - raise AuthenticodeParseError("RFC-3161 Timestamp does not contain SignedData 
structure") + raise AuthenticodeParseError( + "RFC-3161 Timestamp does not contain SignedData structure" + ) # Note that we expect rfc5652 compatible data here - # This is a work-around for incorrectly tagged v2AttrCerts in the BER-encoded blob, - # see the docstring for patch_rfc5652_signeddata for more details + # This is a work-around for incorrectly tagged v2AttrCerts in the + # BER-encoded blob, see the docstring for patch_rfc5652_signeddata for + # more details try: - signed_data = guarded_ber_decode(ts_data["content"], asn1_spec=rfc5652.SignedData()) + signed_data = guarded_ber_decode( + ts_data["content"], asn1_spec=rfc5652.SignedData() + ) except ParseError: with patch_rfc5652_signeddata() as asn1_spec: - signed_data = guarded_ber_decode(ts_data["content"], asn1_spec=asn1_spec) + signed_data = guarded_ber_decode( + ts_data["content"], asn1_spec=asn1_spec + ) self.countersigner = RFC3161SignedData(signed_data) class SpcInfo: - """The Authenticode's SpcIndirectDataContent information, and their children. This is expected to be part of the - content of the SignedData structure in Authenticode. + """The Authenticode's SpcIndirectDataContent information, and their children. This + is expected to be part of the content of the SignedData structure in Authenticode. Note that this structure is completely flattened out from this ASN.1 spec:: @@ -285,22 +331,25 @@ def _parse(self) -> None: if "value" in self.data["data"] and self.data["data"]["value"].isValue: self.image_data = None # TODO: not parsed - # image_data = _guarded_ber_decode((self.data['data']['value'], asn1_spec=self.content_type()) + # image_data = _guarded_ber_decode((self.data['data']['value'], + # asn1_spec=self.content_type()) self.digest_algorithm = _get_digest_algorithm( - self.data["messageDigest"]["digestAlgorithm"], location="SpcIndirectDataContent.digestAlgorithm" + self.data["messageDigest"]["digestAlgorithm"], + location="SpcIndirectDataContent.digestAlgorithm", ) self.digest = bytes(self.data["messageDigest"]["digest"]) class AuthenticodeSignedData(SignedData): - """The :class:`signify.pkcs7.SignedData` structure for Authenticode. It holds the same information as its - superclass, with additionally the :class:`SpcInfo`: + """The :class:`signify.pkcs7.SignedData` structure for Authenticode. It holds the + same information as its superclass, with additionally the :class:`SpcInfo`: .. attribute:: spc_info - The parsed :attr:`content` of this :class:`SignedData` object, being a SpcIndirectDataContent object. + The parsed :attr:`content` of this :class:`SignedData` object, being a + SpcIndirectDataContent object. """ @@ -313,7 +362,11 @@ class AuthenticodeSignedData(SignedData): _expected_content_type = asn1.spc.SpcIndirectDataContent _signerinfo_class = AuthenticodeSignerInfo - def __init__(self, data: rfc2315.SignedData | rfc5652.SignedData, pefile: signed_pe.SignedPEFile | None = None): + def __init__( + self, + data: rfc2315.SignedData | rfc5652.SignedData, + pefile: signed_pe.SignedPEFile | None = None, + ): """ :param asn1.pkcs7.SignedData data: The ASN.1 structure of the SignedData object :param pefile: The related PEFile. 
@@ -324,7 +377,9 @@ def __init__(self, data: rfc2315.SignedData | rfc5652.SignedData, pefile: signed def _parse(self) -> None: # Parse the fields of the SignedData structure if self.data["version"] != 1: - raise AuthenticodeParseError("SignedData.version must be 1, not %d" % self.data["version"]) + raise AuthenticodeParseError( + "SignedData.version must be 1, not %d" % self.data["version"] + ) super()._parse() self.spc_info = SpcInfo(self.content) @@ -332,14 +387,17 @@ def _parse(self) -> None: # signerInfos if len(self.signer_infos) != 1: raise AuthenticodeParseError( - "SignedData.signerInfos must contain exactly 1 signer, not %d" % len(self.signer_infos) + "SignedData.signerInfos must contain exactly 1 signer, not %d" + % len(self.signer_infos) ) self.signer_info = self.signer_infos[0] # CRLs if "crls" in self.data and self.data["crls"].isValue: - raise AuthenticodeParseError("SignedData.crls is present, but that is unexpected.") + raise AuthenticodeParseError( + "SignedData.crls is present, but that is unexpected." + ) def verify( self, @@ -353,40 +411,53 @@ def verify( ) -> None: """Verifies the SignedData structure: - * Verifies that the digest algorithms match across the structure (:class:`SpcInfo`, - :class:`AuthenticodeSignedData` and :class:`AuthenticodeSignerInfo` must have the same) - * Ensures that the hash in :attr:`SpcInfo.digest` matches the expected hash. If no expected hash is - provided to this function, it is calculated using the :class:`Fingerprinter` obtained from the - :class:`SignedPEFile` object. - * Verifies that the :class:`SpcInfo`, when hashed, is the same as the value in :attr:`SignerInfo.message_digest` - * In the case of a countersigner, calls :meth:`check_message_digest` on the countersigner to verify that the - hashed value of :attr:`AuthenticodeSignerInfo.encrypted_digest` is contained in the countersigner. - * Verifies the chain of the countersigner up to a trusted root, see :meth:`SignerInfo.verify` - and :meth:`RFC3161SignedData.verify` - * Verifies the chain of the signer up to a trusted root, see :meth:`SignerInfo.verify` - - In the case of a countersigner, the verification is performed using the timestamp of the - :class:`CounterSignerInfo`, otherwise now is assumed. If there is no countersigner, you can override this - by specifying a different timestamp in the :class:`VerificationContext`. Note that you cannot set a timestamp - when checking against the CRL; this is not permitted by the underlying library. If you need to do this, you - must therefore set countersignature_mode to ``ignore``. - - :param bytes expected_hash: The expected hash digest of the :class:`SignedPEFile`. - :param VerificationContext verification_context: The VerificationContext for verifying the chain of the - :class:`SignerInfo`. The timestamp is overridden in the case of a countersigner. Default stores are - TRUSTED_CERTIFICATE_STORE and the certificates of this :class:`SignedData` object. EKU is code_signing - :param VerificationContext cs_verification_context: The VerificationContext for verifying the chain of the - :class:`CounterSignerInfo`. The timestamp is overridden in the case of a countersigner. Default stores are - TRUSTED_CERTIFICATE_STORE and the certificates of this :class:`SignedData` object. 
EKU is time_stamping - :param CertificateStore trusted_certificate_store: A :class:`CertificateStore` object that contains a list of - trusted certificates to be used when :const:`None` is passed to either ``verification_context`` or + * Verifies that the digest algorithms match across the structure + (:class:`SpcInfo`, :class:`AuthenticodeSignedData` and + :class:`AuthenticodeSignerInfo` must have the same) + * Ensures that the hash in :attr:`SpcInfo.digest` matches the expected hash. + If no expected hash is provided to this function, it is calculated using + the :class:`Fingerprinter` obtained from the :class:`SignedPEFile` object. + * Verifies that the :class:`SpcInfo`, when hashed, is the same as the value in + :attr:`SignerInfo.message_digest` + * In the case of a countersigner, calls :meth:`check_message_digest` on the + countersigner to verify that the hashed value of + :attr:`AuthenticodeSignerInfo.encrypted_digest` is contained in the + countersigner. + * Verifies the chain of the countersigner up to a trusted root, see + :meth:`SignerInfo.verify` and :meth:`RFC3161SignedData.verify` + * Verifies the chain of the signer up to a trusted root, see + :meth:`SignerInfo.verify` + + In the case of a countersigner, the verification is performed using the + timestamp of the :class:`CounterSignerInfo`, otherwise now is assumed. If there + is no countersigner, you can override this by specifying a different timestamp + in the :class:`VerificationContext`. Note that you cannot set a timestamp when + checking against the CRL; this is not permitted by the underlying library. If + you need to do this, you must therefore set countersignature_mode to ``ignore``. + + :param bytes expected_hash: The expected hash digest of the + :class:`SignedPEFile`. + :param VerificationContext verification_context: The VerificationContext for + verifying the chain of the :class:`SignerInfo`. The timestamp is overridden + in the case of a countersigner. Default stores are TRUSTED_CERTIFICATE_STORE + and the certificates of this :class:`SignedData` object. EKU is code_signing + :param VerificationContext cs_verification_context: The VerificationContext for + verifying the chain of the :class:`CounterSignerInfo`. The timestamp is + overridden in the case of a countersigner. Default stores are + TRUSTED_CERTIFICATE_STORE and the certificates of this :class:`SignedData` + object. EKU is time_stamping + :param CertificateStore trusted_certificate_store: A :class:`CertificateStore` + object that contains a list of trusted certificates to be used when + :const:`None` is passed to either ``verification_context`` or ``cs_verification_context`` and a :class:`VerificationContext` is created. - :param dict verification_context_kwargs: If provided, keyword arguments that are passed to the instantiation of - :class:`VerificationContext` s created in this function. Used for e.g. providing a timestamp. - :param str countersignature_mode: Changes how countersignatures are handled. Defaults to 'strict', which means - that errors in the countersignature result in verification failure. If set to 'permit', the - countersignature is checked, but when it errors, it is verified as if the countersignature was never set. - When set to 'ignore', countersignatures are never checked. + :param dict verification_context_kwargs: If provided, keyword arguments that + are passed to the instantiation of :class:`VerificationContext` s created + in this function. Used for e.g. providing a timestamp. 
+ :param str countersignature_mode: Changes how countersignatures are handled. + Defaults to 'strict', which means that errors in the countersignature + result in verification failure. If set to 'permit', the countersignature is + checked, but when it errors, it is verified as if the countersignature was + never set. When set to 'ignore', countersignatures are never checked. :raises AuthenticodeVerificationError: when the verification failed :return: :const:`None` """ @@ -399,16 +470,23 @@ def verify( **verification_context_kwargs, ) - if cs_verification_context is None and self.signer_info.countersigner and countersignature_mode != "ignore": + if ( + cs_verification_context is None + and self.signer_info.countersigner + and countersignature_mode != "ignore" + ): cs_verification_context = VerificationContext( trusted_certificate_store, self.certificates, extended_key_usages=["time_stamping"], **verification_context_kwargs, ) - # Add the local certificate store for the countersignature (in the case of RFC3161SignedData) + # Add the local certificate store for the countersignature + # (in the case of RFC3161SignedData) if hasattr(self.signer_info.countersigner, "certificates"): - cs_verification_context.add_store(self.signer_info.countersigner.certificates) + cs_verification_context.add_store( + self.signer_info.countersigner.certificates + ) # Check that the digest algorithms match if self.digest_algorithm != self.spc_info.digest_algorithm: @@ -430,7 +508,9 @@ def verify( expected_hash = fingerprinter.hash()[self.digest_algorithm().name] if expected_hash != self.spc_info.digest: - raise AuthenticodeInvalidDigestError("The expected hash does not match the digest in SpcInfo") + raise AuthenticodeInvalidDigestError( + "The expected hash does not match the digest in SpcInfo" + ) # 2. The hash of the spc blob # According to RFC2315, 9.3, identifier (tag) and length need to be @@ -438,11 +518,15 @@ def verify( # out the SEQUENCE part of the spcIndirectData. # Alternatively this could be done by re-encoding and concatenating # the individual elements in spc_value, I _think_. - _, hashable_spc_blob = ber_decoder.decode(self.data["contentInfo"]["content"], recursiveFlag=0) + _, hashable_spc_blob = ber_decoder.decode( + self.data["contentInfo"]["content"], recursiveFlag=0 + ) spc_blob_hasher = self.digest_algorithm() spc_blob_hasher.update(bytes(hashable_spc_blob)) if spc_blob_hasher.digest() != self.signer_info.message_digest: - raise AuthenticodeInvalidDigestError("The expected hash of the SpcInfo does not match SignerInfo") + raise AuthenticodeInvalidDigestError( + "The expected hash of the SpcInfo does not match SignerInfo" + ) # Can't check authAttr hash against encrypted hash, done implicitly in # M2's pubkey.verify. @@ -453,43 +537,59 @@ def verify( try: # 3. Check the countersigner hash. 
# Make sure to use the same digest_algorithm that the countersigner used - if not self.signer_info.countersigner.check_message_digest(self.signer_info.encrypted_digest): + if not self.signer_info.countersigner.check_message_digest( + self.signer_info.encrypted_digest + ): raise AuthenticodeCounterSignerError( - "The expected hash of the encryptedDigest does not match countersigner's SignerInfo" + "The expected hash of the encryptedDigest does not match" + " countersigner's SignerInfo" ) - cs_verification_context.timestamp = self.signer_info.countersigner.signing_time + cs_verification_context.timestamp = ( + self.signer_info.countersigner.signing_time + ) - # We could be calling SignerInfo.verify or RFC3161SignedData.verify here, but those have identical - # signatures. Note that RFC3161SignedData accepts a trusted_certificate_store argument, but we pass in - # an explicit context anyway + # We could be calling SignerInfo.verify or RFC3161SignedData.verify + # here, but those have identical signatures. Note that + # RFC3161SignedData accepts a trusted_certificate_store argument, but + # we pass in an explicit context anyway self.signer_info.countersigner.verify(cs_verification_context) except Exception as e: if countersignature_mode != "strict": pass else: raise AuthenticodeCounterSignerError( - "An error occurred while validating the countersignature: {}".format(e) + "An error occurred while validating the countersignature: {}" + .format(e) ) else: - # If no errors occur, we should be fine setting the timestamp to the countersignature's timestamp - verification_context.timestamp = self.signer_info.countersigner.signing_time + # If no errors occur, we should be fine setting the timestamp to the + # countersignature's timestamp + verification_context.timestamp = ( + self.signer_info.countersigner.signing_time + ) self.signer_info.verify(verification_context) - def explain_verify(self, *args: Any, **kwargs: Any) -> tuple[AuthenticodeVerificationResult, Exception | None]: - """This will return a value indicating the signature status of this object. This will not raise an error - when the verification fails, but rather indicate this through the resulting enum + def explain_verify( + self, *args: Any, **kwargs: Any + ) -> tuple[AuthenticodeVerificationResult, Exception | None]: + """This will return a value indicating the signature status of this object. + This will not raise an error when the verification fails, but rather indicate + this through the resulting enum :rtype: Tuple[AuthenticodeVerificationResult, Exception] - :return: The verification result, and the exception containing more details (if available or None) + :return: The verification result, and the exception containing more details + (if available or None) """ return AuthenticodeVerificationResult.call(self.verify, *args, **kwargs) class RFC3161SignerInfo(SignerInfo): - """Subclass of SignerInfo that is used to contain the signerinfo for the RFC3161SignedData option.""" + """Subclass of SignerInfo that is used to contain the signerinfo for the + RFC3161SignedData option. + """ _expected_content_type = rfc3161.TSTInfo _required_authenticated_attributes = (rfc2315.ContentType, rfc2315.Digest) @@ -497,8 +597,8 @@ class RFC3161SignerInfo(SignerInfo): class TSTInfo: - """This is an implementation of the TSTInfo class as defined by RFC3161, used as content for a SignedData structure. 
- The following properties are available: + """This is an implementation of the TSTInfo class as defined by RFC3161, used as + content for a SignedData structure. The following properties are available: .. attribute:: data @@ -550,26 +650,32 @@ def __init__(self, data: rfc3161.TSTInfo): def _parse(self) -> None: if self.data["version"] != 1: - raise AuthenticodeParseError("TSTInfo.version must be 1, not %d" % self.data["version"]) + raise AuthenticodeParseError( + "TSTInfo.version must be 1, not %d" % self.data["version"] + ) self.policy = self.data["policy"] # TODO self.hash_algorithm = _get_digest_algorithm( - self.data["messageImprint"]["hashAlgorithm"], location="TSTInfo.messageImprint.hashAlgorithm" + self.data["messageImprint"]["hashAlgorithm"], + location="TSTInfo.messageImprint.hashAlgorithm", ) self.message_digest = bytes(self.data["messageImprint"]["hashedMessage"]) self.serial_number = self.data["serialNumber"] self.signing_time = self.data["genTime"].asDateTime self.signing_time_accuracy = accuracy_to_python(self.data["accuracy"]) # TODO handle case where directoryName is not a rdnSequence - self.signing_authority = CertificateName(self.data["tsa"]["directoryName"]["rdnSequence"]) + self.signing_authority = CertificateName( + self.data["tsa"]["directoryName"]["rdnSequence"] + ) class RFC3161SignedData(SignedData): - """Some samples have shown to include a RFC-3161 countersignature in the unauthenticated attributes - (as OID 1.3.6.1.4.1.311.3.3.1, which is in the Microsoft private namespace). This attribute contains its own - signed data structure. + """Some samples have shown to include a RFC-3161 countersignature in the + unauthenticated attributes (as OID 1.3.6.1.4.1.311.3.3.1, which is in the Microsoft + private namespace). This attribute contains its own signed data structure. - This is a subclass of :class:`signify.pkcs7.SignedData`, containing a RFC3161 TSTInfo in its content field. + This is a subclass of :class:`signify.pkcs7.SignedData`, containing a RFC3161 + TSTInfo in its content field. .. attribute:: tst_info :type: TSTInfo @@ -590,18 +696,23 @@ def _parse(self) -> None: # signerInfos if len(self.signer_infos) != 1: raise AuthenticodeParseError( - "RFC3161 SignedData.signerInfos must contain exactly 1 signer, not %d" % len(self.signer_infos) + "RFC3161 SignedData.signerInfos must contain exactly 1 signer, not %d" + % len(self.signer_infos) ) self.signer_info = self.signer_infos[0] @property def signing_time(self) -> datetime.datetime: - """Transparent attribute to ensure that the signing_time attribute is consistently available.""" + """Transparent attribute to ensure that the signing_time attribute is + consistently available. + """ return self.tst_info.signing_time def check_message_digest(self, data: bytes) -> bool: - """Given the data, returns whether the hash_algorithm and message_digest match the data provided.""" + """Given the data, returns whether the hash_algorithm and message_digest match + the data provided. + """ auth_attr_hasher = self.tst_info.hash_algorithm() auth_attr_hasher.update(data) @@ -613,32 +724,43 @@ def verify( *, trusted_certificate_store: CertificateStore = TRUSTED_CERTIFICATE_STORE, ) -> Iterable[Iterable[Certificate]]: - """Verifies the RFC3161 SignedData object. The context that is passed in must account for the certificate - store of this object, or be left None. + """Verifies the RFC3161 SignedData object. The context that is passed in must + account for the certificate store of this object, or be left None. 
- The object is verified by verifying that the hash of the :class:`TSTInfo` matches the - :attr:`SignerInfo.message_digest` value. The remainder of the validation is done by calling - :meth:`SignerInfo.verify` + The object is verified by verifying that the hash of the :class:`TSTInfo` + matches the :attr:`SignerInfo.message_digest` value. The remainder of the + validation is done by calling :meth:`SignerInfo.verify` """ - # We should ensure that the hash in the SignerInfo matches the hash of the content - # This is similar to the normal verification process, where the SpcInfo is verified - # Note that the mapping between the RFC3161 SignedData object is ensured by the verifier in SignedData + # We should ensure that the hash in the SignerInfo matches the hash of the + # content. This is similar to the normal verification process, where the + # SpcInfo is verified. Note that the mapping between the RFC3161 SignedData + # object is ensured by the verifier in SignedData blob_hasher = self.digest_algorithm() blob_hasher.update(bytes(self.data["encapContentInfo"]["eContent"])) if blob_hasher.digest() != self.signer_info.message_digest: - raise AuthenticodeCounterSignerError("The expected hash of the TstInfo does not match SignerInfo") + raise AuthenticodeCounterSignerError( + "The expected hash of the TstInfo does not match SignerInfo" + ) if context is None: context = VerificationContext( - trusted_certificate_store, self.certificates, extended_key_usages=["time_stamping"] + trusted_certificate_store, + self.certificates, + extended_key_usages=["time_stamping"], ) - # The context is set correctly by the 'verify' function, including the current certificate store + # The context is set correctly by the 'verify' function, including the current + # certificate store return self.signer_info.verify(context) if __name__ == "__main__": - print("This is a list of all certificates in the Authenticode trust store, ordered by expiration date") - for i, certificate in enumerate(sorted(TRUSTED_CERTIFICATE_STORE, key=lambda x: x.valid_to), start=1): + print( + "This is a list of all certificates in the Authenticode trust store, ordered by" + " expiration date" + ) + for i, certificate in enumerate( + sorted(TRUSTED_CERTIFICATE_STORE, key=lambda x: x.valid_to), start=1 + ): print(i, certificate.valid_to, certificate) diff --git a/signify/exceptions.py b/signify/exceptions.py index bdc7c73..b7e0b9c 100644 --- a/signify/exceptions.py +++ b/signify/exceptions.py @@ -1,7 +1,6 @@ - - class SignifyError(Exception): """Base class for all errors generated by Signify""" + pass @@ -19,6 +18,7 @@ class SignerInfoParseError(ParseError): class AuthenticodeParseError(ParseError): """Raised when any exception regarding parsing Authenticode structures occurs.""" + pass @@ -40,6 +40,7 @@ class SignerInfoVerificationError(VerificationError): class AuthenticodeVerificationError(VerificationError): """Raised when any exception regarding verifying Authenticode structures occurs.""" + pass diff --git a/signify/fingerprinter.py b/signify/fingerprinter.py index eeae6c3..7227a6a 100644 --- a/signify/fingerprinter.py +++ b/signify/fingerprinter.py @@ -42,13 +42,16 @@ class Finger: """A Finger defines how to hash a file to get specific fingerprints. - The Finger contains one or more hash functions, a set of ranges in the file that are to be processed with these - hash functions, and a description. 
+ The Finger contains one or more hash functions, a set of ranges in the file that + are to be processed with these hash functions, and a description. - While one Finger provides potentially multiple hashers, they all get fed the same ranges of the file. + While one Finger provides potentially multiple hashers, they all get fed the + same ranges of the file. """ - def __init__(self, hashers: list[hashlib._Hash], ranges: list[Range], description: str): + def __init__( + self, hashers: list[hashlib._Hash], ranges: list[Range], description: str + ): """ :param hashers: A list of hashers to feed. @@ -70,15 +73,18 @@ def current_range(self) -> Range | None: def consume(self, start: int, end: int) -> None: """Consumes an entire range, or part thereof. - If the finger has no ranges left, or the current range start is higher than the end of the consumed block, - nothing happens. Otherwise, the current range is adjusted for the consumed block, or removed, if the entire - block is consumed. For things to work, the consumed range and the current finger starts must be equal, and the - length of the consumed range may not exceed the length of the current range. + If the finger has no ranges left, or the current range start is higher than + the end of the consumed block, nothing happens. Otherwise, the current range is + adjusted for the consumed block, or removed, if the entire block is consumed. + For things to work, the consumed range and the current finger starts must be + equal, and the length of the consumed range may not exceed the length of the + current range. :param start: Beginning of range to be consumed. :param end: First offset after the consumed range (end + 1). - :raises RuntimeError: if the start position of the consumed range is higher than the start of the current range - in the finger, or if the consumed range cuts across block boundaries. + :raises RuntimeError: if the start position of the consumed range is higher + than the start of the current range in the finger, or if the consumed + range cuts across block boundaries. """ old = self.current_range @@ -109,8 +115,9 @@ class Fingerprinter: def __init__(self, file_obj: BinaryIO, block_size: int = 1000000): """A Fingerprinter is an interface to generate hashes of (parts) of a file. - It is passed in a file object and given a set of :class:`Finger` s that define how a file must be hashed. It is - a generic approach to not hashing parts of a file. + It is passed in a file object and given a set of :class:`Finger` s that define + how a file must be hashed. It is a generic approach to not hashing parts of a + file. :param file_obj: A file opened in bytes-mode :param block_size: The block size used to feed to the hashers. @@ -125,14 +132,19 @@ def __init__(self, file_obj: BinaryIO, block_size: int = 1000000): self._fingers: list[Finger] = [] def add_hashers( - self, *hashers: HashFunction, ranges: list[Range] | None = None, description: str = "generic" + self, + *hashers: HashFunction, + ranges: list[Range] | None = None, + description: str = "generic", ) -> None: """Add hash methods to the fingerprinter. - :param hashers: A list of hashers to add to the Fingerprinter. This generally will be hashlib functions. - :param ranges: A list of Range objects that the hashers should hash. If set to :const:`None`, it is set to the - entire file. - :param description: The name for the hashers. This name will return in :meth:`hashes` + :param hashers: A list of hashers to add to the Fingerprinter. This generally + will be hashlib functions. 
+ :param ranges: A list of Range objects that the hashers should hash. If set + to :const:`None`, it is set to the entire file. + :param description: The name for the hashers. This name will return in + :meth:`hashes` """ concrete_hashers = [x() for x in hashers] if not ranges: @@ -149,8 +161,7 @@ def _next_interval(self) -> Range | None: lowest uninterrupted range of interest. If the range is larger than self.block_size, truncate it. - Returns: - Next range of interest in a Range namedtuple. + :returns: Next range of interest in a Range namedtuple. """ starts = set([x.current_range.start for x in self._fingers if x.current_range]) ends = set([x.current_range.end for x in self._fingers if x.current_range]) @@ -177,13 +188,10 @@ def _hash_block(self, block: bytes, start: int, end: int) -> None: Start and end are used to validate the expected ranges, to catch unexpected use of that logic. - Args: - block: The data block. - start: Beginning offset of this block. - end: Offset of the next byte after the block. - - Raises: - RuntimeError: If the provided and expected ranges don't match. + :param block: The data block. + :param start: Beginning offset of this block. + :param offset: Offset of the next byte after the block. + :raises RuntimeError: If the provided and expected ranges don't match. """ for finger in self._fingers: expected_range = finger.current_range @@ -205,13 +213,15 @@ def _consume(self, start: int, end: int) -> None: def hashes(self) -> dict[str, dict[str, bytes]]: """Finalizing function for the Fingerprint class. - This method applies all the different hash functions over the previously specified different ranges of the - input file, and computes the resulting hashes. + This method applies all the different hash functions over the previously + specified different ranges of the input file, and computes the resulting hashes. - After calling this function, the state of the object is reset to its initial state, with no fingers defined. + After calling this function, the state of the object is reset to its initial + state, with no fingers defined. - :returns: A dict of dicts, the outer dict being a mapping of the description (as set in :meth:`add_hashers` - and the inner dict being a mapping of hasher name to digest. + :returns: A dict of dicts, the outer dict being a mapping of the description + (as set in :meth:`add_hashers` and the inner dict being a mapping of hasher + name to digest. :raises RuntimeError: when internal inconsistencies occur. """ while True: @@ -229,7 +239,9 @@ def hashes(self) -> dict[str, dict[str, bytes]]: for finger in self._fingers: leftover = finger.current_range if leftover and ( - len(finger._ranges) > 1 or leftover.start != self._filelength or leftover.end != self._filelength + len(finger._ranges) > 1 + or leftover.start != self._filelength + or leftover.end != self._filelength ): raise RuntimeError("Non-empty range remains.") @@ -243,9 +255,11 @@ def hashes(self) -> dict[str, dict[str, bytes]]: return results def hash(self) -> dict[str, bytes]: - """Very similar to :meth:`hashes`, but only returns a single dict of hash names to digests. + """Very similar to :meth:`hashes`, but only returns a single dict of hash names + to digests. - This method can only be called when the :meth:`add_hashers` method was called exactly once. + This method can only be called when the :meth:`add_hashers` method was called + exactly once. 
""" hashes = self.hashes() if len(hashes) != 1: @@ -255,11 +269,13 @@ def hash(self) -> dict[str, bytes]: class AuthenticodeFingerprinter(Fingerprinter): - """An extension of the :class:`Fingerprinter` class that enables the calculation of authentihashes of PE Files.""" + """An extension of the :class:`Fingerprinter` class that enables the calculation of + authentihashes of PE Files. + """ def add_authenticode_hashers(self, *hashers: HashFunction) -> bool: - """Specialized method of :meth:`add_hashers` to add hashers with ranges limited to those that are needed to - calculate the hash of signed PE Files. + """Specialized method of :meth:`add_hashers` to add hashers with ranges limited + to those that are needed to calculate the hash of signed PE Files. """ pefile = signed_pe.SignedPEFile(self.file) @@ -284,8 +300,12 @@ def main(*filenames: str) -> None: print("{}:".format(filename)) with open(filename, "rb") as file_obj: fingerprinter = AuthenticodeFingerprinter(file_obj) - fingerprinter.add_hashers(hashlib.md5, hashlib.sha1, hashlib.sha256, hashlib.sha512) - fingerprinter.add_authenticode_hashers(hashlib.md5, hashlib.sha1, hashlib.sha256) + fingerprinter.add_hashers( + hashlib.md5, hashlib.sha1, hashlib.sha256, hashlib.sha512 + ) + fingerprinter.add_authenticode_hashers( + hashlib.md5, hashlib.sha1, hashlib.sha256 + ) results = fingerprinter.hashes() for description, result in sorted(results.items()): @@ -294,7 +314,11 @@ def main(*filenames: str) -> None: for k, v in sorted(result.items()): if k == "_": continue - print(" {k:<10}: {v}".format(k=k, v=binascii.hexlify(v).decode("ascii"))) + print( + " {k:<10}: {v}".format( + k=k, v=binascii.hexlify(v).decode("ascii") + ) + ) if __name__ == "__main__": diff --git a/signify/pkcs7/signeddata.py b/signify/pkcs7/signeddata.py index 4dac236..83556f1 100644 --- a/signify/pkcs7/signeddata.py +++ b/signify/pkcs7/signeddata.py @@ -19,8 +19,8 @@ class SignedData: - """A generic SignedData object. The SignedData object is defined in RFC2315 and RFC5652 (amongst others) and - defines data that is signed by one or more signers. + """A generic SignedData object. The SignedData object is defined in RFC2315 and + RFC5652 (amongst others) and defines data that is signed by one or more signers. It is based on the following ASN.1 object (as per RFC2315):: @@ -41,7 +41,8 @@ class SignedData: .. attribute:: digest_algorithm - The digest algorithm, i.e. the hash algorithm, that is used by the signers of the data. + The digest algorithm, i.e. the hash algorithm, that is used by the signers of + the data. .. attribute:: content_type @@ -54,14 +55,15 @@ class SignedData: .. attribute:: certificates :type: CertificateStore - A list of all included certificates in the SignedData. These can be used to determine a valid validation path - from the signer to a root certificate. + A list of all included certificates in the SignedData. These can be used to + determine a valid validation path from the signer to a root certificate. .. 
attribute:: signer_infos :type: List[SignerInfo] A list of all included SignerInfo objects """ + data: rfc2315.SignedData | rfc5652.SignedData digest_algorithm: HashFunction content_type: Type[Asn1Type] | OidTuple @@ -92,10 +94,10 @@ def from_envelope(cls, data: bytes, *args: Any, **kwargs: Any) -> Self: """ # This one is not guarded, which is intentional content, rest = ber_decoder.decode(data, asn1Spec=rfc2315.ContentInfo()) - if asn1.oids.get(content['contentType']) is not rfc2315.SignedData: + if asn1.oids.get(content["contentType"]) is not rfc2315.SignedData: raise ParseError("ContentInfo does not contain SignedData") - data = guarded_ber_decode(content['content'], asn1_spec=rfc2315.SignedData()) + data = guarded_ber_decode(content["content"], asn1_spec=rfc2315.SignedData()) signed_data = cls(data, *args, **kwargs) signed_data._rest_data = rest # type: ignore[attr-defined] @@ -103,33 +105,53 @@ def from_envelope(cls, data: bytes, *args: Any, **kwargs: Any) -> Self: def _parse(self) -> None: # digestAlgorithms - if len(self.data['digestAlgorithms']) != 1: - raise ParseError("SignedData.digestAlgorithms must contain exactly 1 algorithm, not %d" % - len(self.data['digestAlgorithms'])) - self.digest_algorithm = _get_digest_algorithm(self.data['digestAlgorithms'][0], "SignedData.digestAlgorithm") + if len(self.data["digestAlgorithms"]) != 1: + raise ParseError( + "SignedData.digestAlgorithms must contain exactly 1 algorithm, not %d" + % len(self.data["digestAlgorithms"]) + ) + self.digest_algorithm = _get_digest_algorithm( + self.data["digestAlgorithms"][0], "SignedData.digestAlgorithm" + ) # contentType if isinstance(self.data, rfc2315.SignedData): - self.content_type = asn1.oids.get(self.data['contentInfo']['contentType']) - content = self.data['contentInfo']['content'] + self.content_type = asn1.oids.get(self.data["contentInfo"]["contentType"]) + content = self.data["contentInfo"]["content"] elif isinstance(self.data, rfc5652.SignedData): - self.content_type = asn1.oids.get(self.data['encapContentInfo']['eContentType']) - content = self.data['encapContentInfo']['eContent'] + self.content_type = asn1.oids.get( + self.data["encapContentInfo"]["eContentType"] + ) + content = self.data["encapContentInfo"]["eContent"] else: - raise ParseError("Unknown SignedData data type {}".format(_print_type(self.data))) + raise ParseError( + "Unknown SignedData data type {}".format(_print_type(self.data)) + ) if self.content_type is not self._expected_content_type: - raise ParseError("SignedData.contentInfo does not contain %s" % _print_type(self._expected_content_type)) + raise ParseError( + "SignedData.contentInfo does not contain %s" + % _print_type(self._expected_content_type) + ) # Content - self.content = guarded_ber_decode(content, asn1_spec=self._expected_content_type()) + self.content = guarded_ber_decode( + content, asn1_spec=self._expected_content_type() + ) # Certificates self.certificates = CertificateStore( - [Certificate(cert) for cert in self.data['certificates'] if Certificate.is_certificate(cert)] + [ + Certificate(cert) + for cert in self.data["certificates"] + if Certificate.is_certificate(cert) + ] ) # SignerInfo if self._signerinfo_class is not None: assert not isinstance(self._signerinfo_class, str) - self.signer_infos = [self._signerinfo_class(si, parent=self) for si in self.data['signerInfos']] + self.signer_infos = [ + self._signerinfo_class(si, parent=self) + for si in self.data["signerInfos"] + ] diff --git a/signify/pkcs7/signerinfo.py b/signify/pkcs7/signerinfo.py index 
f1bb43d..b90fbb6 100644 --- a/signify/pkcs7/signerinfo.py +++ b/signify/pkcs7/signerinfo.py @@ -13,7 +13,11 @@ from signify.pkcs7 import signeddata from signify.asn1 import guarded_ber_decode, pkcs7 from signify.asn1 import preserving_der as preserving_der_encoder -from signify.exceptions import VerificationError, SignerInfoParseError, SignerInfoVerificationError +from signify.exceptions import ( + VerificationError, + SignerInfoParseError, + SignerInfoVerificationError, +) from signify import asn1, _print_type from signify.asn1.helpers import time_to_python from signify.x509 import VerificationContext @@ -21,8 +25,8 @@ class SignerInfo: - """The SignerInfo class is defined in RFC2315 and RFC5652 (amongst others) and defines the per-signer information - in a :class:`SignedData` structure. + """The SignerInfo class is defined in RFC2315 and RFC5652 (amongst others) and + defines the per-signer information in a :class:`SignedData` structure. It is based on the following ASN.1 object (as per RFC2315):: @@ -44,12 +48,14 @@ class SignerInfo: .. attribute:: parent - The parent :class:`SignedData` object (or if other SignerInfos are present, it may be another object) + The parent :class:`SignedData` object (or if other SignerInfos are present, it + may be another object) .. attribute:: issuer :type: CertificateName - The issuer of the SignerInfo, i.e. the certificate of the signer of the SignedData object. + The issuer of the SignerInfo, i.e. the certificate of the signer of the + SignedData object. .. attribute:: serial_number @@ -57,18 +63,20 @@ class SignerInfo: .. attribute:: digest_algorithm - The digest algorithm, i.e. the hash algorithm, under which the content and the authenticated attributes are + The digest algorithm, i.e. the hash algorithm, under which the content and the + authenticated attributes are signed. .. attribute:: authenticated_attributes unauthenticated_attributes - A SignerInfo object can contain both signed and unsigned attributes. These contain additional information - about the signature, but also the content type and message digest. The difference between signed and unsigned - is that unsigned attributes are not validated. + A SignerInfo object can contain both signed and unsigned attributes. These + contain additional information about the signature, but also the content type + and message digest. The difference between signed and unsigned is that unsigned + attributes are not validated. - The type of this attribute is a dictionary. You should not need to access this value directly, rather using - one of the attributes listed below. + The type of this attribute is a dictionary. You should not need to access this + value directly, rather using one of the attributes listed below. .. attribute:: digest_encryption_algorithm @@ -76,10 +84,12 @@ class SignerInfo: .. attribute:: encrypted_digest - The result of encrypting the message digest and associated information with the signer's private key. + The result of encrypting the message digest and associated information with the + signer's private key. - The following attributes are automatically parsed and added to the list of attributes if present. + The following attributes are automatically parsed and added to the list of + attributes if present. .. attribute:: message_digest @@ -87,16 +97,18 @@ class SignerInfo: .. attribute:: content_type - This is an authenticated attribute, containing the content type of the content being signed. 
+ This is an authenticated attribute, containing the content type of the content + being signed. .. attribute:: signing_time - This is an authenticated attribute, containing the timestamp of signing. Note that this should only be present in - countersigner objects. + This is an authenticated attribute, containing the timestamp of signing. Note + that this should only be present in countersigner objects. .. attribute:: countersigner - This is an unauthenticated attribute, containing the countersigner of the SignerInfo. + This is an unauthenticated attribute, containing the countersigner of the + SignerInfo. """ @@ -113,10 +125,17 @@ class SignerInfo: countersigner: CounterSignerInfo | None _countersigner_class: Type[CounterSignerInfo] | str | None = "CounterSignerInfo" - _required_authenticated_attributes: Iterable[univ.ObjectIdentifier] = (rfc2315.ContentType, rfc2315.Digest) + _required_authenticated_attributes: Iterable[univ.ObjectIdentifier] = ( + rfc2315.ContentType, + rfc2315.Digest, + ) _expected_content_type: Type[univ.Sequence] | None = None - def __init__(self, data: rfc2315.SignerInfo | rfc5652.SignerInfo, parent: signeddata.SignedData | None = None): + def __init__( + self, + data: rfc2315.SignerInfo | rfc5652.SignerInfo, + parent: signeddata.SignedData | None = None, + ): """ :param data: The ASN.1 structure of the SignerInfo. :param parent: The parent :class:`SignedData` object. @@ -130,39 +149,59 @@ def __init__(self, data: rfc2315.SignerInfo | rfc5652.SignerInfo, parent: signed def _parse(self) -> None: if self.data["version"] != 1: - raise SignerInfoParseError("SignerInfo.version must be 1, not %d" % self.data["version"]) + raise SignerInfoParseError( + "SignerInfo.version must be 1, not %d" % self.data["version"] + ) # We can handle several different rfc types here if isinstance(self.data, rfc2315.SignerInfo): - self.issuer = CertificateName(self.data["issuerAndSerialNumber"]["issuer"][0]) + self.issuer = CertificateName( + self.data["issuerAndSerialNumber"]["issuer"][0] + ) self.serial_number = self.data["issuerAndSerialNumber"]["serialNumber"] self.authenticated_attributes = self._parse_attributes( - self.data["authenticatedAttributes"], required=self._required_authenticated_attributes + self.data["authenticatedAttributes"], + required=self._required_authenticated_attributes, + ) + self._encoded_authenticated_attributes = self._encode_attributes( + self.data["authenticatedAttributes"] ) - self._encoded_authenticated_attributes = self._encode_attributes(self.data["authenticatedAttributes"]) - self.unauthenticated_attributes = self._parse_attributes(self.data["unauthenticatedAttributes"]) + self.unauthenticated_attributes = self._parse_attributes( + self.data["unauthenticatedAttributes"] + ) self.digest_encryption_algorithm = _get_encryption_algorithm( - self.data["digestEncryptionAlgorithm"], location="SignerInfo.digestEncryptionAlgorithm" + self.data["digestEncryptionAlgorithm"], + location="SignerInfo.digestEncryptionAlgorithm", ) self.encrypted_digest = bytes(self.data["encryptedDigest"]) elif isinstance(self.data, rfc5652.SignerInfo): # TODO: handle case where sid contains key identifier - self.issuer = CertificateName(self.data["sid"]["issuerAndSerialNumber"]["issuer"][0]) - self.serial_number = self.data["sid"]["issuerAndSerialNumber"]["serialNumber"] + self.issuer = CertificateName( + self.data["sid"]["issuerAndSerialNumber"]["issuer"][0] + ) + self.serial_number = self.data["sid"]["issuerAndSerialNumber"][ + "serialNumber" + ] self.authenticated_attributes = 
self._parse_attributes( - self.data["signedAttrs"], required=self._required_authenticated_attributes + self.data["signedAttrs"], + required=self._required_authenticated_attributes, + ) + self._encoded_authenticated_attributes = self._encode_attributes( + self.data["signedAttrs"] ) - self._encoded_authenticated_attributes = self._encode_attributes(self.data["signedAttrs"]) - self.unauthenticated_attributes = self._parse_attributes(self.data["unsignedAttrs"]) + self.unauthenticated_attributes = self._parse_attributes( + self.data["unsignedAttrs"] + ) self.digest_encryption_algorithm = _get_encryption_algorithm( - self.data["signatureAlgorithm"], location="SignerInfo.signatureAlgorithm" + self.data["signatureAlgorithm"], + location="SignerInfo.signatureAlgorithm", ) self.encrypted_digest = bytes(self.data["signature"]) @@ -178,43 +217,72 @@ def _parse(self) -> None: self.message_digest = None if rfc2315.Digest in self.authenticated_attributes: if len(self.authenticated_attributes[rfc2315.Digest]) != 1: - raise SignerInfoParseError("Only one Digest expected in SignerInfo.authenticatedAttributes") + raise SignerInfoParseError( + "Only one Digest expected in SignerInfo.authenticatedAttributes" + ) - self.message_digest = bytes(self.authenticated_attributes[rfc2315.Digest][0]) + self.message_digest = bytes( + self.authenticated_attributes[rfc2315.Digest][0] + ) # - The contentType self.content_type = None if rfc2315.ContentType in self.authenticated_attributes: if len(self.authenticated_attributes[rfc2315.ContentType]) != 1: - raise SignerInfoParseError("Only one ContentType expected in SignerInfo.authenticatedAttributes") + raise SignerInfoParseError( + "Only one ContentType expected in" + " SignerInfo.authenticatedAttributes" + ) - self.content_type = asn1.oids.get(self.authenticated_attributes[rfc2315.ContentType][0]) + self.content_type = asn1.oids.get( + self.authenticated_attributes[rfc2315.ContentType][0] + ) - if self._expected_content_type is not None and self.content_type is not self._expected_content_type: + if ( + self._expected_content_type is not None + and self.content_type is not self._expected_content_type + ): raise SignerInfoParseError( "Unexpected content type for SignerInfo, expected %s, got %s" - % (_print_type(self._expected_content_type), _print_type(self.content_type)) + % ( + _print_type(self._expected_content_type), + _print_type(self.content_type), + ) ) # - The signingTime (used by countersigner) self.signing_time = None if rfc5652.SigningTime in self.authenticated_attributes: if len(self.authenticated_attributes[rfc5652.SigningTime]) != 1: - raise SignerInfoParseError("Only one SigningTime expected in SignerInfo.authenticatedAttributes") + raise SignerInfoParseError( + "Only one SigningTime expected in" + " SignerInfo.authenticatedAttributes" + ) - self.signing_time = time_to_python(self.authenticated_attributes[rfc5652.SigningTime][0]) + self.signing_time = time_to_python( + self.authenticated_attributes[rfc5652.SigningTime][0] + ) # - The countersigner self.countersigner = None if pkcs7.Countersignature in self.unauthenticated_attributes: if len(self.unauthenticated_attributes[pkcs7.Countersignature]) != 1: - raise SignerInfoParseError("Only one CountersignInfo expected in SignerInfo.unauthenticatedAttributes") + raise SignerInfoParseError( + "Only one CountersignInfo expected in" + " SignerInfo.unauthenticatedAttributes" + ) - assert self._countersigner_class is not None and not isinstance(self._countersigner_class, str) - self.countersigner = 
self._countersigner_class(self.unauthenticated_attributes[pkcs7.Countersignature][0]) + assert self._countersigner_class is not None and not isinstance( + self._countersigner_class, str + ) + self.countersigner = self._countersigner_class( + self.unauthenticated_attributes[pkcs7.Countersignature][0] + ) def check_message_digest(self, data: bytes) -> bool: - """Given the data, returns whether the hash_algorithm and message_digest match the data provided.""" + """Given the data, returns whether the hash_algorithm and message_digest match + the data provided. + """ auth_attr_hash = self.digest_algorithm() auth_attr_hash.update(data) @@ -223,7 +291,9 @@ def check_message_digest(self, data: bytes) -> bool: @classmethod def _parse_attributes( cls, - data: rfc2315.Attributes | rfc5652.SignedAttributes | rfc5652.UnsignedAttributes, + data: rfc2315.Attributes + | rfc5652.SignedAttributes + | rfc5652.UnsignedAttributes, required: Iterable[univ.ObjectIdentifier] = (), ) -> dict[OidTuple | Type[Asn1Type], list[Any]]: """Given a set of Attributes, parses them and returns them as a dict @@ -257,10 +327,14 @@ def _parse_attributes( @classmethod def _encode_attributes( - cls, data: rfc2315.Attributes | rfc5652.SignedAttributes | rfc5652.UnsignedAttributes + cls, + data: rfc2315.Attributes + | rfc5652.SignedAttributes + | rfc5652.UnsignedAttributes, ) -> bytes: - """Given a set of Attributes, prepares them for creating a digest. It used to sort them by their DER encoded - values, now it is mostly a method to preserve the exact order they where in when they were encoded. + """Given a set of Attributes, prepares them for creating a digest. It used to + sort them by their DER encoded values, now it is mostly a method to preserve + the exact order they where in when they were encoded. :param data: The authenticatedAttributes or unauthenticatedAttributes to encode """ @@ -289,32 +363,41 @@ def _verify_issuer(self, issuer: Certificate, context: VerificationContext) -> N ) except VerificationError as e: raise SignerInfoVerificationError( - "Could not verify {cert} as the signer of the authenticated attributes in {cls}: {exc}".format( - cert=issuer, cls=type(self).__name__, exc=e - ) + "Could not verify {cert} as the signer of the authenticated attributes" + " in {cls}: {exc}".format(cert=issuer, cls=type(self).__name__, exc=e) ) - def _build_chain(self, context: VerificationContext) -> Iterable[Iterable[Certificate]]: - """Given a context, builds a chain up to a trusted certificate. This is a generator function, generating all - valid chains. - - This method will call :meth:`VerificationContext.verify` for all possible candidates. - - :param VerificationContext context: The context for building the chain. Most importantly, contains - all certificates to build the chain from, but also their properties are relevant. - :return: Iterable of all of the valid chains from this SignedInfo up to and including a trusted anchor. - Note that this may be an empty iteration if no candidate parent certificate was found. + def _build_chain( + self, context: VerificationContext + ) -> Iterable[Iterable[Certificate]]: + """Given a context, builds a chain up to a trusted certificate. This is a + generator function, generating all valid chains. + + This method will call :meth:`VerificationContext.verify` for all possible + candidates. + + :param VerificationContext context: The context for building the chain. Most + importantly, contains all certificates to build the chain from, but also + their properties are relevant. 
+ :return: Iterable of all of the valid chains from this SignedInfo up to and + including a trusted anchor. Note that this may be an empty iteration if no + candidate parent certificate was found. :rtype: Iterable[Iterable[Certificate]] - :raises AuthenticodeVerificationError: When :meth:`_verify_issuer` fails or any of the underlying calls to - :meth:`VerificationContext.verify` fails. See the semantics of :meth:`VerificationContext.verify` for when - that may happen. If any error occurs, it is silently swallowed unless no valid chain is found. In that case - the first error that occurred is raised. If no error occurs, no error is raised. + :raises AuthenticodeVerificationError: When :meth:`_verify_issuer` fails or + any of the underlying calls to :meth:`VerificationContext.verify` fails. + See the semantics of :meth:`VerificationContext.verify` for when that may + happen. If any error occurs, it is silently swallowed unless no valid chain + is found. In that case the first error that occurred is raised. If no error + occurs, no error is raised. """ # this loop was designed in the same way that Certificate._build_chain was built - # first_error is None until the first iteration. When it becomes False, we do not need to raise anything. + # first_error is None until the first iteration. When it becomes False, we do + # not need to raise anything. first_error: VerificationError | None | Literal[False] = None - for issuer in context.find_certificates(issuer=self.issuer, serial_number=self.serial_number): + for issuer in context.find_certificates( + issuer=self.issuer, serial_number=self.serial_number + ): try: # _verify_issuer may fail when it is not a valid issuer for this SignedInfo self._verify_issuer(issuer, context) @@ -331,7 +414,8 @@ def _build_chain(self, context: VerificationContext) -> Iterable[Iterable[Certif raise first_error def verify(self, context: VerificationContext) -> Iterable[Iterable[Certificate]]: - """Verifies that this :class:`SignerInfo` verifies up to a chain with the root of a trusted certificate. + """Verifies that this :class:`SignerInfo` verifies up to a chain with the root + of a trusted certificate. :param VerificationContext context: The context for verifying the SignerInfo. :return: A list of valid certificate chains for this SignerInfo. @@ -343,12 +427,16 @@ def verify(self, context: VerificationContext) -> Iterable[Iterable[Certificate] if not chains: raise SignerInfoVerificationError( - "No valid certificate chain found to a trust anchor from {}".format(type(self).__name__) + "No valid certificate chain found to a trust anchor from {}".format( + type(self).__name__ + ) ) return chains - def potential_chains(self, context: VerificationContext) -> Iterable[Iterable[Certificate]]: + def potential_chains( + self, context: VerificationContext + ) -> Iterable[Iterable[Certificate]]: """Retrieves all potential chains from this SignerInfo instance. :param VerificationContext context: The context @@ -356,14 +444,21 @@ def potential_chains(self, context: VerificationContext) -> Iterable[Iterable[Ce :rtype: Iterable[Iterable[Certificate]] """ - for certificate in context.find_certificates(issuer=self.issuer, serial_number=self.serial_number): + for certificate in context.find_certificates( + issuer=self.issuer, serial_number=self.serial_number + ): yield from context.potential_chains(certificate) class CounterSignerInfo(SignerInfo): - """The class CounterSignerInfo is a subclass of :class:`SignerInfo`. 
It is used as the SignerInfo of a - SignerInfo, containing the timestamp the SignerInfo was created on. This normally works by sending the digest of the - SignerInfo to an external trusted service, that will include a signed time in its response. + """The class CounterSignerInfo is a subclass of :class:`SignerInfo`. It is used as + the SignerInfo of a SignerInfo, containing the timestamp the SignerInfo was created + on. This normally works by sending the digest of the SignerInfo to an external + trusted service, that will include a signed time in its response. """ - _required_authenticated_attributes = (rfc2315.ContentType, rfc5652.SigningTime, rfc2315.Digest) + _required_authenticated_attributes = ( + rfc2315.ContentType, + rfc5652.SigningTime, + rfc2315.Digest, + ) diff --git a/signify/x509/__init__.py b/signify/x509/__init__.py index b8c5807..f6683d3 100644 --- a/signify/x509/__init__.py +++ b/signify/x509/__init__.py @@ -2,4 +2,10 @@ from .context import CertificateStore, FileSystemCertificateStore, VerificationContext -__all__ = ["Certificate", "CertificateName", "CertificateStore", "FileSystemCertificateStore", "VerificationContext"] +__all__ = [ + "Certificate", + "CertificateName", + "CertificateStore", + "FileSystemCertificateStore", + "VerificationContext", +] diff --git a/signify/x509/certificates.py b/signify/x509/certificates.py index 9528178..d871f20 100644 --- a/signify/x509/certificates.py +++ b/signify/x509/certificates.py @@ -25,7 +25,9 @@ logger = logging.getLogger(__name__) -AlgorithmIdentifier = collections.namedtuple("AlgorithmIdentifier", "algorithm parameters") +AlgorithmIdentifier = collections.namedtuple( + "AlgorithmIdentifier", "algorithm parameters" +) class Certificate: @@ -90,7 +92,8 @@ def __init__( ): """ - :type data: asn1.pkcs7.ExtendedCertificateOrCertificate or asn1.x509.Certificate or asn1.x509.TBSCertificate + :type data: asn1.pkcs7.ExtendedCertificateOrCertificate or + asn1.x509.Certificate or asn1.x509.TBSCertificate :param data: The ASN.1 structure """ @@ -106,30 +109,43 @@ def is_certificate(cls, data: Any) -> bool: def _parse(self) -> None: if isinstance(self.data, rfc5652.CertificateChoices): if "extendedCertificate" in self.data: - raise NotImplementedError("Support for extendedCertificate is not implemented") + raise NotImplementedError( + "Support for extendedCertificate is not implemented" + ) if "certificate" not in self.data: raise NotImplementedError( - "This is not a certificate, probably an attribute certificate (containing no public key)" + "This is not a certificate, probably an attribute certificate" + " (containing no public key)" ) certificate = self.data["certificate"] self.signature_algorithm = certificate["signatureAlgorithm"] self.signature_value = ( - certificate["signatureValue"] if "signatureValue" in certificate else certificate["signature"] + certificate["signatureValue"] + if "signatureValue" in certificate + else certificate["signature"] ) tbs_certificate = certificate["tbsCertificate"] elif isinstance( - self.data, (rfc2315.ExtendedCertificateOrCertificate, rfc5652.ExtendedCertificateOrCertificate) + self.data, + ( + rfc2315.ExtendedCertificateOrCertificate, + rfc5652.ExtendedCertificateOrCertificate, + ), ): if "extendedCertificate" in self.data: # TODO: Not sure if needed. 
- raise NotImplementedError("Support for extendedCertificate is not implemented") + raise NotImplementedError( + "Support for extendedCertificate is not implemented" + ) certificate = self.data["certificate"] self.signature_algorithm = certificate["signatureAlgorithm"] self.signature_value = ( - certificate["signatureValue"] if "signatureValue" in certificate else certificate["signature"] + certificate["signatureValue"] + if "signatureValue" in certificate + else certificate["signature"] ) tbs_certificate = certificate["tbsCertificate"] @@ -137,7 +153,9 @@ def _parse(self) -> None: certificate = self.data self.signature_algorithm = certificate["signatureAlgorithm"] self.signature_value = ( - certificate["signatureValue"] if "signatureValue" in certificate else certificate["signature"] + certificate["signatureValue"] + if "signatureValue" in certificate + else certificate["signature"] ) tbs_certificate = certificate["tbsCertificate"] @@ -148,8 +166,8 @@ def _parse(self) -> None: self.serial_number = int(tbs_certificate["serialNumber"]) self.issuer = CertificateName(tbs_certificate["issuer"][0]) - # the following two ifs are here because time_to_python may return None and we want to prevent Nones - # in these keys + # the following two ifs are here because time_to_python may return None and we + # want to prevent Nones in these keys valid_from = time_to_python(tbs_certificate["validity"]["notBefore"]) if valid_from: self.valid_from = valid_from @@ -160,21 +178,35 @@ def _parse(self) -> None: self.subject_public_algorithm = AlgorithmIdentifier( algorithm=tbs_certificate["subjectPublicKeyInfo"]["algorithm"]["algorithm"], - parameters=bytes(tbs_certificate["subjectPublicKeyInfo"]["algorithm"]["parameters"]), + parameters=bytes( + tbs_certificate["subjectPublicKeyInfo"]["algorithm"]["parameters"] + ), + ) + self.subject_public_key = bitstring_to_bytes( + tbs_certificate["subjectPublicKeyInfo"]["subjectPublicKey"] ) - self.subject_public_key = bitstring_to_bytes(tbs_certificate["subjectPublicKeyInfo"]["subjectPublicKey"]) self.extensions = {} if "extensions" in tbs_certificate and tbs_certificate["extensions"].isValue: for extension in tbs_certificate["extensions"]: - self.extensions[asn1.oids.get(extension["extnID"])] = extension["extnValue"] + self.extensions[asn1.oids.get(extension["extnID"])] = extension[ + "extnValue" + ] def __str__(self) -> str: - return "{} (serial:{}, sha1:{})".format(self.subject.dn, self.serial_number, self.sha1_fingerprint) + return "{} (serial:{}, sha1:{})".format( + self.subject.dn, self.serial_number, self.sha1_fingerprint + ) def __hash__(self) -> int: return hash( - (self.issuer, self.serial_number, self.subject, self.subject_public_algorithm, self.subject_public_key) + ( + self.issuer, + self.serial_number, + self.subject, + self.subject_public_algorithm, + self.subject_public_key, + ) ) def __eq__(self, other: object) -> bool: @@ -200,7 +232,9 @@ def from_pem(cls, content: bytes) -> Certificate: @classmethod def from_pems(cls, content: bytes) -> Iterator[Certificate]: """Reads a Certificate from a PEM formatted file.""" - for type_name, headers, der_bytes in asn1crypto.pem.unarmor(content, multiple=True): + for type_name, headers, der_bytes in asn1crypto.pem.unarmor( + content, multiple=True + ): yield cls.from_der(der_bytes) @cached_property @@ -211,7 +245,9 @@ def to_der(self) -> bytes: @cached_property def to_asn1crypto(self) -> asn1crypto.x509.Certificate: """Retrieves the :mod:`asn1crypto` x509 Certificate object.""" - return 
cast(asn1crypto.x509.Certificate, asn1crypto.x509.Certificate.load(self.to_der)) + return cast( + asn1crypto.x509.Certificate, asn1crypto.x509.Certificate.load(self.to_der) + ) @cached_property def sha256_fingerprint(self) -> str: @@ -222,17 +258,23 @@ def sha1_fingerprint(self) -> str: return cast(str, self.to_asn1crypto.sha1_fingerprint).replace(" ", "").lower() def verify_signature( - self, signature: bytes, data: bytes, algorithm: HashFunction, allow_legacy: bool = False + self, + signature: bytes, + data: bytes, + algorithm: HashFunction, + allow_legacy: bool = False, ) -> None: - """Verifies whether the signature bytes match the data using the hashing algorithm. Supports RSA and EC keys. - Note that not all hashing algorithms are supported. + """Verifies whether the signature bytes match the data using the hashing + algorithm. Supports RSA and EC keys. Note that not all hashing algorithms + are supported. :param bytes signature: The signature to verify :param bytes data: The data that must be verified :type algorithm: a hashlib function :param algorithm: The hashing algorithm to use - :param bool allow_legacy: If True, allows a legacy signature verification. This method is intended for the case - where the encryptedDigest does not contain an ASN.1 structure, but a raw hash value instead. It is attempted + :param bool allow_legacy: If True, allows a legacy signature verification. + This method is intended for the case where the encryptedDigest does not + contain an ASN.1 structure, but a raw hash value instead. It is attempted automatically when verification of the RSA signature fails. This case is described in more detail on @@ -248,25 +290,34 @@ def verify_signature( verify_func = asymmetric.ecdsa_verify else: raise CertificateVerificationError( - "Signature algorithm %s is unsupported for %s" % (public_key.algorithm, self) + "Signature algorithm %s is unsupported for %s" + % (public_key.algorithm, self) ) try: verify_func(public_key, signature, data, algorithm().name) except Exception as e: if not allow_legacy or public_key.algorithm != "rsa": - raise CertificateVerificationError("Invalid signature for %s: %s" % (self, e)) + raise CertificateVerificationError( + "Invalid signature for %s: %s" % (self, e) + ) else: return try: hasher = algorithm() hasher.update(data) - asymmetric.rsa_pkcs1v15_verify(public_key, signature, hasher.digest(), "raw") + asymmetric.rsa_pkcs1v15_verify( + public_key, signature, hasher.digest(), "raw" + ) except Exception as e: - raise CertificateVerificationError("Invalid signature for %s (legacy attempted): %s" % (self, e)) + raise CertificateVerificationError( + "Invalid signature for %s (legacy attempted): %s" % (self, e) + ) - def potential_chains(self, context: x509.VerificationContext) -> Iterator[list[Certificate]]: + def potential_chains( + self, context: x509.VerificationContext + ) -> Iterator[list[Certificate]]: """Alias for :meth:`VerificationContext.potential_chains`""" return context.potential_chains(self) @@ -302,7 +353,9 @@ def dn(self) -> str: # associated with LDAP [4], then the type name string from that table # is used, otherwise it is encoded as the dotted-decimal encoding of # the AttributeType's OBJECT IDENTIFIER. 
- type = oids.OID_TO_RDN.get(type_value["type"], ".".join(map(str, type_value["type"]))) + type = oids.OID_TO_RDN.get( + type_value["type"], ".".join(map(str, type_value["type"])) + ) value = str(ber_decoder.decode(type_value["value"])[0]) # Escaping according to RFC2253 @@ -327,7 +380,9 @@ def get_components(self, component_type: None = None) -> Iterator[tuple[str, str def get_components(self, component_type: str | OidTuple) -> Iterator[str]: ... - def get_components(self, component_type: str | OidTuple | None = None) -> Iterator[tuple[str, str]] | Iterator[str]: + def get_components( + self, component_type: str | OidTuple | None = None + ) -> Iterator[tuple[str, str]] | Iterator[str]: """Get individual components of this CertificateName :param component_type: if provided, yields only values of this type, @@ -336,11 +391,17 @@ def get_components(self, component_type: str | OidTuple | None = None) -> Iterat for n in self.data[::-1]: type_value = n[0] # get the AttributeTypeAndValue object - type = oids.OID_TO_RDN.get(type_value["type"], ".".join(map(str, type_value["type"]))) + type = oids.OID_TO_RDN.get( + type_value["type"], ".".join(map(str, type_value["type"])) + ) value = str(ber_decoder.decode(type_value["value"])[0]) if component_type is not None: - if component_type in (type_value["type"], ".".join(map(str, type_value["type"])), type): + if component_type in ( + type_value["type"], + ".".join(map(str, type_value["type"])), + type, + ): yield value else: yield type, value diff --git a/signify/x509/context.py b/signify/x509/context.py index f247991..39f0986 100644 --- a/signify/x509/context.py +++ b/signify/x509/context.py @@ -14,7 +14,11 @@ from signify.authenticode import authroot from signify.x509.certificates import Certificate, CertificateName -from signify.exceptions import VerificationError, CertificateVerificationError, CertificateNotTrustedVerificationError +from signify.exceptions import ( + VerificationError, + CertificateVerificationError, + CertificateNotTrustedVerificationError, +) logger = logging.getLogger(__name__) @@ -30,7 +34,8 @@ def __init__( **kwargs: Any, ): """ - :param bool trusted: If true, all certificates that are appended to this structure are set to trusted. + :param bool trusted: If true, all certificates that are appended to this + structure are set to trusted. :param CertificateTrustList ctl: The certificate trust list to use (if any) """ super().__init__(*args, **kwargs) @@ -40,13 +45,18 @@ def __init__( def append(self, elem: Certificate) -> None: return super().append(elem) - def verify_trust(self, chain: list[Certificate], context: VerificationContext | None = None) -> bool: + def verify_trust( + self, chain: list[Certificate], context: VerificationContext | None = None + ) -> bool: """Verifies that the chain is trusted given the context.""" if not self.is_trusted(chain[0]): - # use subclass of CertificateVerificationError to allow VerificationContext.verify_trust to throw a better + # use subclass of CertificateVerificationError to allow + # VerificationContext.verify_trust to throw a better # exception - raise CertificateNotTrustedVerificationError("The certificate %s is not trusted by the store." % chain[0]) + raise CertificateNotTrustedVerificationError( + "The certificate %s is not trusted by the store." 
% chain[0] + ) if self.ctl is not None: self.ctl.verify_trust(chain, context=context) @@ -54,21 +64,24 @@ def verify_trust(self, chain: list[Certificate], context: VerificationContext | return True def is_trusted(self, certificate: Certificate) -> bool: - """Returns whether the provided certificate is trusted by this certificate store. + """Returns whether the provided certificate is trusted by this certificate + store. .. warning:: - This check does not verify that the certificate is valid according to the Trust List, if set. It merely - checks that the provided certificate is in a trusted certificate store. You still need to verify the chain - for its full trust. + This check does not verify that the certificate is valid according to the + Trust List, if set. It merely checks that the provided certificate is in a + trusted certificate store. You still need to verify the chain for its full + trust. """ return self.trusted and certificate in self def find_certificate(self, **kwargs: Any) -> Certificate: - """Finds the certificate as specified by the keyword arguments. See :meth:`find_certificates` - for all possible arguments. If there is not exactly 1 certificate matching the parameters, i.e. there are zero - or there are multiple, an error is raised. + """Finds the certificate as specified by the keyword arguments. See + :meth:`find_certificates` for all possible arguments. If there is not exactly + 1 certificate matching the parameters, i.e. there are zero or there are + multiple, an error is raised. :rtype: Certificate :raises KeyError: @@ -91,12 +104,15 @@ def find_certificates( issuer: CertificateName | None = None, sha256_fingerprint: str | None = None, ) -> Iterable[Certificate]: - """Finds all certificates given by the specified properties. A property can be omitted by specifying - :const:`None`. Calling this function without arguments is the same as iterating over this store + """Finds all certificates given by the specified properties. A property can be + omitted by specifying :const:`None`. Calling this function without arguments is + the same as iterating over this store - :param CertificateName subject: Certificate subject to look for, as CertificateName + :param CertificateName subject: Certificate subject to look for, as + CertificateName :param int serial_number: Serial number to look for. - :param CertificateName issuer: Certificate issuer to look for, as CertificateName + :param CertificateName issuer: Certificate issuer to look for, as + CertificateName :param str sha256_fingerprint: The SHA-256 fingerprint to look for :rtype: Iterable[Certificate] """ @@ -109,7 +125,8 @@ def find_certificates( if issuer is not None and certificate.issuer != issuer: continue if sha256_fingerprint is not None and ( - certificate.sha256_fingerprint.replace(" ", "").lower() != sha256_fingerprint.replace(" ", "").lower() + certificate.sha256_fingerprint.replace(" ", "").lower() + != sha256_fingerprint.replace(" ", "").lower() ): continue yield certificate @@ -123,7 +140,8 @@ class FileSystemCertificateStore(CertificateStore): def __init__(self, location: pathlib.Path, *args: Any, **kwargs: Any): """ :param pathlib.Path location: The file system location for the certificates. - :param bool trusted: If true, all certificates that are appended to this structure are set to trusted. + :param bool trusted: If true, all certificates that are appended to this + structure are set to trusted. 
""" super().__init__(*args, **kwargs) @@ -166,29 +184,37 @@ def __init__( crls: Iterable[asn1crypto.crl.CertificateList] | None = None, ocsps: Iterable[asn1crypto.ocsp.OCSPResponse] | None = None, ): - """A context holding properties about the verification of a signature or certificate. - - :param Iterable[CertificateStore] stores: A list of CertificateStore objects that contain certificates - :param datetime.datetime timestamp: The timestamp to verify with. If None, the current time is used. - Must be a timezone-aware timestamp. - :param Iterable[str] key_usages: An iterable with the keyUsages to check for. For valid options, see + """A context holding properties about the verification of a signature or + certificate. + + :param Iterable[CertificateStore] stores: A list of CertificateStore objects + that contain certificates + :param datetime.datetime timestamp: The timestamp to verify with. If None, the + current time is used. Must be a timezone-aware timestamp. + :param Iterable[str] key_usages: An iterable with the keyUsages to check for. + For valid options, see :meth:`certvalidator.CertificateValidator.validate_usage` - :param Iterable[str] extended_key_usages: An iterable with the EKU's to check for. See - :meth:`certvalidator.CertificateValidator.validate_usage` - :param bool optional_eku: If True, sets the extended_key_usages as optionally present in the certificates. - :param bool allow_legacy: If True, allows chain verification if the signature hash algorithm - is very old (e.g. MD2). Additionally, allows the SignedInfo encryptedDigest - to contain an encrypted hash instead of an encrypted DigestInfo ASN.1 structure. Both are found in the wild, + :param Iterable[str] extended_key_usages: An iterable with the EKU's to check + for. See :meth:`certvalidator.CertificateValidator.validate_usage` + :param bool optional_eku: If True, sets the extended_key_usages as optionally + present in the certificates. + :param bool allow_legacy: If True, allows chain verification if the signature + hash algorithm is very old (e.g. MD2). Additionally, allows the + SignedInfo encryptedDigest to contain an encrypted hash instead of an + encrypted DigestInfo ASN.1 structure. Both are found in the wild, but setting to True does reduce the reliability of the verification. - :param str revocation_mode: Can be either soft-fail, hard-fail or require. See the documentation of - :meth:`certvalidator.ValidationContext` for the full definition - :param bool allow_fetching: If True, allows the underlying verification module to obtain CRL and OSCP responses - when needed. + :param str revocation_mode: Can be either soft-fail, hard-fail or require. See + the documentation of :meth:`certvalidator.ValidationContext` for the full + definition + :param bool allow_fetching: If True, allows the underlying verification module + to obtain CRL and OSCP responses when needed. :param int fetch_timeout: The timeout used when fetching CRL/OSCP responses - :param Iterable[asn1crypto.crl.CertificateList] crls: List of :class:`asn1crypto.crl.CertificateList` objects to - aid in verifying revocation statuses. - :param Iterable[asn1crypto.ocsp.OCSPResponse] ocsps: List of :class:`asn1crypto.ocsp.OCSPResponse` objects to - aid in verifying revocation statuses. + :param Iterable[asn1crypto.crl.CertificateList] crls: List of + :class:`asn1crypto.crl.CertificateList` objects to aid in verifying + revocation statuses. 
+ :param Iterable[asn1crypto.ocsp.OCSPResponse] ocsps: List of + :class:`asn1crypto.ocsp.OCSPResponse` objects to aid in verifying + revocation statuses. """ self.stores = list(stores) @@ -217,9 +243,10 @@ def certificates(self) -> Iterator[Certificate]: yield from store def find_certificate(self, **kwargs: Any) -> Certificate: - """Finds the certificate as specified by the keyword arguments. See :meth:`find_certificates` - for all possible arguments. If there is not exactly 1 certificate matching the parameters, i.e. there are zero - or there are multiple, an error is raised. + """Finds the certificate as specified by the keyword arguments. See + :meth:`find_certificates` for all possible arguments. If there is not exactly 1 + certificate matching the parameters, i.e. there are zero or there are + multiple, an error is raised. :rtype: Certificate :raises KeyError: @@ -236,7 +263,8 @@ def find_certificate(self, **kwargs: Any) -> Certificate: def find_certificates(self, **kwargs: Any) -> Iterator[Certificate]: """Finds all certificates given by the specified keyword arguments. See - :meth:`CertificateStore.find_certificates` for a list of all supported arguments. + :meth:`CertificateStore.find_certificates` for a list of all supported + arguments. :rtype: Iterable[Certificate] """ @@ -250,10 +278,14 @@ def find_certificates(self, **kwargs: Any) -> Iterator[Certificate]: seen_certs.append(certificate) yield certificate - def potential_chains(self, certificate: Certificate, depth: int = 10) -> Iterator[list[Certificate]]: - """Returns all possible chains from the provided certificate, solely based on issuer/subject matching. + def potential_chains( + self, certificate: Certificate, depth: int = 10 + ) -> Iterator[list[Certificate]]: + """Returns all possible chains from the provided certificate, solely based on + issuer/subject matching. - **THIS METHOD DOES NOT VERIFY WHETHER A CHAIN IS ACTUALLY VALID**. Use :meth:`verify` for that. + **THIS METHOD DOES NOT VERIFY WHETHER A CHAIN IS ACTUALLY VALID**. + Use :meth:`verify` for that. :param Certificate certificate: The certificate to build a potential chain for :param int depth: The maximum depth, used for recursion @@ -286,7 +318,8 @@ def verify(self, certificate: Certificate) -> Iterable[Certificate]: :param Certificate certificate: The certificate to verify :return: A valid certificate chain for this certificate. :rtype: Iterable[Certificate] - :raises AuthenticodeVerificationError: When the certificate could not be verified. + :raises AuthenticodeVerificationError: When the certificate could not be + verified. 
""" # we keep track of our asn1 objects to make sure we return Certificate objects when we're done @@ -317,31 +350,39 @@ def verify(self, certificate: Certificate) -> Iterable[Certificate]: ocsps=self.ocsps, ) validator = CertificateValidator( - end_entity_cert=to_check_asn1cert, intermediate_certs=list(intermediates), validation_context=context + end_entity_cert=to_check_asn1cert, + intermediate_certs=list(intermediates), + validation_context=context, ) # verify the chain try: chain = validator.validate_usage( key_usage=set(self.key_usages) if self.key_usages else set(), - extended_key_usage=set(self.extended_key_usages) if self.extended_key_usages else set(), + extended_key_usage=( + set(self.extended_key_usages) if self.extended_key_usages else set() + ), extended_optional=self.optional_eku, ) except Exception as e: - raise CertificateVerificationError("Chain verification from %s failed: %s" % (certificate, e)) + raise CertificateVerificationError( + "Chain verification from %s failed: %s" % (certificate, e) + ) signify_chain = [all_certs[x] for x in chain] self.verify_trust(signify_chain) return signify_chain def is_trusted(self, certificate: Certificate) -> bool: - """Returns whether the provided certificate is trusted by a trusted certificate store. + """Returns whether the provided certificate is trusted by a trusted certificate + store. .. warning:: - This check does not verify that the certificate is valid according to the Trust List, if set. It merely - checks that the provided certificate is in a trusted certificate store. You still need to verify the chain - for its full trust. + This check does not verify that the certificate is valid according to the + Trust List, if set. It merely checks that the provided certificate is in a + trusted certificate store. You still need to verify the chain for its full + trust. """ for store in self.stores: @@ -350,7 +391,8 @@ def is_trusted(self, certificate: Certificate) -> bool: return False def verify_trust(self, chain: list[Certificate]) -> bool: - """Determines whether the given certificate chain is trusted by a trusted certificate store. + """Determines whether the given certificate chain is trusted by a trusted + certificate store. :param List[Certificate] chain: The certificate chain to verify trust for. :return: True if the certificate chain is trusted by a certificate store. 
@@ -371,5 +413,6 @@ def verify_trust(self, chain: list[Certificate]) -> bool: raise exc raise CertificateVerificationError( - "The trust for %s could not be verified, as it is not trusted by any store" % chain + "The trust for %s could not be verified, as it is not trusted by any store" + % chain ) diff --git a/tests/_generate_test_results.py b/tests/_generate_test_results.py index 02f1a72..f7866d9 100644 --- a/tests/_generate_test_results.py +++ b/tests/_generate_test_results.py @@ -33,8 +33,12 @@ def main(): print("Updating {}...".format(filename)) with open(str(filename), "rb") as file_obj: fingerprinter = AuthenticodeFingerprinter(file_obj) - fingerprinter.add_hashers(hashlib.md5, hashlib.sha1, hashlib.sha256, hashlib.sha512) - fingerprinter.add_authenticode_hashers(hashlib.md5, hashlib.sha1, hashlib.sha256) + fingerprinter.add_hashers( + hashlib.md5, hashlib.sha1, hashlib.sha256, hashlib.sha512 + ) + fingerprinter.add_authenticode_hashers( + hashlib.md5, hashlib.sha1, hashlib.sha256 + ) results = fingerprinter.hashes() # convert to hex @@ -45,5 +49,5 @@ def main(): json.dump(results, res_obj) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/tests/requirements.txt b/tests/requirements.txt index 666699e..f4caaf7 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,2 +1,4 @@ flake8~=4.0.1 coverage~=6.2 +mypy~=1.5.1 +black~=23.7 diff --git a/tests/test_asn1.py b/tests/test_asn1.py index 2b88afd..5deec87 100644 --- a/tests/test_asn1.py +++ b/tests/test_asn1.py @@ -32,65 +32,96 @@ class TimeTest(unittest.TestCase): def test_conversion_utc(self): - utctime = useful.UTCTime('120614235959Z') + utctime = useful.UTCTime("120614235959Z") t = Time() - t['utcTime'] = utctime - self.assertEqual(time_to_python(t), datetime.datetime(2012, 6, 14, 23, 59, 59, tzinfo=datetime.timezone.utc)) + t["utcTime"] = utctime + self.assertEqual( + time_to_python(t), + datetime.datetime(2012, 6, 14, 23, 59, 59, tzinfo=datetime.timezone.utc), + ) def test_conversion_gen(self): - gen_time = useful.GeneralizedTime('20120614235959Z') + gen_time = useful.GeneralizedTime("20120614235959Z") t = Time() - t['generalTime'] = gen_time - self.assertEqual(time_to_python(t), datetime.datetime(2012, 6, 14, 23, 59, 59, tzinfo=datetime.timezone.utc)) + t["generalTime"] = gen_time + self.assertEqual( + time_to_python(t), + datetime.datetime(2012, 6, 14, 23, 59, 59, tzinfo=datetime.timezone.utc), + ) class RDNSequenceTest(unittest.TestCase): def test_to_string(self): certificate = TRUSTED_CERTIFICATE_STORE.find_certificate( - sha256_fingerprint="DF545BF919A2439C36983B54CDFC903DFA4F37D3996D8D84B4C31EEC6F3C163E" + sha256_fingerprint=( + "DF545BF919A2439C36983B54CDFC903DFA4F37D3996D8D84B4C31EEC6F3C163E" + ) ) - self.assertEqual(certificate.issuer.dn, - "CN=Microsoft Root Certificate Authority 2010, O=Microsoft Corporation, " - "L=Redmond, ST=Washington, C=US") + self.assertEqual( + certificate.issuer.dn, + "CN=Microsoft Root Certificate Authority 2010, O=Microsoft Corporation, " + "L=Redmond, ST=Washington, C=US", + ) def test_to_string_with_commas(self): certificate = TRUSTED_CERTIFICATE_STORE.find_certificate( - sha256_fingerprint="5B789987F3C4055B8700941B33783A5F16E0CFF937EA32011FE04779F7635308" + sha256_fingerprint=( + "5B789987F3C4055B8700941B33783A5F16E0CFF937EA32011FE04779F7635308" + ) ) - self.assertEqual(certificate.issuer.dn, - r"OU=NO LIABILITY ACCEPTED\, (c)97 VeriSign\, Inc., OU=VeriSign Time Stamping Service Root, " - r"OU=VeriSign\, Inc., O=VeriSign Trust Network") + 
self.assertEqual( + certificate.issuer.dn, + r"OU=NO LIABILITY ACCEPTED\, (c)97 VeriSign\, Inc., OU=VeriSign Time" + r" Stamping Service Root, " + r"OU=VeriSign\, Inc., O=VeriSign Trust Network", + ) def test_get_components(self): certificate = TRUSTED_CERTIFICATE_STORE.find_certificate( - sha256_fingerprint="5B789987F3C4055B8700941B33783A5F16E0CFF937EA32011FE04779F7635308" + sha256_fingerprint=( + "5B789987F3C4055B8700941B33783A5F16E0CFF937EA32011FE04779F7635308" + ) ) result = list(certificate.issuer.get_components("OU")) - self.assertEqual(result, ["NO LIABILITY ACCEPTED, (c)97 VeriSign, Inc.", - "VeriSign Time Stamping Service Root", - "VeriSign, Inc."]) + self.assertEqual( + result, + [ + "NO LIABILITY ACCEPTED, (c)97 VeriSign, Inc.", + "VeriSign Time Stamping Service Root", + "VeriSign, Inc.", + ], + ) self.assertEqual(list(certificate.issuer.get_components("CN")), []) def test_get_components_none(self): certificate = TRUSTED_CERTIFICATE_STORE.find_certificate( - sha256_fingerprint="5B789987F3C4055B8700941B33783A5F16E0CFF937EA32011FE04779F7635308" + sha256_fingerprint=( + "5B789987F3C4055B8700941B33783A5F16E0CFF937EA32011FE04779F7635308" + ) ) result = certificate.issuer.rdns - self.assertEqual(result, (('OU', 'NO LIABILITY ACCEPTED, (c)97 VeriSign, Inc.'), - ('OU', 'VeriSign Time Stamping Service Root'), - ('OU', 'VeriSign, Inc.'), - ('O', 'VeriSign Trust Network'))) + self.assertEqual( + result, + ( + ("OU", "NO LIABILITY ACCEPTED, (c)97 VeriSign, Inc."), + ("OU", "VeriSign Time Stamping Service Root"), + ("OU", "VeriSign, Inc."), + ("O", "VeriSign Trust Network"), + ), + ) class GuardedBerDecodeTest(unittest.TestCase): def test_normal_read(self): self.assertTrue(guarded_ber_decode(univ.Any("\x01\x01\xff"))) # true self.assertFalse(guarded_ber_decode(univ.Any("\x01\x01\x00"))) # false - self.assertIsInstance(guarded_ber_decode(univ.Any("\x05\x00")), univ.Null) # null + self.assertIsInstance( + guarded_ber_decode(univ.Any("\x05\x00")), univ.Null + ) # null def test_extra_read(self): self.assertRaises(ParseError, guarded_ber_decode, univ.Any("\x05\x00\x01")) diff --git a/tests/test_authenticode.py b/tests/test_authenticode.py index d5ca7b7..6f9de21 100644 --- a/tests/test_authenticode.py +++ b/tests/test_authenticode.py @@ -28,14 +28,28 @@ import datetime -from signify.authenticode import CERTIFICATE_LOCATION, TRUSTED_CERTIFICATE_STORE, TRUSTED_CERTIFICATE_STORE_NO_CTL -from signify.x509.context import VerificationContext, FileSystemCertificateStore, CertificateStore -from signify.exceptions import VerificationError, AuthenticodeVerificationError, SignedPEParseError +from signify.authenticode import ( + CERTIFICATE_LOCATION, + TRUSTED_CERTIFICATE_STORE, + TRUSTED_CERTIFICATE_STORE_NO_CTL, +) +from signify.x509.context import ( + VerificationContext, + FileSystemCertificateStore, + CertificateStore, +) +from signify.exceptions import ( + VerificationError, + AuthenticodeVerificationError, + SignedPEParseError, +) from signify.fingerprinter import AuthenticodeFingerprinter from signify.authenticode.signed_pe import SignedPEFile root_dir = pathlib.Path(__file__).parent -trusted_certificate_store = FileSystemCertificateStore(location=CERTIFICATE_LOCATION, trusted=True) +trusted_certificate_store = FileSystemCertificateStore( + location=CERTIFICATE_LOCATION, trusted=True +) class AuthenticodeParserTestCase(unittest.TestCase): @@ -46,8 +60,10 @@ def test_software_update(self): hashes = fingerprinter.hash() # Sanity check that the authenticode hash is still correct - 
self.assertEqual(binascii.hexlify(hashes['sha1']).decode('ascii'), - '978b90ace99c764841d2dd17d278fac4149962a3') + self.assertEqual( + binascii.hexlify(hashes["sha1"]).decode("ascii"), + "978b90ace99c764841d2dd17d278fac4149962a3", + ) pefile = SignedPEFile(f) @@ -59,7 +75,7 @@ def test_software_update(self): self.assertEqual(len(signed_datas), 1) signed_data = signed_datas[0] - self.assertEqual(signed_data._rest_data, b'\0') + self.assertEqual(signed_data._rest_data, b"\0") signed_data.verify() @@ -98,32 +114,70 @@ def test_2A6E(self): self.assertRaises(VerificationError, pefile.verify) def test_0d8c_valid(self): - with open(str(root_dir / "test_data" / "0d8c2bcb575378f6a88d17b5f6ce70e794a264cdc8556c8e812f0b5f9c709198"), "rb") as f: + with open( + str( + root_dir + / "test_data" + / "0d8c2bcb575378f6a88d17b5f6ce70e794a264cdc8556c8e812f0b5f9c709198" + ), + "rb", + ) as f: pefile = SignedPEFile(f) pefile.verify(trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL) def test_provide_hash(self): - with open(str(root_dir / "test_data" / "0d8c2bcb575378f6a88d17b5f6ce70e794a264cdc8556c8e812f0b5f9c709198"), "rb") as f: + with open( + str( + root_dir + / "test_data" + / "0d8c2bcb575378f6a88d17b5f6ce70e794a264cdc8556c8e812f0b5f9c709198" + ), + "rb", + ) as f: pefile = SignedPEFile(f) with self.assertRaises(VerificationError): - pefile.verify(trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, expected_hashes={"sha1": b"asdf"}) + pefile.verify( + trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, + expected_hashes={"sha1": b"asdf"}, + ) def test_19e8_expired(self): """this is an expired sample""" - with open(str(root_dir / "test_data" / "19e818d0da361c4feedd456fca63d68d4b024fbbd3d9265f606076c7ee72e8f8.ViR"), "rb") as f: + with open( + str( + root_dir + / "test_data" + / "19e818d0da361c4feedd456fca63d68d4b024fbbd3d9265f606076c7ee72e8f8.ViR" + ), + "rb", + ) as f: pefile = SignedPEFile(f) self.assertRaises(VerificationError, pefile.verify) def test_19e8_valid_within_period(self): """test whether the timestamp can be set on expired samples""" - with open(str(root_dir / "test_data" / "19e818d0da361c4feedd456fca63d68d4b024fbbd3d9265f606076c7ee72e8f8.ViR"), "rb") as f: + with open( + str( + root_dir + / "test_data" + / "19e818d0da361c4feedd456fca63d68d4b024fbbd3d9265f606076c7ee72e8f8.ViR" + ), + "rb", + ) as f: pefile = SignedPEFile(f) - pefile.verify(verification_context_kwargs= - {'timestamp': datetime.datetime(2013, 1, 1, tzinfo=datetime.timezone.utc)}) + pefile.verify( + verification_context_kwargs={ + "timestamp": datetime.datetime( + 2013, 1, 1, tzinfo=datetime.timezone.utc + ) + } + ) def test_sw_reporter(self): """Test for SHA256 hashes used in sig""" - with open(str(root_dir / "test_data" / "software_reporter_tool.exe"), "rb") as f: + with open( + str(root_dir / "test_data" / "software_reporter_tool.exe"), "rb" + ) as f: pefile = SignedPEFile(f) signed_datas = list(pefile.signed_datas) self.assertEqual(len(signed_datas), 1) @@ -139,7 +193,9 @@ def test_7z1900_invalid_cve2020_0601(self): def test_3a7de393a36ca8911cd0842a9a25b058_valid_different_contenttype(self): """uses a different contenttype, 1.2.840.113549.1.9.16.1.4 instead of Data""" - with open(str(root_dir / "test_data" / "3a7de393a36ca8911cd0842a9a25b058"), "rb") as f: + with open( + str(root_dir / "test_data" / "3a7de393a36ca8911cd0842a9a25b058"), "rb" + ) as f: pefile = SignedPEFile(f) pefile.verify() @@ -159,8 +215,13 @@ def test_jameslth_valid_when_revocation_not_checked(self): """this certificate is revoked""" 
with open(str(root_dir / "test_data" / "jameslth"), "rb") as f: pefile = SignedPEFile(f) - pefile.verify(verification_context_kwargs= - {'timestamp': datetime.datetime(2021, 1, 1, tzinfo=datetime.timezone.utc)}) + pefile.verify( + verification_context_kwargs={ + "timestamp": datetime.datetime( + 2021, 1, 1, tzinfo=datetime.timezone.utc + ) + } + ) def test_jameslth_revoked(self): """this certificate is revoked""" @@ -168,7 +229,12 @@ def test_jameslth_revoked(self): with open(str(root_dir / "test_data" / "jameslth"), "rb") as f: pefile = SignedPEFile(f) with self.assertRaises(VerificationError): - pefile.verify(verification_context_kwargs={'allow_fetching': True, 'revocation_mode': 'hard-fail'}) + pefile.verify( + verification_context_kwargs={ + "allow_fetching": True, + "revocation_mode": "hard-fail", + } + ) def test_zonealarm_rfc3161_different_hash_and_digest_algorithms(self): """this tests a RFC3161 sample that has distinct hash and digest algorithms""" @@ -178,7 +244,9 @@ def test_zonealarm_rfc3161_different_hash_and_digest_algorithms(self): def test_abnormal_attribute_order(self): """this tests a sample that has an abnormal attribute order""" - with open(str(root_dir / "test_data" / "8757bf55-0077-4df5-9807-122a3261ee40"), "rb") as f: + with open( + str(root_dir / "test_data" / "8757bf55-0077-4df5-9807-122a3261ee40"), "rb" + ) as f: pefile = SignedPEFile(f) pefile.verify() @@ -188,40 +256,65 @@ def test_multiple_signatures_all_valid(self): pefile = SignedPEFile(f) self.assertEqual(len(list(pefile.signed_datas)), 2) - for mode in ('all', 'first', 'any', 'best'): + for mode in ("all", "first", "any", "best"): with self.subTest(multi_verify_mode=mode): - pefile.verify(trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, multi_verify_mode=mode) + pefile.verify( + trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, + multi_verify_mode=mode, + ) def test_multiple_signatures_invalid_sha1(self): """this tests a sample that has an invalid sha-1 hash, but valid sha-256 hash""" with open(str(root_dir / "test_data" / "sigcheck_sha1_patched.exe"), "rb") as f: pefile = SignedPEFile(f) - for mode in ('all', 'first'): - with self.subTest(multi_verify_mode=mode), self.assertRaises(VerificationError): - pefile.verify(trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, multi_verify_mode=mode) - for mode in ('any', 'best'): + for mode in ("all", "first"): + with self.subTest(multi_verify_mode=mode), self.assertRaises( + VerificationError + ): + pefile.verify( + trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, + multi_verify_mode=mode, + ) + for mode in ("any", "best"): with self.subTest(multi_verify_mode=mode): - pefile.verify(trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, multi_verify_mode=mode) + pefile.verify( + trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, + multi_verify_mode=mode, + ) def test_multiple_signatures_invalid_sha256(self): """this tests a sample that has an valid sha-1 hash, but invalid sha-256 hash""" - with open(str(root_dir / "test_data" / "sigcheck_sha256_patched.exe"), "rb") as f: + with open( + str(root_dir / "test_data" / "sigcheck_sha256_patched.exe"), "rb" + ) as f: pefile = SignedPEFile(f) - for mode in ('all', 'best'): - with self.subTest(multi_verify_mode=mode), self.assertRaises(VerificationError): - pefile.verify(trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, multi_verify_mode=mode) - for mode in ('first', 'any'): + for mode in ("all", "best"): + with self.subTest(multi_verify_mode=mode), 
self.assertRaises( + VerificationError + ): + pefile.verify( + trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, + multi_verify_mode=mode, + ) + for mode in ("first", "any"): with self.subTest(multi_verify_mode=mode): - pefile.verify(trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, multi_verify_mode=mode) + pefile.verify( + trusted_certificate_store=TRUSTED_CERTIFICATE_STORE_NO_CTL, + multi_verify_mode=mode, + ) def test_multiple_signatures_all_invalid(self): """this tests a sample that has only invalid signautres""" - with open(str(root_dir / "test_data" / "sigcheck_sha256_patched.exe"), "rb") as f: + with open( + str(root_dir / "test_data" / "sigcheck_sha256_patched.exe"), "rb" + ) as f: pefile = SignedPEFile(f) - # we can test for the fact that all signatures are invalid here as well, because the normal CTL will - # disallow sha1 - for mode in ('all', 'best', 'first', 'any'): - with self.subTest(multi_verify_mode=mode), self.assertRaises(VerificationError): + # we can test for the fact that all signatures are invalid here as well, + # because the normal CTL will disallow sha1 + for mode in ("all", "best", "first", "any"): + with self.subTest(multi_verify_mode=mode), self.assertRaises( + VerificationError + ): pefile.verify(multi_verify_mode=mode) @@ -236,7 +329,9 @@ def test_all_trusted_certificates_are_trusted(self): self.assertListEqual(chain, [certificate]) def test_no_duplicates_in_default_store(self): - self.assertEqual(len(TRUSTED_CERTIFICATE_STORE), len(set(TRUSTED_CERTIFICATE_STORE))) + self.assertEqual( + len(TRUSTED_CERTIFICATE_STORE), len(set(TRUSTED_CERTIFICATE_STORE)) + ) def test_trust_fails(self): # we get a certificate we currently trust @@ -247,4 +342,3 @@ def test_trust_fails(self): # and verify using this store context = VerificationContext(store, timestamp=certificate.valid_to) self.assertRaises(VerificationError, certificate.verify, context) - diff --git a/tests/test_context.py b/tests/test_context.py index 9dedf96..95b7eee 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -2,7 +2,10 @@ import pathlib import unittest -from signify.authenticode import TRUSTED_CERTIFICATE_STORE, TRUSTED_CERTIFICATE_STORE_NO_CTL +from signify.authenticode import ( + TRUSTED_CERTIFICATE_STORE, + TRUSTED_CERTIFICATE_STORE_NO_CTL, +) from signify.x509.certificates import Certificate from signify.x509.context import VerificationContext, FileSystemCertificateStore from signify.exceptions import VerificationError @@ -18,12 +21,22 @@ def test_amount_of_certificates(self): class ContextTestCase(unittest.TestCase): def test_potential_chains(self): - with open(str(root_dir / "test_data" / "19e818d0da361c4feedd456fca63d68d4b024fbbd3d9265f606076c7ee72e8f8.ViR"), "rb") as f: + with open( + str( + root_dir + / "test_data" + / "19e818d0da361c4feedd456fca63d68d4b024fbbd3d9265f606076c7ee72e8f8.ViR" + ), + "rb", + ) as f: pefile = SignedPEFile(f) for signed_data in pefile.signed_datas: - - context = VerificationContext(TRUSTED_CERTIFICATE_STORE_NO_CTL, signed_data.certificates) - potential_chains = list(signed_data.signer_info.potential_chains(context)) + context = VerificationContext( + TRUSTED_CERTIFICATE_STORE_NO_CTL, signed_data.certificates + ) + potential_chains = list( + signed_data.signer_info.potential_chains(context) + ) self.assertEqual(len(potential_chains), 2) # for chain in potential_chains: # print("xxxx") @@ -32,17 +45,26 @@ def test_potential_chains(self): class ValidationTestCase(unittest.TestCase): - @unittest.skipIf(datetime.datetime.now() > 
datetime.datetime(2022, 10, 27), "revoked certificate expired") + @unittest.skipIf( + datetime.datetime.now() > datetime.datetime(2022, 10, 27), + "revoked certificate expired", + ) def test_revoked_certificate(self): - root = FileSystemCertificateStore(root_dir / "certs" / 'digicert-global-root-ca.pem', trusted=True) - intermediate = FileSystemCertificateStore(root_dir / "certs" / 'rapidssl-tls-2020.pem') - with open(str(root_dir / "certs" / 'revoked.badssl.com.pem'), "rb") as f: + root = FileSystemCertificateStore( + root_dir / "certs" / "digicert-global-root-ca.pem", trusted=True + ) + intermediate = FileSystemCertificateStore( + root_dir / "certs" / "rapidssl-tls-2020.pem" + ) + with open(str(root_dir / "certs" / "revoked.badssl.com.pem"), "rb") as f: cert = Certificate.from_pem(f.read()) # check that when we do not verify the CRL it does not fail context = VerificationContext(root, intermediate) context.verify(cert) - context = VerificationContext(root, intermediate, allow_fetching=True, revocation_mode='hard-fail') + context = VerificationContext( + root, intermediate, allow_fetching=True, revocation_mode="hard-fail" + ) with self.assertRaises(VerificationError): context.verify(cert) diff --git a/tests/test_fingerprinter.py b/tests/test_fingerprinter.py index eb41b2f..a464d1f 100644 --- a/tests/test_fingerprinter.py +++ b/tests/test_fingerprinter.py @@ -25,7 +25,12 @@ import unittest import pathlib -from signify.fingerprinter import AuthenticodeFingerprinter, Fingerprinter, Finger, Range +from signify.fingerprinter import ( + AuthenticodeFingerprinter, + Fingerprinter, + Finger, + Range, +) root_dir = pathlib.Path(__file__).parent @@ -40,8 +45,12 @@ def test_entire_blobs(self): with self.subTest(filename): with open(str(filename), "rb") as file_obj: fingerprinter = AuthenticodeFingerprinter(file_obj) - fingerprinter.add_hashers(hashlib.md5, hashlib.sha1, hashlib.sha256, hashlib.sha512) - fingerprinter.add_authenticode_hashers(hashlib.md5, hashlib.sha1, hashlib.sha256) + fingerprinter.add_hashers( + hashlib.md5, hashlib.sha1, hashlib.sha256, hashlib.sha512 + ) + fingerprinter.add_authenticode_hashers( + hashlib.md5, hashlib.sha1, hashlib.sha256 + ) results = fingerprinter.hashes() # convert to hex @@ -58,7 +67,7 @@ def test_reasonable_interval(self): # Check if the limit on maximum blocksize for processing still holds. 
dummy = io.BytesIO(b"") fp = Fingerprinter(dummy) - fp._fingers.append(Finger([], [Range(0, 1000001)], "")) + fp._fingers.append(Finger([], [Range(0, 1000001)], "")) start, stop = fp._next_interval self.assertEqual(0, start) diff --git a/tests/test_signerinfo.py b/tests/test_signerinfo.py index e780d17..6831ffd 100644 --- a/tests/test_signerinfo.py +++ b/tests/test_signerinfo.py @@ -10,33 +10,38 @@ class GetDigestAlgorithmTest(unittest.TestCase): def test_acceptable_oid(self): sha1 = DigestAlgorithmIdentifier() - sha1['algorithm'] = (1, 3, 14, 3, 2, 26) + sha1["algorithm"] = (1, 3, 14, 3, 2, 26) self.assertEqual(_get_digest_algorithm(sha1, location="test"), hashlib.sha1) def test_unknown_oid(self): invalid = DigestAlgorithmIdentifier() - invalid['algorithm'] = (1, 2) + invalid["algorithm"] = (1, 2) self.assertRaises(ParseError, _get_digest_algorithm, invalid, location="test") def test_non_hashlib_oid(self): invalid = DigestAlgorithmIdentifier() - invalid['algorithm'] = (1, 2, 840, 113549, 1, 9, 3) + invalid["algorithm"] = (1, 2, 840, 113549, 1, 9, 3) self.assertRaises(ParseError, _get_digest_algorithm, invalid, location="test") def test_unacceptable_oid(self): sha1 = DigestAlgorithmIdentifier() - sha1['algorithm'] = (1, 3, 14, 3, 2, 26) - self.assertRaises(ParseError, _get_digest_algorithm, sha1, location="test", acceptable=[hashlib.md5]) + sha1["algorithm"] = (1, 3, 14, 3, 2, 26) + self.assertRaises( + ParseError, + _get_digest_algorithm, + sha1, + location="test", + acceptable=[hashlib.md5], + ) def test_null_parameters(self): sha1 = DigestAlgorithmIdentifier() - sha1['algorithm'] = (1, 3, 14, 3, 2, 26) - sha1['parameters'] = "\x05\0" # null value + sha1["algorithm"] = (1, 3, 14, 3, 2, 26) + sha1["parameters"] = "\x05\0" # null value self.assertEqual(_get_digest_algorithm(sha1, location="test"), hashlib.sha1) def test_non_null_parameters(self): sha1 = DigestAlgorithmIdentifier() - sha1['algorithm'] = (1, 3, 14, 3, 2, 26) - sha1['parameters'] = "\x01\x01\xff" # TRUE boolean + sha1["algorithm"] = (1, 3, 14, 3, 2, 26) + sha1["parameters"] = "\x01\x01\xff" # TRUE boolean self.assertRaises(ParseError, _get_digest_algorithm, sha1, location="test") -