diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1ec0936 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,14 @@ +FROM debian:bullseye + +RUN apt-get update && apt-get -y upgrade && apt-get install -y \ + libsecp256k1-dev \ + python3-pip \ + python3-venv \ + squashfs-tools \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /usr/src/aleph_vrf +COPY . . + +RUN mkdir /opt/packages +RUN pip install -t /opt/packages . \ No newline at end of file diff --git a/install.sh b/install.sh new file mode 100644 index 0000000..2232b92 --- /dev/null +++ b/install.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +python3 -m virtualenv venv +source venv/bin/activate + +pip install -e .[testing] \ No newline at end of file diff --git a/prepare_venv_volume.sh b/prepare_venv_volume.sh new file mode 100755 index 0000000..e590e9e --- /dev/null +++ b/prepare_venv_volume.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +set -euo pipefail + +docker build -t aleph-vrf . +docker run --rm -ti -v "$(pwd)":/usr/src/aleph_vrf aleph-vrf \ + mksquashfs /opt/packages aleph-vrf-venv.squashfs \ No newline at end of file diff --git a/run-in-docker.sh b/run-in-docker.sh new file mode 100644 index 0000000..c72f88a --- /dev/null +++ b/run-in-docker.sh @@ -0,0 +1,2 @@ +docker build -t aleph-vrf . 
+docker run --rm -ti -v "$(pwd)":/usr/src/aleph_vrf aleph-vrf bash \ No newline at end of file diff --git a/src/aleph_vrf/coordinator/main.py b/src/aleph_vrf/coordinator/main.py index 52d8cbf..9f9b9ba 100644 --- a/src/aleph_vrf/coordinator/main.py +++ b/src/aleph_vrf/coordinator/main.py @@ -36,7 +36,11 @@ async def index(): async def receive_vrf() -> APIResponse: private_key = get_fallback_private_key() account = ETHAccount(private_key=private_key) + response = {"data": ""} - vrf_response = await generate_vrf(account) + try: + response = await generate_vrf(account) + except Exception as err: + response["data"] = {"error": str(err)} - return APIResponse(data=vrf_response) + return APIResponse(data=response) diff --git a/src/aleph_vrf/coordinator/vrf.py b/src/aleph_vrf/coordinator/vrf.py index 5a7c54c..0913cee 100644 --- a/src/aleph_vrf/coordinator/vrf.py +++ b/src/aleph_vrf/coordinator/vrf.py @@ -1,15 +1,17 @@ import asyncio import json +import logging import random from hashlib import sha3_256 -from typing import Any, Dict, List, Union -from uuid import UUID, uuid4 +from typing import Any, Dict, List, Type, TypeVar, Union +from uuid import uuid4 import aiohttp from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.client import AuthenticatedAlephClient from aleph_message.models import ItemHash from aleph_message.status import MessageStatus +from pydantic import BaseModel from pydantic.json import pydantic_encoder from aleph_vrf.models import ( @@ -21,19 +23,34 @@ VRFResponseHash, ) from aleph_vrf.settings import settings -from aleph_vrf.utils import bytes_to_int, generate_nonce, int_to_bytes, verify, xor_all +from aleph_vrf.utils import ( + binary_to_bytes, + bytes_to_int, + generate_nonce, + int_to_bytes, + verify, + xor_all, +) + +VRF_FUNCTION_GENERATE_PATH = "generate" +VRF_FUNCTION_PUBLISH_PATH = "publish" + + +logger = logging.getLogger(__name__) -VRF_FUNCTION_GENERATE_PATH = "/generate" -VRF_FUNCTION_PUBLISH_PATH = "/publish" +M = TypeVar("M", 
bound=BaseModel) + + +async def post_node_vrf(url: str, model: Type[M]) -> Union[Exception, M]: + async with aiohttp.ClientSession() as session: + async with session.post(url, timeout=60) as resp: + if resp.status != 200: + raise ValueError(f"VRF node request failed on {url}") -async def post_node_vrf(session, url): - async with session.post(url) as resp: - if resp.status != 200: - raise ValueError(f"VRF node request failed on {url}") + response = await resp.json() - response = await resp.json() - return response["data"] + return model.parse_obj(response["data"]) async def _get_corechannel_aggregate() -> Dict[str, Any]: @@ -63,19 +80,28 @@ async def select_random_nodes(node_amount: int) -> List[Node]: resource_nodes = content["data"]["corechannel"]["resource_nodes"] for resource_node in resource_nodes: - node = Node( - hash=resource_node["hash"], - address=resource_node["address"], - score=resource_node["score"], + # Filter nodes by address and with linked status + if resource_node["status"] == "linked" and resource_node["score"] > 0.9: + node_address = resource_node["address"].strip("/") + node = Node( + hash=resource_node["hash"], + address=node_address, + score=resource_node["score"], + ) + node_list.append(node) + + if len(node_list) < node_amount: + raise ValueError( + f"Not enough CRNs linked, only {len(node_list)} available from {node_amount} requested" ) - node_list.append(node) # Randomize node order return random.sample(node_list, min(node_amount, len(node_list))) async def generate_vrf(account: ETHAccount) -> VRFResponse: - selected_nodes = await select_random_nodes(settings.NB_EXECUTORS) + nb_executors = settings.NB_EXECUTORS + selected_nodes = await select_random_nodes(nb_executors) selected_node_list = json.dumps(selected_nodes, default=pydantic_encoder).encode( encoding="utf-8" ) @@ -84,6 +110,7 @@ async def generate_vrf(account: ETHAccount) -> VRFResponse: vrf_request = VRFRequest( nb_bytes=settings.NB_BYTES, + nb_executors=nb_executors, 
nonce=nonce, vrf_function=ItemHash(settings.FUNCTION), request_id=str(uuid4()), @@ -94,100 +121,124 @@ async def generate_vrf(account: ETHAccount) -> VRFResponse: request_item_hash = await publish_data(vrf_request, ref, account) - async with aiohttp.ClientSession() as session: - vrf_generated_result = await send_generate_requests( - session, selected_nodes, request_item_hash - ) + logger.debug(f"Generated VRF request with item_hash {request_item_hash}") - vrf_publish_result = await send_publish_requests( - session, selected_nodes, vrf_generated_result - ) + vrf_generated_result = await send_generate_requests( + selected_nodes, request_item_hash + ) - vrf_response = generate_final_vrf( - nonce, - vrf_generated_result, - vrf_publish_result, - vrf_request, - ) + logger.debug( + f"Received VRF generated requests from {len(vrf_generated_result)} nodes" + ) + logger.debug(vrf_generated_result) + vrf_publish_result = await send_publish_requests(vrf_generated_result) + + logger.debug( + f"Received VRF publish requests from {len(vrf_generated_result)} nodes" + ) + logger.debug(vrf_publish_result) + + vrf_response = generate_final_vrf( + nb_executors, + nonce, + vrf_generated_result, + vrf_publish_result, + vrf_request, + ) - ref = f"vrf_{vrf_response.request_id}" + ref = f"vrf_{vrf_response.request_id}" - response_item_hash = await publish_data(vrf_response, ref, account) + logger.debug(f"Publishing final VRF summary") - vrf_response.message_hash = response_item_hash + response_item_hash = await publish_data(vrf_response, ref, account) - return vrf_response + vrf_response.message_hash = response_item_hash + + return vrf_response async def send_generate_requests( - session: aiohttp.ClientSession, selected_nodes: List[Node], request_item_hash: str -) -> Dict[Node, VRFResponseHash]: + selected_nodes: List[Node], request_item_hash: str +) -> Dict[str, Union[Exception, VRFResponseHash]]: generate_tasks = [] + nodes: List[str] = [] for node in selected_nodes: - url = 
f"{node.address}/{VRF_FUNCTION_GENERATE_PATH}/{request_item_hash}" - generate_tasks.append(asyncio.create_task(post_node_vrf(session, url))) + nodes.append(node.address) + url = f"{node.address}/vm/{settings.FUNCTION}/{VRF_FUNCTION_GENERATE_PATH}/{request_item_hash}" + generate_tasks.append(asyncio.create_task(post_node_vrf(url, VRFResponseHash))) vrf_generated_responses = await asyncio.gather( *generate_tasks, return_exceptions=True ) - return dict(zip(selected_nodes, vrf_generated_responses)) + return dict(zip(nodes, vrf_generated_responses)) async def send_publish_requests( - session: aiohttp.ClientSession, - selected_nodes: List[Node], - vrf_generated_result: Dict[Node, VRFResponseHash], -) -> Dict[Node, VRFRandomBytes]: + vrf_generated_result: Dict[str, VRFResponseHash], +) -> Dict[str, Union[Exception, VRFRandomBytes]]: publish_tasks = [] - for node, vrf_generated_response in vrf_generated_result: + nodes: List[str] = [] + for node, vrf_generated_response in vrf_generated_result.items(): + nodes.append(node) if isinstance(vrf_generated_response, Exception): - raise ValueError(f"Generate response not found for Node {node.address}") - else: - url = f"{node.address}/{VRF_FUNCTION_PUBLISH_PATH}/{vrf_generated_response.message_hash}" - publish_tasks.append(asyncio.create_task(post_node_vrf(session, url))) + raise ValueError(f"Generate response not found for Node {node}") + + node_message_hash = vrf_generated_response.message_hash + url = ( + f"{node}/vm/{settings.FUNCTION}" + f"/{VRF_FUNCTION_PUBLISH_PATH}/{node_message_hash}" + ) + publish_tasks.append(asyncio.create_task(post_node_vrf(url, VRFRandomBytes))) vrf_publish_responses = await asyncio.gather(*publish_tasks, return_exceptions=True) - return dict(zip(selected_nodes, vrf_publish_responses)) + return dict(zip(nodes, vrf_publish_responses)) def generate_final_vrf( + nb_executors: int, nonce: int, - vrf_generated_result: Dict[Node, VRFResponseHash], - vrf_publish_result: Dict[Node, VRFRandomBytes], + 
vrf_generated_result: Dict[str, VRFResponseHash], +    vrf_publish_result: Dict[str, VRFRandomBytes], vrf_request: VRFRequest, ) -> VRFResponse: nodes_responses = [] random_numbers_list = [] - for node, vrf_publish_response in vrf_publish_result: + for node, vrf_publish_response in vrf_publish_result.items(): if isinstance(vrf_publish_response, Exception): - raise ValueError(f"Publish response not found for {node.hash}") + raise ValueError(f"Publish response not found for {node}") if ( vrf_generated_result[node].random_bytes_hash != vrf_publish_response.random_bytes_hash ): + generated_hash = vrf_generated_result[node].random_bytes_hash + publish_hash = vrf_publish_response.random_bytes_hash raise ValueError( - f"Publish response hash ({vrf_publish_response.random_bytes_hash})" - f"different from generated one ({vrf_generated_result[node].random_bytes_hash})" + f"Publish response hash ({publish_hash}) " + f"different from generated one ({generated_hash})" ) verified = verify( - vrf_publish_response.random_bytes, + binary_to_bytes(vrf_publish_response.random_bytes), nonce, vrf_publish_response.random_bytes_hash, ) if not verified: - raise ValueError(f"Failed hash verification for {vrf_publish_response.url}") + execution = vrf_publish_response.execution_id + raise ValueError(f"Failed hash verification for {execution}") - random_numbers_list.append(int_to_bytes(vrf_publish_response.random_number)) + random_numbers_list.append( + int_to_bytes(int(vrf_publish_response.random_number)) + ) node_response = CRNVRFResponse( - url=vrf_publish_response.url, - node_hash=node.hash, + url=node, execution_id=vrf_publish_response.execution_id, - random_number=vrf_publish_response.random_number, + random_number=str(vrf_publish_response.random_number), random_bytes=vrf_publish_response.random_bytes, random_bytes_hash=vrf_generated_result[node].random_bytes_hash, + generation_message_hash=vrf_generated_result[node].message_hash, + publish_message_hash=vrf_publish_response.message_hash, ) 
nodes_responses.append(node_response) @@ -196,11 +247,12 @@ def generate_final_vrf( return VRFResponse( nb_bytes=settings.NB_BYTES, + nb_executors=nb_executors, nonce=nonce, vrf_function=settings.FUNCTION, request_id=vrf_request.request_id, nodes=nodes_responses, - random_number=final_random_number, + random_number=str(final_random_number), ) @@ -209,8 +261,10 @@ async def publish_data( ) -> ItemHash: channel = f"vrf_{data.request_id}" + logger.debug(f"Publishing message to {settings.API_HOST}") + async with AuthenticatedAlephClient( - account=account, api_server=settings.API_HOST + account=account, api_server=settings.API_HOST, allow_unix_sockets=False ) as client: message, status = await client.create_post( post_type="vrf_library_post", diff --git a/src/aleph_vrf/executor/main.py b/src/aleph_vrf/executor/main.py index 8dfd9e1..a6d82a4 100644 --- a/src/aleph_vrf/executor/main.py +++ b/src/aleph_vrf/executor/main.py @@ -1,8 +1,6 @@ import logging from typing import Dict, Union -from aleph_message.models import ItemHash - from aleph_vrf.settings import settings logger = logging.getLogger(__name__) @@ -12,6 +10,7 @@ from aleph.sdk.chains.ethereum import ETHAccount from aleph.sdk.client import AlephClient, AuthenticatedAlephClient from aleph.sdk.vm.app import AlephApp +from aleph_message.models import ItemHash from aleph_message.status import MessageStatus logger.debug("import fastapi") @@ -101,7 +100,7 @@ async def receive_publish(hash_message: str) -> APIResponse: f"Random bytes not existing for execution {response_hash.execution_id}" ) - random_bytes: bytes = SAVED_GENERATED_BYTES[str(response_hash.execution_id)] + random_bytes: bytes = SAVED_GENERATED_BYTES.pop(str(response_hash.execution_id)) response_bytes = VRFRandomBytes( request_id=response_hash.request_id, @@ -109,7 +108,7 @@ async def receive_publish(hash_message: str) -> APIResponse: vrf_request=response_hash.vrf_request, random_bytes=bytes_to_binary(random_bytes), 
random_bytes_hash=response_hash.random_bytes_hash, - random_number=bytes_to_int(random_bytes), + random_number=str(bytes_to_int(random_bytes)), ) ref = f"vrf_{response_hash.request_id}_{response_hash.execution_id}" @@ -127,7 +126,7 @@ async def publish_data( channel = f"vrf_{data.request_id}" async with AuthenticatedAlephClient( - account=account, api_server=settings.API_HOST + account=account, api_server=settings.API_HOST, allow_unix_sockets=False ) as client: message, status = await client.create_post( post_type="vrf_generation_post", diff --git a/src/aleph_vrf/models.py b/src/aleph_vrf/models.py index 7434de5..126b500 100644 --- a/src/aleph_vrf/models.py +++ b/src/aleph_vrf/models.py @@ -1,5 +1,5 @@ -from typing import Any, List, Optional -from uuid import UUID, uuid4 +from typing import Any, Dict, List, Optional +from uuid import uuid4 from aleph_message.models import ItemHash, PostMessage from pydantic import BaseModel @@ -13,6 +13,7 @@ class Node(BaseModel): class VRFRequest(BaseModel): nb_bytes: int + nb_executors: int nonce: int vrf_function: ItemHash request_id: str @@ -29,7 +30,6 @@ class VRFGenerationRequest(BaseModel): def generate_request_from_message(message: PostMessage) -> VRFGenerationRequest: content = message.content.content - print(content) return VRFGenerationRequest( nb_bytes=content["nb_bytes"], nonce=content["nonce"], @@ -54,11 +54,11 @@ def generate_response_hash_from_message(message: PostMessage) -> VRFResponseHash return VRFResponseHash( nb_bytes=content["nb_bytes"], nonce=content["nonce"], - url=content["url"], request_id=content["request_id"], execution_id=content["execution_id"], vrf_request=ItemHash(content["vrf_request"]), random_bytes_hash=content["random_bytes_hash"], + message_hash=content["message_hash"], ) @@ -68,15 +68,14 @@ class VRFRandomBytes(BaseModel): vrf_request: ItemHash random_bytes: str random_bytes_hash: str - random_number: int + random_number: str message_hash: Optional[str] = None class CRNVRFResponse(BaseModel): 
url: str - node_hash: str - execution_id: UUID - random_number: int + execution_id: str + random_number: str random_bytes: str random_bytes_hash: str generation_message_hash: str @@ -85,11 +84,12 @@ class CRNVRFResponse(BaseModel): class VRFResponse(BaseModel): nb_bytes: int + nb_executors: int nonce: int vrf_function: ItemHash request_id: str nodes: List[CRNVRFResponse] - random_number: int + random_number: str message_hash: Optional[str] = None diff --git a/src/aleph_vrf/settings.py b/src/aleph_vrf/settings.py index d58be77..ffa2ffc 100644 --- a/src/aleph_vrf/settings.py +++ b/src/aleph_vrf/settings.py @@ -14,7 +14,7 @@ class Settings(BaseSettings): "corechannel", description="Key for the `corechannel` aggregate." ) FUNCTION: str = Field( - "67705389842a0a1b95eaa408b009741027964edc805997475e95c505d642edd8", + "4992b4127d296b240bbb73058daea9bca09f717fa94767d6f4dc3ef53b4ef5ce", description="VRF function to use.", ) NB_EXECUTORS: int = Field(32, description="Number of executors to use.") diff --git a/src/aleph_vrf/utils.py b/src/aleph_vrf/utils.py index f4a827d..e6fc628 100644 --- a/src/aleph_vrf/utils.py +++ b/src/aleph_vrf/utils.py @@ -28,6 +28,11 @@ def bytes_to_binary(x: bytes) -> str: return "".join(format(b, "08b") for b in x) +def binary_to_bytes(s: str): + """Converts binary string to bytes.""" + return int(s, 2).to_bytes((len(s) + 7) // 8, byteorder="big") + + def generate_nonce() -> int: """Generates pseudo-random nonce number.""" return randint(0, 100000000) @@ -40,6 +45,6 @@ def generate(n: int, nonce: int) -> (bytes, bytes): return random_bytes, random_hash -def verify(random_bytes: bytes, nonce: int, random_hash: bytes) -> bool: +def verify(random_bytes: bytes, nonce: int, random_hash: str) -> bool: """Verifies that the random bytes were generated by the given nonce.""" return random_hash == sha3_256(random_bytes + int_to_bytes(nonce)).hexdigest() diff --git a/tests/coordinator/test_vrf.py b/tests/coordinator/test_vrf.py index 9b37343..6ce8356 100644 
--- a/tests/coordinator/test_vrf.py +++ b/tests/coordinator/test_vrf.py @@ -41,7 +41,7 @@ def fixture_nodes_aggregate() -> Dict[str, Any]: "time": 1643047441.046, "type": "compute", "owner": "0x7057C12A7E270B9Db0E4c0d87c23Ba75fC5D82B1", - "score": 0.0, + "score": 0, "banner": "", "locked": "", "parent": None, @@ -123,6 +123,12 @@ async def test_select_random_nodes(fixture_nodes_aggregate: Dict[str, Any], mock nodes = await select_random_nodes(3) assert len(nodes) == 3 - resource_nodes = fixture_nodes_aggregate["data"]["corechannel"]["resource_nodes"] - nodes = await select_random_nodes(len(resource_nodes) + 1) - assert len(nodes) == len(resource_nodes) + with pytest.raises(ValueError) as exception: + resource_nodes = fixture_nodes_aggregate["data"]["corechannel"][ + "resource_nodes" + ] + await select_random_nodes(len(resource_nodes)) + assert ( + str(exception.value) + == f"Not enough CRNs linked, only 3 available from 4 requested" + )