Make VRF work on Aleph VMs #4

Merged. 18 commits, merged on Sep 15, 2023.
14 changes: 14 additions & 0 deletions Dockerfile
@@ -0,0 +1,14 @@
FROM debian:bullseye

RUN apt-get update && apt-get -y upgrade && apt-get install -y \
libsecp256k1-dev \
python3-pip \
python3-venv \
squashfs-tools \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /usr/src/aleph_vrf
COPY . .

RUN mkdir /opt/packages
RUN pip install -t /opt/packages .
6 changes: 6 additions & 0 deletions install.sh
@@ -0,0 +1,6 @@
#!/bin/bash

python3 -m virtualenv venv
source venv/bin/activate

pip install -e .[testing]
7 changes: 7 additions & 0 deletions prepare_venv_volume.sh
@@ -0,0 +1,7 @@
#!/bin/bash

set -euo pipefail

docker build -t aleph-vrf .
docker run --rm -ti -v "$(pwd)":/usr/src/aleph_vrf aleph-vrf \
mksquashfs /opt/packages aleph-vrf-venv.squashfs
2 changes: 2 additions & 0 deletions run-in-docker.sh
@@ -0,0 +1,2 @@
docker build -t aleph-vrf .
docker run --rm -ti -v "$(pwd)":/usr/src/aleph_vrf aleph-vrf bash
8 changes: 6 additions & 2 deletions src/aleph_vrf/coordinator/main.py
@@ -36,7 +36,11 @@ async def index():
async def receive_vrf() -> APIResponse:
private_key = get_fallback_private_key()
account = ETHAccount(private_key=private_key)
response = {"data": ""}

vrf_response = await generate_vrf(account)
try:
response = await generate_vrf(account)
except Exception as err:
response["data"] = {"error": str(err)}

return APIResponse(data=vrf_response)
return APIResponse(data=response)
180 changes: 117 additions & 63 deletions src/aleph_vrf/coordinator/vrf.py
@@ -1,15 +1,17 @@
import asyncio
import json
import logging
import random
from hashlib import sha3_256
from typing import Any, Dict, List, Union
from uuid import UUID, uuid4
from typing import Any, Dict, List, Type, TypeVar, Union
from uuid import uuid4

import aiohttp
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.client import AuthenticatedAlephClient
from aleph_message.models import ItemHash
from aleph_message.status import MessageStatus
from pydantic import BaseModel
from pydantic.json import pydantic_encoder

from aleph_vrf.models import (
@@ -21,19 +23,34 @@
VRFResponseHash,
)
from aleph_vrf.settings import settings
from aleph_vrf.utils import bytes_to_int, generate_nonce, int_to_bytes, verify, xor_all
from aleph_vrf.utils import (
binary_to_bytes,
bytes_to_int,
generate_nonce,
int_to_bytes,
verify,
xor_all,
)

VRF_FUNCTION_GENERATE_PATH = "generate"
VRF_FUNCTION_PUBLISH_PATH = "publish"


logger = logging.getLogger(__name__)

VRF_FUNCTION_GENERATE_PATH = "/generate"
VRF_FUNCTION_PUBLISH_PATH = "/publish"

M = TypeVar("M", bound=BaseModel)


async def post_node_vrf(url: str, model: Type[M]) -> Union[Exception, M]:
async with aiohttp.ClientSession() as session:
async with session.post(url, timeout=60) as resp:
if resp.status != 200:
raise ValueError(f"VRF node request failed on {url}")

async def post_node_vrf(session, url):
async with session.post(url) as resp:
if resp.status != 200:
raise ValueError(f"VRF node request failed on {url}")
response = await resp.json()

response = await resp.json()
return response["data"]
return model.parse_obj(response["data"])


async def _get_corechannel_aggregate() -> Dict[str, Any]:
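For readers following the refactor, here is a minimal, hypothetical usage sketch of the new typed post_node_vrf helper defined above. The URL, item hashes, and the commented-out top-level call are illustrative placeholders, not values from this PR.

import asyncio

from aleph_vrf.coordinator.vrf import post_node_vrf
from aleph_vrf.models import VRFResponseHash


async def fetch_one_generate_response() -> VRFResponseHash:
    # The coordinator builds executor URLs from the node address, the VRF
    # function item hash and the request item hash (placeholders below).
    url = "https://crn.example.org/vm/<function-item-hash>/generate/<request-item-hash>"
    # post_node_vrf POSTs to the executor, raises ValueError on a non-200
    # status, and parses response["data"] into the requested pydantic model.
    return await post_node_vrf(url, VRFResponseHash)


# asyncio.run(fetch_one_generate_response())  # would perform a real HTTP call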
@@ -63,19 +80,28 @@ async def select_random_nodes(node_amount: int) -> List[Node]:
resource_nodes = content["data"]["corechannel"]["resource_nodes"]

for resource_node in resource_nodes:
node = Node(
hash=resource_node["hash"],
address=resource_node["address"],
score=resource_node["score"],
# Filter nodes by address and with linked status
if resource_node["status"] == "linked" and resource_node["score"] > 0.9:
node_address = resource_node["address"].strip("/")
node = Node(
hash=resource_node["hash"],
address=node_address,
score=resource_node["score"],
)
node_list.append(node)

if len(node_list) < node_amount:
raise ValueError(
f"Not enough CRNs linked, only {len(node_list)} available from {node_amount} requested"
)
node_list.append(node)

# Randomize node order
return random.sample(node_list, min(node_amount, len(node_list)))
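As an aside, the selection rule introduced above can be summarized in a small standalone sketch. The field names follow the corechannel aggregate exactly as used in this function; the helper name pick_nodes is hypothetical.

import random
from typing import Any, Dict, List


def pick_nodes(resource_nodes: List[Dict[str, Any]], node_amount: int) -> List[Dict[str, Any]]:
    # Keep only CRNs with status "linked" and a score above 0.9,
    # normalizing the address with strip("/").
    candidates = [
        {**node, "address": node["address"].strip("/")}
        for node in resource_nodes
        if node["status"] == "linked" and node["score"] > 0.9
    ]
    if len(candidates) < node_amount:
        raise ValueError(
            f"Not enough CRNs linked, only {len(candidates)} available from {node_amount} requested"
        )
    # Randomize node order so a different executor set is picked per request.
    return random.sample(candidates, node_amount)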


async def generate_vrf(account: ETHAccount) -> VRFResponse:
selected_nodes = await select_random_nodes(settings.NB_EXECUTORS)
nb_executors = settings.NB_EXECUTORS
selected_nodes = await select_random_nodes(nb_executors)
selected_node_list = json.dumps(selected_nodes, default=pydantic_encoder).encode(
encoding="utf-8"
)
@@ -84,6 +110,7 @@ async def generate_vrf(account: ETHAccount) -> VRFResponse:

vrf_request = VRFRequest(
nb_bytes=settings.NB_BYTES,
nb_executors=nb_executors,
nonce=nonce,
vrf_function=ItemHash(settings.FUNCTION),
request_id=str(uuid4()),
@@ -94,100 +121,124 @@ async def generate_vrf(account: ETHAccount) -> VRFResponse:

request_item_hash = await publish_data(vrf_request, ref, account)

async with aiohttp.ClientSession() as session:
vrf_generated_result = await send_generate_requests(
session, selected_nodes, request_item_hash
)
logger.debug(f"Generated VRF request with item_hash {request_item_hash}")

vrf_publish_result = await send_publish_requests(
session, selected_nodes, vrf_generated_result
)
vrf_generated_result = await send_generate_requests(
selected_nodes, request_item_hash
)

vrf_response = generate_final_vrf(
nonce,
vrf_generated_result,
vrf_publish_result,
vrf_request,
)
logger.debug(
f"Received VRF generated requests from {len(vrf_generated_result)} nodes"
)
logger.debug(vrf_generated_result)
vrf_publish_result = await send_publish_requests(vrf_generated_result)

logger.debug(
f"Received VRF publish requests from {len(vrf_generated_result)} nodes"
)
logger.debug(vrf_publish_result)

vrf_response = generate_final_vrf(
nb_executors,
nonce,
vrf_generated_result,
vrf_publish_result,
vrf_request,
)

ref = f"vrf_{vrf_response.request_id}"
ref = f"vrf_{vrf_response.request_id}"

response_item_hash = await publish_data(vrf_response, ref, account)
logger.debug(f"Publishing final VRF summary")

vrf_response.message_hash = response_item_hash
response_item_hash = await publish_data(vrf_response, ref, account)

return vrf_response
vrf_response.message_hash = response_item_hash

return vrf_response


async def send_generate_requests(
session: aiohttp.ClientSession, selected_nodes: List[Node], request_item_hash: str
) -> Dict[Node, VRFResponseHash]:
selected_nodes: List[Node], request_item_hash: str
) -> Dict[str, Union[Exception, VRFResponseHash]]:
generate_tasks = []
nodes: List[str] = []
for node in selected_nodes:
url = f"{node.address}/{VRF_FUNCTION_GENERATE_PATH}/{request_item_hash}"
generate_tasks.append(asyncio.create_task(post_node_vrf(session, url)))
nodes.append(node.address)
url = f"{node.address}/vm/{settings.FUNCTION}/{VRF_FUNCTION_GENERATE_PATH}/{request_item_hash}"
generate_tasks.append(asyncio.create_task(post_node_vrf(url, VRFResponseHash)))

vrf_generated_responses = await asyncio.gather(
*generate_tasks, return_exceptions=True
)
return dict(zip(selected_nodes, vrf_generated_responses))
return dict(zip(nodes, vrf_generated_responses))


async def send_publish_requests(
session: aiohttp.ClientSession,
selected_nodes: List[Node],
vrf_generated_result: Dict[Node, VRFResponseHash],
) -> Dict[Node, VRFRandomBytes]:
vrf_generated_result: Dict[str, VRFResponseHash],
) -> Dict[str, Union[Exception, VRFRandomBytes]]:
publish_tasks = []
for node, vrf_generated_response in vrf_generated_result:
nodes: List[str] = []
for node, vrf_generated_response in vrf_generated_result.items():
nodes.append(node)
if isinstance(vrf_generated_response, Exception):
raise ValueError(f"Generate response not found for Node {node.address}")
else:
url = f"{node.address}/{VRF_FUNCTION_PUBLISH_PATH}/{vrf_generated_response.message_hash}"
publish_tasks.append(asyncio.create_task(post_node_vrf(session, url)))
raise ValueError(f"Generate response not found for Node {node}")

node_message_hash = vrf_generated_response.message_hash
url = (
f"{node}/vm/{settings.FUNCTION}"
f"/{VRF_FUNCTION_PUBLISH_PATH}/{node_message_hash}"
)
publish_tasks.append(asyncio.create_task(post_node_vrf(url, VRFRandomBytes)))

vrf_publish_responses = await asyncio.gather(*publish_tasks, return_exceptions=True)
return dict(zip(selected_nodes, vrf_publish_responses))
return dict(zip(nodes, vrf_publish_responses))


def generate_final_vrf(
nb_executors: int,
nonce: int,
vrf_generated_result: Dict[Node, VRFResponseHash],
vrf_publish_result: Dict[Node, VRFRandomBytes],
vrf_generated_result: Dict[str, VRFResponseHash],
vrf_publish_result: Dict[str, VRFRandomBytes],
vrf_request: VRFRequest,
) -> VRFResponse:
nodes_responses = []
random_numbers_list = []
for node, vrf_publish_response in vrf_publish_result:
for node, vrf_publish_response in vrf_publish_result.items():
if isinstance(vrf_publish_response, Exception):
raise ValueError(f"Publish response not found for {node.hash}")
raise ValueError(f"Publish response not found for {node}")

if (
vrf_generated_result[node].random_bytes_hash
!= vrf_publish_response.random_bytes_hash
):
generated_hash = vrf_publish_response.random_bytes_hash
publish_hash = vrf_publish_response.random_bytes_hash
raise ValueError(
f"Publish response hash ({vrf_publish_response.random_bytes_hash})"
f"different from generated one ({vrf_generated_result[node].random_bytes_hash})"
f"Publish response hash ({publish_hash})"
f"different from generated one ({generated_hash})"
)

verified = verify(
vrf_publish_response.random_bytes,
binary_to_bytes(vrf_publish_response.random_bytes),
nonce,
vrf_publish_response.random_bytes_hash,
)
if not verified:
raise ValueError(f"Failed hash verification for {vrf_publish_response.url}")
execution = vrf_publish_response.execution_id
raise ValueError(f"Failed hash verification for {execution}")

random_numbers_list.append(int_to_bytes(vrf_publish_response.random_number))
random_numbers_list.append(
int_to_bytes(int(vrf_publish_response.random_number))
)

node_response = CRNVRFResponse(
url=vrf_publish_response.url,
node_hash=node.hash,
url=node,
execution_id=vrf_publish_response.execution_id,
random_number=vrf_publish_response.random_number,
random_number=str(vrf_publish_response.random_number),
random_bytes=vrf_publish_response.random_bytes,
random_bytes_hash=vrf_generated_result[node].random_bytes_hash,
generation_message_hash=vrf_generated_result[node].message_hash,
publish_message_hash=vrf_publish_response.message_hash,
)
nodes_responses.append(node_response)
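The verify call above checks the revealed bytes against the hash committed at the /generate step. Its implementation lives in aleph_vrf.utils and is not shown in this diff; the following is only an assumed sketch of such a check (sha3-256 over the random bytes concatenated with the nonce), not the project's confirmed scheme.

from hashlib import sha3_256


def int_to_bytes_sketch(value: int) -> bytes:
    # Assumption: big-endian, minimal width; the real aleph_vrf.utils helper may differ.
    return value.to_bytes(max((value.bit_length() + 7) // 8, 1), "big")


def verify_sketch(random_bytes: bytes, nonce: int, expected_hash: str) -> bool:
    # Recompute the commitment and compare it with the hash published earlier.
    computed = sha3_256(random_bytes + int_to_bytes_sketch(nonce)).hexdigest()
    return computed == expected_hash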

@@ -196,11 +247,12 @@ def generate_final_vrf(

return VRFResponse(
nb_bytes=settings.NB_BYTES,
nb_executors=nb_executors,
nonce=nonce,
vrf_function=settings.FUNCTION,
request_id=vrf_request.request_id,
nodes=nodes_responses,
random_number=final_random_number,
random_number=str(final_random_number),
)
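The final random number returned here is assembled from the per-executor contributions collected in random_numbers_list. The imports and variable names suggest an XOR combination via xor_all; the sketch below shows one plausible reading of that step and is an assumption, not the code elided from this hunk.

from functools import reduce
from typing import List


def xor_all_sketch(items: List[bytes]) -> bytes:
    # Assumption: all contributions have the same length (settings.NB_BYTES).
    def xor_pair(a: bytes, b: bytes) -> bytes:
        return bytes(x ^ y for x, y in zip(a, b))

    return reduce(xor_pair, items)


def combine_random_numbers(random_numbers_list: List[bytes]) -> int:
    # XOR every executor's bytes so no single node controls the outcome,
    # then interpret the result as a big-endian integer.
    return int.from_bytes(xor_all_sketch(random_numbers_list), "big")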


@@ -209,8 +261,10 @@ async def publish_data(
) -> ItemHash:
channel = f"vrf_{data.request_id}"

logger.debug(f"Publishing message to {settings.API_HOST}")

async with AuthenticatedAlephClient(
account=account, api_server=settings.API_HOST
account=account, api_server=settings.API_HOST, allow_unix_sockets=False
) as client:
message, status = await client.create_post(
post_type="vrf_library_post",
9 changes: 4 additions & 5 deletions src/aleph_vrf/executor/main.py
@@ -1,8 +1,6 @@
import logging
from typing import Dict, Union

from aleph_message.models import ItemHash

from aleph_vrf.settings import settings

logger = logging.getLogger(__name__)
@@ -12,6 +10,7 @@
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.client import AlephClient, AuthenticatedAlephClient
from aleph.sdk.vm.app import AlephApp
from aleph_message.models import ItemHash
from aleph_message.status import MessageStatus

logger.debug("import fastapi")
@@ -101,15 +100,15 @@ async def receive_publish(hash_message: str) -> APIResponse:
f"Random bytes not existing for execution {response_hash.execution_id}"
)

random_bytes: bytes = SAVED_GENERATED_BYTES[str(response_hash.execution_id)]
random_bytes: bytes = SAVED_GENERATED_BYTES.pop(str(response_hash.execution_id))

response_bytes = VRFRandomBytes(
request_id=response_hash.request_id,
execution_id=response_hash.execution_id,
vrf_request=response_hash.vrf_request,
random_bytes=bytes_to_binary(random_bytes),
random_bytes_hash=response_hash.random_bytes_hash,
random_number=bytes_to_int(random_bytes),
random_number=str(bytes_to_int(random_bytes)),
)

ref = f"vrf_{response_hash.request_id}_{response_hash.execution_id}"
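The switch to SAVED_GENERATED_BYTES.pop(...) means the stored bytes are removed as soon as they are revealed, so a given execution can only be published once. A minimal sketch of that bookkeeping follows; the helper names are hypothetical, only the dictionary and the pop semantics come from this diff.

from typing import Dict

SAVED_GENERATED_BYTES: Dict[str, bytes] = {}


def store_generated_bytes(execution_id: str, random_bytes: bytes) -> None:
    # Called at /generate time: keep the bytes private until /publish.
    SAVED_GENERATED_BYTES[execution_id] = random_bytes


def reveal_generated_bytes(execution_id: str) -> bytes:
    if execution_id not in SAVED_GENERATED_BYTES:
        raise ValueError(f"Random bytes not existing for execution {execution_id}")
    # pop() deletes the entry, preventing a second publish for the same execution.
    return SAVED_GENERATED_BYTES.pop(execution_id)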
@@ -127,7 +126,7 @@ async def publish_data(
channel = f"vrf_{data.request_id}"

async with AuthenticatedAlephClient(
account=account, api_server=settings.API_HOST
account=account, api_server=settings.API_HOST, allow_unix_sockets=False
) as client:
message, status = await client.create_post(
post_type="vrf_generation_post",