Skip to content
62 changes: 62 additions & 0 deletions contributing/samples/agent_tool_event_streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# AgentTool Event Streaming Demo

This sample demonstrates the AgentTool event streaming feature (Issue #3984).


**Before the fix:**
- When a coordinator agent delegates to a sub-agent via AgentTool, the sub-agent's execution acts as a "black box"
- No events are yielded during sub-agent execution
- Frontend appears unresponsive for the duration of sub-agent execution
- Only the final result is returned after sub-agent completes

**After the fix:**
- Sub-agent events are streamed in real-time to the parent Runner
- Frontend receives immediate feedback about sub-agent progress
- Users can see intermediate steps, tool calls, and responses as they happen
- Much better UX for hierarchical multi-agent systems

## Running the Demo

```bash
cd contributing/samples/agent_tool_event_streaming
adk web .

```

Then in the web UI, select agent_tool_event_streaming from the dropdown
1. Ask: "Research the history of artificial intelligence"
2. Watch the events stream in real-time - you'll see:
- Coordinator agent's function call
- Research agent's step-by-step progress
- Research agent's intermediate responses
- Final summary


## Expected Behavior

With event streaming enabled, you should see:

1. **Coordinator events:**
- Function call to `research_agent`

2. **Research agent events (streamed in real-time):**
- "Step 1: Acknowledging task..."
- "Step 2: Researching topic..."
- "Step 3: Analyzing findings..."
- "Final summary: ..."

3. **Coordinator final response:**
- Summary of the research

All events should appear progressively, not all at once at the end.

## Before/After Comparison

To see the difference:

1. **Before fix:** Run on a branch without the event streaming feature
- You'll see: Coordinator call → (long pause) → Final result

2. **After fix:** Run on this branch
- You'll see: Coordinator call → Research steps streaming → Final result

17 changes: 17 additions & 0 deletions contributing/samples/agent_tool_event_streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .agent import root_agent

__all__ = ['root_agent']
66 changes: 66 additions & 0 deletions contributing/samples/agent_tool_event_streaming/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Sample demonstrating AgentTool event streaming.

This sample shows how events from sub-agents wrapped in AgentTool are
streamed to the parent Runner in real-time, providing visibility into
sub-agent execution progress.

Before the fix: Sub-agent events are buffered until completion, making
the frontend appear unresponsive during long-running sub-agent tasks.

After the fix: Sub-agent events are streamed immediately, providing
real-time feedback to the frontend.
"""

from google.adk import Agent
from google.adk.tools import AgentTool

# Sub-agent that performs a multi-step task
research_agent = Agent(
name='research_agent',
model='gemini-2.5-flash-lite',
description='A research agent that performs multi-step research tasks',
instruction="""
You are a research assistant. When given a research task, break it down
into steps and report your progress as you work:

1. First, acknowledge the task and outline your approach
2. Then, perform the research (simulate by thinking through the steps)
3. Finally, provide a comprehensive summary

Always be verbose about your progress so the user can see what you're doing.
""",
)

# Coordinator agent that delegates to the research agent
coordinator_agent = Agent(
name='coordinator_agent',
model='gemini-2.5-flash-lite',
description='A coordinator that delegates research tasks',
instruction="""
You are a coordinator agent. When users ask research questions, delegate
them to the research_agent tool. Always use the research_agent tool for
any research-related queries.
""",
tools=[
AgentTool(
agent=research_agent,
skip_summarization=True,
)
],
)

root_agent = coordinator_agent
38 changes: 21 additions & 17 deletions src/google/adk/flows/llm_flows/base_llm_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,24 +678,28 @@ async def _postprocess_handle_function_calls_async(
function_call_event: Event,
llm_request: LlmRequest,
) -> AsyncGenerator[Event, None]:
if function_response_event := await functions.handle_function_calls_async(
invocation_context, function_call_event, llm_request.tools_dict
):
auth_event = functions.generate_auth_event(
invocation_context, function_response_event
)
if auth_event:
yield auth_event

tool_confirmation_event = functions.generate_request_confirmation_event(
invocation_context, function_call_event, function_response_event
)
if tool_confirmation_event:
yield tool_confirmation_event

# Always yield the function response event first
yield function_response_event
# Handle function calls with AgentTool event streaming (handles both AgentTool and regular calls)
function_response_event = None
async with Aclosing(
functions.handle_function_calls_async_with_agent_tool_streaming(
invocation_context, function_call_event, llm_request.tools_dict
)
) as agen:
async for event in agen:
# Track the function response event for post-processing
if (
event.content
and event.content.parts
and any(
part.function_response
for part in event.content.parts
if part.function_response
)
Comment on lines +693 to +697
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The condition to check for a function_response can be simplified. The if part.function_response clause within the generator expression is redundant because any() correctly handles None values by treating them as False.

            and any(
                part.function_response for part in event.content.parts
            )

):
Comment on lines +690 to +698
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The condition to check for a function response event can be simplified by using the existing event.get_function_responses() helper method. This improves readability and maintainability by reusing existing logic.

        if event.get_function_responses():

function_response_event = event
yield event

if function_response_event:
# Check if this is a set_model_response function response
if json_response := _output_schema_processor.get_structured_model_response(
function_response_event
Expand Down
134 changes: 134 additions & 0 deletions src/google/adk/flows/llm_flows/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,140 @@ def generate_request_confirmation_event(
)


async def handle_function_calls_async_with_agent_tool_streaming(
invocation_context: InvocationContext,
function_call_event: Event,
tools_dict: dict[str, BaseTool],
) -> AsyncGenerator[Event, None]:
"""Handles function calls with event streaming for AgentTool.

Yields events from AgentTool sub-agents as they are generated, then
yields the final function response event.
"""
from ...tools.agent_tool import AgentTool

function_calls = function_call_event.get_function_calls()
if not function_calls:
return

agent_tool_calls = []
regular_calls = []

# Separate AgentTool calls from regular calls
for function_call in function_calls:
tool = tools_dict.get(function_call.name)
if isinstance(tool, AgentTool):
agent_tool_calls.append((function_call, tool))
else:
regular_calls.append(function_call)

# If no AgentTool calls, use normal flow
if not agent_tool_calls:
function_response_event = await handle_function_calls_async(
invocation_context, function_call_event, tools_dict
)
if function_response_event:
auth_event = generate_auth_event(
invocation_context, function_response_event
)
if auth_event:
yield auth_event
tool_confirmation_event = generate_request_confirmation_event(
invocation_context, function_call_event, function_response_event
)
if tool_confirmation_event:
yield tool_confirmation_event
yield function_response_event
return
Comment on lines +217 to +233
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There's some code duplication between this block for handling non-AgentTool calls and the block at the end of the function (lines 308-320) that handles AgentTool call responses. Both blocks generate and yield auth_event, tool_confirmation_event, and the final function_response_event.

To improve maintainability and reduce redundancy, consider extracting this common logic into a helper async generator function. This would centralize the event generation and yielding process.


# Stream events from AgentTool sub-agents
agent_tool_results = {}
for function_call, agent_tool in agent_tool_calls:
tool_context = _create_tool_context(invocation_context, function_call, None)
last_content = None

async with Aclosing(
agent_tool.run_async_with_events(
args=function_call.args or {}, tool_context=tool_context
)
) as agen:
async for event in agen:
yield event
if event.content:
last_content = event.content

# Build final result from last content using AgentTool helper method
tool_result = agent_tool._build_tool_result_from_content(last_content)
# Wrap non-dict results for function response format
if not isinstance(tool_result, dict):
tool_result = {'result': tool_result}
agent_tool_results[function_call.id] = tool_result

# Handle regular calls if any
regular_response_event = None
if regular_calls:
# Use set for efficient membership testing
agent_tool_names = {fc.name for fc, _ in agent_tool_calls}
regular_call_event = Event(
invocation_id=function_call_event.invocation_id,
author=function_call_event.author,
content=types.Content(
role='user',
parts=[
part
for part in (function_call_event.content.parts or [])
if part.function_call
and part.function_call.name not in agent_tool_names
],
),
branch=function_call_event.branch,
)
regular_response_event = await handle_function_calls_async(
invocation_context, regular_call_event, tools_dict
)

# Build AgentTool response events
agent_tool_response_events = []
for function_call, agent_tool in agent_tool_calls:
if function_call.id in agent_tool_results:
tool_context = _create_tool_context(
invocation_context, function_call, None
)
response_event = __build_response_event(
agent_tool,
agent_tool_results[function_call.id],
tool_context,
invocation_context,
)
agent_tool_response_events.append(response_event)

# Merge all response events
all_events = []
if regular_response_event:
all_events.append(regular_response_event)
all_events.extend(agent_tool_response_events)

if all_events:
if len(all_events) == 1:
final_response_event = all_events[0]
else:
final_response_event = merge_parallel_function_response_events(all_events)

# Yield auth and confirmation events
auth_event = generate_auth_event(invocation_context, final_response_event)
if auth_event:
yield auth_event

tool_confirmation_event = generate_request_confirmation_event(
invocation_context, function_call_event, final_response_event
)
if tool_confirmation_event:
yield tool_confirmation_event

# Yield the final function response event
yield final_response_event


async def handle_function_calls_async(
invocation_context: InvocationContext,
function_call_event: Event,
Expand Down
Loading