Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make VRF work on Aleph VMs #4

Merged
merged 18 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM debian:bullseye

RUN apt-get update && apt-get -y upgrade && apt-get install -y \
libsecp256k1-dev \
python3-pip \
python3-venv \
squashfs-tools \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /usr/src/aleph_vrf
COPY . .

RUN mkdir /opt/packages
RUN pip install -t /opt/packages -r requirements.txt
6 changes: 6 additions & 0 deletions install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/bin/bash

python3 -m virtualenv venv
source venv/bin/activate

pip install -e .[testing]
7 changes: 7 additions & 0 deletions prepare_venv_volume.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

set -euo pipefail

docker build -t aleph-vrf .
docker run --rm -ti -v "$(pwd)":/usr/src/aleph_vrf aleph-vrf \
mksquashfs /opt/packages aleph-vrf-venv.squashfs
5 changes: 5 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
aleph-sdk-python==0.7.0
fastapi
python-multipart
utilitybelt
aiohttp
nesitor marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions run-in-docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
docker build -t aleph-vrf .
docker run --rm -ti -v "$(pwd)":/usr/src/aleph_vrf aleph-vrf bash
129 changes: 86 additions & 43 deletions src/aleph_vrf/coordinator/vrf.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
import json
import logging
import random
from hashlib import sha3_256
from typing import Any, Dict, List, Union
from uuid import UUID, uuid4
from uuid import uuid4

import aiohttp
from aleph.sdk.chains.ethereum import ETHAccount
Expand All @@ -21,14 +22,24 @@
VRFResponseHash,
)
from aleph_vrf.settings import settings
from aleph_vrf.utils import bytes_to_int, generate_nonce, int_to_bytes, verify, xor_all
from aleph_vrf.utils import (
binary_to_bytes,
bytes_to_int,
generate_nonce,
int_to_bytes,
verify,
xor_all,
)

VRF_FUNCTION_GENERATE_PATH = "generate"
VRF_FUNCTION_PUBLISH_PATH = "publish"

VRF_FUNCTION_GENERATE_PATH = "/generate"
VRF_FUNCTION_PUBLISH_PATH = "/publish"

logger = logging.getLogger(__name__)


async def post_node_vrf(session, url):
nesitor marked this conversation as resolved.
Show resolved Hide resolved
async with session.post(url) as resp:
async with session.post(url, timeout=60) as resp:
if resp.status != 200:
raise ValueError(f"VRF node request failed on {url}")

Expand Down Expand Up @@ -63,12 +74,18 @@ async def select_random_nodes(node_amount: int) -> List[Node]:
resource_nodes = content["data"]["corechannel"]["resource_nodes"]

for resource_node in resource_nodes:
node = Node(
hash=resource_node["hash"],
address=resource_node["address"],
score=resource_node["score"],
)
node_list.append(node)
# Filter nodes by address and with linked status
if resource_node["status"] == "linked" and resource_node["address"] != "":
nesitor marked this conversation as resolved.
Show resolved Hide resolved
node_address = resource_node["address"].strip("/")
node = Node(
hash=resource_node["hash"],
address=node_address,
score=resource_node["score"],
nesitor marked this conversation as resolved.
Show resolved Hide resolved
)
node_list.append(node)

if len(node_list) < node_amount:
raise ValueError(f"Not enough CRNs linked, only {len(node_list)} available from {node_amount} requested")

# Randomize node order
return random.sample(node_list, min(node_amount, len(node_list)))
Expand All @@ -94,14 +111,23 @@ async def generate_vrf(account: ETHAccount) -> VRFResponse:

request_item_hash = await publish_data(vrf_request, ref, account)

logger.debug(f"Generated VRF request with item_hash {request_item_hash}")

async with aiohttp.ClientSession() as session:
vrf_generated_result = await send_generate_requests(
session, selected_nodes, request_item_hash
)

