forked from THUDM/slime
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrain.py
More file actions
107 lines (84 loc) · 4.18 KB
/
train.py
File metadata and controls
107 lines (84 loc) · 4.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import ray
from sglang.srt.constants import GPU_MEMORY_TYPE_KV_CACHE, GPU_MEMORY_TYPE_WEIGHTS
try:
from sglang.srt.constants import GPU_MEMORY_TYPE_CUDA_GRAPH
except ImportError:
GPU_MEMORY_TYPE_CUDA_GRAPH = None
from slime.ray.placement_group import create_placement_groups, create_rollout_manager, create_training_models
from slime.utils.arguments import parse_args
from slime.utils.wandb_utils import init_wandb_primary
def train(args):
    """Run the full generate -> train -> save/eval RL training loop via Ray.

    Creates placement groups, the rollout manager (with sglang engines
    inside), and the actor/critic training models, then iterates rollouts
    from ``args.start_rollout_id`` to ``args.num_rollout``.

    Args:
        args: parsed CLI namespace (see slime.utils.arguments). Fields read
            here include ``offload_rollout``, ``offload_train``,
            ``use_critic``, ``num_critic_only_steps``, ``num_rollout``,
            ``start_rollout_id``, ``eval_interval``, ``save_interval`` and
            ``rollout_global_dataset``.

    Note:
        The relative order of offload/onload/update_weights calls is
        GPU-memory critical: training weights are pushed to sglang while
        only the inference WEIGHTS are resident, and KV cache (and CUDA
        graph, when the installed sglang exposes that tag) is onloaded
        only afterwards.
    """
    # allocate the GPUs
    pgs = create_placement_groups(args)
    wandb_run_id = init_wandb_primary(args)
    # create the rollout manager, with sglang engines inside.
    # need to initialize rollout manager first to calculate num_rollout
    rollout_manager, num_rollout_per_epoch = create_rollout_manager(args, pgs["rollout"], wandb_run_id=wandb_run_id)
    # create the actor and critic models
    actor_model, critic_model = create_training_models(args, pgs, rollout_manager, wandb_run_id=wandb_run_id)
    if args.offload_rollout:
        # bring only the inference weights back before syncing from training
        ray.get(rollout_manager.onload.remote(tags=[GPU_MEMORY_TYPE_WEIGHTS]))
    # always update weight first so that sglang has the loaded weights from training.
    actor_model.update_weights()
    if args.offload_rollout:
        # GPU_MEMORY_TYPE_CUDA_GRAPH is None when the installed sglang
        # predates that tag (see the import fallback at the top of the file).
        if GPU_MEMORY_TYPE_CUDA_GRAPH is not None:
            ray.get(rollout_manager.onload.remote(tags=[GPU_MEMORY_TYPE_CUDA_GRAPH]))
        ray.get(rollout_manager.onload.remote(tags=[GPU_MEMORY_TYPE_KV_CACHE]))
    # special case for eval-only
    if args.num_rollout == 0 and args.eval_interval is not None:
        ray.get(rollout_manager.eval.remote(rollout_id=0))

    def offload_train():
        # Free training-side GPU memory before rollout generation.
        # NOTE: reads `rollout_id` from the enclosing loop scope, so this
        # closure must only be called from inside the rollout loop below.
        if args.offload_train:
            if args.use_critic:
                critic_model.offload()
                # the actor has not trained yet during critic-only warmup
                # steps, so only offload it once it has started training.
                if rollout_id >= args.num_critic_only_steps:
                    actor_model.offload()
            else:
                actor_model.offload()
        else:
            # not offloading: just release cached allocations
            actor_model.clear_memory()

    def onload_rollout():
        # Restore only the inference weights; KV cache (and CUDA graph) are
        # onloaded after update_weights in the loop body below.
        if args.offload_rollout:
            ray.get(rollout_manager.onload.remote(tags=[GPU_MEMORY_TYPE_WEIGHTS]))

    # train loop.
    # note that for async training, one can change the position of the sync operation(ray.get).
    for rollout_id in range(args.start_rollout_id, args.num_rollout):
        # TODO extract the duplicated eval logic
        # baseline eval before any training has happened
        if args.eval_interval is not None and rollout_id == 0:
            ray.get(rollout_manager.eval.remote(rollout_id))
        # generate rollout data; keep only the object ref, the training
        # actors fetch the data themselves.
        rollout_data_ref = ray.get(rollout_manager.generate.remote(rollout_id))
        if args.offload_rollout:
            # free sglang GPU memory so the training models can use it
            ray.get(rollout_manager.offload.remote())
        if args.use_critic:
            critic_train_handle = critic_model.async_train(rollout_id, rollout_data_ref)
            # during the first num_critic_only_steps rollouts only the
            # critic trains; afterwards actor and critic train concurrently.
            if rollout_id >= args.num_critic_only_steps:
                ray.get(actor_model.async_train(rollout_id, rollout_data_ref))
            ray.get(critic_train_handle)
        else:
            ray.get(actor_model.async_train(rollout_id, rollout_data_ref))
        # save on the save interval and additionally at each epoch boundary
        if args.save_interval is not None and (
            (rollout_id + 1) % args.save_interval == 0
            or (num_rollout_per_epoch is not None and (rollout_id + 1) % num_rollout_per_epoch == 0)
        ):
            # skip saving an actor that has not trained yet (critic-only phase)
            if (not args.use_critic) or (rollout_id >= args.num_critic_only_steps):
                actor_model.save_model(rollout_id)
            if args.use_critic:
                critic_model.save_model(rollout_id)
            if args.rollout_global_dataset:
                # persist rollout/dataset state for resumption
                ray.get(rollout_manager.save.remote(rollout_id))
        # hand the GPUs back to inference: offload training, onload weights,
        # sync weights, then restore CUDA graph / KV cache (same sequence as
        # the pre-loop warmup above).
        offload_train()
        onload_rollout()
        actor_model.update_weights()
        if args.offload_rollout:
            if GPU_MEMORY_TYPE_CUDA_GRAPH is not None:
                ray.get(rollout_manager.onload.remote(tags=[GPU_MEMORY_TYPE_CUDA_GRAPH]))
            ray.get(rollout_manager.onload.remote(tags=[GPU_MEMORY_TYPE_KV_CACHE]))
        # eval on the eval interval and additionally at each epoch boundary
        if args.eval_interval is not None and (
            (rollout_id + 1) % args.eval_interval == 0
            or (num_rollout_per_epoch is not None and (rollout_id + 1) % num_rollout_per_epoch == 0)
        ):
            ray.get(rollout_manager.eval.remote(rollout_id))
    # tear down the rollout engines
    ray.get(rollout_manager.dispose.remote())
if __name__ == "__main__":
    # Script entry point: parse CLI arguments and run the training loop.
    train(parse_args())