Skip to content

Commit

Permalink
Update settings comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dbobrenko committed Jan 7, 2025
2 parents 4ee7f1b + 4654d3a commit e235f0b
Show file tree
Hide file tree
Showing 16 changed files with 286 additions and 102 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10"]
python-version: ["3.10"]

steps:
- uses: actions/checkout@v3
Expand Down
13 changes: 7 additions & 6 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):

# -------- Duplicate of create_task_loop ----------
logger.info("Starting AvailabilityCheckingLoop...")
await availability_checking_loop.start()
await availability_checking_loop.start(name="AvailabilityCheckingLoop")

logger.info("Starting TaskSender...")
await task_sender.start(task_queue, scoring_queue)
Expand All @@ -60,7 +60,7 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
logger.info("Starting TaskScorer...")
await task_scorer.start(scoring_queue, reward_events, name="TaskScorer")
logger.info("Starting WeightSetter...")
await weight_setter.start(reward_events)
await weight_setter.start(reward_events, name="WeightSetter")

# Main monitoring loop
start = time.time()
Expand All @@ -81,11 +81,12 @@ async def spawn_loops(task_queue, scoring_queue, reward_events):
asyncio.run(spawn_loops(task_queue, scoring_queue, reward_events))


def start_api():
def start_api(scoring_queue, reward_events):
async def start():
from prompting.api.api import start_scoring_api # noqa: F401

await start_scoring_api()
await start_scoring_api(scoring_queue, reward_events)

while True:
await asyncio.sleep(10)
logger.debug("Running API...")
Expand Down Expand Up @@ -121,11 +122,11 @@ async def main():
processes = []

try:
# # Start checking the availability of miners at regular intervals
# Start checking the availability of miners at regular intervals

if shared_settings.DEPLOY_SCORING_API:
# Use multiprocessing to bypass API blocking issue
api_process = mp.Process(target=start_api, name="API_Process")
api_process = mp.Process(target=start_api, args=(scoring_queue, reward_events), name="API_Process")
api_process.start()
processes.append(api_process)

Expand Down
5 changes: 4 additions & 1 deletion prompting/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from prompting.api.miner_availabilities.api import router as miner_availabilities_router
from prompting.api.scoring.api import router as scoring_router
from prompting.rewards.scoring import task_scorer
from shared.settings import shared_settings

app = FastAPI()
Expand All @@ -17,7 +18,9 @@ def health():
return {"status": "healthy"}


async def start_scoring_api():
async def start_scoring_api(scoring_queue, reward_events):
task_scorer.scoring_queue = scoring_queue
task_scorer.reward_events = reward_events
logger.info(f"Starting Scoring API on https://0.0.0.0:{shared_settings.SCORING_API_PORT}")
uvicorn.run(
"prompting.api.api:app", host="0.0.0.0", port=shared_settings.SCORING_API_PORT, loop="asyncio", reload=False
Expand Down
72 changes: 53 additions & 19 deletions prompting/api/scoring/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from fastapi import APIRouter, Depends, Header, HTTPException, Request
from loguru import logger

from prompting.datasets.random_website import DDGDatasetEntry
from prompting.llms.model_zoo import ModelZoo
from prompting.rewards.scoring import task_scorer
from prompting.tasks.inference import InferenceTask
from prompting.tasks.web_retrieval import WebRetrievalTask
from shared.base import DatasetEntry
from shared.dendrite import DendriteResponseEvent
from shared.epistula import SynapseStreamResult
Expand Down Expand Up @@ -37,22 +39,54 @@ async def score_response(request: Request, api_key_data: dict = Depends(validate
uid = int(payload.get("uid"))
chunks = payload.get("chunks")
llm_model = ModelZoo.get_model_by_id(model) if (model := body.get("model")) else None
task_scorer.add_to_queue(
task=InferenceTask(
messages=[msg["content"] for msg in body.get("messages")],
llm_model=llm_model,
llm_model_id=body.get("model"),
seed=int(body.get("seed", 0)),
sampling_params=body.get("sampling_params", {}),
),
response=DendriteResponseEvent(
uids=[uid],
stream_results=[SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])],
timeout=shared_settings.NEURON_TIMEOUT,
),
dataset_entry=DatasetEntry(),
block=shared_settings.METAGRAPH.block,
step=-1,
task_id=str(uuid.uuid4()),
)
logger.info("Organic tas appended to scoring queue")
task = body.get("task")
if task == "InferenceTask":
logger.info(f"Received Organic InferenceTask with body: {body}")
task_scorer.add_to_queue(
task=InferenceTask(
messages=[msg["content"] for msg in body.get("messages")],
llm_model=llm_model,
llm_model_id=body.get("model"),
seed=int(body.get("seed", 0)),
sampling_params=body.get("sampling_params", {}),
),
response=DendriteResponseEvent(
uids=[uid],
stream_results=[
SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])
],
timeout=shared_settings.NEURON_TIMEOUT,
),
dataset_entry=DatasetEntry(),
block=shared_settings.METAGRAPH.block,
step=-1,
task_id=str(uuid.uuid4()),
)
elif task == "WebRetrievalTask":
logger.info(f"Received Organic WebRetrievalTask with body: {body}")
try:
search_term = body.get("messages")[0].get("content")
except Exception as ex:
logger.error(f"Failed to get search term from messages: {ex}, can't score WebRetrievalTask")
return

