Skip to content

Commit

Permalink
Fix error for PT backend when pytorch.distributed is not available (#…
Browse files Browse the repository at this point in the history
…3652)

`torch.distributed` is not enabled on macOS by default
(see
[`torch.distributed.is_available()`](https://pytorch.org/docs/stable/distributed.html#torch.distributed.is_available)).

In this case,
* Calling `torch.distributed.is_initialized()` will fail. To fix this
issue, `torch.distributed.is_available()` must be checked first.
* The distributed dataloader is not available.

This pull request allows the unit tests to pass when `distributed` is
disabled.

Set `USE_DISTRIBUTED=1` to manually enable distributed training /
dataloading on macOS. Set it to `0` to disable it on other platforms.
Maybe this behavior needs to be documented somewhere?

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
chazeon and pre-commit-ci[bot] authored Apr 8, 2024
1 parent ea98506 commit e71085c
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 16 deletions.
2 changes: 1 addition & 1 deletion deepmd/pt/entrypoints/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def prepare_trainer_input_single(
stat_file_path_single,
)

rank = dist.get_rank() if dist.is_initialized() else 0
rank = dist.get_rank() if dist.is_available() and dist.is_initialized() else 0
if not multi_task:
(
train_data,
Expand Down
2 changes: 1 addition & 1 deletion deepmd/pt/optimizer/LKF.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(
# the first param, because this helps with casting in load_state_dict
self._state = self.state[self._params[0]]
self._state.setdefault("kalman_lambda", kalman_lambda)
self.dist_init = dist.is_initialized()
self.dist_init = dist.is_available() and dist.is_initialized()
self.rank = dist.get_rank() if self.dist_init else 0
self.dindex = []
self.remainder = 0
Expand Down
50 changes: 39 additions & 11 deletions deepmd/pt/train/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,14 @@ def __init__(
self.model_keys = (
list(model_params["model_dict"]) if self.multi_task else ["Default"]
)
self.rank = dist.get_rank() if dist.is_initialized() else 0
self.world_size = dist.get_world_size() if dist.is_initialized() else 1
self.rank = (
dist.get_rank() if dist.is_available() and dist.is_initialized() else 0
)
self.world_size = (
dist.get_world_size()
if dist.is_available() and dist.is_initialized()
else 1
)
self.num_model = len(self.model_keys)

# Iteration config
Expand Down Expand Up @@ -169,7 +175,9 @@ def get_dataloader_and_buffer(_data, _params):
_data,
sampler=_sampler,
batch_size=None,
num_workers=NUM_WORKERS, # setting to 0 diverges the behavior of its iterator; should be >=1
num_workers=NUM_WORKERS
if dist.is_available()
else 0, # setting to 0 diverges the behavior of its iterator; should be >=1
drop_last=False,
pin_memory=True,
)
Expand Down Expand Up @@ -607,7 +615,7 @@ def single_model_finetune(
if shared_links is not None:
self.wrapper.share_params(shared_links, resume=resuming or self.rank != 0)

if dist.is_initialized():
if dist.is_available() and dist.is_initialized():
torch.cuda.set_device(LOCAL_RANK)
# DDP will guarantee the model parameters are identical across all processes
self.wrapper = DDP(
Expand Down Expand Up @@ -673,7 +681,7 @@ def run(self):
record_file = f"Sample_rank_{self.rank}.txt"
fout1 = open(record_file, mode="w", buffering=1)
log.info("Start to train %d steps.", self.num_steps)
if dist.is_initialized():
if dist.is_available() and dist.is_initialized():
log.info(f"Rank: {dist.get_rank()}/{dist.get_world_size()}")
if self.enable_tensorboard:
from torch.utils.tensorboard import (
Expand Down Expand Up @@ -734,7 +742,11 @@ def step(_step_id, task_key="Default"):
elif self.opt_type == "LKF":
if isinstance(self.loss, EnergyStdLoss):
KFOptWrapper = KFOptimizerWrapper(
self.wrapper, self.optimizer, 24, 6, dist.is_initialized()
self.wrapper,
self.optimizer,
24,
6,
dist.is_available() and dist.is_initialized(),
)
pref_e = self.opt_param["kf_start_pref_e"] * (
self.opt_param["kf_limit_pref_e"]
Expand All @@ -753,7 +765,9 @@ def step(_step_id, task_key="Default"):
# [coord, atype, natoms, mapping, shift, nlist, box]
model_pred = {"energy": p_energy, "force": p_force}
module = (
self.wrapper.module if dist.is_initialized() else self.wrapper
self.wrapper.module
if dist.is_available() and dist.is_initialized()
else self.wrapper
)

def fake_model():
Expand All @@ -768,10 +782,16 @@ def fake_model():
)
elif isinstance(self.loss, DenoiseLoss):
KFOptWrapper = KFOptimizerWrapper(
self.wrapper, self.optimizer, 24, 6, dist.is_initialized()
self.wrapper,
self.optimizer,
24,
6,
dist.is_available() and dist.is_initialized(),
)
module = (
self.wrapper.module if dist.is_initialized() else self.wrapper
self.wrapper.module
if dist.is_available() and dist.is_initialized()
else self.wrapper
)
model_pred = KFOptWrapper.update_denoise_coord(
input_dict,
Expand Down Expand Up @@ -924,7 +944,11 @@ def log_loss_valid(_task_key="Default"):
# Handle the case if rank 0 aborted and re-assigned
self.latest_model = Path(self.save_ckpt + f"-{_step_id + 1}.pt")

module = self.wrapper.module if dist.is_initialized() else self.wrapper
module = (
self.wrapper.module
if dist.is_available() and dist.is_initialized()
else self.wrapper
)
self.save_model(self.latest_model, lr=cur_lr, step=_step_id)
log.info(f"Saved model to {self.latest_model}")
symlink_prefix_files(self.latest_model.stem, self.save_ckpt)
Expand Down Expand Up @@ -990,7 +1014,11 @@ def log_loss_valid(_task_key="Default"):
prof.stop()

def save_model(self, save_path, lr=0.0, step=0):
module = self.wrapper.module if dist.is_initialized() else self.wrapper
module = (
self.wrapper.module
if dist.is_available() and dist.is_initialized()
else self.wrapper
)
module.train_infos["lr"] = lr
module.train_infos["step"] = step
torch.save(
Expand Down
11 changes: 8 additions & 3 deletions deepmd/pt/utils/dataloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ def construct_dataset(system):

with Pool(
os.cpu_count()
// (int(os.environ["LOCAL_WORLD_SIZE"]) if dist.is_initialized() else 1)
// (
int(os.environ["LOCAL_WORLD_SIZE"])
if dist.is_available() and dist.is_initialized()
else 1
)
) as pool:
self.systems = pool.map(construct_dataset, systems)

Expand Down Expand Up @@ -127,7 +131,7 @@ def construct_dataset(system):
self.batch_sizes = batch_size * np.ones(len(systems), dtype=int)
assert len(self.systems) == len(self.batch_sizes)
for system, batch_size in zip(self.systems, self.batch_sizes):
if dist.is_initialized():
if dist.is_available() and dist.is_initialized():
system_sampler = DistributedSampler(system)
self.sampler_list.append(system_sampler)
else:
Expand All @@ -138,7 +142,8 @@ def construct_dataset(system):
num_workers=0, # Should be 0 to avoid too many threads forked
sampler=system_sampler,
collate_fn=collate_batch,
shuffle=(not dist.is_initialized()) and shuffle,
shuffle=(not (dist.is_available() and dist.is_initialized()))
and shuffle,
)
self.dataloaders.append(system_dataloader)
self.index.append(len(system_dataloader))
Expand Down

0 comments on commit e71085c

Please sign in to comment.