Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 80 additions & 17 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def __init__(self, cfg: FDConfig, start_queue=True, use_async_llm=False):

self.is_paused = False # pause request generation
self._pause_cond = threading.Condition()
self._rejecting_new_requests = False # blocks new requests during abort drain

self._ctrl_output_queues = {}
self._ctrl_response_mailboxes = collections.defaultdict(collections.OrderedDict)
Expand Down Expand Up @@ -1325,7 +1326,7 @@ def _insert_zmq_task_to_scheduler(self):
trace_print(LoggingEventName.REQUEST_QUEUE_START, data["request_id"], data.get("user", ""))
self.llm_logger.debug(f"Receive request from api server: {request}")

if self.is_paused:
if self.is_paused or self._rejecting_new_requests:
self.llm_logger.warning(f"Engine is paused, drop request: {request}")
self._send_error_response(
request.request_id,
Expand Down Expand Up @@ -1445,12 +1446,14 @@ def _control_pause(self, control_request: ControlRequest):
if self.is_paused:
self.llm_logger.info("Engine is already paused, no need to pause again.")
return
self.is_paused = True

self.llm_logger.info("Abort running requests.")
# Phase 1: Block new requests but keep scheduling loop running
# (scheduling loop must continue to process _trigger_abort)
self._rejecting_new_requests = True

self.resource_manager.log_status()
# preempted all running reqs. preempted reqs will be append to ResourceManager.waiting queue

# Wait for current worker batch to complete
timeout, count = 60, 0
while self.engine_worker_queue.exist_tasks():
time.sleep(0.001)
Expand All @@ -1461,21 +1464,37 @@ def _control_pause(self, control_request: ControlRequest):
error_msg = f"Emptying engine worker queue timed out after {timeout} seconds, worker may hanged!"
self.llm_logger.error(error_msg)
raise Exception(error_msg)
running_reqs = self.resource_manager.preempted_all()
if len(running_reqs) > 0:
self.llm_logger.info(f"Total {len(running_reqs)} requests need to be aborted.")
self.resource_manager.get_real_bsz()
self.engine_worker_queue.put_tasks((running_reqs, self.resource_manager.real_bsz))
self.resource_manager.wait_worker_inflight_requests_finish(timeout=60)
# self.engine_worker_queue.clear_data()

# Phase 2: Trigger abort for ALL known requests
# Scheduling loop picks them up via _trigger_abort when they enter resource_manager
all_req_ids = list(set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys()))
self.llm_logger.info(f"Pause: aborting {len(all_req_ids)} total requests.")
if all_req_ids:
self.resource_manager.add_abort_req_ids(all_req_ids)

# Phase 3: Wait for resource_manager to drain
# (all requests that enter rm will be caught by _trigger_abort)
self._wait_inflight_drained()

# Phase 4: Fully pause the scheduling loop
with self._pause_cond:
self.is_paused = True
self._rejecting_new_requests = False
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ 疑问:`_rejecting_new_requests = False` 在 `_pause_cond` 锁外执行,与 `is_paused = True` 非原子。

当前 Phase 4 的操作是:

with self._pause_cond:
    self.is_paused = True   # 锁内
# 锁已释放
self._rejecting_new_requests = False  # 锁外

在两条语句之间存在短暂窗口:is_paused = True 已设置但 _rejecting_new_requests 仍为 True;此时若有并发调用(虽然实际路径极少),状态可能短暂不一致。

建议将两个标志位的修改合并在同一锁内完成:

with self._pause_cond:
    self.is_paused = True
    self._rejecting_new_requests = False


# Handle any requests remaining in scheduler (never pulled into rm)
remaining = set(self.scheduler.requests.keys())
if remaining:
self.llm_logger.info(f"Pause: {len(remaining)} scheduler-only requests, sending abort directly.")
for req_id in remaining:
self._send_error_response(req_id, "Aborted", error_code=200)
self.resource_manager.waiting_abort_req_id_set.discard(req_id)

# Phase 5: Wait for output queue to be consumed (responses sent via ZMQ)
self._wait_output_queue_empty()

# Phase 6: Reset
self.token_processor.clear_data()
self.resource_manager.log_status()

# abort inflight requests to user
inflight_requests = self.scheduler.get_inflight_requests()
self.llm_logger.info(f"Abort inflight requests (total {len(inflight_requests)}).")
for req in inflight_requests:
self._send_error_response(req.request_id, "Request is aborted since engine is paused.")
self.scheduler.reset()

if envs.ENABLE_V1_KVCACHE_MANAGER:
Expand All @@ -1500,6 +1519,50 @@ def _control_pause(self, control_request: ControlRequest):
self.llm_logger.info("Successfully paused request generation.")
return None

