From e704c3abb2ea0ebbdc171c2d93bd1075e9622e23 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 25 Sep 2023 17:15:42 +0200 Subject: [PATCH] Internal: Aleph API client as FastAPI dependency in executor Problem: we spawn 2 Aleph API clients in each API call, while one is enough. Solution: spawn the Aleph API client as a dependency to let FastAPI manage its lifecycle. --- src/aleph_vrf/executor/main.py | 167 ++++++++++++++++++--------------- 1 file changed, 89 insertions(+), 78 deletions(-) diff --git a/src/aleph_vrf/executor/main.py b/src/aleph_vrf/executor/main.py index 5995f63..146fe29 100644 --- a/src/aleph_vrf/executor/main.py +++ b/src/aleph_vrf/executor/main.py @@ -1,5 +1,5 @@ import logging -from typing import Dict, Union, Set +from typing import Dict, Union, Set, Annotated import fastapi from aleph.sdk.exceptions import MessageNotFoundError, MultipleMessagesError @@ -18,7 +18,7 @@ from aleph_message.status import MessageStatus logger.debug("import fastapi") -from fastapi import FastAPI +from fastapi import FastAPI, Depends logger.debug("local imports") from aleph_vrf.models import ( @@ -44,6 +44,16 @@ app = AlephApp(http_app=http_app) +async def authenticated_aleph_client() -> AuthenticatedAlephClient: + private_key = get_fallback_private_key() + account = ETHAccount(private_key=private_key) + + async with AuthenticatedAlephClient( + account=account, api_server=settings.API_HOST + ) as client: + yield client + + @app.get("/") async def index(): return { @@ -73,111 +83,112 @@ async def _get_message(client: AlephClient, item_hash: ItemHash) -> PostMessage: @app.post("/generate/{vrf_request}") async def receive_generate( vrf_request: ItemHash, + aleph_client: Annotated[ + AuthenticatedAlephClient, Depends(authenticated_aleph_client) + ], ) -> APIResponse[PublishedVRFResponseHash]: global SAVED_GENERATED_BYTES, ANSWERED_REQUESTS - private_key = get_fallback_private_key() - account = ETHAccount(private_key=private_key) + message = await _get_message(client=aleph_client, item_hash=vrf_request) + generation_request = generate_request_from_message(message) - async with AlephClient(api_server=settings.API_HOST) as client: - message = await _get_message(client=client, item_hash=vrf_request) - generation_request = generate_request_from_message(message) + if generation_request.request_id in ANSWERED_REQUESTS: + raise fastapi.HTTPException( + status_code=409, + detail=f"A random number has already been generated for request {vrf_request}", + ) - if generation_request.request_id in ANSWERED_REQUESTS: - raise fastapi.HTTPException( - status_code=409, - detail=f"A random number has already been generated for request {vrf_request}", - ) + generated_bytes, hashed_bytes = generate( + generation_request.nb_bytes, generation_request.nonce + ) + SAVED_GENERATED_BYTES[generation_request.execution_id] = generated_bytes + ANSWERED_REQUESTS.add(generation_request.request_id) - generated_bytes, hashed_bytes = generate( - generation_request.nb_bytes, generation_request.nonce - ) - SAVED_GENERATED_BYTES[generation_request.execution_id] = generated_bytes - ANSWERED_REQUESTS.add(generation_request.request_id) - - response_hash = VRFResponseHash( - nb_bytes=generation_request.nb_bytes, - nonce=generation_request.nonce, - request_id=generation_request.request_id, - execution_id=generation_request.execution_id, - vrf_request=vrf_request, - random_bytes_hash=hashed_bytes, - ) + response_hash = VRFResponseHash( + nb_bytes=generation_request.nb_bytes, + nonce=generation_request.nonce, + request_id=generation_request.request_id, + execution_id=generation_request.execution_id, + vrf_request=vrf_request, + random_bytes_hash=hashed_bytes, + ) - ref = ( - f"vrf" - f"_{response_hash.request_id}" - f"_{response_hash.execution_id}" - f"_{GENERATE_MESSAGE_REF_PATH}" - ) + ref = ( + f"vrf" + f"_{response_hash.request_id}" + f"_{response_hash.execution_id}" + f"_{GENERATE_MESSAGE_REF_PATH}" + ) - message_hash = await publish_data(response_hash, ref, account) + message_hash = await publish_data( + aleph_client=aleph_client, data=response_hash, ref=ref + ) - published_response_hash = PublishedVRFResponseHash.from_vrf_response_hash( - vrf_response_hash=response_hash, message_hash=message_hash - ) + published_response_hash = PublishedVRFResponseHash.from_vrf_response_hash( + vrf_response_hash=response_hash, message_hash=message_hash + ) - return APIResponse(data=published_response_hash) + return APIResponse(data=published_response_hash) @app.post("/publish/{hash_message}") async def receive_publish( hash_message: ItemHash, + aleph_client: Annotated[ + AuthenticatedAlephClient, Depends(authenticated_aleph_client) + ], ) -> APIResponse[PublishedVRFRandomBytes]: global SAVED_GENERATED_BYTES - private_key = get_fallback_private_key() - account = ETHAccount(private_key=private_key) - - async with AlephClient(api_server=settings.API_HOST) as client: - message = await _get_message(client=client, item_hash=hash_message) - response_hash = generate_response_hash_from_message(message) + message = await _get_message(client=aleph_client, item_hash=hash_message) + response_hash = generate_response_hash_from_message(message) - if response_hash.execution_id not in SAVED_GENERATED_BYTES: - raise fastapi.HTTPException( - status_code=404, detail="The random number has already been published" - ) + if response_hash.execution_id not in SAVED_GENERATED_BYTES: + raise fastapi.HTTPException( + status_code=404, detail="The random number has already been published" + ) - random_bytes: bytes = SAVED_GENERATED_BYTES.pop(response_hash.execution_id) + random_bytes: bytes = SAVED_GENERATED_BYTES.pop(response_hash.execution_id) - response_bytes = VRFRandomBytes( - request_id=response_hash.request_id, - execution_id=response_hash.execution_id, - vrf_request=response_hash.vrf_request, - random_bytes=bytes_to_binary(random_bytes), - random_bytes_hash=response_hash.random_bytes_hash, - random_number=str(bytes_to_int(random_bytes)), - ) + response_bytes = VRFRandomBytes( + request_id=response_hash.request_id, + execution_id=response_hash.execution_id, + vrf_request=response_hash.vrf_request, + random_bytes=bytes_to_binary(random_bytes), + random_bytes_hash=response_hash.random_bytes_hash, + random_number=str(bytes_to_int(random_bytes)), + ) - ref = f"vrf_{response_hash.request_id}_{response_hash.execution_id}" + ref = f"vrf_{response_hash.request_id}_{response_hash.execution_id}" - message_hash = await publish_data(response_bytes, ref, account) - published_random_bytes = PublishedVRFRandomBytes.from_vrf_random_bytes( - vrf_random_bytes=response_bytes, message_hash=message_hash - ) + message_hash = await publish_data( + aleph_client=aleph_client, data=response_bytes, ref=ref + ) + published_random_bytes = PublishedVRFRandomBytes.from_vrf_random_bytes( + vrf_random_bytes=response_bytes, message_hash=message_hash + ) - return APIResponse(data=published_random_bytes) + return APIResponse(data=published_random_bytes) async def publish_data( - data: Union[VRFResponseHash, VRFRandomBytes], ref: str, account: ETHAccount + aleph_client: AuthenticatedAlephClient, + data: Union[VRFResponseHash, VRFRandomBytes], + ref: str, ) -> ItemHash: channel = f"vrf_{data.request_id}" - async with AuthenticatedAlephClient( - account=account, api_server=settings.API_HOST, allow_unix_sockets=False - ) as client: - message, status = await client.create_post( - post_type="vrf_generation_post", - post_content=data, - channel=channel, - ref=ref, - sync=True, + message, status = await aleph_client.create_post( + post_type="vrf_generation_post", + post_content=data, + channel=channel, + ref=ref, + sync=True, + ) + + if status != MessageStatus.PROCESSED: + raise ValueError( + f"Message could not be processed for request {data.request_id} and execution_id {data.execution_id}" ) - if status != MessageStatus.PROCESSED: - raise ValueError( - f"Message could not be processed for request {data.request_id} and execution_id {data.execution_id}" - ) - - return message.item_hash + return message.item_hash