main.py
import os
import deepspeed
import logging
import random
import numpy as np
import torch
from transformers import set_seed
from deepspeed import comm as dist
import time
from model_hook import *  # load the user-defined hook functions from model_hook.py
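# The wildcard import above is expected to provide the helpers used throughout this file:
# parse_args, get_model_common, get_op_lr, get_dataloader_common, before_train,
# on_step_end, on_epoch_end, after_train, and a DeepSpeed-config builder
# (assumed here to be named get_ds_config; see get_ds_model below).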

# configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    encoding='utf-8')


def log_dist(message: str, level: int = logging.INFO) -> None:
    """Logging helper: only processes whose rank is a multiple of 8 write log messages."""
    my_rank = int(os.environ.get("RANK", "0"))
    if my_rank % 8 == 0:
        if level == logging.INFO:
            logging.info(f"[rank{my_rank}] {message}")
        if level == logging.ERROR:
            logging.error(f"[rank{my_rank}] {message}")
        if level == logging.DEBUG:
            logging.debug(f"[rank{my_rank}] {message}")


def get_ds_model(args, dataloader_dict):
    ## get the DeepSpeed config
    ## (the original code called get_ds_model(args) here, which would recurse forever;
    ##  a config helper from model_hook.py, assumed to be get_ds_config, is what is intended)
    ds_config = get_ds_config(args)
    ## load the model
    model = get_model_common(args)
    ## count model parameters
    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    log_dist(f"Finally total_params: {total_params} trainable_params: {trainable_params} "
             f"ratio: {trainable_params / total_params if total_params > 0 else -1:.4%}")
    ## get the user-defined optimizer and learning-rate scheduler
    op_lr_dict = get_op_lr(args, model, dataloader_dict)
    if op_lr_dict is None:
        lr_scheduler = None
        optimizer = None
    else:
        lr_scheduler = op_lr_dict.get("lr_scheduler", None)
        optimizer = op_lr_dict.get("optimizer", None)
    ## initialize DeepSpeed
    model, _, _, lr_scheduler = deepspeed.initialize(
        model=model,
        lr_scheduler=lr_scheduler,
        optimizer=optimizer,
        model_parameters=filter(lambda p: p.requires_grad, model.parameters()),
        config=ds_config
    )
    log_dist("deepspeed initialize finished.")
    ## enable gradient checkpointing
    if args.gradient_checkpointing:
        model.gradient_checkpointing_enable()
    return model


# Set all random seeds so runs are reproducible
def seed_all(seed):
    if seed is not None:
        set_seed(seed)
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)


def save_hf_format(model, tokenizer, args, sub_folder=""):
    """
    Save the model in HuggingFace format so it can later be loaded with from_pretrained.
    """
    model_to_save = model.module if hasattr(model, 'module') else model
    output_dir = os.path.join(args.save_name, sub_folder)
    os.makedirs(output_dir, exist_ok=True)
    output_model_file = os.path.join(output_dir, "pytorch_model.bin")
    output_config_file = os.path.join(output_dir, "config.json")
    state_dict = model_to_save.state_dict()
    config = model_to_save.config  # `config` is an attribute, not a method, on HF models
    torch.save(state_dict, output_model_file)  ### save the model weights: pytorch_model.bin
    config.to_json_file(output_config_file)    ### save the config file: config.json
    tokenizer.save_pretrained(output_dir)      ### save the tokenizer
    print("=========================================")
    print(f"Model saved at: {output_dir}")
    print("=========================================")


def main():
    ## parse command-line arguments
    args = parse_args()
    if args.local_rank == 0:
        ## create the folder the model is saved to
        os.makedirs(args.save_name, exist_ok=True)
    ## set all random seeds so runs are reproducible
    seed_all(args.seed)
    ## initialize the DeepSpeed distributed training environment
    if args.local_rank > -1 and torch.cuda.is_available():
        ## distributed training: use cuda
        torch.cuda.set_device(args.local_rank)
        device = torch.device("cuda", args.local_rank)
        print(f"local_rank={args.local_rank} device={device}")
        deepspeed.init_distributed()
        args.global_rank = dist.get_rank()
        print(f"global rank: {args.global_rank} local rank: {args.local_rank}")
    else:
        ## not distributed: fall back to cpu
        device = torch.device("cpu")
        args.global_rank = 0  # global_rank is read later, so it must also be set on the cpu path
    ## build the dataloaders that provide the training data
    dataloader_dict = get_dataloader_common(args)
    ## load the model
    model = get_ds_model(args, dataloader_dict)
    model.train()  ## switch to train mode
    dataloader_dict["device"] = device
    ## run the user-defined hook before training starts
    before_train(args, model, dataloader_dict)
    if args.gradient_accumulation_steps >= 1:
        args.log_steps = args.log_steps * args.gradient_accumulation_steps
        args.save_steps = args.save_steps * args.gradient_accumulation_steps
    for epoch in range(0, args.epochs):
        dataloader_dict["sampler"].set_epoch(epoch)  ### tell the sampler which epoch this is
        train_dataloader = dataloader_dict["train_dataloader"]
        st = time.time()
        num_total_steps = len(train_dataloader)
        for step, batch in enumerate(train_dataloader):
            batch = {k: v.to(device) for k, v in batch.items()}  ### move the batch to the device
            outputs = model(use_cache=False, **batch)  ### forward pass
            loss = outputs["loss"]  ### get the loss
            model.backward(loss)  ### backward pass
            model.step()  ### let deepspeed update the model parameters
            ## log every log_steps steps
            if step % args.log_steps == 0:
                real_step = step
                ## with gradient accumulation, divide the step by the accumulation steps
                if args.gradient_accumulation_steps >= 1:
                    real_step = step / args.gradient_accumulation_steps
                log_dist(f"epoch{epoch} step{int(real_step)}/{num_total_steps} loss: {loss:.4f}")
                st = time.time()  ### reset the timer
            ## save the model every save_steps steps
            if step > 0 and args.save_steps > 0 and step % args.save_steps == 0:
                ## save the model
                log_dist(f"save model at epoch {epoch} step {step}")
                if args.global_rank == 0:
                    save_hf_format(
                        model, dataloader_dict['tokenizer'], args,
                        sub_folder=f"epoch{epoch}_step--{step}-hf"
                    )
            ## run the user-defined hook at the end of every step
            on_step_end(args, model, dataloader_dict, step, epoch, outputs)
        ## save the model at the end of each epoch
        log_dist(f"save model at end of epoch {epoch}")
        if args.global_rank == 0:
            save_hf_format(
                model, dataloader_dict['tokenizer'], args,
                sub_folder=f"epoch{epoch}_step--{step}-hf"
            )
        ## run the user-defined hook at the end of every epoch
        on_epoch_end(args, model, dataloader_dict, epoch)
    log_dist("Training finished")
    # run the user-defined hook when training ends
    after_train(args, model, dataloader_dict)


if __name__ == "__main__":
    main()
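# Example launch (illustrative only; the actual CLI flags are defined by parse_args in
# model_hook.py, and the flag names below are assumptions based on how args is used here):
#   deepspeed --num_gpus=8 main.py \
#       --epochs 3 --seed 42 --save_name ./output \
#       --log_steps 10 --save_steps 1000 \
#       --gradient_accumulation_steps 4 --gradient_checkpointing
# The deepspeed launcher sets RANK and passes --local_rank to each process, which is what
# log_dist and the device-selection logic above rely on.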