
[BUG] BedrockModel.stream leaks inner task on outer cancellation, causing "Task exception was never retrieved" #2266

@OrangeNebula

Description


Checks

  • I have updated to the latest minor and patch version of Strands
  • I have checked the documentation and this is not expected behavior
  • I have searched existing issues and there are no duplicates of my issue

Strands Version

1.38.0 (latest release as of 2026-04-30).
Verified that BedrockModel.stream is unchanged on main HEAD (identical to 1.38.0 and 1.35.0 for this method).

Python Version

3.13

Operating System

Linux

Installation Method

pip

Steps to Reproduce

The bug is in how BedrockModel.stream manages the lifecycle of the background task it creates internally. Any early exit from the generator exposes it, whether that is an outer cancellation or closing the generator before the stream finishes. Three independent scenarios reproduce the same symptom:

Scenario 1 — Outer timeout via asyncio.wait_for

A standard Python idiom for bounding the time to the first streaming event:

import asyncio
from strands.models.bedrock import BedrockModel


async def main() -> None:
    model = BedrockModel()  # uses default model id
    gen = model.stream(
        messages=[{"role": "user", "content": [{"text": "hello"}]}]
    )
    ait = gen.__aiter__()
    try:
        # Unrealistically short deadline to make the test deterministic:
        # any outer timeout shorter than the actual first-chunk latency reproduces.
        await asyncio.wait_for(ait.__anext__(), timeout=0.001)
    except asyncio.TimeoutError:
        pass  # caller handles the timeout

    # The timeout cancelled __anext__(): CancelledError was raised inside the
    # generator at `await queue.get()` and terminated it before `await task`.
    # The internal asyncio.Task that stream() created via
    # asyncio.create_task(asyncio.to_thread(...)) is still running. When
    # boto3's converse_stream() eventually finishes or hits its read_timeout,
    # the exception (or successful result) lands on that orphan task and is
    # never consumed.
    await asyncio.sleep(65)  # wait long enough for boto3 read_timeout


asyncio.run(main())

Scenario 2 — Consumer break + generator GC

Equally standard in web servers / UI code that wants only the first few events:

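async for event in gen:
    # Stop after the first event. The generator is later closed (explicitly
    # via aclose(), or by asyncio's finalizer when it is garbage-collected)
    # before it has reached the end of the stream.
    break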

Scenario 3 — Client disconnect in a streaming HTTP endpoint

When a FastAPI/Starlette handler streams events from agent.stream_async(...) and the client disconnects mid-stream, the underlying coroutine is cancelled. The same orphan task remains inside BedrockModel.stream.

In all three scenarios, if the background boto3 call eventually raises (e.g. botocore.exceptions.ReadTimeoutError after read_timeout, or a ClientError), asyncio's default handler emits:

Task exception was never retrieved
future: <Task finished name='Task-N'
    coro=<to_thread() done, defined at .../asyncio/threads.py:12>
    exception=ReadTimeoutError("Read timeout on endpoint URL:
        'https://bedrock-runtime.<region>.amazonaws.com/model/<model_id>/converse-stream'")>

The reproducer makes no changes to the SDK source; it imports BedrockModel as-is from strands.models.bedrock.

Expected Behavior

When the async generator returned by BedrockModel.stream is abandoned, cancelled, or closed before the normal completion path is reached, the SDK should ensure the background task it created internally is handled such that its exception (if any) is consumed. No orphan-task warning should surface to application logs.

Cancelling the underlying boto3 worker thread is not expected; asyncio.to_thread cannot interrupt a thread that is already running, which is a known limitation. The ask is cleanup of the task object, not interruption of the thread.
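The thread limitation can be seen with a small stand-in. Cancelling the asyncio task that wraps asyncio.to_thread marks the task cancelled right away, while the worker thread keeps running until the function returns. blocking_call is a hypothetical placeholder for converse_stream(), and the sleep durations are arbitrary:

```python
import asyncio
import threading
import time


def blocking_call(finished: threading.Event) -> None:
    """Hypothetical stand-in for a blocking boto3 call."""
    time.sleep(0.2)
    finished.set()


async def cancel_to_thread() -> tuple[bool, bool]:
    finished = threading.Event()
    task = asyncio.create_task(asyncio.to_thread(blocking_call, finished))
    await asyncio.sleep(0.05)  # the worker thread is now running
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    # The asyncio task reports cancellation immediately...
    cancelled = task.cancelled()
    # ...but the thread still runs blocking_call to completion.
    thread_finished = await asyncio.to_thread(finished.wait, 2.0)
    return cancelled, thread_finished


print(asyncio.run(cancel_to_thread()))  # (True, True)
```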

Actual Behavior

BedrockModel.stream in strands/models/bedrock.py (verified against tag v1.38.0 and main HEAD) creates an independent task and only awaits it on the successful path:

# strands/models/bedrock.py (v1.38.0, unchanged on main)
async def stream(self, messages, ...):
    def callback(event: StreamEvent | None = None) -> None:
        loop.call_soon_threadsafe(queue.put_nowait, event)

    loop = asyncio.get_event_loop()
    queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue()
    ...
    thread = asyncio.to_thread(self._stream, callback, messages, ...)
    task = asyncio.create_task(thread)     # independent Task

    while True:
        event = await queue.get()          # cancellation point
        if event is None:
            break
        yield event

    await task                              # only reached on the happy path

There is no try/except/finally wrapping the loop. When the outer consumer leaves the generator via CancelledError, GeneratorExit, or any exception:

  1. The async generator terminates without executing await task.
  2. asyncio.to_thread cannot propagate cancellation to the worker thread, so self.client.converse_stream(**request) keeps running until it completes or hits boto3's read_timeout.
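  3. When the thread raises, the exception is stored on task. Because nothing awaits the task or calls task.exception(), the task's finalizer reports the exception through the event loop's exception handler when the task object is garbage-collected; the default handler logs it.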

This is observable by the user as the "Task exception was never retrieved" message followed by the exception's traceback; in asyncio debug mode, the log also shows where the task was created.
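A minimal sketch of where the message comes from, independent of Strands: a task that finished with an exception reports "Task exception was never retrieved" when it is garbage-collected, unless something awaited it or called task.exception() first. The function names are hypothetical, and a custom exception handler captures the message instead of logging it:

```python
import asyncio
import gc


async def fail() -> None:
    raise RuntimeError("simulated ClientError")


async def report_unretrieved(retrieve: bool) -> list[str]:
    reports: list[str] = []
    asyncio.get_running_loop().set_exception_handler(
        lambda loop, context: reports.append(context["message"])
    )
    task = asyncio.create_task(fail())
    await asyncio.sleep(0.01)  # let the task run and store its exception
    if retrieve:
        task.exception()  # marks the exception as retrieved
    del task  # drop the last strong reference, as with an orphaned task
    gc.collect()  # in case the task sits in a reference cycle
    return reports


print(asyncio.run(report_unretrieved(retrieve=False)))  # ['Task exception was never retrieved']
print(asyncio.run(report_unretrieved(retrieve=True)))  # []
```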

Additional Context

Why this matters beyond a single codebase:

  • asyncio.wait_for is the idiomatic way to enforce any deadline on awaitables in Python, including "time to first event" on a streaming LLM call. Any Strands user who wraps stream() with wait_for hits this whenever the deadline fires before the stream completes.
  • Client disconnects on streaming HTTP endpoints are common. FastAPI/Starlette propagate disconnects as CancelledError on the handler coroutine — same code path.
  • Async generators are normally safe to abandon. The async for ...: break pattern is common and expected to work without leaking resources. SDK consumers do not expect it to produce stderr noise long after the loop has exited.
  • Agent.cancel() (introduced in feat: add CancellationToken for graceful agent execution cancellation #1772) does not address this layer. It sets a threading.Event checked inside process_stream's chunk loop. Before the first chunk arrives, or when the caller is not using Agent at all (using BedrockModel directly), the signal has no effect on the background task.
  • Python asyncio.to_thread cannot cancel the worker thread — this is a language constraint, not a Strands bug. The fix is to consume the task's exception, not to interrupt the thread.

Why Agent.cancel() alone is not a sufficient answer

  • The SDK supports direct use of BedrockModel.stream outside an Agent context (custom execution loops, testing, multi-provider wrappers). Those users have no Agent.cancel() to call.
  • Even inside an Agent, the cancellation checkpoint in process_stream requires that at least one chunk has been received. A slow first token leaves the checkpoint unreachable.
  • Agent.cancel() is a turn-level signal. Callers who want to cancel only the current model call (e.g. to switch to another provider) do not want to invalidate the whole turn.

Possible Solution

Three options, in increasing scope. The first is a minimal fix that addresses the reported symptom without changing any public behavior; the others are design-level improvements that could come later.

Option A — Retrieve the exception on any non-happy path (~10 line diff)

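import asyncio


async def stream(self, ...):
    ...
    task = asyncio.create_task(thread)
    try:
        while True:
            event = await queue.get()
            if event is None:
                break
            yield event
        await task
    except BaseException:
        # CancelledError (outer timeout, client disconnect), GeneratorExit
        # (break followed by aclose()/GC), or any other early exit. The task
        # may already be finished with an unretrieved exception (the worker
        # failed and queued its sentinel before the consumer left), so
        # handle the done and not-done cases separately.
        if task.done():
            _retrieve_exception(task)
        else:
            task.add_done_callback(_retrieve_exception)
        raise


def _retrieve_exception(task: asyncio.Task) -> None:
    # exception() marks the exception as retrieved, so the task's finalizer
    # does not log "Task exception was never retrieved". A cancelled task has
    # no exception to retrieve (exception() would raise CancelledError).
    if not task.cancelled():
        task.exception()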

Pros: minimal diff; behavior identical on the happy path; silences the orphan warning across all three scenarios above.
Cons: the boto3 thread continues until read_timeout; no hook for callers that want to await it.

Option B — Drain the task in a detached coroutine

Same try/except shape as Option A, but schedule a background coroutine that awaits task so any side-effects (logging, observability hooks inside _stream) run to completion:

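except BaseException:
    # No task.done() guard: awaiting a finished task also consumes its
    # exception, which covers a worker that failed before the consumer left.
    async def _drain() -> None:
        try:
            await task
        except BaseException:
            pass  # the consumer is gone; there is no caller to re-raise to
    drain = asyncio.create_task(_drain(), name="bedrock_stream_drain")
    # The event loop keeps only weak references to tasks, so hold a strong
    # reference until the drain finishes. _background_tasks is a hypothetical
    # module-level set[asyncio.Task].
    _background_tasks.add(drain)
    drain.add_done_callback(_background_tasks.discard)
    raise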

Pros: uniform teardown across all cancellation paths.
Cons: introduces a detached task without a handle callers can await.
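A sketch of Option B as a standalone helper, assuming a hypothetical drain_in_background function and _background_tasks set (neither exists in Strands). The asyncio documentation notes that the event loop keeps only weak references to tasks, so the detached drain task is held in a set until it finishes:

```python
import asyncio
import gc

# Hypothetical module-level registry keeping detached drain tasks alive.
_background_tasks: set[asyncio.Task] = set()


def drain_in_background(task: asyncio.Task) -> asyncio.Task:
    """Await task in a detached coroutine so its outcome is always consumed."""

    async def _drain() -> None:
        try:
            await task
        except BaseException:
            pass  # the consumer is gone; there is no caller to re-raise to

    drain = asyncio.create_task(_drain(), name="bedrock_stream_drain")
    _background_tasks.add(drain)
    drain.add_done_callback(_background_tasks.discard)
    return drain


async def demo() -> tuple[list[str], int]:
    reports: list[str] = []
    asyncio.get_running_loop().set_exception_handler(
        lambda loop, context: reports.append(context["message"])
    )

    async def fail_later() -> None:
        await asyncio.sleep(0.05)
        raise RuntimeError("simulated ReadTimeoutError")

    orphan = asyncio.create_task(fail_later())
    drain_in_background(orphan)
    del orphan  # the caller keeps no handle, as in the abandoned generator
    await asyncio.sleep(0.2)
    gc.collect()
    return reports, len(_background_tasks)


print(asyncio.run(demo()))  # ([], 0)
```

The empty report list shows the exception was consumed, and the empty set shows the drain task released itself once done.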

Option C — First-class cancellation primitive

Add an optional cancellation argument to BedrockModel.stream (and eventually other providers) that integrates with the existing process_stream checkpoint pattern. This would offer parity with the TypeScript SDK's cancelSignal option on Agent.invoke and would make Agent.cancel() a natural composition on top.

Pros: enables declarative deadlines, aligns Python/TS APIs, composes well with Agent.cancel().
Cons: larger surface; benefits from a cancellation RFC covering other providers.

Recommendation

Option A alone solves the user-visible symptom reported here and is safe to ship independently of any larger cancellation redesign. Options B and C can follow as separate discussions once there is consensus on the teardown contract for all Model implementations.

Happy to open a PR for Option A if maintainers agree on the direction.

Related Issues

None of these cover the BedrockModel.stream internal task cleanup described above.
