From 19a0f7fb23265e8affc2a50bc979d94e7b7191a1 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Fri, 3 Jan 2025 14:52:58 +0000 Subject: [PATCH 1/5] Remove redundant loop creation --- neurons/validator.py | 14 +++++++------- prompting/rewards/scoring.py | 4 ++-- shared/loop_runner.py | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 9d7e6305..6775686e 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -46,21 +46,21 @@ async def spawn_loops(task_queue, scoring_queue, reward_events): # -------- Duplicate of create_task_loop ---------- logger.info("Starting AvailabilityCheckingLoop...") - asyncio.create_task(availability_checking_loop.start()) + availability_checking_loop.start() logger.info("Starting TaskSender...") - asyncio.create_task(task_sender.start(task_queue, scoring_queue)) + task_sender.start(task_queue, scoring_queue) logger.info("Starting TaskLoop...") - asyncio.create_task(task_loop.start(task_queue, scoring_queue)) + task_loop.start(task_queue, scoring_queue) # ------------------------------------------------- logger.info("Starting ModelScheduler...") - asyncio.create_task(model_scheduler.start(scoring_queue), name="ModelScheduler"), + model_scheduler.start(scoring_queue, name="ModelScheduler") logger.info("Starting TaskScorer...") - asyncio.create_task(task_scorer.start(scoring_queue, reward_events), name="TaskScorer"), + task_scorer.start(scoring_queue, reward_events, name="TaskScorer") logger.info("Starting WeightSetter...") - asyncio.create_task(weight_setter.start(reward_events)) + weight_setter.start(reward_events) # Main monitoring loop start = time.time() @@ -78,7 +78,7 @@ async def spawn_loops(task_queue, scoring_queue, reward_events): logger.debug(f"Number of tasks in Scoring Queue: {len(scoring_queue)}") logger.debug(f"Number of tasks in Reward Events: {len(reward_events)}") - asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events)) + spawn_loops(task_queue, scoring_queue, reward_events) def start_api(): diff --git a/prompting/rewards/scoring.py b/prompting/rewards/scoring.py index 5da5d299..c314b6ef 100644 --- a/prompting/rewards/scoring.py +++ b/prompting/rewards/scoring.py @@ -27,10 +27,10 @@ class TaskScorer(AsyncLoopRunner): model_config = ConfigDict(arbitrary_types_allowed=True) - async def start(self, scoring_queue, reward_events): + async def start(self, scoring_queue, reward_events, name: str | None = None): self.scoring_queue = scoring_queue self.reward_events = reward_events - return await super().start() + return await super().start(name=name) def add_to_queue( self, diff --git a/shared/loop_runner.py b/shared/loop_runner.py index 636f2205..af63570e 100644 --- a/shared/loop_runner.py +++ b/shared/loop_runner.py @@ -104,14 +104,14 @@ async def run_loop(self): logger.info("Loop has been cleaned up.") logger.debug("Exiting run_loop") - async def start(self): + async def start(self, name: str | None = None): """Start the loop.""" if self.running: logger.warning("Loop is already running.") return self.running = True logger.debug(f"{self.name}: Starting loop with {'synchronized' if self.sync else 'non-synchronized'} mode") - self._task = asyncio.create_task(self.run_loop()) + self._task = asyncio.create_task(self.run_loop(), name=name) async def stop(self): """Stop the loop.""" From 4ee7f1b1dfde5547a018f0e4d832d4c33de2054d Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Mon, 6 Jan 2025 22:09:07 +0000 Subject: [PATCH 2/5] Fix test_api; fix loops --- neurons/validator.py | 14 +++++++------- scripts/test_api.py | 3 ++- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 6775686e..ca40ffc4 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -46,21 +46,21 @@ async def spawn_loops(task_queue, scoring_queue, reward_events): # -------- Duplicate of create_task_loop ---------- logger.info("Starting AvailabilityCheckingLoop...") - availability_checking_loop.start() + await availability_checking_loop.start() logger.info("Starting TaskSender...") - task_sender.start(task_queue, scoring_queue) + await task_sender.start(task_queue, scoring_queue) logger.info("Starting TaskLoop...") - task_loop.start(task_queue, scoring_queue) + await task_loop.start(task_queue, scoring_queue) # ------------------------------------------------- logger.info("Starting ModelScheduler...") - model_scheduler.start(scoring_queue, name="ModelScheduler") + await model_scheduler.start(scoring_queue, name="ModelScheduler") logger.info("Starting TaskScorer...") - task_scorer.start(scoring_queue, reward_events, name="TaskScorer") + await task_scorer.start(scoring_queue, reward_events, name="TaskScorer") logger.info("Starting WeightSetter...") - weight_setter.start(reward_events) + await weight_setter.start(reward_events) # Main monitoring loop start = time.time() @@ -78,7 +78,7 @@ async def spawn_loops(task_queue, scoring_queue, reward_events): logger.debug(f"Number of tasks in Scoring Queue: {len(scoring_queue)}") logger.debug(f"Number of tasks in Reward Events: {len(reward_events)}") - spawn_loops(task_queue, scoring_queue, reward_events) + asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events)) def start_api(): diff --git a/scripts/test_api.py b/scripts/test_api.py index 07fc7580..f38dd4cb 100644 --- a/scripts/test_api.py +++ b/scripts/test_api.py @@ -50,11 +50,12 @@ async def main(): # Example API key, replace with yours: API_KEY = "0566dbe21ee33bba9419549716cd6f1f" client = openai.AsyncOpenAI( - base_url=f"http://localhost:{PORT}/v1", + base_url=f"http://0.0.0.0:{PORT}/v1", max_retries=0, timeout=Timeout(90, connect=30, read=60), api_key=API_KEY, ) + client._client.headers["api-key"] = API_KEY response = await make_completion(client=client, prompt="Say 10 random numbers between 1 and 100", stream=True) print(response) From 00c03abf603f75a68715e2666da202bf54145991 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Tue, 7 Jan 2025 12:40:27 +0000 Subject: [PATCH 3/5] Remove unused settings params --- scripts/test_api.py | 2 +- shared/settings.py | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/scripts/test_api.py b/scripts/test_api.py index f38dd4cb..0d6a265b 100644 --- a/scripts/test_api.py +++ b/scripts/test_api.py @@ -30,7 +30,7 @@ async def make_completion(client: openai.AsyncOpenAI, prompt: str, stream: bool "do_sample": True, "seed": None, }, - "task": "QuestionAnsweringTask", + "task": "InferenceTask", "mixture": False, }, ) diff --git a/shared/settings.py b/shared/settings.py index 736d01ce..8f650099 100644 --- a/shared/settings.py +++ b/shared/settings.py @@ -28,9 +28,6 @@ class SharedSettings(BaseSettings): _instance: Optional["SharedSettings"] = None _instance_mode: Optional[str] = None - # API - VALIDATOR_API: str = Field("0.0.0.0:8094", env="VALIDATOR_API") - VALIDATOR_SCORING_KEY: str = Field("1234567890", env="VALIDATOR_SCORING_KEY") mode: Literal["api", "validator", "miner", "mock"] = Field("validator", env="MODE") MOCK: bool = False @@ -87,6 +84,10 @@ class SharedSettings(BaseSettings): HF_TOKEN: Optional[str] = Field(None, env="HF_TOKEN") DEPLOY_VALIDATOR: bool = Field(True, env="DEPLOY_VALDITAOR") + # ==== API ===== + # API key used to access validator organic scoring mechanism (both .env.validator and .env.api). + SCORING_KEY: str | None = Field(None, env="SCORING_KEY") + # Validator scoring API (.env.validator). DEPLOY_SCORING_API: bool = Field(False, env="DEPLOY_SCORING_API") SCORING_API_PORT: int = Field(8094, env="SCORING_API_PORT") @@ -96,12 +97,13 @@ class SharedSettings(BaseSettings): # API Management (.env.api). API_PORT: int = Field(8005, env="API_PORT") API_HOST: str = Field("0.0.0.0", env="API_HOST") + # Validator scoring API address. + VALIDATOR_API: str = Field("0.0.0.0:8094", env="VALIDATOR_API") # File with keys used to access API. API_KEYS_FILE: str = Field("api_keys.json", env="API_KEYS_FILE") # Admin key used to generate API keys. ADMIN_KEY: str | None = Field(None, env="ADMIN_KEY") - # API key used to access validator organic scoring mechanism. - SCORING_KEY: str | None = Field(None, env="SCORING_KEY") + # ============== # Additional Validator Fields. NETUID: Optional[int] = Field(61, env="NETUID") From 6b0413116220e5ffce63d06f785062f8462218fb Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Tue, 7 Jan 2025 12:40:52 +0000 Subject: [PATCH 4/5] Add scoring key into validator example --- .env.validator.example | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.env.validator.example b/.env.validator.example index 79bf81c4..0e1bee89 100644 --- a/.env.validator.example +++ b/.env.validator.example @@ -24,7 +24,9 @@ SN19_API_URL = "e.g. http://24.199.112.174:4051/" OPENAI_API_KEY = "your_openai_api_key_here" HF_TOKEN = "your_huggingface_token_here" -# Scoring API. +# Scoring API (optional). DEPLOY_SCORING_API = true SCORING_ADMIN_KEY = "123456" SCORING_API_PORT = 8094 +# Scoring key must match the scoring key in the .env.api. +# SCORING_KEY="..." From 267978966faf68f971dfa8733f3f16a69339f3b7 Mon Sep 17 00:00:00 2001 From: Dmytro Bobrenko <17252809+dbobrenko@users.noreply.github.com> Date: Tue, 7 Jan 2025 15:58:45 +0000 Subject: [PATCH 5/5] Revert loop removal --- neurons/validator.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index a3bf56cb..6062ec8f 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -46,21 +46,21 @@ async def spawn_loops(task_queue, scoring_queue, reward_events): # -------- Duplicate of create_task_loop ---------- logger.info("Starting AvailabilityCheckingLoop...") - await availability_checking_loop.start(name="AvailabilityCheckingLoop") + asyncio.create_task(availability_checking_loop.start()) logger.info("Starting TaskSender...") - await task_sender.start(task_queue, scoring_queue) + asyncio.create_task(task_sender.start(task_queue, scoring_queue)) logger.info("Starting TaskLoop...") - await task_loop.start(task_queue, scoring_queue) + asyncio.create_task(task_loop.start(task_queue, scoring_queue)) # ------------------------------------------------- logger.info("Starting ModelScheduler...") - await model_scheduler.start(scoring_queue, name="ModelScheduler") + asyncio.create_task(model_scheduler.start(scoring_queue), name="ModelScheduler"), logger.info("Starting TaskScorer...") - await task_scorer.start(scoring_queue, reward_events, name="TaskScorer") + asyncio.create_task(task_scorer.start(scoring_queue, reward_events), name="TaskScorer"), logger.info("Starting WeightSetter...") - await weight_setter.start(reward_events, name="WeightSetter") + asyncio.create_task(weight_setter.start(reward_events)) # Main monitoring loop start = time.time() @@ -122,7 +122,7 @@ async def main(): processes = [] try: - # Start checking the availability of miners at regular intervals + # # Start checking the availability of miners at regular intervals if shared_settings.DEPLOY_SCORING_API: # Use multiprocessing to bypass API blocking issue