[DataProcessor] merge processor #7747
Conversation
Thanks for your contribution!
Pull request overview
This PR merges and refactors the input preprocessing / data processing pipeline: it introduces a unified Processor (text, with optional multimodal support), adds a set of multimodal processor implementations built on a new MMProcessor abstract class, and updates the LLM chat argument path and the related unit tests accordingly.
Changes:
- Add fastdeploy/input/processor.py as the unified entry point for request preprocessing and response decoding, and replace the original Text/MM processor creation logic in InputPreprocessor.
- Add the multimodal submodule fastdeploy/input/multimodal/* (including the MMProcessor abstract class, the Qwen/Ernie/PaddleOCR processors, and the image processors).
- Adjust how LLM.chat() passes prompts/messages and update some test cases.
PR title/description check (needs follow-up)
- Title: add a space after the tag and use a clearer verb, e.g. [DataProcessor] Merge processors or [DataProcessor] Merge input processor.
- Description: still the bare template, missing Motivation / Modifications / Usage / Accuracy Tests. Since this PR touches a lot of code, please fill these in, explain compatibility and migration impact, and add or update the related docs where appropriate.
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| tests/input/test_preprocess.py | Adapt tests to the new Processor creation logic that replaces TextProcessor |
| tests/entrypoints/test_generation.py | Adjust the chat consistency test (currently has a coverage-regression problem) |
| tests/entrypoints/test_chat.py | Hook process_messages to capture the assembled prompt |
| fastdeploy/input/processor.py | New unified Processor (text/multimodal entry point) |
| fastdeploy/input/preprocess.py | InputPreprocessor now creates the unified Processor and attaches mm_processor |
| fastdeploy/entrypoints/llm.py | Adjust chat argument passing; _add_request supports messages/batch input shapes |
| fastdeploy/entrypoints/chat_utils.py | Change content normalization in parse_chat_messages (currently has a compatibility problem) |
| fastdeploy/input/multimodal/mm_processor.py | New MMProcessor abstract base class and multimodal processing template flow (currently inconsistent with the engine-side field contract) |
| fastdeploy/input/multimodal/qwen_vl.py | New Qwen2.5-VL multimodal processor |
| fastdeploy/input/multimodal/qwen3_vl.py | New Qwen3-VL multimodal processor |
| fastdeploy/input/multimodal/paddleocr_vl.py | New PaddleOCR-VL multimodal processor |
| fastdeploy/input/multimodal/ernie_vl.py | New ERNIE4.5-VL multimodal processor |
| fastdeploy/input/multimodal/common.py | New shared multimodal resize utilities |
| fastdeploy/input/multimodal/__init__.py | Export the multimodal processors |
| fastdeploy/input/multimodal/image_processors/* | Add/organize the Qwen/Qwen3/PaddleOCR image processors and their exports |
| fastdeploy/input/multimodal/image_processors/__init__.py | Aggregate image processor exports |
Comments suppressed due to low confidence (1)
fastdeploy/entrypoints/chat_utils.py:209
parse_chat_messages now returns None/str directly when content is None or a str, which breaks the downstream assumption that content is a list[dict] (for example, MultiModalProcessor._extract_mm_items calls item.get(...) directly and will raise AttributeError on a str). Suggest keeping the return format stable: None -> [], str -> [{"type": "text", "text": ...}], and only parse into a multimodal part list when the original value is a list.
```python
role = message["role"]
content = message["content"]
if content is None:
    parsed_content = content
elif isinstance(content, str):
    parsed_content = content
else:
    parsed_content = [parse_content_part(mm_parser, part) for part in content]
conversation.append({"role": role, "content": parsed_content})
```
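A minimal sketch of the normalization that comment suggests, reusing the `parse_content_part`/`mm_parser` helpers from the snippet above (this is the reviewer's proposal, not the PR's current code):

```python
role = message["role"]
content = message["content"]
if content is None:
    # Normalize missing content to an empty part list.
    parsed_content = []
elif isinstance(content, str):
    # Wrap a bare string as a single text part so downstream code
    # (e.g. _extract_mm_items) can always iterate over list[dict].
    parsed_content = [{"type": "text", "text": content}]
else:
    parsed_content = [parse_content_part(mm_parser, part) for part in content]
conversation.append({"role": role, "content": parsed_content})
```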
| "video_cnt": 0, | ||
| "num_input_image_tokens": 0, | ||
| "num_input_video_tokens": 0, | ||
| "mm_positions": [], |
```python
hashes_to_cache, items_to_cache = [], []
for idx, item in enumerate(all_items):
    # Items fetched from cache (data is tuple) should not be re-cached
    if isinstance(item.data, tuple):
        continue
    # Build pixel_values and meta for this item
    if outputs["images"] is None or idx >= len(outputs["images"]):
        continue
    pixel_values = outputs["images"][idx]
    # Compute hash: prefer uuid, fallback to content hash
    cache_key = item.uuid if item.uuid else MultimodalHasher.hash_features(pixel_values)
    meta = {}
    if idx < len(outputs.get("grid_thw", []) or []):
        grid_thw = np.asarray(outputs["grid_thw"][idx]) if outputs["grid_thw"] is not None else None
        if grid_thw is not None:
            if grid_thw.ndim > 1:
                t_val, h, w = grid_thw[0]
            else:
                t_val, h, w = grid_thw
            meta["thw"] = (int(t_val), int(h), int(w))
```
```python
def process_messages(self, request):
    """Convert the messages format into prompt + multimodal_data (generic, model-agnostic).

    Responsibilities:
    1. Extract multimodal content (images/videos) from messages
       -> write request["multimodal_data"] = {"image": [...], "video": [...], "mm_order": [...]}
    2. Call tokenizer.apply_chat_template(messages) to assemble the prompt
       -> write request["prompt"]

    Called when: the request contains "messages" but no "prompt"/"prompt_token_ids" yet.
    """
```
```python
for seq in stop_sequences:
    if seq != self.tokenizer.eos_token_id:
        stop_seqs.append(self.tokenizer.convert_tokens_to_ids(self.tokenizer.tokenize(seq)))
```
```python
if prompt_token_ids[0] > self.tokenizer.vocab_size:
    if not add_prefix_space:
        log_request(
            level=1,
            message="bad_words: '{prompt}' token id {token_id} > vocab_size, skipping",
            prompt=prompt,
            token_id=prompt_token_ids[0],
        )
    continue
if prompt_token_ids not in token_ids:
    token_ids.extend(prompt_token_ids)
```
CI report generated from the code below (refreshed every 30 minutes):
1 Task overview
2 Task status summary
2.1 Required tasks: 8/10 passed
2.2 Optional tasks — 26/30 passed
3 Failure details (Required only): Approval — approval rule (confidence: high)
Root cause details: / Key logs: / Fix suggestions:
Fix suggestion summary: please have xyxinyang or zyyzghb approve this PR. Related change: in the DataProcessor-related files the PR adds new …
Codecov Report

❌ Patch coverage is …

Additional details and impacted files

```
@@           Coverage Diff            @@
##             develop    #7747   +/- ##
========================================
  Coverage           ?   70.27%
========================================
  Files              ?      409
  Lines              ?    57383
  Branches           ?     8991
========================================
  Hits               ?    40325
  Misses             ?    14258
  Partials           ?     2800
```

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
```python
if isinstance(self.tokenizer, (LlamaTokenizer, Llama3Tokenizer)) and not self.tokenizer.pad_token_id:
    return self.tokenizer.eos_token
return self.tokenizer.pad_token_id
```
```python
started = False
ended = False
tokens_after_start = 0
last_token_id = None
in_thinking = False
```
```python
if ed == image_pos:
    mm_item = mm_context.images[image_idx]
    if not isinstance(mm_item.data, tuple):
        self.preprocess_image(mm_item.data, outputs, mm_item.uuid)
    else:
        self.preprocess_cached_image(mm_item.data, outputs, mm_item.uuid)
    image_idx += 1
```
| outputs["grid_thw"].append(np.array([[t, h, w]])) | ||
|
|
||
| outputs["mm_positions"].append(ImagePosition(len(outputs["input_ids"]), num_tokens)) | ||
| outputs["input_ids"].extend([self.image_token_id] * num_tokens) |
```python
st, mm_idx = 0, 0
while st < prompt_token_ids_len:
    if prompt_token_ids[st] != self.image_token_id:
        cur_idx = st
        while cur_idx < prompt_token_ids_len and prompt_token_ids[cur_idx] != self.image_token_id:
            cur_idx += 1
        self._add_text_tokens(prompt_token_ids[st:cur_idx], outputs)
        st = cur_idx
```
```python
tn = tn * int(second_per_grid_t) * self.tokens_per_second
t_index = tn.flatten()
```
```python
def get(self, hashes: list) -> list:
    """Retrieve cached multimodal data by hash list."""
    req = pickle.dumps(hashes)
    self.socket.send_multipart([b"", req])
    _, resp = self.socket.recv_multipart()
    items = pickle.loads(resp)
    data_processor_logger.info(f"Get cache of mm_hashes: {hashes}")
    return items

def put(self, hashes: list, items: list) -> None:
    """Write processed multimodal items to cache."""
    req = pickle.dumps((hashes, items))
    self.socket.send_multipart([b"", req])
    data_processor_logger.info(f"Update cache of mm_hashes: {hashes}")
```
```python
def test_consistency_single_prompt_tokens_chat(self):
    """Test consistency between different prompt input formats"""
    sampling_params = SamplingParams(temperature=1.0, top_p=0.0)

    for prompt_token_ids in self.TOKEN_IDS:
        with self.subTest(prompt_token_ids=prompt_token_ids):
            output1 = self.llm.chat(messages=[prompt_token_ids], sampling_params=sampling_params)
            output2 = self.llm.chat(
                [{"prompt": "", "prompt_token_ids": prompt_token_ids}], sampling_params=sampling_params
            )
            # NOTE: this second assignment overwrites output2, so both outputs use the
            # same input format -- the coverage regression flagged in the file summary.
            output2 = self.llm.chat(messages=[prompt_token_ids], sampling_params=sampling_params)
            self.assert_outputs_equal(output1, output2)
```
PaddlePaddle-bot
left a comment
🤖 Paddle-CI-Agent | pr_review | 2026-05-09 15:51:53
📋 Review summary
PR overview: merges the multimodal VL data processors, consolidating the image/video preprocessing logic of QwenVL, Qwen3VL, ErnieVL, and PaddleOCRVL into the unified fastdeploy/input/multimodal/ module, and adjusts message parsing in the entrypoints.
Scope: fastdeploy/input/multimodal/, fastdeploy/entrypoints/
Impact tags: [DataProcessor] [APIServer]
📝 PR convention check
The title is missing a space between ] and the description, every description section is empty (only the template comments remain), and no Checklist item is ticked.
Suggested title (copy-paste ready):
[DataProcessor] Merge multimodal VL processors
Suggested PR description (copy-paste ready; it must reproduce the full structure of the checklist §D2 template):
## Motivation
Merge the image/video preprocessing logic of several vision-language models (QwenVL, Qwen3VL, ErnieVL, PaddleOCRVL) into the unified `fastdeploy/input/multimodal/` module, removing duplicated code and unifying the multimodal data-processing interface.
## Modifications
- Add the `fastdeploy/input/multimodal/` package with the `MMProcessor` base class and four implementations: `QwenVLProcessor`, `Qwen3VLProcessor`, `ErnieVLProcessor`, `PaddleOCRVLProcessor`
- Add `fastdeploy/input/multimodal/image_processors/` with `QwenImageProcessor`, `Qwen3ImageProcessor`, `AdaptiveImageProcessor` (Ernie), `PaddleOCRImageProcessor`
- Add `fastdeploy/input/multimodal/common.py`: shared `smart_resize_qwen` and `smart_resize_paddleocr` image resize utilities
- Modify `fastdeploy/entrypoints/chat_utils.py`: `parse_chat_messages` no longer wraps string/None content into a list; the original format is kept
- Modify `fastdeploy/entrypoints/llm.py`: `chat()` passes the messages list directly; `_add_request()` gains a list-of-dict (messages format) branch and an empty-list safety check
- Add unit tests: `tests/input/multimodal/` (ErnieVL, QwenVL, PaddleOCRVL, MMProcessor) and update `tests/entrypoints/`
## Usage or Command
N/A
## Accuracy Tests
N/A
## Checklist
- [x] Add at least a tag in the PR title.
- Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [ ] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.

Issues
| Level | File | Summary |
|---|---|---|
| 🟡 Suggestion | fastdeploy/input/multimodal/ernie_vl.py:102 | _write_back uses a truthiness check on request.get("prompt_token_ids"), which cannot distinguish None from [] |
| 🟡 Suggestion | fastdeploy/input/multimodal/image_processors/ernie.py:148 | assert is used for runtime validation of user input, so it is skipped under -O; the condition >= 0 also contradicts the "positive int" error message |
| 🟡 Suggestion | fastdeploy/entrypoints/llm.py:334 | An empty list [] falls into the else branch, yet the TypeError message still lists list as a valid type, confusing callers |
| 🟡 Suggestion | fastdeploy/input/multimodal/common.py:124 | smart_resize_paddleocr lacks the final bounds check that smart_resize_qwen has, so it can silently return out-of-range h_bar/w_bar |
Overall assessment
The overall design is sound, unifying the multimodal processors is the right direction, and test coverage is reasonably complete. There are a few defensive-coding issues (truthiness checks, assert-based validation, inaccurate error messages) and one consistency gap (smart_resize_paddleocr lacks the final bounds check); recommend fixing these before merging.
```python
def _write_back(self, request: dict, outputs: dict) -> None:
    """Ernie: preserve original prompt_token_ids if they already existed."""
    if request.get("prompt_token_ids"):
```
🟡 Suggestion: _write_back uses a truthiness check on request.get("prompt_token_ids"), which cannot distinguish None from an empty list [].
When prompt_token_ids=[], the value is falsy, so it gets overwritten instead of preserved. Use an explicit is not None check:

```python
if request.get("prompt_token_ids") is not None:
    pass  # preserve existing prompt_token_ids
else:
    request["prompt_token_ids"] = outputs["input_ids"].tolist()
```
```python
def set_pixels(self, min_pixels=None, max_pixels=None, msg=""):
    if min_pixels is not None:
        assert isinstance(min_pixels, int) and min_pixels >= 0, "min_pixels must be positive int"
```
🟡 Suggestion: assert is used for runtime validation of user input (checklist §C surface signal). Python skips assert statements under -O, so invalid arguments would pass silently.
In addition, the condition min_pixels >= 0 contradicts the error message "must be positive int" (0 is not a positive integer). Raise ValueError explicitly instead:

```python
if not isinstance(min_pixels, int) or min_pixels <= 0:
    raise ValueError(f"min_pixels must be a positive int, got {min_pixels}")
```

```python
    "prompt_token_ids": prompts[i],
    "request_id": request_id,
}
elif isinstance(prompts[i], list) and len(prompts[i]) > 0 and isinstance(prompts[i][0], dict):
```
🟡 Suggestion: when prompts[i] is an empty list [], neither len(prompts[i]) > 0 condition matches, so it falls through to else: raise TypeError(...), yet the error message still lists list as a valid type, which confuses callers.
Add explicit empty-list handling before the else:

```python
elif isinstance(prompts[i], list) and len(prompts[i]) == 0:
    raise ValueError(f"prompt at index {i} is an empty list, which is not allowed.")
```

```python
h_bar = round(height / factor) * factor
w_bar = round(width / factor) * factor
if h_bar * w_bar > max_pixels:
    beta = math.sqrt((height * width) / max_pixels)
```
🟡 Suggestion: smart_resize_paddleocr lacks the final bounds check that smart_resize_qwen performs, so it can silently return out-of-range h_bar/w_bar.
smart_resize_qwen ends with:

```python
if min_pixels > h_bar * w_bar or h_bar * w_bar > max_pixels:
    raise ValueError(f"encounter invalid h_bar: {h_bar}, w_bar: {w_bar}")
```

Add the same check before return h_bar, w_bar so the two functions behave consistently.
Motivation

Modifications

Usage or Command

Accuracy Tests

Checklist
- [ ] Add at least a tag in the PR title. Tag list: [`[FDConfig]`, `[APIServer]`, `[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- [ ] Format your code, run `pre-commit` before commit.
- [ ] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.