From 9c1659d4d1e1745221d77e0d30aaa221279f5c97 Mon Sep 17 00:00:00 2001 From: Felix Quinque <40171911+Hollyqui@users.noreply.github.com> Date: Thu, 19 Dec 2024 21:08:07 +0000 Subject: [PATCH 01/14] SN1-362 fix issues with scoring endpoint (#507) Co-authored-by: bkb2135 <98138173+bkb2135@users.noreply.github.com> Co-authored-by: richwardle --- .gitignore | 1 + neurons/validator.py | 139 ++++++++++++++++------ prompting/api/api.py | 2 +- prompting/api/scoring/api.py | 21 +++- prompting/llms/model_manager.py | 10 +- prompting/rewards/scoring.py | 22 ++-- prompting/tasks/task_creation.py | 17 ++- prompting/tasks/task_sending.py | 53 ++++++--- prompting/weight_setting/weight_setter.py | 60 ++++++---- pyproject.toml | 2 +- shared/misc.py | 6 +- shared/settings.py | 2 + shared/uids.py | 5 +- tests/prompting/test_weight_settings.py | 9 +- validator_api/gpt_endpoints.py | 17 +-- 15 files changed, 252 insertions(+), 114 deletions(-) diff --git a/.gitignore b/.gitignore index 0f23e623..3655df69 100644 --- a/.gitignore +++ b/.gitignore @@ -183,3 +183,4 @@ wandb .vscode api_keys.json prompting/api/api_keys.json +weights.csv diff --git a/neurons/validator.py b/neurons/validator.py index a0c9c782..a414b256 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -8,6 +8,7 @@ import multiprocessing as mp import time +import torch from loguru import logger from prompting.api.api import start_scoring_api @@ -20,48 +21,112 @@ from prompting.weight_setting.weight_setter import weight_setter from shared.profiling import profiler +torch.multiprocessing.set_start_method("spawn", force=True) + NEURON_SAMPLE_SIZE = 100 +def create_loop_process(task_queue, scoring_queue, reward_events): + async def spawn_loops(task_queue, scoring_queue, reward_events): + logger.info("Starting Profiler...") + asyncio.create_task(profiler.print_stats(), name="Profiler"), + logger.info("Starting ModelScheduler...") + asyncio.create_task(model_scheduler.start(scoring_queue), name="ModelScheduler"), + logger.info("Starting TaskScorer...") + asyncio.create_task(task_scorer.start(scoring_queue, reward_events), name="TaskScorer"), + logger.info("Starting WeightSetter...") + asyncio.create_task(weight_setter.start(reward_events)) + + # Main monitoring loop + start = time.time() + + logger.info("Starting Main Monitoring Loop...") + while True: + await asyncio.sleep(5) + current_time = time.time() + time_diff = current_time - start + start = current_time + + # Check if all tasks are still running + logger.debug(f"Running {time_diff:.2f} seconds") + logger.debug(f"Number of tasks in Task Queue: {len(task_queue)}") + logger.debug(f"Number of tasks in Scoring Queue: {len(scoring_queue)}") + logger.debug(f"Number of tasks in Reward Events: {len(reward_events)}") + + asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events)) + + +def start_api(): + async def start(): + await start_scoring_api() + while True: + await asyncio.sleep(10) + logger.debug("Running API...") + + asyncio.run(start()) + + +def create_task_loop(task_queue, scoring_queue): + async def start(task_queue, scoring_queue): + logger.info("Starting AvailabilityCheckingLoop...") + asyncio.create_task(availability_checking_loop.start()) + + logger.info("Starting TaskSender...") + asyncio.create_task(task_sender.start(task_queue, scoring_queue)) + + logger.info("Starting TaskLoop...") + asyncio.create_task(task_loop.start(task_queue, scoring_queue)) + while True: + await asyncio.sleep(10) + logger.debug("Running task loop...") + + asyncio.run(start(task_queue, 
scoring_queue)) + + async def main(): # will start checking the availability of miners at regular intervals, needed for API and Validator - asyncio.create_task(availability_checking_loop.start()) - - if shared_settings.DEPLOY_SCORING_API: - # Use multiprocessing to bypass API blocking issue. - api_process = mp.Process(target=lambda: asyncio.run(start_scoring_api())) - api_process.start() - - GPUInfo.log_gpu_info() - # start profiling - asyncio.create_task(profiler.print_stats()) - - # start rotating LLM models - asyncio.create_task(model_scheduler.start()) - - # start creating tasks - asyncio.create_task(task_loop.start()) - - # will start checking the availability of miners at regular intervals - asyncio.create_task(availability_checking_loop.start()) - - # start sending tasks to miners - asyncio.create_task(task_sender.start()) - - # sets weights at regular intervals (synchronised between all validators) - asyncio.create_task(weight_setter.start()) - - # start scoring tasks in separate loop - asyncio.create_task(task_scorer.start()) - # # TODO: Think about whether we want to store the task queue locally in case of a crash - # # TODO: Possibly run task scorer & model scheduler with a lock so I don't unload a model whilst it's generating - # # TODO: Make weight setting happen as specific intervals as we load/unload models - start = time.time() - await asyncio.sleep(60) - while True: - await asyncio.sleep(5) - time_diff = -start + (start := time.time()) - logger.debug(f"Running {time_diff:.2f} seconds") + with torch.multiprocessing.Manager() as manager: + reward_events = manager.list() + scoring_queue = manager.list() + task_queue = manager.list() + + # Create process pool for managed processes + processes = [] + + try: + # # Start checking the availability of miners at regular intervals + + if shared_settings.DEPLOY_SCORING_API: + # Use multiprocessing to bypass API blocking issue + api_process = mp.Process(target=start_api, name="API_Process") + api_process.start() + processes.append(api_process) + + loop_process = mp.Process( + target=create_loop_process, args=(task_queue, scoring_queue, reward_events), name="LoopProcess" + ) + task_loop_process = mp.Process( + target=create_task_loop, args=(task_queue, scoring_queue), name="TaskLoopProcess" + ) + loop_process.start() + task_loop_process.start() + processes.append(loop_process) + processes.append(task_loop_process) + GPUInfo.log_gpu_info() + + while True: + await asyncio.sleep(10) + logger.debug("Running...") + + except Exception as e: + logger.error(f"Main loop error: {e}") + raise + finally: + # Clean up processes + for process in processes: + if process.is_alive(): + process.terminate() + process.join() # The main function parses the configuration and runs the validator. 
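The restructuring above replaces the old single-process asyncio design with separate processes (scoring API, task loop, and the scoring/weight-setting loop) that exchange work through `torch.multiprocessing.Manager` lists. The sketch below illustrates that producer/consumer pattern in isolation; it is not code from the repository — the `producer`/`consumer` names are hypothetical, and it uses the standard-library `multiprocessing` module, which `torch.multiprocessing` thinly wraps.

import asyncio
import multiprocessing as mp
import time


def producer(task_queue):
    """Runs in a child process; appends work items to the shared Manager list."""
    for i in range(5):
        task_queue.append(f"task-{i}")
        time.sleep(0.5)


async def consumer(task_queue):
    """Runs in the parent process; drains the shared list as items arrive."""
    seen = 0
    while seen < 5:
        if task_queue:
            item = task_queue.pop(0)
            seen += 1
            print(f"consumed {item}")
        await asyncio.sleep(0.1)


if __name__ == "__main__":
    # Manager-backed lists are picklable proxies, so they can be handed to
    # child processes started with the "spawn" method, mirroring how the
    # validator passes task_queue/scoring_queue/reward_events between processes.
    mp.set_start_method("spawn", force=True)
    with mp.Manager() as manager:
        task_queue = manager.list()
        p = mp.Process(target=producer, args=(task_queue,))
        p.start()
        asyncio.run(consumer(task_queue))
        p.join()

Because the Manager list is a proxy object served by a separate manager process, appends made in one process become visible to the others without explicit shared memory, which is what allows the task creation, task sending, and scoring loops in the patch to live in different processes while sharing the same queues.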
diff --git a/prompting/api/api.py b/prompting/api/api.py index a3038581..63678ab7 100644 --- a/prompting/api/api.py +++ b/prompting/api/api.py @@ -18,7 +18,7 @@ def health(): async def start_scoring_api(): - logger.info("Starting API...") + logger.info(f"Starting Scoring API on https://0.0.0.0:{shared_settings.SCORING_API_PORT}") uvicorn.run( "prompting.api.api:app", host="0.0.0.0", port=shared_settings.SCORING_API_PORT, loop="asyncio", reload=False ) diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index a2492db2..ee18ade1 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -1,7 +1,8 @@ import uuid from typing import Any -from fastapi import APIRouter, Request +from fastapi import APIRouter, Depends, Header, HTTPException, Request +from loguru import logger from prompting.llms.model_zoo import ModelZoo from prompting.rewards.scoring import task_scorer @@ -14,10 +15,25 @@ router = APIRouter() +def validate_scoring_key(api_key: str = Header(...)): + if api_key != shared_settings.SCORING_KEY: + raise HTTPException(status_code=403, detail="Invalid API key") + + @router.post("/scoring") -async def score_response(request: Request): # , api_key_data: dict = Depends(validate_api_key)): +async def score_response(request: Request, api_key_data: dict = Depends(validate_scoring_key)): + model = None payload: dict[str, Any] = await request.json() body = payload.get("body") + + try: + if body.get("model") is not None: + model = ModelZoo.get_model_by_id(body.get("model")) + except Exception: + logger.warning( + f"Organic request with model {body.get('model')} made but the model cannot be found in model zoo. Skipping scoring." + ) + return uid = int(payload.get("uid")) chunks = payload.get("chunks") llm_model = ModelZoo.get_model_by_id(model) if (model := body.get("model")) else None @@ -39,3 +55,4 @@ async def score_response(request: Request): # , api_key_data: dict = Depends(va step=-1, task_id=str(uuid.uuid4()), ) + logger.info("Organic tas appended to scoring queue") diff --git a/prompting/llms/model_manager.py b/prompting/llms/model_manager.py index b5d236c8..b0f516af 100644 --- a/prompting/llms/model_manager.py +++ b/prompting/llms/model_manager.py @@ -9,7 +9,6 @@ from prompting.llms.hf_llm import ReproducibleHF from prompting.llms.model_zoo import ModelConfig, ModelZoo from prompting.llms.utils import GPUInfo -from prompting.mutable_globals import scoring_queue from shared.loop_runner import AsyncLoopRunner from shared.settings import shared_settings @@ -158,6 +157,11 @@ def generate( class AsyncModelScheduler(AsyncLoopRunner): llm_model_manager: ModelManager interval: int = 14400 + scoring_queue: list | None = None + + async def start(self, scoring_queue: list): + self.scoring_queue = scoring_queue + return await super().start() async def initialise_loop(self): model_manager.load_always_active_models() @@ -165,7 +169,7 @@ async def initialise_loop(self): async def run_step(self): """This method is called periodically according to the interval.""" # try to load the model belonging to the oldest task in the queue - selected_model = scoring_queue[0].task.llm_model if scoring_queue else None + selected_model = self.scoring_queue[0].task.llm_model if self.scoring_queue else None if not selected_model: selected_model = ModelZoo.get_random(max_ram=self.llm_model_manager.total_ram) logger.info(f"Loading model {selected_model.llm_model_id} for {self.interval} seconds.") @@ -174,7 +178,7 @@ async def run_step(self): logger.info(f"Model 
{selected_model.llm_model_id} is already loaded.") return - logger.debug(f"Active models: {model_manager.active_models.keys()}") + logger.debug(f"Active models: {self.llm_model_manager.active_models.keys()}") # Load the selected model loop = asyncio.get_running_loop() await loop.run_in_executor(None, self.llm_model_manager.load_model, selected_model) diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py index 7a1a6285..1405f517 100644 --- a/prompting/rewards/scoring.py +++ b/prompting/rewards/scoring.py @@ -5,7 +5,6 @@ from loguru import logger from pydantic import ConfigDict -from prompting import mutable_globals from prompting.llms.model_manager import model_manager, model_scheduler from prompting.tasks.base_task import BaseTextTask from prompting.tasks.task_registry import TaskRegistry @@ -33,9 +32,16 @@ class TaskScorer(AsyncLoopRunner): is_running: bool = False thread: threading.Thread = None interval: int = 10 + scoring_queue: list | None = None + reward_events: list | None = None model_config = ConfigDict(arbitrary_types_allowed=True) + async def start(self, scoring_queue, reward_events): + self.scoring_queue = scoring_queue + self.reward_events = reward_events + return await super().start() + def add_to_queue( self, task: BaseTextTask, @@ -45,7 +51,7 @@ def add_to_queue( step: int, task_id: str, ) -> None: - mutable_globals.scoring_queue.append( + self.scoring_queue.append( ScoringConfig( task=task, response=response, @@ -55,26 +61,24 @@ def add_to_queue( task_id=task_id, ) ) - logger.debug( - f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(mutable_globals.scoring_queue)}" - ) + logger.debug(f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}") async def run_step(self) -> RewardLoggingEvent: await asyncio.sleep(0.1) # Only score responses for which the model is loaded scorable = [ scoring_config - for scoring_config in mutable_globals.scoring_queue + for scoring_config in self.scoring_queue if (scoring_config.task.llm_model in model_manager.active_models.keys()) or (scoring_config.task.llm_model is None) ] if len(scorable) == 0: logger.debug("Nothing to score. 
Skipping scoring step.") # Run a model_scheduler step to load a new model as there are no more tasks to be scored - if len(mutable_globals.scoring_queue) > 0: + if len(self.scoring_queue) > 0: await model_scheduler.run_step() return - mutable_globals.scoring_queue.remove(scorable[0]) + self.scoring_queue.remove(scorable[0]) scoring_config: ScoringConfig = scorable.pop(0) # here we generate the actual reference @@ -94,7 +98,7 @@ async def run_step(self) -> RewardLoggingEvent: model_id=scoring_config.task.llm_model, task=scoring_config.task, ) - mutable_globals.reward_events.append(reward_events) + self.reward_events.append(reward_events) logger.debug( f"REFERENCE: {scoring_config.task.reference}\n\n||||RESPONSES: {scoring_config.response.completions}" ) diff --git a/prompting/tasks/task_creation.py b/prompting/tasks/task_creation.py index c215a75e..21b1d754 100644 --- a/prompting/tasks/task_creation.py +++ b/prompting/tasks/task_creation.py @@ -5,7 +5,6 @@ from pydantic import ConfigDict from prompting.miner_availability.miner_availability import miner_availabilities -from prompting.mutable_globals import scoring_queue, task_queue from prompting.tasks.task_registry import TaskRegistry from shared.logging import ErrorLoggingEvent, ValidatorLoggingEvent from shared.loop_runner import AsyncLoopRunner @@ -18,14 +17,20 @@ class TaskLoop(AsyncLoopRunner): is_running: bool = False thread: threading.Thread = None interval: int = 10 - + task_queue: list | None = [] + scoring_queue: list | None = [] model_config = ConfigDict(arbitrary_types_allowed=True) + async def start(self, task_queue, scoring_queue): + self.task_queue = task_queue + self.scoring_queue = scoring_queue + await super().start() + async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: - if len(task_queue) > shared_settings.TASK_QUEUE_LENGTH_THRESHOLD: + if len(self.task_queue) > shared_settings.TASK_QUEUE_LENGTH_THRESHOLD: logger.debug("Task queue is full. Skipping task generation.") return None - if len(scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: + if len(self.scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: logger.debug("Scoring queue is full. 
Skipping task generation.") return None await asyncio.sleep(0.1) @@ -55,7 +60,9 @@ async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: if not task.query: logger.debug(f"Generating query for task: {task.__class__.__name__}.") task.make_query(dataset_entry=dataset_entry) - task_queue.append(task) + + logger.debug(f"Appending task: {task.__class__.__name__} to task queue.") + self.task_queue.append(task) except Exception as ex: logger.exception(ex) return None diff --git a/prompting/tasks/task_sending.py b/prompting/tasks/task_sending.py index 7288e4f6..659ed814 100644 --- a/prompting/tasks/task_sending.py +++ b/prompting/tasks/task_sending.py @@ -3,19 +3,19 @@ import time from typing import List +import bittensor as bt from loguru import logger -from prompting import mutable_globals from prompting.miner_availability.miner_availability import miner_availabilities -from prompting.mutable_globals import scoring_queue -from prompting.rewards.scoring import task_scorer + +# from prompting.rewards.scoring import task_scorer +from prompting.rewards.scoring import ScoringConfig from prompting.tasks.base_task import BaseTextTask from prompting.tasks.inference import InferenceTask from shared.dendrite import DendriteResponseEvent, SynapseStreamResult from shared.epistula import query_miners from shared.logging import ErrorLoggingEvent, ValidatorLoggingEvent from shared.loop_runner import AsyncLoopRunner -from shared.misc import ttl_get_block from shared.settings import shared_settings from shared.timer import Timer @@ -61,6 +61,8 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: log_stream_results(stream_results) + logger.debug("🔍 Creating response event") + response_event = DendriteResponseEvent( stream_results=stream_results, uids=uids, @@ -68,6 +70,7 @@ async def collect_responses(task: BaseTextTask) -> DendriteResponseEvent | None: shared_settings.INFERENCE_TIMEOUT if isinstance(task, InferenceTask) else shared_settings.NEURON_TIMEOUT ), ) + logger.debug("🔍 Response event created") return response_event @@ -76,10 +79,25 @@ class TaskSender(AsyncLoopRunner): _lock: asyncio.Lock = asyncio.Lock() time_of_block_sync: float | None = None + task_queue: list | None = None + scoring_queue: list | None = None + subtensor: bt.Subtensor | None = None + + class Config: + arbitrary_types_allowed = True + + async def start(self, task_queue, scoring_queue): + self.task_queue = task_queue + self.scoring_queue = scoring_queue + + # shared_settings is not initialised inside this process, meaning it cannot access any non-constants from here + self.subtensor = bt.subtensor(network=shared_settings.SUBTENSOR_NETWORK) + return await super().start() + @property def block(self): self.time_of_block_sync = time.time() - return ttl_get_block() + return self.subtensor.get_current_block() @property def estimate_block(self): @@ -101,9 +119,7 @@ def estimate_block(self): return estimated_block - async def run_step( - self, k: int = shared_settings.ORGANIC_SAMPLE_SIZE, timeout: float = shared_settings.NEURON_TIMEOUT - ) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: + async def run_step(self) -> ValidatorLoggingEvent | ErrorLoggingEvent | None: """Executes a single step of the agent, which consists of: - Getting a list of uids to query - Querying the network @@ -118,16 +134,16 @@ async def run_step( timeout (float): The timeout for the queries. exclude (list, optional): The list of uids to exclude from the query. Defaults to []. 
""" - while len(scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: + while len(self.scoring_queue) > shared_settings.SCORING_QUEUE_LENGTH_THRESHOLD: logger.debug("Scoring queue is full. Waiting 1 second...") await asyncio.sleep(1) - while len(mutable_globals.task_queue) == 0: + while len(self.task_queue) == 0: logger.warning("No tasks in queue. Waiting 1 second...") await asyncio.sleep(1) try: # get task from the task queue - mutable_globals.task_queue: list[BaseTextTask] - task = mutable_globals.task_queue.pop(0) + self.task_queue: list[BaseTextTask] + task = self.task_queue.pop(0) # send the task to the miners and collect the responses with Timer() as timer: @@ -135,17 +151,22 @@ async def run_step( if response_event is None: logger.warning("No response event collected. This should not be happening.") return - logger.debug(f"Collected responses in {timer.final_time:.2f} seconds") - # scoring_manager will score the responses as and when the correct model is loaded - task_scorer.add_to_queue( + logger.debug("🔍 Estimating block") + estimated_block = self.estimate_block + logger.debug("🔍 Creating scoring config") + + scoring_config = ScoringConfig( task=task, response=response_event, dataset_entry=task.dataset_entry, - block=self.estimate_block, + block=estimated_block, step=self.step, task_id=task.task_id, ) + logger.debug(f"Collected responses in {timer.final_time:.2f} seconds") + self.scoring_queue.append(scoring_config) + logger.debug(f"SCORING: Added to queue: {task.__class__.__name__}. Queue size: {len(self.scoring_queue)}") # Log the step event. return ValidatorLoggingEvent( diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index 9db78cc6..d1e1a9de 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -6,7 +6,7 @@ import pandas as pd from loguru import logger -from prompting import __spec_version__, mutable_globals +from prompting import __spec_version__ from prompting.llms.model_zoo import ModelZoo from prompting.rewards.reward import WeightedRewardEvent from prompting.tasks.inference import InferenceTask @@ -17,17 +17,8 @@ from shared.settings import shared_settings FILENAME = "validator_weights.npz" - -try: - with np.load(FILENAME) as data: - PAST_WEIGHTS = [data[key] for key in data.files] - logger.debug(f"Loaded Past Weights: {PAST_WEIGHTS}") -except FileNotFoundError: - logger.info("No weights file found - this is expected on a new validator, starting with empty weights") - PAST_WEIGHTS = [] -except Exception as ex: - logger.error(f"Couldn't load weights from file: {ex}") WEIGHTS_HISTORY_LENGTH = 24 +PAST_WEIGHTS: list[np.ndarray] = [] def apply_reward_func(raw_rewards: np.ndarray, p=0.5): @@ -52,7 +43,9 @@ def save_weights(weights: list[np.ndarray]): np.savez_compressed(FILENAME, *weights) -def set_weights(weights: np.ndarray, step: int = 0): +def set_weights( + weights: np.ndarray, step: int = 0, subtensor: bt.Subtensor | None = None, metagraph: bt.Metagraph | None = None +): """ Sets the validator weights to the metagraph hotkeys based on the scores it has received from the miners. The weights determine the trust and incentive level the validator assigns to miner nodes on the network. 
""" @@ -80,8 +73,8 @@ def set_weights(weights: np.ndarray, step: int = 0): uids=shared_settings.METAGRAPH.uids, weights=averaged_weights, netuid=shared_settings.NETUID, - subtensor=shared_settings.SUBTENSOR, - metagraph=shared_settings.METAGRAPH, + subtensor=subtensor, + metagraph=metagraph, ) # Convert to uint16 weights and uids. @@ -124,7 +117,7 @@ def set_weights(weights: np.ndarray, step: int = 0): return # Set the weights on chain via our subtensor connection. - result = shared_settings.SUBTENSOR.set_weights( + result = subtensor.set_weights( wallet=shared_settings.WALLET, netuid=shared_settings.NETUID, uids=uint_uids, @@ -145,21 +138,42 @@ class WeightSetter(AsyncLoopRunner): sync: bool = True interval: int = 60 * 22 # set rewards every 20 minutes + reward_events: list[list[WeightedRewardEvent]] | None = None + subtensor: bt.Subtensor | None = None + metagraph: bt.Metagraph | None = None # interval: int = 60 + class Config: + arbitrary_types_allowed = True + + async def start(self, reward_events): + self.reward_events = reward_events + self.subtensor = bt.Subtensor(network=shared_settings.SUBTENSOR_NETWORK) + self.metagraph = self.subtensor.metagraph(netuid=shared_settings.NETUID) + global PAST_WEIGHTS + + try: + with np.load(FILENAME) as data: + PAST_WEIGHTS = [data[key] for key in data.files] + logger.debug(f"Loaded Past Weights: {PAST_WEIGHTS}") + except FileNotFoundError: + logger.info("No weights file found - this is expected on a new validator, starting with empty weights") + PAST_WEIGHTS = [] + except Exception as ex: + logger.error(f"Couldn't load weights from file: {ex}") + return await super().start() + async def run_step(self): await asyncio.sleep(0.01) try: logger.info("Reward setting loop running") - if len(mutable_globals.reward_events) == 0: + if len(self.reward_events) == 0: logger.warning("No reward events in queue, skipping weight setting...") return - logger.debug(f"Found {len(mutable_globals.reward_events)} reward events in queue") + logger.debug(f"Found {len(self.reward_events)} reward events in queue") # reward_events is a list of lists of WeightedRewardEvents - the 'sublists' each contain the multiple reward events for a single task - mutable_globals.reward_events: list[ - list[WeightedRewardEvent] - ] = mutable_globals.reward_events # to get correct typehinting + self.reward_events: list[list[WeightedRewardEvent]] = self.reward_events # to get correct typehinting # reward_dict = {uid: 0 for uid in get_uids(sampling_mode="all")} reward_dict = {uid: 0 for uid in range(1024)} @@ -174,7 +188,7 @@ async def run_step(self): logger.debug(f"Miner rewards before processing: {miner_rewards}") inference_events: list[WeightedRewardEvent] = [] - for reward_events in mutable_globals.reward_events: + for reward_events in self.reward_events: await asyncio.sleep(0.01) for reward_event in reward_events: if np.sum(reward_event.rewards) > 0: @@ -225,8 +239,8 @@ async def run_step(self): except Exception as ex: logger.exception(f"{ex}") # set weights on chain - set_weights(final_rewards, step=self.step) - mutable_globals.reward_events = [] # empty reward events queue + set_weights(final_rewards, step=self.step, subtensor=self.subtensor, metagraph=self.metagraph) + self.reward_events = [] # empty reward events queue await asyncio.sleep(0.01) return final_rewards diff --git a/pyproject.toml b/pyproject.toml index 21f4d70e..b24f97a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "prompting" -version = "2.15.1" +version = "2.15.2" description = 
"Subnetwork 1 runs on Bittensor and is maintained by Macrocosmos. It's an effort to create decentralised AI" authors = ["Kalei Brady, Dmytro Bobrenko, Felix Quinque, Steffen Cruz, Richard Wardle"] readme = "README.md" diff --git a/shared/misc.py b/shared/misc.py index 1b2bbe7a..c26285ec 100644 --- a/shared/misc.py +++ b/shared/misc.py @@ -5,10 +5,10 @@ from math import floor from typing import Any, Callable +import bittensor as bt from loguru import logger from shared.exceptions import BittensorError -from shared.settings import shared_settings class classproperty: @@ -86,7 +86,7 @@ def _ttl_hash_gen(seconds: int): # 12 seconds updating block. @ttl_cache(maxsize=1, ttl=12) -def ttl_get_block() -> int: +def ttl_get_block(subtensor: bt.Subtensor | None = None) -> int: """ Retrieves the current block number from the blockchain. This method is cached with a time-to-live (TTL) of 12 seconds, meaning that it will only refresh the block number from the blockchain at most every 12 seconds, @@ -105,7 +105,7 @@ def ttl_get_block() -> int: Note: self here is the miner or validator instance """ try: - return shared_settings.SUBTENSOR.get_current_block() + return subtensor.get_current_block() except Exception as e: raise BittensorError(f"Bittensor error: {str(e)}") from e diff --git a/shared/settings.py b/shared/settings.py index a74e5615..9de9c091 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -164,6 +164,8 @@ def validate_mode(cls, v): logger.warning( "No SCORING_KEY found in .env.api file. You must add a scoring key that will allow us to forward miner responses to the validator for scoring." ) + if not os.getenv("SCORE_ORGANICS"): + logger.warning("Not scoring organics. This means that miners may not respond as consistently.") elif v["mode"] == "miner": if not dotenv.load_dotenv(".env.miner"): logger.warning("No .env.miner file found. Please create one.") diff --git a/shared/uids.py b/shared/uids.py index ac49e007..ef494ba2 100644 --- a/shared/uids.py +++ b/shared/uids.py @@ -114,7 +114,7 @@ def get_top_incentive_uids(k: int, vpermit_tao_limit: int) -> np.ndarray: # Extract the top uids. 
top_k_uids = [uid for uid, incentive in uid_incentive_pairs_sorted[:k]] - return np.array(top_k_uids) + return np.array(top_k_uids).astype(int) def get_uids( @@ -125,7 +125,8 @@ def get_uids( ) -> np.ndarray: if shared_settings.TEST and shared_settings.TEST_MINER_IDS: return random.sample( - list(np.array(shared_settings.TEST_MINER_IDS)), min(len(shared_settings.TEST_MINER_IDS), k or 10**6) + list(np.array(shared_settings.TEST_MINER_IDS).astype(int)), + min(len(shared_settings.TEST_MINER_IDS), k or 10**6), ) if sampling_mode == "random": return get_random_uids(k=k, exclude=exclude or []) diff --git a/tests/prompting/test_weight_settings.py b/tests/prompting/test_weight_settings.py index cce49182..00b923a5 100644 --- a/tests/prompting/test_weight_settings.py +++ b/tests/prompting/test_weight_settings.py @@ -43,7 +43,7 @@ def test_run_step_with_reward_events(): with ( patch("shared.uids.get_uids") as mock_get_uids, patch("prompting.weight_setting.weight_setter.TaskRegistry") as MockTaskRegistry, - patch("prompting.weight_setting.weight_setter.mutable_globals") as mock_mutable_globals, + # patch("prompting.weight_setting.weight_setter.mutable_globals") as mock_mutable_globals, patch("prompting.weight_setting.weight_setter.set_weights") as mock_set_weights, patch("prompting.weight_setting.weight_setter.logger") as mock_logger, ): @@ -75,7 +75,9 @@ def __init__(self, task, uids, rewards, weight): mock_task_registry.get_task_config = MagicMock(return_value=mock_task_registry.task_configs[0]) # Set up the mock mutable_globals - mock_mutable_globals.reward_events = [ + + weight_setter = WeightSetter() + reward_events = [ [ WeightedRewardEvent( task=mock_task_registry.task_configs[0], uids=mock_uids, rewards=[1.0, 2.0, 3.0, 4.0, 5.0], weight=1 @@ -87,8 +89,7 @@ def __init__(self, task, uids, rewards, weight): ), ], ] - - weight_setter = WeightSetter() + weight_setter.reward_events = reward_events output = asyncio.run(weight_setter.run_step()) print(output) diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index bc3d2e2f..84ff7fdb 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -15,22 +15,22 @@ async def forward_response(uid: int, body: dict[str, any], chunks: list[str]): + uid = int(uid) # sometimes uid is type np.uint64 + logger.info(f"Forwarding response to scoring with body: {body}") if not shared_settings.SCORE_ORGANICS: # Allow disabling of scoring by default return - # if body.get("task") != "InferenceTask": - # logger.debug(f"Skipping forwarding for non-inference task: {body.get('task')}") - # return + if body.get("task") != "InferenceTask": + logger.debug(f"Skipping forwarding for non-inference task: {body.get('task')}") + return url = f"http://{shared_settings.VALIDATOR_API}/scoring" payload = {"body": body, "chunks": chunks, "uid": uid} - # headers = { - # "Authorization": f"Bearer {shared_settings.SCORING_KEY}", #Add API key in Authorization header - # "Content-Type": "application/json", - # } try: timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) async with httpx.AsyncClient(timeout=timeout) as client: - response = await client.post(url, json=payload) # , headers=headers) + response = await client.post( + url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"} + ) if response.status_code == 200: logger.info(f"Forwarding response completed with status {response.status_code}") @@ -98,6 +98,7 @@ async def stream_with_error_handling(): }, ) else: 
+ logger.info("Forwarding response to scoring...") asyncio.create_task(forward_response(uid=uid, body=body, chunks=response[1])) return response[0] From b0d8e85eba9dce58accef93896135fa82277aa49 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Fri, 20 Dec 2024 12:15:20 +0100 Subject: [PATCH 02/14] Add Mixture-of-Miners endpoint (#472) ## Changes - Add Mixture-of-Miners endpoint. - Move chat completions processing and mixture of miners into two separate functions. - Add system prompt to the inference task. --- prompting/tasks/base_task.py | 2 +- validator_api/chat_completion.py | 104 +++++++++++++++++++++++++++++ validator_api/gpt_endpoints.py | 99 +++------------------------ validator_api/mixture_of_miners.py | 88 ++++++++++++++++++++++++ 4 files changed, 203 insertions(+), 90 deletions(-) create mode 100644 validator_api/chat_completion.py create mode 100644 validator_api/mixture_of_miners.py diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py index 404a912e..4290c68f 100644 --- a/prompting/tasks/base_task.py +++ b/prompting/tasks/base_task.py @@ -90,7 +90,7 @@ def generate_query( """Generates a query to be used for generating the challenge""" logger.info("🤖 Generating query...") llm_messages = [LLMMessage(role="system", content=self.query_system_prompt)] if self.query_system_prompt else [] - llm_messages += [LLMMessage(role="user", content=message) for message in messages] + llm_messages.extend([LLMMessage(role="user", content=message) for message in messages]) self.query = LLMWrapper.chat_complete(messages=LLMMessages(*llm_messages)) diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py new file mode 100644 index 00000000..b0cef872 --- /dev/null +++ b/validator_api/chat_completion.py @@ -0,0 +1,104 @@ +import asyncio +import json +import random +from typing import AsyncGenerator + +import httpx +from fastapi import HTTPException +from fastapi.responses import StreamingResponse +from loguru import logger + +from shared.epistula import make_openai_query +from shared.settings import shared_settings +from shared.uids import get_uids + + +async def forward_response(uid: int, body: dict[str, any], chunks: list[str]): + uid = int(uid) # sometimes uid is type np.uint64 + logger.info(f"Forwarding response to scoring with body: {body}") + if not shared_settings.SCORE_ORGANICS: # Allow disabling of scoring by default + return + + if body.get("task") != "InferenceTask": + logger.debug(f"Skipping forwarding for non-inference task: {body.get('task')}") + return + url = f"http://{shared_settings.VALIDATOR_API}/scoring" + payload = {"body": body, "chunks": chunks, "uid": uid} + try: + timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.post( + url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"} + ) + if response.status_code == 200: + logger.info(f"Forwarding response completed with status {response.status_code}") + + else: + logger.exception( + f"Forwarding response uid {uid} failed with status {response.status_code} and payload {payload}" + ) + + except Exception as e: + logger.error(f"Tried to forward response to {url} with payload {payload}") + logger.exception(f"Error while forwarding response: {e}") + + +async def stream_response( + response, collected_chunks: list[str], body: dict[str, any], uid: int +) -> AsyncGenerator[str, None]: 
+ chunks_received = False + try: + async for chunk in response: + chunks_received = True + collected_chunks.append(chunk.choices[0].delta.content) + yield f"data: {json.dumps(chunk.model_dump())}\n\n" + + if not chunks_received: + logger.error("Stream is empty: No chunks were received") + yield 'data: {"error": "502 - Response is empty"}\n\n' + yield "data: [DONE]\n\n" + + # Forward the collected chunks after streaming is complete + asyncio.create_task(forward_response(uid=uid, body=body, chunks=collected_chunks)) + except asyncio.CancelledError: + logger.info("Client disconnected, streaming cancelled") + raise + except Exception as e: + logger.exception(f"Error during streaming: {e}") + yield 'data: {"error": "Internal server Error"}\n\n' + + +async def chat_completion(body: dict[str, any], uid: int | None = None) -> tuple | StreamingResponse: + """Handle regular chat completion without mixture of miners.""" + if uid is None: + uid = random.choice(get_uids(sampling_mode="top_incentive", k=100)) + + if uid is None: + logger.error("No available miner found") + raise HTTPException(status_code=503, detail="No available miner found") + + logger.debug(f"Querying uid {uid}") + STREAM = body.get("stream", False) + + collected_chunks: list[str] = [] + + logger.info(f"Making {'streaming' if STREAM else 'non-streaming'} openai query with body: {body}") + response = await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=STREAM) + + if STREAM: + return StreamingResponse( + stream_response(response, collected_chunks, body, uid), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + }, + ) + else: + asyncio.create_task(forward_response(uid=uid, body=body, chunks=response[1])) + return response[0] + + +async def get_response_from_miner(body: dict[str, any], uid: int) -> tuple: + """Get response from a single miner.""" + return await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=False) diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index 84ff7fdb..34681f0e 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -1,107 +1,28 @@ -import asyncio -import json import random -import httpx -from fastapi import APIRouter, HTTPException, Request +from fastapi import APIRouter, Request from loguru import logger from starlette.responses import StreamingResponse -from shared.epistula import make_openai_query -from shared.settings import shared_settings -from shared.uids import get_uids +from validator_api.chat_completion import chat_completion +from validator_api.mixture_of_miners import mixture_of_miners router = APIRouter() -async def forward_response(uid: int, body: dict[str, any], chunks: list[str]): - uid = int(uid) # sometimes uid is type np.uint64 - logger.info(f"Forwarding response to scoring with body: {body}") - if not shared_settings.SCORE_ORGANICS: # Allow disabling of scoring by default - return - - if body.get("task") != "InferenceTask": - logger.debug(f"Skipping forwarding for non-inference task: {body.get('task')}") - return - url = f"http://{shared_settings.VALIDATOR_API}/scoring" - payload = {"body": body, "chunks": chunks, "uid": uid} - try: - timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) - async with httpx.AsyncClient(timeout=timeout) as client: - response = await client.post( - url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": 
"application/json"} - ) - if response.status_code == 200: - logger.info(f"Forwarding response completed with status {response.status_code}") - - else: - logger.exception( - f"Forwarding response uid {uid} failed with status {response.status_code} and payload {payload}" - ) - - except Exception as e: - logger.error(f"Tried to forward response to {url} with payload {payload}") - logger.exception(f"Error while forwarding response: {e}") - - @router.post("/v1/chat/completions") -async def chat_completion(request: Request): # , cbackground_tasks: BackgroundTasks): +async def completions(request: Request): + """Main endpoint that handles both regular and mixture of miners chat completion.""" try: body = await request.json() body["seed"] = int(body.get("seed") or random.randint(0, 1000000)) - STREAM = body.get("stream") or False - logger.debug(f"Streaming: {STREAM}") - # Get random miner from top 100 incentive. - uid = random.choice(get_uids(sampling_mode="top_incentive", k=100)) - # uid = get_available_miner(task=body.get("task"), model=body.get("model")) - if uid is None: - logger.error("No available miner found") - raise HTTPException(status_code=503, detail="No available miner found") - logger.debug(f"Querying uid {uid}") - - collected_chunks: list[str] = [] - - # Create a wrapper for the streaming response - async def stream_with_error_handling(): - chunks_received = False - try: - async for chunk in response: - chunks_received = True - collected_chunks.append(chunk.choices[0].delta.content) - yield f"data: {json.dumps(chunk.model_dump())}\n\n" - - if not chunks_received: - logger.error("Stream is empty: No chunks were received") - yield 'data: {"error": "502 - Response is empty"}\n\n' - yield "data: [DONE]\n\n" - - # Once the stream is done, forward the collected chunks - asyncio.create_task(forward_response(uid=uid, body=body, chunks=collected_chunks)) - # background_tasks.add_task(forward_response, uid=uid, body=body, chunks=collected_chunks) - except asyncio.CancelledError: - logger.info("Client disconnected, streaming cancelled") - raise - except Exception as e: - logger.exception(f"Error during streaming: {e}") - yield 'data: {"error": "Internal server Error"}\n\n' - - logger.info(f"Making {'streaming' if STREAM else 'non-streaming'} openai query with body: {body}") - response = await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=STREAM) - if STREAM: - return StreamingResponse( - stream_with_error_handling(), - media_type="text/event-stream", - headers={ - "Cache-Control": "no-cache", - "Connection": "keep-alive", - }, - ) + # Choose between regular completion and mixture of miners. 
+ if body.get("mixture", False): + return await mixture_of_miners(body) else: - logger.info("Forwarding response to scoring...") - asyncio.create_task(forward_response(uid=uid, body=body, chunks=response[1])) - return response[0] + return await chat_completion(body) except Exception as e: - logger.exception(f"Error setting up streaming: {e}") + logger.exception(f"Error in chat completion: {e}") return StreamingResponse(content="Internal Server Error", status_code=500) diff --git a/validator_api/mixture_of_miners.py b/validator_api/mixture_of_miners.py new file mode 100644 index 00000000..e2aaa05a --- /dev/null +++ b/validator_api/mixture_of_miners.py @@ -0,0 +1,88 @@ +import asyncio +import copy +import random + +from fastapi import HTTPException +from fastapi.responses import StreamingResponse +from loguru import logger + +from shared.uids import get_uids +from validator_api.chat_completion import chat_completion, get_response_from_miner + +DEFAULT_SYSTEM_PROMPT = """You have been provided with a set of responses from various open-source models to the latest user query. +Your task is to synthesize these responses into a single, high-quality and concise response. +It is crucial to follow the provided instuctions or examples in the given prompt if any, and ensure the answer is in correct and expected format. +Critically evaluate the information provided in these responses, recognizing that some of it may be biased or incorrect. +Your response should not simply replicate the given answers but should offer a refined and accurate reply to the instruction. +Ensure your response is well-structured, coherent, and adheres to the highest standards of accuracy and reliability. +Responses from models:""" + +TASK_SYSTEM_PROMPT = { + None: DEFAULT_SYSTEM_PROMPT, + # Add more task-specific system prompts here. +} + +NUM_MIXTURE_MINERS = 5 +TOP_INCENTIVE_POOL = 100 + + +async def get_miner_response(body: dict, uid: str) -> tuple | None: + """Get response from a single miner with error handling.""" + try: + return await get_response_from_miner(body, uid) + except Exception as e: + logger.error(f"Error getting response from miner {uid}: {e}") + return None + + +async def mixture_of_miners(body: dict[str, any]) -> tuple | StreamingResponse: + """Handle chat completion with mixture of miners approach. + + Based on Mixture-of-Agents Enhances Large Language Model Capabilities, 2024, Wang et al.: + https://arxiv.org/abs/2406.04692 + + Args: + body: Query parameters: + messages: User prompt. + stream: If True, stream the response. + model: Optional model used for inference, SharedSettings.LLM_MODEL is used by default. + task: Optional task, see prompting/tasks/task_registry.py, InferenceTask is used by default. + """ + body_first_step = copy.deepcopy(body) + body_first_step["stream"] = False + + # Get multiple miners + miner_uids = get_uids(sampling_mode="top_incentive", k=NUM_MIXTURE_MINERS) + if len(miner_uids) == 0: + raise HTTPException(status_code=503, detail="No available miners found") + + # Concurrently collect responses from all miners. + miner_tasks = [get_miner_response(body_first_step, uid) for uid in miner_uids] + responses = await asyncio.gather(*miner_tasks) + + # Filter out None responses (failed requests). + valid_responses = [r for r in responses if r is not None] + + if not valid_responses: + raise HTTPException(status_code=503, detail="Failed to get responses from miners") + + # Extract completions from the responses. 
+ completions = [response[1][0] for response in valid_responses if response and len(response) > 1] + + task_name = body.get("task") + system_prompt = TASK_SYSTEM_PROMPT.get(task_name, DEFAULT_SYSTEM_PROMPT) + + # Aggregate responses into one system prompt. + agg_system_prompt = system_prompt + "\n" + "\n".join([f"{i+1}. {comp}" for i, comp in enumerate(completions)]) + + # Prepare new messages with the aggregated system prompt. + new_messages = [{"role": "system", "content": agg_system_prompt}] + new_messages.extend([msg for msg in body["messages"] if msg["role"] != "system"]) + + # Update the body with the new messages. + final_body = copy.deepcopy(body) + final_body["messages"] = new_messages + + # Get final response using a random top miner. + final_uid = random.choice(get_uids(sampling_mode="top_incentive", k=TOP_INCENTIVE_POOL)) + return await chat_completion(final_body, final_uid) From 472ba83ef85a4ea056cd670cf0aa10d60371b530 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:01:13 +0000 Subject: [PATCH 03/14] Revert debug mode --- neurons/miners/epistula_miner/miner.py | 6 +- neurons/test_vanilla_post.py | 6 +- prompting/settings.py | 134 ------------------------- scripts/client.py | 6 +- 4 files changed, 9 insertions(+), 143 deletions(-) delete mode 100644 prompting/settings.py diff --git a/neurons/miners/epistula_miner/miner.py b/neurons/miners/epistula_miner/miner.py index 7e918830..d6887954 100644 --- a/neurons/miners/epistula_miner/miner.py +++ b/neurons/miners/epistula_miner/miner.py @@ -1,8 +1,8 @@ # ruff: noqa: E402 -from prompting import settings +from shared import settings -settings.settings = settings.Settings.load(mode="miner") -settings = settings.settings +settings.shared_settings = settings.SharedSettings.load(mode="miner") +shared_settings = settings.shared_settings import asyncio import json diff --git a/neurons/test_vanilla_post.py b/neurons/test_vanilla_post.py index ecd45878..31e6adeb 100644 --- a/neurons/test_vanilla_post.py +++ b/neurons/test_vanilla_post.py @@ -1,10 +1,10 @@ import openai from httpx import Timeout -from prompting import settings +from shared import settings -settings.settings = settings.Settings.load(mode="validator") -settings = settings.settings +settings.shared_settings = settings.SharedSettings.load(mode="validator") +shared_settings = settings.shared_settings from shared.epistula import create_header_hook diff --git a/prompting/settings.py b/prompting/settings.py deleted file mode 100644 index 01ce347d..00000000 --- a/prompting/settings.py +++ /dev/null @@ -1,134 +0,0 @@ -import os -from functools import cached_property -from typing import Any, Literal, Optional - -import bittensor as bt -import dotenv -from loguru import logger -from pydantic import Field, model_validator -from pydantic_settings import BaseSettings - - -class Settings(BaseSettings): - mode: Literal["miner", "validator", "mock"] - MOCK: bool = False - NO_BACKGROUND_THREAD: bool = True - SAVE_PATH: Optional[str] = Field("./storage", env="SAVE_PATH") - model_config = {"frozen": True, "arbitrary_types_allowed": False} - - # Class variables for singleton. 
- _instance: Optional["Settings"] = None - _instance_mode: Optional[str] = None - - @classmethod - def load_env_file(cls, mode: Literal["miner", "validator", "mock"]): - """Load the appropriate .env file based on the mode.""" - if mode == "miner": - dotenv_file = ".env.miner" - elif mode == "validator": - dotenv_file = ".env.validator" - # For mock testing, still make validator env vars available where possible. - elif mode == "mock": - dotenv_file = ".env.validator" - else: - raise ValueError(f"Invalid mode: {mode}") - - if dotenv_file: - if not dotenv.load_dotenv(dotenv.find_dotenv(filename=dotenv_file)): - logger.warning( - f"No {dotenv_file} file found. The use of args when running a {mode} will be deprecated " - "in the near future." - ) - - @classmethod - def load(cls, mode: Literal["miner", "validator", "mock"]) -> "Settings": - """Load or retrieve the Settings instance based on the mode.""" - if cls._instance is not None and cls._instance_mode == mode: - return cls._instance - else: - cls.load_env_file(mode) - cls._instance = cls(mode=mode) - cls._instance_mode = mode - return cls._instance - - @model_validator(mode="before") - def complete_settings(cls, values: dict[str, Any]) -> dict[str, Any]: - mode = values["mode"] - netuid = values.get("NETUID", 61) - - if netuid is None: - raise ValueError("NETUID must be specified") - values["TEST"] = netuid != 1 - if values.get("TEST_MINER_IDS"): - values["TEST_MINER_IDS"] = str(values["TEST_MINER_IDS"]).split(",") - if mode == "mock": - values["MOCK"] = True - values["NEURON_DEVICE"] = "cpu" - logger.info("Running in mock mode. Bittensor objects will not be initialized.") - return values - - # load slow packages only if not in mock mode - import torch - - if not values.get("NEURON_DEVICE"): - values["NEURON_DEVICE"] = "cuda" if torch.cuda.is_available() else "cpu" - - # Ensure SAVE_PATH exists. - save_path = values.get("SAVE_PATH", "./storage") - if not os.path.exists(save_path): - os.makedirs(save_path) - if values.get("SN19_API_KEY") is None or values.get("SN19_API_URL") is None: - logger.warning( - "It is strongly recommended to provide an SN19 API KEY + URL to avoid incurring OpenAI API costs." - ) - if mode == "validator": - if values.get("OPENAI_API_KEY") is None: - raise Exception( - "You must provide an OpenAI API key as a backup. It is recommended to also provide an SN19 API key + url to avoid incurring API costs." - ) - if values.get("SCORING_ADMIN_KEY") is None: - raise Exception("You must provide an admin key to access the API.") - if values.get("PROXY_URL") is None: - logger.warning( - "You must provide a proxy URL to use the DuckDuckGo API - your vtrust might decrease if no DDG URL is provided." 
- ) - return values - - @cached_property - def WALLET(self): - wallet_name = self.WALLET_NAME # or config().wallet.name - hotkey = self.HOTKEY # or config().wallet.hotkey - logger.info(f"Instantiating wallet with name: {wallet_name}, hotkey: {hotkey}") - return bt.wallet(name=wallet_name, hotkey=hotkey) - - @cached_property - def SUBTENSOR(self) -> bt.subtensor: - subtensor_network = self.SUBTENSOR_NETWORK or os.environ.get("SUBTENSOR_NETWORK", "local") - # bt_config = config() - if subtensor_network.lower() == "local": - subtensor_network = os.environ.get("SUBTENSOR_CHAIN_ENDPOINT") # bt_config.subtensor.chain_endpoint or - else: - subtensor_network = subtensor_network.lower() # bt_config.subtensor.network or - logger.info(f"Instantiating subtensor with network: {subtensor_network}") - return bt.subtensor(network=subtensor_network) - - @cached_property - def METAGRAPH(self) -> bt.metagraph: - logger.info(f"Instantiating metagraph with NETUID: {self.NETUID}") - return self.SUBTENSOR.metagraph(netuid=self.NETUID) - - @cached_property - def DENDRITE(self) -> bt.dendrite: - logger.info(f"Instantiating dendrite with wallet: {self.WALLET}") - return bt.dendrite(wallet=self.WALLET) - - -logger.info("Settings class instantiated.") -settings: Optional[Settings] = None -try: - settings: Optional[Settings] = Settings.load(mode="mock") - pass -except Exception as e: - logger.exception(f"Error loading settings: {e}") - settings = None -logger.info("Settings loaded.") diff --git a/scripts/client.py b/scripts/client.py index aabe7e43..3ef6d36c 100644 --- a/scripts/client.py +++ b/scripts/client.py @@ -1,7 +1,7 @@ -from prompting import settings +from shared import settings -settings.settings = settings.Settings.load(mode="validator") -settings = settings.settings +settings.shared_settings = settings.SharedSettings.load(mode="validator") +shared_settings = settings.shared_settings import asyncio import json From 0194a5d99c9fbdb63da36b9708e33b27525c1341 Mon Sep 17 00:00:00 2001 From: Hollyqui Date: Fri, 20 Dec 2024 17:15:12 +0000 Subject: [PATCH 04/14] Fixing task reproducibility --- neurons/validator.py | 7 +++++-- prompting/llms/hf_llm.py | 13 ++++++++----- prompting/rewards/scoring.py | 1 + prompting/tasks/base_task.py | 3 +-- prompting/tasks/inference.py | 2 +- shared/logging.py | 2 ++ 6 files changed, 18 insertions(+), 10 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index a414b256..295b3ddb 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -9,8 +9,6 @@ import time import torch -from loguru import logger - from prompting.api.api import start_scoring_api from prompting.llms.model_manager import model_scheduler from prompting.llms.utils import GPUInfo @@ -20,6 +18,11 @@ from prompting.tasks.task_sending import task_sender from prompting.weight_setting.weight_setter import weight_setter from shared.profiling import profiler +import loguru + +# Add a handler to write logs to a file +loguru.logger.add("logfile.log", rotation="1000 MB", retention="10 days", level="DEBUG") +from loguru import logger torch.multiprocessing.set_start_method("spawn", force=True) diff --git a/prompting/llms/hf_llm.py b/prompting/llms/hf_llm.py index ad5d54c8..4d1e44a8 100644 --- a/prompting/llms/hf_llm.py +++ b/prompting/llms/hf_llm.py @@ -15,8 +15,8 @@ def __init__(self, model_id="hugging-quants/Meta-Llama-3.1-70B-Instruct-AWQ-INT4 Initialize Hugging Face model with reproducible settings and optimizations """ # Create a random seed for reproducibility - self.seed = random.randint(0, 
1_000_000) - self.set_random_seeds(self.seed) + # self.seed = random.randint(0, 1_000_000) + # self.set_random_seeds(self.seed) self.model: PreTrainedModel = AutoModelForCausalLM.from_pretrained( model_id, torch_dtype=torch.float16, @@ -65,9 +65,12 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed= )[0] logger.debug( - f"PROMPT: {messages}\n\nRESPONSES: {results}\n\n" - f"SAMPLING PARAMS: {params}\n\n" - f"TIME FOR RESPONSE: {timer.elapsed_time}" + f"""REPRODUCIBLEHF WAS QUERIED: + PROMPT: {messages}\n\n + RESPONSES: {results}\n\n + SAMPLING PARAMS: {params}\n\n + SEED: {seed}\n\n + TIME FOR RESPONSE: {timer.elapsed_time}""" ) return results if len(results) > 1 else results[0] diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py index 1405f517..1ba9d3cd 100644 --- a/prompting/rewards/scoring.py +++ b/prompting/rewards/scoring.py @@ -115,6 +115,7 @@ async def run_step(self) -> RewardLoggingEvent: block=scoring_config.block, step=scoring_config.step, task_id=scoring_config.task_id, + task_dict=scoring_config.task.model_dump(), ) ) logger.info("Adding scores to rewards_and_uids") diff --git a/prompting/tasks/base_task.py b/prompting/tasks/base_task.py index 4290c68f..9434df94 100644 --- a/prompting/tasks/base_task.py +++ b/prompting/tasks/base_task.py @@ -51,7 +51,7 @@ class BaseTextTask(BaseTask): reference: str | None = None llm_model: ModelConfig = None llm_model_id: str = None - seed: str = None + seed: int = Field(default_factory=lambda: random.randint(0, 1000000), allow_mutation=False) query_system_prompt: ClassVar[str | None] = None reference_system_prompt: ClassVar[str | None] = None augmentation_system_prompt: ClassVar[str | None] = None @@ -63,7 +63,6 @@ class BaseTextTask(BaseTask): def get_model_id_and_seed(self) -> "BaseTextTask": if self.llm_model: self.llm_model_id = self.llm_model.llm_model_id if self.llm_model else None - self.seed = random.randint(0, 1000000) return self def make_query(self, dataset_entry: DatasetEntry, **kwargs) -> str: diff --git a/prompting/tasks/inference.py b/prompting/tasks/inference.py index 84295ee9..07f1100c 100644 --- a/prompting/tasks/inference.py +++ b/prompting/tasks/inference.py @@ -38,7 +38,7 @@ class InferenceTask(BaseTextTask): reference: str | None = None llm_model: ModelConfig | None = None llm_model_id: ModelConfig | None = random.choice(ModelZoo.models_configs).llm_model_id - seed: int = Field(default_factory=lambda: random.randint(0, 1_000_000)) + seed: int = Field(default_factory=lambda: random.randint(0, 1_000_000), allow_mutation=False) sampling_params: dict[str, float] = shared_settings.SAMPLING_PARAMS @model_validator(mode="after") diff --git a/shared/logging.py b/shared/logging.py index daf4ee28..6da5e1de 100644 --- a/shared/logging.py +++ b/shared/logging.py @@ -180,6 +180,7 @@ class RewardLoggingEvent(BaseEvent): reference: str challenge: str task: str + task_dict: dict model_config = ConfigDict(arbitrary_types_allowed=True) @@ -222,6 +223,7 @@ def log_event(event: BaseEvent): reinit_wandb() unpacked_event = unpack_events(event) unpacked_event = convert_arrays_to_lists(unpacked_event) + logger.debug(f"""LOGGING WANDB EVENT: {unpacked_event}""") wandb.log(unpacked_event) From 3dbab910a4e463f1b5f9a64a6b3c966bfed1d599 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Sat, 21 Dec 2024 11:35:32 +0100 Subject: [PATCH 05/14] Fix formatting (#513) --- neurons/validator.py | 3 ++- prompting/llms/hf_llm.py | 2 +- 2 files changed, 3 
insertions(+), 2 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 295b3ddb..e5a6f6ac 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -8,7 +8,9 @@ import multiprocessing as mp import time +import loguru import torch + from prompting.api.api import start_scoring_api from prompting.llms.model_manager import model_scheduler from prompting.llms.utils import GPUInfo @@ -18,7 +20,6 @@ from prompting.tasks.task_sending import task_sender from prompting.weight_setting.weight_setter import weight_setter from shared.profiling import profiler -import loguru # Add a handler to write logs to a file loguru.logger.add("logfile.log", rotation="1000 MB", retention="10 days", level="DEBUG") diff --git a/prompting/llms/hf_llm.py b/prompting/llms/hf_llm.py index 4d1e44a8..934071e3 100644 --- a/prompting/llms/hf_llm.py +++ b/prompting/llms/hf_llm.py @@ -65,7 +65,7 @@ def generate(self, messages: list[str] | list[dict], sampling_params=None, seed= )[0] logger.debug( - f"""REPRODUCIBLEHF WAS QUERIED: + f"""REPRODUCIBLEHF WAS QUERIED: PROMPT: {messages}\n\n RESPONSES: {results}\n\n SAMPLING PARAMS: {params}\n\n From 6b2e6ba7918c41ffcd75e142b40255ae6fa2b429 Mon Sep 17 00:00:00 2001 From: Hollyqui Date: Mon, 23 Dec 2024 09:27:34 +0000 Subject: [PATCH 06/14] Improving docs, fixing settings --- .env.api.example | 8 +++++--- api.config.js | 12 ++++++++++++ api_keys.json | 2 +- run_api.sh | 2 +- shared/settings.py | 27 --------------------------- validator_api/API_docs.md | 9 +++++++-- 6 files changed, 26 insertions(+), 34 deletions(-) create mode 100644 api.config.js diff --git a/.env.api.example b/.env.api.example index d2a046d7..eaf06982 100644 --- a/.env.api.example +++ b/.env.api.example @@ -1,3 +1,5 @@ -API_PORT = "8005" -API_HOST = "0.0.0.0" -# SCORING_KEY = "YOUR_SCORING_API_KEY_GOES_HERE" +API_PORT = "42170" # Port for the API server +API_HOST = "0.0.0.0" # Host for the API server +SCORING_KEY = "123" # The scoring key for the validator (must match the scoring key in the .env.validator file) +SCORE_ORGANICS = True # Whether to score organics +VALIDATOR_API = "0.0.0.0:8094" # The validator API to forward responses to for scoring \ No newline at end of file diff --git a/api.config.js b/api.config.js new file mode 100644 index 00000000..b0308f84 --- /dev/null +++ b/api.config.js @@ -0,0 +1,12 @@ +module.exports = { + apps: [ + { + name: 'api_server', + script: 'poetry', + interpreter: 'none', + args: ['run', 'python', 'validator_api/api.py'], + min_uptime: '5m', + max_restarts: 5 + } + ] +}; diff --git a/api_keys.json b/api_keys.json index 0967ef42..9e26dfee 100644 --- a/api_keys.json +++ b/api_keys.json @@ -1 +1 @@ -{} +{} \ No newline at end of file diff --git a/run_api.sh b/run_api.sh index 6614e20f..40dcc610 100644 --- a/run_api.sh +++ b/run_api.sh @@ -29,7 +29,7 @@ echo "module.exports = { name: 'api_server', script: 'poetry', interpreter: 'none', - args: ['run', 'python', 'api/api.py'], + args: ['run', 'python', 'validator_api/api.py'], min_uptime: '5m', max_restarts: 5 } diff --git a/shared/settings.py b/shared/settings.py index 9de9c091..1fb9f84d 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -212,7 +212,6 @@ def load(cls, mode: Literal["miner", "validator", "mock", "api"]) -> "SharedSett def complete_settings(cls, values: dict[str, Any]) -> dict[str, Any]: mode = values["mode"] netuid = values.get("NETUID", 61) - if netuid is None: raise ValueError("NETUID must be specified") values["TEST"] = netuid != 1 @@ -224,32 +223,6 @@ def 
complete_settings(cls, values: dict[str, Any]) -> dict[str, Any]: logger.info("Running in mock mode. Bittensor objects will not be initialized.") return values - # load slow packages only if not in mock mode - import torch - - if not values.get("NEURON_DEVICE"): - values["NEURON_DEVICE"] = "cuda" if torch.cuda.is_available() else "cpu" - - # Ensure SAVE_PATH exists. - save_path = values.get("SAVE_PATH", "./storage") - if not os.path.exists(save_path): - os.makedirs(save_path) - if values.get("SN19_API_KEY") is None or values.get("SN19_API_URL") is None: - logger.warning( - "It is strongly recommended to provide an SN19 API KEY + URL to avoid incurring OpenAI API costs." - ) - if mode == "validator": - if values.get("OPENAI_API_KEY") is None: - raise Exception( - "You must provide an OpenAI API key as a backup. It is recommended to also provide an SN19 API key + url to avoid incurring API costs." - ) - if values.get("SCORING_ADMIN_KEY") is None and values.get("DEPLOY_SCORING_API"): - logger.warning("You must provide a SCORING_ADMIN_KEY to access the API. Disabling scoring endpoint") - values["DEPLOY_SCORING_API"] = False - if values.get("PROXY_URL") is None: - logger.warning( - "You must provide a proxy URL to use the DuckDuckGo API - your vtrust might decrease if no DDG URL is provided." - ) return values @cached_property diff --git a/validator_api/API_docs.md b/validator_api/API_docs.md index 71030a30..bf609989 100644 --- a/validator_api/API_docs.md +++ b/validator_api/API_docs.md @@ -20,10 +20,15 @@ This document describes the API endpoints available for [Subnet 1](https://githu ## Getting Started -Follow these steps to set up and run the API server: +SN1 can run either in validator mode or in API mode. Both modes will require the validator hotkey. + +As a validator, you MUST be running one instance in validator mode and can launch an arbitrary number of API instances. These API instances will proxy the responses from miners to the validator for scoring. + +To set up and run the API server: 1. **Install dependencies**: Ensure all required dependencies are installed using Poetry. -2. **Run the API server**: Start the server to access the API endpoints. +2. **Set up the .env.api file**: Copy the .env.api.example file to .env.api and fill in the validator hotkey. +3. **Run the API server**: Start the server to access the API endpoints. 
Use the following command: From f44d62ca3fc55dedce5df6a11c15e0a5b58d69c2 Mon Sep 17 00:00:00 2001 From: Felix Quinque <40171911+Hollyqui@users.noreply.github.com> Date: Mon, 23 Dec 2024 09:37:41 +0000 Subject: [PATCH 07/14] Improving docs, fixing settings (#514) --- .env.api.example | 8 +++++--- api.config.js | 12 ++++++++++++ api_keys.json | 2 +- run_api.sh | 2 +- shared/settings.py | 27 --------------------------- validator_api/API_docs.md | 9 +++++++-- 6 files changed, 26 insertions(+), 34 deletions(-) create mode 100644 api.config.js diff --git a/.env.api.example b/.env.api.example index d2a046d7..eaf06982 100644 --- a/.env.api.example +++ b/.env.api.example @@ -1,3 +1,5 @@ -API_PORT = "8005" -API_HOST = "0.0.0.0" -# SCORING_KEY = "YOUR_SCORING_API_KEY_GOES_HERE" +API_PORT = "42170" # Port for the API server +API_HOST = "0.0.0.0" # Host for the API server +SCORING_KEY = "123" # The scoring key for the validator (must match the scoring key in the .env.validator file) +SCORE_ORGANICS = True # Whether to score organics +VALIDATOR_API = "0.0.0.0:8094" # The validator API to forward responses to for scoring \ No newline at end of file diff --git a/api.config.js b/api.config.js new file mode 100644 index 00000000..b0308f84 --- /dev/null +++ b/api.config.js @@ -0,0 +1,12 @@ +module.exports = { + apps: [ + { + name: 'api_server', + script: 'poetry', + interpreter: 'none', + args: ['run', 'python', 'validator_api/api.py'], + min_uptime: '5m', + max_restarts: 5 + } + ] +}; diff --git a/api_keys.json b/api_keys.json index 0967ef42..9e26dfee 100644 --- a/api_keys.json +++ b/api_keys.json @@ -1 +1 @@ -{} +{} \ No newline at end of file diff --git a/run_api.sh b/run_api.sh index 6614e20f..40dcc610 100644 --- a/run_api.sh +++ b/run_api.sh @@ -29,7 +29,7 @@ echo "module.exports = { name: 'api_server', script: 'poetry', interpreter: 'none', - args: ['run', 'python', 'api/api.py'], + args: ['run', 'python', 'validator_api/api.py'], min_uptime: '5m', max_restarts: 5 } diff --git a/shared/settings.py b/shared/settings.py index 9de9c091..1fb9f84d 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -212,7 +212,6 @@ def load(cls, mode: Literal["miner", "validator", "mock", "api"]) -> "SharedSett def complete_settings(cls, values: dict[str, Any]) -> dict[str, Any]: mode = values["mode"] netuid = values.get("NETUID", 61) - if netuid is None: raise ValueError("NETUID must be specified") values["TEST"] = netuid != 1 @@ -224,32 +223,6 @@ def complete_settings(cls, values: dict[str, Any]) -> dict[str, Any]: logger.info("Running in mock mode. Bittensor objects will not be initialized.") return values - # load slow packages only if not in mock mode - import torch - - if not values.get("NEURON_DEVICE"): - values["NEURON_DEVICE"] = "cuda" if torch.cuda.is_available() else "cpu" - - # Ensure SAVE_PATH exists. - save_path = values.get("SAVE_PATH", "./storage") - if not os.path.exists(save_path): - os.makedirs(save_path) - if values.get("SN19_API_KEY") is None or values.get("SN19_API_URL") is None: - logger.warning( - "It is strongly recommended to provide an SN19 API KEY + URL to avoid incurring OpenAI API costs." - ) - if mode == "validator": - if values.get("OPENAI_API_KEY") is None: - raise Exception( - "You must provide an OpenAI API key as a backup. It is recommended to also provide an SN19 API key + url to avoid incurring API costs." - ) - if values.get("SCORING_ADMIN_KEY") is None and values.get("DEPLOY_SCORING_API"): - logger.warning("You must provide a SCORING_ADMIN_KEY to access the API. 
Disabling scoring endpoint") - values["DEPLOY_SCORING_API"] = False - if values.get("PROXY_URL") is None: - logger.warning( - "You must provide a proxy URL to use the DuckDuckGo API - your vtrust might decrease if no DDG URL is provided." - ) return values @cached_property diff --git a/validator_api/API_docs.md b/validator_api/API_docs.md index 71030a30..bf609989 100644 --- a/validator_api/API_docs.md +++ b/validator_api/API_docs.md @@ -20,10 +20,15 @@ This document describes the API endpoints available for [Subnet 1](https://githu ## Getting Started -Follow these steps to set up and run the API server: +SN1 can run either in validator mode or in API mode. Both modes will require the validator hotkey. + +As a validator, you MUST be running one instance in validator mode and can launch an arbitrary number of API instances. These API instances will proxy the responses from miners to the validator for scoring. + +To set up and run the API server: 1. **Install dependencies**: Ensure all required dependencies are installed using Poetry. -2. **Run the API server**: Start the server to access the API endpoints. +2. **Set up the .env.api file**: Copy the .env.api.example file to .env.api and fill in the validator hotkey. +3. **Run the API server**: Start the server to access the API endpoints. Use the following command: From 7753f1acf1ffa12bd80966434be6632fb94c84cf Mon Sep 17 00:00:00 2001 From: Hollyqui Date: Mon, 23 Dec 2024 10:58:02 +0000 Subject: [PATCH 08/14] Changing transformers version --- poetry.lock | 64 +++++++++++++++++++++++------------------------ shared/logging.py | 1 - 2 files changed, 32 insertions(+), 33 deletions(-) diff --git a/poetry.lock b/poetry.lock index ca9c9733..07fc3c60 100644 --- a/poetry.lock +++ b/poetry.lock @@ -968,13 +968,13 @@ files = [ [[package]] name = "click" -version = "8.1.7" +version = "8.1.8" description = "Composable command line interface toolkit" optional = false python-versions = ">=3.7" files = [ - {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"}, - {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"}, + {file = "click-8.1.8-py3-none-any.whl", hash = "sha256:63c132bbbed01578a06712a2d1f497bb62d9c1c0d329b7903a866228027263b2"}, + {file = "click-8.1.8.tar.gz", hash = "sha256:ed53c9d8990d83c2a27deae68e4ee337473f6330c040a31d4225c9574d16096a"}, ] [package.dependencies] @@ -1953,13 +1953,13 @@ test = ["flaky", "ipyparallel", "pre-commit", "pytest (>=7.0)", "pytest-asyncio [[package]] name = "ipython" -version = "8.30.0" +version = "8.31.0" description = "IPython: Productive Interactive Computing" optional = false python-versions = ">=3.10" files = [ - {file = "ipython-8.30.0-py3-none-any.whl", hash = "sha256:85ec56a7e20f6c38fce7727dcca699ae4ffc85985aa7b23635a8008f918ae321"}, - {file = "ipython-8.30.0.tar.gz", hash = "sha256:cb0a405a306d2995a5cbb9901894d240784a9f341394c6ba3f4fe8c6eb89ff6e"}, + {file = "ipython-8.31.0-py3-none-any.whl", hash = "sha256:46ec58f8d3d076a61d128fe517a51eb730e3aaf0c184ea8c17d16e366660c6a6"}, + {file = "ipython-8.31.0.tar.gz", hash = "sha256:b6a2274606bec6166405ff05e54932ed6e5cfecaca1fc05f2cacde7bb074d70b"}, ] [package.dependencies] @@ -2024,13 +2024,13 @@ testing = ["Django", "attrs", "colorama", "docopt", "pytest (<9.0.0)"] [[package]] name = "jinja2" -version = "3.1.4" +version = "3.1.5" description = "A very fast and expressive template engine." 
optional = false python-versions = ">=3.7" files = [ - {file = "jinja2-3.1.4-py3-none-any.whl", hash = "sha256:bc5dd2abb727a5319567b7a813e6a2e7318c39f4f487cfe6c89c6f9c7d25197d"}, - {file = "jinja2-3.1.4.tar.gz", hash = "sha256:4a3aee7acbbe7303aede8e9648d13b8bf88a429282aa6122a993f0ac800cb369"}, + {file = "jinja2-3.1.5-py3-none-any.whl", hash = "sha256:aba0f4dc9ed8013c424088f68a5c226f7d6097ed89b246d7749c2ec4175c6adb"}, + {file = "jinja2-3.1.5.tar.gz", hash = "sha256:8fefff8dc3034e27bb80d67c671eb8a9bc424c0ef4c0826edbff304cceff43bb"}, ] [package.dependencies] @@ -3679,32 +3679,32 @@ files = [ [[package]] name = "psutil" -version = "6.1.0" +version = "6.1.1" description = "Cross-platform lib for process and system monitoring in Python." optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7" files = [ - {file = "psutil-6.1.0-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:ff34df86226c0227c52f38b919213157588a678d049688eded74c76c8ba4a5d0"}, - {file = "psutil-6.1.0-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:c0e0c00aa18ca2d3b2b991643b799a15fc8f0563d2ebb6040f64ce8dc027b942"}, - {file = "psutil-6.1.0-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:000d1d1ebd634b4efb383f4034437384e44a6d455260aaee2eca1e9c1b55f047"}, - {file = "psutil-6.1.0-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:5cd2bcdc75b452ba2e10f0e8ecc0b57b827dd5d7aaffbc6821b2a9a242823a76"}, - {file = "psutil-6.1.0-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:045f00a43c737f960d273a83973b2511430d61f283a44c96bf13a6e829ba8fdc"}, - {file = "psutil-6.1.0-cp27-none-win32.whl", hash = "sha256:9118f27452b70bb1d9ab3198c1f626c2499384935aaf55388211ad982611407e"}, - {file = "psutil-6.1.0-cp27-none-win_amd64.whl", hash = "sha256:a8506f6119cff7015678e2bce904a4da21025cc70ad283a53b099e7620061d85"}, - {file = "psutil-6.1.0-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:6e2dcd475ce8b80522e51d923d10c7871e45f20918e027ab682f94f1c6351688"}, - {file = "psutil-6.1.0-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:0895b8414afafc526712c498bd9de2b063deaac4021a3b3c34566283464aff8e"}, - {file = "psutil-6.1.0-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9dcbfce5d89f1d1f2546a2090f4fcf87c7f669d1d90aacb7d7582addece9fb38"}, - {file = "psutil-6.1.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:498c6979f9c6637ebc3a73b3f87f9eb1ec24e1ce53a7c5173b8508981614a90b"}, - {file = "psutil-6.1.0-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d905186d647b16755a800e7263d43df08b790d709d575105d419f8b6ef65423a"}, - {file = "psutil-6.1.0-cp36-cp36m-win32.whl", hash = "sha256:6d3fbbc8d23fcdcb500d2c9f94e07b1342df8ed71b948a2649b5cb060a7c94ca"}, - {file = "psutil-6.1.0-cp36-cp36m-win_amd64.whl", hash = "sha256:1209036fbd0421afde505a4879dee3b2fd7b1e14fee81c0069807adcbbcca747"}, - {file = "psutil-6.1.0-cp37-abi3-win32.whl", hash = "sha256:1ad45a1f5d0b608253b11508f80940985d1d0c8f6111b5cb637533a0e6ddc13e"}, - {file = "psutil-6.1.0-cp37-abi3-win_amd64.whl", hash = "sha256:a8fb3752b491d246034fa4d279ff076501588ce8cbcdbb62c32fd7a377d996be"}, - {file = "psutil-6.1.0.tar.gz", hash = "sha256:353815f59a7f64cdaca1c0307ee13558a0512f6db064e92fe833784f08539c7a"}, + {file = "psutil-6.1.1-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:9ccc4316f24409159897799b83004cb1e24f9819b0dcf9c0b68bdcb6cefee6a8"}, + {file = "psutil-6.1.1-cp27-cp27m-manylinux2010_i686.whl", hash = 
"sha256:ca9609c77ea3b8481ab005da74ed894035936223422dc591d6772b147421f777"}, + {file = "psutil-6.1.1-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:8df0178ba8a9e5bc84fed9cfa61d54601b371fbec5c8eebad27575f1e105c0d4"}, + {file = "psutil-6.1.1-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:1924e659d6c19c647e763e78670a05dbb7feaf44a0e9c94bf9e14dfc6ba50468"}, + {file = "psutil-6.1.1-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:018aeae2af92d943fdf1da6b58665124897cfc94faa2ca92098838f83e1b1bca"}, + {file = "psutil-6.1.1-cp27-none-win32.whl", hash = "sha256:6d4281f5bbca041e2292be3380ec56a9413b790579b8e593b1784499d0005dac"}, + {file = "psutil-6.1.1-cp27-none-win_amd64.whl", hash = "sha256:c777eb75bb33c47377c9af68f30e9f11bc78e0f07fbf907be4a5d70b2fe5f030"}, + {file = "psutil-6.1.1-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:fc0ed7fe2231a444fc219b9c42d0376e0a9a1a72f16c5cfa0f68d19f1a0663e8"}, + {file = "psutil-6.1.1-cp36-abi3-macosx_11_0_arm64.whl", hash = "sha256:0bdd4eab935276290ad3cb718e9809412895ca6b5b334f5a9111ee6d9aff9377"}, + {file = "psutil-6.1.1-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b6e06c20c05fe95a3d7302d74e7097756d4ba1247975ad6905441ae1b5b66003"}, + {file = "psutil-6.1.1-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:97f7cb9921fbec4904f522d972f0c0e1f4fabbdd4e0287813b21215074a0f160"}, + {file = "psutil-6.1.1-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33431e84fee02bc84ea36d9e2c4a6d395d479c9dd9bba2376c1f6ee8f3a4e0b3"}, + {file = "psutil-6.1.1-cp36-cp36m-win32.whl", hash = "sha256:384636b1a64b47814437d1173be1427a7c83681b17a450bfc309a1953e329603"}, + {file = "psutil-6.1.1-cp36-cp36m-win_amd64.whl", hash = "sha256:8be07491f6ebe1a693f17d4f11e69d0dc1811fa082736500f649f79df7735303"}, + {file = "psutil-6.1.1-cp37-abi3-win32.whl", hash = "sha256:eaa912e0b11848c4d9279a93d7e2783df352b082f40111e078388701fd479e53"}, + {file = "psutil-6.1.1-cp37-abi3-win_amd64.whl", hash = "sha256:f35cfccb065fff93529d2afb4a2e89e363fe63ca1e4a5da22b603a85833c2649"}, + {file = "psutil-6.1.1.tar.gz", hash = "sha256:cf8496728c18f2d0b45198f06895be52f36611711746b7f30c464b422b50e2f5"}, ] [package.extras] -dev = ["black", "check-manifest", "coverage", "packaging", "pylint", "pyperf", "pypinfo", "pytest-cov", "requests", "rstcheck", "ruff", "sphinx", "sphinx_rtd_theme", "toml-sort", "twine", "virtualenv", "wheel"] +dev = ["abi3audit", "black", "check-manifest", "coverage", "packaging", "pylint", "pyperf", "pypinfo", "pytest-cov", "requests", "rstcheck", "ruff", "sphinx", "sphinx_rtd_theme", "toml-sort", "twine", "virtualenv", "vulture", "wheel"] test = ["pytest", "pytest-xdist", "setuptools"] [[package]] @@ -5937,13 +5937,13 @@ devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3) [[package]] name = "urllib3" -version = "2.2.3" +version = "2.3.0" description = "HTTP library with thread-safe connection pooling, file post, and more." 
optional = false -python-versions = ">=3.8" +python-versions = ">=3.9" files = [ - {file = "urllib3-2.2.3-py3-none-any.whl", hash = "sha256:ca899ca043dcb1bafa3e262d73aa25c465bfb49e0bd9dd5d59f1d0acba2f8fac"}, - {file = "urllib3-2.2.3.tar.gz", hash = "sha256:e7d814a81dad81e6caf2ec9fdedb284ecc9c73076b62654547cc64ccdcae26e9"}, + {file = "urllib3-2.3.0-py3-none-any.whl", hash = "sha256:1cee9ad369867bfdbbb48b7dd50374c0967a0bb7710050facf0dd6911440e3df"}, + {file = "urllib3-2.3.0.tar.gz", hash = "sha256:f8c5449b3cf0861679ce7e0503c7b44b5ec981bec0d1d3795a07f1ba96f0204d"}, ] [package.extras] diff --git a/shared/logging.py b/shared/logging.py index 6da5e1de..f2fdbd15 100644 --- a/shared/logging.py +++ b/shared/logging.py @@ -223,7 +223,6 @@ def log_event(event: BaseEvent): reinit_wandb() unpacked_event = unpack_events(event) unpacked_event = convert_arrays_to_lists(unpacked_event) - logger.debug(f"""LOGGING WANDB EVENT: {unpacked_event}""") wandb.log(unpacked_event) From 4c75c4a600b2410ae40a0a7fca87430d022c9129 Mon Sep 17 00:00:00 2001 From: Hollyqui Date: Mon, 23 Dec 2024 17:04:41 +0000 Subject: [PATCH 09/14] Adding API key checking --- validator_api/gpt_endpoints.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index 34681f0e..9edb5203 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -1,6 +1,7 @@ +import json import random -from fastapi import APIRouter, Request +from fastapi import APIRouter, Request, Depends, Header, HTTPException from loguru import logger from starlette.responses import StreamingResponse @@ -9,9 +10,19 @@ router = APIRouter() +# load api keys from api_keys.json +with open("api_keys.json", "r") as f: + _keys = json.load(f) + + +def validate_api_key(api_key: str = Header(...)): + if api_key not in _keys: + raise HTTPException(status_code=403, detail="Invalid API key") + return _keys[api_key] + @router.post("/v1/chat/completions") -async def completions(request: Request): +async def completions(request: Request, api_key: str = Depends(validate_api_key)): """Main endpoint that handles both regular and mixture of miners chat completion.""" try: body = await request.json() From 9977853c44b7906cd8d9fe849a9b251cab1986fd Mon Sep 17 00:00:00 2001 From: richwardle Date: Thu, 2 Jan 2025 02:51:37 +0000 Subject: [PATCH 10/14] run precommit fix --- .env.api.example | 2 +- api_keys.json | 2 +- validator_api/gpt_endpoints.py | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.env.api.example b/.env.api.example index eaf06982..dcd8ea1d 100644 --- a/.env.api.example +++ b/.env.api.example @@ -2,4 +2,4 @@ API_PORT = "42170" # Port for the API server API_HOST = "0.0.0.0" # Host for the API server SCORING_KEY = "123" # The scoring key for the validator (must match the scoring key in the .env.validator file) SCORE_ORGANICS = True # Whether to score organics -VALIDATOR_API = "0.0.0.0:8094" # The validator API to forward responses to for scoring \ No newline at end of file +VALIDATOR_API = "0.0.0.0:8094" # The validator API to forward responses to for scoring diff --git a/api_keys.json b/api_keys.json index 9e26dfee..0967ef42 100644 --- a/api_keys.json +++ b/api_keys.json @@ -1 +1 @@ -{} \ No newline at end of file +{} diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index 833e588f..3614e865 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -1,7 +1,7 @@ import json import random -from 
fastapi import APIRouter, Request, Depends, Header, HTTPException +from fastapi import APIRouter, Depends, Header, HTTPException, Request from loguru import logger from starlette.responses import StreamingResponse @@ -14,6 +14,7 @@ with open("api_keys.json", "r") as f: _keys = json.load(f) + def validate_api_key(api_key: str = Header(...)): if api_key not in _keys: raise HTTPException(status_code=403, detail="Invalid API key") From ebb4f8536990c9f8b4930f9113e04b9e6fadc344 Mon Sep 17 00:00:00 2001 From: Felix Quinque <40171911+Hollyqui@users.noreply.github.com> Date: Fri, 3 Jan 2025 16:39:47 +0100 Subject: [PATCH 11/14] SN1-361: Query multiple miners to return fastest response + Scoring bug fix (#521) --- neurons/validator.py | 7 +- prompting/api/api.py | 5 +- prompting/weight_setting/weight_setter.py | 2 +- scripts/client.py | 1 - shared/misc.py | 2 +- validator_api/chat_completion.py | 151 +++++++++++++++++----- 6 files changed, 128 insertions(+), 40 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 9d7e6305..6062ec8f 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -81,11 +81,12 @@ async def spawn_loops(task_queue, scoring_queue, reward_events): asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events)) -def start_api(): +def start_api(scoring_queue, reward_events): async def start(): from prompting.api.api import start_scoring_api # noqa: F401 - await start_scoring_api() + await start_scoring_api(scoring_queue, reward_events) + while True: await asyncio.sleep(10) logger.debug("Running API...") @@ -125,7 +126,7 @@ async def main(): if shared_settings.DEPLOY_SCORING_API: # Use multiprocessing to bypass API blocking issue - api_process = mp.Process(target=start_api, name="API_Process") + api_process = mp.Process(target=start_api, args=(scoring_queue, reward_events), name="API_Process") api_process.start() processes.append(api_process) diff --git a/prompting/api/api.py b/prompting/api/api.py index 63678ab7..825b19c7 100644 --- a/prompting/api/api.py +++ b/prompting/api/api.py @@ -4,6 +4,7 @@ from prompting.api.miner_availabilities.api import router as miner_availabilities_router from prompting.api.scoring.api import router as scoring_router +from prompting.rewards.scoring import task_scorer from shared.settings import shared_settings app = FastAPI() @@ -17,7 +18,9 @@ def health(): return {"status": "healthy"} -async def start_scoring_api(): +async def start_scoring_api(scoring_queue, reward_events): + task_scorer.scoring_queue = scoring_queue + task_scorer.reward_events = reward_events logger.info(f"Starting Scoring API on https://0.0.0.0:{shared_settings.SCORING_API_PORT}") uvicorn.run( "prompting.api.api:app", host="0.0.0.0", port=shared_settings.SCORING_API_PORT, loop="asyncio", reload=False diff --git a/prompting/weight_setting/weight_setter.py b/prompting/weight_setting/weight_setter.py index 28c195b1..12fc97c0 100644 --- a/prompting/weight_setting/weight_setter.py +++ b/prompting/weight_setting/weight_setter.py @@ -102,7 +102,7 @@ def set_weights( "weights": processed_weights.flatten(), "raw_weights": str(list(weights.flatten())), "averaged_weights": str(list(averaged_weights.flatten())), - "block": ttl_get_block(), + "block": ttl_get_block(subtensor=subtensor), } ) step_filename = "weights.csv" diff --git a/scripts/client.py b/scripts/client.py index 3ef6d36c..29e72a0d 100644 --- a/scripts/client.py +++ b/scripts/client.py @@ -9,7 +9,6 @@ from loguru import logger from shared.epistula import query_miners -from shared.settings import 
shared_settings """ This has assumed you have: diff --git a/shared/misc.py b/shared/misc.py index e58f313c..858765b7 100644 --- a/shared/misc.py +++ b/shared/misc.py @@ -100,7 +100,7 @@ def ttl_get_block(subtensor: bt.Subtensor | None = None) -> int: efficiently reduces the workload on the blockchain interface. Example: - current_block = ttl_get_block(self) + current_block = ttl_get_block(subtensor=subtensor) Note: self here is the miner or validator instance """ diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index b0cef872..ee670e70 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -1,7 +1,7 @@ import asyncio import json import random -from typing import AsyncGenerator +from typing import AsyncGenerator, List, Optional import httpx from fastapi import HTTPException @@ -14,14 +14,15 @@ async def forward_response(uid: int, body: dict[str, any], chunks: list[str]): - uid = int(uid) # sometimes uid is type np.uint64 - logger.info(f"Forwarding response to scoring with body: {body}") - if not shared_settings.SCORE_ORGANICS: # Allow disabling of scoring by default + uid = int(uid) + logger.info(f"Forwarding response from uid {uid} to scoring with body: {body} and chunks: {chunks}") + if not shared_settings.SCORE_ORGANICS: return if body.get("task") != "InferenceTask": logger.debug(f"Skipping forwarding for non-inference task: {body.get('task')}") return + url = f"http://{shared_settings.VALIDATOR_API}/scoring" payload = {"body": body, "chunks": chunks, "uid": uid} try: @@ -32,62 +33,124 @@ async def forward_response(uid: int, body: dict[str, any], chunks: list[str]): ) if response.status_code == 200: logger.info(f"Forwarding response completed with status {response.status_code}") - else: logger.exception( f"Forwarding response uid {uid} failed with status {response.status_code} and payload {payload}" ) - except Exception as e: logger.error(f"Tried to forward response to {url} with payload {payload}") logger.exception(f"Error while forwarding response: {e}") -async def stream_response( - response, collected_chunks: list[str], body: dict[str, any], uid: int +async def stream_from_first_response( + responses: List[asyncio.Task], collected_chunks_list: List[List[str]], body: dict[str, any], uids: List[int] ) -> AsyncGenerator[str, None]: - chunks_received = False + first_valid_response = None try: - async for chunk in response: + # Wait for the first valid response + while responses and first_valid_response is None: + done, pending = await asyncio.wait(responses, return_when=asyncio.FIRST_COMPLETED) + + for task in done: + try: + response = await task + if response and not isinstance(response, Exception): + first_valid_response = response + break + except Exception as e: + logger.error(f"Error in miner response: {e}") + responses.remove(task) + + if first_valid_response is None: + logger.error("No valid response received from any miner") + yield 'data: {"error": "502 - No valid response received"}\n\n' + return + + # Stream the first valid response + chunks_received = False + async for chunk in first_valid_response: chunks_received = True - collected_chunks.append(chunk.choices[0].delta.content) + collected_chunks_list[0].append(chunk.choices[0].delta.content) yield f"data: {json.dumps(chunk.model_dump())}\n\n" if not chunks_received: logger.error("Stream is empty: No chunks were received") yield 'data: {"error": "502 - Response is empty"}\n\n' + yield "data: [DONE]\n\n" - # Forward the collected chunks after streaming is 
complete - asyncio.create_task(forward_response(uid=uid, body=body, chunks=collected_chunks)) + # Continue collecting remaining responses in background for scoring + remaining = asyncio.gather(*pending, return_exceptions=True) + asyncio.create_task(collect_remaining_responses(remaining, collected_chunks_list, body, uids)) + except asyncio.CancelledError: logger.info("Client disconnected, streaming cancelled") + for task in responses: + task.cancel() raise except Exception as e: logger.exception(f"Error during streaming: {e}") yield 'data: {"error": "Internal server Error"}\n\n' -async def chat_completion(body: dict[str, any], uid: int | None = None) -> tuple | StreamingResponse: - """Handle regular chat completion without mixture of miners.""" - if uid is None: - uid = random.choice(get_uids(sampling_mode="top_incentive", k=100)) +async def collect_remaining_responses( + remaining: asyncio.Task, collected_chunks_list: List[List[str]], body: dict[str, any], uids: List[int] +): + """Collect remaining responses for scoring without blocking the main response.""" + try: + responses = await remaining + logger.debug(f"responses to forward: {responses}") + for i, response in enumerate(responses): + if isinstance(response, Exception): + logger.error(f"Error collecting response from uid {uids[i+1]}: {response}") + continue + + async for chunk in response: + collected_chunks_list[i + 1].append(chunk.choices[0].delta.content) + for uid, chunks in zip(uids, collected_chunks_list): + # Forward for scoring + asyncio.create_task(forward_response(uid, body, chunks)) - if uid is None: - logger.error("No available miner found") - raise HTTPException(status_code=503, detail="No available miner found") + except Exception as e: + logger.exception(f"Error collecting remaining responses: {e}") - logger.debug(f"Querying uid {uid}") - STREAM = body.get("stream", False) - collected_chunks: list[str] = [] +async def get_response_from_miner(body: dict[str, any], uid: int) -> tuple: + """Get response from a single miner.""" + return await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=False) - logger.info(f"Making {'streaming' if STREAM else 'non-streaming'} openai query with body: {body}") - response = await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=STREAM) + +async def chat_completion( + body: dict[str, any], uids: Optional[int] = None, num_miners: int = 3 +) -> tuple | StreamingResponse: + """Handle chat completion with multiple miners in parallel.""" + # Get multiple UIDs if none specified + if uids is None: + uids = list(get_uids(sampling_mode="top_incentive", k=100)) + if uids is None or len(uids) == 0: # if not uids throws error, figure out how to fix + logger.error("No available miners found") + raise HTTPException(status_code=503, detail="No available miners found") + selected_uids = random.sample(uids, min(num_miners, len(uids))) + else: + selected_uids = uids[:num_miners] # If UID is specified, only use that one + + logger.debug(f"Querying uids {selected_uids}") + STREAM = body.get("stream", False) + + # Initialize chunks collection for each miner + collected_chunks_list = [[] for _ in selected_uids] if STREAM: + # Create tasks for all miners + response_tasks = [ + asyncio.create_task( + make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=True) + ) + for uid in selected_uids + ] + return StreamingResponse( - stream_response(response, collected_chunks, body, uid), + 
stream_from_first_response(response_tasks, collected_chunks_list, body, selected_uids), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", @@ -95,10 +158,32 @@ async def chat_completion(body: dict[str, any], uid: int | None = None) -> tuple }, ) else: - asyncio.create_task(forward_response(uid=uid, body=body, chunks=response[1])) - return response[0] - - -async def get_response_from_miner(body: dict[str, any], uid: int) -> tuple: - """Get response from a single miner.""" - return await make_openai_query(shared_settings.METAGRAPH, shared_settings.WALLET, body, uid, stream=False) + # For non-streaming requests, wait for first valid response + response_tasks = [asyncio.create_task(get_response_from_miner(body, uid)) for uid in selected_uids] + + first_valid_response = None + collected_responses = [] + + while response_tasks and first_valid_response is None: + done, pending = await asyncio.wait(response_tasks, return_when=asyncio.FIRST_COMPLETED) + + for task in done: + try: + response = await task + if response and isinstance(response, tuple): + if first_valid_response is None: + first_valid_response = response + collected_responses.append(response) + except Exception as e: + logger.error(f"Error in miner response: {e}") + response_tasks.remove(task) + + if first_valid_response is None: + raise HTTPException(status_code=502, detail="No valid response received") + + # Forward all collected responses for scoring in the background + for i, response in enumerate(collected_responses): + if response and isinstance(response, tuple): + asyncio.create_task(forward_response(uid=selected_uids[i], body=body, chunks=response[1])) + + return first_valid_response[0] # Return only the response object, not the chunks From 667e264e0bed235e0f65bf989affc98ac2b4d3f1 Mon Sep 17 00:00:00 2001 From: Felix Quinque <40171911+Hollyqui@users.noreply.github.com> Date: Fri, 3 Jan 2025 16:40:17 +0100 Subject: [PATCH 12/14] Remove py3.9 from github actions (#527) --- .github/workflows/python-package.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 296f9b5c..d601319e 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -12,7 +12,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: ["3.9", "3.10"] + python-version: ["3.10"] steps: - uses: actions/checkout@v3 From 4654d3a92f08f2520c98416b1c5ceafee7f9ec43 Mon Sep 17 00:00:00 2001 From: Felix Quinque <40171911+Hollyqui@users.noreply.github.com> Date: Fri, 3 Jan 2025 19:53:56 +0100 Subject: [PATCH 13/14] Adding web retrieval endpoint (#528) --- prompting/api/scoring/api.py | 72 ++++++++++++++++++++-------- prompting/datasets/random_website.py | 4 +- shared/dendrite.py | 2 +- shared/epistula.py | 2 +- shared/uids.py | 3 +- validator_api/chat_completion.py | 31 +----------- validator_api/gpt_endpoints.py | 51 ++++++++++++++++++++ validator_api/utils.py | 35 ++++++++++++++ 8 files changed, 146 insertions(+), 54 deletions(-) create mode 100644 validator_api/utils.py diff --git a/prompting/api/scoring/api.py b/prompting/api/scoring/api.py index ee18ade1..d7c981c1 100644 --- a/prompting/api/scoring/api.py +++ b/prompting/api/scoring/api.py @@ -4,9 +4,11 @@ from fastapi import APIRouter, Depends, Header, HTTPException, Request from loguru import logger +from prompting.datasets.random_website import DDGDatasetEntry from prompting.llms.model_zoo import ModelZoo from prompting.rewards.scoring import task_scorer from 
prompting.tasks.inference import InferenceTask +from prompting.tasks.web_retrieval import WebRetrievalTask from shared.base import DatasetEntry from shared.dendrite import DendriteResponseEvent from shared.epistula import SynapseStreamResult @@ -37,22 +39,54 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate uid = int(payload.get("uid")) chunks = payload.get("chunks") llm_model = ModelZoo.get_model_by_id(model) if (model := body.get("model")) else None - task_scorer.add_to_queue( - task=InferenceTask( - messages=[msg["content"] for msg in body.get("messages")], - llm_model=llm_model, - llm_model_id=body.get("model"), - seed=int(body.get("seed", 0)), - sampling_params=body.get("sampling_params", {}), - ), - response=DendriteResponseEvent( - uids=[uid], - stream_results=[SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])], - timeout=shared_settings.NEURON_TIMEOUT, - ), - dataset_entry=DatasetEntry(), - block=shared_settings.METAGRAPH.block, - step=-1, - task_id=str(uuid.uuid4()), - ) - logger.info("Organic tas appended to scoring queue") + task = body.get("task") + if task == "InferenceTask": + logger.info(f"Received Organic InferenceTask with body: {body}") + task_scorer.add_to_queue( + task=InferenceTask( + messages=[msg["content"] for msg in body.get("messages")], + llm_model=llm_model, + llm_model_id=body.get("model"), + seed=int(body.get("seed", 0)), + sampling_params=body.get("sampling_params", {}), + ), + response=DendriteResponseEvent( + uids=[uid], + stream_results=[ + SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None]) + ], + timeout=shared_settings.NEURON_TIMEOUT, + ), + dataset_entry=DatasetEntry(), + block=shared_settings.METAGRAPH.block, + step=-1, + task_id=str(uuid.uuid4()), + ) + elif task == "WebRetrievalTask": + logger.info(f"Received Organic WebRetrievalTask with body: {body}") + try: + search_term = body.get("messages")[0].get("content") + except Exception as ex: + logger.error(f"Failed to get search term from messages: {ex}, can't score WebRetrievalTask") + return + + task_scorer.add_to_queue( + task=WebRetrievalTask( + messages=[msg["content"] for msg in body.get("messages")], + seed=int(body.get("seed", 0)), + sampling_params=body.get("sampling_params", {}), + query=search_term, + ), + response=DendriteResponseEvent( + uids=[uid], + stream_results=[ + SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None]) + ], + timeout=shared_settings.NEURON_TIMEOUT, + ), + dataset_entry=DDGDatasetEntry(search_term=search_term), + block=shared_settings.METAGRAPH.block, + step=-1, + task_id=str(uuid.uuid4()), + ) + logger.info("Organic task appended to scoring queue") diff --git a/prompting/datasets/random_website.py b/prompting/datasets/random_website.py index 3058812a..18fd3c8d 100644 --- a/prompting/datasets/random_website.py +++ b/prompting/datasets/random_website.py @@ -15,8 +15,8 @@ class DDGDatasetEntry(DatasetEntry): search_term: str - website_url: str - website_content: str + website_url: str = None + website_content: str = None class DDGDataset(BaseDataset): diff --git a/shared/dendrite.py b/shared/dendrite.py index 254a6c79..ccfd5c86 100644 --- a/shared/dendrite.py +++ b/shared/dendrite.py @@ -35,9 +35,9 @@ def model_dump(self): class DendriteResponseEvent(BaseModel): uids: np.ndarray | list[float] - axons: list[str] timeout: float stream_results: list[SynapseStreamResult] + axons: list[str] = [] completions: list[str] = [] 
status_messages: list[str] = [] status_codes: list[int] = [] diff --git a/shared/epistula.py b/shared/epistula.py index 44a59f6f..5af06485 100644 --- a/shared/epistula.py +++ b/shared/epistula.py @@ -111,7 +111,7 @@ async def merged_stream(responses: list[AsyncGenerator]): logger.error(f"Error while streaming: {e}") -async def query_miners(uids, body: dict[str, Any]): +async def query_miners(uids, body: dict[str, Any]) -> list[SynapseStreamResult]: try: tasks = [] for uid in uids: diff --git a/shared/uids.py b/shared/uids.py index ef494ba2..a8c99b5d 100644 --- a/shared/uids.py +++ b/shared/uids.py @@ -114,7 +114,8 @@ def get_top_incentive_uids(k: int, vpermit_tao_limit: int) -> np.ndarray: # Extract the top uids. top_k_uids = [uid for uid, incentive in uid_incentive_pairs_sorted[:k]] - return np.array(top_k_uids).astype(int) + return list(np.array(top_k_uids).astype(int)) + # return [int(k) for k in top_k_uids] def get_uids( diff --git a/validator_api/chat_completion.py b/validator_api/chat_completion.py index ee670e70..fd05a174 100644 --- a/validator_api/chat_completion.py +++ b/validator_api/chat_completion.py @@ -3,7 +3,6 @@ import random from typing import AsyncGenerator, List, Optional -import httpx from fastapi import HTTPException from fastapi.responses import StreamingResponse from loguru import logger @@ -11,35 +10,7 @@ from shared.epistula import make_openai_query from shared.settings import shared_settings from shared.uids import get_uids - - -async def forward_response(uid: int, body: dict[str, any], chunks: list[str]): - uid = int(uid) - logger.info(f"Forwarding response from uid {uid} to scoring with body: {body} and chunks: {chunks}") - if not shared_settings.SCORE_ORGANICS: - return - - if body.get("task") != "InferenceTask": - logger.debug(f"Skipping forwarding for non-inference task: {body.get('task')}") - return - - url = f"http://{shared_settings.VALIDATOR_API}/scoring" - payload = {"body": body, "chunks": chunks, "uid": uid} - try: - timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) - async with httpx.AsyncClient(timeout=timeout) as client: - response = await client.post( - url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"} - ) - if response.status_code == 200: - logger.info(f"Forwarding response completed with status {response.status_code}") - else: - logger.exception( - f"Forwarding response uid {uid} failed with status {response.status_code} and payload {payload}" - ) - except Exception as e: - logger.error(f"Tried to forward response to {url} with payload {payload}") - logger.exception(f"Error while forwarding response: {e}") +from validator_api.utils import forward_response async def stream_from_first_response( diff --git a/validator_api/gpt_endpoints.py b/validator_api/gpt_endpoints.py index 3614e865..b363c1b1 100644 --- a/validator_api/gpt_endpoints.py +++ b/validator_api/gpt_endpoints.py @@ -1,12 +1,18 @@ +import asyncio import json import random +import numpy as np from fastapi import APIRouter, Depends, Header, HTTPException, Request from loguru import logger from starlette.responses import StreamingResponse +from shared.epistula import SynapseStreamResult, query_miners +from shared.settings import shared_settings +from shared.uids import get_uids from validator_api.chat_completion import chat_completion from validator_api.mixture_of_miners import mixture_of_miners +from validator_api.utils import forward_response router = APIRouter() @@ -37,3 +43,48 @@ async def 
completions(request: Request, api_key: str = Depends(validate_api_key) except Exception as e: logger.exception(f"Error in chat completion: {e}") return StreamingResponse(content="Internal Server Error", status_code=500) + + +@router.post("/web_retrieval") +async def web_retrieval(search_query: str, n_miners: int = 10, uids: list[int] = None): + uids = list(get_uids(sampling_mode="random", k=n_miners)) + logger.debug(f"🔍 Querying uids: {uids}") + if len(uids) == 0: + logger.warning("No available miners. This should already have been caught earlier.") + return + + body = { + "seed": random.randint(0, 1_000_000), + "sampling_parameters": shared_settings.SAMPLING_PARAMS, + "task": "WebRetrievalTask", + "messages": [ + {"role": "user", "content": search_query}, + ], + } + stream_results = await query_miners(uids, body) + results = [ + "".join(res.accumulated_chunks) + for res in stream_results + if isinstance(res, SynapseStreamResult) and res.accumulated_chunks + ] + distinct_results = list(np.unique(results)) + logger.info( + f"🔍 Collected responses from {len(stream_results)} miners. {len(results)} responded successfully with a total of {len(distinct_results)} distinct results" + ) + loaded_results = [] + for result in distinct_results: + try: + loaded_results.append(json.loads(result)) + logger.info(f"🔍 Result: {result}") + except Exception: + logger.error(f"🔍 Result: {result}") + if len(loaded_results) == 0: + raise HTTPException(status_code=500, detail="No miner responded successfully") + + for uid, res in zip(uids, stream_results): + asyncio.create_task( + forward_response( + uid=uid, body=body, chunks=res.accumulated_chunks if res and res.accumulated_chunks else [] + ) + ) + return loaded_results diff --git a/validator_api/utils.py b/validator_api/utils.py new file mode 100644 index 00000000..b83e819b --- /dev/null +++ b/validator_api/utils.py @@ -0,0 +1,35 @@ +import httpx +from loguru import logger + +from shared.settings import shared_settings + + +# TODO: Modify this so that all the forwarded responses are sent in a single request. This is both more efficient but +# also means that on the validator side all responses are scored at once, speeding up the scoring process. 
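The TODO above could be addressed with a batched forwarder along these lines. This is a rough sketch only, assuming a hypothetical /scoring_batch endpoint on the validator that accepts a list of {body, chunks, uid} entries; the patch itself only exposes the single-entry /scoring route.

import httpx
from loguru import logger

from shared.settings import shared_settings


async def forward_responses_batched(entries: list[dict]) -> None:
    # Hypothetical batched variant: send every miner response for one organic
    # request to the validator in a single POST instead of one call per uid.
    if not shared_settings.SCORE_ORGANICS or not entries:
        return
    # Assumed endpoint name; this patch only implements /scoring for single entries.
    url = f"http://{shared_settings.VALIDATOR_API}/scoring_batch"
    try:
        timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0)
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                url,
                json={"entries": entries},
                headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"},
            )
            if response.status_code != 200:
                logger.error(f"Batched forwarding failed with status {response.status_code}")
    except Exception as e:
        logger.exception(f"Error while forwarding batched responses: {e}")

Batching would also let the validator score all responses to one organic request in a single pass, which is the speed-up the TODO refers to.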
+async def forward_response(uid: int, body: dict[str, any], chunks: list[str]): + uid = int(uid) + logger.info(f"Forwarding response from uid {uid} to scoring with body: {body} and chunks: {chunks}") + if not shared_settings.SCORE_ORGANICS: + return + + if body.get("task") != "InferenceTask" and body.get("task") != "WebRetrievalTask": + logger.debug(f"Skipping forwarding for non- inference/web retrieval task: {body.get('task')}") + return + + url = f"http://{shared_settings.VALIDATOR_API}/scoring" + payload = {"body": body, "chunks": chunks, "uid": uid} + try: + timeout = httpx.Timeout(timeout=120.0, connect=60.0, read=30.0, write=30.0, pool=5.0) + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.post( + url, json=payload, headers={"api-key": shared_settings.SCORING_KEY, "Content-Type": "application/json"} + ) + if response.status_code == 200: + logger.info(f"Forwarding response completed with status {response.status_code}") + else: + logger.exception( + f"Forwarding response uid {uid} failed with status {response.status_code} and payload {payload}" + ) + except Exception as e: + logger.error(f"Tried to forward response to {url} with payload {payload}") + logger.exception(f"Error while forwarding response: {e}") From f58965ebcdd7e13c19e39c4572b0c61872d5e7a9 Mon Sep 17 00:00:00 2001 From: bkb2135 <98138173+bkb2135@users.noreply.github.com> Date: Tue, 7 Jan 2025 08:10:52 -0500 Subject: [PATCH 14/14] Update miner.py --- neurons/miners/epistula_miner/miner.py | 28 +++++++++++++------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/neurons/miners/epistula_miner/miner.py b/neurons/miners/epistula_miner/miner.py index d6887954..7ed556f9 100644 --- a/neurons/miners/epistula_miner/miner.py +++ b/neurons/miners/epistula_miner/miner.py @@ -39,7 +39,7 @@ def __init__(self): self.client = httpx.AsyncClient( base_url="https://api.openai.com/v1", headers={ - "Authorization": f"Bearer {settings.OPENAI_API_KEY}", + "Authorization": f"Bearer {shared_settings.OPENAI_API_KEY}", "Content-Type": "application/json", }, ) @@ -107,14 +107,14 @@ async def verify_request( signed_by = request.headers.get("Epistula-Signed-By") signed_for = request.headers.get("Epistula-Signed-For") - if signed_for != settings.WALLET.hotkey.ss58_address: + if signed_for != shared_settings.WALLET.hotkey.ss58_address: raise HTTPException(status_code=400, detail="Bad Request, message is not intended for self") - if signed_by not in settings.METAGRAPH.hotkeys: + if signed_by not in shared_settings.METAGRAPH.hotkeys: raise HTTPException(status_code=401, detail="Signer not in metagraph") - uid = settings.METAGRAPH.hotkeys.index(signed_by) - stake = settings.METAGRAPH.S[uid].item() - if not settings.NETUID == 61 and stake < 10000: + uid = shared_settings.METAGRAPH.hotkeys.index(signed_by) + stake = shared_settings.METAGRAPH.S[uid].item() + if not shared_settings.NETUID == 61 and stake < 10000: logger.warning(f"Blacklisting request from {signed_by} [uid={uid}], not enough stake -- {stake}") raise HTTPException(status_code=401, detail="Stake below minimum: {stake}") @@ -133,7 +133,7 @@ async def verify_request( raise HTTPException(status_code=400, detail=err) def run(self): - external_ip = None # settings.EXTERNAL_IP + external_ip = None # shared_settings.EXTERNAL_IP if not external_ip or external_ip == "[::]": try: external_ip = requests.get("https://checkip.amazonaws.com").text.strip() @@ -142,16 +142,16 @@ def run(self): logger.error("Failed to get external IP") logger.info( - f"Serving 
miner endpoint {external_ip}:{settings.AXON_PORT} on network: {settings.SUBTENSOR_NETWORK} with netuid: {settings.NETUID}" + f"Serving miner endpoint {external_ip}:{shared_settings.AXON_PORT} on network: {shared_settings.SUBTENSOR_NETWORK} with netuid: {shared_settings.NETUID}" ) serve_success = serve_extrinsic( - subtensor=settings.SUBTENSOR, - wallet=settings.WALLET, + subtensor=shared_settings.SUBTENSOR, + wallet=shared_settings.WALLET, ip=external_ip, - port=settings.AXON_PORT, + port=shared_settings.AXON_PORT, protocol=4, - netuid=settings.NETUID, + netuid=shared_settings.NETUID, ) if not serve_success: logger.error("Failed to serve endpoint") @@ -174,7 +174,7 @@ def run(self): fast_config = uvicorn.Config( app, host="0.0.0.0", - port=settings.AXON_PORT, + port=shared_settings.AXON_PORT, log_level="info", loop="asyncio", workers=4, @@ -182,7 +182,7 @@ def run(self): self.fast_api = FastAPIThreadedServer(config=fast_config) self.fast_api.start() - logger.info(f"Miner starting at block: {settings.SUBTENSOR.block}") + logger.info(f"Miner starting at block: {shared_settings.SUBTENSOR.block}") # Main execution loop. try: