-
Notifications
You must be signed in to change notification settings - Fork 743
[RL] pause: use abort pipeline with scheduling loop alive for gracefu… #7753
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -142,6 +142,7 @@ def __init__(self, cfg: FDConfig, start_queue=True, use_async_llm=False): | |
|
|
||
| self.is_paused = False # pause request generation | ||
| self._pause_cond = threading.Condition() | ||
| self._rejecting_new_requests = False # blocks new requests during abort drain | ||
|
|
||
| self._ctrl_output_queues = {} | ||
| self._ctrl_response_mailboxes = collections.defaultdict(collections.OrderedDict) | ||
|
|
@@ -1325,7 +1326,7 @@ def _insert_zmq_task_to_scheduler(self): | |
| trace_print(LoggingEventName.REQUEST_QUEUE_START, data["request_id"], data.get("user", "")) | ||
| self.llm_logger.debug(f"Receive request from api server: {request}") | ||
|
|
||
| if self.is_paused: | ||
| if self.is_paused or self._rejecting_new_requests: | ||
| self.llm_logger.warning(f"Engine is paused, drop request: {request}") | ||
| self._send_error_response( | ||
| request.request_id, | ||
|
|
@@ -1445,12 +1446,14 @@ def _control_pause(self, control_request: ControlRequest): | |
| if self.is_paused: | ||
| self.llm_logger.info("Engine is already paused, no need to pause again.") | ||
| return | ||
| self.is_paused = True | ||
|
|
||
| self.llm_logger.info("Abort running requests.") | ||
| # Phase 1: Block new requests but keep scheduling loop running | ||
| # (scheduling loop must continue to process _trigger_abort) | ||
| self._rejecting_new_requests = True | ||
|
|
||
| self.resource_manager.log_status() | ||
| # preempted all running reqs. preempted reqs will be append to ResourceManager.waiting queue | ||
|
|
||
| # Wait for current worker batch to complete | ||
| timeout, count = 60, 0 | ||
| while self.engine_worker_queue.exist_tasks(): | ||
| time.sleep(0.001) | ||
|
|
@@ -1461,21 +1464,37 @@ def _control_pause(self, control_request: ControlRequest): | |
| error_msg = f"Emptying engine worker queue timed out after {timeout} seconds, worker may hanged!" | ||
| self.llm_logger.error(error_msg) | ||
| raise Exception(error_msg) | ||
| running_reqs = self.resource_manager.preempted_all() | ||
| if len(running_reqs) > 0: | ||
| self.llm_logger.info(f"Total {len(running_reqs)} requests need to be aborted.") | ||
| self.resource_manager.get_real_bsz() | ||
| self.engine_worker_queue.put_tasks((running_reqs, self.resource_manager.real_bsz)) | ||
| self.resource_manager.wait_worker_inflight_requests_finish(timeout=60) | ||
| # self.engine_worker_queue.clear_data() | ||
|
|
||
| # Phase 2: Trigger abort for ALL known requests | ||
| # Scheduling loop picks them up via _trigger_abort when they enter resource_manager | ||
| all_req_ids = list(set(self.resource_manager.requests.keys()) | set(self.scheduler.requests.keys())) | ||
| self.llm_logger.info(f"Pause: aborting {len(all_req_ids)} total requests.") | ||
| if all_req_ids: | ||
| self.resource_manager.add_abort_req_ids(all_req_ids) | ||
|
|
||
| # Phase 3: Wait for resource_manager to drain | ||
| # (all requests that enter rm will be caught by _trigger_abort) | ||
| self._wait_inflight_drained() | ||
|
|
||
| # Phase 4: Fully pause the scheduling loop | ||
| with self._pause_cond: | ||
| self.is_paused = True | ||
| self._rejecting_new_requests = False | ||
|
|
||
| # Handle any requests remaining in scheduler (never pulled into rm) | ||
| remaining = set(self.scheduler.requests.keys()) | ||
| if remaining: | ||
| self.llm_logger.info(f"Pause: {len(remaining)} scheduler-only requests, sending abort directly.") | ||
| for req_id in remaining: | ||
| self._send_error_response(req_id, "Aborted", error_code=200) | ||
| self.resource_manager.waiting_abort_req_id_set.discard(req_id) | ||
|
|
||
| # Phase 5: Wait for output queue to be consumed (responses sent via ZMQ) | ||
| self._wait_output_queue_empty() | ||
|
|
||
| # Phase 6: Reset | ||
| self.token_processor.clear_data() | ||
| self.resource_manager.log_status() | ||
|
|
||
| # abort inflight requests to user | ||
| inflight_requests = self.scheduler.get_inflight_requests() | ||
| self.llm_logger.info(f"Abort inflight requests (total {len(inflight_requests)}).") | ||
| for req in inflight_requests: | ||
| self._send_error_response(req.request_id, "Request is aborted since engine is paused.") | ||
| self.scheduler.reset() | ||
|
|
||
| if envs.ENABLE_V1_KVCACHE_MANAGER: | ||
|
|
@@ -1500,6 +1519,50 @@ def _control_pause(self, control_request: ControlRequest): | |
| self.llm_logger.info("Successfully paused request generation.") | ||
| return None | ||
|
|
||
| def _wait_inflight_drained(self): | ||
| """ | ||
| Wait until resource_manager.requests is completely empty. | ||
| No timeout — abort pipeline will complete. Aligned with SGLang's poll-until-drained. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 建议 若 abort pipeline 出现异常(如 建议添加最大超时并在超时时抛出异常或降级处理: max_timeout = 120 # seconds
while self.resource_manager.requests:
elapsed = time.time() - start_time
if elapsed > max_timeout:
self.llm_logger.error(
f"Abort drain timed out after {max_timeout}s: "
f"{len(self.resource_manager.requests)} requests still remain!"
)
raise TimeoutError(f"Abort drain did not complete within {max_timeout}s")
... |
||
| """ | ||
| start_time = time.time() | ||
| warned_10s = False | ||
| warned_30s = False | ||
| while self.resource_manager.requests: | ||
| elapsed = time.time() - start_time | ||
| if elapsed > 30 and not warned_30s: | ||
| warned_30s = True | ||
| self.llm_logger.warning( | ||
| f"Abort drain stalled ({elapsed:.1f}s): " | ||
| f"{len(self.resource_manager.requests)} requests remaining, " | ||
| f"waiting_abort={len(self.resource_manager.waiting_abort_req_id_set)}, " | ||
| f"to_be_aborted={len(self.resource_manager.to_be_aborted_req_id_set)}" | ||
| ) | ||
| elif elapsed > 10 and not warned_10s: | ||
| warned_10s = True | ||
| self.llm_logger.warning( | ||
| f"Abort drain slow ({elapsed:.1f}s): " f"{len(self.resource_manager.requests)} requests remaining" | ||
| ) | ||
| time.sleep(0.005) | ||
| self.llm_logger.info("All inflight requests drained.") | ||
|
|
||
| def _wait_output_queue_empty(self): | ||
| """ | ||
| Wait until scheduler output queue is consumed by the output thread. | ||
| Ensures all abort responses have been sent via ZMQ before reset. | ||
| """ | ||
| start_time = time.time() | ||
| warned = False | ||
| while True: | ||
| with self.scheduler.mutex: | ||
| has_pending = bool(self.scheduler.responses) or bool(self.scheduler.batch_responses_per_step) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🟡 建议
建议通过 # 方案1:hasattr 防御
has_pending = (
(hasattr(self.scheduler, 'responses') and bool(self.scheduler.responses)) or
(hasattr(self.scheduler, 'batch_responses_per_step') and bool(self.scheduler.batch_responses_per_step))
)
# 方案2:在 scheduler 基类中添加 has_pending_responses() 接口 |
||
| if not has_pending: | ||
| return | ||
| elapsed = time.time() - start_time | ||
| if elapsed > 5 and not warned: | ||
| warned = True | ||
| self.llm_logger.warning(f"Output queue drain slow ({elapsed:.1f}s)") | ||
| time.sleep(0.005) | ||
|
|
||
| def _control_resume(self, control_request: ControlRequest) -> Optional[dict]: | ||
| """Control function for resuming request generation. | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ 疑问
_rejecting_new_requests = False在_pause_cond锁外执行,与is_paused = True非原子。当前 Phase 4 的操作是:
在两条语句之间存在短暂窗口:
is_paused = True已设置但_rejecting_new_requests仍为 True;此时若有并发调用(虽然实际路径极少),状态可能短暂不一致。建议将两个标志位的修改合并在同一锁内完成: