83 changes: 83 additions & 0 deletions docs/features/thinking_budget.md
@@ -0,0 +1,83 @@
# Thinking Budget Logits Processor

## Overview

`ThinkingBudgetLogitsProcessor` limits the number of tokens generated inside the `<think> ... </think>` segment. When the budget is reached, it forces a line break token and then the `</think>` token to terminate the thinking section.

## When to Use

- Your model emits `<think>`/`</think>` tokens for reasoning.
- You need a hard cap on thinking length without changing the sampling logic.

## How It Works

1. **CPU precompute (DataProcessor)**: when a request includes `thinking_budget`, the prompt token ids are scanned to determine whether thinking has started, whether it already ended, and how many tokens are already inside the thinking section.
2. **Per-step update**: during decoding, the processor tracks `last_token_id` and `tokens_after_start`.
3. **Budget enforcement**: once the budget is reached, it forces a line break and then the thinking end token, as sketched below.
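
The per-step behavior can be summarized with a minimal sketch. This is not the actual FastDeploy implementation: the class name, method signatures, and the NumPy-based masking are assumptions for illustration; only the tracked state (`last_token_id`, `tokens_after_start`) and the forcing order (line break, then `</think>`) come from the description above.

```python
import numpy as np

class ThinkingBudgetSketch:
    """Illustrative stand-in for ThinkingBudgetLogitsProcessor (simplified sketch)."""

    def __init__(self, think_start_id: int, think_end_id: int, line_break_id: int, thinking_budget: int):
        self.think_start_id = think_start_id
        self.think_end_id = think_end_id
        self.line_break_id = line_break_id
        self.thinking_budget = thinking_budget
        self.in_thinking = False        # currently inside <think> ... </think>
        self.tokens_after_start = 0     # tokens generated since <think>
        self.last_token_id = None

    def update_state(self, token_id: int) -> None:
        # Track whether decoding is inside the thinking section and how long it is.
        if token_id == self.think_start_id:
            self.in_thinking = True
            self.tokens_after_start = 0
        elif token_id == self.think_end_id:
            self.in_thinking = False
        elif self.in_thinking:
            self.tokens_after_start += 1
        self.last_token_id = token_id

    def apply(self, logits: np.ndarray) -> np.ndarray:
        # Once the budget is reached, force "\n" first, then "</think>" on the next step.
        if self.in_thinking and self.tokens_after_start >= self.thinking_budget:
            forced = (
                self.think_end_id
                if self.last_token_id == self.line_break_id
                else self.line_break_id
            )
            logits[:] = -np.inf
            logits[forced] = 0.0
        return logits
```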

## Requirements

- The model must provide valid token ids for `think_start_id`, `think_end_id`, and `line_break_id` (via `ModelConfig`).
- If any of these ids are invalid, the processor is disabled and `thinking_budget` will not take effect.
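
If you are unsure whether a model exposes these tokens, you can inspect its tokenizer vocabulary before enabling the processor. The snippet below is a hedged check using Hugging Face `transformers`; the engine resolves the same ids internally, and `\n` may not exist as a standalone vocab entry, in which case the engine falls back to encoding it.

```python
from transformers import AutoTokenizer

tok = AutoTokenizer.from_pretrained("Qwen/Qwen3-0.6B")
vocab = tok.get_vocab()

# -1 means the token is not a standalone vocab entry.
for token in ("<think>", "</think>", "\n"):
    print(repr(token), "->", vocab.get(token, -1))
```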

## Request Parameters

- `thinking_budget` (int, required to enable): maximum number of tokens after `<think>` before forced termination.
- `think_stop_sentence` (string, optional): a stop sentence that will be tokenized on the CPU side and enforced near the budget boundary.

## Online Usage

### 1. Start service

```bash
python -m fastdeploy.entrypoints.openai.api_server \
--model Qwen/Qwen3-0.6B \
--port 8180 \
--metrics-port 8181 \
--engine-worker-queue-port 8182 \
--max-model-len 32768 \
--max-num-seqs 32 \
--logits-processors ThinkingBudgetLogitsProcessor
```

### 2. Send request

```bash
curl -X POST "http://0.0.0.0:8180/v1/chat/completions" \
-H "Content-Type: application/json" \
-d '{
"messages": [{"role": "user", "content": "Hello!"}],
"max_completion_tokens": 30,
"logits_processors_args": {
"thinking_budget": 20,
"think_stop_sentence": "Thinking limit reached, now replying."
}
}'
```

If you do not need thinking control for a request, simply omit `thinking_budget`.
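
The same request can also be sent with the `openai` Python client. This is a sketch assuming the server started above, a recent client version that accepts `max_completion_tokens`, and that non-standard fields such as `logits_processors_args` are passed through `extra_body`; the model name mirrors the launch command and may need adjusting to whatever name the server reports.

```python
from openai import OpenAI

client = OpenAI(base_url="http://0.0.0.0:8180/v1", api_key="EMPTY")

response = client.chat.completions.create(
    model="Qwen/Qwen3-0.6B",
    messages=[{"role": "user", "content": "Hello!"}],
    max_completion_tokens=30,
    extra_body={
        "logits_processors_args": {
            "thinking_budget": 20,
            "think_stop_sentence": "Thinking limit reached, now replying.",
        }
    },
)
print(response.choices[0].message.content)
```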

## Offline Usage

```python
from fastdeploy import LLM, SamplingParams

llm = LLM(
model="Qwen/Qwen3-0.6B",
engine_worker_queue_port=8282,
cache_queue_port=8383,
logits_processors=["ThinkingBudgetLogitsProcessor"],
)

sampling_params = SamplingParams(
max_tokens=512,
logits_processors_args={"thinking_budget": 20, "think_stop_sentence": "Thinking limit reached, now replying."},
)

outputs = llm.chat([{"role": "user", "content": "Hello, who are you?"}], sampling_params)
print(outputs[0].outputs.text)
```

## Performance Note

This processor runs `update_state` and `apply` on every decode step. If you only need a hard thinking-length cap and care most about throughput, consider the operator-level reasoning-length controls instead of per-step logits processing.
83 changes: 83 additions & 0 deletions docs/zh/features/thinking_budget.md
@@ -0,0 +1,83 @@
# Thinking Budget Logits Processor

## Overview

`ThinkingBudgetLogitsProcessor` limits how many tokens are generated inside the `<think> ... </think>` segment. When the budget is reached, it forces a line-break token and then `</think>` to close the thinking section.

## When to Use

- The model emits `<think>`/`</think>` reasoning markers.
- You need a hard cap on the thinking section without changing the sampling strategy.

## How It Works

1. **CPU-side precompute (DataProcessor)**: when a request includes `thinking_budget`, the prompt token ids are scanned to determine whether the thinking section has started, whether it has already ended, and how many thinking tokens already exist.
2. **Per-step update**: during decoding, the processor tracks `last_token_id` and `tokens_after_start`.
3. **Budget enforcement**: once the budget is reached, it forces a line break and then the thinking end token.

## Requirements

- The model must provide valid `think_start_id`, `think_end_id`, and `line_break_id` token ids (from `ModelConfig`).
- If any of these ids is invalid, the processor is disabled and `thinking_budget` has no effect.

## Request Parameters

- `thinking_budget` (int, required to enable): maximum number of tokens allowed after `<think>`.
- `think_stop_sentence` (string, optional): the CPU side tokenizes this string and forces it near the budget boundary.

## Online Usage

### 1. Start the service

```bash
python -m fastdeploy.entrypoints.openai.api_server \
--model Qwen/Qwen3-0.6B \
--port 8180 \
--metrics-port 8181 \
--engine-worker-queue-port 8182 \
--max-model-len 32768 \
--max-num-seqs 32 \
--logits-processors ThinkingBudgetLogitsProcessor
```

### 2. Send a request

```bash
curl -X POST "http://0.0.0.0:8180/v1/chat/completions" \
-H "Content-Type: application/json" \
-d '{
"messages": [{"role": "user", "content": "你好!"}],
"max_completion_tokens": 30,
"logits_processors_args": {
"thinking_budget": 20,
"think_stop_sentence": "思考已达上限,开始回复"
}
}'
```

If a request does not need thinking control, simply omit `thinking_budget`.

## Offline Usage

```python
from fastdeploy import LLM, SamplingParams

llm = LLM(
model="Qwen/Qwen3-0.6B",
engine_worker_queue_port=8282,
cache_queue_port=8383,
logits_processors=["ThinkingBudgetLogitsProcessor"],
)

sampling_params = SamplingParams(
max_tokens=512,
logits_processors_args={"thinking_budget": 20, "think_stop_sentence": "思考已达上限,开始回复"},
)

outputs = llm.chat([{"role": "user", "content": "将李白的静夜思改为现代诗"}], sampling_params)
print(outputs[0].outputs.text)
```

## Performance Note

This processor runs `update_state` and `apply` on every decode step. If you only need a hard cap on thinking length and care most about throughput, prefer the operator-level reasoning-length controls over per-step logits processing.
1 change: 1 addition & 0 deletions fastdeploy/config.py
@@ -266,6 +266,7 @@ def __init__(
setattr(self, "freq_allocation", self.rope_scaling["mrope_section"][0])

self.ori_vocab_size = args.get("ori_vocab_size", self.vocab_size)
self.think_start_id = args.get("think_start_id", -1)
self.think_end_id = args.get("think_end_id", -1)
self.im_patch_id = args.get("image_patch_id", -1)
self.line_break_id = args.get("line_break_id", -1)
25 changes: 24 additions & 1 deletion fastdeploy/engine/common_engine.py
@@ -1938,13 +1938,35 @@ def _start_worker_service(self):
else len(self.data_processor.tokenizer.vocab)
)

think_start_id = self.data_processor.tokenizer.get_vocab().get("<think>", -1)
if think_start_id >= 0:
self.llm_logger.info(f"Get think_start_id {think_start_id} from vocab.")
else:
self.llm_logger.info("No <think> token found in vocabulary, the model can not do reasoning.")
think_end_id = self.data_processor.tokenizer.get_vocab().get("</think>", -1)
if think_end_id > 0:
if think_end_id >= 0:
self.llm_logger.info(f"Get think_end_id {think_end_id} from vocab.")
else:
self.llm_logger.info("No </think> token found in vocabulary, the model can not do reasoning.")
image_patch_id = self.data_processor.tokenizer.get_vocab().get("<|IMAGE_PLACEHOLDER|>", -1)
line_break_id = self.data_processor.tokenizer.get_vocab().get("\n", -1)
if line_break_id < 0:
line_break_ids = self.data_processor.tokenizer.encode("\n", add_special_tokens=False)
if isinstance(line_break_ids, dict):
line_break_ids = line_break_ids.get("input_ids")
elif hasattr(line_break_ids, "input_ids"):
line_break_ids = line_break_ids.input_ids
if line_break_ids:
if isinstance(line_break_ids, (list, tuple)):
first = line_break_ids[0]
if isinstance(first, (list, tuple)):
line_break_id = int(first[0]) if first else -1
else:
line_break_id = int(first)
else:
line_break_id = int(line_break_ids)
if line_break_id >= 0:
self.llm_logger.info(f"Get line_break_id {line_break_id} from tokenizer.")

ports = ",".join(map(str, self.cfg.parallel_config.engine_worker_queue_port))
ips = None
@@ -1972,6 +1994,7 @@ def _start_worker_service(self):
f" --data_parallel_size {self.cfg.parallel_config.data_parallel_size}"
f" --quantization '{json.dumps(self.cfg.model_config.quantization)}'"
f" --ori_vocab_size {ori_vocab_size}"
f" --think_start_id {think_start_id}"
f" --think_end_id {think_end_id}"
f" --image_patch_id {image_patch_id}"
f" --line_break_id {line_break_id}"
25 changes: 24 additions & 1 deletion fastdeploy/engine/engine.py
@@ -526,13 +526,35 @@ def _start_worker_service(self):
else len(self.engine.data_processor.tokenizer.vocab)
)

think_start_id = self.data_processor.tokenizer.get_vocab().get("<think>", -1)
if think_start_id >= 0:
llm_logger.info(f"Get think_start_id {think_start_id} from vocab.")
else:
llm_logger.info("No <think> token found in vocabulary, the model can not do reasoning.")
think_end_id = self.data_processor.tokenizer.get_vocab().get("</think>", -1)
if think_end_id > 0:
if think_end_id >= 0:
llm_logger.info(f"Get think_end_id {think_end_id} from vocab.")
else:
llm_logger.info("No </think> token found in vocabulary, the model can not do reasoning.")
image_patch_id = self.data_processor.tokenizer.get_vocab().get("<|IMAGE_PLACEHOLDER|>", -1)
line_break_id = self.data_processor.tokenizer.get_vocab().get("\n", -1)
if line_break_id < 0:
line_break_ids = self.data_processor.tokenizer.encode("\n", add_special_tokens=False)
if isinstance(line_break_ids, dict):
line_break_ids = line_break_ids.get("input_ids")
elif hasattr(line_break_ids, "input_ids"):
line_break_ids = line_break_ids.input_ids
if line_break_ids:
if isinstance(line_break_ids, (list, tuple)):
first = line_break_ids[0]
if isinstance(first, (list, tuple)):
line_break_id = int(first[0]) if first else -1
else:
line_break_id = int(first)
else:
line_break_id = int(line_break_ids)
if line_break_id >= 0:
llm_logger.info(f"Get line_break_id {line_break_id} from tokenizer.")

ports = ",".join(map(str, self.cfg.parallel_config.engine_worker_queue_port))
ips = None
@@ -560,6 +582,7 @@ def _start_worker_service(self):
f" --data_parallel_size {self.cfg.parallel_config.data_parallel_size}"
f" --quantization '{json.dumps(self.cfg.model_config.quantization)}'"
f" --ori_vocab_size {ori_vocab_size}"
f" --think_start_id {think_start_id}"
f" --think_end_id {think_end_id}"
f" --image_patch_id {image_patch_id}"
f" --line_break_id {line_break_id}"