3 changes: 3 additions & 0 deletions docs/usage/environment_variables.md
@@ -227,5 +227,8 @@ environment_variables: dict[str, Callable[[], Any]] = {

# Worker process health check timeout when waiting for responses in seconds (default: 30)
"FD_WORKER_ALIVE_TIMEOUT": lambda: int(os.getenv("FD_WORKER_ALIVE_TIMEOUT", "30")),

# Whether to enable PD reorder; set to 0 or 1 (default: 0)
"ENABLE_PD_REORDER": lambda: int(os.getenv("ENABLE_PD_REORDER", "0")),
}
```
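For reference, a minimal way to enable the new flag from Python before FastDeploy starts, mirroring what the new e2e test below does; exporting `ENABLE_PD_REORDER=1` in the shell is equivalent:

```python
import os

# Opt in to PD reorder; leave unset (or "0") to keep the default behaviour.
os.environ["ENABLE_PD_REORDER"] = "1"
```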
3 changes: 3 additions & 0 deletions docs/zh/usage/environment_variables.md
@@ -227,5 +227,8 @@ environment_variables: dict[str, Callable[[], Any]] = {

# Worker 进程响应等待时的健康检查超时时间(秒),默认 30 秒
"FD_WORKER_ALIVE_TIMEOUT": lambda: int(os.getenv("FD_WORKER_ALIVE_TIMEOUT", "30")),

# 是否启用 PD REORDER,可以设置为 0 或 1
"ENABLE_PD_REORDER": lambda: int(os.getenv("ENABLE_PD_REORDER", "0")),
}
```
2 changes: 2 additions & 0 deletions fastdeploy/envs.py
@@ -200,6 +200,8 @@
"FD_WORKER_ALIVE_TIMEOUT": lambda: int(os.getenv("FD_WORKER_ALIVE_TIMEOUT", "30")),
# File path for file storage backend
"FILE_BACKEND_STORAGE_DIR": lambda: str(os.getenv("FILE_BACKEND_STORAGE_DIR", "/tmp/fastdeploy")),
# Whether to enable PD reorder; set to 0 or 1 (default: 0)
"ENABLE_PD_REORDER": lambda: int(os.getenv("ENABLE_PD_REORDER", "0")),
}


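A small local sketch of how the new registry entry behaves; `environment_variables` here is a stand-in for the dict in `fastdeploy/envs.py`, and the only assumption is the lambda shown above:

```python
import os
from typing import Any, Callable

# Stand-in for the registry entry added above.
environment_variables: dict[str, Callable[[], Any]] = {
    "ENABLE_PD_REORDER": lambda: int(os.getenv("ENABLE_PD_REORDER", "0")),
}

os.environ.pop("ENABLE_PD_REORDER", None)
assert environment_variables["ENABLE_PD_REORDER"]() == 0  # default when unset

os.environ["ENABLE_PD_REORDER"] = "1"
assert environment_variables["ENABLE_PD_REORDER"]() == 1  # read lazily at call time
```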
20 changes: 13 additions & 7 deletions fastdeploy/model_executor/pre_and_post_process.py
@@ -445,21 +445,27 @@ def save_output_normal(
)
async_output_queue.put(output)
else:
recover_share_inputs_map = recover_batch_index_for_output(
share_inputs,
model_output.index_to_batch_id,
model_output.enable_pd_reorder,
["last_preempted_idx"],
)
if sampler_output.logprobs_tensors is None:
recover_share_inputs_map = recover_batch_index_for_output(
share_inputs,
model_output.index_to_batch_id,
model_output.enable_pd_reorder,
["last_preempted_idx", "sampled_token_ids"],
)
save_output(
share_inputs["sampled_token_ids"],
recover_share_inputs_map["sampled_token_ids"],
model_output.not_need_stop,
recover_share_inputs_map["last_preempted_idx"],
model_output.mp_rank,
save_each_rank,
)
else:
recover_share_inputs_map = recover_batch_index_for_output(
share_inputs,
model_output.index_to_batch_id,
model_output.enable_pd_reorder,
["last_preempted_idx"],
)
recover_batch_index_for_sampler_output(
sampler_output, model_output.index_to_batch_id, model_output.enable_pd_reorder
)
4 changes: 3 additions & 1 deletion fastdeploy/worker/gpu_model_runner.py
@@ -1360,7 +1360,9 @@ def _prepare_inputs(self, last_token_num=-1, is_dummy_or_profile_run=False) -> N
return token_num_event

def _process_reorder(self) -> None:
if self.attn_backends and getattr(self.attn_backends[0], "enable_ids_reorder", False):
if self.attn_backends and (
getattr(self.attn_backends[0], "enable_ids_reorder", False) or envs.ENABLE_PD_REORDER
):
Collaborator:
Why add another way to enable this here?

Collaborator (Author):
Otherwise there is no way to add a unit test: the attention backends currently do not reorder by default, and there is no way to directly enable reorder for append attention, so an environment variable was added.

if (
self.enable_mm
and not envs.ENABLE_V1_KVCACHE_SCHEDULER
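Following up on the review thread above: since the attention backends leave `enable_ids_reorder` off by default, the environment variable is the hook a test can use to force the reorder path. A minimal sketch of that pattern, covering only the flag handling (`gpu_model_runner` consumes the value through `envs.ENABLE_PD_REORDER`, as the diff shows):

```python
import os


def test_pd_reorder_flag(monkeypatch):
    # Force the PD reorder branch without enabling ids reorder in the
    # attention backend, which is the motivation discussed above.
    monkeypatch.setenv("ENABLE_PD_REORDER", "1")

    # The registry entry in fastdeploy/envs.py is a lazy os.getenv call,
    # so the override set by monkeypatch is what it returns here.
    assert int(os.getenv("ENABLE_PD_REORDER", "0")) == 1
```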
53 changes: 22 additions & 31 deletions fastdeploy/worker/input_batch.py
@@ -736,6 +736,18 @@ def reorder_split_prefill_and_decode(input_batch: InputBatch):
right -= 1


def _recover_tensor(recover_tensor, index_to_batch_id_list):
sort_len = len(index_to_batch_id_list)
if isinstance(recover_tensor.place, paddle.CUDAPinnedPlace):
recover_res_tensor = paddle.empty_like(recover_tensor, device="cpu")
else:
recover_res_tensor = paddle.empty_like(recover_tensor)
recover_res_tensor[:sort_len] = recover_tensor[:sort_len][index_to_batch_id_list]
if sort_len < recover_res_tensor.shape[0]:
recover_res_tensor[sort_len:] = recover_tensor[sort_len:]
return recover_res_tensor


def recover_batch_index_for_output(output_cls, index_to_batch_id, enable_pd_reorder, recover_list):
"""
Reorder model_output according to index_to_batch_id mapping.
Expand All @@ -750,10 +762,8 @@ def recover_batch_index_for_output(output_cls, index_to_batch_id, enable_pd_reor
res_map = {}
is_not_swapped = all(i == v for i, v in index_to_batch_id.items()) or not enable_pd_reorder
# Create a new tensor to store the reordered results
sorted_keys = sorted(index_to_batch_id.keys())
if not is_not_swapped:
index_to_batch_id_tmp = [index_to_batch_id[key] for key in sorted_keys]
index_to_batch_id_tensor = paddle.to_tensor(index_to_batch_id_tmp, dtype="int64")
src_order = [k for k, v in sorted(index_to_batch_id.items(), key=lambda x: x[1])]
for recover_name in recover_list:
if isinstance(output_cls, dict):
recover_tensor = output_cls[recover_name]
@@ -765,9 +775,7 @@ def recover_batch_index_for_output(output_cls, index_to_batch_id, enable_pd_reor

if isinstance(recover_tensor, paddle.Tensor):
# Create a new tensor to store the reordered results
res_map[recover_name] = paddle.scatter_nd(
paddle.unsqueeze(index_to_batch_id_tensor, axis=-1), recover_tensor, recover_tensor.shape
)
res_map[recover_name] = _recover_tensor(recover_tensor, src_order)
elif isinstance(recover_tensor, list):
real_recover_tensor = recover_tensor.copy()
for i1, i2 in enumerate(index_to_batch_id):
@@ -795,48 +803,31 @@ def recover_batch_index_for_sampler_output(sampler_output, index_to_batch_id, en

sampled_token_ids = sampler_output.sampled_token_ids
# Create a new tensor to store the reordered results
sorted_keys = sorted(index_to_batch_id.keys())
index_to_batch_id_tmp = [index_to_batch_id[key] for key in sorted_keys]
index_to_batch_id_tensor = paddle.to_tensor(index_to_batch_id_tmp, dtype="int64")

real_token_ids = paddle.scatter_nd(
paddle.unsqueeze(index_to_batch_id_tensor, axis=-1), sampled_token_ids, sampled_token_ids.shape
)
src_order = [k for k, v in sorted(index_to_batch_id.items(), key=lambda x: x[1])]
real_token_ids = _recover_tensor(sampled_token_ids, src_order)
sampler_output.sampled_token_ids = real_token_ids

if sampler_output.logprobs_tensors is not None:
logprob_token_ids = sampler_output.logprobs_tensors.logprob_token_ids
logprobs = sampler_output.logprobs_tensors.logprobs
selected_token_ranks = sampler_output.logprobs_tensors.selected_token_ranks
real_logprob_token_ids = paddle.scatter_nd(
paddle.unsqueeze(index_to_batch_id_tensor, axis=-1), logprob_token_ids, sampled_token_ids.shape
)

real_logprobs = paddle.scatter_nd(
paddle.unsqueeze(index_to_batch_id_tensor, axis=-1), logprobs, sampled_token_ids.shape
)
real_selected_token_ranks = paddle.scatter_nd(
paddle.unsqueeze(index_to_batch_id_tensor, axis=-1), selected_token_ranks, sampled_token_ids.shape
)
real_logprob_token_ids = _recover_tensor(logprob_token_ids, src_order)
real_logprobs = _recover_tensor(logprobs, src_order)
real_selected_token_ranks = _recover_tensor(selected_token_ranks, src_order)
sampler_output.logprobs_tensors.logprob_token_ids = real_logprob_token_ids
sampler_output.logprobs_tensors.logprobs = real_logprobs
sampler_output.logprobs_tensors.sampled_token_ranks = real_selected_token_ranks

if sampler_output.token_num_per_batch is not None:
token_num_per_batch = sampler_output.token_num_per_batch
real_token_num_per_batch = paddle.scatter_nd(
paddle.unsqueeze(index_to_batch_id_tensor, axis=-1), token_num_per_batch, sampled_token_ids.shape
)
real_token_num_per_batch = _recover_tensor(token_num_per_batch, src_order)
sampler_output.token_num_per_batch = real_token_num_per_batch

if sampler_output.cu_batch_token_offset is not None:
cu_batch_token_offset = sampler_output.cu_batch_token_offset
real_cu_batch_token_offset = paddle.scatter_nd(
paddle.unsqueeze(index_to_batch_id_tensor, axis=-1), cu_batch_token_offset, sampled_token_ids.shape
)
real_cu_batch_token_offset = _recover_tensor(cu_batch_token_offset, src_order)
sampler_output.cu_batch_token_offset = real_cu_batch_token_offset

if sampler_output.logits is not None:
logits = sampler_output.logits
real_logits = paddle.gather(logits, index_to_batch_id_tensor, axis=0)
real_logits = _recover_tensor(logits, src_order)
sampler_output.logits = real_logits
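To make the index arithmetic concrete, a self-contained sketch of what `recover_batch_index_for_output` / `_recover_tensor` do, using plain Python lists instead of paddle tensors; the `index_to_batch_id` values are made up for illustration:

```python
# index_to_batch_id: execution-slot index -> original batch id (example values).
index_to_batch_id = {0: 2, 1: 0, 2: 1}

# Same expression as in the diff: execution slots listed in ascending order
# of the batch id they produced.
src_order = [k for k, v in sorted(index_to_batch_id.items(), key=lambda x: x[1])]
assert src_order == [1, 2, 0]

# Rows produced in execution order; the gather puts row i of the result back
# on batch id i. Rows beyond len(src_order) are copied through unchanged,
# matching the tail copy in _recover_tensor.
rows_in_exec_order = ["row_for_batch_2", "row_for_batch_0", "row_for_batch_1", "tail_row"]
recovered = [rows_in_exec_order[i] for i in src_order] + rows_in_exec_order[len(src_order):]
assert recovered == ["row_for_batch_0", "row_for_batch_1", "row_for_batch_2", "tail_row"]
```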
108 changes: 108 additions & 0 deletions tests/e2e/test_pd_reorder.py
@@ -0,0 +1,108 @@
# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys

import pytest

current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(current_dir, ".."))
if project_root not in sys.path:
sys.path.insert(0, project_root)

from model_loader.utils import (
form_model_get_output_topp0,
get_paddle_model_path,
run_with_timeout,
)

os.environ["ENABLE_PD_REORDER"] = "1"

model_param_map = {
"ernie-4_5-21b-a3b-bf16-paddle": {
"tensor_parallel_size": 2,
"quantizations": [None],
"max_num_seqs": 3,
"graph_optimization_config": {"use_cudagraph": False},
"env": {"ENABLE_PD_REORDER": "1"},
}
}
prompts = [
"解释下温故而知新",
"Hello, my name is",
"鲁迅是谁",
"将李白的静夜思改为现代诗歌",
"你好",
"请介绍下你自己",
]

params = []
for model, cfg in model_param_map.items():
for q in cfg["quantizations"]:
if isinstance(q, dict):
quant, backend, env = q["quant_type"], q.get("backend", "default"), q.get("env", {})
else:
quant, backend, env = q, "default", {}
params.append(
pytest.param(
model,
cfg.get("tensor_parallel_size", 1),
cfg.get("max_num_seqs", 1),
cfg.get("max_model_len", 1024),
quant,
cfg.get("max_tokens", 128),
env,
marks=[pytest.mark.core_model],
id=f"{model}.{quant}.{backend}",
)
)


@pytest.mark.parametrize(
"model_name_or_path,tensor_parallel_size,max_num_seqs,max_model_len,quantization,max_tokens,env",
params,
)
def test_model_against_baseline(
fd_runner,
model_name_or_path: str,
tensor_parallel_size: int,
max_num_seqs: int,
max_model_len: int,
max_tokens: int,
quantization: str,
env,
monkeypatch,
) -> None:
"""
Test that model output matches baseline file.
"""
model_path = get_paddle_model_path(model_name_or_path)
for k, v in env.items():
monkeypatch.setenv(k, v)
# Run model
_ = run_with_timeout(
target=form_model_get_output_topp0,
args=(
fd_runner,
model_path,
tensor_parallel_size,
max_num_seqs,
max_model_len,
max_tokens,
quantization,
"dummy",
prompts,
),
)
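For completeness, one way to run just the new e2e test locally (marker and path taken from the file above; CI may invoke it differently, and the parametrization expects two GPUs for `tensor_parallel_size=2`):

```python
import pytest

# Run only the PD reorder e2e test, verbose, selecting the core_model marker.
raise SystemExit(pytest.main(["tests/e2e/test_pd_reorder.py", "-m", "core_model", "-v"]))
```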