This guide walks you through the basics of using async-cassandra. For complete API documentation, see the API Reference.
pip install async-cassandra- Cassandra 4.0+ - Required for CQL protocol v5 support
- Python 3.12+ - For modern async features
- CQL Protocol v5+ - Older protocols are not supported
Note on Protocol Versions: async-cassandra requires CQL protocol v5 or higher for optimal async performance. If you're using Cassandra 3.x or older, you'll need to upgrade. See the protocol version documentation for details.
This example shows the minimum code needed to connect to Cassandra and run a query:
import asyncio
from async_cassandra import AsyncCluster
async def main():
# Use context managers for automatic cleanup (REQUIRED)
async with AsyncCluster(['localhost']) as cluster:
async with cluster.connect('my_keyspace') as session:
# Execute a query (waits for results without blocking)
result = await session.execute("SELECT * FROM users LIMIT 10")
# Process results (rows are like dictionaries)
for row in result:
print(f"User: {row.name}, Email: {row.email}")
# No manual cleanup needed - context managers handle it!
# Run the async function
asyncio.run(main())- Import AsyncCluster instead of Cluster
- Use
awaitfor all database operations - Wrap your code in an async function
- No blocking - your app stays responsive
Context managers ensure that resources are properly cleaned up after use, even if errors occur. When you use async with, Python guarantees that cleanup code runs no matter what happens.
Why this matters: async-cassandra's streaming operations can leak memory if not properly closed due to circular references in the callback system. Context managers prevent this.
📖 Want to understand how context managers work? See our detailed explanation of context managers to learn what happens behind the scenes.
Without context manager (manual cleanup):
cluster = AsyncCluster(['localhost'])
session = await cluster.connect('my_keyspace')
try:
result = await session.execute("SELECT * FROM users")
finally:
await session.close() # You must remember this
await cluster.shutdown() # And this!With context manager (automatic cleanup):
async with AsyncCluster(['localhost']) as cluster:
async with await cluster.connect('my_keyspace') as session:
result = await session.execute("SELECT * FROM users")
# Python automatically calls close() and shutdown() for youBenefits:
- No forgotten cleanup (prevents connection leaks)
- Cleaner code
- Exception safe (cleanup happens even if query fails)
If your Cassandra cluster requires authentication (most production clusters do), use the create_with_auth helper:
cluster = AsyncCluster.create_with_auth(
contact_points=['localhost'],
username='cassandra',
password='cassandra'
)
# Then connect as usual
session = await cluster.connect('my_keyspace')Note: Never hardcode credentials in your code. Use environment variables or a secrets manager:
import os
cluster = AsyncCluster.create_with_auth(
contact_points=['localhost'],
username=os.environ['CASSANDRA_USER'],
password=os.environ['CASSANDRA_PASS']
)Prepared statements are required for parameterized queries in async-cassandra. They provide:
- Security - Prevents CQL injection attacks (like SQL injection)
- Performance - Query is parsed once, executed many times
- Type Safety - Cassandra validates parameter types
# ❌ This will NOT work - direct parameters not supported
await session.execute("SELECT * FROM users WHERE id = ?", [user_id])
# ✅ This works - prepare first, then execute
prepared = await session.prepare("SELECT * FROM users WHERE id = ?")
result = await session.execute(prepared, [user_id])# Prepare once (usually at startup)
user_query = await session.prepare("SELECT * FROM users WHERE id = ?")
insert_stmt = await session.prepare(
"INSERT INTO users (id, name, email) VALUES (?, ?, ?)"
)
# Execute many times with different parameters
for user_id in user_ids:
result = await session.execute(user_query, [user_id])
user = result.one()
print(user)
# Insert new users
await session.execute(insert_stmt, [uuid.uuid4(), "Alice", "alice@example.com"])In distributed systems like Cassandra, failures are normal, not exceptions. Network issues, node failures, and timeouts happen regularly. Your application needs to handle these gracefully.
from async_cassandra import ConnectionError, QueryError
try:
result = await session.execute("SELECT * FROM users")
except ConnectionError as e:
# Happens when: Can't reach Cassandra, node is down, network issues
print(f"Connection failed: {e}")
# What to do: Retry, use a different node, alert ops team
except QueryError as e:
# Happens when: Bad CQL syntax, table doesn't exist, permissions issue
print(f"Query failed: {e}")
# What to do: Fix the query, check schema, verify permissionsimport asyncio
from async_cassandra import ConnectionError, QueryError
async def get_user_with_retry(session, user_id, max_retries=3):
"""Get user with automatic retry on connection errors."""
for attempt in range(max_retries):
try:
prepared = await session.prepare("SELECT * FROM users WHERE id = ?")
result = await session.execute(prepared, [user_id])
return result.one()
except ConnectionError as e:
if attempt == max_retries - 1:
# Final attempt failed, re-raise
raise
# Wait before retry (exponential backoff)
wait_time = 2 ** attempt
print(f"Connection failed, retrying in {wait_time}s...")
await asyncio.sleep(wait_time)
except QueryError as e:
# Don't retry query errors - they won't fix themselves
print(f"Query error (not retrying): {e}")
raiseWhen you query millions of rows, loading them all into memory will crash your application:
# ❌ BAD: This loads ALL rows into memory at once
result = await session.execute("SELECT * FROM billion_row_table")
for row in result: # 💥 OutOfMemoryError
process(row)Streaming fetches rows in small batches (pages) as you need them:
from async_cassandra.streaming import StreamConfig
# Configure streaming with page size
config = StreamConfig(
fetch_size=1000 # Fetch 1000 rows at a time
)
# ⚠️ CRITICAL: Always use context manager to prevent memory leaks!
# Streaming results MUST be properly closed or memory will leak
async with await session.execute_stream(
"SELECT * FROM billion_row_table",
stream_config=config
) as result:
# Process rows one at a time - only 1000 in memory at once
async for row in result:
await process_row(row)
# Previous rows are garbage collected
# Result automatically closed when exiting contextIMPORTANT: Streaming result sets create internal callbacks that can cause memory leaks if not properly closed. You MUST use one of these patterns:
✅ BEST PRACTICE - Context Manager (Recommended):
# This ensures cleanup even if exceptions occur
async with await session.execute_stream(query, stream_config=config) as result:
async for row in result:
await process_row(row)
# Automatically closed, preventing memory leaks✅ Alternative - try/finally:
# Only use if context manager isn't possible
result = await session.execute_stream(query, stream_config=config)
try:
async for row in result:
await process_row(row)
finally:
await result.close() # CRITICAL: Must always close!❌ NEVER DO THIS - Memory Leak:
# This will leak memory!
result = await session.execute_stream(query, stream_config=config)
async for row in result:
process_row(row)
# Result not closed - callbacks remain in memory forever!The streaming implementation uses callbacks to coordinate with the Cassandra driver. These callbacks create circular references that Python's garbage collector cannot clean up automatically. Without explicit cleanup via close() or a context manager, these references persist indefinitely, causing memory leaks.
📖 Learn more: For a detailed explanation of how context managers prevent these memory leaks, see Understanding Context Managers.
- ✅ Exporting data
- ✅ ETL processes
- ✅ Generating reports from large datasets
- ✅ Any query returning thousands+ of rows
- ❌ Don't use for small queries (adds overhead)
Here's a real-world example showing proper streaming usage with error handling:
import asyncio
from async_cassandra import AsyncCluster
from async_cassandra.streaming import StreamConfig
async def export_user_data(session, output_file):
"""Export all users to a CSV file using streaming."""
config = StreamConfig(
fetch_size=5000, # Process 5000 rows at a time
page_callback=lambda page_num, rows: print(f"Processing page {page_num}...")
)
# CRITICAL: Use context manager for automatic cleanup
async with await session.execute_stream(
"SELECT id, name, email, created_at FROM users",
stream_config=config
) as result:
with open(output_file, 'w') as f:
f.write("id,name,email,created_at\n")
row_count = 0
async for row in result:
f.write(f"{row.id},{row.name},{row.email},{row.created_at}\n")
row_count += 1
# Progress update every 10,000 rows
if row_count % 10000 == 0:
print(f"Exported {row_count:,} users...")
# Result is automatically closed here, preventing memory leaks
print(f"Export complete! Total users: {row_count:,}")
return row_count
# Usage
async def main():
async with AsyncCluster(['localhost']) as cluster:
async with await cluster.connect('my_keyspace') as session:
await export_user_data(session, 'users_export.csv')
asyncio.run(main())Key Points:
- Context manager (
async with) ensures cleanup even if errors occur - Large
fetch_size(5000) for better performance with small rows - Progress tracking without keeping all data in memory
- File operations inside the streaming context for consistency
FastAPI is an async web framework. If you use the regular Cassandra driver, it will block your entire web server during database queries. This means your API can't handle other requests while waiting for Cassandra to respond.
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from async_cassandra import AsyncCluster
import uuid
# Global variables for session and prepared statements
session = None
prepared_statements = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage Cassandra connection lifecycle."""
global session, prepared_statements
# Startup: Create connection with context manager
async with AsyncCluster(['localhost']) as cluster:
async with cluster.connect('my_keyspace') as session:
# Prepare statements once at startup (more efficient)
prepared_statements['get_user'] = await session.prepare(
"SELECT * FROM users WHERE id = ?"
)
prepared_statements['create_user'] = await session.prepare(
"INSERT INTO users (id, name, email) VALUES (?, ?, ?)"
)
yield # Server runs here
# Cleanup happens automatically when context exits
app = FastAPI(lifespan=lifespan)@app.get("/users/{user_id}")
async def get_user(user_id: str):
"""Get a user by ID."""
try:
# Convert string to UUID
user_uuid = uuid.UUID(user_id)
# Execute prepared statement
result = await session.execute(
prepared_statements['get_user'],
[user_uuid]
)
user = result.one()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return {
"id": str(user.id),
"name": user.name,
"email": user.email
}
except ValueError:
raise HTTPException(status_code=400, detail="Invalid user ID format")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/users")
async def create_user(name: str, email: str):
"""Create a new user."""
user_id = uuid.uuid4()
await session.execute(
prepared_statements['create_user'],
[user_id, name, email]
)
return {"id": str(user_id), "name": name, "email": email}- Global session: Creating connections is expensive - do it once at startup
- Prepared statements at startup: Better performance, prepare once, use many times
- Proper error handling: Convert Cassandra errors to HTTP errors
- UUID handling: Cassandra uses UUIDs, web uses strings - convert properly
CQL (Cassandra Query Language) protocol versions define how clients communicate with Cassandra. async-cassandra requires protocol v5 or higher because:
- Better streaming: v5+ has improved streaming capabilities for large result sets
- Enhanced features: Modern error handling, better metadata, improved performance
- Async alignment: Features that work better with async patterns
# Option 1: Automatic negotiation (RECOMMENDED)
# Driver negotiates to highest available version
cluster = AsyncCluster(['localhost']) # Gets v5 (highest currently supported)
# Option 2: Explicitly specify v5
cluster = AsyncCluster(['localhost'], protocol_version=5) # Forces v5 exactly
# ❌ This will raise ConfigurationError
cluster = AsyncCluster(['localhost'], protocol_version=4)
# Error: Protocol version 4 is not supported. async-cassandra requires CQL protocol v5 or higher...Important: async-cassandra verifies protocol v5+ after connection:
- Driver negotiates to the highest available version (currently v5)
- Fails with clear error if server only supports v4 or lower
- Ensures compatibility with modern Cassandra features
Note: As of Cassandra 5.0, the maximum supported protocol version is v5. Protocol v5 is considered stable and is the recommended version for production use.
If you're currently using Cassandra 3.x (protocol v4) or older:
-
Upgrade Cassandra: Version 4.0+ supports protocol v5
# Check your current Cassandra version nodetool version -
Cloud Services: Most cloud providers already support v5+
- AWS Keyspaces: Supports v4 and v5
- Azure Cosmos DB: Check current documentation
- DataStax Astra: Supports v5+
-
Migration Path:
- Test with Cassandra 4.x in development first
- Protocol v5 is backward compatible with v4 features
- No code changes needed beyond the upgrade
# Error when using old protocol
try:
cluster = AsyncCluster(['localhost'], protocol_version=3)
except ConfigurationError as e:
print(e)
# Output: Protocol version 3 is not supported. async-cassandra requires
# CQL protocol v5 or higher for optimal async performance...- API Reference - Complete API documentation
- Connection Pooling - Understanding connection behavior
- Streaming - Handling large result sets
- Performance - Optimization tips
- FastAPI Example - Full production example