PEP compliance changes #22

Open · wants to merge 17 commits into base: master
15 changes: 15 additions & 0 deletions .flake8
@@ -0,0 +1,15 @@
[flake8]
# Initial set of rules
# Feel free to add any new rule here with a description of what it does.

# E203 Whitespace before ':'
# E266 Too many leading '#' for block comment
# E501 Line too long (82 > 79 characters)
# W503 Line break occurred before a binary operator
# F405 '<function>' may be undefined, or defined from star imports
# E402 module level import not at top of file
# E731 do not assign a lambda expression, use a def
# F821 undefined name 'get_ipython' --> from generated python files using nbconvert

ignore = E203, E266, E501, W503, F405, E402, E731, F821
max-line-length = 79
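
With this file in place, flake8 picks up the configuration automatically when run from the repository root, so the whole repo can be linted locally (independently of the pre-commit hook described below) with something like:
```
$ pip install flake8
$ flake8 .
```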
10 changes: 10 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,10 @@
repos:
- repo: https://github.com/ambv/black
rev: stable
hooks:
- id: black
language_version: python3.6
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v1.2.3
hooks:
- id: flake8
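
The hook revisions above are pinned (`stable` for black, `v1.2.3` for the flake8 hook). If they need to be bumped later, pre-commit can rewrite the `rev` fields in place:
```
$ pre-commit autoupdate
```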
32 changes: 32 additions & 0 deletions Contributing.md
@@ -0,0 +1,32 @@
# Contribution Guidelines

## Steps to Contributing

Here are the basic steps to get started with your first contribution. Please reach out with any questions.
1. [Fork the repo](https://help.github.com/articles/fork-a-repo/) so you can make and test local changes.
1. Create a new branch for the issue. We suggest prefixing the branch with your username and then the target branch name (e.g. username/bertonazureml).
1. Make code changes.
1. We use the [pre-commit](https://pre-commit.com/) package to run our pre-commit hooks: the black formatter and flake8 linting run on each commit. To set up pre-commit on your machine, follow the steps below; note that you only need to run these steps the first time you use pre-commit for this project.

* Update your conda environment (pre-commit is included in the environment yaml file), or just install it directly:
```
$ pip install pre-commit
```
* Set up pre-commit by running the following command; this puts the pre-commit hook under your .git/hooks directory.
```
$ pre-commit install
```
* Commit your changes as usual:
```
$ git commit -m "message"
```
* Each time you commit, git runs the pre-commit hooks (black and flake8 for now) on any Python files that are staged in the git index. If black modifies/formats a file, or if flake8 finds any linting errors, the commit will not succeed. Stage the file again if black changed it, or fix the issues identified by flake8 and stage it again, as shown in the example after these steps.

* To run pre-commit on all files, just run
```
$ pre-commit run --all-files
```
1. Create a pull request against the <b>bertonazureml</b> branch.
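
For illustration, a typical round trip when black reformats a staged file might look like this (the file name here is just a placeholder):
```
$ git add finetune/PyTorch/train.py
$ git commit -m "tune learning rate"   # black reformats the file, commit is aborted
$ git add finetune/PyTorch/train.py    # stage the reformatted version
$ git commit -m "tune learning rate"   # hooks pass, commit succeeds
```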

Note: We use the bertonazureml branch as our staging branch to land all new features, so please remember to create the pull request against it.

Once the features included in a milestone are complete, we will merge staging into master and make a release.
60 changes: 41 additions & 19 deletions finetune/PyTorch/azureml_bert_util.py
@@ -1,31 +1,37 @@
# flake8: noqa
from horovod.torch.mpi_ops import allreduce, allreduce_async_, synchronize
from horovod.torch.compression import Compression
import horovod.torch as hvd
import torch
import time

from collections import OrderedDict
try:

try:
from apex_C import flatten
from apex_C import unflatten
except ImportError:
try:
_ = warned_flatten
except NameError:
print("Warning: apex was installed without --cpp_ext. Falling back to Python flatten and unflatten.")
print(
"Warning: apex was installed without --cpp_ext. Falling back to Python flatten and unflatten."
)
warned_flatten = True
from torch._utils import _flatten_dense_tensors as flatten
from torch._utils import _unflatten_dense_tensors as unflatten


def warmup_linear(x, warmup=0.002):
if x < warmup:
return x/warmup
return x / warmup
return 1.0 - x


def adjust_gradient_accumulation_steps(x, initial_steps, target_steps, warmup):
return min(max(int(x/warmup*target_steps), initial_steps), target_steps)
return min(
max(int(x / warmup * target_steps), initial_steps), target_steps
)


class DistributedCommunicator:
@@ -38,12 +44,15 @@ def __init__(self, accumulation_step=1):
self.node_count = self.world_size // self.n_gpu
self.accumulation_step = accumulation_step
self.count_down = accumulation_step - 1
self._multi_node = self.node_count > 1
self._multi_node = self.node_count > 1
if not self._multi_node:
# use PyTorch build-in NCCL backend for single node training
torch.distributed.init_process_group(backend='nccl', init_method='tcp://127.0.0.1:6000',
world_size=self.n_gpu, rank=self.local_rank)

torch.distributed.init_process_group(
backend="nccl",
init_method="tcp://127.0.0.1:6000",
world_size=self.n_gpu,
rank=self.local_rank,
)

def register_model(self, model, fp16):
# broadcast model parameters
@@ -54,12 +63,16 @@ def register_model(self, model, fp16):
torch.distributed.broadcast_multigpu([param], 0)

# register hook for reduce when backpropagate
self._parameter_names = {v: k for k, v in sorted(model.named_parameters())}
self._parameter_names = {
v: k for k, v in sorted(model.named_parameters())
}
self._handles = {}
self._requires_update = set()
self._grad_accs = []
self._grad = []
self._compression = hvd.Compression.fp16 if fp16 else hvd.Compression.none
self._compression = (
hvd.Compression.fp16 if fp16 else hvd.Compression.none
)
for p in model.parameters():
if p.requires_grad:
p.grad = p.data.new(p.size()).zero_()
@@ -69,26 +82,26 @@ def register_model(self, model, fp16):
grad_acc.register_hook(self._make_hook(p))
self._grad_accs.append(grad_acc)


def _allreduce_tensor(self, p):
assert p not in self._handles
assert not p.grad.requires_grad
tensor = p.grad
name = self._parameter_names.get(p)
if self._multi_node:
if self._multi_node:
tensor_compressed, ctx = self._compression.compress(tensor)
handle = allreduce_async_(tensor_compressed, average=True, name=name)
handle = allreduce_async_(
tensor_compressed, average=True, name=name
)
self._handles[p] = (handle, ctx)
else:
self._handles[p] = tensor


def _make_hook(self, p):
def hook(*ignore):
if self.count_down == 0:
self._allreduce_tensor(p)
return hook

return hook

def synchronize(self):
synced = False
@@ -101,7 +114,10 @@ def synchronize(self):
for p, value in self._handles.items():
handle, ctx = value
output = synchronize(handle)
p.grad.set_(self._compression.decompress(output, ctx) / self.accumulation_step)
p.grad.set_(
self._compression.decompress(output, ctx)
/ self.accumulation_step
)
else:
buckets = OrderedDict()
for tensor in self._handles.values():
@@ -111,9 +127,15 @@ def synchronize(self):
buckets[tp].append(tensor)
for tp in buckets:
bucket = buckets[tp]
coalesced = flatten(bucket) / self.world_size / self.accumulation_step
coalesced = (
flatten(bucket)
/ self.world_size
/ self.accumulation_step
)
torch.distributed.all_reduce_multigpu([coalesced])
for buf, synced in zip(bucket, unflatten(coalesced, bucket)):
for buf, synced in zip(
bucket, unflatten(coalesced, bucket)
):
buf.copy_(synced)
self._handles.clear()
synced = True
@@ -124,4 +146,4 @@ def synchronize(self):

def set_accumulation_step(self, accumulation_step):
self.accumulation_step = accumulation_step
self.count_down = self.accumulation_step - 1
self.count_down = self.accumulation_step - 1