[Question] How to handle event loss during resubscribe when connection breaks? #832

@lijinpengFj

Context

We're building a production system using the A2A Python SDK where a Server acts as a proxy between Clients and downstream Agents. The architecture looks like:

Client <--SSE--> Server (A2A Client) <--SSE--> Agent (A2A Server)

Problem

When the Server restarts (deployment or crash), its SSE connection to the Agent breaks. When the Server later calls resubscribe_task, any events the Agent emitted while the Server was disconnected are lost.

Looking at the current implementation:

# default_request_handler.py - on_resubscribe_to_task
queue = await self._queue_manager.tap(task.id)
if not queue:
    raise ServerError(error=TaskNotFoundError())

consumer = EventConsumer(queue)
async for event in result_aggregator.consume_and_emit(consumer):
    yield event

And InMemoryQueueManager:

# in_memory_queue_manager.py
async def tap(self, task_id: str) -> EventQueue | None:
    if task_id not in self._task_queue:
        return None  # Queue lost after restart
    return self._task_queue[task_id].tap()

Questions

  1. Is event loss acceptable by design?

    • The protocol spec says resubscribe is for "re-attaching to a running streaming task"
    • Should the SDK provide stronger guarantees?
  2. What's the recommended approach for production systems?

    Option A: Agent-side persistence

    • Agent persists events to Redis/DB
    • Custom QueueManager replays history on resubscribe

    Option B: Server-side recovery

    • Server calls get_task() first to get latest state
    • Accept that intermediate events are lost
    • Document this as expected behavior

    Option C: Use Push Notifications

    • Instead of SSE streaming, use push notifications
    • Agent calls Server webhook on state changes
    • Better for unreliable connections
  3. Should the SDK provide built-in support?

    • A PersistentQueueManager implementation?
    • Redis/RabbitMQ backends?
    • Or is this out of scope?
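
To make Option B concrete, here is a rough sketch of the recovery flow: fetch the latest task state first, then re-attach to the live stream only if the task is still running. The `TaskClient` protocol, `recover_stream`, `FakeClient`, the dict-shaped events, and the set of terminal states are all hypothetical names and assumptions for illustration; they are not the A2A SDK's actual client API.

```python
import asyncio
from typing import Any, AsyncIterator, Protocol

# Assumed set of terminal task states; adjust to the states your agent reports.
TERMINAL_STATES = {"completed", "failed", "canceled", "rejected"}


class TaskClient(Protocol):
    """Hypothetical client interface, NOT the A2A SDK's actual API."""

    async def get_task(self, task_id: str) -> dict[str, Any]: ...

    def resubscribe(self, task_id: str) -> AsyncIterator[dict[str, Any]]: ...


async def recover_stream(
    client: TaskClient, task_id: str
) -> AsyncIterator[dict[str, Any]]:
    """Yield the latest task snapshot, then live events if the task is still running."""
    snapshot = await client.get_task(task_id)
    yield {"kind": "snapshot", "task": snapshot}
    if snapshot["state"] in TERMINAL_STATES:
        return  # Nothing left to stream.
    async for event in client.resubscribe(task_id):
        yield event


class FakeClient:
    """In-memory stand-in used only to exercise the sketch."""

    def __init__(self, state: str) -> None:
        self.state = state

    async def get_task(self, task_id: str) -> dict[str, Any]:
        return {"id": task_id, "state": self.state}

    async def resubscribe(self, task_id: str) -> AsyncIterator[dict[str, Any]]:
        yield {"kind": "status-update", "state": "completed"}


async def collect(state: str) -> list[dict[str, Any]]:
    """Drain the recovered stream into a list for inspection."""
    return [event async for event in recover_stream(FakeClient(state), "task-1")]


if __name__ == "__main__":
    print(asyncio.run(collect("working")))
```

Note that this only narrows the gap: events emitted between the `get_task` call and the moment the new subscription attaches can still be missed, and intermediate events from the outage are never recovered.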

Our Current Thinking

We're considering implementing a custom QueueManager that:

  • Persists events to Redis
  • Replays missed events on tap()
  • Maintains consumer offsets
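
As a rough illustration of the three points above (not SDK code), the sketch below keeps a per-task event log plus a per-consumer offset, and replays everything after the last acknowledged offset. `ReplayLog` and its `append`, `ack`, and `replay` methods are hypothetical names, and an in-memory dict stands in for Redis so the example runs on its own; a real implementation would need durable storage and would call something like `replay` from a custom `tap()` before attaching to live events.

```python
from collections import defaultdict
from typing import Any


class ReplayLog:
    """In-memory stand-in for a durable per-task event log (hypothetical, not SDK code)."""

    def __init__(self) -> None:
        # task_id -> ordered list of events; a list index is the event's offset.
        self._events: dict[str, list[Any]] = defaultdict(list)
        # (task_id, consumer_id) -> highest offset the consumer has acknowledged.
        self._offsets: dict[tuple[str, str], int] = {}

    def append(self, task_id: str, event: Any) -> int:
        """Store an event and return its 0-based offset in the task's log."""
        log = self._events[task_id]
        log.append(event)
        return len(log) - 1

    def ack(self, task_id: str, consumer_id: str, offset: int) -> None:
        """Record that the consumer has processed events up to and including offset."""
        key = (task_id, consumer_id)
        self._offsets[key] = max(self._offsets.get(key, -1), offset)

    def replay(self, task_id: str, consumer_id: str) -> list[tuple[int, Any]]:
        """Return (offset, event) pairs the consumer has not acknowledged yet."""
        start = self._offsets.get((task_id, consumer_id), -1) + 1
        missed = self._events.get(task_id, [])[start:]
        return list(enumerate(missed, start=start))


log = ReplayLog()
for name in ["submitted", "working", "artifact-1", "completed"]:
    log.append("task-1", name)

# The Server processed offsets 0 and 1, then restarted.
log.ack("task-1", "server-a", 1)
missed = log.replay("task-1", "server-a")
print(missed)  # [(2, 'artifact-1'), (3, 'completed')]
```

Tracking offsets per consumer rather than per task lets several proxies follow the same task independently. The sketch gives at-least-once delivery: an event that was processed but not acknowledged before a crash is replayed again, so consumers should tolerate duplicates.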

But we'd like to know:

  • Is this the right approach?
  • Has anyone solved this differently?
  • Should this be contributed back to the SDK?

Related Code


Would love to hear how others are handling this in production. Thanks!
