fix(async_engine): make safe_run cancellation cleanup reliable with shield and SafeRunException #4439
lvhan028 wants to merge 1 commit into InternLM:main
Pull request overview
This PR aims to make `AsyncEngine.safe_run()` cleanup more reliable when the surrounding task is cancelled, by shielding cleanup calls and re-raising a `SafeRunException` so the session manager can treat the cancellation as handled.
Changes:
- Replaces `session.async_abort()` with a direct, shielded `handle.async_cancel()` call inside `safe_run()`.
- For the PyTorch backend, wraps `handle.async_end()` in `asyncio.shield()` and suppresses its exceptions.
- Adds explanatory comments and raises `SafeRunException` after cleanup to avoid propagating the original cancellation/exception.
Comments suppressed due to low confidence (1)
lmdeploy/serve/core/async_engine.py:295
- The current cancellation-cleanup logic doesn’t actually guarantee that cleanup completes under Python 3.11+ task cancellation semantics. If `safe_run` is entered via cancellation, `await asyncio.shield(...)` can raise `CancelledError` immediately; you catch and continue, but you never await the underlying cancel/end task to completion. Also, because cancellation is “sticky” in 3.11+, the later `await generator.aclose()` in `finally` can raise `CancelledError` before closing the generator unless the task is explicitly uncancelled or the close is also shielded. Consider creating tasks for cleanup, awaiting them to completion even after a `CancelledError`, and clearing the cancellation state (e.g., via `asyncio.current_task().uncancel()` where available) so `aclose()` and the metrics decrement reliably run.
    # Use asyncio.shield to protect cleanup coroutines from being cancelled.
    # When a task is in cancelling state, bare `await` raises CancelledError
    # immediately. shield ensures the inner coroutine runs to completion.
    # The outer `except (asyncio.CancelledError, Exception)` catches the
    # CancelledError that shield itself re-raises at the await point.
    try:
        await asyncio.shield(handle.async_cancel(session.session_id))
    except (asyncio.CancelledError, Exception) as cancel_e:
        logger.debug(f'[safe_run] session {session.session_id} async_cancel exception caught: {cancel_e}')
    if self.backend == 'pytorch':
        logger.info(f'[safe_run] session {session.session_id} ending session')
        try:
            await asyncio.shield(handle.async_end(session.session_id))
        except (asyncio.CancelledError, Exception) as end_e:
            logger.debug(f'[safe_run] session {session.session_id} async_end exception caught: {end_e}')
    # Wrap as SafeRunException so that the outer `request_handle` context
    # manager in `session_manager.py` can distinguish a handled cancellation (caught by
    # `except SafeRunException: pass`) from an unexpected CancelledError.
    # Without this, the suppressed exception leaves the task in cancelling
    # state, causing a second CancelledError at the next await point.
    raise SafeRunException(f'Safe run exception for session {session.session_id}') from e
finally:
    await generator.aclose()
    metrics_processor.decrease_api_routed_requests()
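One way to read the suggestion to create cleanup tasks and await them to completion even after a `CancelledError` is sketched below. This is only an illustration, not the code in this PR: `run_to_completion`, `cleanup`, and `handler` are hypothetical names, and the sleeps stand in for real engine calls. It relies on the fact that `asyncio.wait()` does not cancel the tasks it waits on when the waiting coroutine itself is cancelled.

```python
import asyncio


async def run_to_completion(coro):
    """Hypothetical helper: wait until `coro` finishes, even if the caller
    is cancelled again while waiting. Returns (task, cancel_requested)."""
    task = asyncio.ensure_future(coro)
    cancel_requested = False
    while not task.done():
        try:
            # asyncio.wait() leaves `task` running if this await is cancelled.
            await asyncio.wait([task])
        except asyncio.CancelledError:
            # Remember the request, then keep waiting for the cleanup task.
            cancel_requested = True
    return task, cancel_requested


async def cleanup(events):
    await asyncio.sleep(0.05)  # stands in for an async_cancel/async_end call
    events.append("cleanup finished")


async def handler(events, flags):
    try:
        await asyncio.sleep(10)  # stands in for the generation loop
    except asyncio.CancelledError:
        _, cancelled_again = await run_to_completion(cleanup(events))
        flags.append(cancelled_again)
        events.append("handler saw cleanup done")
        raise


async def demo():
    events, flags = [], []
    task = asyncio.create_task(handler(events, flags))
    await asyncio.sleep(0.01)
    task.cancel()              # first cancellation: interrupts the main work
    await asyncio.sleep(0.01)
    task.cancel()              # second cancellation: arrives during cleanup
    try:
        await task
    except asyncio.CancelledError:
        pass
    return events, flags
```

In this sketch the second `cancel()` interrupts only the wait, not the cleanup task, so the handler still observes the finished cleanup before it re-raises.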
    try:
        await asyncio.shield(handle.async_cancel(session.session_id))
    except (asyncio.CancelledError, Exception) as cancel_e:
        logger.debug(f'[safe_run] session {session.session_id} async_cancel exception caught: {cancel_e}')
    if self.backend == 'pytorch':
        await handle.async_end(session.session_id)
        logger.info(f'[safe_run] session {session.session_id} ending session')
        try:
            await asyncio.shield(handle.async_end(session.session_id))
        except (asyncio.CancelledError, Exception) as end_e:
            logger.debug(f'[safe_run] session {session.session_id} async_end exception caught: {end_e}')
Exceptions from `handle.async_cancel()` / `handle.async_end()` are now swallowed and only logged at DEBUG level. If these cleanup operations fail (e.g., an RPC failure in the mp engine), the session may remain alive or leak resources with little visibility. Consider logging these at warning/error level (ideally with a stack trace) and/or recording a failure metric so operational issues are detectable.
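A minimal sketch of what that could look like, assuming a small wrapper around each cleanup step. `run_cleanup_step`, the `failures` list, and the logger name are hypothetical and not part of lmdeploy; `failures` merely stands in for whatever failure metric the project would record. `exc_info=True` is the standard `logging` option that attaches the stack trace.

```python
import asyncio
import logging

logger = logging.getLogger("safe_run_sketch")  # hypothetical logger name


async def run_cleanup_step(name, step, failures):
    """Await one cleanup awaitable; log and record a failure instead of hiding it."""
    try:
        await step
        return True
    except Exception:
        failures.append(name)  # stand-in for a failure metric
        # WARNING level plus exc_info=True keeps the traceback visible.
        logger.warning("[safe_run] cleanup step %s failed", name, exc_info=True)
        return False


async def ok_cancel():
    return None  # simulated successful cleanup call


async def failing_end():
    raise RuntimeError("simulated RPC failure")


async def demo():
    failures = []
    ok_first = await run_cleanup_step("async_cancel", ok_cancel(), failures)
    ok_second = await run_cleanup_step("async_end", failing_end(), failures)
    return ok_first, ok_second, failures
```

Note that `except Exception` does not catch `asyncio.CancelledError` on Python 3.8+, where it derives from `BaseException`, so cancellation would need separate handling as in the PR's `except (asyncio.CancelledError, Exception)` clauses.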
    # Use asyncio.shield to protect cleanup coroutines from being cancelled.
    # When a task is in cancelling state, bare `await` raises CancelledError
    # immediately. shield ensures the inner coroutine runs to completion.
    # The outer `except (asyncio.CancelledError, Exception)` catches the
    # CancelledError that shield itself re-raises at the await point.
The explanatory comment around `asyncio.shield` is misleading: `shield` prevents the inner awaitable from being cancelled, but it does not ensure the outer task waits for it to finish once the outer task is cancelled (it can still raise `CancelledError` immediately). Please adjust the comment to match actual `asyncio.shield` behavior so future changes don’t rely on an incorrect guarantee.
Suggested change
-    # Use asyncio.shield to protect cleanup coroutines from being cancelled.
-    # When a task is in cancelling state, bare `await` raises CancelledError
-    # immediately. shield ensures the inner coroutine runs to completion.
-    # The outer `except (asyncio.CancelledError, Exception)` catches the
-    # CancelledError that shield itself re-raises at the await point.
+    # Use asyncio.shield to prevent the inner cleanup coroutine itself from
+    # being cancelled when this task is cancelled. Note that awaiting a
+    # shielded coroutine can still raise CancelledError immediately if the
+    # *outer* task is already in a cancelling state; shield only protects
+    # the wrapped awaitable from receiving the cancellation.
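The distinction can be reproduced with a small standalone script. This is a sketch with hypothetical names (`slow_cleanup`, `outer`, `demo`), not code from the PR: the outer task is cancelled while it awaits a shielded inner task, receives `CancelledError` right away, and the inner task keeps running and finishes later.

```python
import asyncio


async def slow_cleanup(events):
    await asyncio.sleep(0.05)  # simulated cleanup work
    events.append("cleanup finished")


async def outer(events, inner):
    try:
        await asyncio.shield(inner)
    except asyncio.CancelledError:
        # shield did not wait for `inner`; it is still running at this point.
        events.append(f"outer cancelled, inner done={inner.done()}")
        raise


async def demo():
    events = []
    inner = asyncio.create_task(slow_cleanup(events))
    task = asyncio.create_task(outer(events, inner))
    await asyncio.sleep(0.01)
    task.cancel()  # cancel the outer task while it awaits the shield
    try:
        await task
    except asyncio.CancelledError:
        pass
    await inner  # the shielded task was not cancelled and completes normally
    return events, inner.cancelled()
```

The ordering of `events` shows that the outer task resumes before the cleanup has finished, which is why a caller that needs the cleanup result has to await the inner task explicitly.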
No description provided.