Skip to content

Commit

Permalink
Internal: executor integration test
Browse files Browse the repository at this point in the history
Added an integration test for the normal flow of the executor.
This test starts the executor in a separate process, requests to
generate a random number, requests to publish the number and verifies
that the random number hash matches the published random number.
  • Loading branch information
odesenfans committed Sep 19, 2023
1 parent 48cf72d commit 8aeab0f
Show file tree
Hide file tree
Showing 6 changed files with 529 additions and 35 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ testing =
pytest-asyncio
pytest-cov
pytest-mock
uvicorn

[options.entry_points]
# Add here console scripts like:
Expand Down
67 changes: 54 additions & 13 deletions src/aleph_vrf/executor/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import logging
from typing import Dict, Union
from contextlib import asynccontextmanager
from typing import Dict, Union, Set
from uuid import UUID

import fastapi
from aleph.sdk.exceptions import MessageNotFoundError, MultipleMessagesError

from aleph_vrf.settings import settings

Expand All @@ -10,7 +15,7 @@
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.client import AlephClient, AuthenticatedAlephClient
from aleph.sdk.vm.app import AlephApp
from aleph_message.models import ItemHash
from aleph_message.models import ItemHash, PostMessage
from aleph_message.status import MessageStatus

logger.debug("import fastapi")
Expand All @@ -28,15 +33,26 @@

logger.debug("imports done")

http_app = FastAPI()
app = AlephApp(http_app=http_app)

GENERATE_MESSAGE_REF_PATH = "hash"

# TODO: Use another method to save the data
ANSWERED_REQUESTS: Set[str] = set()
SAVED_GENERATED_BYTES: Dict[str, bytes] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
global ANSWERED_REQUESTS, SAVED_GENERATED_BYTES

ANSWERED_REQUESTS.clear()
SAVED_GENERATED_BYTES.clear()
yield


http_app = FastAPI(lifespan=lifespan)
app = AlephApp(http_app=http_app)


@app.get("/")
async def index():
return {
Expand All @@ -45,21 +61,46 @@ async def index():
}


async def _get_message(client: AlephClient, item_hash: ItemHash) -> PostMessage:
try:
return await client.get_message(item_hash=item_hash, message_type=PostMessage)
except MessageNotFoundError:
raise fastapi.HTTPException(
status_code=404, detail=f"Message {item_hash} not found"
)
except MultipleMessagesError:
raise fastapi.HTTPException(
status_code=409,
detail=f"Multiple messages have the following hash: {item_hash}",
)
except TypeError:
raise fastapi.HTTPException(
status_code=409, detail=f"Message {item_hash} is not a POST message"
)


@app.post("/generate/{vrf_request}")
async def receive_generate(vrf_request: str) -> APIResponse:
global SAVED_GENERATED_BYTES
async def receive_generate(vrf_request: ItemHash) -> APIResponse[VRFResponseHash]:
global SAVED_GENERATED_BYTES, ANSWERED_REQUESTS

private_key = get_fallback_private_key()
account = ETHAccount(private_key=private_key)

async with AlephClient(api_server=settings.API_HOST) as client:
message = await client.get_message(item_hash=vrf_request)
message = await _get_message(client=client, item_hash=vrf_request)
generation_request = generate_request_from_message(message)

if generation_request.request_id in ANSWERED_REQUESTS:
raise fastapi.HTTPException(
status_code=409,
detail=f"A random number has already been generated for request {vrf_request}",
)

generated_bytes, hashed_bytes = generate(
generation_request.nb_bytes, generation_request.nonce
)
SAVED_GENERATED_BYTES[str(generation_request.execution_id)] = generated_bytes
ANSWERED_REQUESTS.add(generation_request.request_id)

