Skip to content

Commit

Permalink
Run black on all files
Browse files Browse the repository at this point in the history
  • Loading branch information
ralphje committed Aug 18, 2023
1 parent d94440e commit a693b5f
Show file tree
Hide file tree
Showing 27 changed files with 1,450 additions and 677 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,13 @@ jobs:
with:
linters: mypy
run: mypy

black:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: psf/black@stable
with:
options: "--check --verbose"
src: "./signify"
version: "~= 23.7"
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
[tool.black]
line-length = 88
target-version = ['py38', 'py39', 'py310', 'py311']
preview = true

[tool.mypy]
files = "signify"
python_version = "3.8"
Expand Down
2 changes: 1 addition & 1 deletion signify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def _print_type(t: Any) -> str:
return ""
elif isinstance(t, tuple):
return ".".join(map(str, t))
elif callable(t) and hasattr(t(), 'name'):
elif callable(t) and hasattr(t(), "name"):
return cast(str, t().name) # used by hashlib
elif hasattr(t, "__name__"):
return cast(str, t.__name__)
Expand Down
8 changes: 6 additions & 2 deletions signify/asn1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ def guarded_ber_decode(data: Any, asn1_spec: _T | None = None) -> Asn1Type | _T:
except Exception as e:
raise ParseError("Error while parsing %s BER: %s" % (_print_type(asn1_spec), e))
if rest:
raise ParseError("Extra information after parsing %s BER" % _print_type(asn1_spec))
raise ParseError(
"Extra information after parsing %s BER" % _print_type(asn1_spec)
)
return result


Expand All @@ -56,5 +58,7 @@ def guarded_der_decode(data: Any, asn1_spec: _T | None = None) -> Asn1Type | _T:
except Exception as e:
raise ParseError("Error while parsing %s DER: %s" % (_print_type(asn1_spec), e))
if rest:
raise ParseError("Extra information after parsing %s DER" % _print_type(asn1_spec))
raise ParseError(
"Extra information after parsing %s DER" % _print_type(asn1_spec)
)
return result
32 changes: 17 additions & 15 deletions signify/asn1/ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@


class CTLVersion(univ.Integer): # type: ignore[misc]
namedValues = namedval.NamedValues(
('v1', 0)
)
namedValues = namedval.NamedValues(("v1", 0))


class SubjectUsage(rfc5280.ExtKeyUsageSyntax): # type: ignore[misc]
Expand All @@ -25,8 +23,8 @@ class SubjectIdentifier(univ.OctetString): # type: ignore[misc]

class TrustedSubject(univ.Sequence): # type: ignore[misc]
componentType = namedtype.NamedTypes(
namedtype.NamedType('subjectIdentifier', SubjectIdentifier()),
namedtype.OptionalNamedType('subjectAttributes', rfc2315.Attributes()),
namedtype.NamedType("subjectIdentifier", SubjectIdentifier()),
namedtype.OptionalNamedType("subjectAttributes", rfc2315.Attributes()),
)


Expand All @@ -36,16 +34,20 @@ class TrustedSubjects(univ.SequenceOf): # type: ignore[misc]

class CertificateTrustList(univ.Sequence): # type: ignore[misc]
componentType = namedtype.NamedTypes(
namedtype.DefaultedNamedType('version', CTLVersion('v1')),
namedtype.NamedType('subjectUsage', SubjectUsage()),
namedtype.OptionalNamedType('listIdentifier', ListIdentifier()),
namedtype.OptionalNamedType('sequenceNumber', univ.Integer()),
namedtype.NamedType('ctlThisUpdate', rfc5280.Time()),
namedtype.OptionalNamedType('ctlNextUpdate', rfc5280.Time()),
namedtype.NamedType('subjectAlgorithm', rfc5280.AlgorithmIdentifier()),
namedtype.OptionalNamedType('trustedSubjects', TrustedSubjects()),
namedtype.OptionalNamedType('ctlExtensions', rfc5280.Extensions().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))
namedtype.DefaultedNamedType("version", CTLVersion("v1")),
namedtype.NamedType("subjectUsage", SubjectUsage()),
namedtype.OptionalNamedType("listIdentifier", ListIdentifier()),
namedtype.OptionalNamedType("sequenceNumber", univ.Integer()),
namedtype.NamedType("ctlThisUpdate", rfc5280.Time()),
namedtype.OptionalNamedType("ctlNextUpdate", rfc5280.Time()),
namedtype.NamedType("subjectAlgorithm", rfc5280.AlgorithmIdentifier()),
namedtype.OptionalNamedType("trustedSubjects", TrustedSubjects()),
namedtype.OptionalNamedType(
"ctlExtensions",
rfc5280.Extensions().subtype(
explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)
),
),
)


