From ebb4f8536990c9f8b4930f9113e04b9e6fadc344 Mon Sep 17 00:00:00 2001
From: Felix Quinque <40171911+Hollyqui@users.noreply.github.com>
Date: Fri, 3 Jan 2025 16:39:47 +0100
Subject: [PATCH 1/3] SN1-361: Query multiple miners to return fastest response + Scoring bug fix (#521)

---
 neurons/validator.py                      |   7 +-
 prompting/api/api.py                      |   5 +-
 prompting/weight_setting/weight_setter.py |   2 +-
 scripts/client.py                         |   1 -
 shared/misc.py                            |   2 +-
 validator_api/chat_completion.py          | 151 +++++++++++++++++----
 6 files changed, 128 insertions(+), 40 deletions(-)

diff --git a/neurons/validator.py b/neurons/validator.py
index 9d7e6305..6062ec8f 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -81,11 +81,12 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
     asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events))
 
 
-def start_api():
+def start_api(scoring_queue, reward_events):
     async def start():
         from prompting.api.api import start_scoring_api  # noqa: F401
 
-        await start_scoring_api()
+        await start_scoring_api(scoring_queue, reward_events)
+
         while True:
             await asyncio.sleep(10)
             logger.debug("Running API...")
@@ -125,7 +126,7 @@ async def main():
 
     if shared_settings.DEPLOY_SCORING_API:
         # Use multiprocessing to bypass API blocking issue
-        api_process = mp.Process(target=start_api, name="API_Process")
+        api_process = mp.Process(target=start_api, args=(scoring_queue, reward_events), name="API_Process")
         api_process.start()
         processes.append(api_process)
 
diff --git a/prompting/api/api.py b/prompting/api/api.py
index 63678ab7..825b19c7 100644
--- a/prompting/api/api.py
+++ b/prompting/api/api.py
@@ -4,6 +4,7 @@
 
 from prompting.api.miner_availabilities.api import router as miner_availabilities_router
 from prompting.api.scoring.api import router as scoring_router
+from prompting.rewards.scoring import task_scorer
 from shared.settings import shared_settings
 
 app = FastAPI()
@@ -17,7 +18,9 @@ def health():
     return {"status": "healthy"}
 
 
-async def start_scoring_api():
+async def start_scoring_api(scoring_queue, reward_events):
+    task_scorer.scoring_queue = scoring_queue
+    task_scorer.reward_events = reward_events
     logger.info(f"Starting Scoring API on https://0.0.0.0:{shared_settings.SCORING_API_PORT}")
     uvicorn.run(
         "prompting.api.api:app", host="0.0.0.0", port=shared_settings.SCORING_API_PORT, loop="asyncio", reload=False
diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py
index 28c195b1..12fc97c0 100644
--- a/prompting/weight_setting/weight_setter.py
+++ b/prompting/weight_setting/weight_setter.py
@@ -102,7 +102,7 @@ def set_weights(
                     "weights": processed_weights.flatten(),
                     "raw_weights": str(list(weights.flatten())),
                     "averaged_weights": str(list(averaged_weights.flatten())),
-                    "block": ttl_get_block(),
+                    "block": ttl_get_block(subtensor=subtensor),
                 }
             )
             step_filename = "weights.csv"
diff --git a/scripts/client.py b/scripts/client.py
index 3ef6d36c..29e72a0d 100644
--- a/scripts/client.py
+++ b/scripts/client.py
@@ -9,7 +9,6 @@
 from loguru import logger
 
 from shared.epistula import query_miners
-from shared.settings import shared_settings
 
 """
 This has assumed you have:
diff --git a/shared/misc.py b/shared/misc.py
index e58f313c..858765b7 100644
--- a/shared/misc.py
+++ b/shared/misc.py
@@ -100,7 +100,7 @@ def ttl_get_block(subtensor: bt.Subtensor | None = None) -> int:
     efficiently reduces the workload on the blockchain interface.
 
     Example:
-        current_block = ttl_get_block(self)
+        current_block = ttl_get_block(subtensor=subtensor)
 
     Note: self here is the miner or validator instance
     """
diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py
index b0cef872..ee670e70 100644
--- a/validator_api/chat_completion.py
+++ b/validator_api/chat_completion.py
@@ -1,7 +1,7 @@
 import asyncio
 import json
 import random
-from typing import AsyncGenerator
+from typing import AsyncGenerator, List, Optional
 
 import httpx
 from fastapi import HTTPException
@@ -14,14 +14,15 @@
 
 
 async def forward_response(uid: int, body: dict[str, any], chunks: list[str]):
-    uid = int(uid)  # sometimes uid is type np.uint64
-    logger.info(f"Forwarding response to scoring with body: {body}")
-    if not shared_settings.SCORE_ORGANICS:  # Allow disabling of scoring by default
+    uid = int(uid)
+    logger.info(f"Forwarding response from uid {uid} to scoring with body: {body} and chunks: {chunks}")
+    if not shared_settings.SCORE_ORGANICS:
         return
 
     if body.get("task") != "InferenceTask":
         logger.debug(f"Skipping forwarding for non-inference task: {body.get('task')}")
         return
+
     url = f"http://{shared_settings.VALIDATOR_API}/scoring"
     payload = {"body": body, "chunks": chunks, "uid": uid}
     try:
@@ -32,62 +33,124 @@ async def forward_response(uid: int, body: dict[str, any], chunks: list[str]):
         )
         if response.status_code == 200:
             logger.info(f"Forwarding response completed with status {response.status_code}")
-
         else:
             logger.exception(
                 f"Forwarding response uid {uid} failed with status {response.status_code} and payload {payload}"
             )
-
     except Exception as e:
         logger.error(f"Tried to forward response to {url} with payload {payload}")
         logger.exception(f"Error while forwarding response: {e}")
 
 
-async def stream_response(
-    response, collected_chunks: list[str], body: dict[str, any], uid: int
+async def stream_from_first_response(
+    responses: List[asyncio.Task], collected_chunks_list: List[List[str]], body: dict[str, any], uids: List[int]
 ) -> AsyncGenerator[str, None]:
-    chunks_received = False
+    first_valid_response = None
     try:
-        async for chunk in response:
+        # Wait for the first valid response
+        while responses and first_valid_response is None:
+            done, pending = await asyncio.wait(responses, return_when=asyncio.FIRST_COMPLETED)
+
+            for task in done:
+                try:
+                    response = await task
+                    if response and not isinstance(response, Exception):
+                        first_valid_response = response
+                        break
+                except Exception as e:
+                    logger.error(f"Error in miner response: {e}")
+                responses.remove(task)
+
+        if first_valid_response is None:
+            logger.error("No valid response received from any miner")
+            yield 'data: {"error": "502 - No valid response received"}\n\n'
+            return
+
+        # Stream the first valid response
+        chunks_received = False
+        async for chunk in first_valid_response:
             chunks_received = True
-            collected_chunks.append(chunk.choices[0].delta.content)
+            collected_chunks_list[0].append(chunk.choices[0].delta.content)
             yield f"data: {json.dumps(chunk.model_dump())}\n\n"
 
         if not chunks_received:
             logger.error("Stream is empty: No chunks were received")
             yield 'data: {"error": "502 - Response is empty"}\n\n'
+
         yield "data: [DONE]\n\n"
 
-        # Forward the collected chunks after streaming is complete
-        asyncio.create_task(forward_response(uid=uid, body=body, chunks=collected_chunks))
+        # Continue collecting remaining responses in background for scoring
+        remaining = asyncio.gather(*pending, return_exceptions=True)
+        asyncio.create_task(collect_remaining_responses(remaining, collected_chunks_list, body, uids))
 
     except asyncio.CancelledError:
         logger.info("Client disconnected, streaming cancelled")
+        for task in responses:
+            task.cancel()
         raise
     except Exception as e:
         logger.exception(f"Error during streaming: {e}")
         yield 'data: {"error": "Internal server Error"}\n\n'
 
 
-async def chat_completion(body: dict[str, any], uid: int | None = None) -> tuple | StreamingResponse:
-    """Handle regular chat completion without mixture of miners."""
-    if uid is None:
-        uid = random.choice(get_uids(sampling_mode="top_incentive", k=100))
+async def collect_remaining_responses(
+    remaining: asyncio.Task, collected_chunks_list: List[List[str]], body: dict[str, any], uids: List[int]
+):
+    """Collect remaining responses for scoring without blocking the main response."""
+    try:
+        responses = await remaining
+        logger.debug(f"responses to forward: {responses}")
+        for i, response in enumerate(responses):
+            if isinstance(response, Exception):
+                logger.error(f"Error collecting response from uid {uids[i+1]}: {response}")
+                continue
+
+            async for chunk in response:
+                collected_chunks_list[i + 1].append(chunk.choices[0].delta.content)
+        for uid, chunks in zip(uids, collected_chunks_list):
+            # Forward for scoring
+            asyncio.create_task(forward_response(uid, body, chunks))
 
-    if uid is None:
-        logger.error("No available miner found")
-        raise HTTPException(status_code=503, detail="No available miner found")
+    except Exception as e:
+        logger.exception(f"Error collecting remaining responses: {e}")
 
-    logger.debug(f"Querying uid {uid}")
-    STREAM = body.get("stream", False)
-    collected_chunks: list[str] = []
 
+async def get_response_from_miner(body: dict[str, any], uid: int) -> tuple:
+    """Get response from a single miner."""
+    return await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=False)
 
-    logger.info(f"Making {'streaming' if STREAM else 'non-streaming'} openai query with body: {body}")
-    response = await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=STREAM)
+
+async def chat_completion(
+    body: dict[str, any], uids: Optional[list[int]] = None, num_miners: int = 3
+) -> tuple | StreamingResponse:
+    """Handle chat completion with multiple miners in parallel."""
+    # Get multiple UIDs if none specified
+    if uids is None:
+        uids = list(get_uids(sampling_mode="top_incentive", k=100))
+        if uids is None or len(uids) == 0:  # `if not uids` raises on numpy arrays, so check emptiness via len()
+            logger.error("No available miners found")
+            raise HTTPException(status_code=503, detail="No available miners found")
+        selected_uids = random.sample(uids, min(num_miners, len(uids)))
+    else:
+        selected_uids = uids[:num_miners]  # If UIDs are specified, use at most num_miners of them
+
+    logger.debug(f"Querying uids {selected_uids}")
+    STREAM = body.get("stream", False)
+
+    # Initialize chunks collection for each miner
+    collected_chunks_list = [[] for _ in selected_uids]
 
     if STREAM:
+        # Create tasks for all miners
+        response_tasks = [
+            asyncio.create_task(
+                make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=True)
+            )
+            for uid in selected_uids
+        ]
+
         return StreamingResponse(
-            stream_response(response, collected_chunks, body, uid),
+            stream_from_first_response(response_tasks, collected_chunks_list, body, selected_uids),
             media_type="text/event-stream",
             headers={
                 "Cache-Control": "no-cache",
@@ -95,10 +158,32 @@ async def chat_completion(body: dict[str, any], uid: int | None = None) -> tuple
             },
         )
     else:
-        asyncio.create_task(forward_response(uid=uid, body=body, chunks=response[1]))
-        return response[0]
-
-
-async def get_response_from_miner(body: dict[str, any], uid: int) -> tuple:
-    """Get response from a single miner."""
-    return await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=False)
+        # For non-streaming requests, wait for first valid response
+        response_tasks = [asyncio.create_task(get_response_from_miner(body, uid)) for uid in selected_uids]
+
+        first_valid_response = None
+        collected_responses = []
+
+        while response_tasks and first_valid_response is None:
+            done, pending = await asyncio.wait(response_tasks, return_when=asyncio.FIRST_COMPLETED)
+
+            for task in done:
+                try:
+                    response = await task
+                    if response and isinstance(response, tuple):
+                        if first_valid_response is None:
+                            first_valid_response = response
+                        collected_responses.append(response)
+                except Exception as e:
+                    logger.error(f"Error in miner response: {e}")
+                response_tasks.remove(task)
+
+        if first_valid_response is None:
+            raise HTTPException(status_code=502, detail="No valid response received")
+
+        # Forward all collected responses for scoring in the background
+        for i, response in enumerate(collected_responses):
+            if response and isinstance(response, tuple):
+                asyncio.create_task(forward_response(uid=selected_uids[i], body=body, chunks=response[1]))
+
+        return first_valid_response[0]  # Return only the response object, not the chunks
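Note: both the streaming and non-streaming branches above rely on the same core pattern — query several miners concurrently, return the first task that completes with a usable result, and keep the stragglers for background scoring. The following minimal, self-contained sketch isolates that pattern; the helper names (query_miner, first_valid_response) and the simulated latency and failure rate are illustrative assumptions, not code from this patch:

    import asyncio
    import random


    async def query_miner(uid: int) -> str:
        # Stand-in for make_openai_query(): random latency, occasional failure.
        await asyncio.sleep(random.uniform(0.1, 1.0))
        if random.random() < 0.2:
            raise RuntimeError(f"miner {uid} failed")
        return f"response from miner {uid}"


    async def first_valid_response(uids: list[int]) -> str:
        # Race all queries; settle on the first task that finishes without error.
        pending = {asyncio.create_task(query_miner(uid)) for uid in uids}
        try:
            while pending:
                done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                for task in done:
                    if task.exception() is None:
                        return task.result()  # first miner to answer wins
                    # This miner failed; keep waiting on the remaining tasks.
            raise RuntimeError("no valid response from any miner")
        finally:
            for task in pending:
                task.cancel()  # the endpoint above instead keeps these alive for scoring


    if __name__ == "__main__":
        print(asyncio.run(first_valid_response([1, 2, 3])))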
From 667e264e0bed235e0f65bf989affc98ac2b4d3f1 Mon Sep 17 00:00:00 2001
From: Felix Quinque <40171911+Hollyqui@users.noreply.github.com>
Date: Fri, 3 Jan 2025 16:40:17 +0100
Subject: [PATCH 2/3] Remove py3.9 from github actions (#527)

---
 .github/workflows/python-package.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 296f9b5c..d601319e 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -12,7 +12,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.9", "3.10"]
+        python-version: ["3.10"]
 
     steps:
     - uses: actions/checkout@v3

From 4654d3a92f08f2520c98416b1c5ceafee7f9ec43 Mon Sep 17 00:00:00 2001
From: Felix Quinque <40171911+Hollyqui@users.noreply.github.com>
Date: Fri, 3 Jan 2025 19:53:56 +0100
Subject: [PATCH 3/3] Adding web retrieval endpoint (#528)

---
 prompting/api/scoring/api.py         | 72 ++++++++++++++++++++--------
 prompting/datasets/random_website.py |  4 +-
 shared/dendrite.py                   |  2 +-
 shared/epistula.py                   |  2 +-
 shared/uids.py                       |  3 +-
 validator_api/chat_completion.py     | 31 +-----------
 validator_api/gpt_endpoints.py       | 51 ++++++++++++++++++++
 validator_api/utils.py               | 35 ++++++++++++++
 8 files changed, 146 insertions(+), 54 deletions(-)
 create mode 100644 validator_api/utils.py

diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py
index ee18ade1..d7c981c1 100644
--- a/prompting/api/scoring/api.py
+++ b/prompting/api/scoring/api.py
@@ -4,9 +4,11 @@
 from fastapi import APIRouter, Depends, Header, HTTPException, Request
 from loguru import logger
 
+from prompting.datasets.random_website import DDGDatasetEntry
 from prompting.llms.model_zoo import ModelZoo
 from prompting.rewards.scoring import task_scorer
 from prompting.tasks.inference import InferenceTask
+from prompting.tasks.web_retrieval import WebRetrievalTask
 from shared.base import DatasetEntry
 from shared.dendrite import DendriteResponseEvent
 from shared.epistula import SynapseStreamResult
@@ -37,22 +39,54 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate
     uid = int(payload.get("uid"))
     chunks = payload.get("chunks")
     llm_model = ModelZoo.get_model_by_id(model) if (model := body.get("model")) else None
-    task_scorer.add_to_queue(
-        task=InferenceTask(
-            messages=[msg["content"] for msg in body.get("messages")],
-            llm_model=llm_model,
-            llm_model_id=body.get("model"),
-            seed=int(body.get("seed", 0)),
-            sampling_params=body.get("sampling_params", {}),
-        ),
-        response=DendriteResponseEvent(
-            uids=[uid],
-            stream_results=[SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])],
-            timeout=shared_settings.NEURON_TIMEOUT,
-        ),
-        dataset_entry=DatasetEntry(),
-        block=shared_settings.METAGRAPH.block,
-        step=-1,
-        task_id=str(uuid.uuid4()),
-    )
-    logger.info("Organic tas appended to scoring queue")
+    task = body.get("task")
+    if task == "InferenceTask":
+        logger.info(f"Received Organic InferenceTask with body: {body}")
+        task_scorer.add_to_queue(
+            task=InferenceTask(
+                messages=[msg["content"] for msg in body.get("messages")],
+                llm_model=llm_model,
+                llm_model_id=body.get("model"),
+                seed=int(body.get("seed", 0)),
+                sampling_params=body.get("sampling_params", {}),
+            ),
+            response=DendriteResponseEvent(
+                uids=[uid],
+                stream_results=[
+                    SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])
+                ],
+                timeout=shared_settings.NEURON_TIMEOUT,
+            ),
+            dataset_entry=DatasetEntry(),
+            block=shared_settings.METAGRAPH.block,
+            step=-1,
+            task_id=str(uuid.uuid4()),
+        )
+    elif task == "WebRetrievalTask":
+        logger.info(f"Received Organic WebRetrievalTask with body: {body}")
+        try:
+            search_term = body.get("messages")[0].get("content")
+        except Exception as ex:
+            logger.error(f"Failed to get search term from messages: {ex}, can't score WebRetrievalTask")
+            return
+
+        task_scorer.add_to_queue(
+            task=WebRetrievalTask(
+                messages=[msg["content"] for msg in body.get("messages")],
+                seed=int(body.get("seed", 0)),
+                sampling_params=body.get("sampling_params", {}),
+                query=search_term,
+            ),
+            response=DendriteResponseEvent(
+                uids=[uid],
+                stream_results=[
+                    SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])
+                ],
+                timeout=shared_settings.NEURON_TIMEOUT,
+            ),
+            dataset_entry=DDGDatasetEntry(search_term=search_term),
+            block=shared_settings.METAGRAPH.block,
+            step=-1,
+            task_id=str(uuid.uuid4()),
+        )
+    logger.info("Organic task appended to scoring queue")
diff --git a/prompting/datasets/random_website.py b/prompting/datasets/random_website.py
index 3058812a..18fd3c8d 100644
--- a/prompting/datasets/random_website.py
+++ b/prompting/datasets/random_website.py
@@ -15,8 +15,8 @@
 
 class DDGDatasetEntry(DatasetEntry):
     search_term: str
-    website_url: str
-    website_content: str
+    website_url: str = None
+    website_content: str = None
 
 
 class DDGDataset(BaseDataset):
diff --git a/shared/dendrite.py b/shared/dendrite.py
index 254a6c79..ccfd5c86 100644
--- a/shared/dendrite.py
+++ b/shared/dendrite.py
@@ -35,9 +35,9 @@ def model_dump(self):
 
 class DendriteResponseEvent(BaseModel):
     uids: np.ndarray | list[float]
-    axons: list[str]
     timeout: float
     stream_results: list[SynapseStreamResult]
+    axons: list[str] = []
     completions: list[str] = []
     status_messages: list[str] = []
     status_codes: list[int] = []
diff --git a/shared/epistula.py b/shared/epistula.py
index 44a59f6f..5af06485 100644
--- a/shared/epistula.py
+++ b/shared/epistula.py
@@ -111,7 +111,7 @@ async def merged_stream(responses: list[AsyncGenerator]):
             logger.error(f"Error while streaming: {e}")
 
 
-async def query_miners(uids, body: dict[str, Any]):
+async def query_miners(uids, body: dict[str, Any]) -> list[SynapseStreamResult]:
     try:
         tasks = []
         for uid in uids:
diff --git a/shared/uids.py b/shared/uids.py
index ef494ba2..a8c99b5d 100644
--- a/shared/uids.py
+++ b/shared/uids.py
@@ -114,7 +114,8 @@ def get_top_incentive_uids(k: int, vpermit_tao_limit: int) -> np.ndarray:
     # Extract the top uids.
     top_k_uids = [uid for uid, incentive in uid_incentive_pairs_sorted[:k]]
 
-    return np.array(top_k_uids).astype(int)
+    return list(np.array(top_k_uids).astype(int))
+    # return [int(k) for k in top_k_uids]
 
 
 def get_uids(
diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py
index ee670e70..fd05a174 100644
--- a/validator_api/chat_completion.py
+++ b/validator_api/chat_completion.py
@@ -3,7 +3,6 @@
 import random
 from typing import AsyncGenerator, List, Optional
 
-import httpx
 from fastapi import HTTPException
 from fastapi.responses import StreamingResponse
 from loguru import logger
@@ -11,35 +10,7 @@
 from shared.epistula import make_openai_query
 from shared.settings import shared_settings
 from shared.uids import get_uids
-
-
-async def forward_response(uid: int, body: dict[str, any], chunks: list[str]):
-    uid = int(uid)
-    logger.info(f"Forwarding response from uid {uid} to scoring with body: {body} and chunks: {chunks}")
-    if not shared_settings.SCORE_ORGANICS:
-        return
-
-    if body.get("task") != "InferenceTask":
-        logger.debug(f"Skipping forwarding for non-inference task: {body.get('task')}")
-        return
-
-    url = f"http://{shared_settings.VALIDATOR_API}/scoring"
-    payload = {"body": body, "chunks": chunks, "uid": uid}
-    try:
-        timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0)
-        async with httpx.AsyncClient(timeout=timeout) as client:
-            response = await client.post(
-                url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"}
-            )
-            if response.status_code == 200:
-                logger.info(f"Forwarding response completed with status {response.status_code}")
-            else:
-                logger.exception(
-                    f"Forwarding response uid {uid} failed with status {response.status_code} and payload {payload}"
-                )
-    except Exception as e:
-        logger.error(f"Tried to forward response to {url} with payload {payload}")
-        logger.exception(f"Error while forwarding response: {e}")
+from validator_api.utils import forward_response
diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py
index 3614e865..b363c1b1 100644
--- a/validator_api/gpt_endpoints.py
+++ b/validator_api/gpt_endpoints.py
@@ -1,12 +1,18 @@
+import asyncio
 import json
 import random
 
+import numpy as np
 from fastapi import APIRouter, Depends, Header, HTTPException, Request
 from loguru import logger
 from starlette.responses import StreamingResponse
 
+from shared.epistula import SynapseStreamResult, query_miners
+from shared.settings import shared_settings
+from shared.uids import get_uids
 from validator_api.chat_completion import chat_completion
 from validator_api.mixture_of_miners import mixture_of_miners
+from validator_api.utils import forward_response
 
 router = APIRouter()
 
@@ -37,3 +43,48 @@ async def completions(request: Request, api_key: str = Depends(validate_api_key)
     except Exception as e:
         logger.exception(f"Error in chat completion: {e}")
         return StreamingResponse(content="Internal Server Error", status_code=500)
+
+
+@router.post("/web_retrieval")
+async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = None):
+    uids = list(get_uids(sampling_mode="random", k=n_miners))
+    logger.debug(f"🔍 Querying uids: {uids}")
+    if len(uids) == 0:
+        logger.warning("No available miners. This should already have been caught earlier.")
+        return
+
+    body = {
+        "seed": random.randint(0, 1_000_000),
+        "sampling_parameters": shared_settings.SAMPLING_PARAMS,
+        "task": "WebRetrievalTask",
+        "messages": [
+            {"role": "user", "content": search_query},
+        ],
+    }
+    stream_results = await query_miners(uids, body)
+    results = [
+        "".join(res.accumulated_chunks)
+        for res in stream_results
+        if isinstance(res, SynapseStreamResult) and res.accumulated_chunks
+    ]
+    distinct_results = list(np.unique(results))
+    logger.info(
+        f"🔍 Collected responses from {len(stream_results)} miners. {len(results)} responded successfully with a total of {len(distinct_results)} distinct results"
+    )
+    loaded_results = []
+    for result in distinct_results:
+        try:
+            loaded_results.append(json.loads(result))
+            logger.info(f"🔍 Result: {result}")
+        except Exception:
+            logger.error(f"🔍 Failed to parse result as JSON: {result}")
+    if len(loaded_results) == 0:
+        raise HTTPException(status_code=500, detail="No miner responded successfully")
+
+    for uid, res in zip(uids, stream_results):
+        asyncio.create_task(
+            forward_response(
+                uid=uid, body=body, chunks=res.accumulated_chunks if res and res.accumulated_chunks else []
+            )
+        )
+    return loaded_results
diff --git a/validator_api/utils.py b/validator_api/utils.py
new file mode 100644
index 00000000..b83e819b
--- /dev/null
+++ b/validator_api/utils.py
@@ -0,0 +1,35 @@
+import httpx
+from loguru import logger
+
+from shared.settings import shared_settings
+
+
+# TODO: Modify this so that all the forwarded responses are sent in a single request. This is both more efficient and
+# means that on the validator side all responses are scored at once, speeding up the scoring process.
+async def forward_response(uid: int, body: dict[str, any], chunks: list[str]):
+    uid = int(uid)
+    logger.info(f"Forwarding response from uid {uid} to scoring with body: {body} and chunks: {chunks}")
+    if not shared_settings.SCORE_ORGANICS:
+        return
+
+    if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask":
+        logger.debug(f"Skipping forwarding for non-inference/web-retrieval task: {body.get('task')}")
+        return
+
+    url = f"http://{shared_settings.VALIDATOR_API}/scoring"
+    payload = {"body": body, "chunks": chunks, "uid": uid}
+    try:
+        timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0)
+        async with httpx.AsyncClient(timeout=timeout) as client:
+            response = await client.post(
+                url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"}
+            )
+            if response.status_code == 200:
+                logger.info(f"Forwarding response completed with status {response.status_code}")
+            else:
+                logger.exception(
+                    f"Forwarding response uid {uid} failed with status {response.status_code} and payload {payload}"
+                )
+    except Exception as e:
+        logger.error(f"Tried to forward response to {url} with payload {payload}")
+        logger.exception(f"Error while forwarding response: {e}")
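Note: once the validator API from this series is deployed, the new endpoint can be exercised with any HTTP client. A usage sketch under stated assumptions — the base URL is deployment-specific, and /web_retrieval as defined above takes its arguments as query parameters and declares no API-key dependency:

    import httpx

    BASE_URL = "http://localhost:8000"  # assumed local deployment; adjust host/port to your settings

    response = httpx.post(
        f"{BASE_URL}/web_retrieval",
        params={"search_query": "what is bittensor", "n_miners": 10},
        timeout=60.0,
    )
    response.raise_for_status()  # the endpoint returns 500 if no miner responded successfully
    for result in response.json():  # one entry per distinct, JSON-decodable miner result
        print(result)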