Add native asyncio cursor support (AioCursor) #662

@laughingman7743

Summary

Add native asyncio cursor support to PyAthena, enabling async/await based query execution without blocking the event loop. This provides true async I/O for applications using asyncio (e.g., FastAPI, aiohttp, async ETL pipelines).

Motivation

The current AsyncCursor uses ThreadPoolExecutor to run queries concurrently, but all boto3 calls (including the polling loop with time.sleep()) block the calling thread. This prevents effective integration with asyncio event loops where coroutines expect non-blocking operations.

A native asyncio cursor would:

  • Allow await cursor.execute(...) without blocking the event loop
  • Use asyncio.sleep() during polling, releasing the event loop for other coroutines
  • Support async for row in cursor: iteration
  • Enable proper integration with asyncio-based web frameworks and orchestrators

Why not aiobotocore / aioboto3?

Existing async wrappers for boto3 (aiobotocore, aioboto3) impose strict version pinning on botocore/boto3, which frequently conflicts with users' dependency trees and makes version management fragile. These libraries require exact patch-level botocore version matches, causing widespread compatibility issues in practice.

Instead, this implementation wraps synchronous boto3 calls with asyncio.to_thread() (available since Python 3.9; PyAthena requires >=3.10). This approach:

  • Has zero additional dependencies — uses only the standard library asyncio module
  • Works with any boto3/botocore version PyAthena already supports
  • Is well-suited for Athena's usage pattern (infrequent API calls + polling intervals)
  • Keeps the blocking boto3 call contained in a thread while the event loop remains free
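
A minimal sketch of the idea, assuming a hypothetical blocking_api_call() as a stand-in for a synchronous boto3 call (it is not a PyAthena or boto3 function). A heartbeat coroutine keeps ticking on the event loop while the blocking call runs in a worker thread:

```python
import asyncio
import time


def blocking_api_call(delay: float) -> str:
    """Hypothetical stand-in for a synchronous boto3 call."""
    time.sleep(delay)  # blocks only the worker thread it runs in
    return "done"


async def heartbeat(ticks: list, interval: float, count: int) -> None:
    """Appends a timestamp per tick to show the event loop is still scheduling work."""
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def main() -> tuple:
    ticks: list = []
    # The blocking call runs in a thread; the heartbeat runs on the event loop.
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_api_call, 0.2),
        heartbeat(ticks, 0.02, 3),
    )
    return result, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If blocking_api_call() were invoked directly inside a coroutine instead, the heartbeat could not run until the call returned.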

Proposed API

import pyathena
from pyathena.aio.cursor import AioCursor

# Async connection + cursor
async with pyathena.aconnect(
    s3_staging_dir="s3://bucket/path/",
    region_name="us-east-1",
) as conn:
    cursor = conn.cursor()

    # async execute + fetch
    await cursor.execute("SELECT * FROM my_table LIMIT 100")
    rows = await cursor.fetchall()

    # async iteration
    await cursor.execute("SELECT * FROM large_table")
    async for row in cursor:
        process(row)

    # pandas variant
    from pyathena.aio.pandas.cursor import AioPandasCursor
    pandas_cursor = conn.cursor(AioPandasCursor)
    await pandas_cursor.execute("SELECT * FROM my_table")
    df = await pandas_cursor.as_pandas()

Architecture & Investigation

Current Architecture

PyAthena has a layered cursor architecture:

BaseCursor (pyathena/common.py)          — AWS API interactions, polling, request building
├── Cursor (pyathena/cursor.py)          — sync DB API 2.0 cursor (tuples)
│   └── DictCursor                       — dict results
├── AsyncCursor (pyathena/async_cursor.py) — ThreadPoolExecutor-based "async"
├── PandasCursor (pyathena/pandas/)      — pandas DataFrame results
├── ArrowCursor (pyathena/arrow/)        — PyArrow Table results
├── PolarsCursor (pyathena/polars/)      — Polars DataFrame results
├── S3FSCursor (pyathena/s3fs/)          — CSV/S3-based results
└── SparkBaseCursor (pyathena/spark/)    — Spark session calculations

Each cursor subpackage follows a consistent pattern: cursor.py, async_cursor.py, converter.py, result_set.py.

boto3/botocore API Calls Inventory

All blocking calls that need async wrapping (22 total):

Athena query APIs (in pyathena/common.py):

API Call                   Method                        Line
start_query_execution      _execute()                    652
get_query_execution        _get_query_execution()        458
stop_query_execution       _cancel()                     713
list_query_executions      _list_query_executions()      527
batch_get_query_execution  _batch_get_query_execution()  502
list_databases             _list_databases()             328
list_table_metadata        _list_table_metadata()        418
get_table_metadata         _get_table_metadata()         375

Result fetching (in pyathena/result_set.py):

API Call           Method                     Line
get_query_results  AthenaResultSet.__fetch()  346
head_object (S3)   _get_content_length()      477
get_object (S3)    _read_data_manifest()      495

Spark session APIs (in pyathena/spark/common.py):

API Call                          Method                               Line
start_session                     _start_session()                     177
get_session                       _exists_session()                    147
get_session_status                _get_session_status()                119
terminate_session                 _terminate_session()                 193
start_calculation_execution       _calculate()                         677
get_calculation_execution_status  _get_calculation_execution_status()  473
get_calculation_execution         _get_calculation_execution()         488
stop_calculation_execution        _cancel() (spark)                    229
get_object (S3)                   _read_s3_file_as_text()              107

STS APIs (in pyathena/connection.py):

API Call           Method                Line
assume_role        _assume_role()        388
get_session_token  _get_session_token()  427

Polling Mechanism (Key Async Win)

The biggest benefit of async cursors is in the polling loop (common.py:542-563):

# Current (blocks event loop entirely)
def __poll(self, query_id):
    while True:
        query_execution = self._get_query_execution(query_id)  # blocking boto3 call
        if query_execution.state in terminal_states:
            return query_execution
        time.sleep(self._poll_interval)  # blocks event loop!

# Proposed (event loop stays free)
async def _poll(self, query_id):
    while True:
        query_execution = await asyncio.to_thread(  # runs in thread
            retry_api_call, self._connection.client.get_query_execution,
            config=self._retry_config, logger=_logger,
            QueryExecutionId=query_id
        )
        if AthenaQueryExecution(query_execution).state in terminal_states:
            return ...
        await asyncio.sleep(self._poll_interval)  # yields to event loop!
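
The proposed loop can be exercised without AWS access. The sketch below assumes a hypothetical FakeAthenaClient that replays a fixed list of states. The response shape follows the get_query_execution response (QueryExecution, Status, State), reduced to the fields the loop reads. TERMINAL_STATES and poll() are illustrative names, not PyAthena's:

```python
import asyncio

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


class FakeAthenaClient:
    """Hypothetical stand-in for the boto3 Athena client."""

    def __init__(self, states):
        self._states = iter(states)

    def get_query_execution(self, QueryExecutionId):
        # Synchronous on purpose, like the real boto3 method.
        return {
            "QueryExecution": {
                "QueryExecutionId": QueryExecutionId,
                "Status": {"State": next(self._states)},
            }
        }


async def poll(client, query_id, poll_interval=0.01):
    """Polls until a terminal state, sleeping on the event loop between calls."""
    while True:
        response = await asyncio.to_thread(
            client.get_query_execution, QueryExecutionId=query_id
        )
        state = response["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        await asyncio.sleep(poll_interval)  # other coroutines run here


if __name__ == "__main__":
    fake = FakeAthenaClient(["QUEUED", "RUNNING", "SUCCEEDED"])
    print(asyncio.run(poll(fake, "q-1")))
```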

Retry Mechanism

The existing retry_api_call() in pyathena/util.py uses tenacity.Retrying (synchronous). For the async version, the entire retry_api_call() invocation (including retries) will be wrapped with asyncio.to_thread(). This is simpler than using tenacity.AsyncRetrying and keeps the blocking I/O fully contained in a thread:

async def async_retry_api_call(func, config, logger=None, *args, **kwargs):
    return await asyncio.to_thread(retry_api_call, func, config, logger, *args, **kwargs)
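
To illustrate why a single asyncio.to_thread() hop is enough, here is a sketch with a simplified retry loop. simple_retry() and FlakyCall are hypothetical stand-ins (the real retry_api_call() uses tenacity and has a different signature). The point is only that the first attempt and every retry run on the same worker thread, never on the event loop thread:

```python
import asyncio
import threading


def simple_retry(func, attempts, **kwargs):
    """Simplified synchronous retry loop; stands in for retry_api_call()."""
    for attempt in range(1, attempts + 1):
        try:
            return func(**kwargs)
        except ConnectionError:
            if attempt == attempts:
                raise  # out of attempts, surface the last error


class FlakyCall:
    """Hypothetical API call that fails a fixed number of times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0
        self.threads = set()

    def __call__(self, **kwargs):
        self.calls += 1
        self.threads.add(threading.get_ident())  # record where each attempt ran
        if self.calls <= self.failures:
            raise ConnectionError("transient")
        return {"ok": True, **kwargs}


async def async_simple_retry(func, attempts, **kwargs):
    # One hop to a worker thread covers the first attempt and all retries.
    return await asyncio.to_thread(simple_retry, func, attempts, **kwargs)


if __name__ == "__main__":
    flaky = FlakyCall(failures=2)
    print(asyncio.run(async_simple_retry(flaky, 3, QueryExecutionId="q-1")))
```

One trade-off of this design: any back-off sleeps inside the retry loop occupy a worker thread rather than yielding to the event loop.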

Implementation Plan

Phase 1: Core Infrastructure

Create the pyathena/aio/ package with the foundational classes.

Files to create:

  1. pyathena/aio/__init__.py — Package init, re-exports
  2. pyathena/aio/util.py: async_retry_api_call() helper
  3. pyathena/aio/connection.py: AsyncConnection
    • async __aenter__ / async __aexit__
    • cursor() factory returning AioCursor by default
    • Handle STS calls (assume_role, get_session_token) via asyncio.to_thread() in a classmethod factory or during __aenter__
    • Note: boto3.Session() creation is sync and doesn't make network calls — only STS calls need async treatment
  4. pyathena/aio/common.py: AioBaseCursor
    • Inherits from BaseCursor to reuse request builders (_build_start_query_execution_request, etc.)
    • Overrides all I/O methods with async versions using async_retry_api_call()
    • Replaces time.sleep() with asyncio.sleep() in polling
    • Handles KeyboardInterrupt / asyncio.CancelledError in polling
  5. pyathena/aio/cursor.py: AioCursor, AioDictCursor
    • async execute() → returns AioCursor (for chaining)
    • async executemany()
    • async fetchone(), async fetchmany(), async fetchall()
    • async cancel(), async close()
    • __aiter__ / __anext__ for async for support
    • async __aenter__ / async __aexit__
  6. pyathena/aio/result_set.py: AioResultSet, AioDictResultSet
    • Extends AthenaResultSet with async fetch methods
    • async fetchone(), async fetchmany(), async fetchall()
    • __aiter__ / __anext__ for async iteration
    • Async _pre_fetch() and _fetch() using async_retry_api_call()
  7. pyathena/__init__.py — Add aconnect() top-level function
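
The __aiter__ / __anext__ items listed above could look roughly like the following sketch. PagedRows is a hypothetical class, not the planned AioResultSet. It assumes rows arrive in pages from a blocking fetch (a stand-in for get_query_results) and that an exhausted result is signalled by None:

```python
import asyncio


class PagedRows:
    """Hypothetical async-iterable result set backed by a blocking page fetch."""

    def __init__(self, pages):
        self._pages = list(pages)  # pretend each page costs one network call
        self._buffer = []

    def _fetch_page_blocking(self):
        # Stand-in for a synchronous get_query_results call.
        return self._pages.pop(0) if self._pages else None

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Refill the buffer only when it is empty; skip over empty pages.
        while not self._buffer:
            page = await asyncio.to_thread(self._fetch_page_blocking)
            if page is None:
                raise StopAsyncIteration
            self._buffer.extend(page)
        return self._buffer.pop(0)


async def collect(rows):
    return [row async for row in rows]


if __name__ == "__main__":
    print(asyncio.run(collect(PagedRows([[(1,), (2,)], [], [(3,)]]))))
```

Rows already in the buffer are returned without touching a thread, so only page boundaries pay the thread hop.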

Files to create for tests:

  1. tests/pyathena/aio/__init__.py
  2. tests/pyathena/aio/test_cursor.py — Tests for AioCursor
  3. tests/pyathena/aio/test_connection.py — Tests for AsyncConnection

Phase 2: Specialized Cursors (incremental, one PR per cursor type)

Each follows the same pattern as the sync counterpart but with async methods.

  • pyathena/aio/pandas/: AioPandasCursor + AioPandasResultSet
  • pyathena/aio/arrow/: AioArrowCursor + AioArrowResultSet
  • pyathena/aio/polars/: AioPolarsCursor + AioPolarsResultSet
  • pyathena/aio/s3fs/: AioS3FSCursor + AioS3FSResultSet
  • pyathena/aio/spark/: AioSparkCursor + AioSparkBaseCursor

Phase 3: Integration & Polish

  • Documentation updates (README, sphinx docs)
  • SQLAlchemy dialect integration (if applicable — SQLAlchemy 2.0 has async support)
  • Performance benchmarks vs sync + ThreadPoolExecutor approaches

Design Decisions

1. Module Structure: pyathena/aio/ Package

Mirrors the existing package structure under a new aio/ namespace:

pyathena/aio/
├── __init__.py
├── util.py              # async_retry_api_call
├── connection.py        # AsyncConnection
├── common.py            # AioBaseCursor
├── cursor.py            # AioCursor, AioDictCursor
├── result_set.py        # AioResultSet, AioDictResultSet
├── pandas/
│   ├── cursor.py        # AioPandasCursor
│   └── result_set.py    # AioPandasResultSet
├── arrow/
│   ├── cursor.py        # AioArrowCursor
│   └── result_set.py    # AioArrowResultSet
├── polars/
│   ├── cursor.py        # AioPolarsCursor
│   └── result_set.py    # AioPolarsResultSet
├── s3fs/
│   ├── cursor.py        # AioS3FSCursor
│   └── result_set.py    # AioS3FSResultSet
└── spark/
    ├── common.py        # AioSparkBaseCursor
    └── cursor.py        # AioSparkCursor

2. Naming: Aio Prefix

Use Aio prefix (e.g., AioCursor) to distinguish from the existing Async prefix (e.g., AsyncCursor) which uses threads. This is consistent with the aio package name and avoids confusion.

3. Inheritance: Extend Sync Classes

AioBaseCursor inherits from BaseCursor to reuse:

  • All request builder methods (_build_*) — pure computation, no I/O
  • Constants (LIST_QUERY_EXECUTIONS_MAX_RESULTS, etc.)
  • __init__ parameter handling
  • connection property

Override only the I/O methods with async versions. This minimizes code duplication while providing clean async interfaces.
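
As a rough illustration of this split, the sketch below uses hypothetical classes (SyncBase, AioBase, FakeClient), not PyAthena's. The request builder is inherited unchanged and only the method that performs I/O is overridden as a coroutine:

```python
import asyncio


class FakeClient:
    """Hypothetical client with one blocking method."""

    def start(self, QueryString):
        return {"QueryExecutionId": f"id-for-{len(QueryString)}"}


class SyncBase:
    def __init__(self, client):
        self._client = client

    def _build_request(self, query):
        # Pure computation, safe to call from sync or async code.
        return {"QueryString": query}

    def execute(self, query):
        return self._client.start(**self._build_request(query))


class AioBase(SyncBase):
    async def execute(self, query):
        request = self._build_request(query)  # reused from the sync base
        # Only the I/O step changes: it moves to a worker thread.
        return await asyncio.to_thread(self._client.start, **request)


if __name__ == "__main__":
    client = FakeClient()
    print(SyncBase(client).execute("SELECT 1"))
    print(asyncio.run(AioBase(client).execute("SELECT 1")))
```

Overriding a sync method with a coroutine changes its return type, so static type checkers may flag the override. That is a cost to weigh against the reduced duplication.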

4. asyncio.to_thread() for All Blocking Calls

Every boto3 API call is wrapped with asyncio.to_thread():

response = await asyncio.to_thread(
    retry_api_call,
    self._connection.client.some_api_call,
    config=self._retry_config,
    logger=_logger,
    **request,
)

This keeps the implementation simple and dependency-free.

5. Connection Initialization

boto3.Session() creation is synchronous and fast (no network calls). STS operations (assume_role, get_session_token) are the only potentially blocking calls during connection setup.

Options (to be decided during implementation):

  • Factory classmethod: conn = await AsyncConnection.create(...) — cleanest for STS cases
  • Module function: conn = await pyathena.aconnect(...) — most user-friendly
  • Lazy STS: defer STS calls to first execute() — simplest __init__

Recommended: provide both aconnect() (module-level) and AsyncConnection (class-level).
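
One way to support both `await aconnect(...)` and `async with aconnect(...)` is to return a small object that implements __await__ as well as __aenter__ / __aexit__. The sketch below is an assumption about how that might be wired, using hypothetical names (DemoConnection, demo_connect) and a fake STS step. It is not the planned AsyncConnection:

```python
import asyncio


class DemoConnection:
    """Hypothetical connection object."""

    def __init__(self, role_arn=None):
        self.role_arn = role_arn
        self.credentials = None
        self.closed = False

    def _assume_role_blocking(self):
        # Stand-in for a synchronous STS assume_role call.
        return {"AccessKeyId": "demo"} if self.role_arn else None

    @classmethod
    async def create(cls, **kwargs):
        conn = cls(**kwargs)  # cheap, no I/O
        conn.credentials = await asyncio.to_thread(conn._assume_role_blocking)
        return conn

    async def close(self):
        self.closed = True


class _ConnectContext:
    """Result of demo_connect(): awaitable and usable as an async context manager."""

    def __init__(self, **kwargs):
        self._kwargs = kwargs
        self._conn = None

    def __await__(self):
        # Delegate to the coroutine returned by the factory classmethod.
        return DemoConnection.create(**self._kwargs).__await__()

    async def __aenter__(self):
        self._conn = await DemoConnection.create(**self._kwargs)
        return self._conn

    async def __aexit__(self, exc_type, exc, tb):
        await self._conn.close()


def demo_connect(**kwargs):
    return _ConnectContext(**kwargs)


async def demo():
    conn = await demo_connect(role_arn="arn:aws:iam::123456789012:role/demo")
    async with demo_connect() as scoped:
        inside = scoped.closed
    return conn.credentials, inside, scoped.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```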

6. Cancellation Handling

Replace KeyboardInterrupt handling in polling with asyncio.CancelledError:

async def _poll(self, query_id):
    try:
        return await self.__poll(query_id)
    except asyncio.CancelledError:
        if self._kill_on_interrupt:
            await self._cancel(query_id)
            return await self.__poll(query_id)
        raise
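
A possible variation, sketched with hypothetical names (DemoCursor, _stop_query_blocking): stop the query on cancellation and then re-raise, so the caller still observes the cancellation. Unlike the snippet above, it does not poll again and return a result. The stop request is wrapped in asyncio.shield() so that a second cancellation does not interrupt the cleanup:

```python
import asyncio


class DemoCursor:
    """Hypothetical cursor whose query never finishes, to demonstrate cancellation."""

    def __init__(self, kill_on_interrupt=True):
        self._kill_on_interrupt = kill_on_interrupt
        self.cancelled_ids = []

    def _stop_query_blocking(self, query_id):
        # Stand-in for a synchronous stop_query_execution call.
        self.cancelled_ids.append(query_id)

    async def _poll_forever(self, query_id):
        while True:
            await asyncio.sleep(0.01)

    async def poll(self, query_id):
        try:
            return await self._poll_forever(query_id)
        except asyncio.CancelledError:
            if self._kill_on_interrupt:
                # shield() lets the stop request finish even if cancelled again.
                await asyncio.shield(
                    asyncio.to_thread(self._stop_query_blocking, query_id)
                )
            raise  # propagate so the awaiting caller sees the cancellation


async def run_and_cancel(cursor):
    task = asyncio.create_task(cursor.poll("q-1"))
    await asyncio.sleep(0.03)  # let the poll loop start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "finished"


if __name__ == "__main__":
    cursor = DemoCursor()
    print(asyncio.run(run_and_cancel(cursor)), cursor.cancelled_ids)
```

Note that cancelling a task does not stop a thread already started by asyncio.to_thread(). An in-flight boto3 call runs to completion in the background.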

7. No New Dependencies

This feature adds zero new dependencies. Only standard library asyncio is used.

Implementation Notes for Handoff

Key Patterns to Follow

  1. Request builders are sync and reusable — All _build_* methods in BaseCursor and request construction logic can be called directly from async code (no I/O involved).

  2. retry_api_call() wrapping pattern — Every sync method that calls retry_api_call() needs an async counterpart using asyncio.to_thread(retry_api_call, ...).

  3. Result set init calls _pre_fetch() — The AthenaResultSet.__init__() calls _pre_fetch() which makes a boto3 call. For AioResultSet, this needs to be deferred to an async factory method or called externally after construction.

  4. S3 client in result set: AthenaResultSet.__init__() creates an S3 client (connection.session.client("s3", ...)) for reading results. This is sync and fast, so it can remain in __init__.

  5. Converter and formatter are sync — Type conversion and parameter formatting involve no I/O and can be reused as-is.

  6. Test structure — Tests mirror source structure under tests/pyathena/. Existing tests in test_async_cursor.py provide patterns for testing concurrent execution. New tests should use pytest-asyncio and follow similar patterns with async def test_* functions.

Critical Implementation Detail: AioResultSet Initialization

AthenaResultSet.__init__() immediately calls self._pre_fetch() which issues a blocking get_query_results API call. For async:

class AioResultSet(AthenaResultSet):
    def __init__(self, ...):
        # Override to skip _pre_fetch in __init__
        # Call parent's __init__ logic but defer the fetch
        ...

    async def _pre_fetch(self):
        response = await async_retry_api_call(
            self.connection.client.get_query_results, ...)
        self._process_metadata(response)  # sync, pure computation
        self._process_rows(response)      # sync, pure computation

    @classmethod
    async def create(cls, connection, converter, query_execution, ...):
        instance = cls.__new__(cls)
        # Initialize fields
        ...
        if instance.state == AthenaQueryExecution.STATE_SUCCEEDED:
            instance._rownumber = 0
            await instance._pre_fetch()
        return instance

Dependencies Check

  • boto3>=1.26.4 — sync AWS SDK (wrapped with asyncio.to_thread())
  • botocore>=1.29.4 — underlying sync HTTP client
  • tenacity>=4.1.0 — retry logic (used synchronously inside threads)
  • Python >=3.10: asyncio.to_thread() is available since 3.9
  • No new dependencies needed

Open Questions

  1. SQLAlchemy async dialect — SQLAlchemy 2.0+ supports async via create_async_engine(). Should we add awsathena+aio:// dialect entries? (Probably Phase 3)
  2. async_generator for fetchmany() — Should we also provide async def fetchmany_iter() that yields chunks as an async generator?
  3. Thread pool configuration — Should AsyncConnection accept a custom Executor for asyncio.to_thread() override, or rely on the default thread pool?
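
On question 3, asyncio.to_thread() always submits to the event loop's default executor and has no executor parameter, so a custom pool would have to go through loop.run_in_executor(). A minimal sketch of such a helper, where run_blocking() and which_thread() are hypothetical names:

```python
import asyncio
import functools
import threading
from concurrent.futures import ThreadPoolExecutor


async def run_blocking(executor, func, /, *args, **kwargs):
    """Like asyncio.to_thread(), but on a caller-supplied executor (None = loop default)."""
    loop = asyncio.get_running_loop()
    # run_in_executor() forwards positional args only, so bind kwargs with partial.
    return await loop.run_in_executor(
        executor, functools.partial(func, *args, **kwargs)
    )


def which_thread(**kwargs):
    """Hypothetical blocking call that reports the thread it ran on."""
    return threading.current_thread().name


async def main():
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="athena-io") as pool:
        return await run_blocking(pool, which_thread, QueryExecutionId="q-1")


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Unlike asyncio.to_thread(), this helper does not copy the current contextvars context into the worker thread. A real implementation might add contextvars.copy_context().run if that behavior matters.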
