Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ MAX_REQUEST_BODY_MB=50
LOG_LEVEL=INFO

# --- Datastore engine ---
# Selects the storage backend:
# Selects the storage backend (must match a folder under
# `datastore/infrastructure/engines/`):
# bigquery — real BigQuery adapter (placeholder while being built).
# ducklake — Future planned
# ducklake — Future planned.
DATASTORE_ENGINE=bigquery
BQ_PROJECT=
BIGQUERY_PROJECT=
BIGQUERY_DATASET=
BIGQUERY_CREDENTIALS=
BIGQUERY_CREDENTIALS_RO=
Comment on lines +10 to +18
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep .env.example aligned with the actual settings model.

HTTP_MAX_CONNECTIONS and HTTP_MAX_KEEPALIVE_CONNECTIONS are documented here, but datastore/core/config.py does not declare either field, so pydantic-settings will silently ignore them. INCLUDE_UPDATED_AT was added to Config as well, but operators still don't get an example for that toggle here.

Also applies to: 26-27

🧰 Tools
🪛 dotenv-linter (4.0.0)

[warning] 15-15: [UnorderedKey] The BIGQUERY_PROJECT key should go before the DATASTORE_ENGINE key

(UnorderedKey)


[warning] 16-16: [UnorderedKey] The BIGQUERY_DATASET key should go before the BIGQUERY_PROJECT key

(UnorderedKey)


[warning] 17-17: [UnorderedKey] The BIGQUERY_CREDENTIALS key should go before the BIGQUERY_DATASET key

(UnorderedKey)


[warning] 18-18: [UnorderedKey] The BIGQUERY_CREDENTIALS_RO key should go before the BIGQUERY_DATASET key

(UnorderedKey)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.env.example around lines 10 - 18, The .env.example is missing entries that
match the fields declared in datastore/core/config.py; add entries for
HTTP_MAX_CONNECTIONS, HTTP_MAX_KEEPALIVE_CONNECTIONS, and INCLUDE_UPDATED_AT
(with example values or blanks) using the exact variable names used in Config so
pydantic-settings doesn't ignore them, and update the other relevant section
mentioned in the comment to include the same variables for consistency.

SQL_FUNCTIONS_ALLOW_FILE=

# --- CKAN auth gate ---
Expand All @@ -21,6 +25,13 @@ CKAN_URL=
HTTP_TIMEOUT_SECONDS=10
AUTH_CACHE_TTL=10

# --- System columns + search ---
# Toggle the per-row `_updated_at` TIMESTAMP system column. False = `_id` only.
INCLUDE_UPDATED_AT=true
# Hard cap on `datastore_search` / `datastore_search_sql` `limit`. Requests
# above this return 400. Raise this only when downstream clients can stream.
SEARCH_RESULT_ROWS_MAX=32000

# --- Cache ---
# Empty REDIS_URL keeps the in-process InMemoryCache (single-pod only).
REDIS_URL=
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ Thumbs.db
# Environment files
.env
.env.local

# Local-only test engine — synthetic data for local dev; never push.
# Pair with DATASTORE_ENGINE=bigquery_test (which is NOT in the committed
# Config Literal, so flip to a non-Literal locally if you need to run it).
datastore/infrastructure/engines/bigquery_test/
93 changes: 47 additions & 46 deletions CLAUDE.md

Large diffs are not rendered by default.

