# [RL] pause: use abort pipeline with scheduling loop alive for graceful termination #7753

jackyYang6 wants to merge 1 commit into PaddlePaddle:develop
Conversation
Thanks for your contribution!
CI report generated from the code below (updated every 30 minutes):

1. Task overview: 1 required task failed (
2. Task status summary
   2.1 Required tasks: 6/10 passed
   2.2 Optional tasks: 22/26 passed
3. Failure details (required only): Approval: code style / approval check (confidence: high)

Root cause details:
Key logs:
Suggested fix:

Fix summary: request approval for this PR from xyxinyang or zyyzghb. Related change: the PR adds 3 new items in the pause flow. Link: view logs
…l termination

Replace the old preempted_all + error_response approach in _control_pause with a two-phase design:

Phase 1: Block new requests via _rejecting_new_requests (NOT is_paused)
- Scheduling loop keeps running so _trigger_abort can process
- add_abort_req_ids(ALL) marks all requests for abort
- Scheduling loop catches them via _trigger_abort as they cycle through

Phase 2: After drain, set is_paused=True to fully stop scheduling loop
- Handle scheduler-only stragglers with direct _send_error_response
- Wait for output queue empty, then reset

Depends-on: PaddlePaddle#7615 (refact abort_requests to fire-and-forget)
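The two phases above can be condensed into a hedged sketch. The flag and method names mirror the commit message; the request bookkeeping is a stand-in for the real `resource_manager` / scheduler, not the actual implementation:

```python
import threading
import time

class PauseSketch:
    """Illustrative two-phase pause: abort + drain first, then stop the loop."""

    def __init__(self):
        self._pause_cond = threading.Condition()
        self.is_paused = False
        self._rejecting_new_requests = False
        self.inflight = {"req-1", "req-2"}  # stand-in for resource_manager.requests
        self.abort_ids = set()

    def scheduling_step(self):
        # Phase 1 keeps this loop alive, so the aborts marked below actually
        # get processed (stand-in for _trigger_abort).
        for req_id in list(self.inflight):
            if req_id in self.abort_ids:
                self.inflight.discard(req_id)

    def control_pause(self):
        # Phase 1: reject new requests but keep the scheduling loop running.
        self._rejecting_new_requests = True
        self.abort_ids |= set(self.inflight)  # add_abort_req_ids(ALL)
        while self.inflight:                  # _wait_inflight_drained()
            self.scheduling_step()
            time.sleep(0.001)
        # Phase 2: only after the drain, fully stop the scheduling loop.
        with self._pause_cond:
            self.is_paused = True
            self._rejecting_new_requests = False

engine = PauseSketch()
engine.control_pause()
print(engine.is_paused, engine.inflight)  # True set()
```

The key property is that no request is dropped with an error: everything in flight cycles through the (still-running) scheduling loop's abort path before the loop is parked.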
PaddlePaddle-bot left a comment
🤖 Paddle-CI-Agent | pr_review | 2026-05-08 19:13:48
📋 Review Summary

PR overview: refactors the `_control_pause()` implementation with a two-phase design (abort + drain first, then pause the scheduling loop) to fix both the deadlock between the abort pipeline and the pause operation in RL scenarios and the loss of partial results.

Change scope: fastdeploy/engine/common_engine.py, tests/engine/test_common_engine.py

Impact tags: [Engine] [RL]

📝 PR Convention Check

All PR sections (Motivation / Modifications / Usage or Command / Accuracy Tests / Checklist) are complete with substantive content, the Checklist state matches the diff, and the [RL] title tag is correctly formatted. Compliant; no changes suggested.
Issues

| Level | File | Summary |
|---|---|---|
| 🟡 Suggestion | common_engine.py:1525 | `_wait_inflight_drained()` has no timeout guard; it blocks forever if the abort pipeline misbehaves |
| 🟡 Suggestion | common_engine.py:1557 | `_wait_output_queue_empty()` accesses `scheduler.responses` directly, which may raise `AttributeError` under `global_scheduler` |
| ❓ Question | common_engine.py:1482 | The Phase 4 two-flag update is non-atomic; `_rejecting_new_requests = False` runs outside the lock |
Overall Assessment

The new two-phase pause design is clear and effectively fixes both the deadlock and the loss of partial results, and the test cases were updated in step. The main points to watch are the missing timeout in `_wait_inflight_drained()` (in production, a misbehaving abort pipeline would hang the pause call forever) and the compatibility of `_wait_output_queue_empty()` with non-`local_scheduler` implementations.
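To make the first concern concrete, here is a generic poll-until-drained helper with the timeout guard the review asks for. The helper name, numbers, and the simulated queue are illustrative, not engine API:

```python
import time

def wait_until_drained(get_count, max_timeout=120.0, poll_interval=0.01):
    """Poll until get_count() reaches 0, failing loudly after max_timeout."""
    start = time.time()
    while get_count() > 0:
        if time.time() - start > max_timeout:
            raise TimeoutError(
                f"drain did not complete within {max_timeout}s: "
                f"{get_count()} requests still remain"
            )
        time.sleep(poll_interval)

# Simulated pending set: in the engine another thread drains it; here the
# getter itself removes one entry per poll to keep the example single-threaded.
pending = ["a", "b", "c"]

def count_and_drain():
    n = len(pending)
    if pending:
        pending.pop()
    return n

wait_until_drained(count_and_drain)
print(pending)  # []
```

With a `max_timeout` in place, a stuck abort pipeline surfaces as a loud `TimeoutError` instead of silently hanging the control thread.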
```python
def _wait_inflight_drained(self):
    """
    Wait until resource_manager.requests is completely empty.
    No timeout — abort pipeline will complete. Aligned with SGLang's poll-until-drained.
```
🟡 Suggestion: `_wait_inflight_drained()` lacks a timeout guard and risks blocking forever.

If the abort pipeline misbehaves (e.g. `_trigger_abort` is never invoked, or `resource_manager`'s internal state is inconsistent), this loop waits indefinitely, so the pause call never returns and the whole control thread is blocked. In the previous implementation every wait point had a 60s timeout guard; removing it here is a regression.

Suggestion: add a maximum timeout and raise, or degrade gracefully, when it expires:
```python
max_timeout = 120  # seconds
while self.resource_manager.requests:
    elapsed = time.time() - start_time
    if elapsed > max_timeout:
        self.llm_logger.error(
            f"Abort drain timed out after {max_timeout}s: "
            f"{len(self.resource_manager.requests)} requests still remain!"
        )
        raise TimeoutError(f"Abort drain did not complete within {max_timeout}s")
    ...
```

```python
warned = False
while True:
    with self.scheduler.mutex:
        has_pending = bool(self.scheduler.responses) or bool(self.scheduler.batch_responses_per_step)
```
🟡 Suggestion: `_wait_output_queue_empty()` accesses `self.scheduler.responses` and `self.scheduler.batch_responses_per_step` directly, which risks `AttributeError` under `global_scheduler`.

Both fields exist in local_scheduler.py, but global_scheduler.py only has `self.local_responses` (no `self.responses`); dp_scheduler.py needs to be checked separately. If the engine is configured with `global_scheduler`, this raises `AttributeError` and interrupts the pause flow.
Suggestion: defend with `hasattr`, or expose a unified `pending_responses()` method on the scheduler base class / interface:

```python
# Option 1: defend with hasattr
has_pending = (
    (hasattr(self.scheduler, 'responses') and bool(self.scheduler.responses)) or
    (hasattr(self.scheduler, 'batch_responses_per_step') and bool(self.scheduler.batch_responses_per_step))
)
# Option 2: add a has_pending_responses() interface on the scheduler base class
```

```python
# Phase 4: Fully pause the scheduling loop
with self._pause_cond:
    self.is_paused = True
self._rejecting_new_requests = False
```
❓ Question: `_rejecting_new_requests = False` executes outside the `_pause_cond` lock, so it is not atomic with `is_paused = True`.

Phase 4 currently does:

```python
with self._pause_cond:
    self.is_paused = True          # inside the lock
# lock released
self._rejecting_new_requests = False  # outside the lock
```

There is a brief window between the two statements in which `is_paused = True` is already set while `_rejecting_new_requests` is still `True`; a concurrent caller hitting that window (rare in practice) could observe briefly inconsistent state.
Suggestion: update both flags inside the same lock:

```python
with self._pause_cond:
    self.is_paused = True
    self._rejecting_new_requests = False
```
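A minimal, hypothetical demonstration of why the combined update matters: with both flags flipped under one lock, no reader holding the same lock can ever observe the mixed `(True, True)` intermediate state. The class and method names here are illustrative, not from the PR:

```python
import threading

class PauseFlags:
    """Illustrative stand-in for the engine's two pause flags."""

    def __init__(self):
        self._lock = threading.Lock()
        self.is_paused = False
        self.rejecting_new_requests = True  # Phase 1 already set this

    def finish_pause(self):
        # Both flags change under one lock, as the review suggests.
        with self._lock:
            self.is_paused = True
            self.rejecting_new_requests = False

    def snapshot(self):
        # Readers take the same lock, so they see either the pre- or
        # post-finish_pause state, never a mix of the two.
        with self._lock:
            return (self.is_paused, self.rejecting_new_requests)

flags = PauseFlags()
flags.finish_pause()
print(flags.snapshot())  # (True, False)
```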
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@ Coverage Diff @@
##          develop   #7753   +/-  ##
==========================================
  Coverage        ?  71.56%
==========================================
  Files           ?     396
  Lines           ?   55609
  Branches        ?    8694
==========================================
  Hits            ?   39797
  Misses          ?   13074
  Partials        ?    2738
```

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
Depends-on: #7615 (refact abort_requests to fire-and-forget)
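"Fire-and-forget" here means the caller of `abort_requests` only enqueues the ids and returns immediately, rather than blocking until every abort completes. A hypothetical illustration of that calling convention (the queue and function body are mine, not from PR #7615):

```python
import queue

# Aborts are handed off to the scheduling loop through a queue; the caller
# does not wait for them to be processed.
abort_queue = queue.Queue()

def abort_requests(req_ids):
    for rid in req_ids:
        abort_queue.put(rid)  # no blocking wait on completion

abort_requests(["req-1", "req-2"])
print(abort_queue.qsize())  # 2
```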
Motivation

In RL scenarios, the upstream framework calls `abort_request` followed by `pause` to stop the engine. The old `_control_pause` implementation had two critical issues:

1. Lost partial results: `preempted_all()` + `_send_error_response(500)` discarded already-inferred tokens, returning an error instead of partial results to clients.
2. Deadlock with abort pipeline: setting `is_paused=True` at the start blocked the scheduling loop (`_pause_cond.wait_for`), which prevented `_trigger_abort` from processing abort requests — causing a 30s timeout deadlock.

The new design separates "reject new requests" (`_rejecting_new_requests`) from "pause scheduling loop" (`is_paused`), allowing the abort pipeline to complete naturally before the engine state reset. This ensures partial inference results are returned to clients via `token_processor._put_abort_results` (200 "Aborted") through the normal output path.

Modifications
fastdeploy/engine/common_engine.py:

- Add `self._rejecting_new_requests = False` in `__init__` to decouple request rejection from the scheduling loop pause
- Gate incoming requests with `if self.is_paused or self._rejecting_new_requests:`
- Rewrite `_control_pause()`
- Add `_wait_inflight_drained()`: waits until `resource_manager.requests` is empty (no timeout, aligned with SGLang)
- Add `_wait_output_queue_empty()`

Execution flow
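The original flow diagram did not survive extraction; as a hedged reconstruction from the commit message, the helpers listed above compose roughly as follows. The `EngineStub` below is illustrative; only the method and flag names come from the PR, and the stubbed bodies stand in for work the real scheduling loop does:

```python
class EngineStub:
    def __init__(self):
        self.is_paused = False
        self._rejecting_new_requests = False
        self.requests = ["req-1", "req-2"]   # resource_manager.requests stand-in
        self.output_queue = ["partial-token"]
        self.log = []

    def add_abort_req_ids(self, ids):
        self.log.append(f"abort {len(ids)}")

    def _wait_inflight_drained(self):
        self.requests.clear()                # real loop drains via _trigger_abort
        self.log.append("drained")

    def _wait_output_queue_empty(self):
        self.output_queue.clear()            # partial results flushed to clients
        self.log.append("flushed")

    def _control_pause(self):
        self._rejecting_new_requests = True  # Phase 1: gate new requests
        self.add_abort_req_ids(list(self.requests))
        self._wait_inflight_drained()
        self._wait_output_queue_empty()
        self.is_paused = True                # Phase 2: stop the scheduling loop
        self._rejecting_new_requests = False

e = EngineStub()
e._control_pause()
print(e.log, e.is_paused)  # ['abort 2', 'drained', 'flushed'] True
```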
Usage or Command
Accuracy Tests
Functional verification with 20 concurrent streaming requests (`max_num_seqs=8`):

- `_rejecting_new_requests` window rejection
[RL],[Engine]pre-commitbefore commit.test_control_pause_and_resume_paths)releasebranch, make sure the PR has been submitted to thedevelopbranch.