Description
StreamingResponseAggregator._process_function_call_part misroutes the first chunk of a streaming function call when using stream_function_call_arguments=True. The first chunk carries the function name and will_continue=True but no partial_args, so the dispatch logic (which checks partial_args) treats it as a non-streaming call. This appends a spurious empty-args function call to the parts sequence and leaves _current_fc_name unset, causing all subsequent partial_args chunks to accumulate silently without ever being flushed.
Steps to Reproduce:
- Configure a Gemini 3 model via Vertex AI with `stream_function_call_arguments=True` in `FunctionCallingConfig`
- Create an `LlmAgent` with a tool that accepts a string argument (e.g. a document-writing tool)
- Run the agent via `Runner.run_async` with SSE streaming
- Observe the aggregated events: the function call appears with empty `args` instead of the fully accumulated argument content
Expected Behavior:
The aggregator should recognize the first chunk as the start of a streaming function call (based on will_continue=True), set _current_fc_name, and accumulate subsequent partial_args chunks. The final flushed function call should contain the complete accumulated arguments.
Observed Behavior:
The first chunk is treated as a complete non-streaming function call and appended with empty args. _current_fc_name is never set, so _flush_function_call_to_sequence is never triggered by subsequent chunks or the end-of-stream marker. The accumulated partial_args content is silently dropped.
Gemini 3 sends streaming function call chunks in this sequence:
| Chunk | name | will_continue | partial_args |
|---|---|---|---|
| 1 (first) | "tool_name" | True | None / [] |
| 2..N (middle) | None | True | [PartialArg(...)] |
| N+1 (end) | None | None / False | None |
The dispatch in _process_function_call_part checks partial_args to decide streaming vs non-streaming. Chunk 1 has no partial_args, so it takes the non-streaming path.
Environment Details:
- ADK Library Version: `google-adk==1.23.0`
- Desktop OS: Linux
- Python Version: 3.12
Model Information:
- Are you using LiteLLM: No
- Which model is being used: `gemini-3-pro-preview` via Vertex AI
🟡 Optional Information
Regression:
N/A — stream_function_call_arguments is a new capability, so there is no prior version where it worked.
Logs:
N/A — the bug is silent. The aggregator produces a malformed event (empty args) without errors or warnings.
Screenshots / Video:
N/A
Additional Context:
There is a secondary issue: Gemini 3 requires a thought_signature on function_call Parts in conversation history. The aggregator captures it on the first chunk via _current_thought_signature, but when ADK reconstructs session history for subsequent LLM calls, the signature can be dropped, causing validation errors. This may warrant a separate issue.
The fix is to also check will_continue when dispatching in _process_function_call_part, and to handle the end-of-stream marker (no name, no partial_args, will_continue falsy) by flushing accumulated state.
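A sketch of that corrected dispatch, as a plain function over dict-shaped chunks (illustrative only; the real fix would live in `_process_function_call_part`):

```python
def dispatch(state, chunk):
    """Route one streaming chunk; state holds the pending name, accumulated
    args, and flushed (name, args) tuples. Hypothetical sketch, not ADK code."""
    if chunk.get("name") and chunk.get("will_continue"):
        # First chunk of a streaming call: record the name, emit nothing yet.
        state["fc_name"] = chunk["name"]
    elif chunk.get("partial_args"):
        # Middle chunks: accumulate partial argument fragments.
        state["args"].extend(chunk["partial_args"])
    elif chunk.get("name"):
        # A name without will_continue is a genuine non-streaming call.
        state["flushed"].append((chunk["name"], ""))
    elif not chunk.get("will_continue") and state["fc_name"]:
        # End-of-stream marker (no name, no partial_args, falsy will_continue):
        # flush the accumulated arguments and reset.
        state["flushed"].append((state["fc_name"], "".join(state["args"])))
        state["fc_name"], state["args"] = None, []

state = {"fc_name": None, "args": [], "flushed": []}
for c in [
    {"name": "my_tool", "will_continue": True},
    {"will_continue": True, "partial_args": ['{"document": ']},
    {"will_continue": True, "partial_args": ['"..."}']},
    {},  # end-of-stream marker
]:
    dispatch(state, c)

print(state["flushed"])  # [('my_tool', '{"document": "..."}')]
```

With `will_continue` checked first, chunk 1 only records the name, and the end-of-stream marker flushes the complete accumulated arguments.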
Minimal Reproduction Code:
```python
import asyncio

from google.adk.agents import LlmAgent
from google.adk import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types


def my_tool(document: str) -> dict:
    """Write a document."""
    return {"status": "ok"}


agent = LlmAgent(
    name="demo",
    model="gemini-3-pro-preview",
    instruction="Write a short story using the my_tool tool.",
    tools=[my_tool],
    generate_content_config=types.GenerateContentConfig(
        tool_config=types.ToolConfig(
            function_calling_config=types.FunctionCallingConfig(
                stream_function_call_arguments=True
            )
        )
    ),
)

session_service = InMemorySessionService()
runner = Runner(agent=agent, app_name="demo", session_service=session_service)

# Requires Vertex AI credentials:
# GOOGLE_GENAI_USE_VERTEXAI=TRUE
# GOOGLE_CLOUD_PROJECT=<your-project>
# GOOGLE_CLOUD_LOCATION=global


async def main():
    session = await session_service.create_session(app_name="demo", user_id="u")
    content = types.Content(
        role="user", parts=[types.Part(text="Write a short story")]
    )
    async for event in runner.run_async(
        user_id="u", session_id=session.id, new_message=content
    ):
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.function_call:
                    fc = part.function_call
                    # BUG: fc.args is empty/None on the aggregated event
                    print(f"FC: {fc.name}, args={fc.args}")


asyncio.run(main())
```

How often has this issue occurred?:
- Always (100%)