diff --git a/constants/__init__.py b/constants/__init__.py index d3501c6..2db6d72 100644 --- a/constants/__init__.py +++ b/constants/__init__.py @@ -33,7 +33,7 @@ # Project Constants. # --------------------------------- -__version__ = "2.7.1" +__version__ = "2.7.2" version_split = __version__.split(".") __spec_version__ = ( (1000 * int(version_split[0])) @@ -140,56 +140,12 @@ ), } -INSTRUCT_8B_BLOCK = 4_451_695 -IF_EVAL_V2_BLOCK = 4_523_592 +INSTRUCT_8B_TO_25_WEIGHT_BLOCK = 4_552_883 # Schedule of competitions by block. COMPETITION_SCHEDULE_BY_BLOCK: List[Tuple[int, List[Competition]]] = [ ( 0, - [ - Competition( - CompetitionId.B7_MULTI_CHOICE, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MULTI_CHOICE], - 1.0, - eval_tasks=[ - EvalTask( - name="SYNTHETIC_MMLU", - method_id=EvalMethodId.MULTIPLE_CHOICE, - dataset_id=DatasetId.SYNTHETIC_MMLU, - normalization_id=NormalizationId.NONE, - weight=0.85, - ), - EvalTask( - name="WORD_SORTING", - method_id=EvalMethodId.REFERENCE_LOSS, - dataset_id=DatasetId.WORD_SORTING, - normalization_id=NormalizationId.INVERSE_EXPONENTIAL, - normalization_kwargs={"ceiling": 40.0}, - weight=0.05, - ), - EvalTask( - name="FINEWEB", - method_id=EvalMethodId.TEXT_LOSS, - dataset_id=DatasetId.FINEWEB, - normalization_id=NormalizationId.INVERSE_EXPONENTIAL, - normalization_kwargs={"ceiling": 20.0}, - weight=0.05, - ), - EvalTask( - name="IF_EVAL_V1", - method_id=EvalMethodId.IF_EVAL, - dataset_id=DatasetId.SYNTHETIC_IF_EVAL, - normalization_id=NormalizationId.NONE, - dataset_kwargs={"if_eval_version": IfEvalVersion.V1}, - weight=0.05, - ), - ], - ), - ], - ), - ( - INSTRUCT_8B_BLOCK, [ Competition( CompetitionId.B7_MULTI_CHOICE, @@ -201,7 +157,7 @@ method_id=EvalMethodId.MULTIPLE_CHOICE, dataset_id=DatasetId.SYNTHETIC_MMLU, normalization_id=NormalizationId.NONE, - weight=0.8, + weight=0.75, ), EvalTask( name="WORD_SORTING", @@ -220,12 +176,12 @@ weight=0.1, ), EvalTask( - name="IF_EVAL_V1", + name="IF_EVAL_V2", 
method_id=EvalMethodId.IF_EVAL, dataset_id=DatasetId.SYNTHETIC_IF_EVAL, normalization_id=NormalizationId.NONE, - dataset_kwargs={"if_eval_version": IfEvalVersion.V1}, - weight=0.05, + dataset_kwargs={"if_eval_version": IfEvalVersion.V2}, + weight=0.1, ), ], ), @@ -239,7 +195,7 @@ method_id=EvalMethodId.MULTIPLE_CHOICE, dataset_id=DatasetId.SYNTHETIC_MMLU, normalization_id=NormalizationId.NONE, - weight=0.8, + weight=0.75, ), EvalTask( name="WORD_SORTING", @@ -258,24 +214,24 @@ weight=0.1, ), EvalTask( - name="IF_EVAL_V1", + name="IF_EVAL_V2", method_id=EvalMethodId.IF_EVAL, dataset_id=DatasetId.SYNTHETIC_IF_EVAL, normalization_id=NormalizationId.NONE, - dataset_kwargs={"if_eval_version": IfEvalVersion.V1}, - weight=0.05, + dataset_kwargs={"if_eval_version": IfEvalVersion.V2}, + weight=0.1, ), ], ), ], ), ( - IF_EVAL_V2_BLOCK, + INSTRUCT_8B_TO_25_WEIGHT_BLOCK, [ Competition( CompetitionId.B7_MULTI_CHOICE, MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MULTI_CHOICE], - 0.9, + 0.75, eval_tasks=[ EvalTask( name="SYNTHETIC_MMLU", @@ -313,7 +269,7 @@ Competition( CompetitionId.INSTRUCT_8B, MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.INSTRUCT_8B], - 0.1, + 0.25, eval_tasks=[ EvalTask( name="SYNTHETIC_MMLU", diff --git a/neurons/config.py b/neurons/config.py index 63e56df..f41828f 100644 --- a/neurons/config.py +++ b/neurons/config.py @@ -69,7 +69,7 @@ def validator_config(): help="Where to store downloaded models", ) parser.add_argument( - "--netuid", type=str, default=constants.SUBNET_UID, help="The subnet UID." + "--netuid", type=int, default=constants.SUBNET_UID, help="The subnet UID." 
) parser.add_argument( "--do_sample", diff --git a/neurons/validator.py b/neurons/validator.py index b492ecb..aea4b1e 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -150,6 +150,7 @@ def __init__(self): # === Bittensor objects ==== self.wallet = bt.wallet(config=self.config) self.subtensor = bt.subtensor(config=self.config) + self.weights_subtensor = bt.subtensor(config=self.config) # If running on testnet, default to using finney for the dataset subtensor. if self.config.using_test_subtensor: self.dataset_subtensor = bt.subtensor() @@ -159,8 +160,9 @@ def __init__(self): torch.backends.cudnn.benchmark = True # Setup metagraph syncer for the subnet based on config. This is non-lite for getting weights by vali. + syncer_subtensor = bt.subtensor(config=self.config) self.subnet_metagraph_syncer = MetagraphSyncer( - self.subtensor, + syncer_subtensor, config={ self.config.netuid: dt.timedelta(minutes=20).total_seconds(), }, @@ -218,9 +220,9 @@ def __init__(self): self._new_wandb_run() # === Running args === + self.weight_lock = threading.RLock() self.weights = torch.zeros_like(torch.from_numpy(self.metagraph.S)) self.global_step = 0 - self.last_epoch = self.metagraph.block.item() self.uids_to_eval: typing.Dict[CompetitionId, typing.Set] = defaultdict(set) @@ -300,6 +302,22 @@ def __init__(self): f"Failed to load competition tracker state. Reason: {e}. Starting from scratch." ) + # Also update our internal weights based on the tracker. + cur_block = self._get_current_block() + + # Get the competition schedule for the current block. + # This is a list of competitions + competition_schedule: typing.List[Competition] = ( + competition_utils.get_competition_schedule_for_block( + block=cur_block, + schedule_by_block=constants.COMPETITION_SCHEDULE_BY_BLOCK, + ) + ) + with self.weight_lock: + self.weights = self.competition_tracker.get_subnet_weights( + competition_schedule + ) + # Initialize the UIDs to eval. 
if not os.path.exists(self.uids_filepath): logging.warning("No uids state file found. Starting from scratch.") @@ -337,8 +355,9 @@ def __init__(self): self.miner_iterator = MinerIterator(self.metagraph.uids.tolist()) # Setup a ModelMetadataStore + chain_store_subtensor = bt.subtensor(config=self.config) self.metadata_store = ChainModelMetadataStore( - subtensor=self.subtensor, + subtensor=chain_store_subtensor, subnet_uid=self.config.netuid, wallet=self.wallet, ) @@ -372,6 +391,11 @@ def __init__(self): ) self.clean_thread.start() + # == Initialize the weight setting thread == + if not self.config.offline: + self.weight_thread = threading.Thread(target=self.set_weights, daemon=True) + self.weight_thread.start() + def __del__(self): if hasattr(self, "stop_event"): self.stop_event.set() @@ -720,38 +744,72 @@ def clean_models(self): logging.info("Exiting clean models loop.") - async def try_set_weights(self, block: int, ttl: int): + def set_weights(self): + """Set weights on the chain regularly.""" + + # Check that we have some weights internally for startup situations. + all_zero_weights = True + while all_zero_weights is True: + # torch.all returns a 0-dim tensor, which is never `is True`; .item() yields the Python bool. + with self.weight_lock: + all_zero_weights = torch.all(self.weights == 0).item() + logging.trace( + "Waiting 60 seconds for internal weights before continuing to try set weights." + ) + time.sleep(60) + + while not self.stop_event.is_set(): + try: + set_weights_success = False + while not set_weights_success: + set_weights_success, _ = asyncio.run(self.try_set_weights(ttl=60)) + # Wait for 60 seconds before we try to set weights again.
+ if set_weights_success: + logging.info("Successfully set weights.") + else: + time.sleep(60) + except Exception as e: + logging.error(f"Error in set weights: {e}") + + # Only set weights once every hour + time.sleep(60 * 60) + + logging.info("Exiting set weights loop.") + + async def try_set_weights(self, ttl: int) -> typing.Tuple[bool, str]: """Sets the weights on the chain with ttl, without raising exceptions if it times out.""" - async def _try_set_weights(): + async def _try_set_weights() -> typing.Tuple[bool, str]: with self.metagraph_lock: uids = self.metagraph.uids try: - weight_subtensor = bt.subtensor(config=self.config) - success, message = weight_subtensor.set_weights( + with self.weight_lock: + self.weights = self.weights.nan_to_num(0.0) + weights_to_set = self.weights + + return self.weights_subtensor.set_weights( netuid=self.config.netuid, wallet=self.wallet, uids=uids, - weights=self.weights.numpy(), - wait_for_inclusion=False, + weights=weights_to_set.numpy(), + wait_for_inclusion=True, version_key=constants.weights_version_key, + max_retries=1, ) - if not success: - logging.warning( - f"Failed to set weights (will retry later): {message}" - ) - else: - # We only update the last epoch when we successfully set weights. - self.last_epoch = block - except: - logging.warning("Failed to set weights. Trying again later.") + except Exception as e: + logging.warning( + f"Failed to set weights due to {e}. Trying again later."
+ ) + return (False, str(e)) try: logging.debug(f"Setting weights.") - await asyncio.wait_for(_try_set_weights(), ttl) - logging.debug(f"Finished setting weights.") + status = await asyncio.wait_for(_try_set_weights(), ttl) + logging.debug(f"Finished setting weights with status: {status}.") + return status except asyncio.TimeoutError: logging.error(f"Failed to set weights after {ttl} seconds") + return (False, f"Timeout after {ttl} seconds") def _get_current_block(self) -> int: """Returns the current block.""" @@ -929,7 +987,8 @@ async def run_step(self): logging.info("Starting evaluation for competition: " + str(competition.id)) # If the competition's eval tasks have changed, make sure all models are re-evaluated. - self._maybe_reset_eval_history(competition) + # Commenting out for now. Churn from IfEval changes should not actually require a reset. + # self._maybe_reset_eval_history(competition) # Add uids with newly updated models to the upcoming batch of evaluations. with self.pending_uids_to_eval_lock: @@ -1139,10 +1198,11 @@ async def run_step(self): # Align competition_tracker to only track active competitions. self.competition_tracker.reset_competitions(active_competition_ids) # Update self.weights to the merged values across active competitions. - self.weights = self.competition_tracker.get_subnet_weights( - competitions=competition_schedule, - min_comp_weight_threshold=constants.MIN_WEIGHT_THRESHOLD, - ) + with self.weight_lock: + self.weights = self.competition_tracker.get_subnet_weights( + competitions=competition_schedule, + min_comp_weight_threshold=constants.MIN_WEIGHT_THRESHOLD, + ) # Prioritize models for keeping up to the sample_min for the next eval loop. # If the model has any significant weight, prioritize by weight with greater weights being kept first. @@ -1279,6 +1339,11 @@ def log_step( "uids": uids, "uid_data": {}, } + + # Get a copy of weights to print.
+ with self.weight_lock: + log_weights = self.weights + for uid in uids: step_log["uid_data"][str(uid)] = { "uid": uid, @@ -1293,7 +1358,10 @@ def log_step( ), "win_rate": win_rate[uid], "win_total": wins[uid], - "weight": self.weights[uid].item(), + "weight": log_weights[uid].item(), + "norm_weight": competition_weights[ + uid + ].item(), # Named norm_weight for leaderboard pipeline compatibility. } for task in eval_tasks: step_log["uid_data"][str(uid)][f"{task.name}.raw_score"] = ( @@ -1325,7 +1393,7 @@ def log_step( str(round(step_log["uid_data"][str(uid)]["average_loss"], 4)), str(round(step_log["uid_data"][str(uid)]["epsilon_adv"], 4)), str(round(step_log["uid_data"][str(uid)]["win_rate"], 4)), - str(round(self.weights[uid].item(), 4)), + str(round(log_weights[uid].item(), 4)), str(round(competition_weights[uid].item(), 4)), str(step_log["uid_data"][str(uid)]["block"]), str(step_log["uid_data"][str(uid)]["competition_id"]), @@ -1335,7 +1403,7 @@ def log_step( console = Console() console.print(table) - ws, ui = self.weights.topk(len(self.weights)) + ws, ui = log_weights.topk(len(log_weights)) table = Table(title=f"Weights >= {constants.WEIGHT_SYNC_MINER_MIN_PERCENT}") table.add_column("uid", justify="right", style="cyan", no_wrap=True) table.add_column("weight", style="magenta") @@ -1386,7 +1454,7 @@ def log_step( "win_total_data": { str(uid): uid_data[str(uid)]["win_total"] for uid in uids }, - "weight_data": {str(uid): self.weights[uid].item() for uid in uids}, + "weight_data": {str(uid): log_weights[uid].item() for uid in uids}, "competition_weight_data": { str(uid): competition_weights[uid].item() for uid in uids }, @@ -1476,24 +1544,9 @@ async def run(self): while True: try: - - # First run a step. await self.try_run_step(ttl=75 * 60) self.global_step += 1 - block = self._get_current_block() - - # Then check if we should set weights and do so if needed.
- if not self.config.offline: - blocks_until_epoch = block - self.last_epoch - - if blocks_until_epoch >= self.config.blocks_per_epoch: - await self.try_set_weights(block=block, ttl=60) - else: - logging.debug( - f"{blocks_until_epoch} / {self.config.blocks_per_epoch} blocks until next epoch." - ) - except KeyboardInterrupt: logging.info( "KeyboardInterrupt caught, gracefully closing the wandb run..." diff --git a/requirements.txt b/requirements.txt index c65fdbf..10d1377 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -bittensor==8.4.3 +bittensor==8.5.1 huggingface_hub nltk numpy==2.0.2