Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions custom_ops/gpu_ops/reasoning_phase_token_constraint.cu
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ __global__ void apply_token_enforce_generation_scores_kernel(
int tid = threadIdx.x;

const int bs_idx = batch_id_per_token_output[token_idx];
if (bs_idx < 0) return;
const int query_start_token_idx = cu_seqlens_q_output[bs_idx];
bool is_batch_first_token = (token_idx == query_start_token_idx);

Expand Down
1 change: 1 addition & 0 deletions custom_ops/gpu_ops/rebuild_padding.cu
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ __global__ void RebuildAppendPaddingKernel(T *output_data,
i += gridDim.x * blockDim.x * VecSize) {
const int out_token_id = i / dim_embed;
const int bi = batch_id_per_token_output[out_token_id];
if (bi < 0) continue;
if (seq_len_this_time[bi] == 0) continue;
if (seq_len_decoder[bi] == 0 && seq_len_encoder[bi] == 0) continue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,6 @@ void DraftModelPreprocess(const paddle::Tensor& draft_tokens,
int pre_ids_len = pre_ids.shape()[1];
auto cu_stream = seq_lens_this_time.stream();
int target_model_draft_tokens_len = target_model_draft_tokens.shape()[1];
auto not_need_stop_gpu =
not_need_stop.copy_to(seq_lens_this_time.place(), false);

draft_model_preprocess_kernel<kBlockSize><<<1, kBlockSize, 0, cu_stream>>>(
const_cast<int64_t*>(draft_tokens.data<int64_t>()),
Expand All @@ -187,7 +185,7 @@ void DraftModelPreprocess(const paddle::Tensor& draft_tokens,
const_cast<int*>(seq_lens_encoder.data<int>()),
const_cast<int*>(seq_lens_decoder.data<int>()),
const_cast<int64_t*>(step_idx.data<int64_t>()),
const_cast<bool*>(not_need_stop_gpu.data<bool>()),
const_cast<bool*>(not_need_stop.data<bool>()),
const_cast<int64_t*>(pre_ids.data<int64_t>()),
accept_tokens.data<int64_t>(),
accept_num.data<int>(),
Expand All @@ -205,10 +203,6 @@ void DraftModelPreprocess(const paddle::Tensor& draft_tokens,
target_model_draft_tokens_len,
pre_ids_len,
is_splitwise_prefill);
auto not_need_stop_cpu =
not_need_stop_gpu.copy_to(not_need_stop.place(), false);
bool* not_need_stop_data = const_cast<bool*>(not_need_stop.data<bool>());
not_need_stop_data[0] = not_need_stop_cpu.data<bool>()[0];
}

PD_BUILD_STATIC_OP(draft_model_preprocess)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ void DraftModelUpdate(const paddle::Tensor& inter_next_tokens,
auto seq_lens_this_time_shape = seq_lens_this_time.shape();
auto cu_stream = seq_lens_this_time.stream();
const int real_bsz = seq_lens_this_time_shape[0];
auto not_need_stop_gpu =
not_need_stop.copy_to(seq_lens_this_time.place(), false);
const int end_ids_len = end_ids.shape()[0];
const int max_draft_token = draft_tokens.shape()[1];
const int pre_id_length = pre_ids.shape()[1];
Expand All @@ -149,7 +147,7 @@ void DraftModelUpdate(const paddle::Tensor& inter_next_tokens,
const_cast<int64_t*>(step_idx.data<int64_t>()),
cu_seqlens_q_output.data<int>(),
const_cast<bool*>(stop_flags.data<bool>()),
not_need_stop_gpu.data<bool>(),
const_cast<bool*>(not_need_stop.data<bool>()),
max_dec_len.data<int64_t>(),
end_ids.data<int64_t>(),
const_cast<int64_t*>(base_model_draft_tokens.data<int64_t>()),
Expand All @@ -161,11 +159,6 @@ void DraftModelUpdate(const paddle::Tensor& inter_next_tokens,
max_seq_len,
substep,
prefill_one_step_stop);

auto not_need_stop_cpu =
not_need_stop_gpu.copy_to(not_need_stop.place(), false);
bool* not_need_stop_data = const_cast<bool*>(not_need_stop.data<bool>());
not_need_stop_data[0] = not_need_stop_cpu.data<bool>()[0];
}

PD_BUILD_STATIC_OP(draft_model_update)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ __global__ inline void min_length_logits_process(
const int token_idx = threadIdx.x;
if (token_idx >= token_num) return;
const int bi = batch_id_per_token_output[token_idx];
if (bi < 0) return;
if (bi >= bs) return;
const int query_start_token_idx = cu_seqlens_q_output[bi];

Expand Down Expand Up @@ -59,6 +60,7 @@ __global__ inline void min_length_logits_process<half>(
const int token_idx = threadIdx.x;
if (token_idx >= token_num) return;
const int bi = batch_id_per_token_output[token_idx];
if (bi < 0) return;
if (bi >= bs) return;
const int query_start_token_idx = cu_seqlens_q_output[bi];

Expand All @@ -85,6 +87,7 @@ __global__ void update_repeat_times(const int64_t *token_ids_all,
const int token_idx = blockIdx.x;
if (token_idx >= token_num) return;
const int bi = batch_id_per_token_output[token_idx];
if (bi < 0) return;
if (bi >= bs) return;
if (cur_len[bi] < 0) {
return;
Expand Down Expand Up @@ -115,6 +118,7 @@ __global__ void update_value_by_repeat_times(
const int token_idx = blockIdx.x;
if (token_idx >= token_num) return;
const int bi = batch_id_per_token_output[token_idx];
if (bi < 0) return;
if (bi >= bs) return;
int tid = threadIdx.x;
T *logits_now = logits + token_idx * length;
Expand Down Expand Up @@ -146,7 +150,7 @@ __global__ void ban_bad_words(T *logits,
const int token_idx = blockIdx.x;
if (token_idx >= token_num) return;
const int bi = batch_id_per_token_output[token_idx];

if (bi < 0) return;
if (bi >= bs) return;
int tid = threadIdx.x;
T *logits_now = logits + token_idx * length;
Expand Down
15 changes: 8 additions & 7 deletions custom_ops/gpu_ops/speculate_decoding/speculate_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ std::vector<paddle::Tensor> SpeculatePreProcess(
const int bsz = seq_len.shape()[0];
const int max_seq_len = input_ids_shape[1];
const int token_num_data = cpu_token_num;
auto ids_remove_padding = paddle::empty(
{token_num_data}, paddle::DataType::INT64, input_ids.place());
auto batch_id_per_token = paddle::empty(
{token_num_data}, paddle::DataType::INT32, input_ids.place());
auto ids_remove_padding = paddle::full(
{token_num_data}, 2, paddle::DataType::INT64, input_ids.place());
auto batch_id_per_token = paddle::full(
{token_num_data}, -1, paddle::DataType::INT32, input_ids.place());
auto cu_seqlens_q =
paddle::empty({bsz + 1}, paddle::DataType::INT32, input_ids.place());
auto cu_seqlens_k =
Expand All @@ -170,9 +170,10 @@ std::vector<paddle::Tensor> SpeculatePreProcess(
auto cu_seq_lens_q_output =
paddle::empty({bsz + 1}, paddle::DataType::INT32, input_ids.place());
auto batch_id_per_token_output =
paddle::empty({bsz * max_draft_tokens_per_batch},
paddle::DataType::INT32,
input_ids.place());
paddle::full({bsz * max_draft_tokens_per_batch},
-1,
paddle::DataType::INT32,
input_ids.place());
auto real_output_token_num =
paddle::empty({1}, paddle::DataType::INT32, input_ids.place());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ void SpeculateScheduleCache(const paddle::Tensor &draft_tokens,
prefill_one_step_stop = true;
}
}
auto not_need_stop_gpu = not_need_stop.copy_to(stop_flags.place(), false);
speculate_schedula_cache<BlockSize>
<<<1, BlockSize, 0, seq_lens_this_time.stream()>>>(
draft_tokens.data<int64_t>(),
Expand All @@ -150,7 +149,7 @@ void SpeculateScheduleCache(const paddle::Tensor &draft_tokens,
const_cast<int *>(accept_num.data<int>()),
const_cast<int64_t *>(accept_tokens.data<int64_t>()),
const_cast<bool *>(is_block_step.data<bool>()),
const_cast<bool *>(not_need_stop_gpu.data<bool>()),
const_cast<bool *>(not_need_stop.data<bool>()),
real_bsz,
max_bsz,
max_next_step_tokens,
Expand All @@ -159,11 +158,6 @@ void SpeculateScheduleCache(const paddle::Tensor &draft_tokens,
block_size,
block_num_per_seq,
prefill_one_step_stop);

auto not_need_stop_cpu =
not_need_stop_gpu.copy_to(not_need_stop.place(), true);
bool *not_need_stop_data = const_cast<bool *>(not_need_stop.data<bool>());
not_need_stop_data[0] = not_need_stop_cpu.data<bool>()[0];
}

PD_BUILD_STATIC_OP(speculate_schedule_cache)
Expand Down
1 change: 1 addition & 0 deletions custom_ops/gpu_ops/speculate_decoding/top_p_candidates.cu
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ __global__ void KeMatrixTopPBeamTopKFt(
const int lane = tid % 32;
const int token_id = blockIdx.x;
const int bid = batch_id_per_token_output[token_id];
if (bid < 0) return;

int top_num = TopPBeamTopK;
float top_p_value = static_cast<float>(top_ps[bid]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,11 @@ void UnifiedUpdateModelStatus(const paddle::Tensor &seq_lens_encoder,
constexpr int BlockSize = 1024;

// has_running_seqs is CPU tensor, need to copy to GPU first
auto has_running_seqs_gpu =
has_running_seqs.copy_to(seq_lens_this_time.place(), false);
unified_update_model_status_kernel<BlockSize>
<<<1, BlockSize, 0, seq_lens_this_time.stream()>>>(
const_cast<int *>(seq_lens_encoder.data<int>()),
const_cast<int *>(seq_lens_decoder.data<int>()),
const_cast<bool *>(has_running_seqs_gpu.data<bool>()),
const_cast<bool *>(has_running_seqs.data<bool>()),
const_cast<int64_t *>(step_input_ids.data<int64_t>()),
const_cast<int64_t *>(step_output_ids.data<int64_t>()),
const_cast<int *>(step_output_len.data<int>()),
Expand All @@ -213,11 +211,6 @@ void UnifiedUpdateModelStatus(const paddle::Tensor &seq_lens_encoder,
max_step_tokens,
max_model_len,
num_end_tokens);
// Copy result back to CPU
auto has_running_seqs_cpu =
has_running_seqs_gpu.copy_to(has_running_seqs.place(), false);
bool *out_data = const_cast<bool *>(has_running_seqs.data<bool>());
out_data[0] = has_running_seqs_cpu.data<bool>()[0];
}

PD_BUILD_STATIC_OP(unified_update_model_status)
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/engine/args_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ def __post_init__(self):
self.enable_prefix_caching = False
if (
not current_platform.is_cuda()
or self.speculative_config is not None
or (self.speculative_config is not None and self.enable_logprob)
or self.splitwise_role == "prefill"
or self.dynamic_load_weight
):
Expand Down
2 changes: 2 additions & 0 deletions fastdeploy/model_executor/forward_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ class ForwardMeta:

position_ids: Optional[paddle.Tensor] = None

real_bsz: int = 0

def clear_caches(self):
"""Safely clean up the caches"""
if self.caches:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,15 @@ def run_static_model(self, entry: ConcreteSizeEntry, **kwargs):

def __call__(self, **kwargs) -> List[paddle.Tensor] | paddle.Tensor:
# Get real shape (total num tokens)
ids_remove_padding: paddle.Tensor = kwargs["forward_meta"].ids_remove_padding
real_shape = ids_remove_padding.shape[0]
if self.speculative_decoding and all(self.real_bsz_to_captured_size.values()):
seq_lens_this_time: paddle.Tensor = kwargs["forward_meta"].seq_lens_this_time
num_running_requests = int((seq_lens_this_time.flatten() > 0).sum().item())
real_bsz = kwargs["forward_meta"].real_bsz
num_running_requests = real_bsz if real_bsz > 0 else int((seq_lens_this_time.flatten() > 0).sum().item())
num_running_requests = max(1, num_running_requests)
real_shape = self.real_bsz_to_captured_size[num_running_requests]
else:
ids_remove_padding: paddle.Tensor = kwargs["forward_meta"].ids_remove_padding
real_shape = ids_remove_padding.shape[0]
exist_prefill = kwargs["forward_meta"].exist_prefill
# Static split graph mode: use Static + CUDAGraph for prefill/mixed phase
static_cudagraph_for_prefill = exist_prefill and not self.full_cuda_graph and self.dy2st
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/model_executor/layers/sample/logprobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def gather_logprobs(
indices = token_ids
top_logprobs = token_logprobs

return LogprobsTensors(indices, top_logprobs, token_ranks)
return LogprobsTensors(indices.cpu(), top_logprobs.cpu(), token_ranks.cpu())


def build_output_logprobs(
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/model_executor/layers/sample/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,7 @@ def forward_cuda(
)
sampler_output.logprobs_tensors = logprobs_tensors
if cu_batch_token_offset is not None:
sampler_output.cu_batch_token_offset = cu_batch_token_offset
sampler_output.cu_batch_token_offset = cu_batch_token_offset.cpu()
return sampler_output

def forward_xpu(
Expand Down
67 changes: 40 additions & 27 deletions fastdeploy/model_executor/pre_and_post_process.py
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE:RL场景当前没法开 Overlap schedule

Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,6 @@ def post_process_specualate(
model_output: ModelOutputData,
share_inputs: InputBatch,
sampling_metadata: SamplingMetadata,
save_each_rank: bool = False,
skip_save_output: bool = False,
think_end_id: int = -1,
splitwise_role_is_decode: bool = False,
enable_entropy: bool = False,
Expand Down Expand Up @@ -508,7 +506,7 @@ def post_process_specualate(
unified_update_model_status(
model_output.seq_lens_encoder, # seq_lens_encoder
model_output.seq_lens_decoder, # seq_lens_decoder
model_output.not_need_stop, # has_running_seqs
model_output.not_need_stop_device, # has_running_seqs
model_output.draft_tokens, # step_input_ids
model_output.accept_tokens, # step_output_ids (read-write)
model_output.accept_num, # step_output_len (read-write)
Expand All @@ -522,24 +520,35 @@ def post_process_specualate(
model_output.max_dec_len, # max_dec_len
)


def save_output_specualate(
sampler_output: SamplerOutput,
model_output: ModelOutputData,
share_inputs: InputBatch,
save_each_rank: bool = False,
skip_save_output: bool = False,
):
if not skip_save_output:
if sampler_output.logprobs_tensors is None:
recover_model_output_map = recover_batch_index_for_output(
model_output,
recover_share_inputs = recover_batch_index_for_output(
share_inputs,
model_output.index_to_batch_id,
model_output.enable_pd_reorder,
["accept_tokens", "accept_num", "seq_lens_decoder", "prompt_lens"],
)
recover_share_inputs = recover_batch_index_for_output(
share_inputs, model_output.index_to_batch_id, model_output.enable_pd_reorder, ["preempted_idx"]
[
"accept_tokens_cpu",
"accept_num_cpu",
"seq_lens_decoder_cpu",
"prompt_lens_cpu",
"last_preempted_idx",
],
)
speculate_save_output(
recover_model_output_map["accept_tokens"],
recover_model_output_map["accept_num"],
recover_share_inputs["accept_tokens_cpu"],
recover_share_inputs["accept_num_cpu"],
model_output.not_need_stop,
recover_model_output_map["seq_lens_decoder"],
recover_model_output_map["prompt_lens"],
recover_share_inputs["preempted_idx"],
recover_share_inputs["seq_lens_decoder_cpu"],
recover_share_inputs["prompt_lens_cpu"],
recover_share_inputs["last_preempted_idx"],
model_output.mp_rank,
save_each_rank,
bool(envs.ENABLE_V1_KVCACHE_SCHEDULER),
Expand All @@ -548,30 +557,35 @@ def post_process_specualate(
recover_batch_index_for_sampler_output(
sampler_output, model_output.index_to_batch_id, model_output.enable_pd_reorder
)
recover_model_output_map = recover_batch_index_for_output(
model_output,
recover_share_inputs = recover_batch_index_for_output(
share_inputs,
model_output.index_to_batch_id,
model_output.enable_pd_reorder,
["seq_lens_decoder", "prompt_lens"],
)
recover_share_inputs = recover_batch_index_for_output(
share_inputs, model_output.index_to_batch_id, model_output.enable_pd_reorder, ["preempted_idx"]
[
"sampled_token_ids",
"accept_tokens_cpu",
"accept_num_cpu",
"seq_lens_decoder_cpu",
"prompt_lens_cpu",
"last_preempted_idx",
],
)
speculate_save_output_topk(
sampler_output.sampled_token_ids,
recover_share_inputs["sampled_token_ids"],
sampler_output.logprobs_tensors.logprob_token_ids,
sampler_output.logprobs_tensors.logprobs,
sampler_output.logprobs_tensors.selected_token_ranks,
sampler_output.token_num_per_batch,
recover_share_inputs["accept_num_cpu"],
sampler_output.cu_batch_token_offset,
model_output.not_need_stop,
recover_model_output_map["seq_lens_decoder"],
recover_model_output_map["prompt_lens"],
recover_share_inputs["preempted_idx"],
recover_share_inputs["seq_lens_decoder_cpu"],
recover_share_inputs["prompt_lens_cpu"],
recover_share_inputs["last_preempted_idx"],
3, # mtype
model_output.mp_rank,
save_each_rank,
)
share_inputs["last_preempted_idx"][:] = 0


def post_process(
Expand Down Expand Up @@ -609,13 +623,12 @@ def post_process(
model_output,
share_inputs,
sampling_metadata,
save_each_rank,
skip_save_output,
think_end_id,
splitwise_role_is_decode,
enable_entropy,
routing_replay_manager,
)
share_inputs["last_preempted_idx"].copy_(share_inputs["preempted_idx"])
else:
post_process_normal(
sampler_or_pooler_output,
Expand Down
Loading
Loading