response_hash = VRFResponseHash(
nb_bytes=generation_request.nb_bytes,
Expand All @@ -85,19 +126,19 @@ async def receive_generate(vrf_request: str) -> APIResponse:


@app.post("/publish/{hash_message}")
async def receive_publish(hash_message: str) -> APIResponse:
async def receive_publish(hash_message: ItemHash) -> APIResponse[VRFRandomBytes]:
global SAVED_GENERATED_BYTES

private_key = get_fallback_private_key()
account = ETHAccount(private_key=private_key)

async with AlephClient(api_server=settings.API_HOST) as client:
message = await client.get_message(item_hash=hash_message)
message = await _get_message(client=client, item_hash=hash_message)
response_hash = generate_response_hash_from_message(message)

if not SAVED_GENERATED_BYTES[str(response_hash.execution_id)]:
raise ValueError(
f"Random bytes not existing for execution {response_hash.execution_id}"
if response_hash.execution_id not in SAVED_GENERATED_BYTES:
raise fastapi.HTTPException(
status_code=404, detail="The random number has already been published"
)

random_bytes: bytes = SAVED_GENERATED_BYTES.pop(str(response_hash.execution_id))
Expand Down
48 changes: 27 additions & 21 deletions src/aleph_vrf/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Any, Dict, List, Optional
from typing import List, Optional, TypeVar, Generic
from uuid import uuid4

import fastapi
from aleph_message.models import ItemHash, PostMessage
from pydantic import BaseModel
from pydantic import BaseModel, ValidationError, Field
from pydantic.generics import GenericModel


class Node(BaseModel):
Expand All @@ -24,19 +26,19 @@ class VRFGenerationRequest(BaseModel):
nb_bytes: int
nonce: int
request_id: str
execution_id: str
execution_id: str = Field(default_factory=lambda: str(uuid4()))
vrf_function: ItemHash


def generate_request_from_message(message: PostMessage) -> VRFGenerationRequest:
content = message.content.content
return VRFGenerationRequest(
nb_bytes=content["nb_bytes"],
nonce=content["nonce"],
request_id=content["request_id"],
execution_id=str(uuid4()),
vrf_function=ItemHash(content["vrf_function"]),
)
try:
return VRFGenerationRequest.parse_obj(content)
except ValidationError as e:
raise fastapi.HTTPException(
status_code=422,
detail=f"Could not parse content of {message.item_hash} as VRF request object: {e.json()}",
)


class VRFResponseHash(BaseModel):
Expand All @@ -51,15 +53,16 @@ class VRFResponseHash(BaseModel):

def generate_response_hash_from_message(message: PostMessage) -> VRFResponseHash:
content = message.content.content
return VRFResponseHash(
nb_bytes=content["nb_bytes"],
nonce=content["nonce"],
request_id=content["request_id"],
execution_id=content["execution_id"],
vrf_request=ItemHash(content["vrf_request"]),
random_bytes_hash=content["random_bytes_hash"],
message_hash=content["message_hash"],
)
try:
response_hash = VRFResponseHash.parse_obj(content)
except ValidationError as e:
raise fastapi.HTTPException(
422,
detail=f"Could not parse content of {message.item_hash} as VRF response hash object: {e.json()}",
)

response_hash.message_hash = message.item_hash
return response_hash


class VRFRandomBytes(BaseModel):
Expand Down Expand Up @@ -93,5 +96,8 @@ class VRFResponse(BaseModel):
message_hash: Optional[str] = None


class APIResponse(BaseModel):
data: Any
M = TypeVar("M", bound=BaseModel)


class APIResponse(GenericModel, Generic[M]):
data: M
91 changes: 90 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,94 @@
- https://docs.pytest.org/en/stable/fixture.html
- https://docs.pytest.org/en/stable/writing_plugins.html
"""
import multiprocessing
import os
import socket
from contextlib import contextmanager
from time import sleep
from typing import Union

# import pytest
import aiohttp
import fastapi.applications
import pytest
import pytest_asyncio
import uvicorn

from aleph_vrf.settings import settings
from mock_ccn import app as mock_ccn_app


def wait_for_server(host: str, port: int, nb_retries: int = 3, wait_time: int = 0.1):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)

retries = 0
while retries < nb_retries:
try:
sock.connect((host, port))
except ConnectionError:
retries += 1
sleep(wait_time)
continue

break


@contextmanager
def run_http_app(
app: Union[str, fastapi.applications.ASGIApp], host: str, port: int
) -> multiprocessing.Process:
uvicorn_process = multiprocessing.Process(
target=uvicorn.run, args=(app,), kwargs={"host": host, "port": port}
)
uvicorn_process.start()

try:
# Wait for uvicorn to start
wait_for_server(host, port)
yield uvicorn_process

finally:
uvicorn_process.terminate()
uvicorn_process.join()


@pytest.fixture
def mock_ccn() -> str:
host, port = "127.0.0.1", 4024
url = f"http://{host}:{port}"

default_api_host = settings.API_HOST

# Configure the mock CCN as API host. Note that `settings` must be modified as the object is
# already built when running all tests in the same run.
os.environ["ALEPH_VRF_API_HOST"] = url
settings.API_HOST = url

with run_http_app(app=mock_ccn_app, host=host, port=port):
yield url

# Clean up settings for other tests
del os.environ["ALEPH_VRF_API_HOST"]
settings.API_HOST = default_api_host


@pytest_asyncio.fixture
async def mock_ccn_client(mock_ccn: str):
async with aiohttp.ClientSession(mock_ccn) as client:
yield client


@pytest.fixture
def executor_server(mock_ccn: str) -> str:
assert mock_ccn, "The mock CCN server must be running"

host, port = "127.0.0.1", 8081
with run_http_app(app="aleph_vrf.executor.main:app", host=host, port=port):
yield f"http://{host}:{port}"


@pytest_asyncio.fixture
async def executor_client(executor_server: str) -> aiohttp.ClientSession:
async with aiohttp.ClientSession(executor_server) as client:
yield client
Loading

0 comments on commit 8aeab0f

Please sign in to comment.