Expand Down
24 changes: 18 additions & 6 deletions signify/asn1/hashing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@
from signify.exceptions import ParseError

# this list must be in the order of worst to best
ACCEPTED_DIGEST_ALGORITHMS = (hashlib.md5, hashlib.sha1, hashlib.sha256, hashlib.sha384, hashlib.sha512)


def _verify_empty_algorithm_parameters(algorithm: rfc5280.AlgorithmIdentifier, location: str) -> None:
ACCEPTED_DIGEST_ALGORITHMS = (
hashlib.md5,
hashlib.sha1,
hashlib.sha256,
hashlib.sha384,
hashlib.sha512,
)


def _verify_empty_algorithm_parameters(
algorithm: rfc5280.AlgorithmIdentifier, location: str
) -> None:
if "parameters" in algorithm and algorithm["parameters"].isValue:
parameters = guarded_ber_decode(algorithm["parameters"])
if not isinstance(parameters, univ.Null):
Expand All @@ -30,7 +38,8 @@ def _get_digest_algorithm(
result = asn1.oids.get(algorithm["algorithm"], asn1.oids.OID_TO_HASH)
if isinstance(result, tuple) or result not in acceptable:
raise ParseError(
"%s must be one of %s, not %s" % (location, [x().name for x in acceptable], _print_type(result))
"%s must be one of %s, not %s"
% (location, [x().name for x in acceptable], _print_type(result))
)

_verify_empty_algorithm_parameters(algorithm, location)
Expand All @@ -41,7 +50,10 @@ def _get_digest_algorithm(
def _get_encryption_algorithm(algorithm: univ.Sequence, location: str) -> str:
result = asn1.oids.OID_TO_PUBKEY.get(algorithm["algorithm"])
if result is None:
raise ParseError("%s: %s is not acceptable as encryption algorithm" % (location, algorithm["algorithm"]))
raise ParseError(
"%s: %s is not acceptable as encryption algorithm"
% (location, algorithm["algorithm"])
)

_verify_empty_algorithm_parameters(algorithm, location)
return result
77 changes: 51 additions & 26 deletions signify/asn1/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,28 @@


def time_to_python(time: GeneralizedTime | UTCTime) -> datetime.datetime | None:
if 'utcTime' in time:
return cast(datetime.datetime, time['utcTime'].asDateTime)
elif 'generalTime' in time:
return cast(datetime.datetime, time['generalTime'].asDateTime)
if "utcTime" in time:
return cast(datetime.datetime, time["utcTime"].asDateTime)
elif "generalTime" in time:
return cast(datetime.datetime, time["generalTime"].asDateTime)
else:
return None


def accuracy_to_python(accuracy: rfc3161.Accuracy) -> datetime.timedelta:
delta = datetime.timedelta()
if 'seconds' in accuracy and accuracy['seconds'].isValue:
delta += datetime.timedelta(seconds=int(accuracy['seconds']))
if 'millis' in accuracy and accuracy['millis'].isValue:
delta += datetime.timedelta(milliseconds=int(accuracy['millis']))
if 'micros' in accuracy and accuracy['micros'].isValue:
delta += datetime.timedelta(microseconds=int(accuracy['micros']))
if "seconds" in accuracy and accuracy["seconds"].isValue:
delta += datetime.timedelta(seconds=int(accuracy["seconds"]))
if "millis" in accuracy and accuracy["millis"].isValue:
delta += datetime.timedelta(milliseconds=int(accuracy["millis"]))
if "micros" in accuracy and accuracy["micros"].isValue:
delta += datetime.timedelta(microseconds=int(accuracy["micros"]))
return delta


def bitstring_to_bytes(s: str) -> bytes:
# based on https://stackoverflow.com/questions/32675679/convert-binary-string-to-bytearray-in-python-3
return int(str(s), 2).to_bytes((len(s) + 7) // 8, byteorder='big')
return int(str(s), 2).to_bytes((len(s) + 7) // 8, byteorder="big")


@contextlib.contextmanager
Expand All @@ -45,25 +45,50 @@ def patch_rfc5652_signeddata() -> Iterator[rfc5652.SignedData]:
original_component_type = CertificateChoices.componentType

# first allow changing values on the object
del CertificateChoices._readOnly['componentType']
del CertificateChoices._readOnly["componentType"]
CertificateChoices.componentType = rfc5652.namedtype.NamedTypes(
rfc5652.namedtype.NamedType('certificate', rfc5652.rfc5280.Certificate()),
rfc5652.namedtype.NamedType('extendedCertificate', rfc5652.ExtendedCertificate().subtype(
implicitTag=rfc5652.tag.Tag(rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatConstructed, 0))),
# The following line is the only one changed to reflect that tag 1 is also used for v2AttrCerts.
# Note that we do not update the actual name in the scheme to prevent naming com
rfc5652.namedtype.NamedType('v1AttrCert', rfc5652.AttributeCertificateV2().subtype(
implicitTag=rfc5652.tag.Tag(rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatSimple, 1))),
rfc5652.namedtype.NamedType('v2AttrCert', rfc5652.AttributeCertificateV2().subtype(
implicitTag=rfc5652.tag.Tag(rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatSimple, 2))),
rfc5652.namedtype.NamedType('other', rfc5652.OtherCertificateFormat().subtype(
implicitTag=rfc5652.tag.Tag(rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatConstructed, 3)))
rfc5652.namedtype.NamedType("certificate", rfc5652.rfc5280.Certificate()),
rfc5652.namedtype.NamedType(
"extendedCertificate",
rfc5652.ExtendedCertificate().subtype(
implicitTag=rfc5652.tag.Tag(
rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatConstructed, 0
)
),
),
# The following line is the only one changed to reflect that tag 1 is
# also used for v2AttrCerts.
# Note that we do not update the actual name in the scheme to preventnaming com
rfc5652.namedtype.NamedType(
"v1AttrCert",
rfc5652.AttributeCertificateV2().subtype(
implicitTag=rfc5652.tag.Tag(
rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatSimple, 1
)
),
),
rfc5652.namedtype.NamedType(
"v2AttrCert",
rfc5652.AttributeCertificateV2().subtype(
implicitTag=rfc5652.tag.Tag(
rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatSimple, 2
)
),
),
rfc5652.namedtype.NamedType(
"other",
rfc5652.OtherCertificateFormat().subtype(
implicitTag=rfc5652.tag.Tag(
rfc5652.tag.tagClassContext, rfc5652.tag.tagFormatConstructed, 3
)
),
),
)
CertificateChoices._readOnly['componentType'] = CertificateChoices.componentType
CertificateChoices._readOnly["componentType"] = CertificateChoices.componentType

try:
yield SignedData()
finally:
del CertificateChoices._readOnly['componentType']
del CertificateChoices._readOnly["componentType"]
CertificateChoices.componentType = original_component_type
CertificateChoices._readOnly['componentType'] = CertificateChoices.componentType
CertificateChoices._readOnly["componentType"] = CertificateChoices.componentType
23 changes: 9 additions & 14 deletions signify/asn1/preserving_der.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,32 @@
from pyasn1.compat.octets import null, str2octs
from pyasn1.type import univ

__all__ = ['encode']
__all__ = ["encode"]


class SetOfEncoder(cer_encoder.SetOfEncoder): # type: ignore[misc]
"""This class is identical to the one of the CER encoder, except that the sorting has been removed. """
"""This class is identical to the one of the CER encoder, except that the sorting
has been removed.
"""

def encodeValue(self, value, asn1Spec, encodeFun, **options): # type: ignore[no-untyped-def]
chunks = self._encodeComponents(
value, asn1Spec, encodeFun, **options)
chunks = self._encodeComponents(value, asn1Spec, encodeFun, **options)

if len(chunks) > 1:
zero = str2octs('\x00')
zero = str2octs("\x00")
maxLen = max(map(len, chunks))
paddedChunks = [
(x.ljust(maxLen, zero), x) for x in chunks
]
paddedChunks = [(x.ljust(maxLen, zero), x) for x in chunks]

chunks = [x[1] for x in paddedChunks]

return null.join(chunks), True, True


tagMap = encoder.tagMap.copy()
tagMap.update({
univ.SetOf.tagSet: SetOfEncoder()
})
tagMap.update({univ.SetOf.tagSet: SetOfEncoder()})

typeMap = encoder.typeMap.copy()
typeMap.update({
univ.SetOf.typeId: SetOfEncoder()
})
typeMap.update({univ.SetOf.typeId: SetOfEncoder()})


class Encoder(encoder.Encoder): # type: ignore[misc]
Expand Down
Loading

0 comments on commit a693b5f

Please sign in to comment.