def _wait_inflight_drained(self):
"""
Wait until resource_manager.requests is completely empty.
No timeout — abort pipeline will complete. Aligned with SGLang's poll-until-drained.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 _wait_inflight_drained() 缺少超时保护,存在永久阻塞风险。

若 abort pipeline 出现异常(如 _trigger_abort 未被正确调用、resource_manager 内部状态不一致),该循环将无限等待,导致 pause 调用永远不返回,进而阻塞整个控制线程。原有实现中所有等待点均有 60s 超时保护,本次移除属于回归。

建议添加最大超时并在超时时抛出异常或降级处理:

max_timeout = 120  # seconds
while self.resource_manager.requests:
    elapsed = time.time() - start_time
    if elapsed > max_timeout:
        self.llm_logger.error(
            f"Abort drain timed out after {max_timeout}s: "
            f"{len(self.resource_manager.requests)} requests still remain!"
        )
        raise TimeoutError(f"Abort drain did not complete within {max_timeout}s")
    ...

"""
start_time = time.time()
warned_10s = False
warned_30s = False
while self.resource_manager.requests:
elapsed = time.time() - start_time
if elapsed > 30 and not warned_30s:
warned_30s = True
self.llm_logger.warning(
f"Abort drain stalled ({elapsed:.1f}s): "
f"{len(self.resource_manager.requests)} requests remaining, "
f"waiting_abort={len(self.resource_manager.waiting_abort_req_id_set)}, "
f"to_be_aborted={len(self.resource_manager.to_be_aborted_req_id_set)}"
)
elif elapsed > 10 and not warned_10s:
warned_10s = True
self.llm_logger.warning(
f"Abort drain slow ({elapsed:.1f}s): " f"{len(self.resource_manager.requests)} requests remaining"
)
time.sleep(0.005)
self.llm_logger.info("All inflight requests drained.")

def _wait_output_queue_empty(self):
"""
Wait until scheduler output queue is consumed by the output thread.
Ensures all abort responses have been sent via ZMQ before reset.
"""
start_time = time.time()
warned = False
while True:
with self.scheduler.mutex:
has_pending = bool(self.scheduler.responses) or bool(self.scheduler.batch_responses_per_step)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议:`_wait_output_queue_empty()` 直接访问 `self.scheduler.responses` 与 `self.scheduler.batch_responses_per_step`,对 global_scheduler 存在 AttributeError 风险。

local_scheduler.py 中两个字段均存在,但 global_scheduler.py 中只有 self.local_responses(无 self.responses),dp_scheduler.py 需单独确认。若 engine 配置使用 global_scheduler,此处会抛出 AttributeError 导致 pause 流程中断。

建议通过 hasattr 防御,或在 scheduler 基类/接口中统一暴露 pending_responses() 方法:

# 方案1:hasattr 防御
has_pending = (
    (hasattr(self.scheduler, 'responses') and bool(self.scheduler.responses)) or
    (hasattr(self.scheduler, 'batch_responses_per_step') and bool(self.scheduler.batch_responses_per_step))
)
# 方案2:在 scheduler 基类中添加 has_pending_responses() 接口

if not has_pending:
return
elapsed = time.time() - start_time
if elapsed > 5 and not warned:
warned = True
self.llm_logger.warning(f"Output queue drain slow ({elapsed:.1f}s)")
time.sleep(0.005)

def _control_resume(self, control_request: ControlRequest) -> Optional[dict]:
"""Control function for resuming request generation.

Expand Down
20 changes: 14 additions & 6 deletions tests/engine/test_common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1137,22 +1137,30 @@ def test_control_pause_and_resume_paths(self):
eng = self._make_mixed_engine()
eng.is_paused = False
eng._pause_cond = threading.Condition()
eng.engine_worker_queue = Mock(exist_tasks=Mock(return_value=False), put_tasks=Mock())
eng.engine_worker_queue = Mock(exist_tasks=Mock(return_value=False))
eng.resource_manager = Mock(
preempted_all=Mock(return_value=[Request(request_id="r1", prompt_token_ids=[1], prompt_token_ids_len=1)]),
get_real_bsz=Mock(),
wait_worker_inflight_requests_finish=Mock(),
requests={"r1": Mock(output_token_ids=[1, 2, 3])},
waiting_abort_req_id_set=set(),
to_be_aborted_req_id_set=set(),
add_abort_req_ids=Mock(),
log_status=Mock(),
cache_manager=Mock(reset=Mock()),
real_bsz=1,
)
eng.token_processor = Mock(clear_data=Mock())
eng.scheduler = Mock(get_inflight_requests=Mock(return_value=[]), reset=Mock())
mock_scheduler = Mock(reset=Mock())
mock_scheduler.requests = {}
mock_scheduler.mutex = threading.Lock()
mock_scheduler.responses = {}
mock_scheduler.batch_responses_per_step = []
eng.scheduler = mock_scheduler
eng._send_error_response = Mock()
eng._wait_inflight_drained = Mock()
eng._wait_output_queue_empty = Mock()

with patch("fastdeploy.engine.common_engine.envs.ENABLE_V1_KVCACHE_SCHEDULER", True):
eng._control_pause(ControlRequest(request_id="ctrl1", method="pause"))
self.assertTrue(eng.is_paused)
eng.resource_manager.add_abort_req_ids.assert_called_once()

eng._control_resume(ControlRequest(request_id="ctrl2", method="resume"))
self.assertFalse(eng.is_paused)
Expand Down
Loading