Skip to content

Commit

Permalink
Merge pull request #5 from aleph-im/andres-feature_filter_unauthorize…
Browse files Browse the repository at this point in the history
…d_nodes

Allow the coordinator library to filter unauthorized nodes.
  • Loading branch information
nesitor committed Sep 18, 2023
2 parents 88ee0d1 + ce9091d commit 48cf72d
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 13 deletions.
3 changes: 1 addition & 2 deletions src/aleph_vrf/coordinator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ async def index():
async def receive_vrf() -> APIResponse:
private_key = get_fallback_private_key()
account = ETHAccount(private_key=private_key)
response = {"data": ""}

try:
response = await generate_vrf(account)
except Exception as err:
response["data"] = {"error": str(err)}
response = {"error": str(err)}

return APIResponse(data=response)
39 changes: 31 additions & 8 deletions src/aleph_vrf/coordinator/vrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import random
from hashlib import sha3_256
from pathlib import Path
from typing import Any, Dict, List, Type, TypeVar, Union
from uuid import uuid4

Expand Down Expand Up @@ -66,7 +67,20 @@ async def _get_corechannel_aggregate() -> Dict[str, Any]:
return await response.json()


async def select_random_nodes(node_amount: int) -> List[Node]:
def _get_unauthorized_node_list() -> List[str]:
unauthorized_nodes_list_path = Path(__file__).with_name(
"unauthorized_node_list.json"
)
if unauthorized_nodes_list_path.is_file():
with open(unauthorized_nodes_list_path, "rb") as fd:
return json.load(fd)

return []


async def select_random_nodes(
node_amount: int, unauthorized_nodes: List[str]
) -> List[Node]:
node_list: List[Node] = []

content = await _get_corechannel_aggregate()
Expand All @@ -80,8 +94,12 @@ async def select_random_nodes(node_amount: int) -> List[Node]:
resource_nodes = content["data"]["corechannel"]["resource_nodes"]

for resource_node in resource_nodes:
# Filter nodes by address and with linked status
if resource_node["status"] == "linked" and resource_node["score"] > 0.9:
# Filter nodes by score, with linked status and remove unauthorized nodes
if (
resource_node["status"] == "linked"
and resource_node["score"] > 0.9
and resource_node["address"].strip("/") not in unauthorized_nodes
):
node_address = resource_node["address"].strip("/")
node = Node(
hash=resource_node["hash"],
Expand All @@ -101,7 +119,8 @@ async def select_random_nodes(node_amount: int) -> List[Node]:

async def generate_vrf(account: ETHAccount) -> VRFResponse:
nb_executors = settings.NB_EXECUTORS
selected_nodes = await select_random_nodes(nb_executors)
unauthorized_nodes = _get_unauthorized_node_list()
selected_nodes = await select_random_nodes(nb_executors, unauthorized_nodes)
selected_node_list = json.dumps(selected_nodes, default=pydantic_encoder).encode(
encoding="utf-8"
)
Expand Down Expand Up @@ -130,13 +149,14 @@ async def generate_vrf(account: ETHAccount) -> VRFResponse:
logger.debug(
f"Received VRF generated requests from {len(vrf_generated_result)} nodes"
)
logger.debug(vrf_generated_result)
vrf_publish_result = await send_publish_requests(vrf_generated_result)

vrf_publish_result = await send_publish_requests(
vrf_generated_result, vrf_request.request_id
)

logger.debug(
f"Received VRF publish requests from {len(vrf_generated_result)} nodes"
)
logger.debug(vrf_publish_result)

vrf_response = generate_final_vrf(
nb_executors,
Expand Down Expand Up @@ -175,13 +195,16 @@ async def send_generate_requests(

async def send_publish_requests(
vrf_generated_result: Dict[str, VRFResponseHash],
request_id: str,
) -> Dict[str, Union[Exception, VRFRandomBytes]]:
publish_tasks = []
nodes: List[str] = []
for node, vrf_generated_response in vrf_generated_result.items():
nodes.append(node)
if isinstance(vrf_generated_response, Exception):
raise ValueError(f"Generate response not found for Node {node}")
raise ValueError(
f"Generate response not found for Node {node} on request_id {request_id}"
)

node_message_hash = vrf_generated_response.message_hash
url = (
Expand Down
53 changes: 50 additions & 3 deletions tests/coordinator/test_vrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,29 @@ def fixture_nodes_aggregate() -> Dict[str, Any]:
"decentralization": 0.9235073688446255,
"registration_url": "",
},
{
"hash": "a653f4f3b2166f20a6bf9b2be9bf14985eeab7525bc66a1fc968bb53761b00d1",
"name": "ImAleph_1",
"time": 1643048120.789,
"type": "compute",
"owner": "0xB25C7ED25b854a036FE0D96a92059dE9C8391253",
"score": 0.9421971134284096,
"banner": "",
"locked": False,
"parent": "9cbecc86d502a99e710e485266e37b9edab625245c406bfe93d9505a2550bcf8",
"reward": "0xB25C7ED25b854a036FE0D96a92059dE9C8391253",
"status": "linked",
"address": "https://aleph2.serverrg.eu",
"manager": "",
"picture": "683b2e0a75dae42b5789da4d33bf959c1b04abe9ebeb3fe880bd839938fe5ac5",
"authorized": "",
"description": "",
"performance": 0.8877354005528273,
"multiaddress": "",
"score_updated": True,
"decentralization": 0.9235073688446255,
"registration_url": "",
},
],
}
},
Expand All @@ -120,15 +143,39 @@ async def test_select_random_nodes(fixture_nodes_aggregate: Dict[str, Any], mock
# Sanity check, avoid network accesses
assert network_fixture.called_once

nodes = await select_random_nodes(3)
nodes = await select_random_nodes(3, [])
assert len(nodes) == 3

with pytest.raises(ValueError) as exception:
resource_nodes = fixture_nodes_aggregate["data"]["corechannel"][
"resource_nodes"
]
await select_random_nodes(len(resource_nodes))
await select_random_nodes(len(resource_nodes), [])
assert (
str(exception.value)
== f"Not enough CRNs linked, only 3 available from 4 requested"
== f"Not enough CRNs linked, only 4 available from 5 requested"
)

@pytest.mark.asyncio
async def test_select_random_nodes_with_unauthorized(fixture_nodes_aggregate: Dict[str, Any], mocker):
network_fixture = mocker.patch(
"aleph_vrf.coordinator.vrf._get_corechannel_aggregate",
return_value=fixture_nodes_aggregate,
)

# Sanity check, avoid network accesses
assert network_fixture.called_once

nodes = await select_random_nodes(3, ["https://aleph2.serverrg.eu"])
assert len(nodes) == 3

with pytest.raises(ValueError) as exception:
resource_nodes = fixture_nodes_aggregate["data"]["corechannel"][
"resource_nodes"
]
await select_random_nodes(len(resource_nodes) - 1, ["https://aleph2.serverrg.eu"])
assert (
str(exception.value)
== f"Not enough CRNs linked, only 3 available from 4 requested"
)

0 comments on commit 48cf72d

Please sign in to comment.