fix(realtime): sync remote items to local chat_ctx with placeholders to prevent in-flight deletion (#5114)
Conversation
livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/realtime/realtime_model.py
```python
def _on_remote_item_added(self, ev: llm.RemoteItemAddedEvent) -> None:
    # add the remote item to the local chat context as a placeholder
    local_chat_ctx = self._agent._chat_ctx
    if local_chat_ctx.get_by_id(ev.item.id) is not None:
        return

    # only add placeholders for server-initiated items (responses, function calls),
    # which always append at the end of the conversation. client-initiated items
    # (from update_chat_ctx) already exist in _agent._chat_ctx and go local→remote,
    # so they don't need placeholders.
    last_item_id = local_chat_ctx.items[-1].id if local_chat_ctx.items else None
    if ev.previous_item_id is None or ev.previous_item_id == last_item_id:
        local_chat_ctx.items.append(ev.item.model_copy())
```
🔴 _on_remote_item_added leaves stale empty placeholder messages in local chat context
When the server creates an assistant message item, conversation.item.created fires immediately (before any content is streamed), so _on_remote_item_added appends a ChatMessage(role="assistant", content=[]) placeholder to the local context. Later, at agent_activity.py:2702, the placeholder is only replaced if msg_gen exists and forwarded_text is truthy. If the response is interrupted before any text is forwarded, or the response produces no text (e.g., audio-only without a transcript), forwarded_text is "" and _upsert_item is never called, leaving a permanent empty assistant message in _agent._chat_ctx. Before this PR, no message was added in that case. This empty message could confuse downstream consumers of the chat context (e.g., summarization, provider format conversion, or the next LLM turn).
Prompt for agents
In livekit-agents/livekit/agents/voice/agent_activity.py, the _on_remote_item_added method (lines 1166-1178) unconditionally appends the placeholder item. However, in _realtime_generation_task_impl around line 2702, if msg_gen exists but forwarded_text is empty (interrupted or audio-only), the placeholder is never replaced or removed, leaving a stale empty assistant message in the local chat context.
To fix this, either:
1. In _realtime_generation_task_impl, after the interrupted handling and after the forwarded_text check, add cleanup logic: if msg_gen exists but forwarded_text is empty and the placeholder exists in _agent._chat_ctx, remove it (or update it with interrupted=True and empty content).
2. Or, change _on_remote_item_added to only add placeholders for item types that will always be replaced (e.g., only user messages and function calls, not assistant messages which are conditionally added).
Option 1 is likely cleaner. After line 2711, add an else branch like:
```python
elif msg_gen:
    # Remove stale placeholder if no text was forwarded
    idx = self._agent._chat_ctx.index_by_id(msg_gen.message_id)
    if idx is not None:
        del self._agent._chat_ctx.items[idx]
```
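As a sanity check on option 1, here is a self-contained toy of the proposed cleanup. The `ChatCtx`/`ChatMessage` shapes, the `index_by_id` helper, and `finalize_response` are simplified stand-ins, not the actual livekit-agents types:

```python
from dataclasses import dataclass, field

@dataclass
class ChatMessage:
    id: str
    role: str
    content: list = field(default_factory=list)

@dataclass
class ChatCtx:
    items: list = field(default_factory=list)

    def index_by_id(self, item_id: str):
        # return the position of the item with this id, or None if absent
        for i, item in enumerate(self.items):
            if item.id == item_id:
                return i
        return None

def finalize_response(chat_ctx: ChatCtx, message_id: str, forwarded_text: str) -> None:
    idx = chat_ctx.index_by_id(message_id)
    if idx is None:
        return
    if forwarded_text:
        # text was forwarded: upsert it into the placeholder
        chat_ctx.items[idx].content = [forwarded_text]
    else:
        # no text forwarded (interrupted / audio-only): drop the stale placeholder
        del chat_ctx.items[idx]
```

With `forwarded_text=""` the empty assistant placeholder is removed instead of lingering in the context; with non-empty text it is filled in as before.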
The server still has that item, so the next server-initiated item would have previous_item_id pointing to this message. That wouldn't match last_item_id in _on_remote_item_added, so the next placeholder would be silently dropped.
So maybe we should allow empty items for the realtime session.
I think an empty message is fine? If it's interrupted without any text forwarded, or it's an audio-only message, the remote message should be "empty" as well, right?
```python
# (from update_chat_ctx) already exist in _agent._chat_ctx and go local→remote,
# so they don't need placeholders.
last_item_id = local_chat_ctx.items[-1].id if local_chat_ctx.items else None
if ev.previous_item_id is None or ev.previous_item_id == last_item_id:
```
QQ: would parallel tool calls have the same previous_item_id? I wonder if we should check for ev.previous_item_id in item_ids instead.
gpt-realtime can do "parallel" tool calls by issuing multiple tool calls concurrently in one turn, but from what I understand they arrive in a sequential order.
When I saw this line, I have to admit it stood out as a risk, but I can't think of a realistic scenario that would cause an issue.
To be on the safe side, we could log a warning in the else condition, just to catch anything we have missed.
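A minimal sketch of that defensive branch. The standalone function, the mock types, and the logger name are illustrative assumptions, not the actual livekit-agents code:

```python
import logging
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger("livekit.realtime")

@dataclass
class Item:
    id: str

@dataclass
class ChatCtx:
    items: list = field(default_factory=list)

def add_placeholder(chat_ctx: ChatCtx, item: Item, previous_item_id: Optional[str]) -> bool:
    # append a placeholder only when the item extends the tail of the local
    # context; otherwise warn, so out-of-order arrivals become visible
    # instead of being silently dropped
    last_item_id = chat_ctx.items[-1].id if chat_ctx.items else None
    if previous_item_id is None or previous_item_id == last_item_id:
        chat_ctx.items.append(item)
        return True
    logger.warning(
        "skipping placeholder for remote item %s: previous_item_id=%s "
        "does not match last local item %s",
        item.id, previous_item_id, last_item_id,
    )
    return False
```

If the warning ever fires in practice, that would surface exactly the missed scenario discussed here.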
Yeah, I think the function calls will arrive sequentially with different previous item ids. Each ConversationItemAdded event carries a ConversationItem (either a message or a function call) and a previous_item_id:
```python
class ConversationItemAdded(BaseModel):
    """Sent by the server when an Item is added to the default Conversation.

    This can happen in several cases:
    - When the client sends a `conversation.item.create` event.
    - When the input audio buffer is committed. In this case the item will be
      a user message containing the audio from the buffer.
    - When the model is generating a Response. In this case the
      `conversation.item.added` event will be sent when the model starts
      generating a specific Item, and thus it will not yet have any content
      (and `status` will be `in_progress`).

    The event will include the full content of the Item (except when model is
    generating a Response) except for audio data, which can be retrieved
    separately with a `conversation.item.retrieve` event if necessary.
    """

    event_id: str
    """The unique ID of the server event."""

    item: ConversationItem
    """A single item within a Realtime conversation."""

    type: Literal["conversation.item.added"]
    """The event type, must be `conversation.item.added`."""

    previous_item_id: Optional[str] = None
    """The ID of the item that precedes this one, if any.

    This is used to maintain ordering when items are inserted.
    """


ConversationItem: TypeAlias = Annotated[
    Union[
        RealtimeConversationItemSystemMessage,
        RealtimeConversationItemUserMessage,
        RealtimeConversationItemAssistantMessage,
        RealtimeConversationItemFunctionCall,
        RealtimeConversationItemFunctionCallOutput,
        RealtimeMcpApprovalResponse,
        RealtimeMcpListTools,
        RealtimeMcpToolCall,
        RealtimeMcpApprovalRequest,
    ],
    PropertyInfo(discriminator="type"),
]
```
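To make the sequential-arrival argument concrete, here is a toy run (the item ids and dict shape are invented for illustration) showing that chained previous_item_id values all pass the "previous == last local item" check:

```python
# Simulate the arrival order described above: each conversation.item.added
# event points at the item before it, so sequential function calls in one
# turn all extend the tail of the local context.
events = [
    {"item_id": "msg_1", "previous_item_id": None},
    {"item_id": "call_1", "previous_item_id": "msg_1"},
    {"item_id": "call_2", "previous_item_id": "call_1"},
]

local_items = []
for ev in events:
    last = local_items[-1] if local_items else None
    if ev["previous_item_id"] is None or ev["previous_item_id"] == last:
        local_items.append(ev["item_id"])
```

All three items are appended; a problem would only arise if two calls carried the same previous_item_id, which is the case the proposed warning would catch.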
Nice work on this, I can't see any clear issues when reading over! I will load the branch into my test suite tomorrow morning for some checks, but I suspect it will pass.
@longcw I tested the solution both with the test suite and with manual tests following patterns I know produce the issue. I was unable to trigger the issue I have been observing.
As noted in the discussions above, I would encourage a defensive warning log to capture any case we missed where the placeholder's previous_item_id is not the most recent item in context.
Overall, it seems to work nicely! I approve of this PR!
fix #5021