vrf_publish_result = await send_publish_requests(
session, selected_nodes, vrf_generated_result
logger.debug(
f"Received VRF generated requests from {len(vrf_generated_result)} nodes"
)
logger.debug(vrf_generated_result)
vrf_publish_result = await send_publish_requests(session, vrf_generated_result)

logger.debug(
f"Received VRF publish requests from {len(vrf_generated_result)} nodes"
)
logger.debug(vrf_publish_result)

vrf_response = generate_final_vrf(
nonce,
Expand All @@ -112,6 +138,8 @@ async def generate_vrf(account: ETHAccount) -> VRFResponse:

ref = f"vrf_{vrf_response.request_id}"

logger.debug(f"Publishing final VRF summary")

response_item_hash = await publish_data(vrf_response, ref, account)

vrf_response.message_hash = response_item_hash
Expand All @@ -121,73 +149,86 @@ async def generate_vrf(account: ETHAccount) -> VRFResponse:

async def send_generate_requests(
session: aiohttp.ClientSession, selected_nodes: List[Node], request_item_hash: str
) -> Dict[Node, VRFResponseHash]:
) -> Dict[str, Dict]:
nesitor marked this conversation as resolved.
Show resolved Hide resolved
generate_tasks = []
nodes: List[str] = []
for node in selected_nodes:
url = f"{node.address}/{VRF_FUNCTION_GENERATE_PATH}/{request_item_hash}"
nodes.append(node.address)
url = f"{node.address}/vm/{settings.FUNCTION}/{VRF_FUNCTION_GENERATE_PATH}/{request_item_hash}"
generate_tasks.append(asyncio.create_task(post_node_vrf(session, url)))

vrf_generated_responses = await asyncio.gather(
*generate_tasks, return_exceptions=True
)
return dict(zip(selected_nodes, vrf_generated_responses))
return dict(zip(nodes, vrf_generated_responses))


async def send_publish_requests(
session: aiohttp.ClientSession,
selected_nodes: List[Node],
vrf_generated_result: Dict[Node, VRFResponseHash],
) -> Dict[Node, VRFRandomBytes]:
vrf_generated_result: Dict[str, Dict],
) -> Dict[str, Dict]:
publish_tasks = []
for node, vrf_generated_response in vrf_generated_result:
nodes: List[str] = []
for node, vrf_generated_response in vrf_generated_result.items():
nodes.append(node)
if isinstance(vrf_generated_response, Exception):
raise ValueError(f"Generate response not found for Node {node.address}")
raise ValueError(f"Generate response not found for Node {node}")
else:
nesitor marked this conversation as resolved.
Show resolved Hide resolved
url = f"{node.address}/{VRF_FUNCTION_PUBLISH_PATH}/{vrf_generated_response.message_hash}"
node_message_hash = vrf_generated_response["message_hash"]
url = (
f"{node}/vm/{settings.FUNCTION}"
f"/{VRF_FUNCTION_PUBLISH_PATH}/{node_message_hash}"
)
publish_tasks.append(asyncio.create_task(post_node_vrf(session, url)))

vrf_publish_responses = await asyncio.gather(*publish_tasks, return_exceptions=True)
return dict(zip(selected_nodes, vrf_publish_responses))
return dict(zip(nodes, vrf_publish_responses))


def generate_final_vrf(
nonce: int,
vrf_generated_result: Dict[Node, VRFResponseHash],
vrf_publish_result: Dict[Node, VRFRandomBytes],
vrf_generated_result: Dict[str, Dict],
vrf_publish_result: Dict[str, Dict],
vrf_request: VRFRequest,
) -> VRFResponse:
nodes_responses = []
random_numbers_list = []
for node, vrf_publish_response in vrf_publish_result:
for node, vrf_publish_response in vrf_publish_result.items():
if isinstance(vrf_publish_response, Exception):
raise ValueError(f"Publish response not found for {node.hash}")
raise ValueError(f"Publish response not found for {node}")

if (
vrf_generated_result[node].random_bytes_hash
!= vrf_publish_response.random_bytes_hash
vrf_generated_result[node]["random_bytes_hash"]
!= vrf_publish_response["random_bytes_hash"]
):
generated_hash = vrf_publish_response["random_bytes_hash"]
publish_hash = vrf_publish_response["random_bytes_hash"]
raise ValueError(
f"Publish response hash ({vrf_publish_response.random_bytes_hash})"
f"different from generated one ({vrf_generated_result[node].random_bytes_hash})"
f"Publish response hash ({publish_hash})"
f"different from generated one ({generated_hash})"
)

verified = verify(
vrf_publish_response.random_bytes,
binary_to_bytes(vrf_publish_response["random_bytes"]),
nonce,
vrf_publish_response.random_bytes_hash,
vrf_publish_response["random_bytes_hash"],
)
if not verified:
raise ValueError(f"Failed hash verification for {vrf_publish_response.url}")
execution = vrf_publish_response["execution_id"]
raise ValueError(f"Failed hash verification for {execution}")

random_numbers_list.append(int_to_bytes(vrf_publish_response.random_number))
random_numbers_list.append(
int_to_bytes(int(vrf_publish_response["random_number"]))
)

node_response = CRNVRFResponse(
url=vrf_publish_response.url,
node_hash=node.hash,
execution_id=vrf_publish_response.execution_id,
random_number=vrf_publish_response.random_number,
random_bytes=vrf_publish_response.random_bytes,
random_bytes_hash=vrf_generated_result[node].random_bytes_hash,
url=node,
execution_id=vrf_publish_response["execution_id"],
random_number=str(vrf_publish_response["random_number"]),
random_bytes=vrf_publish_response["random_bytes"],
random_bytes_hash=vrf_generated_result[node]["random_bytes_hash"],
generation_message_hash=vrf_generated_result[node]["message_hash"],
publish_message_hash=vrf_publish_response["message_hash"],
)
nodes_responses.append(node_response)

