Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Execution VM with custom request ids #34

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion deployment/prepare_vrf_vms.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@

async def prepare_executor_nodes(executor_item_hash: ItemHash) -> Tuple[List[Executor], List[Executor]]:
aleph_selection_policy = ExecuteOnAleph(vm_function=executor_item_hash)
executors = await aleph_selection_policy.get_candidate_executors()
executors = await aleph_selection_policy.get_all_executors()
print(f"Preparing VRF VMs for {len(executors)} nodes")
prepare_tasks = [asyncio.create_task(prepare_executor_api_request(executor.api_url))
for executor in executors]

vrf_prepare_responses = await asyncio.gather(
*prepare_tasks, return_exceptions=True
)

prepare_results = dict(zip(executors, vrf_prepare_responses))

failed_nodes = []
Expand Down
9 changes: 9 additions & 0 deletions src/aleph_vrf/coordinator/executor_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ async def get_candidate_executors(self) -> List[VRFExecutor]:

return executors

async def get_all_executors(self) -> List[VRFExecutor]:
compute_nodes = self._list_compute_nodes()
executors: List[VRFExecutor] = [
AlephExecutor(node=node, vm_function=self.vm_function)
async for node in compute_nodes
]

return executors


class UsePredeterminedExecutors(ExecutorSelectionPolicy):
"""
Expand Down
18 changes: 11 additions & 7 deletions src/aleph_vrf/coordinator/vrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,20 @@ async def post_executor_api_request(url: str, model: Type[M]) -> M:


async def prepare_executor_api_request(url: str) -> bool:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=120) as resp:
try:
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=30) as resp:
resp.raise_for_status()
response = await resp.json()
return response["name"] == "vrf_generate_api"
except aiohttp.ClientResponseError as error:
raise ExecutorHttpError(
url=url, status_code=resp.status, response_text=await resp.text()
) from error
except aiohttp.ClientResponseError as error:
raise ExecutorHttpError(
url=url, status_code=resp.status, response_text=await resp.text()
) from error
except asyncio.TimeoutError as error:
raise ExecutorHttpError(
url=url, status_code=resp.status, response_text=await resp.text()
) from error


async def _generate_vrf(
Expand Down
14 changes: 7 additions & 7 deletions src/aleph_vrf/executor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
GENERATE_MESSAGE_REF_PATH = "hash"

# TODO: Use another method to save the data
ANSWERED_REQUESTS: Set[RequestId] = set()
ANSWERED_REQUESTS: Set[ItemHash] = set()
GENERATED_NUMBERS: Dict[ExecutionId, bytes] = {}

http_app = FastAPI()
Expand Down Expand Up @@ -105,21 +105,21 @@ async def receive_generate(

global GENERATED_NUMBERS, ANSWERED_REQUESTS

message = await _get_message(client=aleph_client, item_hash=vrf_request_hash)
vrf_request = get_vrf_request_from_message(message)
execution_id = ExecutionId(str(uuid4()))

if vrf_request.request_id in ANSWERED_REQUESTS:
if vrf_request_hash in ANSWERED_REQUESTS:
raise fastapi.HTTPException(
status_code=409,
detail=f"A random number has already been generated for request {vrf_request_hash}",
)

message = await _get_message(client=aleph_client, item_hash=vrf_request_hash)
vrf_request = get_vrf_request_from_message(message)
execution_id = ExecutionId(str(uuid4()))

random_number, random_number_hash = generate(
vrf_request.nb_bytes, vrf_request.nonce
)
GENERATED_NUMBERS[execution_id] = random_number
ANSWERED_REQUESTS.add(vrf_request.request_id)
ANSWERED_REQUESTS.add(vrf_request_hash)

vrf_random_number_hash = VRFRandomNumberHash(
nb_bytes=vrf_request.nb_bytes,
Expand Down
2 changes: 1 addition & 1 deletion src/aleph_vrf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Settings(BaseSettings):
default="corechannel", description="Key for the `corechannel` aggregate."
)
FUNCTION: str = Field(
default="f6a734dbc98659f030e1cd9c12d8ffb769deac55d42d5db5285fba099755c779",
default="6ad7c5ace3dfbb68954f1eea6b775d8a38610f8d0fdc48b4f1b85ccfa9795931",
description="VRF function to use.",
)
NB_EXECUTORS: int = Field(default=32, description="Number of executors to use.")
Expand Down
Loading