diff --git a/src/aleph_vrf/coordinator/main.py b/src/aleph_vrf/coordinator/main.py index 9f9b9ba..dda1d55 100644 --- a/src/aleph_vrf/coordinator/main.py +++ b/src/aleph_vrf/coordinator/main.py @@ -36,11 +36,10 @@ async def index(): async def receive_vrf() -> APIResponse: private_key = get_fallback_private_key() account = ETHAccount(private_key=private_key) - response = {"data": ""} try: response = await generate_vrf(account) except Exception as err: - response["data"] = {"error": str(err)} + response = {"error": str(err)} return APIResponse(data=response) diff --git a/src/aleph_vrf/coordinator/vrf.py b/src/aleph_vrf/coordinator/vrf.py index 0913cee..9cd2d7f 100644 --- a/src/aleph_vrf/coordinator/vrf.py +++ b/src/aleph_vrf/coordinator/vrf.py @@ -3,6 +3,7 @@ import logging import random from hashlib import sha3_256 +from pathlib import Path from typing import Any, Dict, List, Type, TypeVar, Union from uuid import uuid4 @@ -66,7 +67,20 @@ async def _get_corechannel_aggregate() -> Dict[str, Any]: return await response.json() -async def select_random_nodes(node_amount: int) -> List[Node]: +def _get_unauthorized_node_list() -> List[str]: + unauthorized_nodes_list_path = Path(__file__).with_name( + "unauthorized_node_list.json" + ) + if unauthorized_nodes_list_path.is_file(): + with open(unauthorized_nodes_list_path, "rb") as fd: + return json.load(fd) + + return [] + + +async def select_random_nodes( + node_amount: int, unauthorized_nodes: List[str] +) -> List[Node]: node_list: List[Node] = [] content = await _get_corechannel_aggregate() @@ -80,8 +94,12 @@ async def select_random_nodes(node_amount: int) -> List[Node]: resource_nodes = content["data"]["corechannel"]["resource_nodes"] for resource_node in resource_nodes: - # Filter nodes by address and with linked status - if resource_node["status"] == "linked" and resource_node["score"] > 0.9: + # Filter nodes by score, with linked status and remove unauthorized nodes + if ( + resource_node["status"] == "linked" + and resource_node["score"] > 0.9 + and resource_node["address"].strip("/") not in unauthorized_nodes + ): node_address = resource_node["address"].strip("/") node = Node( hash=resource_node["hash"], @@ -101,7 +119,8 @@ async def select_random_nodes(node_amount: int) -> List[Node]: async def generate_vrf(account: ETHAccount) -> VRFResponse: nb_executors = settings.NB_EXECUTORS - selected_nodes = await select_random_nodes(nb_executors) + unauthorized_nodes = _get_unauthorized_node_list() + selected_nodes = await select_random_nodes(nb_executors, unauthorized_nodes) selected_node_list = json.dumps(selected_nodes, default=pydantic_encoder).encode( encoding="utf-8" ) @@ -130,13 +149,14 @@ async def generate_vrf(account: ETHAccount) -> VRFResponse: logger.debug( f"Received VRF generated requests from {len(vrf_generated_result)} nodes" ) - logger.debug(vrf_generated_result) - vrf_publish_result = await send_publish_requests(vrf_generated_result) + + vrf_publish_result = await send_publish_requests( + vrf_generated_result, vrf_request.request_id + ) logger.debug( f"Received VRF publish requests from {len(vrf_generated_result)} nodes" ) - logger.debug(vrf_publish_result) vrf_response = generate_final_vrf( nb_executors, @@ -175,13 +195,16 @@ async def send_generate_requests( async def send_publish_requests( vrf_generated_result: Dict[str, VRFResponseHash], + request_id: str, ) -> Dict[str, Union[Exception, VRFRandomBytes]]: publish_tasks = [] nodes: List[str] = [] for node, vrf_generated_response in vrf_generated_result.items(): nodes.append(node) if isinstance(vrf_generated_response, Exception): - raise ValueError(f"Generate response not found for Node {node}") + raise ValueError( + f"Generate response not found for Node {node} on request_id {request_id}" + ) node_message_hash = vrf_generated_response.message_hash url = ( diff --git a/tests/coordinator/test_vrf.py b/tests/coordinator/test_vrf.py index 6ce8356..e7096e6 100644 --- a/tests/coordinator/test_vrf.py +++ b/tests/coordinator/test_vrf.py @@ -104,6 +104,29 @@ def fixture_nodes_aggregate() -> Dict[str, Any]: "decentralization": 0.9235073688446255, "registration_url": "", }, + { + "hash": "a653f4f3b2166f20a6bf9b2be9bf14985eeab7525bc66a1fc968bb53761b00d1", + "name": "ImAleph_1", + "time": 1643048120.789, + "type": "compute", + "owner": "0xB25C7ED25b854a036FE0D96a92059dE9C8391253", + "score": 0.9421971134284096, + "banner": "", + "locked": False, + "parent": "9cbecc86d502a99e710e485266e37b9edab625245c406bfe93d9505a2550bcf8", + "reward": "0xB25C7ED25b854a036FE0D96a92059dE9C8391253", + "status": "linked", + "address": "https://aleph2.serverrg.eu", + "manager": "", + "picture": "683b2e0a75dae42b5789da4d33bf959c1b04abe9ebeb3fe880bd839938fe5ac5", + "authorized": "", + "description": "", + "performance": 0.8877354005528273, + "multiaddress": "", + "score_updated": True, + "decentralization": 0.9235073688446255, + "registration_url": "", + }, ], } }, @@ -120,15 +143,39 @@ async def test_select_random_nodes(fixture_nodes_aggregate: Dict[str, Any], mock # Sanity check, avoid network accesses assert network_fixture.called_once - nodes = await select_random_nodes(3) + nodes = await select_random_nodes(3, []) assert len(nodes) == 3 with pytest.raises(ValueError) as exception: resource_nodes = fixture_nodes_aggregate["data"]["corechannel"][ "resource_nodes" ] - await select_random_nodes(len(resource_nodes)) + await select_random_nodes(len(resource_nodes), []) assert ( str(exception.value) - == f"Not enough CRNs linked, only 3 available from 4 requested" + == f"Not enough CRNs linked, only 4 available from 5 requested" + ) + +@pytest.mark.asyncio +async def test_select_random_nodes_with_unauthorized(fixture_nodes_aggregate: Dict[str, Any], mocker): + network_fixture = mocker.patch( + "aleph_vrf.coordinator.vrf._get_corechannel_aggregate", + return_value=fixture_nodes_aggregate, + ) + + # Sanity check, avoid network accesses + assert network_fixture.called_once + + nodes = await select_random_nodes(3, ["https://aleph2.serverrg.eu"]) + assert len(nodes) == 3 + + with pytest.raises(ValueError) as exception: + resource_nodes = fixture_nodes_aggregate["data"]["corechannel"][ + "resource_nodes" + ] + await select_random_nodes(len(resource_nodes) - 1, ["https://aleph2.serverrg.eu"]) + assert ( + str(exception.value) + == f"Not enough CRNs linked, only 3 available from 4 requested" ) +