diff --git a/xuance/common/common_tools.py b/xuance/common/common_tools.py
index b0decf18..564b1f73 100644
--- a/xuance/common/common_tools.py
+++ b/xuance/common/common_tools.py
@@ -163,17 +163,20 @@ def get_runner(method,
     else:
         device = args.device
     distributed_training = True if args.distributed_training else False
-    if distributed_training:
-        rank = int(os.environ['RANK'])
-        num_gpus = int(os.environ['WORLD_SIZE'])
-        if rank == 0:
-            if num_gpus > 1:
-                print(f"Calculating devices: {num_gpus} visible GPUs for distributed training.")
-            else:
-                print(f"Calculating device: {num_gpus} visible GPU for distributed training.")
-    else:
-        rank = 0
-        print(f"Calculating device: {device}")
+    # if distributed_training:
+    #     rank = int(os.environ['RANK'])
+    #     num_gpus = int(os.environ['WORLD_SIZE'])
+    #     if rank == 0:
+    #         if num_gpus > 1:
+    #             print(f"Calculating devices: {num_gpus} visible GPUs for distributed training.")
+    #         else:
+    #             print(f"Calculating device: {num_gpus} visible GPU for distributed training.")
+    # else:
+    #     rank = 0
+    #     print(f"Calculating device: {device}")
+
+    rank = 0
+    print(f"Calculating device: {device}")
 
     dl_toolbox = args[0].dl_toolbox if type(args) == list else args.dl_toolbox
     if dl_toolbox == "torch":
diff --git a/xuance/environment/__init__.py b/xuance/environment/__init__.py
index 0893c9d2..9e8b1264 100644
--- a/xuance/environment/__init__.py
+++ b/xuance/environment/__init__.py
@@ -23,7 +23,8 @@ def _thunk(env_seed: int = None):
             raise AttributeError(f"The environment named {config.env_name} cannot be created.")
 
     if config.distributed_training:
-        rank = int(os.environ['RANK'])
+        # rank = int(os.environ['RANK'])  # for torch.nn.parallel.DistributedDataParallel
+        rank = 1
         config.env_seed += rank * config.parallels
 
     if config.vectorize in REGISTRY_VEC_ENV.keys():
diff --git a/xuance/tensorflow/agents/base/agent.py b/xuance/tensorflow/agents/base/agent.py
index 776cf6e8..e2256297 100644
--- a/xuance/tensorflow/agents/base/agent.py
+++ b/xuance/tensorflow/agents/base/agent.py
@@ -28,6 +28,7 @@ def __init__(self,
         self.config = config
         self.use_rnn = config.use_rnn if hasattr(config, "use_rnn") else False
         self.use_actions_mask = config.use_actions_mask if hasattr(config, "use_actions_mask") else False
+        self.distributed_training = config.distributed_training
         self.gamma = config.gamma
         self.start_training = config.start_training if hasattr(config, "start_training") else 1
 
diff --git a/xuance/tensorflow/agents/qlearning_family/dqn_agent.py b/xuance/tensorflow/agents/qlearning_family/dqn_agent.py
index b1c204c9..db6a985c 100644
--- a/xuance/tensorflow/agents/qlearning_family/dqn_agent.py
+++ b/xuance/tensorflow/agents/qlearning_family/dqn_agent.py
@@ -38,7 +38,7 @@ def _build_policy(self) -> Module:
             policy = REGISTRY_Policy["Basic_Q_network"](
                 action_space=self.action_space, representation=representation, hidden_size=self.config.q_hidden_size,
                 normalize=normalize_fn, initialize=initializer, activation=activation,
-                use_distributed_training=self.config.use_distributed_training, gamma=self.config)
+                use_distributed_training=self.distributed_training)
         else:
             raise AttributeError(f"{self.config.agent} does not support the policy named {self.config.policy}.")
 
diff --git a/xuance/tensorflow/learners/learner.py b/xuance/tensorflow/learners/learner.py
index bb44ff93..d981e885 100644
--- a/xuance/tensorflow/learners/learner.py
+++ b/xuance/tensorflow/learners/learner.py
@@ -15,6 +15,7 @@ def __init__(self,
         self.os_name = platform.platform()
         self.value_normalizer = None
         self.config = config
+        self.distributed_training = config.distributed_training
         self.episode_length = config.episode_length
         self.use_rnn = config.use_rnn if hasattr(config, 'use_rnn') else False
 
diff --git a/xuance/tensorflow/learners/qlearning_family/dqn_learner.py b/xuance/tensorflow/learners/qlearning_family/dqn_learner.py
index 37afc3be..85c9ed2d 100644
--- a/xuance/tensorflow/learners/qlearning_family/dqn_learner.py
+++ b/xuance/tensorflow/learners/qlearning_family/dqn_learner.py
@@ -15,41 +15,57 @@ def __init__(self,
                  policy: Module):
         super(DQN_Learner, self).__init__(config, policy)
         if ("macOS" in self.os_name) and ("arm" in self.os_name):  # For macOS with Apple's M-series chips.
-            self.optimizer = tk.optimizers.legacy.Adam(config.learning_rate)
+            if self.distributed_training:
+                with self.policy.mirrored_strategy.scope():
+                    self.optimizer = tk.optimizers.legacy.Adam(config.learning_rate)
+            else:
+                self.optimizer = tk.optimizers.legacy.Adam(config.learning_rate)
         else:
-            self.optimizer = tk.optimizers.Adam(config.learning_rate)
+            if self.distributed_training:
+                with self.policy.mirrored_strategy.scope():
+                    self.optimizer = tk.optimizers.Adam(config.learning_rate)
+            else:
+                self.optimizer = tk.optimizers.Adam(config.learning_rate)
         self.gamma = config.gamma
         self.sync_frequency = config.sync_frequency
         self.n_actions = self.policy.action_dim
 
     @tf.function
-    def learn(self, obs_batch, act_batch, next_batch, rew_batch, ter_batch):
-        with self.policy.mirrored_strategy.scope():
-            with tf.GradientTape() as tape:
-                _, _, evalQ = self.policy(obs_batch)
-                _, _, targetQ = self.policy.target(next_batch)
-                targetQ = tf.math.reduce_max(targetQ, axis=-1)
-                targetQ = rew_batch + self.gamma * (1 - ter_batch) * targetQ
-                targetQ = tf.stop_gradient(targetQ)
+    def forward_fn(self, obs_batch, act_batch, next_batch, rew_batch, ter_batch):
+        with tf.GradientTape() as tape:
+            _, _, evalQ = self.policy(obs_batch)
+            _, _, targetQ = self.policy.target(next_batch)
+            targetQ = tf.math.reduce_max(targetQ, axis=-1)
+            targetQ = rew_batch + self.gamma * (1 - ter_batch) * targetQ
+            targetQ = tf.stop_gradient(targetQ)
 
-                predictQ = tf.math.reduce_sum(evalQ * tf.one_hot(act_batch, evalQ.shape[1]), axis=-1)
+            predictQ = tf.math.reduce_sum(evalQ * tf.one_hot(act_batch, evalQ.shape[1]), axis=-1)
 
-                loss = tk.losses.mean_squared_error(targetQ, predictQ)
-                gradients = tape.gradient(loss, self.policy.trainable_variables)
-                if self.use_grad_clip:
-                    self.optimizer.apply_gradients([
-                        (tf.clip_by_norm(grad, self.grad_clip_norm), var)
-                        for (grad, var) in zip(gradients, self.policy.trainable_variables)
-                        if grad is not None
-                    ])
-                else:
-                    self.optimizer.apply_gradients([
-                        (grad, var)
-                        for (grad, var) in zip(gradients, self.policy.trainable_variables)
-                        if grad is not None
-                    ])
+            loss = tk.losses.mean_squared_error(targetQ, predictQ)
+            gradients = tape.gradient(loss, self.policy.trainable_variables)
+            if self.use_grad_clip:
+                self.optimizer.apply_gradients([
+                    (tf.clip_by_norm(grad, self.grad_clip_norm), var)
+                    for (grad, var) in zip(gradients, self.policy.trainable_variables)
+                    if grad is not None
+                ])
+            else:
+                self.optimizer.apply_gradients([
+                    (grad, var)
+                    for (grad, var) in zip(gradients, self.policy.trainable_variables)
+                    if grad is not None
+                ])
         return predictQ, loss
 
+    @tf.function
+    def learn(self, *inputs):
+        if self.distributed_training:
+            predictQ, loss = self.policy.mirrored_strategy.run(self.forward_fn, args=inputs)
+            return predictQ, self.policy.mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, loss, axis=None)
+        else:
+            predictQ, loss = self.forward_fn(*inputs)
+            return predictQ, loss
+
     def update(self, **samples):
         self.iterations += 1
         obs_batch = samples['obs']