89 changes: 61 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,38 @@ datastore/
├── registry.py # get_datastore_engine + get_allowed_sql_functions;
│ # dynamic importlib dispatch keyed on
│ # context.config.DATASTORE_ENGINE
├──bigquery/ # Engine package (one folder per backend).
| ├── __init__.py # Re-exports `BigQueryBackend`
| ├── backend.py # google-cloud-bigquery adapter (placeholder)
| ├── lib.py # Backend-specific helpers (optional)
| └── allowed_functions.txt # Per-engine datastore_search_sql
| # function allow-list — one name per
| # line, `#` comments allowed.
└── ducklake/ # Future planned engine
├── bigquery/ # Engine package (one folder per backend).
| ├── __init__.py # Exports `Backend = BigQueryBackend` —
| | # the registry imports `Backend`, so the
| | # concrete class name is engine-private.
| ├── backend.py # DatastoreBackend subclass (placeholder)
| ├── client.py # google-cloud-bigquery `Client` construction
| ├── lib.py # Backend-specific helpers (optional)
| └── allowed_functions.txt # Per-engine datastore_search_sql
| # function allow-list — one name per
| # line, `#` comments allowed.
└── ducklake/ # Future planned engine
```

To add a new engine (e.g. `ducklake`), drop a sibling folder with the
same four files. `DATASTORE_ENGINE` is validated against the set of
engine subdirectories that exist at process start, and the factory
dispatches via `importlib` — no `registry.py` / `config.py` edits.
To add a new engine (e.g. `ducklake`), drop a sibling folder following
the same layout (`__init__.py` exports `Backend = <YourBackend>`,
`backend.py` subclasses `DatastoreBackend`, plus an `allowed_functions.txt`).
`DATASTORE_ENGINE` is validated against the set of engine subdirectories
that exist at process start, and the factory imports each engine's
`Backend` via `importlib` — no `registry.py` / `config.py` edits.

## Column definitions

**Goal:** make Frictionless schema the native column shape while staying
drop-in compatible with existing CKAN clients during migration.

`datastore_create` accepts one of two input shapes:

| Shape | Keys | Status |
|---|---|---|
| Frictionless `schema` | `schema` — [Frictionless Table Schema](https://specs.frictionlessdata.io/table-schema/) | Recommended |
| Legacy CKAN `fields` | `fields`, `primary_key` | Deprecated; emits a `warnings` entry |


## Roadmap

Expand All @@ -75,26 +93,30 @@ What's shipped and what's next. Tick each box as the change set lands.
### Done

- [x] Foundation (app factory, lifespan, middleware, Dockerfile, Makefile, env config)
- [x] CKAN API surface mounted at `/api/3/action/datastore_*` (`datastore_create` live; 5 others return 501)
- [x] Health endpoints `/`, `/health`, `/ready` returning the CKAN envelope shape
- [x] Strict request validation (`DatastoreCreateRequest` + `FieldSpec`)
- [x] CKAN error envelope mapping (`APIError` taxonomy + handlers)
- [x] All six `datastore_*` actions wired end-to-end:
- `datastore_create`, `datastore_upsert`, `datastore_delete`
- `datastore_search` (streaming JSON / CSV / TSV; CKAN `_links` pagination)
- `datastore_search_sql` (sqlglot parses tables + functions; per-table
CKAN authorize; per-engine function allow-list)
- `datastore_info` (column schema + free-form `meta` dict)
- [x] Health endpoints `/`, `/health`, `/ready` returning the CKAN envelope shape.
`/ready` builds the rw + ro engine instances during lifespan and probes
`engine.healthcheck()` on each — 503 with a `Service Unavailable` envelope
if either fails (so k8s pulls the pod from the Service).
- [x] Strict request validation (Pydantic) + structured error envelopes
- [x] CKAN auth gate with TTL cache (InMemory by default; Redis when `REDIS_URL` is set)
- [x] Request context bundle (`RequestContext` / `ContextDep` / bound `CKANClient`)
- [x] Service-layer separation (`create_datastore`)
- [x] Engine abstraction + factory (`DatastoreBackend` ABC + `registry.py`)
- [x] Pydantic response models with nested `Result` per endpoint
- [x] End-to-end TestClient suite + service-level unit tests
- [x] Service / engine / streaming layer separation
- [x] Engine-agnostic registry — drop a folder under `infrastructure/engines/<name>/`
exporting `Backend`; `DATASTORE_ENGINE` is validated against engine directories
on disk, no registry / config edit required.

### Next

- [ ] Wire the remaining datastore endpoints (`upsert`, `delete`, `search`, `search_sql`, `info`)
- [ ] Real BigQuery backend (replace the placeholder in `infrastructure/engines/bigquery/backend.py`)
- [ ] Streaming search responses (JSON / CSV / TSV; ≈ 1-row peak memory)
- [ ] Real `/ready` healthcheck — wire engine instances through the lifespan
- [ ] DuckLake backend (second concrete engine implementing the same ABC)
- [ ] DuckLake backend (second concrete engine — same ABC, drop-in folder)
- [ ] Observability — JSON structured logs + request-id middleware
- [ ] Opt-in query-result cache (deferred until BigQuery + streaming land)
- [ ] Opt-in query-result cache (deferred until BigQuery lands)


## CKAN-side requirement
Expand Down Expand Up @@ -153,7 +175,9 @@ Every entry below maps 1:1 to a field on `datastore.core.config.Config`. See [.e
| `MAX_REQUEST_BODY_MB` | `50` | Reject request bodies larger than this (MB) |
| `DATASTORE_ENGINE` | `bigquery` | Storage backend — must match a folder under `infrastructure/engines/`; validated at startup |
| `SQL_FUNCTIONS_ALLOW_FILE` | _(empty)_ | Override path to the `datastore_search_sql` function allow-list; defaults to `<engine>/allowed_functions.txt` |
| `BQ_PROJECT` | _(empty)_ | Google Cloud project ID for the BigQuery backend |
| `BIGQUERY_PROJECT` | _(empty)_ | Google Cloud project ID. Required when `DATASTORE_ENGINE=bigquery`; unset → `/ready` returns 503 with a clear warning. |
| `BIGQUERY_CREDENTIALS` | _(empty)_ | Read-write service-account creds. Accepts a JSON blob (leading `{`), a path to a service-account JSON file, or empty (→ Application Default Credentials). |
| `BIGQUERY_CREDENTIALS_RO` | _(empty)_ | Read-only service-account creds (same format). Empty → falls back to `BIGQUERY_CREDENTIALS` so single-credential deployments work. |
| `REDIS_URL` | _(empty)_ | Redis URL for cache; empty → in-process `InMemoryCache` |
| `CKAN_URL` | _(empty)_ | Base URL of the CKAN instance (required when `AUTH_ENABLED=true`) |
| `HTTP_TIMEOUT_SECONDS` | `10` | Timeout for outbound CKAN calls (seconds) |
Expand Down Expand Up @@ -213,8 +237,17 @@ class DatastoreCreateResponse(ResponseModel):
class Result(BaseModel):
resource_id: str
package_id: str | None = None
fields: list[FieldSpec]
primary_key: list[str] = Field(default_factory=list)
# Canonical Frictionless Table Schema (carries `primaryKey` inside).
schema: dict[str, Any]
# Legacy mirror — marked deprecated in OpenAPI / IDE tooltips.
fields: Annotated[
list[FieldSpec],
Field(deprecated="use 'schema' (Frictionless Table Schema) instead"),
]
primary_key: Annotated[
list[str],
Field(deprecated="use 'schema.primaryKey' instead"),
]
records: list[dict[str, Any]] | None = None # when include_records=True
total: int | None = None # when include_total=True

Expand Down
96 changes: 69 additions & 27 deletions datastore/api/endpoints/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,37 @@

from typing import Annotated

from fastapi import APIRouter, HTTPException, Query
from fastapi import APIRouter, Query
from starlette.requests import Request
from starlette.responses import StreamingResponse

from datastore.api.context import Context
from datastore.api.responses import ORJSONResponse, _success_response
from datastore.api.responses import _deprecation_warnings, _success_response
from datastore.schemas.request import (
DatastoreCreateRequest,
DatastoreDeleteRequest,
DatastoreInfoRequest,
DatastoreSearchRequest,
DatastoreSearchSQLRequest,
DatastoreUpsertRequest,
)
from datastore.schemas.responses import (
DatastoreCreateResponse,
DatastoreDeleteResponse,
DatastoreInfoResponse,
DatastoreSearchResponse,
DatastoreUpsertResponse,
)
from datastore.services.read import search_datastore, search_sql_datastore
from datastore.services.write import create_datastore, upsert_datastore
from datastore.services.read import (
info_datastore,
search_datastore,
search_sql_datastore,
)
from datastore.services.write import (
create_datastore,
delete_datastore,
upsert_datastore,
)

router = APIRouter(tags=["datastore"])

Expand All @@ -43,20 +55,21 @@ async def datastore_create(
package_id=payload.resource.get("package_id"),
permission="create",
)

data_dict.update(
{
"resource": payload.resource_id or payload.resource,
"fields": payload.fields,
"schema": payload.schema,
"records": payload.records,
"primary_key": payload.primary_key,
"include_records": payload.include_records,
"include_total": payload.include_total,
}
)

result = await create_datastore(context, data_dict)
return _success_response(request, result)
warnings = _deprecation_warnings(payload)

return _success_response(request, result, warnings=warnings or None)


@router.post("/datastore_upsert", response_model=DatastoreUpsertResponse)
Expand All @@ -75,11 +88,6 @@ async def datastore_upsert(
return _success_response(request, result)


@router.post("/datastore_delete")
def datastore_delete() -> ORJSONResponse:
raise HTTPException(status_code=501, detail="datastore_delete is not implemented")


@router.get("/datastore_search", response_model=DatastoreSearchResponse)
async def datastore_search(
request: Request,
Expand All @@ -100,9 +108,7 @@ async def datastore_search(
permission="read",
)
data_dict.update(params.model_dump())
body_iter = await search_datastore(
context, data_dict, request_url=str(request.url)
)
body_iter = await search_datastore(context, data_dict, request_url=str(request.url))
return StreamingResponse(body_iter, media_type="application/json")


Expand All @@ -113,23 +119,59 @@ async def datastore_search_sql(
params: Annotated[DatastoreSearchSQLRequest, Query()],
):
"""`GET /api/3/datastore_search_sql` — execute a raw SQL SELECT and stream.
Accepts a single `sql` query parameter;
Accepts a single `sql` query parameter;
"""
for resource_id in params.resource_ids:
await context.auth.authorize(
resource_id=resource_id, permission="read"
)
await context.auth.authorize(resource_id=resource_id, permission="read")

data_dict = params.model_dump() | {
"function_names": params.function_names,
}

body_iter = await search_sql_datastore(
context, data_dict, request_url=str(request.url)
)

body_iter = await search_sql_datastore(context, data_dict, request_url=str(request.url))
return StreamingResponse(body_iter, media_type="application/json")


@router.get("/datastore_info")
def datastore_info() -> ORJSONResponse:
raise HTTPException(status_code=501, detail="datastore_info is not implemented")
@router.get("/datastore_info", response_model=DatastoreInfoResponse)
async def datastore_info(
request: Request,
context: Context,
params: Annotated[DatastoreInfoRequest, Query()],
):
"""`GET /api/3/datastore_info` — return table metadata.

Authorizes the caller on `resource_id` (same gate as `datastore_search`),
then asks the read-only engine for its `InfoResult`. The response is
small enough to skip streaming; we go through the standard
`_success_response` envelope.

Body shape:
result.fields — column schema, list of {"id", "type", ...}
result.meta — free-form dict (engine-specific extras)
"""
await context.auth.authorize(resource_id=params.resource_id, permission="read")
result = await info_datastore(context, params.model_dump())
return _success_response(request, result)


@router.post("/datastore_delete", response_model=DatastoreDeleteResponse)
async def datastore_delete(
request: Request,
payload: DatastoreDeleteRequest,
context: Context,
):
"""`POST /api/3/datastore_delete` — delete rows or drop the table.

Body:
`resource_id` / `id` (one required) — table to delete from.
`filters` (optional dict) — only rows matching every key/value
pair are deleted. Omit → whole table is dropped.
`force` (optional bool) — required to delete from a CKAN
read-only resource.

Returns the original `filters` echoed back (CKAN convention) so the
caller can confirm what the server actually applied.
"""
await context.auth.authorize(resource_id=payload.resource_id, permission="delete")
result = await delete_datastore(context, payload.model_dump())
return _success_response(request, result)
40 changes: 36 additions & 4 deletions datastore/api/endpoints/health.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,59 @@
from __future__ import annotations

from types import SimpleNamespace

from fastapi import APIRouter
from starlette.requests import Request
from starlette.responses import JSONResponse

from datastore.api.responses import _success_response
from datastore.core.config import get_config
from datastore.infrastructure.engines.registry import get_datastore_engine
from datastore.schemas.responses import StatusResponse, WelcomeResponse

router = APIRouter(tags=["health"])
welcome_router = APIRouter(tags=["health"])


probe_router = APIRouter(tags=["health"])

@router.get("/", response_model=WelcomeResponse)

@welcome_router.get("/", response_model=WelcomeResponse)
def welcome(request: Request):
return _success_response(
request,
WelcomeResponse.Result(message=get_config().APP_MESSAGE),
)

@router.get("/health", response_model=StatusResponse)

@probe_router.get("/health", response_model=StatusResponse)
def health(request: Request):
"""Liveness — always 200 while the process is up."""
return _success_response(request, StatusResponse.Result(status="ok"))


@router.get("/ready", response_model=StatusResponse)
@probe_router.get("/ready", response_model=StatusResponse)
def ready(request: Request):
"""Readiness — 200 when both rw and ro engines pass `healthcheck()`,
503 otherwise. Probes both modes because the credential split means
one can fail while the other works."""
ctx = SimpleNamespace(config=get_config())

failing: list[str] = []
for mode in ("rw", "ro"):
try:
engine = get_datastore_engine(ctx, mode=mode) # type: ignore[arg-type]
if not engine.healthcheck():
failing.append(mode)
except Exception:
failing.append(mode)

if failing:
return JSONResponse(
status_code=503,
content={
"help": str(request.url),
"success": False,
"result": {"status": "not_ready"},
},
)
Comment on lines +50 to +58
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep /ready failure responses in CKAN health envelope shape.

The 503 branch returns an error object instead of a result object, which breaks the required health endpoint response contract.

As per coding guidelines "datastore/api/endpoints/health.py: Health endpoints (/, /health, /ready) must return CKAN envelope shape: {help, success, result: {...}}".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/api/endpoints/health.py` around lines 47 - 61, The 503 branch in
datastore/api/endpoints/health.py returns an "error" key instead of the CKAN
envelope "result" object; update the JSONResponse content for the failing branch
to use the CKAN envelope shape { "help": str(request.url), "success": False,
"result": { ... } } (move the existing "__type" and "message" fields currently
under "error" into "result"), keep status_code=503 and the same message building
that uses failing and request.url so the /ready response conforms to the
required contract.

return _success_response(request, StatusResponse.Result(status="ready"))
Loading
Loading