Skip to content

Commit

Permalink
Internal: Aleph API client as FastAPI dependency in executor
Browse files Browse the repository at this point in the history
Problem: we spawn 2 Aleph API clients in each API call, while one
is enough.

Solution: spawn the Aleph API client as a dependency to let FastAPI
manage its lifecycle.
  • Loading branch information
odesenfans committed Sep 25, 2023
1 parent a99b7db commit fe763be
Showing 1 changed file with 89 additions and 78 deletions.
167 changes: 89 additions & 78 deletions src/aleph_vrf/executor/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Dict, Union, Set
from typing import Dict, Union, Set, Annotated

import fastapi
from aleph.sdk.exceptions import MessageNotFoundError, MultipleMessagesError
Expand All @@ -18,7 +18,7 @@
from aleph_message.status import MessageStatus

logger.debug("import fastapi")
from fastapi import FastAPI
from fastapi import FastAPI, Depends

logger.debug("local imports")
from aleph_vrf.models import (
Expand All @@ -44,6 +44,16 @@
app = AlephApp(http_app=http_app)


async def authenticated_aleph_client() -> AuthenticatedAlephClient:
private_key = get_fallback_private_key()
account = ETHAccount(private_key=private_key)

async with AuthenticatedAlephClient(
account=account, api_server=settings.API_HOST
) as client:
yield client


@app.get("/")
async def index():
return {
Expand Down Expand Up @@ -73,111 +83,112 @@ async def _get_message(client: AlephClient, item_hash: ItemHash) -> PostMessage:
@app.post("/generate/{vrf_request}")
async def receive_generate(
vrf_request: ItemHash,
aleph_client: Annotated[
AuthenticatedAlephClient, Depends(authenticated_aleph_client)
],
) -> APIResponse[PublishedVRFResponseHash]:
global SAVED_GENERATED_BYTES, ANSWERED_REQUESTS

private_key = get_fallback_private_key()
account = ETHAccount(private_key=private_key)
message = await _get_message(client=aleph_client, item_hash=vrf_request)
generation_request = generate_request_from_message(message)

async with AlephClient(api_server=settings.API_HOST) as client:
message = await _get_message(client=client, item_hash=vrf_request)
generation_request = generate_request_from_message(message)
if generation_request.request_id in ANSWERED_REQUESTS:
raise fastapi.HTTPException(
status_code=409,
detail=f"A random number has already been generated for request {vrf_request}",
)

if generation_request.request_id in ANSWERED_REQUESTS:
raise fastapi.HTTPException(
status_code=409,
detail=f"A random number has already been generated for request {vrf_request}",
)
generated_bytes, hashed_bytes = generate(
generation_request.nb_bytes, generation_request.nonce
)
SAVED_GENERATED_BYTES[generation_request.execution_id] = generated_bytes
ANSWERED_REQUESTS.add(generation_request.request_id)

generated_bytes, hashed_bytes = generate(
generation_request.nb_bytes, generation_request.nonce
)
SAVED_GENERATED_BYTES[generation_request.execution_id] = generated_bytes
ANSWERED_REQUESTS.add(generation_request.request_id)

response_hash = VRFResponseHash(
nb_bytes=generation_request.nb_bytes,
nonce=generation_request.nonce,
request_id=generation_request.request_id,
execution_id=generation_request.execution_id,
vrf_request=vrf_request,
random_bytes_hash=hashed_bytes,
)
response_hash = VRFResponseHash(
nb_bytes=generation_request.nb_bytes,
nonce=generation_request.nonce,
request_id=generation_request.request_id,
execution_id=generation_request.execution_id,
vrf_request=vrf_request,
random_bytes_hash=hashed_bytes,
)

ref = (
f"vrf"
f"_{response_hash.request_id}"
f"_{response_hash.execution_id}"
f"_{GENERATE_MESSAGE_REF_PATH}"
)
ref = (
f"vrf"
f"_{response_hash.request_id}"
f"_{response_hash.execution_id}"
f"_{GENERATE_MESSAGE_REF_PATH}"
)

message_hash = await publish_data(response_hash, ref, account)
message_hash = await publish_data(
aleph_client=aleph_client, data=response_hash, ref=ref
)

published_response_hash = PublishedVRFResponseHash.from_vrf_response_hash(
vrf_response_hash=response_hash, message_hash=message_hash
)
published_response_hash = PublishedVRFResponseHash.from_vrf_response_hash(
vrf_response_hash=response_hash, message_hash=message_hash
)

return APIResponse(data=published_response_hash)
return APIResponse(data=published_response_hash)


@app.post("/publish/{hash_message}")
async def receive_publish(
hash_message: ItemHash,
aleph_client: Annotated[
AuthenticatedAlephClient, Depends(authenticated_aleph_client)
],
) -> APIResponse[PublishedVRFRandomBytes]:
global SAVED_GENERATED_BYTES

private_key = get_fallback_private_key()
account = ETHAccount(private_key=private_key)

async with AlephClient(api_server=settings.API_HOST) as client:
message = await _get_message(client=client, item_hash=hash_message)
response_hash = generate_response_hash_from_message(message)
message = await _get_message(client=aleph_client, item_hash=hash_message)
response_hash = generate_response_hash_from_message(message)

if response_hash.execution_id not in SAVED_GENERATED_BYTES:
raise fastapi.HTTPException(
status_code=404, detail="The random number has already been published"
)
if response_hash.execution_id not in SAVED_GENERATED_BYTES:
raise fastapi.HTTPException(
status_code=404, detail="The random number has already been published"
)

random_bytes: bytes = SAVED_GENERATED_BYTES.pop(response_hash.execution_id)
random_bytes: bytes = SAVED_GENERATED_BYTES.pop(response_hash.execution_id)

response_bytes = VRFRandomBytes(
request_id=response_hash.request_id,
execution_id=response_hash.execution_id,
vrf_request=response_hash.vrf_request,
random_bytes=bytes_to_binary(random_bytes),
random_bytes_hash=response_hash.random_bytes_hash,
random_number=str(bytes_to_int(random_bytes)),
)
response_bytes = VRFRandomBytes(
request_id=response_hash.request_id,
execution_id=response_hash.execution_id,
vrf_request=response_hash.vrf_request,
random_bytes=bytes_to_binary(random_bytes),
random_bytes_hash=response_hash.random_bytes_hash,
random_number=str(bytes_to_int(random_bytes)),
)

ref = f"vrf_{response_hash.request_id}_{response_hash.execution_id}"
ref = f"vrf_{response_hash.request_id}_{response_hash.execution_id}"

message_hash = await publish_data(response_bytes, ref, account)
published_random_bytes = PublishedVRFRandomBytes.from_vrf_random_bytes(
vrf_random_bytes=response_bytes, message_hash=message_hash
)
message_hash = await publish_data(
aleph_client=aleph_client, data=response_bytes, ref=ref
)
published_random_bytes = PublishedVRFRandomBytes.from_vrf_random_bytes(
vrf_random_bytes=response_bytes, message_hash=message_hash
)

return APIResponse(data=published_random_bytes)
return APIResponse(data=published_random_bytes)


async def publish_data(
data: Union[VRFResponseHash, VRFRandomBytes], ref: str, account: ETHAccount
aleph_client: AuthenticatedAlephClient,
data: Union[VRFResponseHash, VRFRandomBytes],
ref: str,
) -> ItemHash:
channel = f"vrf_{data.request_id}"

async with AuthenticatedAlephClient(
account=account, api_server=settings.API_HOST, allow_unix_sockets=False
) as client:
message, status = await client.create_post(
post_type="vrf_generation_post",
post_content=data,
channel=channel,
ref=ref,
sync=True,
message, status = await aleph_client.create_post(
post_type="vrf_generation_post",
post_content=data,
channel=channel,
ref=ref,
sync=True,
)

if status != MessageStatus.PROCESSED:
raise ValueError(
f"Message could not be processed for request {data.request_id} and execution_id {data.execution_id}"
)

if status != MessageStatus.PROCESSED:
raise ValueError(
f"Message could not be processed for request {data.request_id} and execution_id {data.execution_id}"
)

return message.item_hash
return message.item_hash

0 comments on commit fe763be

Please sign in to comment.