Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Commit

Permalink
Use hash of claim as entry ID for content addressabbility
Browse files Browse the repository at this point in the history
Also helps federation sync state

Signed-off-by: John Andersen <[email protected]>
  • Loading branch information
pdxjohnny committed Oct 18, 2023
1 parent 0219237 commit 9c54f9c
Showing 1 changed file with 20 additions and 10 deletions.
30 changes: 20 additions & 10 deletions scitt_emulator/scitt.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time
import json
import uuid
import hashlib

import cbor2
from pycose.messages import CoseMessage, Sign1Message
Expand All @@ -29,6 +30,8 @@
MOST_PERMISSIVE_INSERT_POLICY = "*"
DEFAULT_INSERT_POLICY = MOST_PERMISSIVE_INSERT_POLICY

# hash algorithm used to content address claims to entryIDs
DEFAULT_ENTRY_ID_HASH_ALGORITHM = "sha384"

class ClaimInvalidError(Exception):
pass
Expand Down Expand Up @@ -111,6 +114,12 @@ def get_claim(self, entry_id: str) -> bytes:
def submit_claim(self, claim: bytes, long_running=True) -> dict:
insert_policy = self.service_parameters.get("insertPolicy", DEFAULT_INSERT_POLICY)

try:
entry_id = self.get_entry_id(claim)
return self.get_entry(entry_id)
except EntryNotFoundError:
pass

if long_running:
return self._create_operation(claim)
elif insert_policy != MOST_PERMISSIVE_INSERT_POLICY:
Expand All @@ -124,20 +133,21 @@ def public_service_parameters(self) -> bytes:
# TODO Only export public portion of cert
return json.dumps(self.service_parameters).encode()

def _create_entry(self, claim: bytes) -> dict:
last_entry_path = self.storage_path / "last_entry_id.txt"
if last_entry_path.exists():
with open(last_entry_path, "r") as f:
last_entry_id = int(f.read())
else:
last_entry_id = 0
def get_entry_id(self, claim: bytes) -> str:
entry_id_hash_alg = self.service_parameters.get(
"entryIdHashAlgorith",
DEFAULT_ENTRY_ID_HASH_ALGORITHM,
)
entry_id_hash = hashlib.new(entry_id_hash_alg)
entry_id_hash.update(claim)
entry_id = f"{entry_id_hash_alg}:{entry_id_hash.hexdigest()}"
return entry_id

entry_id = str(last_entry_id + 1)
def _create_entry(self, claim: bytes) -> dict:
entry_id = self.get_entry_id(claim)

receipt = self._create_receipt(claim, entry_id)

last_entry_path.write_text(entry_id)

claim_path = self.storage_path / f"{entry_id}.cose"
claim_path.write_bytes(claim)

Expand Down

0 comments on commit 9c54f9c

Please sign in to comment.