From dd563b2aa08b0396b00e9cccc3647e4957542eb7 Mon Sep 17 00:00:00 2001
From: Sid
Date: Wed, 18 Dec 2024 17:23:36 -0800
Subject: [PATCH 1/7] Set weights on a separate thread.

---
 neurons/validator.py | 123 ++++++++++++++++++++++++++++---------------
 1 file changed, 82 insertions(+), 41 deletions(-)

diff --git a/neurons/validator.py b/neurons/validator.py
index b911eea..85465c9 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -218,9 +218,9 @@ def __init__(self):
         self._new_wandb_run()
 
         # === Running args ===
+        self.weight_lock = threading.RLock()
         self.weights = torch.zeros_like(torch.from_numpy(self.metagraph.S))
         self.global_step = 0
-        self.last_epoch = self.metagraph.block.item()
 
         self.uids_to_eval: typing.Dict[CompetitionId, typing.Set] = defaultdict(set)
 
@@ -300,6 +300,22 @@ def __init__(self):
                 f"Failed to load competition tracker state. Reason: {e}. Starting from scratch."
             )
 
+        # Also update our internal weights based on the tracker.
+        cur_block = self._get_current_block()
+
+        # Get the competition schedule for the current block.
+        # This is a list of competitions
+        competition_schedule: typing.List[Competition] = (
+            competition_utils.get_competition_schedule_for_block(
+                block=cur_block,
+                schedule_by_block=constants.COMPETITION_SCHEDULE_BY_BLOCK,
+            )
+        )
+        with self.weight_lock:
+            self.weights = self.competition_tracker.get_subnet_weights(
+                competition_schedule
+            )
+
         # Initialize the UIDs to eval.
         if not os.path.exists(self.uids_filepath):
             logging.warning("No uids state file found. Starting from scratch.")
@@ -720,38 +736,72 @@ def clean_models(self):
 
         logging.info("Exiting clean models loop.")
 
-    async def try_set_weights(self, block: int, ttl: int):
+    def set_weights(self):
+        """Set weights on the chain regularly."""
+
+        # Check that we have some weights internally for startup situations.
+        all_zero_weights = True
+        while all_zero_weights is True:
+            # Technically returns a tensor but it evaluates to true.
+            with self.weight_lock:
+                all_zero_weights = torch.all(self.weights == 0)
+            bt.logging.trace(
+                "Waiting 60 seconds for internal weights before continuing to try set weights."
+            )
+            time.sleep(60)
+
+        while not self.stop_event.is_set():
+            try:
+                set_weights_success = False
+                while not set_weights_success:
+                    set_weights_success, _ = asyncio.run(self.try_set_weights(ttl=60))
+                    # Wait for 60 seconds before we try to set weights again.
+                    if set_weights_success:
+                        bt.logging.info("Successfully set weights.")
+                    else:
+                        time.sleep(60)
+            except Exception as e:
+                bt.logging.error(f"Error in set weights: {e}")
+
+            # Only try at most once every 20 minutes
+            time.sleep(60 * 20)
+
+        bt.logging.info("Exiting set weights loop.")
+
+    async def try_set_weights(self, ttl: int) -> typing.Tuple[bool, str]:
         """Sets the weights on the chain with ttl, without raising exceptions if it times out."""
 
-        async def _try_set_weights():
+        async def _try_set_weights() -> typing.Tuple[bool, str]:
             with self.metagraph_lock:
                 uids = self.metagraph.uids
             try:
-                weight_subtensor = bt.subtensor(config=self.config)
-                success, message = weight_subtensor.set_weights(
+                with self.weight_lock:
+                    self.weights.nan_to_num(0.0)
+                    weights_to_set = self.weights
+
+                return self.subtensor.set_weights(
                     netuid=self.config.netuid,
                     wallet=self.wallet,
                     uids=uids,
-                    weights=self.weights.numpy(),
+                    weights=weights_to_set.numpy(),
                     wait_for_inclusion=False,
                     version_key=constants.weights_version_key,
+                    max_retries=1,
                 )
-                if not success:
-                    logging.warning(
-                        f"Failed to set weights (will retry later): {message}"
-                    )
-                else:
-                    # We only update the last epoch when we successfully set weights.
-                    self.last_epoch = block
-            except:
-                logging.warning("Failed to set weights. Trying again later.")
+            except Exception as e:
+                bt.logging.warning(
+                    f"Failed to set weights due to {e}. Trying again later."
+                )
+                return (False, str(e))
 
         try:
             logging.debug(f"Setting weights.")
-            await asyncio.wait_for(_try_set_weights(), ttl)
-            logging.debug(f"Finished setting weights.")
+            status = await asyncio.wait_for(_try_set_weights(), ttl)
+            bt.logging.debug(f"Finished setting weights with status: {status}.")
+            return status
         except asyncio.TimeoutError:
-            logging.error(f"Failed to set weights after {ttl} seconds")
+            bt.logging.error(f"Failed to set weights after {ttl} seconds")
+            return (False, f"Timeout after {ttl} seconds")
 
     def _get_current_block(self) -> int:
         """Returns the current block."""
@@ -1140,10 +1190,11 @@ async def run_step(self):
         # Align competition_tracker to only track active competitions.
         self.competition_tracker.reset_competitions(active_competition_ids)
         # Update self.weights to the merged values across active competitions.
-        self.weights = self.competition_tracker.get_subnet_weights(
-            competitions=competition_schedule,
-            min_comp_weight_threshold=constants.MIN_WEIGHT_THRESHOLD,
-        )
+        with self.weight_lock:
+            self.weights = self.competition_tracker.get_subnet_weights(
+                competitions=competition_schedule,
+                min_comp_weight_threshold=constants.MIN_WEIGHT_THRESHOLD,
+            )
 
         # Prioritize models for keeping up to the sample_min for the next eval loop.
         # If the model has any significant weight, prioritize by weight with greater weights being kept first.
@@ -1280,6 +1331,11 @@ def log_step(
             "uids": uids,
             "uid_data": {},
         }
+
+        # Get a copy of weights to print.
+        with self.weight_lock:
+            log_weights = self.weights
+
         for uid in uids:
             step_log["uid_data"][str(uid)] = {
                 "uid": uid,
@@ -1294,7 +1350,7 @@ def log_step(
                 ),
                 "win_rate": win_rate[uid],
                 "win_total": wins[uid],
-                "weight": self.weights[uid].item(),
+                "weight": log_weights[uid].item(),
                 "norm_weight": competition_weights[
                     uid
                 ].item(),  # Named norm_weight for leaderboard pipeline compatibilty.
@@ -1329,7 +1385,7 @@ def log_step(
                 str(round(step_log["uid_data"][str(uid)]["average_loss"], 4)),
                 str(round(step_log["uid_data"][str(uid)]["epsilon_adv"], 4)),
                 str(round(step_log["uid_data"][str(uid)]["win_rate"], 4)),
-                str(round(self.weights[uid].item(), 4)),
+                str(round(log_weights[uid].item(), 4)),
                 str(round(competition_weights[uid].item(), 4)),
                 str(step_log["uid_data"][str(uid)]["block"]),
                 str(step_log["uid_data"][str(uid)]["competition_id"]),
@@ -1339,7 +1395,7 @@ def log_step(
         console = Console()
         console.print(table)
 
-        ws, ui = self.weights.topk(len(self.weights))
+        ws, ui = log_weights.topk(len(log_weights))
         table = Table(title=f"Weights >= {constants.WEIGHT_SYNC_MINER_MIN_PERCENT}")
         table.add_column("uid", justify="right", style="cyan", no_wrap=True)
         table.add_column("weight", style="magenta")
@@ -1390,7 +1446,7 @@ def log_step(
                 "win_total_data": {
                     str(uid): uid_data[str(uid)]["win_total"] for uid in uids
                 },
-                "weight_data": {str(uid): self.weights[uid].item() for uid in uids},
+                "weight_data": {str(uid): log_weights[uid].item() for uid in uids},
                 "competition_weight_data": {
                     str(uid): competition_weights[uid].item() for uid in uids
                 },
@@ -1480,24 +1536,9 @@ async def run(self):
 
         while True:
             try:
-
-                # First run a step.
                 await self.try_run_step(ttl=75 * 60)
                 self.global_step += 1
 
-                block = self._get_current_block()
-
-                # Then check if we should set weights and do so if needed.
-                if not self.config.offline:
-                    blocks_until_epoch = block - self.last_epoch
-
-                    if blocks_until_epoch >= self.config.blocks_per_epoch:
-                        await self.try_set_weights(block=block, ttl=60)
-                    else:
-                        logging.debug(
-                            f"{blocks_until_epoch} / {self.config.blocks_per_epoch} blocks until next epoch."
-                        )
-
             except KeyboardInterrupt:
                 logging.info(
                     "KeyboardInterrupt caught, gracefully closing the wandb run..."

From 7f2dbd119aaf52d89cf2d54c93555b2004942ec1 Mon Sep 17 00:00:00 2001
From: Sid
Date: Sat, 21 Dec 2024 08:29:37 -0800
Subject: [PATCH 2/7] Actually start the thread (if not offline).

---
 neurons/validator.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/neurons/validator.py b/neurons/validator.py
index 85465c9..03d0b9d 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -388,6 +388,11 @@ def __init__(self):
         )
         self.clean_thread.start()
 
+        # == Initialize the weight setting thread ==
+        if not self.config.offline:
+            self.weight_thread = threading.Thread(target=self.set_weights, daemon=True)
+            self.weight_thread.start()
+
     def __del__(self):
         if hasattr(self, "stop_event"):
             self.stop_event.set()

From 528a01460b23898c01fb098b1a0b36568f6bd239 Mon Sep 17 00:00:00 2001
From: rusticluftig
Date: Sat, 21 Dec 2024 09:36:41 -0800
Subject: [PATCH 3/7] Give every thread its own subtensor

---
 neurons/validator.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/neurons/validator.py b/neurons/validator.py
index 03d0b9d..b9a2f76 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -159,8 +159,9 @@ def __init__(self):
             torch.backends.cudnn.benchmark = True
 
         # Setup metagraph syncer for the subnet based on config. This is non-lite for getting weights by vali.
+        syncer_subtensor = bt.subtensor(config=self.config)
         self.subnet_metagraph_syncer = MetagraphSyncer(
-            self.subtensor,
+            syncer_subtensor,
             config={
                 self.config.netuid: dt.timedelta(minutes=20).total_seconds(),
             },
@@ -353,8 +354,9 @@ def __init__(self):
         self.miner_iterator = MinerIterator(self.metagraph.uids.tolist())
 
         # Setup a ModelMetadataStore
+        chain_store_subtensor = bt.subtensor(config=self.config)
         self.metadata_store = ChainModelMetadataStore(
-            subtensor=self.subtensor,
+            subtensor=chain_store_subtensor,
             subnet_uid=self.config.netuid,
             wallet=self.wallet,
         )

From 9e903575bae5968b76ef3cae150e2c0724646137 Mon Sep 17 00:00:00 2001
From: rusticluftig
Date: Sun, 22 Dec 2024 10:13:13 -0800
Subject: [PATCH 4/7] Use taoverse logger

---
 neurons/validator.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/neurons/validator.py b/neurons/validator.py
index b9a2f76..a06cb09 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -752,7 +752,7 @@ def set_weights(self):
             # Technically returns a tensor but it evaluates to true.
             with self.weight_lock:
                 all_zero_weights = torch.all(self.weights == 0)
-            bt.logging.trace(
+            logging.trace(
                 "Waiting 60 seconds for internal weights before continuing to try set weights."
            )
             time.sleep(60)
@@ -764,16 +764,16 @@ def set_weights(self):
                     set_weights_success, _ = asyncio.run(self.try_set_weights(ttl=60))
                     # Wait for 60 seconds before we try to set weights again.
                     if set_weights_success:
-                        bt.logging.info("Successfully set weights.")
+                        logging.info("Successfully set weights.")
                     else:
                         time.sleep(60)
             except Exception as e:
-                bt.logging.error(f"Error in set weights: {e}")
+                logging.error(f"Error in set weights: {e}")
 
             # Only try at most once every 20 minutes
             time.sleep(60 * 20)
 
-        bt.logging.info("Exiting set weights loop.")
+        logging.info("Exiting set weights loop.")
 
     async def try_set_weights(self, ttl: int) -> typing.Tuple[bool, str]:
         """Sets the weights on the chain with ttl, without raising exceptions if it times out."""
@@ -796,7 +796,7 @@ async def _try_set_weights() -> typing.Tuple[bool, str]:
                     max_retries=1,
                 )
             except Exception as e:
-                bt.logging.warning(
+                logging.warning(
                     f"Failed to set weights due to {e}. Trying again later."
                 )
                 return (False, str(e))
@@ -804,10 +804,10 @@ async def _try_set_weights() -> typing.Tuple[bool, str]:
         try:
             logging.debug(f"Setting weights.")
             status = await asyncio.wait_for(_try_set_weights(), ttl)
-            bt.logging.debug(f"Finished setting weights with status: {status}.")
+            logging.debug(f"Finished setting weights with status: {status}.")
             return status
         except asyncio.TimeoutError:
-            bt.logging.error(f"Failed to set weights after {ttl} seconds")
+            logging.error(f"Failed to set weights after {ttl} seconds")
             return (False, f"Timeout after {ttl} seconds")
 
     def _get_current_block(self) -> int:
         """Returns the current block."""

From b97c8072eb233cb72c6889d38c7181b0bc0f7ee7 Mon Sep 17 00:00:00 2001
From: rusticluftig
Date: Sun, 22 Dec 2024 10:24:26 -0800
Subject: [PATCH 5/7] Use a separate subtensor for weight setting

---
 neurons/validator.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/neurons/validator.py b/neurons/validator.py
index a06cb09..27ae894 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -150,6 +150,7 @@ def __init__(self):
         # === Bittensor objects ====
         self.wallet = bt.wallet(config=self.config)
         self.subtensor = bt.subtensor(config=self.config)
+        self.weights_subtensor = bt.subtensor(config=self.config)
         # If running on testnet, default to using finney for the dataset subtensor.
         if self.config.using_test_subtensor:
             self.dataset_subtensor = bt.subtensor()
@@ -786,7 +787,7 @@ async def _try_set_weights() -> typing.Tuple[bool, str]:
                     self.weights.nan_to_num(0.0)
                     weights_to_set = self.weights
 
-                return self.subtensor.set_weights(
+                return self.weights_subtensor.set_weights(
                     netuid=self.config.netuid,
                     wallet=self.wallet,
                     uids=uids,

From 034c56f52e27af0934240f3f05193ca38e42dbea Mon Sep 17 00:00:00 2001
From: rusticluftig
Date: Sun, 22 Dec 2024 10:33:24 -0800
Subject: [PATCH 6/7] Set weights once an hour

---
 neurons/validator.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/neurons/validator.py b/neurons/validator.py
index 27ae894..d4257bf 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -771,8 +771,8 @@ def set_weights(self):
             except Exception as e:
                 logging.error(f"Error in set weights: {e}")
 
-            # Only try at most once every 20 minutes
-            time.sleep(60 * 20)
+            # Only set weights once every hour
+            time.sleep(60 * 60)
 
         logging.info("Exiting set weights loop.")
 

From 6cf6436bb528d0105f6ac148b963bc29d07168c0 Mon Sep 17 00:00:00 2001
From: rusticluftig
Date: Sun, 22 Dec 2024 12:05:34 -0800
Subject: [PATCH 7/7] Wait for inclusion

---
 neurons/validator.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/neurons/validator.py b/neurons/validator.py
index d4257bf..aea4b1e 100644
--- a/neurons/validator.py
+++ b/neurons/validator.py
@@ -792,7 +792,7 @@ async def _try_set_weights() -> typing.Tuple[bool, str]:
                     wallet=self.wallet,
                     uids=uids,
                     weights=weights_to_set.numpy(),
-                    wait_for_inclusion=False,
+                    wait_for_inclusion=True,
                     version_key=constants.weights_version_key,
                     max_retries=1,
                 )
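
Taken together, these patches move weight setting out of the main evaluation loop and into a background thread: the weights tensor is shared through an RLock, the thread waits until the eval loop has produced non-zero weights, and each chain-facing component gets its own subtensor connection. Below is a minimal, self-contained sketch of that pattern; the WeightSetter class, the set_weights_on_chain callable, and the retry intervals are illustrative assumptions, not the validator's actual API (the real call is subtensor.set_weights(...) with the wallet, netuid, uids, and version key shown in the diffs above).

import threading
import time

import torch


class WeightSetter:
    """Sketch: push lock-guarded weights to the chain from a daemon thread."""

    def __init__(self, num_uids: int, set_weights_on_chain, interval_s: int = 60 * 60):
        # set_weights_on_chain: hypothetical callable taking a torch.Tensor and
        # returning bool, standing in for bittensor's subtensor.set_weights(...).
        self._set_weights_on_chain = set_weights_on_chain
        self._interval_s = interval_s
        self.weight_lock = threading.RLock()
        self.weights = torch.zeros(num_uids)
        self.stop_event = threading.Event()
        # Daemon thread, so it never blocks interpreter shutdown.
        self.weight_thread = threading.Thread(target=self._loop, daemon=True)
        self.weight_thread.start()

    def update(self, new_weights: torch.Tensor) -> None:
        """Called by the eval loop whenever fresh weights are computed."""
        with self.weight_lock:
            self.weights = new_weights

    def _loop(self) -> None:
        # Startup: wait until the eval loop has published at least one non-zero weight.
        while not self.stop_event.is_set():
            with self.weight_lock:
                if bool(torch.any(self.weights != 0)):
                    break
            time.sleep(60)

        while not self.stop_event.is_set():
            try:
                # Copy under the lock so the (slow) chain call happens outside it.
                with self.weight_lock:
                    weights_to_set = self.weights.nan_to_num(0.0)
                success = self._set_weights_on_chain(weights_to_set)
            except Exception as e:
                success = False
                print(f"Error setting weights: {e}")
            # Retry soon on failure; otherwise wait a full interval between attempts.
            time.sleep(60 if not success else self._interval_s)

In this sketch the eval loop would call update(new_weights) after each scoring pass, mirroring the "with self.weight_lock:" assignment added to run_step, while the daemon thread owns the periodic chain interaction.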