
[RL] pause: use abort pipeline with scheduling loop alive for gracefu…#7753

Open
jackyYang6 wants to merge 1 commit into PaddlePaddle:develop from jackyYang6:rl/pause-abort

Conversation

@jackyYang6
Contributor

@jackyYang6 jackyYang6 commented May 8, 2026

Depends-on: #7615 (refactor abort_requests to fire-and-forget)

Motivation

In RL scenarios, the upstream framework calls abort_request followed by pause to stop the engine. The old _control_pause implementation had two critical issues:

  1. Lost partial results: preempted_all() + _send_error_response(500) discarded already-inferred tokens, returning an error instead of partial results to clients.

  2. Deadlock with abort pipeline: Setting is_paused=True at the start blocked the scheduling loop (_pause_cond.wait_for), which prevented _trigger_abort from processing abort requests — causing a 30s timeout deadlock.
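The deadlock described above can be reproduced in miniature. The sketch below uses hypothetical stand-in names (`scheduling_loop`, `abort_queue`; the real engine's loop and `_trigger_abort` are more involved) to show why a loop parked in `wait_for` starves abort processing, and why flipping the ordering unblocks it:

```python
import threading, queue, time

pause_cond = threading.Condition()
is_paused = True                      # old design: pause is set up front
abort_queue = queue.Queue()
processed = []
stop = threading.Event()

def scheduling_loop():
    while not stop.is_set():
        with pause_cond:
            resumed = pause_cond.wait_for(
                lambda: not is_paused or stop.is_set(), timeout=0.05)
        if not resumed:
            continue                  # still paused: abort handling is never reached
        try:
            # Stand-in for _trigger_abort processing queued abort requests.
            processed.append(abort_queue.get_nowait())
        except queue.Empty:
            time.sleep(0.01)

t = threading.Thread(target=scheduling_loop)
t.start()
abort_queue.put("req-1")              # abort arrives while already paused
time.sleep(0.2)
assert processed == []                # starved: this is the old-design deadlock

with pause_cond:                      # new ordering: keep the loop alive first
    is_paused = False
    pause_cond.notify_all()
time.sleep(0.3)
assert processed == ["req-1"]         # abort drains once the loop can run
stop.set()
t.join()
```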

The new design separates "reject new requests" (_rejecting_new_requests) from "pause scheduling loop" (is_paused), allowing the abort pipeline to complete naturally before engine state reset. This ensures partial inference results are returned to clients via token_processor._put_abort_results (200 "Aborted") through the normal output path.
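A minimal sketch of the two-flag intake guard, using a toy stub (the class and `add_request` signature here are illustrative, not the real FastDeploy API): rejection and pause are independent, so intake can be closed while the scheduling loop stays alive.

```python
import threading

class EngineServiceStub:
    """Toy stand-in showing the decoupled pause/reject flags."""
    def __init__(self):
        self.is_paused = False                # stops the scheduling loop
        self._rejecting_new_requests = False  # blocks intake only
        self._pause_cond = threading.Condition()
        self.accepted = []

    def add_request(self, req):
        # Intake check covers both states (mirrors the PR's guard condition).
        if self.is_paused or self._rejecting_new_requests:
            return {"status": 503, "msg": "engine is pausing"}
        self.accepted.append(req)
        return {"status": 200}

engine = EngineServiceStub()
assert engine.add_request("r1")["status"] == 200
engine._rejecting_new_requests = True   # phase 1: reject intake, loop still alive
assert engine.add_request("r2")["status"] == 503
assert not engine.is_paused             # scheduling loop can still drain aborts
```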

Modifications

fastdeploy/engine/common_engine.py

| Change | Description |
| --- | --- |
| self._rejecting_new_requests = False | New flag in __init__ to decouple request rejection from scheduling-loop pause |
| if self.is_paused or self._rejecting_new_requests: | Request intake check now covers both states |
| _control_pause() rewrite | Two-phase design: (1) reject + abort + drain, (2) pause + handle stragglers + reset |
| _wait_inflight_drained() | New method: polls until resource_manager.requests is empty (no timeout, aligned with SGLang) |
| _wait_output_queue_empty() | New method: polls until the scheduler output queue is consumed (no timeout) |
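Both new wait methods follow the same poll-until-drained shape. A generic sketch of that pattern (the helper name `wait_until_empty` and the fake drain thread are illustrative; the real methods poll `resource_manager.requests` and the scheduler output queue respectively):

```python
import threading
import time

def wait_until_empty(get_pending, poll_interval=0.05):
    """Poll until get_pending() returns an empty collection (no timeout)."""
    while get_pending():
        time.sleep(poll_interval)

# Example: a fake in-flight table that drains asynchronously, the way the
# abort pipeline would drain resource_manager.requests in the background.
pending = {"req-1": object(), "req-2": object()}

def drain():
    time.sleep(0.1)
    pending.clear()

threading.Thread(target=drain).start()
wait_until_empty(lambda: pending)
assert not pending
```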

Execution flow

_control_pause:
  ├─ _rejecting_new_requests = True      (block new requests, scheduling loop alive)
  ├─ wait worker queue empty             (current forward pass completes)
  ├─ add_abort_req_ids(ALL)              (scheduling loop processes via _trigger_abort)
  ├─ _wait_inflight_drained()            (poll rm.requests empty)
  ├─ is_paused = True                    (now pause scheduling loop)
  ├─ handle scheduler stragglers         (direct _send_error_response for never-scheduled)
  ├─ _wait_output_queue_empty()          (confirm responses sent via ZMQ)
  └─ scheduler.reset() + cache reset
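The flow above can be sketched as a toy sequencer. This is a stub, not the real method (the real `_control_pause` also waits on the worker queue, handles stragglers, and resets the scheduler and cache); it flips both flags under the lock in Phase 2, following the atomicity suggestion raised in review:

```python
import threading
import time

class PauseStub:
    """Simplified sequencing of the two-phase pause (hypothetical stub)."""
    def __init__(self):
        self.is_paused = False
        self._rejecting_new_requests = False
        self._pause_cond = threading.Condition()
        self.inflight = {"req-1", "req-2"}
        self.trace = []

    def _trigger_abort_all(self):
        # Stand-in for add_abort_req_ids(ALL) + scheduling-loop processing.
        self.inflight.clear()
        self.trace.append("aborted")

    def control_pause(self):
        self._rejecting_new_requests = True        # phase 1: block intake only
        self.trace.append("rejecting")
        self._trigger_abort_all()                  # loop alive, so aborts drain
        while self.inflight:                       # like _wait_inflight_drained()
            time.sleep(0.01)
        with self._pause_cond:                     # phase 2: stop the loop
            self.is_paused = True
            self._rejecting_new_requests = False   # both flips under one lock
        self.trace.append("paused")

stub = PauseStub()
stub.control_pause()
assert stub.trace == ["rejecting", "aborted", "paused"]
assert stub.is_paused and not stub._rejecting_new_requests
```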

Usage or Command

# Pause (aborts all inflight requests with partial results, then resets engine)
curl -X POST http://localhost:8180/v1/pause

# Check paused state
curl http://localhost:8180/v1/is_paused

# Resume
curl -X POST http://localhost:8180/v1/resume

Accuracy Tests

Functional verification with 20 concurrent streaming requests (max_num_seqs=8):

| Phase | Result | Details |
| --- | --- | --- |
| Pause execution | PASS | 0.342s (without debug sleep) |
| Running requests (8) abort | PASS | ~157 partial tokens returned per request |
| Waiting requests (12) abort | PASS | 1 token (EOS) returned |
| GPU blocks reclaimed | PASS | 11472 → 11472 (full recovery) |
| Requests rejected while paused | PASS | completion_tokens=0 |
| _rejecting_new_requests window rejection | PASS | Verified with debug sleep |
| Resume + new requests | PASS | 5 requests completed normally (3183-4095 tokens) |
| Metrics: running=0, waiting=0 post-pause | PASS | - |

Checklist

  • Add at least a tag in the PR title: [RL], [Engine]
  • Format your code, run pre-commit before commit.
  • Add unit tests. (unit test updated for test_control_pause_and_resume_paths)
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch.

@paddle-bot

paddle-bot Bot commented May 8, 2026

Thanks for your contribution!


@PaddlePaddle-bot

PaddlePaddle-bot commented May 8, 2026

🤖 Paddle-CI-Agent | ci_status_monitor | 2026-05-08 19:46:19

CI report generated from the code below (refreshed every 30 minutes):


1 Task overview

1 required task failed (Approval) and should be addressed first; 3 more required tasks are still running, awaiting results.

| Total runs (reruns) | Total tasks | ✅ Passed | ❌ Failed | ⏳ Running | ⏸️ Waiting | Skipped |
| --- | --- | --- | --- | --- | --- | --- |
| 36 (0) | 36 | 28 | 3 | 4 | 1 | 0 |

2 Task status summary

2.1 Required tasks: 6/10 passed

Required tasks block merging; failures should be addressed first.

| Status | Task | Duration | Root cause | Suggested fix | Log | Rerun |
| --- | --- | --- | --- | --- | --- | --- |
| ❌ | Approval | 8s | PR issue: new logging statements require approval from a designated RD | Request approval from xyxinyang or zyyzghb | Job | - |
| ⏳ | run_ce_cases | - | Running | - | Job | - |
| ⏳ | run_tests_with_coverage | - | Running | - | Job | - |
| ⏳ | run_xpu_4cards_cases | - | Running | - | Job | - |
| | The remaining 6 required tasks passed | - | - | - | - | - |

2.2 Optional tasks: 22/26 passed

Optional tasks do not block merging; failures are informational only.

| Status | Task | Duration | Log | Rerun |
| --- | --- | --- | --- | --- |
| | Check PR Template | 12s | Job | - |
| | Trigger Jenkins for PR | 13m23s | Job | - |
| | Run iluvatar Tests / run_iluvatar_cases | - | Job | - |
| ⏸️ | CI_HPU | - | - | - |
| | The remaining 22 optional tasks passed | - | - | - |

3 Failure details (required only)

Approval: code-convention/approval check (confidence: high)

Approval

  • Status: ❌ Failed
  • Error type: code convention
  • Confidence: high
  • Root-cause summary: the PR adds new logging statements, which require approval from a designated RD (xyxinyang/zyyzghb)
  • Analyzer: generic analysis (fallback)

Root-cause details:
The PR adds 3 new llm_logger.info() calls in the pause logic, which changes logging behavior. The repository's check_approval.sh script detected a .info( logging-method change in the diff and triggered the mandatory approval flow (exit code 6 = 1 pending approval error). Per repository rules, any change to logging behavior (.info/.debug/.error/log_request) must be approved by a designated FastDeploy RD member.

Key log:

Detected log modification in diff:
+        self.llm_logger.info(f"Pause: aborting {len(all_req_ids)} total requests.")
+            self.llm_logger.info(f"Pause: {len(remaining)} scheduler-only requests, sending abort directly.")
+        self.llm_logger.info("All inflight requests drained.")
0. You must have one FastDeploy RD (xyxinyang(zhouchong), zyyzghb(zhangyongyue)) approval for modifying logging behavior.
There are 1 approved errors.
##[error]Process completed with exit code 6.

Suggested fixes:

  1. Ask xyxinyang (zhouchong) or zyyzghb (zhangyongyue) to approve this PR
  2. Alternatively, consider removing or adjusting the new llm_logger.info() calls to bypass the approval check

Fix summary: request approval from xyxinyang or zyyzghb for this PR

Related change: the PR adds 3 new llm_logger.info() calls in the pause flow

Link: view log

[RL] pause: use abort pipeline with scheduling loop alive for graceful termination

Replace the old preempted_all + error_response approach in _control_pause
with a two-phase design:

Phase 1: Block new requests via _rejecting_new_requests (NOT is_paused)
  - Scheduling loop keeps running so _trigger_abort can process
  - add_abort_req_ids(ALL) marks all requests for abort
  - Scheduling loop catches them via _trigger_abort as they cycle through

Phase 2: After drain, set is_paused=True to fully stop scheduling loop
  - Handle scheduler-only stragglers with direct _send_error_response
  - Wait for output queue empty, then reset

Depends-on: PaddlePaddle#7615 (refactor abort_requests to fire-and-forget)

@PaddlePaddle-bot PaddlePaddle-bot left a comment


🤖 Paddle-CI-Agent | pr_review | 2026-05-08 19:13:48

📋 Review summary

PR overview: refactors the _control_pause() implementation with a two-phase design (abort + drain first, then pause the scheduling loop) to fix the deadlock between the abort pipeline and the pause operation in RL scenarios, as well as the loss of partial results.
Scope of change: fastdeploy/engine/common_engine.py, tests/engine/test_common_engine.py
Impact tags: [Engine] [RL]

📝 PR template check

All PR sections (Motivation / Modifications / Usage or Command / Accuracy Tests / Checklist) are complete with substantive content, the Checklist ticks match the diff, and the [RL] tag in the title is correctly formatted. Template-compliant; no changes suggested.

Issues

| Severity | File | Summary |
| --- | --- | --- |
| 🟡 Suggestion | common_engine.py:1525 | _wait_inflight_drained() has no timeout guard and blocks forever if the abort pipeline misbehaves |
| 🟡 Suggestion | common_engine.py:1557 | _wait_output_queue_empty() accesses scheduler.responses directly, which may raise AttributeError under global_scheduler |
| ❓ Question | common_engine.py:1482 | The two flag updates in Phase 4 are not atomic; _rejecting_new_requests = False runs outside the lock |

Overall assessment

The new two-phase pause design is clear and effectively fixes both the deadlock and the loss of partial results, and the tests were updated in step. The main points to watch are the unbounded wait in _wait_inflight_drained() (in production, a misbehaving abort pipeline would hang the pause call forever) and the compatibility of _wait_output_queue_empty() with non-local_scheduler implementations.

def _wait_inflight_drained(self):
    """
    Wait until resource_manager.requests is completely empty.
    No timeout — abort pipeline will complete. Aligned with SGLang's poll-until-drained.

🟡 Suggestion: _wait_inflight_drained() lacks a timeout guard and risks blocking forever.

If the abort pipeline misbehaves (e.g. _trigger_abort is never invoked, or resource_manager's internal state becomes inconsistent), this loop waits indefinitely: the pause call never returns and the whole control thread is blocked. In the previous implementation every wait point had a 60s timeout guard, so removing it here is a regression.

Suggestion: add a maximum timeout and, on expiry, raise an exception or degrade gracefully:

max_timeout = 120  # seconds
while self.resource_manager.requests:
    elapsed = time.time() - start_time
    if elapsed > max_timeout:
        self.llm_logger.error(
            f"Abort drain timed out after {max_timeout}s: "
            f"{len(self.resource_manager.requests)} requests still remain!"
        )
        raise TimeoutError(f"Abort drain did not complete within {max_timeout}s")
    ...

warned = False
while True:
    with self.scheduler.mutex:
        has_pending = bool(self.scheduler.responses) or bool(self.scheduler.batch_responses_per_step)

🟡 Suggestion: _wait_output_queue_empty() accesses self.scheduler.responses and self.scheduler.batch_responses_per_step directly, risking AttributeError under global_scheduler.

Both fields exist in local_scheduler.py, but global_scheduler.py only has self.local_responses (no self.responses), and dp_scheduler.py needs to be checked separately. If the engine is configured with global_scheduler, this raises AttributeError and breaks the pause flow.

Suggestion: guard with hasattr, or expose a unified pending_responses() method on the scheduler base class/interface:

# Option 1: hasattr guard
has_pending = (
    (hasattr(self.scheduler, 'responses') and bool(self.scheduler.responses)) or
    (hasattr(self.scheduler, 'batch_responses_per_step') and bool(self.scheduler.batch_responses_per_step))
)
# Option 2: add a has_pending_responses() interface on the scheduler base class

# Phase 4: Fully pause the scheduling loop
with self._pause_cond:
    self.is_paused = True
self._rejecting_new_requests = False

❓ Question: _rejecting_new_requests = False executes outside the _pause_cond lock, so it is not atomic with is_paused = True.

Phase 4 currently does:

with self._pause_cond:
    self.is_paused = True   # inside the lock
# lock released
self._rejecting_new_requests = False  # outside the lock

Between the two statements there is a brief window in which is_paused = True is already set while _rejecting_new_requests is still True; a concurrent caller (rare on the actual paths) could observe transiently inconsistent state.

Suggestion: flip both flags inside the same lock:

with self._pause_cond:
    self.is_paused = True
    self._rejecting_new_requests = False

@codecov-commenter

Codecov Report

❌ Patch coverage is 31.11111% with 31 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (develop@8396ef6). Learn more about missing BASE report.

Files with missing lines Patch % Lines
fastdeploy/engine/common_engine.py 31.11% 29 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             develop    #7753   +/-   ##
==========================================
  Coverage           ?   71.56%           
==========================================
  Files              ?      396           
  Lines              ?    55609           
  Branches           ?     8694           
==========================================
  Hits               ?    39797           
  Misses             ?    13074           
  Partials           ?     2738           
Flag Coverage Δ
GPU 71.56% <31.11%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

