Skip to content

Commit

Permalink
IN PROGRESS: docs: registration policies: CWT decode
Browse files Browse the repository at this point in the history
Signed-off-by: John Andersen <[email protected]>
  • Loading branch information
pdxjohnny committed Nov 11, 2023
1 parent 831bd3a commit 233bd7e
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 7 deletions.
51 changes: 47 additions & 4 deletions docs/registration_policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,28 @@ Simple drop rule based on claim content allowlist.
}
```

**TODO** This example being brought up of date with recent CWT within COSESign1
changes. Currently the `JWKS_URI` environment variable must be set to discover
valid notary keys used with `COSESign1` and CWT signing on statement creation.

```console
$ cat private-key.pem | python -c 'import sys, json, jwcrypto.jwt; key = jwcrypto.jwt.JWK(); key.import_from_pem(sys.stdin.buffer.read()); print(json.dumps({"keys":[{**key.export_public(as_dict=True),"use": "sig","kid": key.thumbprint()}]}, indent=4, sort_keys=True))' | tee jwks
{
"keys": [
{
"crv": "P-384",
"kid": "y96luxaBaw6FeWVEMti_iqLWPSYk8cKLzZG8X45PA2k",
"kty": "EC",
"use": "sig",
"x": "ZQazDzYmcMHF5Dstkbw7SwWvR_oXQHFS-TLppri-0xDby8TmCpzHyr6TH03CLBxj",
"y": "lsIbRskEv06Rf0vttkB3vpXdZ-a50ck74MVyRwOvN55P4s8usQAm3PY1KnAgWtHF"
}
]
}
$ python -m http.server 7777 &
$ export JWKS_URI="http://localhost:7777/jwks"
```

**jsonschema_validator.py**

```python
Expand All @@ -86,34 +108,55 @@ import json
import pathlib
import traceback

import jwt
import cwt
import cwt.algs.ec2
import cbor2
import pycose
# TODO Remove this once we have a example flow for proper key verification
import jwcrypto.jwk
from jsonschema import validate, ValidationError
import pycose.keys.ec2
from pycose.messages import CoseMessage, Sign1Message

from scitt_emulator.scitt import ClaimInvalidError, COSE_Headers_Issuer
from scitt_emulator.scitt import ClaimInvalidError, CWTClaims

claim = sys.stdin.buffer.read()

msg = CoseMessage.decode(claim)

if pycose.headers.ContentType not in msg.phdr:
raise ClaimInvalidError("Claim does not have a content type header parameter")
if COSE_Headers_Issuer not in msg.phdr:
raise ClaimInvalidError("Claim does not have an issuer header parameter")

if not msg.phdr[pycose.headers.ContentType].startswith("application/json"):
raise TypeError(
f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}"
)

# TODO jwt.PyJWKClient.get_signing_key_from_cwt ?
keys = list(
[
pycose.keys.ec2.EC2Key.from_dict(
cwt.algs.ec2.EC2Key.to_cose_key(
cwt.COSEKey.from_pem(
jwcrypto.jwk.JWK.from_json(key._jwk_data).export_to_pem(),
kid=jwcrypto.jwk.JWK.from_json(key._jwk_data).thumbprint()
).to_dict()
)
)
for key in jwt.PyJWKClient(os.environ["JWKS_URI"]).get_signing_keys()
]
)
cwt_claims = cwt.decode(msg.phdr[CWTClaims], keys)

SCHEMA = json.loads(pathlib.Path(os.environ["SCHEMA_PATH"]).read_text())

try:
validate(
instance={
"$schema": "https://schema.example.com/scitt-policy-engine-jsonschema.schema.json",
"issuer": msg.phdr[COSE_Headers_Issuer],
"issuer": cwt_claims[1],
"subject": cwt_claims[2],
"claim": json.loads(msg.payload.decode()),
},
schema=SCHEMA,
Expand Down
28 changes: 25 additions & 3 deletions tests/test_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@
import docutils.nodes
import docutils.utils

import jwcrypto

from scitt_emulator.client import ClaimOperationError

from .test_cli import (
Service,
content_type,
payload,
execute_cli,
create_flask_app_oidc_server,
)


repo_root = pathlib.Path(__file__).parents[1]
docs_dir = repo_root.joinpath("docs")
allowlisted_issuer = "did:web:example.org"
non_allowlisted_issuer = "did:web:example.com"
CLAIM_DENIED_ERROR = {"type": "denied", "detail": "content_address_of_reason"}
CLAIM_DENIED_ERROR_BLOCKED = {
Expand Down Expand Up @@ -170,7 +172,15 @@ def test_docs_registration_policies(tmp_path):
for name, content in docutils_find_code_samples(nodes).items():
tmp_path.joinpath(name).write_text(content)

key = jwcrypto.jwk.JWK.generate(kty="EC", crv="P-384")
algorithm = "ES384"
audience = "scitt.example.org"
subject = "repo:scitt-community/scitt-api-emulator:ref:refs/heads/main"

with Service(
{"key": key, "algorithms": [algorithm]},
create_flask_app=create_flask_app_oidc_server,
) as oidc_service, Service(
{
"tree_alg": "CCF",
"workspace": workspace_path,
Expand All @@ -188,6 +198,18 @@ def test_docs_registration_policies(tmp_path):
# set the policy to enforce
service.server.app.scitt_service.service_parameters["insertPolicy"] = "external"

# replace example issuer with test OIDC service issuer (URL)
claim_denied_error_blocked = CLAIM_DENIED_ERROR_BLOCKED
claim_denied_error_blocked["detail"] = claim_denied_error_blocked["detail"].replace(
"did:web:example.org", oidc_service.url,
)
allowlist_schema_json_path = tmp_path.joinpath("allowlist.schema.json")
allowlist_schema_json_path.write_text(
allowlist_schema_json_path.read_text().replace(
"did:web:example.org", oidc_service.url,
)
)

# create denied claim
command = [
"client",
Expand Down Expand Up @@ -226,7 +248,7 @@ def test_docs_registration_policies(tmp_path):
check_error = error
assert check_error
assert "error" in check_error.operation
assert check_error.operation["error"] == CLAIM_DENIED_ERROR_BLOCKED
assert check_error.operation["error"] == claim_denied_error_blocked
assert not os.path.exists(receipt_path)
assert not os.path.exists(entry_id_path)

Expand All @@ -237,7 +259,7 @@ def test_docs_registration_policies(tmp_path):
"--out",
claim_path,
"--issuer",
allowlisted_issuer,
oidc_service.url,
"--subject",
"test",
"--content-type",
Expand Down

0 comments on commit 233bd7e

Please sign in to comment.