Expand All @@ -200,7 +241,7 @@ def generate_final_vrf(
vrf_function=settings.FUNCTION,
request_id=vrf_request.request_id,
nodes=nodes_responses,
random_number=final_random_number,
random_number=str(final_random_number),
)


Expand All @@ -209,8 +250,10 @@ async def publish_data(
) -> ItemHash:
channel = f"vrf_{data.request_id}"

logger.debug(f"Publishing message to {settings.API_HOST}")

async with AuthenticatedAlephClient(
account=account, api_server=settings.API_HOST
account=account, api_server=settings.API_HOST, allow_unix_sockets=False
) as client:
message, status = await client.create_post(
post_type="vrf_library_post",
Expand Down
7 changes: 3 additions & 4 deletions src/aleph_vrf/executor/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import logging
from typing import Dict, Union

from aleph_message.models import ItemHash

from aleph_vrf.settings import settings

logger = logging.getLogger(__name__)
Expand All @@ -12,6 +10,7 @@
from aleph.sdk.chains.ethereum import ETHAccount
from aleph.sdk.client import AlephClient, AuthenticatedAlephClient
from aleph.sdk.vm.app import AlephApp
from aleph_message.models import ItemHash
from aleph_message.status import MessageStatus

logger.debug("import fastapi")
Expand Down Expand Up @@ -109,7 +108,7 @@ async def receive_publish(hash_message: str) -> APIResponse:
vrf_request=response_hash.vrf_request,
random_bytes=bytes_to_binary(random_bytes),
random_bytes_hash=response_hash.random_bytes_hash,
random_number=bytes_to_int(random_bytes),
random_number=str(bytes_to_int(random_bytes)),
)

ref = f"vrf_{response_hash.request_id}_{response_hash.execution_id}"
Expand All @@ -127,7 +126,7 @@ async def publish_data(
channel = f"vrf_{data.request_id}"

async with AuthenticatedAlephClient(
account=account, api_server=settings.API_HOST
account=account, api_server=settings.API_HOST, allow_unix_sockets=False
) as client:
message, status = await client.create_post(
post_type="vrf_generation_post",
Expand Down
16 changes: 7 additions & 9 deletions src/aleph_vrf/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Any, List, Optional
from uuid import UUID, uuid4
from typing import Any, Dict, List, Optional
from uuid import uuid4

from aleph_message.models import ItemHash, PostMessage
from pydantic import BaseModel
Expand Down Expand Up @@ -29,7 +29,6 @@ class VRFGenerationRequest(BaseModel):

def generate_request_from_message(message: PostMessage) -> VRFGenerationRequest:
content = message.content.content
print(content)
return VRFGenerationRequest(
nb_bytes=content["nb_bytes"],
nonce=content["nonce"],
Expand All @@ -54,11 +53,11 @@ def generate_response_hash_from_message(message: PostMessage) -> VRFResponseHash
return VRFResponseHash(
nb_bytes=content["nb_bytes"],
nonce=content["nonce"],
url=content["url"],
request_id=content["request_id"],
execution_id=content["execution_id"],
vrf_request=ItemHash(content["vrf_request"]),
random_bytes_hash=content["random_bytes_hash"],
message_hash=content["message_hash"],
)


Expand All @@ -68,15 +67,14 @@ class VRFRandomBytes(BaseModel):
vrf_request: ItemHash
random_bytes: str
random_bytes_hash: str
random_number: int
random_number: str
message_hash: Optional[str] = None


class CRNVRFResponse(BaseModel):
url: str
node_hash: str
execution_id: UUID
random_number: int
execution_id: str
random_number: str
random_bytes: str
random_bytes_hash: str
generation_message_hash: str
Expand All @@ -89,7 +87,7 @@ class VRFResponse(BaseModel):
vrf_function: ItemHash
request_id: str
nodes: List[CRNVRFResponse]
random_number: int
random_number: str
message_hash: Optional[str] = None


Expand Down
2 changes: 1 addition & 1 deletion src/aleph_vrf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Settings(BaseSettings):
"corechannel", description="Key for the `corechannel` aggregate."
)
FUNCTION: str = Field(
"67705389842a0a1b95eaa408b009741027964edc805997475e95c505d642edd8",
"4992b4127d296b240bbb73058daea9bca09f717fa94767d6f4dc3ef53b4ef5ce",
description="VRF function to use.",
)
NB_EXECUTORS: int = Field(32, description="Number of executors to use.")
Expand Down
Loading
Loading