diff --git a/deployment/prepare_vrf_vms.py b/deployment/prepare_vrf_vms.py index 88ec933..321723d 100644 --- a/deployment/prepare_vrf_vms.py +++ b/deployment/prepare_vrf_vms.py @@ -14,13 +14,15 @@ async def prepare_executor_nodes(executor_item_hash: ItemHash) -> Tuple[List[Executor], List[Executor]]: aleph_selection_policy = ExecuteOnAleph(vm_function=executor_item_hash) - executors = await aleph_selection_policy.get_candidate_executors() + executors = await aleph_selection_policy.get_all_executors() + print(f"Preparing VRF VMs for {len(executors)} nodes") prepare_tasks = [asyncio.create_task(prepare_executor_api_request(executor.api_url)) for executor in executors] vrf_prepare_responses = await asyncio.gather( *prepare_tasks, return_exceptions=True ) + prepare_results = dict(zip(executors, vrf_prepare_responses)) failed_nodes = [] diff --git a/src/aleph_vrf/coordinator/executor_selection.py b/src/aleph_vrf/coordinator/executor_selection.py index 5a7d20d..e5a9a67 100644 --- a/src/aleph_vrf/coordinator/executor_selection.py +++ b/src/aleph_vrf/coordinator/executor_selection.py @@ -127,6 +127,15 @@ async def get_candidate_executors(self) -> List[VRFExecutor]: return executors + async def get_all_executors(self) -> List[VRFExecutor]: + compute_nodes = self._list_compute_nodes() + executors: List[VRFExecutor] = [ + AlephExecutor(node=node, vm_function=self.vm_function) + async for node in compute_nodes + ] + + return executors + class UsePredeterminedExecutors(ExecutorSelectionPolicy): """ diff --git a/src/aleph_vrf/coordinator/vrf.py b/src/aleph_vrf/coordinator/vrf.py index 4ad09f8..7558da1 100644 --- a/src/aleph_vrf/coordinator/vrf.py +++ b/src/aleph_vrf/coordinator/vrf.py @@ -64,16 +64,20 @@ async def post_executor_api_request(url: str, model: Type[M]) -> M: async def prepare_executor_api_request(url: str) -> bool: - async with aiohttp.ClientSession() as session: - async with session.get(url, timeout=120) as resp: - try: + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, timeout=30) as resp: resp.raise_for_status() response = await resp.json() return response["name"] == "vrf_generate_api" - except aiohttp.ClientResponseError as error: - raise ExecutorHttpError( - url=url, status_code=resp.status, response_text=await resp.text() - ) from error + except aiohttp.ClientResponseError as error: + raise ExecutorHttpError( + url=url, status_code=resp.status, response_text=await resp.text() + ) from error + except asyncio.TimeoutError as error: + raise ExecutorHttpError( + url=url, status_code=resp.status, response_text=await resp.text() + ) from error async def _generate_vrf( diff --git a/src/aleph_vrf/executor/main.py b/src/aleph_vrf/executor/main.py index 72984ef..fba041b 100644 --- a/src/aleph_vrf/executor/main.py +++ b/src/aleph_vrf/executor/main.py @@ -45,7 +45,7 @@ GENERATE_MESSAGE_REF_PATH = "hash" # TODO: Use another method to save the data -ANSWERED_REQUESTS: Set[RequestId] = set() +ANSWERED_REQUESTS: Set[ItemHash] = set() GENERATED_NUMBERS: Dict[ExecutionId, bytes] = {} http_app = FastAPI() @@ -105,21 +105,21 @@ async def receive_generate( global GENERATED_NUMBERS, ANSWERED_REQUESTS - message = await _get_message(client=aleph_client, item_hash=vrf_request_hash) - vrf_request = get_vrf_request_from_message(message) - execution_id = ExecutionId(str(uuid4())) - - if vrf_request.request_id in ANSWERED_REQUESTS: + if vrf_request_hash in ANSWERED_REQUESTS: raise fastapi.HTTPException( status_code=409, detail=f"A random number has already been generated for request {vrf_request_hash}", ) + message = await _get_message(client=aleph_client, item_hash=vrf_request_hash) + vrf_request = get_vrf_request_from_message(message) + execution_id = ExecutionId(str(uuid4())) + random_number, random_number_hash = generate( vrf_request.nb_bytes, vrf_request.nonce ) GENERATED_NUMBERS[execution_id] = random_number - ANSWERED_REQUESTS.add(vrf_request.request_id) + ANSWERED_REQUESTS.add(vrf_request_hash) vrf_random_number_hash = VRFRandomNumberHash( nb_bytes=vrf_request.nb_bytes, diff --git a/src/aleph_vrf/settings.py b/src/aleph_vrf/settings.py index 275e049..e175b88 100644 --- a/src/aleph_vrf/settings.py +++ b/src/aleph_vrf/settings.py @@ -19,7 +19,7 @@ class Settings(BaseSettings): default="corechannel", description="Key for the `corechannel` aggregate." ) FUNCTION: str = Field( - default="f6a734dbc98659f030e1cd9c12d8ffb769deac55d42d5db5285fba099755c779", + default="6ad7c5ace3dfbb68954f1eea6b775d8a38610f8d0fdc48b4f1b85ccfa9795931", description="VRF function to use.", ) NB_EXECUTORS: int = Field(default=32, description="Number of executors to use.")