Skip to content

fix(async_engine): make safe_run cancellation cleanup reliable with shield and SafeRunException#4439

Open
lvhan028 wants to merge 1 commit intoInternLM:mainfrom
lvhan028:fix-abort
Open

fix(async_engine): make safe_run cancellation cleanup reliable with shield and SafeRunException#4439
lvhan028 wants to merge 1 commit intoInternLM:mainfrom
lvhan028:fix-abort

Conversation

@lvhan028
Copy link
Collaborator

No description provided.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR aims to make AsyncEngine.safe_run() cleanup more reliable when the surrounding task is cancelled, by shielding cleanup calls and re-raising a SafeRunException so the session manager can treat the cancellation as handled.

Changes:

  • Replaces session.async_abort() with direct handle.async_cancel() (shielded) inside safe_run().
  • For PyTorch backend, wraps handle.async_end() in asyncio.shield() and suppresses its exceptions.
  • Adds explanatory comments and raises SafeRunException after cleanup to avoid propagating the original cancellation/exception.
Comments suppressed due to low confidence (1)

lmdeploy/serve/core/async_engine.py:295

  • The current cancellation-cleanup logic doesn’t actually guarantee cleanup completes under Python 3.11+ task cancellation semantics. If safe_run is entered via cancellation, await asyncio.shield(...) can raise CancelledError immediately; you catch and continue, but you never await the underlying cancel/end task to completion. Also, because cancellation is “sticky” in 3.11+, the later await generator.aclose() in finally can raise CancelledError before closing the generator unless the task is explicitly uncancelled or the close is also shielded. Consider creating tasks for cleanup, awaiting them to completion even after a CancelledError, and clearing the cancellation state (e.g., via asyncio.current_task().uncancel() where available) so aclose() and metrics decrement reliably run.
            # Use asyncio.shield to protect cleanup coroutines from being cancelled.
            # When a task is in cancelling state, bare `await` raises CancelledError
            # immediately. shield ensures the inner coroutine runs to completion.
            # The outer `except (asyncio.CancelledError, Exception)` catches the
            # CancelledError that shield itself re-raises at the await point.
            try:
                await asyncio.shield(handle.async_cancel(session.session_id))
            except (asyncio.CancelledError, Exception) as cancel_e:
                logger.debug(f'[safe_run] session {session.session_id} async_cancel exception caught: {cancel_e}')
            if self.backend == 'pytorch':
                logger.info(f'[safe_run] session {session.session_id} ending session')
                try:
                    await asyncio.shield(handle.async_end(session.session_id))
                except (asyncio.CancelledError, Exception) as end_e:
                    logger.debug(f'[safe_run] session {session.session_id} async_end exception caught: {end_e}')
            # Wrap as SafeRunException so that the outer `request_handle` context
            # manager in `session_manager.py` can distinguish a handled cancellation (caught by
            # `except SafeRunException: pass`) from an unexpected CancelledError.
            # Without this, the suppressed exception leaves the task in cancelling
            # state, causing a second CancelledError at the next await point.
            raise SafeRunException(f'Safe run exception for session {session.session_id}') from e
        finally:
            await generator.aclose()
            metrics_processor.decrease_api_routed_requests()

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +277 to +286
try:
await asyncio.shield(handle.async_cancel(session.session_id))
except (asyncio.CancelledError, Exception) as cancel_e:
logger.debug(f'[safe_run] session {session.session_id} async_cancel exception caught: {cancel_e}')
if self.backend == 'pytorch':
await handle.async_end(session.session_id)
logger.info(f'[safe_run] session {session.session_id} ending session')
try:
await asyncio.shield(handle.async_end(session.session_id))
except (asyncio.CancelledError, Exception) as end_e:
logger.debug(f'[safe_run] session {session.session_id} async_end exception caught: {end_e}')
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exceptions from handle.async_cancel() / handle.async_end() are now swallowed and only logged at DEBUG level. If these cleanup operations fail (e.g., RPC failure in mp engine), the session may remain alive/leak resources with little visibility. Consider logging these as warning/error (ideally with stack trace) and/or recording a failure metric so operational issues are detectable.

Copilot uses AI. Check for mistakes.
Comment on lines +272 to +276
# Use asyncio.shield to protect cleanup coroutines from being cancelled.
# When a task is in cancelling state, bare `await` raises CancelledError
# immediately. shield ensures the inner coroutine runs to completion.
# The outer `except (asyncio.CancelledError, Exception)` catches the
# CancelledError that shield itself re-raises at the await point.
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The explanatory comment around asyncio.shield is misleading: shield prevents the inner awaitable from being cancelled, but it does not ensure the outer task waits for it to finish once the outer task is cancelled (it can still raise CancelledError immediately). Please adjust the comment to match actual asyncio.shield behavior so future changes don’t rely on an incorrect guarantee.

Suggested change
# Use asyncio.shield to protect cleanup coroutines from being cancelled.
# When a task is in cancelling state, bare `await` raises CancelledError
# immediately. shield ensures the inner coroutine runs to completion.
# The outer `except (asyncio.CancelledError, Exception)` catches the
# CancelledError that shield itself re-raises at the await point.
# Use asyncio.shield to prevent the inner cleanup coroutine itself from
# being cancelled when this task is cancelled. Note that awaiting a
# shielded coroutine can still raise CancelledError immediately if the
# *outer* task is already in a cancelling state; shield only protects
# the wrapped awaitable from receiving the cancellation.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants