VertexAiSessionService does not paginate when invoking list_sessions #4302

@arryon

Description

Describe the Bug:
The VertexAiSessionService does not return any Session objects beyond the first page (default page size: 100) when using its list_sessions() interface.

Steps to Reproduce:
See code snippet example below.

  1. Create more than 100 sessions for a given user_id through VertexAiSessionService (app_name does not matter; the session service does not currently filter on app_name)
  2. Create an instance of VertexAiSessionService with optional properties and fetch the user's sessions using `session_service.list_sessions(app_name="[app_name]", user_id="[user_id]")`
  3. The resulting list will be capped to a maximum of 100 items

Expected Behavior:
We expect pagination to automatically be applied, since the sessions_iterator object is of type google.genai.pagers.AsyncPager (or perhaps more specifically google.cloud.spanner_v1.services.spanner.pagers.ListSessionsAsyncPager?).

Observed Behavior:
The list is capped at 100 items. This is because AsyncPager does not paginate automatically through the synchronous iterator dunder methods (__iter__ or __next__); it only does so through __aiter__ and __anext__. The docs specifically state:

If there are more pages, the ``__aiter__`` method will make additional
    ``ListSessions`` requests and continue to iterate
    through the ``sessions`` field on the
    corresponding responses.

But in VertexAiSessionService, we loop over the iterator using `for` instead of `async for`, and therefore never use the underlying pagination.

Offending line:

for api_session in sessions_iterator:
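
To illustrate why a plain `for` can silently succeed yet stop after one page, here is a minimal sketch with a toy pager. `ToyAsyncPager` is a hypothetical class written for this example. It assumes (not confirmed from the SDK source) that the pager exposes only its current page through sequence-style access and paginates only under async iteration.

```python
import asyncio


class ToyAsyncPager:
    """Hypothetical stand-in for an async pager (not the real google.genai class)."""

    def __init__(self, items, page_size=100):
        self._items = list(items)
        self._page_size = page_size
        # Only the first page is held locally, as if one request had been made.
        self._first_page = self._items[:page_size]

    # Sequence-style access covers the first page only. A plain `for` loop
    # falls back to __getitem__ when __iter__ is absent, so it stops here.
    def __getitem__(self, index):
        return self._first_page[index]

    def __len__(self):
        return len(self._first_page)

    # Async iteration keeps "requesting" pages until the data is exhausted.
    async def __aiter__(self):
        for start in range(0, len(self._items), self._page_size):
            await asyncio.sleep(0)  # placeholder for a network round trip
            for item in self._items[start:start + self._page_size]:
                yield item


def collect_sync(pager):
    """Mirrors the offending loop: plain `for`, first page only."""
    return [item for item in pager]


async def collect_async(pager):
    """Mirrors the proposed fix: `async for`, all pages."""
    return [item async for item in pager]
```

With 110 items, `collect_sync` yields the first 100 and `collect_async` yields all 110. In VertexAiSessionService the corresponding change would presumably be `async for api_session in sessions_iterator:`.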

Environment Details:

  • ADK Library Version (pip show google-adk): 1.23.0
  • Desktop OS: macOS
  • Python Version (python -V): 3.11

Model Information:

  • Are you using LiteLLM: No
  • Which model is being used: (e.g., gemini-2.5-pro) N/A

🟡 Optional Information

Providing this information greatly speeds up the resolution process.

Regression:

Did this work in a previous version of ADK? If so, which one?

Not aware of this

Minimal Reproduction Code:
Please provide a code snippet or a link to a Gist/repo that isolates the issue.
Requires uv to be installed; the script itself is standalone. Fill in GOOGLE_PROJECT_ID, LOCATION, and AGENT_ENGINE_ID before running.

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
#   "google-adk>=1.23.0",
#   "click"
# ]
# ///
"""
Test script to reproduce the session pagination issue in VertexAiSessionService.

This script:
1. Creates 110 sessions for a test user
2. Lists sessions using ADK's VertexAiSessionService (buggy - uses 'for' instead of 'async for')
3. Lists sessions using direct Vertex AI client with proper async iteration
4. Compares results to demonstrate the pagination bug

Usage:
    uv run scripts/test_session_pagination.py [--skip-create] [--skip-cleanup]
"""

import asyncio
from datetime import datetime
import click
import vertexai

from google.adk.sessions import VertexAiSessionService

GOOGLE_PROJECT_ID = ""
LOCATION = ""
AGENT_ENGINE_ID = ""
TEST_USER_ID = "test-pagination-user"
APP_NAME = "test-agent"
NUM_SESSIONS_TO_CREATE = 110  # More than the default page size of 100


async def create_test_sessions(service: VertexAiSessionService, num_sessions: int) -> list[str]:
    """Create test sessions and return their IDs."""
    print(f"\n{'='*60}")
    print(f"STEP 1: Creating {num_sessions} test sessions...")
    print(f"{'='*60}")

    session_ids = []
    for i in range(num_sessions):
        try:
            session = await service.create_session(
                app_name=APP_NAME,
                user_id=TEST_USER_ID,
                state={"test_index": i, "created_at": datetime.now().isoformat()},
            )
            session_ids.append(session.id)
            if (i + 1) % 10 == 0:
                print(f"  Created {i + 1}/{num_sessions} sessions...")
        except Exception as e:
            print(f"  Error creating session {i}: {e}")

    print(f"  Successfully created {len(session_ids)} sessions")
    return session_ids


async def list_sessions_adk(service: VertexAiSessionService) -> list[str]:
    """List sessions using ADK's VertexAiSessionService (has pagination bug)."""
    print(f"\n{'='*60}")
    print("STEP 2: Listing sessions using ADK VertexAiSessionService...")
    print("        (This uses 'for' instead of 'async for' - BUG)")
    print(f"{'='*60}")

    response = await service.list_sessions(app_name=APP_NAME, user_id=TEST_USER_ID)

    session_ids = []
    if hasattr(response, "sessions") and response.sessions:
        for session in response.sessions:
            if hasattr(session, "id"):
                session_ids.append(session.id)

    print(f"  ADK returned: {len(session_ids)} sessions")
    return session_ids


async def list_sessions_fixed() -> list[str]:
    """List sessions using direct Vertex AI client with proper async iteration."""
    print(f"\n{'='*60}")
    print("STEP 3: Listing sessions using fixed method...")
    print("        (Uses 'async for' for proper pagination)")
    print(f"{'='*60}")

    client = vertexai.Client(
        project=GOOGLE_PROJECT_ID,
        location=LOCATION,
    )

    agent_engine_id = AGENT_ENGINE_ID
    filter_config = {"filter": f'user_id="{TEST_USER_ID}"'}

    sessions_pager = await client.aio.agent_engines.sessions.list(
        name=f"reasoningEngines/{agent_engine_id}",
        config=filter_config,
    )

    # Use async for to properly paginate through ALL sessions
    session_ids = []
    async for api_session in sessions_pager:
        session_id = api_session.name.split("/")[-1]
        session_ids.append(session_id)

    print(f"  Fixed method returned: {len(session_ids)} sessions")
    return session_ids


async def cleanup_sessions(service: VertexAiSessionService, session_ids: list[str]) -> None:
    """Delete all test sessions."""
    print(f"\n{'='*60}")
    print(f"CLEANUP: Deleting {len(session_ids)} test sessions...")
    print(f"{'='*60}")

    deleted = 0
    for session_id in session_ids:
        try:
            await service.delete_session(
                app_name=APP_NAME,
                user_id=TEST_USER_ID,
                session_id=session_id,
            )
            deleted += 1
            if deleted % 10 == 0:
                print(f"  Deleted {deleted}/{len(session_ids)} sessions...")
        except Exception as e:
            print(f"  Error deleting session {session_id}: {e}")

    print(f"  Successfully deleted {deleted} sessions")


async def run_test(skip_create: bool, skip_cleanup: bool) -> None:
    """Run the pagination test."""
    # Create service
    service = VertexAiSessionService(
        project=GOOGLE_PROJECT_ID,
        location=LOCATION,
        agent_engine_id=AGENT_ENGINE_ID,
    )

    print("\nTest Configuration:")
    print(f"  Project: {GOOGLE_PROJECT_ID}")
    print(f"  Agent Engine: {AGENT_ENGINE_ID}")
    print(f"  Test User: {TEST_USER_ID}")
    print(f"  Sessions to create: {NUM_SESSIONS_TO_CREATE}")

    created_session_ids = []

    try:
        # Step 1: Create test sessions
        if not skip_create:
            created_session_ids = await create_test_sessions(service, NUM_SESSIONS_TO_CREATE)
        else:
            print("\n[Skipping session creation]")

        # Step 2: List using ADK (buggy)
        adk_session_ids = await list_sessions_adk(service)

        # Step 3: List using fixed method
        fixed_session_ids = await list_sessions_fixed()

        # Compare results
        print(f"\n{'='*60}")
        print("RESULTS COMPARISON")
        print(f"{'='*60}")
        print(f"  Sessions created:              {len(created_session_ids) if created_session_ids else 'N/A'}")
        print(f"  ADK list_sessions returned:    {len(adk_session_ids)}")
        print(f"  Fixed method returned:         {len(fixed_session_ids)}")
        print()

        if len(adk_session_ids) < len(fixed_session_ids):
            print("  BUG CONFIRMED!")
            print(f"  ADK is missing {len(fixed_session_ids) - len(adk_session_ids)} sessions due to pagination bug.")
            print("  The ADK uses 'for' instead of 'async for' with AsyncPager,")
            print("  causing it to only return the first page of results (~100).")
        elif len(adk_session_ids) == len(fixed_session_ids):
            if len(adk_session_ids) <= 100:
                print("  No difference detected (but session count <= 100, so pagination not tested)")
            else:
                print("  No bug detected - both methods returned the same count")
        else:
            print("  Unexpected: ADK returned MORE sessions than fixed method?")

        # Return the full list for cleanup
        all_session_ids = list(set(adk_session_ids) | set(fixed_session_ids))

    finally:
        # Cleanup
        if not skip_cleanup and (created_session_ids or not skip_create):
            # Use the fixed method to get all sessions for cleanup
            all_sessions = await list_sessions_fixed()
            await cleanup_sessions(service, all_sessions)
        elif skip_cleanup:
            print("\n[Skipping cleanup]")


@click.command()
@click.option(
    "--skip-create",
    is_flag=True,
    help="Skip session creation (use existing sessions)",
)
@click.option(
    "--skip-cleanup",
    is_flag=True,
    help="Skip cleanup (leave sessions in place)",
)
def main(skip_create: bool, skip_cleanup: bool) -> None:
    """Test session pagination bug in VertexAiSessionService."""
    asyncio.run(run_test(skip_create, skip_cleanup))


if __name__ == "__main__":
    main()

How often has this issue occurred?:

  • Always (100%)

Metadata

Assignees

No one assigned

    Labels

    services: [Component] This issue is related to runtime services, e.g. sessions, memory, artifacts, etc.
