Skip to content

Commit

Permalink
Problem: Sometimes the VRF request fails and the execution cannot use…
Browse files Browse the repository at this point in the history
… the same custom request id.

Solution: Allow the same request_id to be reused until its request has finished completely.

Extra: Added some tweaks to the coordinator VM and to the preparation script.
  • Loading branch information
nesitor committed Jul 18, 2024
1 parent 3a8b592 commit 015e816
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 16 deletions.
4 changes: 3 additions & 1 deletion deployment/prepare_vrf_vms.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

async def prepare_executor_nodes(executor_item_hash: ItemHash) -> Tuple[List[Executor], List[Executor]]:
aleph_selection_policy = ExecuteOnAleph(vm_function=executor_item_hash)
executors = await aleph_selection_policy.get_candidate_executors()
executors = await aleph_selection_policy.get_all_executors()
print(f"Preparing VRF VMs for {len(executors)} nodes")
prepare_tasks = [asyncio.create_task(prepare_executor_api_request(executor.api_url))
for executor in executors]

vrf_prepare_responses = await asyncio.gather(
*prepare_tasks, return_exceptions=True
)

prepare_results = dict(zip(executors, vrf_prepare_responses))

failed_nodes = []
Expand Down
9 changes: 9 additions & 0 deletions src/aleph_vrf/coordinator/executor_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ async def get_candidate_executors(self) -> List[VRFExecutor]:

return executors

async def get_all_executors(self) -> List[VRFExecutor]:
    """
    Build one executor per compute node.

    Wraps every node yielded by `_list_compute_nodes()` in an `AlephExecutor`
    bound to this policy's `vm_function`.

    :return: The executors, in the order the nodes were yielded.
    """
    all_executors: List[VRFExecutor] = []
    async for compute_node in self._list_compute_nodes():
        all_executors.append(
            AlephExecutor(node=compute_node, vm_function=self.vm_function)
        )

    return all_executors


class UsePredeterminedExecutors(ExecutorSelectionPolicy):
"""
Expand Down
18 changes: 11 additions & 7 deletions src/aleph_vrf/coordinator/vrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,20 @@ async def post_executor_api_request(url: str, model: Type[M]) -> M:


async def prepare_executor_api_request(url: str) -> bool:
    """
    Send a GET request to an executor and check that it answers as the VRF API.

    :param url: URL of the executor API to call.
    :return: True if the "name" field of the JSON response is "vrf_generate_api".
    :raises ExecutorHttpError: If the executor answers with an HTTP error or
        does not answer within the 30 second timeout.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=30) as resp:
                try:
                    resp.raise_for_status()
                    response = await resp.json()
                except aiohttp.ClientResponseError as error:
                    # Read the body here, while the response is still open:
                    # once the `async with` block exits, `resp` is released
                    # and its body can no longer be read.
                    raise ExecutorHttpError(
                        url=url,
                        status_code=resp.status,
                        response_text=await resp.text(),
                    ) from error

                return response["name"] == "vrf_generate_api"
    except asyncio.TimeoutError as error:
        # A timeout can fire before any response exists, so `resp` must not be
        # used here (it may be unbound). There is no real HTTP status in that
        # case; 504 (Gateway Timeout) is reported as a synthetic stand-in.
        raise ExecutorHttpError(
            url=url,
            status_code=504,
            response_text=f"Request to {url} timed out",
        ) from error


async def _generate_vrf(
Expand Down
14 changes: 7 additions & 7 deletions src/aleph_vrf/executor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
GENERATE_MESSAGE_REF_PATH = "hash"

# TODO: Use another method to save the data
ANSWERED_REQUESTS: Set[RequestId] = set()
ANSWERED_REQUESTS: Set[ItemHash] = set()
GENERATED_NUMBERS: Dict[ExecutionId, bytes] = {}

http_app = FastAPI()
Expand Down Expand Up @@ -105,21 +105,21 @@ async def receive_generate(

global GENERATED_NUMBERS, ANSWERED_REQUESTS

message = await _get_message(client=aleph_client, item_hash=vrf_request_hash)
vrf_request = get_vrf_request_from_message(message)
execution_id = ExecutionId(str(uuid4()))

if vrf_request.request_id in ANSWERED_REQUESTS:
if vrf_request_hash in ANSWERED_REQUESTS:
raise fastapi.HTTPException(
status_code=409,
detail=f"A random number has already been generated for request {vrf_request_hash}",
)

message = await _get_message(client=aleph_client, item_hash=vrf_request_hash)
vrf_request = get_vrf_request_from_message(message)
execution_id = ExecutionId(str(uuid4()))

random_number, random_number_hash = generate(
vrf_request.nb_bytes, vrf_request.nonce
)
GENERATED_NUMBERS[execution_id] = random_number
ANSWERED_REQUESTS.add(vrf_request.request_id)
ANSWERED_REQUESTS.add(vrf_request_hash)

vrf_random_number_hash = VRFRandomNumberHash(
nb_bytes=vrf_request.nb_bytes,
Expand Down
2 changes: 1 addition & 1 deletion src/aleph_vrf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Settings(BaseSettings):
default="corechannel", description="Key for the `corechannel` aggregate."
)
FUNCTION: str = Field(
default="f6a734dbc98659f030e1cd9c12d8ffb769deac55d42d5db5285fba099755c779",
default="6ad7c5ace3dfbb68954f1eea6b775d8a38610f8d0fdc48b4f1b85ccfa9795931",
description="VRF function to use.",
)
NB_EXECUTORS: int = Field(default=32, description="Number of executors to use.")
Expand Down

0 comments on commit 015e816

Please sign in to comment.