Summary
Add native asyncio cursor support to PyAthena, enabling async/await based query execution without blocking the event loop. This provides true async I/O for applications using asyncio (e.g., FastAPI, aiohttp, async ETL pipelines).
Motivation
The current AsyncCursor uses ThreadPoolExecutor to run queries concurrently, but all boto3 calls (including the polling loop with time.sleep()) block the calling thread. This prevents effective integration with asyncio event loops where coroutines expect non-blocking operations.
A native asyncio cursor would:
- Allow `await cursor.execute(...)` without blocking the event loop
- Use `asyncio.sleep()` during polling, releasing the event loop for other coroutines
- Support `async for row in cursor:` iteration
- Enable proper integration with asyncio-based web frameworks and orchestrators
Why not aiobotocore / aioboto3?
Existing async wrappers for boto3 (aiobotocore, aioboto3) impose strict version pinning on botocore/boto3, which frequently conflicts with users' dependency trees and makes version management fragile. These libraries only accept botocore versions within a narrow, often patch-level range, which causes widespread compatibility issues in practice.
Instead, this implementation wraps synchronous boto3 calls with asyncio.to_thread() (available since Python 3.9; PyAthena requires >=3.10). This approach:
- Has zero additional dependencies: it uses only the standard library `asyncio` module
- Works with any boto3/botocore version PyAthena already supports
- Is well suited to Athena's usage pattern (infrequent API calls separated by polling intervals)
- Keeps each blocking boto3 call contained in a worker thread while the event loop remains free
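To illustrate the last point, here is a minimal sketch in which `time.sleep()` stands in for a blocking boto3 call. `fake_boto3_call()` and `heartbeat()` are hypothetical names used only for this demonstration; the point is that a second coroutine keeps running while the blocking call sits in a worker thread.

```python
import asyncio
import time


def fake_boto3_call(seconds: float) -> str:
    # Hypothetical stand-in for a blocking boto3 call such as get_query_execution.
    time.sleep(seconds)
    return "SUCCEEDED"


async def heartbeat(stop: asyncio.Event, interval: float) -> int:
    # Counts how many times the event loop got around to running this coroutine.
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(interval)
    return ticks


async def main() -> tuple[str, int]:
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(stop, 0.01))
    # The blocking call runs in the default thread pool, so the loop keeps
    # scheduling heartbeat() in the meantime.
    state = await asyncio.to_thread(fake_boto3_call, 0.2)
    stop.set()
    ticks = await hb
    return state, ticks


state, ticks = asyncio.run(main())
print(state, ticks)
```

Calling `fake_boto3_call(0.2)` directly inside `main()` instead of through `asyncio.to_thread()` would block the loop for the whole call and leave `heartbeat()` no chance to run.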
Proposed API
```python
import asyncio

import pyathena
from pyathena.aio.cursor import AioCursor
from pyathena.aio.pandas.cursor import AioPandasCursor


async def main() -> None:
    # Async connection + cursor
    async with pyathena.aconnect(
        s3_staging_dir="s3://bucket/path/",
        region_name="us-east-1",
    ) as conn:
        cursor = conn.cursor()  # AioCursor by default

        # async execute + fetch
        await cursor.execute("SELECT * FROM my_table LIMIT 100")
        rows = await cursor.fetchall()

        # async iteration
        await cursor.execute("SELECT * FROM large_table")
        async for row in cursor:
            process(row)  # application-defined row handler

        # pandas variant
        pandas_cursor = conn.cursor(AioPandasCursor)
        await pandas_cursor.execute("SELECT * FROM my_table")
        df = await pandas_cursor.as_pandas()


asyncio.run(main())
```

Architecture & Investigation
Current Architecture
PyAthena has a layered cursor architecture:
```
BaseCursor (pyathena/common.py): AWS API interactions, polling, request building
├── Cursor (pyathena/cursor.py): sync DB API 2.0 cursor (tuples)
│   └── DictCursor: dict results
├── AsyncCursor (pyathena/async_cursor.py): ThreadPoolExecutor-based "async"
├── PandasCursor (pyathena/pandas/): pandas DataFrame results
├── ArrowCursor (pyathena/arrow/): PyArrow Table results
├── PolarsCursor (pyathena/polars/): Polars DataFrame results
├── S3FSCursor (pyathena/s3fs/): CSV/S3-based results
└── SparkBaseCursor (pyathena/spark/): Spark session calculations
```
Each cursor subpackage follows a consistent pattern: cursor.py, async_cursor.py, converter.py, result_set.py.
boto3/botocore API Calls Inventory
All blocking calls that need async wrapping (22 total, listed in the tables below):
Athena query APIs (in pyathena/common.py):
| API Call | Method | Line |
|---|---|---|
| `start_query_execution` | `_execute()` | 652 |
| `get_query_execution` | `_get_query_execution()` | 458 |
| `stop_query_execution` | `_cancel()` | 713 |
| `list_query_executions` | `_list_query_executions()` | 527 |
| `batch_get_query_execution` | `_batch_get_query_execution()` | 502 |
| `list_databases` | `_list_databases()` | 328 |
| `list_table_metadata` | `_list_table_metadata()` | 418 |
| `get_table_metadata` | `_get_table_metadata()` | 375 |
Result fetching (in pyathena/result_set.py):
| API Call | Method | Line |
|---|---|---|
| `get_query_results` | `AthenaResultSet.__fetch()` | 346 |
| `head_object` (S3) | `_get_content_length()` | 477 |
| `get_object` (S3) | `_read_data_manifest()` | 495 |
Spark session APIs (in pyathena/spark/common.py):
| API Call | Method | Line |
|---|---|---|
| `start_session` | `_start_session()` | 177 |
| `get_session` | `_exists_session()` | 147 |
| `get_session_status` | `_get_session_status()` | 119 |
| `terminate_session` | `_terminate_session()` | 193 |
| `start_calculation_execution` | `_calculate()` | 677 |
| `get_calculation_execution_status` | `_get_calculation_execution_status()` | 473 |
| `get_calculation_execution` | `_get_calculation_execution()` | 488 |
| `stop_calculation_execution` | `_cancel()` (spark) | 229 |
| `get_object` (S3) | `_read_s3_file_as_text()` | 107 |
STS APIs (in pyathena/connection.py):
| API Call | Method | Line |
|---|---|---|
| `assume_role` | `_assume_role()` | 388 |
| `get_session_token` | `_get_session_token()` | 427 |
Polling Mechanism (Key Async Win)
The biggest benefit of async cursors is in the polling loop (common.py:542-563):
```python
import asyncio
import time

# Current (blocks the event loop entirely)
def __poll(self, query_id):
    while True:
        query_execution = self._get_query_execution(query_id)  # blocking boto3 call
        if query_execution.state in terminal_states:  # SUCCEEDED, FAILED, CANCELLED
            return query_execution
        time.sleep(self._poll_interval)  # blocks the event loop!

# Proposed (event loop stays free)
async def __poll(self, query_id):
    while True:
        response = await asyncio.to_thread(  # boto3 call runs in a worker thread
            retry_api_call,
            self._connection.client.get_query_execution,
            config=self._retry_config,
            logger=_logger,
            QueryExecutionId=query_id,
        )
        query_execution = AthenaQueryExecution(response)
        if query_execution.state in terminal_states:
            return query_execution
        await asyncio.sleep(self._poll_interval)  # yields to the event loop!
```

Retry Mechanism
The existing retry_api_call() in pyathena/util.py uses tenacity.Retrying (synchronous). For the async version, the entire retry_api_call() invocation (including retries) will be wrapped with asyncio.to_thread(). This is simpler than using tenacity.AsyncRetrying and keeps the blocking I/O fully contained in a thread:
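```python
import asyncio

from pyathena.util import retry_api_call


async def async_retry_api_call(func, config, logger=None, *args, **kwargs):
    # The whole tenacity retry loop, including its back-off sleeps, runs in a worker thread.
    return await asyncio.to_thread(retry_api_call, func, config, logger, *args, **kwargs)
```

Implementation Plan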
Phase 1: Core Infrastructure
Create the pyathena/aio/ package with the foundational classes.
Files to create:
- `pyathena/aio/__init__.py`: package init, re-exports
- `pyathena/aio/util.py`: `async_retry_api_call()` helper
- `pyathena/aio/connection.py`: `AsyncConnection`
  - `async __aenter__` / `async __aexit__`
  - `cursor()` factory returning `AioCursor` by default
  - Handle STS calls (`assume_role`, `get_session_token`) via `asyncio.to_thread()` in a classmethod factory or during `__aenter__`
  - Note: `boto3.Session()` creation is sync and doesn't make network calls; only STS calls need async treatment
- `pyathena/aio/common.py`: `AioBaseCursor`
  - Inherits from `BaseCursor` to reuse request builders (`_build_start_query_execution_request`, etc.)
  - Overrides all I/O methods with async versions using `async_retry_api_call()`
  - Replaces `time.sleep()` with `asyncio.sleep()` in polling
  - Handles `KeyboardInterrupt` / `asyncio.CancelledError` in polling
- `pyathena/aio/cursor.py`: `AioCursor`, `AioDictCursor`
  - `async execute()` → returns `AioCursor` (for chaining)
  - `async executemany()`
  - `async fetchone()`, `async fetchmany()`, `async fetchall()`
  - `async cancel()`, `async close()`
  - `__aiter__` / `__anext__` for `async for` support
  - `async __aenter__` / `async __aexit__`
- `pyathena/aio/result_set.py`: `AioResultSet`, `AioDictResultSet`
  - Extends `AthenaResultSet` with async fetch methods
  - `async fetchone()`, `async fetchmany()`, `async fetchall()`
  - `__aiter__` / `__anext__` for async iteration
  - Async `_pre_fetch()` and `_fetch()` using `async_retry_api_call()`
- `pyathena/__init__.py`: add `aconnect()` top-level function
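The `AioCursor` and `AioResultSet` items above rest on Python's async iteration protocol: `__aiter__` returns an object whose `__anext__` coroutine raises `StopAsyncIteration` when the rows run out. A minimal sketch, assuming a hypothetical in-memory `FakeResultSet` that serves rows in pages (standing in for `AioResultSet` and `get_query_results` paging); `AioCursorSketch` is likewise a hypothetical name, not the planned class.

```python
import asyncio
from typing import Any, List, Optional, Sequence, Tuple

Row = Tuple[Any, ...]


class FakeResultSet:
    """Hypothetical stand-in for AioResultSet: serves rows page by page."""

    def __init__(self, rows: Sequence[Row], page_size: int = 2) -> None:
        self._rows = list(rows)
        self._page_size = page_size
        self._buffer: List[Row] = []
        self._offset = 0

    async def _fetch_page(self) -> None:
        # A real implementation would await async_retry_api_call(get_query_results, ...).
        await asyncio.sleep(0)
        page = self._rows[self._offset : self._offset + self._page_size]
        self._offset += len(page)
        self._buffer.extend(page)

    async def fetchone(self) -> Optional[Row]:
        if not self._buffer and self._offset < len(self._rows):
            await self._fetch_page()
        return self._buffer.pop(0) if self._buffer else None


class AioCursorSketch:
    def __init__(self) -> None:
        self._result_set: Optional[FakeResultSet] = None

    async def execute(self, operation: str) -> "AioCursorSketch":
        # Hypothetical: pretend the query ran and produced three rows.
        await asyncio.sleep(0)
        self._result_set = FakeResultSet([(1, "a"), (2, "b"), (3, "c")])
        return self  # returning self allows `await (await cursor.execute(q)).fetchall()`

    async def fetchone(self) -> Optional[Row]:
        if self._result_set is None:
            raise RuntimeError("No result set; call execute() first.")
        return await self._result_set.fetchone()

    async def fetchall(self) -> List[Row]:
        rows = []
        while (row := await self.fetchone()) is not None:
            rows.append(row)
        return rows

    def __aiter__(self) -> "AioCursorSketch":
        return self

    async def __anext__(self) -> Row:
        row = await self.fetchone()
        if row is None:
            raise StopAsyncIteration
        return row

    async def close(self) -> None:
        self._result_set = None

    async def __aenter__(self) -> "AioCursorSketch":
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.close()


async def main() -> Tuple[List[Row], Optional[Row], List[Row]]:
    async with AioCursorSketch() as cursor:
        await cursor.execute("SELECT 1")
        streamed = [row async for row in cursor]
        await cursor.execute("SELECT 1")
        first = await cursor.fetchone()
        rest = await cursor.fetchall()
    return streamed, first, rest


streamed, first, rest = asyncio.run(main())
```

Rows are pulled one page at a time, so `async for` over a large result never needs the whole result in memory.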
Files to create for tests:
- `tests/pyathena/aio/__init__.py`
- `tests/pyathena/aio/test_cursor.py`: tests for `AioCursor`
- `tests/pyathena/aio/test_connection.py`: tests for `AsyncConnection`
Phase 2: Specialized Cursors (incremental, one PR per cursor type)
Each follows the same pattern as the sync counterpart but with async methods.
- `pyathena/aio/pandas/`: `AioPandasCursor` + `AioPandasResultSet`
- `pyathena/aio/arrow/`: `AioArrowCursor` + `AioArrowResultSet`
- `pyathena/aio/polars/`: `AioPolarsCursor` + `AioPolarsResultSet`
- `pyathena/aio/s3fs/`: `AioS3FSCursor` + `AioS3FSResultSet`
- `pyathena/aio/spark/`: `AioSparkCursor` + `AioSparkBaseCursor`
Phase 3: Integration & Polish
- Documentation updates (README, sphinx docs)
- SQLAlchemy dialect integration (if applicable — SQLAlchemy 2.0 has async support)
- Performance benchmarks vs sync + ThreadPoolExecutor approaches
Design Decisions
1. Module Structure: pyathena/aio/ Package
Mirrors the existing package structure under a new aio/ namespace:
```
pyathena/aio/
├── __init__.py
├── util.py              # async_retry_api_call
├── connection.py        # AsyncConnection
├── common.py            # AioBaseCursor
├── cursor.py            # AioCursor, AioDictCursor
├── result_set.py        # AioResultSet, AioDictResultSet
├── pandas/
│   ├── cursor.py        # AioPandasCursor
│   └── result_set.py    # AioPandasResultSet
├── arrow/
│   ├── cursor.py        # AioArrowCursor
│   └── result_set.py    # AioArrowResultSet
├── polars/
│   ├── cursor.py        # AioPolarsCursor
│   └── result_set.py    # AioPolarsResultSet
├── s3fs/
│   ├── cursor.py        # AioS3FSCursor
│   └── result_set.py    # AioS3FSResultSet
└── spark/
    ├── common.py        # AioSparkBaseCursor
    └── cursor.py        # AioSparkCursor
```
2. Naming: Aio Prefix
Use Aio prefix (e.g., AioCursor) to distinguish from the existing Async prefix (e.g., AsyncCursor) which uses threads. This is consistent with the aio package name and avoids confusion.
3. Inheritance: Extend Sync Classes
AioBaseCursor inherits from BaseCursor to reuse:
- All request builder methods (`_build_*`): pure computation, no I/O
- Constants (`LIST_QUERY_EXECUTIONS_MAX_RESULTS`, etc.)
- `__init__` parameter handling
- `connection` property
Override only the I/O methods with async versions. This minimizes code duplication while providing clean async interfaces.
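A minimal sketch of the inherit-and-override idea, assuming simplified stand-ins: `BaseCursorSketch`, `AioBaseCursorSketch` and `FakeAthenaClient` are hypothetical names, and the real `BaseCursor` has far more state. The builder is inherited unchanged; only `_execute()` and `_poll()` become coroutines, using `asyncio.to_thread()` for the client calls and `asyncio.sleep()` between polls.

```python
import asyncio
import time
from typing import Any, Dict

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


class FakeAthenaClient:
    """Hypothetical stand-in for a boto3 Athena client."""

    def __init__(self) -> None:
        self._states = iter(["QUEUED", "RUNNING", "SUCCEEDED"])

    def start_query_execution(self, **request: Any) -> Dict[str, Any]:
        return {"QueryExecutionId": "qid-1"}

    def get_query_execution(self, QueryExecutionId: str) -> Dict[str, Any]:
        time.sleep(0.01)  # simulated network latency
        state = next(self._states)
        return {"QueryExecution": {"QueryExecutionId": QueryExecutionId, "Status": {"State": state}}}


class BaseCursorSketch:
    """Simplified stand-in for BaseCursor: a pure request builder plus blocking I/O."""

    def __init__(self, client: FakeAthenaClient, poll_interval: float = 0.05) -> None:
        self._client = client
        self._poll_interval = poll_interval

    def _build_start_query_execution_request(self, query: str) -> Dict[str, Any]:
        # Pure computation, no I/O: safe to reuse unchanged from async code.
        return {"QueryString": query, "WorkGroup": "primary"}

    def _execute(self, query: str) -> str:
        request = self._build_start_query_execution_request(query)
        return self._client.start_query_execution(**request)["QueryExecutionId"]

    def _poll(self, query_id: str) -> str:
        while True:
            response = self._client.get_query_execution(QueryExecutionId=query_id)
            state = response["QueryExecution"]["Status"]["State"]
            if state in TERMINAL_STATES:
                return state
            time.sleep(self._poll_interval)  # blocks the calling thread


class AioBaseCursorSketch(BaseCursorSketch):
    """Inherits __init__ and the builder; overrides only the I/O methods as coroutines."""

    async def _execute(self, query: str) -> str:  # type: ignore[override]
        request = self._build_start_query_execution_request(query)  # reused as-is
        response = await asyncio.to_thread(self._client.start_query_execution, **request)
        return response["QueryExecutionId"]

    async def _poll(self, query_id: str) -> str:  # type: ignore[override]
        while True:
            response = await asyncio.to_thread(
                self._client.get_query_execution, QueryExecutionId=query_id
            )
            state = response["QueryExecution"]["Status"]["State"]
            if state in TERMINAL_STATES:
                return state
            await asyncio.sleep(self._poll_interval)  # yields to the event loop


async def main() -> str:
    cursor = AioBaseCursorSketch(FakeAthenaClient(), poll_interval=0.01)
    query_id = await cursor._execute("SELECT 1")
    return await cursor._poll(query_id)


final_state = asyncio.run(main())
sync_state = BaseCursorSketch(FakeAthenaClient(), poll_interval=0.01)._poll("qid-1")
```

One design consequence worth noting: replacing a sync method with an `async def` of the same name changes its return type to a coroutine, so a type checker such as mypy will flag these overrides (hence the `type: ignore[override]` comments in the sketch).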
4. asyncio.to_thread() for All Blocking Calls
Every boto3 API call is wrapped with asyncio.to_thread():
```python
response = await asyncio.to_thread(
    retry_api_call,
    self._connection.client.some_api_call,
    config=self._retry_config,
    logger=_logger,
    **request,
)
```

This keeps the implementation simple and dependency-free.
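Because each call occupies its own worker thread, several queries issued with `asyncio.gather()` overlap instead of running back to back. A minimal sketch, assuming a hypothetical `FakeAthenaClient` whose `start_query_execution()` simply sleeps to simulate latency (retry handling is omitted here); `asyncio.to_thread()` forwards the keyword arguments just as in the pattern above.

```python
import asyncio
import time
from typing import Any, Dict, List, Tuple


class FakeAthenaClient:
    """Hypothetical stand-in for a boto3 Athena client; each call blocks for `latency` seconds."""

    def __init__(self, latency: float) -> None:
        self.latency = latency

    def start_query_execution(self, **request: Any) -> Dict[str, Any]:
        time.sleep(self.latency)  # simulated network round trip
        return {"QueryExecutionId": f"id-{request['QueryString']}"}


async def start(client: FakeAthenaClient, **request: Any) -> Dict[str, Any]:
    # asyncio.to_thread() passes **request through to the blocking method.
    return await asyncio.to_thread(client.start_query_execution, **request)


async def main() -> Tuple[List[Dict[str, Any]], float]:
    client = FakeAthenaClient(latency=0.3)
    t0 = time.perf_counter()
    # Three blocking calls run in three worker threads at the same time.
    results = await asyncio.gather(*(start(client, QueryString=f"q{i}") for i in range(3)))
    return list(results), time.perf_counter() - t0


results, elapsed = asyncio.run(main())
print([r["QueryExecutionId"] for r in results], round(elapsed, 2))
```

The elapsed time tracks the slowest single call rather than the sum of all three; concurrency is bounded by the size of the loop's default thread pool.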
5. Connection Initialization
boto3.Session() creation is synchronous and fast (no network calls). STS operations (assume_role, get_session_token) are the only potentially blocking calls during connection setup.
Options (to be decided during implementation):
- Factory classmethod: `conn = await AsyncConnection.create(...)`, cleanest for STS cases
- Module function: `conn = await pyathena.aconnect(...)`, most user-friendly
- Lazy STS: defer STS calls to the first `execute()`, which keeps `__init__` simplest
Recommended: provide both aconnect() (module-level) and AsyncConnection (class-level).
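The Proposed API uses `async with pyathena.aconnect(...) as conn`, while the options above use `conn = await pyathena.aconnect(...)`. One way to support both is for `aconnect()` to return an object that is awaitable and also an async context manager (aiohttp's `session.get()` follows a similar dual pattern). A minimal sketch under that assumption; `fake_assume_role()`, `_ConnectContext` and the `role_arn` keyword are hypothetical, and the STS call is simulated with `time.sleep()`.

```python
import asyncio
import time
from typing import Any, Dict, Generator, Optional, Tuple


def fake_assume_role(role_arn: str) -> Dict[str, str]:
    # Hypothetical stand-in for a blocking sts.assume_role() network call.
    time.sleep(0.05)
    return {"AccessKeyId": "ASIAEXAMPLE", "SecretAccessKey": "secret", "SessionToken": "token"}


class AsyncConnection:
    def __init__(self, role_arn: Optional[str] = None, **kwargs: Any) -> None:
        # Synchronous and cheap: no network I/O here, like boto3.Session() creation.
        self.role_arn = role_arn
        self.kwargs = kwargs
        self.credentials: Optional[Dict[str, str]] = None
        self.closed = False

    @classmethod
    async def create(cls, role_arn: Optional[str] = None, **kwargs: Any) -> "AsyncConnection":
        conn = cls(role_arn=role_arn, **kwargs)
        if role_arn is not None:
            # The only blocking network call during setup runs in a worker thread.
            conn.credentials = await asyncio.to_thread(fake_assume_role, role_arn)
        return conn

    async def close(self) -> None:
        self.closed = True

    async def __aenter__(self) -> "AsyncConnection":
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.close()


class _ConnectContext:
    """Returned by aconnect(): usable with `await` and with `async with`."""

    def __init__(self, **kwargs: Any) -> None:
        self._kwargs = kwargs
        self._conn: Optional[AsyncConnection] = None

    def __await__(self) -> Generator[Any, None, AsyncConnection]:
        # `conn = await aconnect(...)`: delegate to the factory coroutine.
        return AsyncConnection.create(**self._kwargs).__await__()

    async def __aenter__(self) -> AsyncConnection:
        # `async with aconnect(...) as conn`: create on entry, close on exit.
        self._conn = await AsyncConnection.create(**self._kwargs)
        return self._conn

    async def __aexit__(self, *exc_info: Any) -> None:
        if self._conn is not None:
            await self._conn.close()


def aconnect(**kwargs: Any) -> _ConnectContext:
    return _ConnectContext(**kwargs)


async def main() -> Tuple[bool, bool, Optional[Dict[str, str]], bool]:
    async with aconnect(role_arn="arn:aws:iam::123456789012:role/example") as conn:
        assumed = conn.credentials is not None
    plain = await aconnect(region_name="us-east-1")
    await plain.close()
    return assumed, conn.closed, plain.credentials, plain.closed


assumed, conn_closed, plain_credentials, plain_closed = asyncio.run(main())
```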
6. Cancellation Handling
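Replace `KeyboardInterrupt` handling in polling with `asyncio.CancelledError`. After the Athena query has been stopped, the `CancelledError` should be re-raised so the cancellation still reaches the caller (e.g. `asyncio.wait_for()` or a task group):

```python
import asyncio


async def _poll(self, query_id):
    try:
        return await self.__poll(query_id)
    except asyncio.CancelledError:
        if self._kill_on_interrupt:
            # Stop the Athena query and wait until it reaches a terminal state.
            await self._cancel(query_id)
            await self.__poll(query_id)
        # Re-raise: suppressing CancelledError would break asyncio cancellation semantics.
        raise
```

7. No New Dependencies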
This feature adds zero new dependencies. Only standard library asyncio is used.
Implementation Notes for Handoff
Key Patterns to Follow
- Request builders are sync and reusable: all `_build_*` methods in `BaseCursor` and the request construction logic can be called directly from async code (no I/O involved).
- `retry_api_call()` wrapping pattern: every sync method that calls `retry_api_call()` needs an async counterpart using `asyncio.to_thread(retry_api_call, ...)`.
- Result set init calls `_pre_fetch()`: `AthenaResultSet.__init__()` calls `_pre_fetch()`, which makes a boto3 call. For `AioResultSet`, this needs to be deferred to an async factory method or called externally after construction.
- S3 client in result set: `AthenaResultSet.__init__()` creates an S3 client (`connection.session.client("s3", ...)`) for reading results. Client creation is synchronous and makes no network calls, so it can remain in `__init__`.
- Converter and formatter are sync: type conversion and parameter formatting involve no I/O and can be reused as-is.
- Test structure: tests mirror the source structure under `tests/pyathena/`. Existing tests in `test_async_cursor.py` provide patterns for testing concurrent execution. New tests should use `pytest-asyncio` and follow similar patterns with `async def test_*` functions.
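The `retry_api_call()` wrapping pattern can be exercised without AWS. A minimal sketch, assuming a stdlib-only stand-in for `retry_api_call()` (the real one is built on `tenacity`) and hypothetical `ThrottlingError`, `RetryConfigSketch` and `FlakyCall` helpers. It keeps the `(func, config, logger=None, *args, **kwargs)` shape and shows that the failed attempts, the back-off sleeps and the final success all happen in one worker thread, never on the event loop thread.

```python
import asyncio
import logging
import threading
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, Set


class ThrottlingError(Exception):
    """Hypothetical stand-in for a retryable botocore ClientError."""


@dataclass
class RetryConfigSketch:
    # Hypothetical settings; not PyAthena's RetryConfig.
    attempt: int = 3
    base_delay: float = 0.01


def retry_api_call(func: Callable[..., Any], config: RetryConfigSketch,
                   logger: Optional[logging.Logger] = None, *args: Any, **kwargs: Any) -> Any:
    # Stdlib-only stand-in for pyathena.util.retry_api_call.
    for attempt in range(1, config.attempt + 1):
        try:
            return func(*args, **kwargs)
        except ThrottlingError:
            if attempt == config.attempt:
                raise
            if logger is not None:
                logger.debug("Retrying after attempt %d", attempt)
            time.sleep(config.base_delay * 2 ** (attempt - 1))  # blocking exponential back-off


async def async_retry_api_call(func, config, logger=None, *args, **kwargs):
    # The whole retry loop, back-off sleeps included, runs in a single worker thread.
    return await asyncio.to_thread(retry_api_call, func, config, logger, *args, **kwargs)


class FlakyCall:
    """Fails `failures` times, then succeeds; records the threads it ran on."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.calls = 0
        self.thread_ids: Set[int] = set()

    def __call__(self, **kwargs: Any) -> Dict[str, Any]:
        self.calls += 1
        self.thread_ids.add(threading.get_ident())
        if self.calls <= self.failures:
            raise ThrottlingError("Rate exceeded")
        return {"QueryExecutionId": kwargs["QueryExecutionId"], "ok": True}


async def main() -> Dict[str, Any]:
    return await async_retry_api_call(flaky, RetryConfigSketch(), None, QueryExecutionId="qid-1")


flaky = FlakyCall(failures=2)
response = asyncio.run(main())
```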
Critical Implementation Detail: AioResultSet Initialization
AthenaResultSet.__init__() immediately calls self._pre_fetch() which issues a blocking get_query_results API call. For async:
```python
from pyathena.aio.util import async_retry_api_call
from pyathena.model import AthenaQueryExecution
from pyathena.result_set import AthenaResultSet


class AioResultSet(AthenaResultSet):
    def __init__(self, ...):
        # Override to skip _pre_fetch in __init__
        # Call parent's __init__ logic but defer the fetch
        ...

    async def _pre_fetch(self):
        response = await async_retry_api_call(
            self.connection.client.get_query_results, ...)
        self._process_metadata(response)  # sync, pure computation
        self._process_rows(response)  # sync, pure computation

    @classmethod
    async def create(cls, connection, converter, query_execution, ...):
        instance = cls.__new__(cls)
        # Initialize fields
        ...
        if instance.state == AthenaQueryExecution.STATE_SUCCEEDED:
            instance._rownumber = 0
            await instance._pre_fetch()
        return instance
```

Dependencies Check
- `boto3>=1.26.4`: sync AWS SDK (wrapped with `asyncio.to_thread()`)
- `botocore>=1.29.4`: underlying sync HTTP client
- `tenacity>=4.1.0`: retry logic (used synchronously inside threads)
- Python `>=3.10`: `asyncio.to_thread()` is available since 3.9
- No new dependencies needed
Open Questions
- SQLAlchemy async dialect: SQLAlchemy 2.0+ supports async via `create_async_engine()`. Should we add `awsathena+aio://` dialect entries? (Probably Phase 3)
- `async_generator` for `fetchmany()`: should we also provide `async def fetchmany_iter()` that yields chunks as an async generator?
- Thread pool configuration: should `AsyncConnection` accept a custom `Executor`, or rely on the default thread pool? Note that `asyncio.to_thread()` has no executor parameter; a custom executor would have to be used through `loop.run_in_executor()`.