feat: Add linear CE loss fusion for DPO #2139
base: main
Changes from all commits
78247a6
7ed04cc
d4c86ef
f7ac212
eaeb51b
dccf08e
ed54695
New file (`@@ -0,0 +1,47 @@`), DPO recipe config overriding `dpo.yaml`:

```yaml
defaults: ../../dpo.yaml
dpo:
  max_num_steps: 10
checkpointing:
  enabled: false
policy:
  model_name: Qwen/Qwen2.5-Math-7B
  tokenizer:
    name: ${policy.model_name}
  train_global_batch_size: 32
  train_micro_batch_size: 1
  max_total_sequence_length: 6000
  dtensor_cfg:
    enabled: false
  megatron_cfg:
    enabled: true
    use_linear_ce_fusion_loss: true
    linear_ce_fusion_chunk_size: 128
    tensor_model_parallel_size: 4
    pipeline_model_parallel_size: 2
    attention_backend: unfused
    freeze_moe_router: true
    moe_router_bias_update_rate: 0.0
    moe_permute_fusion: true
    optimizer:
      lr: 1.0e-06
      min_lr: 1.0e-06
      adam_beta2: 0.999
      use_distributed_optimizer: false
      use_precision_aware_optimizer: false
    scheduler:
      lr_warmup_iters: 10
      lr_warmup_init: 1.0e-11
      lr_decay_iters: 32
  make_sequence_length_divisible_by: 8
data:
  num_workers: 8
logger:
  wandb:
    project: nemo-rl
    name: dpo-qwen2.5-math-7b-megatron-chunked-linear-ce-loss-1n8g
  tensorboard:
    log_dir: tb_logs-dpo-qwen2.5-math-7b-megatron-chunked-linear-ce-loss-1n8g
  mlflow:
    run_name: dpo-qwen2.5-math-7b-megatron-chunked-linear-ce-loss-1n8g
cluster:
  gpus_per_node: 8
```
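The `use_linear_ce_fusion_loss` / `linear_ce_fusion_chunk_size` pair above indicates the fused loss streams over the vocabulary in fixed-size chunks instead of materializing the full softmax at once. A minimal pure-Python sketch of the underlying chunked log-sum-exp idea (my illustration of the concept, not the actual Megatron kernel):

```python
import math

def chunked_token_logprob(logits, target, chunk_size):
    """Log-probability of `target` under softmax(logits), computed by
    streaming over vocabulary chunks so the full softmax vector is never
    materialized. Two passes: running max, then running denominator."""
    # Pass 1: running maximum for numerical stability.
    m = -math.inf
    for i in range(0, len(logits), chunk_size):
        m = max(m, max(logits[i:i + chunk_size]))
    # Pass 2: running sum of exp(logit - max) over the same chunks.
    denom = 0.0
    for i in range(0, len(logits), chunk_size):
        denom += sum(math.exp(x - m) for x in logits[i:i + chunk_size])
    # log softmax(logits)[target] = logit - max - log(sum exp(logit - max))
    return logits[target] - m - math.log(denom)
```

The chunk size (128 in the config above) trades peak memory against the number of passes over the projection output.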
Modified file (Python), reconstructed from the diff hunks:

```diff
@@ -400,9 +400,11 @@ def __init__(
         self,
         cfg: PolicyConfig,
         sampling_params: Optional[TrainingSamplingParams] = None,
+        use_linear_ce_fusion: bool = False,
     ):
         self.cfg = cfg
         self.sampling_params = sampling_params
+        self.use_linear_ce_fusion = use_linear_ce_fusion

     def __call__(
         self,
@@ -427,10 +429,13 @@ def __call__(
         original_seq_length = unpacked_input_ids.shape[1]

         def processor_fn_inner(output_tensor):
-            tp_grp = get_tensor_model_parallel_group()
-            tp_rank = get_tensor_model_parallel_rank()
-            logprob_chunk_size = self.cfg.get("logprob_chunk_size", None)
-            if self.cfg["sequence_packing"]["enabled"]:
+            if self.use_linear_ce_fusion:
+                token_logprobs = output_tensor.to(torch.float32)
+                token_logprobs = token_logprobs[:, : original_seq_length - 1]
```
> **Comment on lines +432 to +434**
>
> **Contributor (@yuki-97):** Am I understanding correctly that …
>
> **Contributor (Author):** Hi @yuki-97, thank you for your question! No: if I understand correctly, DPO and sequence packing are currently mutually exclusive (see line 133 in b0da493).
>
> **Contributor (@yuki-97):** Oh, I see. Thanks for pointing this out. What do you think about adding an assert after that, so that if we later support sequence packing in DPO, people will know sequence packing and loss fusion can't be used together?
>
> ```python
> if master_config["policy"]["sequence_packing"]["enabled"]:
>     assert xxx  # assert not using loss fusion
> ```
>
> **Contributor (Author):** Sounds like a great idea! Added this guardrail, and we can remove it once DPO, sequence packing, and the linear fusion loss are all compatible.
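The guardrail the review thread converges on might look roughly like this (a hypothetical sketch: the function name and assert message are my own, and the config layout is taken from the snippet in the thread):

```python
def assert_no_packing_with_fusion(master_config):
    """Hypothetical guardrail from the review thread: sequence packing and
    the fused linear CE loss are mutually exclusive for now, so fail fast
    if both are enabled in the config."""
    policy = master_config["policy"]
    if policy["sequence_packing"]["enabled"]:
        assert not policy["megatron_cfg"].get("use_linear_ce_fusion_loss", False), (
            "Sequence packing is not yet compatible with "
            "use_linear_ce_fusion_loss; disable one of them."
        )
```

Failing at config-validation time keeps the incompatibility visible even after sequence packing gains DPO support.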
The diff continues:

```diff
+            elif self.cfg["sequence_packing"]["enabled"]:
+                tp_grp = get_tensor_model_parallel_group()
+                tp_rank = get_tensor_model_parallel_rank()
+                logprob_chunk_size = self.cfg.get("logprob_chunk_size", None)
                 token_logprobs = from_parallel_logits_to_logprobs_packed_sequences(
                     output_tensor,
                     target=input_ids,
@@ -445,6 +450,9 @@ def processor_fn_inner(output_tensor):
                     sampling_params=self.sampling_params,
                 )
             else:
+                tp_grp = get_tensor_model_parallel_group()
+                tp_rank = get_tensor_model_parallel_rank()
+                logprob_chunk_size = self.cfg.get("logprob_chunk_size", None)
                 token_logprobs = from_parallel_logits_to_logprobs(
                     output_tensor,
                     target=unpacked_input_ids,
```
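For orientation, here is what `from_parallel_logits_to_logprobs` computes conceptually, with tensor parallelism, batching, and chunking stripped out (a single-sequence pure-Python sketch, not the real function's signature):

```python
import math

def next_token_logprobs(logits, input_ids):
    """Per-token log-probabilities for next-token prediction.
    logits[t] scores the token at position t + 1, which is why only the
    first len(input_ids) - 1 positions carry a valid log-prob (and why the
    fused path above slices to original_seq_length - 1)."""
    out = []
    for t in range(len(input_ids) - 1):
        row = logits[t]
        m = max(row)  # stabilize the softmax
        lse = m + math.log(sum(math.exp(x - m) for x in row))
        out.append(row[input_ids[t + 1]] - lse)
    return out
```

In the real code the vocabulary dimension is sharded across the tensor-parallel group, so the target logit and the log-sum-exp each require a reduction over `tp_grp`; the fused-loss branch skips all of this because the kernel already returns per-token log-probs.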
New file (`@@ -0,0 +1,43 @@`), nightly test script (Bash):

```bash
#!/bin/bash
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
source $SCRIPT_DIR/common.env

# ===== BEGIN CONFIG =====
NUM_NODES=1
STEPS_PER_RUN=10
MAX_STEPS=10
NUM_RUNS=$(( (MAX_STEPS + STEPS_PER_RUN - 1) / STEPS_PER_RUN ))  # Round up
NUM_MINUTES=25
# ===== END CONFIG =====

exit_if_max_steps_reached

# Run the experiment
cd $PROJECT_ROOT
uv run examples/run_dpo.py \
    --config $CONFIG_PATH \
    dpo.max_num_steps=$MAX_STEPS \
    logger.log_dir=$LOG_DIR \
    logger.wandb_enabled=True \
    logger.wandb.project=nemo-rl \
    logger.wandb.name=$EXP_NAME \
    logger.monitor_gpus=True \
    logger.tensorboard_enabled=True \
    checkpointing.enabled=true \
    checkpointing.checkpoint_dir=$CKPT_DIR \
    $@ \
    2>&1 | tee $RUN_LOG

# Convert tensorboard logs to json
uv run tests/json_dump_tb_logs.py $LOG_DIR --output_path $JSON_METRICS

# Only run metrics if the target step is reached
if [[ $(jq 'to_entries | .[] | select(.key == "train/loss") | .value | keys | map(tonumber) | max' $JSON_METRICS) -ge $MAX_STEPS ]]; then
    # Smoke checks: run completed and loss is finite/reasonable.
    uv run tests/check_metrics.py $JSON_METRICS \
        'data["train/loss"]["10"] > 0.0' \
        'data["train/loss"]["10"] < 20.0'

    # Clean up checkpoint directory after successful run to save space.
    rm -rf "$CKPT_DIR"
fi
```
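The `jq` gate in the script finds the highest step number logged for `train/loss` in the dumped TensorBoard JSON. A Python equivalent, for readers less familiar with `jq` (the function name and the exact JSON shape, metric name to step-keyed values, are assumptions based on the script):

```python
import json

def max_logged_step(json_metrics_text, key="train/loss"):
    """Highest step number recorded for `key` in the dumped metrics JSON,
    mirroring the jq expression in the script; None if nothing was logged."""
    data = json.loads(json_metrics_text)
    steps = data.get(key, {})  # e.g. {"1": 2.0, "10": 0.5}
    return max((int(s) for s in steps), default=None)
```

Comparing this value against `MAX_STEPS` before running `check_metrics.py` avoids flagging a crashed run as a metric failure.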
> **Review comment:** Show the backend switch in the YAML snippet. `examples/configs/dpo.yaml` keeps `policy.dtensor_cfg.enabled: true` by default, so copying only this block can leave both backends enabled. Please include `policy.dtensor_cfg.enabled: false` here, or call it out explicitly, so the enablement instructions switch to Megatron cleanly.
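A sketch of the snippet the reviewer is asking for, reusing the keys from the recipe config in this PR (where exactly it lands in the docs is assumed):

```yaml
policy:
  dtensor_cfg:
    enabled: false  # dpo.yaml defaults this to true; switch it off explicitly
  megatron_cfg:
    enabled: true
    use_linear_ce_fusion_loss: true
    linear_ce_fusion_chunk_size: 128
```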