Refactor pipeline parallelism for training #4050
-
How to integrate with ShardFormer

Issues to resolve
Process group management

As process groups are used in other components (like the pipeline schedule and the optimizer), we'd better initialize them outside of `ShardFormer`:

```python
def __init__(self, shard_config, pg_mesh, tp_axis=None, pp_stage_manager=None):
    pass
```
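For illustration, the mesh and stage manager would then be created once outside and passed in. A minimal usage sketch, where the import paths and constructor signatures of `ProcessGroupMesh`, `PipelineStageManager` and `ShardConfig` are assumptions based on this proposal, not a confirmed API:

```python
# Hypothetical usage sketch -- names, paths and signatures are assumptions.
from colossalai.cluster import ProcessGroupMesh                       # from #4038, path assumed
from colossalai.pipeline.stage_manager import PipelineStageManager    # path assumed
from colossalai.shardformer import ShardConfig, ShardFormer           # path assumed

pp_size, tp_size = 2, 2
pg_mesh = ProcessGroupMesh(pp_size, tp_size)                          # axis 0: pipeline, axis 1: tensor
stage_manager = PipelineStageManager(pg_mesh, pipeline_axis=0)

shardformer = ShardFormer(
    shard_config=ShardConfig(),
    pg_mesh=pg_mesh,
    tp_axis=1,
    pp_stage_manager=stage_manager,
)
```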
Model forward replacement

We can implement this by simply using …

Model initialization logic

```python
class Policy:
    def get_held_layers(self) -> List[Module]:
        """Get layers that should be held in the current stage. This method should be implemented by subclass.

        Returns:
            List[Module]: List of layers that should be held in the current stage.
        """
        raise NotImplementedError

    def get_shared_params(self) -> List[Dict[int, Tensor]]:
        """Get parameters that should be shared across stages. This method should be implemented by subclass.

        Returns:
            List[Dict[int, Tensor]]: List of parameters that should be shared across stages,
                e.g. [{0: module.model.embed_tokens.weight, 3: module.lm_head.weight}]
        """
        raise NotImplementedError

    def set_pipeline_stage_manager(self, pp_stage_manager):
        pass
```

And the class `Sharder`:
```python
def shard(self) -> List[Dict[int, Tensor]]:
    r"""
    Shard the model according to the policy.
    """
    self.policy.set_model(self.model)
    self.policy.set_shard_config(self.shard_config)
    self.policy.set_pipeline_stage_manager(self.pp_stage_manager)
    self._preprocess()
    self._release_unheld_layers()  # new
    self._replace_model_class()
    self._replace_module()
    self._materialize_model()  # new
    self._postprocess()
    return self.policy.get_shared_params()
```

This method should return the shared params list, which would be used in the pipeline schedule. The newly added …
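As a rough illustration of how that returned list might be consumed in the schedule (the mesh and stage-manager attributes used here are assumptions), the gradient of each tied weight could be all-reduced across the stages that hold a copy after backward:

```python
import torch.distributed as dist

def allreduce_shared_param_grads(shared_params, pg_mesh, pp_axis, stage_manager):
    """Hypothetical helper showing how the list returned by Sharder.shard() could be used.

    `shared_params` looks like [{0: embed_tokens.weight, 3: lm_head.weight}],
    i.e. a mapping from stage index to the local copy of a tied parameter.
    """
    for shared in shared_params:
        if stage_manager.stage not in shared:
            continue                      # this stage holds no copy of the tied weight
        param = shared[stage_manager.stage]
        # Assumed mesh API: get a group covering exactly the stages that hold a copy.
        group = pg_mesh.get_group_along_axis(pp_axis, list(shared.keys()))
        if param.grad is not None:
            dist.all_reduce(param.grad, group=group)
```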
-
I have put the tensor parallel process group in the …
-
The implementation of Policy, taking the BERT model as an example

During actual development, I ran into some bad cases and thus revised the mind map and the properties of the Policy subclasses: when the repeated layers can't be divided evenly across the stages, we need a new method for distributing them. So I added two simple features:
Now the subclass of `Policy` will look like this:

```python
class BertModelPolicy(Policy):
    def __init__(self, stage_manager: PipelineStageManager, num_layers: int, num_stages: int):
        self.stage_manager = stage_manager
        self.layers_per_stage = self.distribute_layers(num_layers, num_stages)

    def get_hold_layers(self, module: BertModel) -> List[Module]:
        ...

    def get_shared_params(self, module: BertModel) -> List[Dict[int, Tensor]]:
        ...

    def replace_forward(self, module: Module) -> None:
        ...

    def distribute_layers(self, num, stage_num) -> List[int]:
        ...
```

The policy needs `num_layers` and `num_stages` to initialize, and these come from the model config and the stage_manager(?). The following method may be added into the base class:

```python
def distribute_layers(self, num, stage_num) -> List[int]:
    ...
```
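A minimal sketch of what `distribute_layers` could return when the layers don't divide evenly; the distribution strategy itself is an assumption:

```python
from typing import List

def distribute_layers(self, num: int, stage_num: int) -> List[int]:
    """Illustrative only: give every stage the floor, then spread the remainder."""
    quotient, remainder = divmod(num, stage_num)
    layers_per_stage = [quotient] * stage_num
    # Assumption: the extra layers go to the earliest stages; other strategies
    # (e.g. favoring the middle stages) are equally valid.
    for i in range(remainder):
        layers_per_stage[i] += 1
    return layers_per_stage
```

For example, with 10 layers and 4 stages this would give `[3, 3, 2, 2]`.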
-
Motivation
The old pipeline parallelism is deeply coupled with the old `Engine` and trainer, which are no longer recommended. We should refactor pipeline parallelism to fit the new booster API.
Goal
To keep the implementation simple, we only focus on huggingface/transformers models. Note that this pipeline parallelism is not intended for LM generation.
Design
There are the following main components:
Pipeline stage manager
Pipeline stage manager relies on process group mesh (described in #4038). It manages pipeline stages and process groups.
Pseudo-code of class:
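A minimal sketch of what such a class might expose, assuming it is built on the `ProcessGroupMesh` from #4038; the mesh method names used here are assumptions:

```python
# Illustrative sketch only -- names and the ProcessGroupMesh methods used here are assumptions.
class PipelineStageManager:
    def __init__(self, pg_mesh, pipeline_axis: int):
        self.pg_mesh = pg_mesh
        self.pipeline_axis = pipeline_axis
        # This rank's coordinate along the pipeline axis is its stage index.
        self.stage = pg_mesh.coordinate(pipeline_axis)
        self.num_stages = pg_mesh.size(pipeline_axis)
        # Process group spanning all stages of this pipeline (for p2p / shared params).
        self.pp_group = pg_mesh.get_group_along_axis(pipeline_axis)

    def is_first_stage(self) -> bool:
        return self.stage == 0

    def is_last_stage(self) -> bool:
        return self.stage == self.num_stages - 1
```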
Pipeline parallel policy
The policy handles:
The policy should be applied to the top-level module, and it may have a method:
It receives a pipeline stage manager and determines which layers should be held. From this we get `hold_params` and `hold_buffers`; other params and buffers will be set to `None`. For LMs, there are a few shared parameters. A typical one is the tied embedding weight, which sets `lm_head.weight = input_embedding.weight`. This parameter is shared across the first stage and the last stage, and its gradient should be all-reduced after backward.

For the sake of simplicity, the new forward method should obey the rules below:
For memory efficiency, the model partition should be coupled with ShardFormer.
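To make the layer-holding step above concrete, here is a sketch of releasing everything a stage does not hold, so that only `hold_params` and `hold_buffers` survive; the function name and selection mechanism are assumptions:

```python
from torch import nn

def release_unheld_layers(model: nn.Module, held_layers: list) -> None:
    """Illustrative sketch: set parameters/buffers of unheld layers to None."""
    held = set()
    for layer in held_layers:
        held.update(id(p) for p in layer.parameters())
        held.update(id(b) for b in layer.buffers())

    # Walk every submodule and drop anything this stage does not hold.
    for module in model.modules():
        for name, param in list(module.named_parameters(recurse=False)):
            if id(param) not in held:
                setattr(module, name, None)   # not in hold_params -> set to None
        for name, buf in list(module.named_buffers(recurse=False)):
            if id(buf) not in held:
                setattr(module, name, None)   # not in hold_buffers -> set to None
```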
P2P communication
P2P communication encapsulates the basic communication methods of pipeline parallelism.
Pseudo-code:
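A minimal sketch of the communication wrapper, using blocking `send`/`recv` for brevity; the class name and the `prev_rank`/`next_rank` attributes are assumptions, and (as noted below) broadcast within a two-rank group is another possible backend:

```python
import torch
import torch.distributed as dist

class PipelineP2PCommunication:
    """Hypothetical sketch: point-to-point exchange of activations between adjacent stages."""

    def __init__(self, stage_manager):
        # `prev_rank` / `next_rank` are assumed to be provided by the stage manager.
        self.stage_manager = stage_manager

    def send_forward(self, tensor: torch.Tensor) -> None:
        # Blocking send of activations to the next stage.
        dist.send(tensor, dst=self.stage_manager.next_rank)

    def recv_forward(self, shape, dtype=torch.float32) -> torch.Tensor:
        # The receiver must know shape/dtype in advance, or exchange metadata first.
        buffer = torch.empty(shape, dtype=dtype, device="cuda")
        dist.recv(buffer, src=self.stage_manager.prev_rank)
        return buffer
```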
This can be implemented with various base communication methods, including isend/irecv, broadcast and RPC. We found broadcast to be robust and efficient. Nevertheless, this base class should stay compatible with all of these base communication methods.
Pipeline schedule
Most of the code can be reused from the old pipeline schedule.
Pseudo-code:
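A skeleton of what the schedule's interface might look like; the class name and argument list are assumptions, and only the return contract described below comes from this proposal:

```python
# Skeleton only -- structure and names are assumptions; the returned dict follows this proposal.
class OneForwardOneBackwardSchedule:
    def __init__(self, stage_manager, num_microbatches: int):
        self.stage_manager = stage_manager
        self.num_microbatches = num_microbatches

    def forward_backward_step(self, model, optimizer, data_iter, criterion,
                              return_loss: bool = True, return_outputs: bool = False) -> dict:
        accum_loss, outputs = None, []
        for _ in range(self.num_microbatches):
            # 1F1B scheduling, p2p communication and loss accumulation omitted here.
            ...
        return {"loss": accum_loss, "outputs": outputs if return_outputs else None}
```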
The `forward_backward_step()` method should return a dict with two keys: `"loss"` and `"outputs"`.
Pipeline plugin

It will compose all other core components. Its most important method is `execute_pipeline()`. Pseudo-code:
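A rough sketch of how the plugin could delegate to the schedule; the class name, argument order and composition are assumptions:

```python
# Sketch only -- names and internals are assumptions based on this proposal.
class PipelinePlugin:
    def __init__(self, schedule, stage_manager):
        self.schedule = schedule
        self.stage_manager = stage_manager

    def execute_pipeline(self, data_iter, model, criterion, optimizer,
                         return_loss: bool = True, return_outputs: bool = False) -> dict:
        # Delegate to the pipeline schedule; returns {"loss": ..., "outputs": ...}.
        return self.schedule.forward_backward_step(
            model, optimizer, data_iter, criterion, return_loss, return_outputs
        )
```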
Compatibility with other parallel methods

`no_sync()` and `sync_grads()` methods
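For instance, gradient synchronization with the data-parallel group might be deferred across the pipeline micro-batches roughly as follows; the calling pattern and the `plugin`, `data_iter`, `criterion` objects are assumptions, and only the two method names come from the list above:

```python
# Illustrative pattern only -- how no_sync()/sync_grads() might interact with the pipeline.
with plugin.no_sync(model):
    # Run all micro-batches without triggering the data-parallel all-reduce;
    # gradients simply accumulate locally on each rank.
    outputs = plugin.execute_pipeline(data_iter, model, criterion, optimizer)
plugin.sync_grads()      # one all-reduce of the accumulated gradients per optimizer step
optimizer.step()
optimizer.zero_grad()
```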