A SQL-first Python library for querying and mapping data across multiple database backends.
- Multi-Database Support: SQLite, PostgreSQL, MySQL, Oracle with unified interface
- SQL-First Design: Load queries from
.sqlfiles organized in namespaces, or pass inline SQL directly - Inline SQL: Execute raw SQL strings alongside registry keys — no registry required for ad-hoc queries
- SQL Sanitization: Configurable sanitizer strips comments, blocks statement stacking, and restricts SQL verbs
- Flexible Mapping: Map results to dataclasses, Pydantic models, or plain classes
- Aggregate Mapping: Reconstruct complex object graphs from joined queries (single-pass O(n))
- Transaction Management: Context manager support with automatic rollback
- Migration Management: Version-controlled database migrations
- Repository Pattern: Optional DDD-style repository base classes
- Async Support: Full async/await support for all operations
- Type Safe: Fully typed with mypy strict mode
pip install rowquerypip install rowquery[postgres] # PostgreSQL
pip install rowquery[mysql] # MySQL
pip install rowquery[oracle] # Oracle
pip install rowquery[all] # All driverssql/
user/
get_by_id.sql
list_active.sql
order/
create.sql
from row_query import Engine, ConnectionConfig, SQLRegistry
config = ConnectionConfig(driver="sqlite", database="app.db")
registry = SQLRegistry("sql/")
engine = Engine.from_config(config, registry)
# Registry key lookup (dot-separated namespace)
user = engine.fetch_one("user.get_by_id", {"id": 1})
users = engine.fetch_all("user.list_active")
count = engine.fetch_scalar("user.count")
# Inline SQL — pass a raw SQL string directly
user = engine.fetch_one("SELECT * FROM users WHERE id = ?", 1)
users = engine.fetch_all("SELECT * FROM users WHERE active = ?", True)
count = engine.fetch_scalar("SELECT COUNT(*) FROM users")
rows = engine.fetch_all("SELECT * FROM users WHERE id IN (?, ?)", [1, 2])from dataclasses import dataclass
from row_query.mapping import ModelMapper
@dataclass
class User:
id: int
name: str
email: str
mapper = ModelMapper(User)
user = engine.fetch_one("user.get_by_id", {"id": 1}, mapper=mapper)
# Returns: User(id=1, name="Alice", email="alice@example.com")from row_query.mapping import aggregate, AggregateMapper
from dataclasses import dataclass
@dataclass
class Order:
id: int
total: float
@dataclass
class UserWithOrders:
id: int
name: str
email: str
orders: list[Order]
# Build mapping plan for complex object graph
plan = (
aggregate(UserWithOrders, prefix="user__")
.key("id")
.auto_fields()
.collection("orders", Order, prefix="order__", key="id")
.build()
)
# Execute joined query and map in single pass
users = engine.fetch_all("user.with_orders", mapper=AggregateMapper(plan))# Use context manager for automatic rollback on error
with engine.transaction() as tx:
tx.execute("user.create", {"name": "Alice", "email": "alice@example.com"})
tx.execute("audit.log", {"action": "user_created"})
# Commits on exit, rolls back on exceptionfrom row_query import Engine, SQLSanitizer
# Configure what inline SQL is permitted
sanitizer = SQLSanitizer(
strip_comments=True, # Remove -- and /* */ comments (default: True)
block_multiple_statements=True, # Reject "SELECT 1; DROP TABLE t" (default: True)
allowed_verbs=frozenset({"SELECT"}), # Only allow SELECT statements (default: None = any)
)
engine = Engine.from_config(config, registry, sanitizer=sanitizer)
# Inline SQL is sanitized before execution
users = engine.fetch_all("SELECT * FROM users -- get all") # comment stripped
engine.execute("DROP TABLE users") # raises SQLSanitizationError (verb not allowed)
engine.execute("SELECT 1; DROP TABLE t") # raises SQLSanitizationError (multiple statements)
# Registry queries are always trusted and never sanitized
users = engine.fetch_all("user.list_active") # no sanitization appliedfrom row_query import AsyncEngine, ConnectionConfig
config = ConnectionConfig(driver="sqlite", database="app.db")
engine = AsyncEngine.from_config(config, registry)
async def fetch_users():
# Registry key or inline SQL — both work
users = await engine.fetch_all("user.list_active")
users = await engine.fetch_all("SELECT * FROM users WHERE active = ?", True)
return users
# Async transactions
async with engine.transaction() as tx:
await tx.execute("user.create", {"name": "Bob"})
await tx.execute("INSERT INTO audit (action) VALUES (?)", "user_created")- Examples - Runnable code examples
- CONTRIBUTING.md - Development guide
- CHANGELOG.md - Version history
This project uses uv for package management.
# Install dependencies
uv sync --extra all --extra dev
# Run tests
uv run pytest
# Run tests with coverage
uv run pytest --cov=row_query --cov-report=html
# Lint and format
uv run ruff check row_query/ tests/
uv run ruff format row_query/ tests/
# Type check
uv run mypy row_query/MIT License - see LICENSE for details.
Contributions welcome! Please read CONTRIBUTING.md first.