task_scorer.add_to_queue(
task=WebRetrievalTask(
messages=[msg["content"] for msg in body.get("messages")],
seed=int(body.get("seed", 0)),
sampling_params=body.get("sampling_params", {}),
query=search_term,
),
response=DendriteResponseEvent(
uids=[uid],
stream_results=[
SynapseStreamResult(accumulated_chunks=[chunk for chunk in chunks if chunk is not None])
],
timeout=shared_settings.NEURON_TIMEOUT,
),
dataset_entry=DDGDatasetEntry(search_term=search_term),
block=shared_settings.METAGRAPH.block,
step=-1,
task_id=str(uuid.uuid4()),
)
logger.info("Organic task appended to scoring queue")
4 changes: 2 additions & 2 deletions prompting/datasets/random_website.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

class DDGDatasetEntry(DatasetEntry):
search_term: str
website_url: str
website_content: str
website_url: str = None
website_content: str = None


class DDGDataset(BaseDataset):
Expand Down
4 changes: 2 additions & 2 deletions prompting/llms/model_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ class AsyncModelScheduler(AsyncLoopRunner):
interval: int = 14400
scoring_queue: list | None = None

async def start(self, scoring_queue: list):
async def start(self, scoring_queue: list, name: str | None = None):
self.scoring_queue = scoring_queue
return await super().start()
return await super().start(name=name)

async def initialise_loop(self):
model_manager.load_always_active_models()
Expand Down
6 changes: 3 additions & 3 deletions prompting/weight_setting/weight_setter.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def set_weights(
"weights": processed_weights.flatten(),
"raw_weights": str(list(weights.flatten())),
"averaged_weights": str(list(averaged_weights.flatten())),
"block": ttl_get_block(),
"block": ttl_get_block(subtensor=subtensor),
}
)
step_filename = "weights.csv"
Expand Down Expand Up @@ -146,7 +146,7 @@ class WeightSetter(AsyncLoopRunner):
class Config:
arbitrary_types_allowed = True

async def start(self, reward_events):
async def start(self, reward_events, name: str | None = None):
self.reward_events = reward_events
global PAST_WEIGHTS

Expand All @@ -159,7 +159,7 @@ async def start(self, reward_events):
PAST_WEIGHTS = []
except Exception as ex:
logger.error(f"Couldn't load weights from file: {ex}")
return await super().start()
return await super().start(name=name)

async def run_step(self):
await asyncio.sleep(0.01)
Expand Down
1 change: 0 additions & 1 deletion scripts/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from loguru import logger

from shared.epistula import query_miners
from shared.settings import shared_settings

"""
This assumes you have:
Expand Down
2 changes: 1 addition & 1 deletion shared/dendrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ def model_dump(self):

class DendriteResponseEvent(BaseModel):
uids: np.ndarray | list[float]
axons: list[str]
timeout: float
stream_results: list[SynapseStreamResult]
axons: list[str] = []
completions: list[str] = []
status_messages: list[str] = []
status_codes: list[int] = []
Expand Down
2 changes: 1 addition & 1 deletion shared/epistula.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async def merged_stream(responses: list[AsyncGenerator]):
logger.error(f"Error while streaming: {e}")


async def query_miners(uids, body: dict[str, Any]):
async def query_miners(uids, body: dict[str, Any]) -> list[SynapseStreamResult]:
try:
tasks = []
for uid in uids:
Expand Down
2 changes: 1 addition & 1 deletion shared/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def ttl_get_block(subtensor: bt.Subtensor | None = None) -> int:
efficiently reduces the workload on the blockchain interface.
Example:
current_block = ttl_get_block(self)
current_block = ttl_get_block(subtensor=subtensor)
Note: subtensor here is an optional bt.Subtensor instance; it defaults to None
"""
Expand Down
12 changes: 8 additions & 4 deletions shared/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,23 @@ class SharedSettings(BaseSettings):
HF_TOKEN: Optional[str] = Field(None, env="HF_TOKEN")
DEPLOY_VALIDATOR: bool = Field(True, env="DEPLOY_VALDITAOR")

# Validator scoring API (.env.validator).
DEPLOY_SCORING_API: bool = Field(False, env="DEPLOY_SCORING_API")
SCORING_API_PORT: int = Field(8094, env="SCORING_API_PORT")
SCORING_ADMIN_KEY: str | None = Field(None, env="SCORING_ADMIN_KEY")
SCORE_ORGANICS: bool = Field(False, env="SCORE_ORGANICS")

# API Management (.env.api).
API_PORT: int = Field(8005, env="API_PORT")
API_HOST: str = Field("0.0.0.0", env="API_HOST")

# API Management.
# File with keys used to access API.
API_KEYS_FILE: str = Field("api_keys.json", env="API_KEYS_FILE")
# Admin key used to generate API keys.
ADMIN_KEY: str | None = Field(None, env="ADMIN_KEY")
# API key used to access validator organic scoring mechanism.
SCORING_KEY: str | None = Field(None, env="SCORING_KEY")
SCORE_ORGANICS: bool = Field(False, env="SCORE_ORGANICS")

# Additional Fields.
# Additional Validator Fields.
NETUID: Optional[int] = Field(61, env="NETUID")
TEST: bool = False
OPENAI_API_KEY: Optional[str] = Field(None, env="OPENAI_API_KEY")
Expand Down
3 changes: 2 additions & 1 deletion shared/uids.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ def get_top_incentive_uids(k: int, vpermit_tao_limit: int) -> np.ndarray:
# Extract the top uids.
top_k_uids = [uid for uid, incentive in uid_incentive_pairs_sorted[:k]]

return np.array(top_k_uids).astype(int)
return list(np.array(top_k_uids).astype(int))
# return [int(k) for k in top_k_uids]


def get_uids(
Expand Down
Loading

0 comments on commit e235f0b

Please sign in to comment.