Skip to content

Commit

Permalink
tests: federation: activitypub: bovine: Working testcase
Browse files Browse the repository at this point in the history
Signed-off-by: John Andersen <[email protected]>
  • Loading branch information
pdxjohnny committed Nov 20, 2023
1 parent 4cd2671 commit b402cb9
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 28 deletions.
14 changes: 10 additions & 4 deletions scitt_emulator/federation_activitypub_bovine.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,8 @@ async def federate_created_entries_pass_client(
created_entry: SCITTSignalsFederationCreatedEntry = None,
):
nonlocal client
signals.add_background_task(
federate_created_entries(client, sender, created_entry)
)
nonlocal signals
await federate_created_entries(client, sender, created_entry)

client.federate_created_entries = types.MethodType(
signals.federation.created_entry.connect(
Expand Down Expand Up @@ -251,7 +250,7 @@ async def federate_created_entries_pass_client(
# Send signal to submit federated claim
# TODO Announce that this entry ID was created via
# federation to avoid an infinate loop
signals.federation.submit_claim.send(
await signals.federation.submit_claim.send_async(
client, claim=claim
)
except Exception as ex:
Expand Down Expand Up @@ -294,6 +293,13 @@ async def federate_created_entries(
sender: SCITTServiceEmulator,
created_entry: SCITTSignalsFederationCreatedEntry = None,
):
print()
print()
print()
print(client, sender, created_entry)
print()
print()
print()
try:
logger.info("federate_created_entry() created_entry: %r", created_entry)
note = (
Expand Down
36 changes: 17 additions & 19 deletions scitt_emulator/scitt.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ def connect_signals(self):
self.signal_receiver_submit_claim,
)

def signal_receiver_submit_claim(self, _sender, claim: bytes) -> None:
self.submit_claim(claim, long_running=False)
async def signal_receiver_submit_claim(self, _sender, claim: bytes) -> None:
await self.submit_claim(claim, long_running=False)

@abstractmethod
def initialize_service(self):
Expand All @@ -87,7 +87,7 @@ def create_receipt_contents(self, countersign_tbi: bytes, entry_id: str):
def verify_receipt_contents(receipt_contents: list, countersign_tbi: bytes):
raise NotImplementedError

def get_operation(self, operation_id: str) -> dict:
async def get_operation(self, operation_id: str) -> dict:
operation_path = self.operations_path / f"{operation_id}.json"
try:
with open(operation_path, "r") as f:
Expand All @@ -98,7 +98,7 @@ def get_operation(self, operation_id: str) -> dict:
if operation["status"] == "running":
# Pretend that the service finishes the operation after
# the client having checked the operation status once.
operation = self._finish_operation(operation)
operation = await self._finish_operation(operation)
return operation

def get_entry(self, entry_id: str) -> dict:
Expand All @@ -118,7 +118,7 @@ def get_claim(self, entry_id: str) -> bytes:
raise EntryNotFoundError(f"Entry {entry_id} not found")
return claim

def submit_claim(self, claim: bytes, long_running=True) -> dict:
async def submit_claim(self, claim: bytes, long_running=True) -> dict:
insert_policy = self.service_parameters.get("insertPolicy", DEFAULT_INSERT_POLICY)

try:
Expand All @@ -134,7 +134,7 @@ def submit_claim(self, claim: bytes, long_running=True) -> dict:
f"non-* insertPolicy only works with long_running=True: {insert_policy!r}"
)
else:
return self._create_entry(claim)
return await self._create_entry(claim)

def public_service_parameters(self) -> bytes:
# TODO Only export public portion of cert
Expand All @@ -150,7 +150,7 @@ def get_entry_id(self, claim: bytes) -> str:
entry_id = f"{entry_id_hash_alg}:{entry_id_hash.hexdigest()}"
return entry_id

def _create_entry(self, claim: bytes) -> dict:
async def _create_entry(self, claim: bytes) -> dict:
entry_id = self.get_entry_id(claim)

receipt = self._create_receipt(claim, entry_id)
Expand All @@ -162,16 +162,14 @@ def _create_entry(self, claim: bytes) -> dict:

entry = {"entryId": entry_id}

self.signals.add_background_task(
self.signals.federation.created_entry.send_async(
self,
created_entry=SCITTSignalsFederationCreatedEntry(
tree_alg=self.tree_alg,
entry_id=entry_id,
receipt=receipt,
claim=claim,
public_service_parameters=self.public_service_parameters(),
)
await self.signals.federation.created_entry.send_async(
self,
created_entry=SCITTSignalsFederationCreatedEntry(
tree_alg=self.tree_alg,
entry_id=entry_id,
receipt=receipt,
claim=claim,
public_service_parameters=self.public_service_parameters(),
)
)

Expand Down Expand Up @@ -233,7 +231,7 @@ def _sync_policy_result(self, operation: dict):

return policy_result

def _finish_operation(self, operation: dict):
async def _finish_operation(self, operation: dict):
operation_id = operation["operationId"]
operation_path = self.operations_path / f"{operation_id}.json"
claim_src_path = self.operations_path / f"{operation_id}.cose"
Expand All @@ -250,7 +248,7 @@ def _finish_operation(self, operation: dict):
return operation

claim = claim_src_path.read_bytes()
entry = self._create_entry(claim)
entry = await self._create_entry(claim)
claim_src_path.unlink()

operation["status"] = "succeeded"
Expand Down
4 changes: 2 additions & 2 deletions scitt_emulator/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def submit_claim():
return await make_unavailable_error()
try:
# NOTE This got refactored to support content addressable claims
result = app.scitt_service.submit_claim(await request.get_data(), long_running=use_lro)
result = await app.scitt_service.submit_claim(await request.get_data(), long_running=use_lro)
if "operationId" in result:
headers = {
"Location": f"{request.host_url}/operations/{result['operationId']}",
Expand All @@ -118,7 +118,7 @@ async def get_operation(operation_id: str):
if is_unavailable():
return await make_unavailable_error()
try:
operation = app.scitt_service.get_operation(operation_id)
operation = await app.scitt_service.get_operation(operation_id)
except OperationNotFoundError as e:
return await make_error("operationNotFound", str(e), 404)
headers = {}
Expand Down
6 changes: 3 additions & 3 deletions tests/test_federation_activitypub_bovine.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,6 @@ async def make_client_session(self):
# Create claims in each instance
claims = []
for handle_name, service in services.items():
our_service = services[handle_name]

# create claim
command = [
"client",
Expand Down Expand Up @@ -276,6 +274,8 @@ async def make_client_session(self):
}
)

# await asyncio.sleep(100)

# Test that we can download claims from all instances federated with
for handle_name, service in services.items():
for claim in claims:
Expand All @@ -301,7 +301,7 @@ async def make_client_session(self):
# TODO Retry with backoff with cap
# TODO Remove try except, fix federation
error = None
for i in range(0, 50):
for i in range(0, 10):
try:
execute_cli(command)
break
Expand Down

0 comments on commit b402cb9

Please sign in to comment.