Skip to content

BigQuery datastore backend: real CRUD + Frictionless schemas + lifespan health#1

Open
sagargg wants to merge 14 commits into
mainfrom
feat/bigquery-backend
Open

BigQuery datastore backend: real CRUD + Frictionless schemas + lifespan health#1
sagargg wants to merge 14 commits into
mainfrom
feat/bigquery-backend

Conversation

@sagargg
Copy link
Copy Markdown
Member

@sagargg sagargg commented May 20, 2026

Summary

Replaces the BigQuery engine placeholder with a working backend covering all six datastore_* actions, plus the surrounding wiring (Frictionless schemas as the canonical column shape, real /ready healthcheck, system columns).

Highlights

  • Engineinfrastructure/engines/bigquery/backend.py now implements datastore_create, upsert, delete, search, search_sql, info against real BigQuery. Split into client.py (auth/Client construction), lib.py (pure SQL helpers — DDL, MERGE, INSERT, UPDATE), metadata.py (_table_metadata row store for per-resource Frictionless schemas), types.py (Frictionless ↔ BigQuery type map + widening rules).
  • Frictionless schema as canonical shapedatastore_create accepts schema (Frictionless Table Schema); legacy fields + primary_key are auto-converted and emit DeprecationWarning. datastore_info / datastore_search / datastore_search_sql return both legacy fields and Frictionless schema so clients can migrate at their own pace.
  • DML, not streaming — INSERT / UPSERT / UPDATE all go through DML (INSERT INTO ... SELECT FROM UNNEST(JSON_QUERY_ARRAY(@rows)) / MERGE / UPDATE … FROM) to avoid BigQuery's 30–90 min streaming-buffer lockout that blocks follow-up MERGE/UPDATE on the same row.
  • System columns — every resource table gets _id (INT64, monotonically increasing via inlined (SELECT IFNULL(MAX(_id), 0) FROM tbl) + ROW_NUMBER()) and, behind Config.INCLUDE_UPDATED_AT (default on), _updated_at TIMESTAMP. MERGE only bumps _updated_at when a non-PK column actually differs (NULL-safe IS DISTINCT FROM, JSON canonicalised via TO_JSON_STRING).
  • Lifespan + /ready — engines are constructed once at startup and stashed on app.state; /ready calls engine.healthcheck() for both read and write engines and returns 503 when either fails.
  • Readable errors — BigQuery error messages are translated to CKAN-shaped ValidationError / ConflictError / ServerError (duplicate-PK on MERGE, bad-type casts, out-of-range, invalid date/timestamp, etc.).

Layout (new files)

datastore/infrastructure/engines/bigquery/
  ├── backend.py        # DatastoreBackend impl
  ├── client.py         # bigquery.Client construction
  ├── lib.py            # pure SQL helpers (DDL, DML renderers, JSON extractors)
  ├── metadata.py       # MetadataStore Protocol + BigQuery impl
  ├── types.py          # Frictionless ↔ BigQuery type map + widening rules
  └── allowed_functions.txt

MetadataStore is a Protocol so future engines (DuckLake, Postgres) plug in without copying the backend.

Test plan

  • pytest — all 180 tests green (engine-level tables tests, metadata store tests, endpoint tests, write service units)
  • Layer arrow holds: rg "from (fastapi|starlette)" datastore/services datastore/infrastructure datastore/core returns nothing
  • Smoke test against a real BigQuery dataset:
    • datastore_create with Frictionless schema → table + _table_metadata row created
    • datastore_create with legacy fields + primary_key → deprecation warning, same result
    • datastore_upsert (method=upsert) — new rows inserted, matching rows updated only when non-PK columns differ, _updated_at advances only on real change
    • datastore_upsert (method=update) — NotFoundError when any PK is unmatched
    • datastore_search returns both fields and schema
    • datastore_search_sql blocks functions outside allowed_functions.txt
    • datastore_info round-trips the info data dictionary verbatim
    • Duplicate-PK insert surfaces as ValidationError, not a 500
    • Bad cast ("jk" into a number column) surfaces as ValidationError with the column name
    • /ready returns 503 when BigQuery credentials are bad
    • Config.INCLUDE_UPDATED_AT=false → table created without _updated_at; MERGE/UPDATE SQL omits the column

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added datastore_delete and datastore_info endpoints; Frictionless Table Schema is now the canonical create input.
    • BigQuery backend, engine registry warmup/cache, improved search streaming (schema included) and richer pagination links.
    • Readiness probe now checks read/write engine modes; configurable system-columns and search-row limits.
  • Bug Fixes

    • Better validation, error translation, and structured response warnings for deprecated fields.
  • Deprecations

    • Legacy fields and primary_key marked deprecated; migrate to schema.
  • Documentation

    • Updated README/architecture docs and example payloads.
  • Tests

    • Expanded unit and end-to-end tests covering BigQuery, create/delete/info/search, health, and pagination.

Review Change Stack

sagargg and others added 10 commits May 18, 2026 14:10
- New GET /api/3/action/datastore_info. Returns
  `result: { meta: dict, fields: [{...}] }` — `fields` is the column
  schema, `meta` is a free-form dict (engine populates whatever extras
  it has: row count, table size, etc.).
- Schema accepts either `resource_id` or `id`; model_validator requires
  at least one and normalises `id` → `resource_id` so downstream code
  reads a single field.
- New `InfoResult` dataclass on the engine ABC; `bigquery.info()` stub
  fixed (previously a no-op that returned None) — now returns a real
  InfoResult.
- Service `info_datastore` calls the read-only engine and reshapes
  into `DatastoreInfoResponse.Result`. Endpoint authorizes the caller
  (same gate as datastore_search) and uses the standard
  `_success_response` envelope — info responses are small, no streaming.
- Also: re-add `.gitignore` entry for the local-only test engine
  (the earlier follow-up commit was lost in a rebase / sync).
- New POST /api/3/action/datastore_delete. Body accepts `resource_id` /
  `id` (one required, normalised); optional `filters` (dict) — omit to
  drop the whole table; optional `force` for read-only resources.
- Response echoes back the resource + filters (CKAN convention), with
  `filters` excluded from the wire when omitted.
- Service `delete_datastore` calls the rw engine and reshapes into
  DatastoreDeleteResponse.Result. Engine placeholder fixed (was a
  no-op `{}` expression) — now returns a real WriteResult.
- Endpoint authorizes the caller with `permission="delete"` (same gate
  pattern as create / upsert).
…earch_sql

- CLAUDE.md + README.md: roadmap shows all six datastore_* actions now
  wired end-to-end; "Next" trimmed to BigQuery adapter + readiness +
  observability + DuckLake. §5.2 status table refreshed (no more 501
  stubs); §3 services tree lists read.py + streaming.py.
- example_payload/: new folders for datastore_delete (with_filters,
  whole_table, force_readonly), datastore_info (basic + id alias), and
  datastore_search_sql (basic, aggregate, with_cte). All use a UUID-shaped
  resource_id matching the real CKAN datastore table-naming convention.
- example_payload/README.md tree updated.
Engines are now built once during lifespan (`warmup_engines`) and probed
by `/ready` via `engine.healthcheck()` — 503 with a Service Unavailable
envelope if rw or ro fails. Registry imports a `Backend` alias from each
engine package so the concrete class name stays engine-private; adding
a backend remains "drop a folder". Renames BQ_* env vars to BIGQUERY_*
and splits credentials into rw / ro. Docs refreshed.
… fields

Make Frictionless Table Schema the native column shape for
`datastore_create` while keeping the legacy CKAN `fields` + `primary_key`
input working end-to-end for back-compat.

Request accepts exactly one of `schema` or legacy `fields`; the validator
folds the legacy pair into a canonical Frictionless schema so the service
and engine only ever see one shape. Response surfaces both for callers on
either side of the migration — `schema` is canonical, `fields` /
`primary_key` are mirrors marked `deprecated` in OpenAPI / IDE tooltips.

When legacy input is used, the envelope grows a `warnings` array pulled
directly from `Field(deprecated=...)` on the model — single source of
truth for the warning text.

Engine `create()` ABC takes a single `schema: dict` (no more separate
`fields` / `unique_keys`); BigQuery + bigquery_test backends updated.

Postgres <-> Frictionless type maps added in `core/constants.py`;
conversion helpers in `schemas/validators.py`. README has a brief
"Column definitions" section explaining the migration goal, and
`example_payload/datastore_create/with_schema.json` provides a
ready-to-curl Frictionless payload.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…cy fields

Extend the same dual-shape pattern shipped for `datastore_create` to
`datastore_info`. The response now carries both `schema` (canonical
Frictionless Table Schema) and `fields` (legacy `{id, type, info}` list
marked `deprecated` in OpenAPI), so clients on either side of the
migration see what they expect.

Engine `InfoResult` grows a `schema: dict` field next to the existing
`fields` and `meta`. Both placeholder backends populate both shapes —
the local `bigquery_test` engine projects its `DEMO_SCHEMA` into the
Frictionless `{name, type}` form without disturbing `search`, which
still uses the legacy shape via the same `DEMO_SCHEMA` constant.

Service passes both through verbatim; tests pin the new envelope keys.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…gacy fields

Extend the dual-shape response pattern shipped for `datastore_create` /
`datastore_info` to the streaming endpoints `datastore_search` and
`datastore_search_sql`. The CKAN envelope now carries `schema` (canonical
Frictionless Table Schema, same shape `datastore_create` stores) ahead
of `fields` (legacy `{id, type}` list, marked `deprecated` in OpenAPI),
so clients on either side of the migration see what they expect.

Engine `SearchResult` swaps `fields` for `schema` — engines produce the
canonical shape, the service derives the legacy list via the existing
`frictionless_schema_to_fields` helper. Per the migration direction,
schema is what we'll persist; legacy fields is a derived view.

Streaming writers (objects / lists / csv / tsv) all thread the new
`schema` kwarg through `_stream_envelope` so it's emitted lazily into
the body before the `records` block — peak memory still ≈ 1 row.

BigQuery placeholder + local `bigquery_test` backend updated. Test
mock-helper still accepts the legacy `{id, type}` shape for caller
convenience but converts to Frictionless before constructing
`SearchResult`. `test_datastore_search_sql` pins the new envelope key.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…e tables

First real BigQuery write path. `datastore_create` now persists the
declared Frictionless schema and creates / migrates the per-resource
data table.

Layout:
  - `metadata.py` — `BigQueryMetadataStore` implements the new
    engine-agnostic `MetadataStore` Protocol added to `engines/base.py`.
    Owns the `_table_metadata` table (resource_id, schema JSON,
    created_at, updated_at) with `initialize` / `insert` / `update` /
    `get` / `delete`. Future engines drop a sibling `metadata.py` to
    satisfy the same Protocol.
  - `types.py` — Frictionless → BigQuery type map + `can_widen` for
    `ALTER COLUMN SET DATA TYPE` rules (INT64 → NUMERIC/BIGNUMERIC/
    FLOAT64; DATE → DATETIME/TIMESTAMP; everything else rejected).
  - `lib.py` — pure helpers (schema diff, ALTER-clause rendering,
    JSON-column serialisation, error formatting).
  - `backend.py` — orchestration. `create()` runs DDL → records insert
    → metadata write in that order so any failure short-circuits before
    the metadata row is written; metadata never describes a state the
    table doesn't match. Two branch helpers (`_apply_new_resource` /
    `_apply_existing_resource`) make the per-path sequence explicit.
    `ALTER TABLE` packs added columns + supported type widenings into
    one atomic statement; unsupported transitions raise `ConflictError`
    with a recreate-the-resource hint up-front. Records are
    stream-inserted via `insert_rows_json`; `object` / `array` /
    `geojson` field values are JSON-encoded first since BigQuery's
    `JSON` type expects strings on the wire.
  - Every `client.query` and `client.insert_rows_json` call is funnelled
    through `_run_query` / `_run_insert_rows` so transport / SQL errors
    surface as `ServerError` with `op` + `resource_id` baked in — never
    as raw `google.api_core` exceptions. Metadata store does the same
    via its own `_run`.

Config: new `BIGQUERY_DATASET` env var (the dataset that holds
`_table_metadata` and all user data tables). Placeholder mode
(BIGQUERY_PROJECT or BIGQUERY_DATASET unset) keeps the unit suite
runnable without GCP creds — `create()` is a no-op echo there.

Tests: 46 new (`tests/test_bigquery_metadata.py` + `tests/test_bigquery_tables.py`)
covering SQL shape, parameter binding, the diff/widening rules, JSON
serialisation, atomicity (data-op failure must not write metadata), and
error wrapping at every layer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…rror mapping

Wire the upsert action end-to-end with three methods on a single
backend method:

  - "upsert"  → MERGE keyed on schema.primaryKey (update existing,
                insert new).
  - "insert"  → DML INSERT INTO ... SELECT FROM UNNEST(@rows).
                Replaces the old streaming insert_rows_json — that API
                parks rows in BigQuery's streaming buffer for 30-90 min
                and silently blocks any follow-up MERGE/UPDATE on the
                same table. DML writes go straight to storage, so a
                datastore_create + immediate datastore_upsert now works.
  - "update"  → DML UPDATE keyed on primaryKey. Compares
                num_dml_affected_rows against input row count and
                raises NotFoundError if any row had no matching key
                (UPDATE silently no-ops on misses otherwise).

Records ride as one JSON-array string parameter @rows for every path;
BigQuery unpacks via JSON_QUERY_ARRAY and type-aware extractors
(CAST(JSON_VALUE), PARSE_JSON(JSON_QUERY) for JSON columns). One
statement regardless of batch size.

BigQuery raw errors are mapped to clear ValidationErrors:
  - "Scalar subquery produced more than one element" → "duplicated
    rows with the same primary key".
  - "Bad <type> value: <v>" / "Could not cast '<v>' to type <T>" /
    "Could not parse '<v>' as <T>" → "Value <v> is not a valid
    <integer|number|boolean|...>".
  - "Value out of range for <T>: <v>" → "out of range" message.
  - "Invalid <date|timestamp|datetime|time>: <v>" → typed message.

Refactor: _run_query and _run_dml merged — _run_query now returns the
QueryJob so DML callers can grab num_dml_affected_rows without a
second helper. Module docstring + section markers updated. Per-row
JSON-column serialiser removed (handled inside SQL via PARSE_JSON).

Tests:
  - Test file restructured to one test per behaviour (~22 essentials,
    no duplicates).
  - New autouse conftest fixture clears BIGQUERY_PROJECT/DATASET/creds
    + resets engine cache so the suite stays hermetic regardless of a
    developer's .env. Restores the placeholder-echo branch for tests
    that predate the upsert wiring.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…E_UPDATED_AT

- Always inject `_id` INT64; assign via inlined `(SELECT MAX(_id) FROM tbl) + ROW_NUMBER()` so no extra round-trip.
- Add `_updated_at` TIMESTAMP behind `Config.INCLUDE_UPDATED_AT` (default on); MERGE bumps it only when a non-PK column actually differs (NULL-safe `IS DISTINCT FROM`, JSON canonicalised via `TO_JSON_STRING`).
- Trim verbose docstrings/comments in bigquery/lib.py; rename `user_*` locals to `data_*` / `insert_*`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 20, 2026

📝 Walkthrough

Walkthrough

This PR implements Frictionless Table Schema as the canonical datastore column representation, introduces a fully functional BigQuery backend with metadata persistence, refactors the engine plugin architecture for caching and warmup, and updates all API endpoints to expose both new and legacy schema formats while marking legacy fields as deprecated.

Changes

Frictionless Schema Foundation & Configuration

Layer / File(s) Summary
Configuration & Type Mapping Constants
datastore/core/config.py, datastore/core/constants.py
Config adds BIGQUERY_PROJECT, BIGQUERY_DATASET, BIGQUERY_CREDENTIALS, BIGQUERY_CREDENTIALS_RO, and INCLUDE_UPDATED_AT fields replacing the prior single BQ_PROJECT. Module constants introduce POSTGRES_TO_FRICTIONLESS and FRICTIONLESS_TO_POSTGRES dictionaries mapping between canonical type representations.
Request/Response Schemas & Validators
datastore/schemas/request.py, datastore/schemas/responses.py, datastore/schemas/validators.py
DatastoreCreateRequest accepts canonical Frictionless schema as primary input with legacy fields/primary_key deprecated and mutually exclusive; both DatastoreInfoRequest and DatastoreDeleteRequest are introduced with resource_id/id aliasing. Response models add canonical schema field while marking legacy fields/primary_key as deprecated via Annotated. Converters fields_to_frictionless_schema and frictionless_schema_to_fields translate between representations; validate_frictionless_schema validates descriptors via frictionless library.

Engine Architecture & Registry

Layer / File(s) Summary
Engine Base Contracts & Metadata Protocol
datastore/infrastructure/engines/base.py
SearchResult now carries canonical schema: dict with columns derived from field names. New InfoResult dataclass holds {schema, meta}. MetadataStore protocol defines initialize/insert/update/get/delete lifecycle. DatastoreBackend.create signature changed to accept unified schema: dict (replacing legacy fields/unique_keys); DatastoreBackend.info returns InfoResult instead of dict.
Engine Registry & Caching
datastore/infrastructure/engines/registry.py, datastore/infrastructure/engines/bigquery/__init__.py
Engine registry refactored with _build_engine dynamically importing engine modules, requiring Backend export, instantiating with config, and caching instances keyed by (engine, mode). warmup_engines eagerly initializes rw/ro backends at startup; reset_engine_cache clears instances for teardown/tests. BigQuery module now exports Backend as alias of BigQueryBackend.

BigQuery Backend Implementation

Layer / File(s) Summary
BigQuery Client & Type Mapping
datastore/infrastructure/engines/bigquery/client.py, datastore/infrastructure/engines/bigquery/types.py
build_client(config, mode) constructs BigQuery clients with project validation and mode-specific credential selection (RW vs RO); credentials parsed from JSON blob or service-account file path. FRICTIONLESS_TO_BIGQUERY maps field types; can_widen() validates column type transitions against BigQuery's allowed widening rules.
BigQuery SQL Helpers (DDL/DML)
datastore/infrastructure/engines/bigquery/lib.py
SQL builders generate CREATE/ALTER DDL from Frictionless schema (with system columns _id, optional _updated_at), INSERT/MERGE/UPDATE DML using parameterized JSON-array input (@rows), with typed JSON extraction for each field. schema_diff computes added/changed/removed columns; reject_unsupported_type_changes validates widening rules before DDL execution.
BigQuery Metadata Store
datastore/infrastructure/engines/bigquery/metadata.py
Hidden _table_metadata BigQuery table stores per-resource schemas with timestamps. initialize() creates table idempotently; insert()/update()/get()/delete() manage metadata rows via parameterized SQL; _run() wraps BigQuery job failures into ServerError with operation context.
BigQueryBackend Full Implementation
datastore/infrastructure/engines/bigquery/backend.py
Conditional initialization builds BigQuery client + metadata store when config is valid, otherwise placeholder mode (no DDL/DML). Orchestrates table lifecycle (create/alter for new/existing resources), record operations (insert/merge/update via DML with @rows parameter binding), metadata operations (insert after DDL succeeds, skip on DML failure). search() and search_sql() return SearchResult with schema/total; info() returns InfoResult with schema + row count; delete() returns WriteResult. _translate_bigquery_error() maps BigQuery error messages to user-friendly ValidationError for duplicate PKs and type coercions.

API Endpoints & Service Layer

Layer / File(s) Summary
Endpoint Implementations & Deprecation Warnings
datastore/api/endpoints/datastore.py, datastore/api/responses.py
datastore_delete and datastore_info endpoints now fully implemented (no longer HTTP 501 stubs). datastore_create uses schema + records fields and passes deprecation warnings to response. _success_response accepts optional warnings parameter; _deprecation_warnings() computes warnings from Annotated field metadata without accessing field values.
Service Functions & Streaming
datastore/services/read.py, datastore/services/write.py, datastore/services/streaming.py
search_datastore and search_sql_datastore compute fields from result.schema via converter. New info_datastore queries read-only engine and returns DatastoreInfoResponse.Result with meta/schema/fields. New delete_datastore calls engine.delete and returns result with resource_id/filters. Streaming functions (stream_objects, stream_lists, stream_csv, stream_tsv) accept schema parameter and propagate it into the streaming JSON envelope.

Application Startup & Health

Layer / File(s) Summary
App Lifespan & Health Endpoints
datastore/main.py, datastore/api/endpoints/health.py
Startup now calls warmup_engines(config) to eagerly initialize rw/ro backends; shutdown schedules reset_engine_cache(). /health endpoint returns {status: "ok"}; /ready endpoint iterates over engine modes, runs healthcheck(), and returns 503 with result.status: "not_ready" if any fail, otherwise 200 ready.

Tests & Examples

Layer / File(s) Summary
BigQuery Unit Tests
tests/conftest.py, tests/test_bigquery_metadata.py, tests/test_bigquery_tables.py
Unit tests verify BigQueryMetadataStore DDL/DML, parameterized query binding, error wrapping. Comprehensive BigQueryBackend tests cover type resolution, CREATE/ALTER DDL shape, INSERT/MERGE/UPDATE DML with JSON-array parameters, orchestration atomicity (DDL→DML→metadata ordering), upsert dispatch across methods, and BigQuery error→ValidationError translation for PKs and type coercions.
Integration Tests
tests/test_datastore_create.py, tests/test_datastore_delete.py, tests/test_datastore_info.py, tests/test_datastore_search.py, tests/test_datastore_search_sql.py, tests/test_health.py, tests/test_write_service.py
End-to-end tests for all endpoints verify success paths (create with schema/fields returning both shapes), validation (mutual exclusivity, missing PKs), aliasing (id→resource_id), auth (403 on denied keys, 404 on unknown resources), and health/readiness probes. Service-layer tests updated to use _schema() helper building canonical schema, verifying both response.schema and response.primary_key consistency.
Example Payloads & Documentation
example_payload/*, CLAUDE.md, README.md, .env.example, .gitignore
Example payloads added for datastore_create (with_schema), datastore_delete (filters/whole_table/force), datastore_info (basic/id_alias), datastore_search_sql (basic/aggregate/CTE). Docs updated to reflect engine architecture (Backend alias, plugin layout), Frictionless schema as canonical, and updated environment variables (BIGQUERY_* credentials).

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

"A rabbit hops through schemas with glee,
Frictionless paths now set engines free,
BigQuery hums with metadata bright,
Legacy fields bow to the new light,
I nibble warnings, then dance into night." 🐇✨

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/bigquery-backend

@sagargg sagargg changed the title Feat/bigquery backend BigQuery datastore backend: real CRUD + Frictionless schemas + lifespan health May 20, 2026
sagargg and others added 2 commits May 20, 2026 22:27
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…w_count

- Backend reads the stored Frictionless schema from `_table_metadata` so `primaryKey` and the per-field `info` data dictionary round-trip exactly as declared; raises NotFoundError when undeclared.
- Row count comes from BigQuery's per-dataset `__TABLES__.row_count` (metadata-only, no full-table scan) — `COUNT(*)` would have billed bytes per call.
- Drop `fields` from InfoResult; the service derives legacy fields via `frictionless_schema_to_fields`, mirroring how `search` already works (engine stays out of the schemas/ layer).
- Soft fallback: missing data table while metadata exists reports total=0 with a warning, instead of 500-ing the call.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 20

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
datastore/infrastructure/engines/bigquery/backend.py (1)

539-585: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

search and search_sql are still hard-coded empty results.

Both methods ignore the actual table data and always return empty iterators, so every read path will behave as if the resource has zero rows. That is still a missing core backend implementation, not a stub that can ship behind the “real CRUD” PR scope.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 539 - 585,
The search and search_sql methods currently return hard-coded empty SearchResult
objects; implement real BigQuery queries in search and search_sql: in
search(build a parameterised SELECT using the resource_id/table name, filters,
q, distinct, fields, sort, limit and offset), execute via the BigQuery client
(client.query(...)), use query_job.result() as a lazy iterator to populate
records, construct schema from the returned query schema or provided fields, run
a COUNT(*) when include_total is true to set total, and set
records_truncated=True if the iterator reached the limit; in search_sql(execute
the raw SQL with client.query(sql, job_config=...) , derive schema from
query_job.schema/result, yield rows lazily from query_job.result(), and set
records_truncated when the limit is hit). Ensure you reference and update the
SearchResult construction in both search and search_sql to use the real schema,
records iterator, total, and records_truncated values.
🟡 Minor comments (2)
README.md-56-63 (1)

56-63: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Refresh BigQuery backend status in the project tree.

backend.py is still described as placeholder here; that no longer matches the implemented backend and may confuse readers.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@README.md` around lines 56 - 63, README's project tree still calls
bigquery/backend.py a "DatastoreBackend subclass (placeholder)"; update the
description to reflect that the BigQuery backend is implemented (reference the
concrete class BigQueryBackend in bigquery/backend.py) and briefly list what the
file provides (implemented backend logic rather than a placeholder). Keep
surrounding entries (bigquery/client.py, lib.py, allowed_functions.txt)
unchanged and ensure the wording matches the actual implemented functionality.
CLAUDE.md-378-379 (1)

378-379: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Update status text that still says engine logic is placeholder.

These lines now read as outdated and can mislead contributors about what is already implemented in this PR.

Also applies to: 807-808

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@CLAUDE.md` around lines 378 - 379, Replace the outdated status sentence
"Engine business logic is still placeholder (returns empty results / echoes
inputs)" with an accurate summary of current state: note that call path,
validation, auth, streaming and per-engine datastore_search_sql allow-list are
implemented and only the BigQuery adapter remains (referenced in §7); update the
same phrasing wherever it repeats to avoid misleading contributors and ensure
the status reflects what is implemented versus what remains.
🧹 Nitpick comments (1)
tests/test_health.py (1)

39-47: ⚡ Quick win

Add a /docs smoke test in this health suite.

A tiny assertion for GET /docs would catch OpenAPI regressions early alongside liveness/readiness checks.

Based on learnings: Ensure OpenAPI documentation loads at /docs without errors.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_health.py` around lines 39 - 47, Add a small smoke test that
requests the OpenAPI UI so regressions are caught alongside liveness checks:
either extend test_health_returns_ok or add a new test (e.g., test_docs_served)
that calls client.get("/docs") and asserts a 200 response and basic content
(e.g., response.status_code == 200 and response.headers["content-type"] contains
"text/html" or response.text contains "Swagger" / "ReDoc") to ensure the docs
page loads without errors.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In @.env.example:
- Around line 10-18: The .env.example is missing entries that match the fields
declared in datastore/core/config.py; add entries for HTTP_MAX_CONNECTIONS,
HTTP_MAX_KEEPALIVE_CONNECTIONS, and INCLUDE_UPDATED_AT (with example values or
blanks) using the exact variable names used in Config so pydantic-settings
doesn't ignore them, and update the other relevant section mentioned in the
comment to include the same variables for consistency.

In `@datastore/api/endpoints/health.py`:
- Around line 47-61: The 503 branch in datastore/api/endpoints/health.py returns
an "error" key instead of the CKAN envelope "result" object; update the
JSONResponse content for the failing branch to use the CKAN envelope shape {
"help": str(request.url), "success": False, "result": { ... } } (move the
existing "__type" and "message" fields currently under "error" into "result"),
keep status_code=503 and the same message building that uses failing and
request.url so the /ready response conforms to the required contract.

In `@datastore/core/constants.py`:
- Around line 61-115: The POSTGRES_TO_FRICTIONLESS and FRICTIONLESS_TO_POSTGRES
maps currently introduce non-canonical Frictionless types (interval -> duration
and entries for duration, year, yearmonth); remove those non-canonical mappings
and normalize to the repo's canonical Frictionless subset. Concretely, change
POSTGRES_TO_FRICTIONLESS to map "interval" to a canonical type (e.g., "string"
or "any") instead of "duration", and remove the "duration", "year", and
"yearmonth" keys from FRICTIONLESS_TO_POSTGRES (do not add new non-canonical
keys); leave only mappings for the canonical types listed in the repo (integer,
number, string, boolean, date, datetime, time, object, array, geopoint, geojson,
any) so legacy fields round-trip into canonical schemas.

In `@datastore/infrastructure/engines/base.py`:
- Around line 17-18: Change all bare collection annotations to be parameterized
with typing.Any: import Any from typing and update schema: dict to schema:
dict[str, Any], records: Iterator[tuple] to records: Iterator[tuple[Any, ...]]
(or Iterator[tuple[int, ...]] if elements have known types), and similarly
replace any bare list/dict/tuple elsewhere (e.g., the other annotated blocks
referenced) with list[dict[str, Any]] / dict[str, Any] / tuple[Any, ...] or more
specific element types where known; ensure typing.Iterator/Sequence/Mapping are
used consistently and add the necessary typing imports.

In `@datastore/infrastructure/engines/bigquery/backend.py`:
- Around line 102-110: The readiness check currently returns healthy even when
BIGQUERY_DATASET is unset and metadata is disabled; modify BigQueryBackend so
initialize() records the disabled state (e.g., set self.metadata_available or
leave self.metadata as None) and update healthcheck() to return False when the
dataset/metadata layer is unavailable (check BIGQUERY_DATASET.strip() or the new
self.metadata_available flag), so /ready fails if create()/info() would be
disabled; apply the same change to the other identical dataset-check block later
in BigQueryBackend (the duplicate around the other initialization path).
- Around line 587-595: The delete() method in backend.py currently returns an
empty WriteResult and must be implemented to actually remove data: when filters
is None issue a DROP TABLE IF EXISTS <resource> via the BigQuery client (or use
client.delete_table on the fully-qualified table reference) and when filters is
provided build a parameterised DELETE FROM `<resource>` WHERE ... SQL (use
parameter binding to avoid injection) and execute it via client.query(), then
construct and return a WriteResult containing the row count/affected status;
update the delete() implementation to use the existing BigQuery client instance,
resource_id and filters parameters and populate WriteResult accordingly.

In `@datastore/infrastructure/engines/bigquery/client.py`:
- Around line 31-35: The RO credential path currently ignores
BIGQUERY_CREDENTIALS when BIGQUERY_CREDENTIALS_RO is blank; update the logic
that sets creds_raw (referencing config.BIGQUERY_CREDENTIALS_RO and
config.BIGQUERY_CREDENTIALS) so that when mode == "ro" and
BIGQUERY_CREDENTIALS_RO.strip() is empty it falls back to using
BIGQUERY_CREDENTIALS.strip() before falling back to ADC; ensure the resulting
creds_raw variable is the chosen string used downstream (same variable name) so
RO behaves like RW when only one service account is configured.

In `@datastore/infrastructure/engines/bigquery/lib.py`:
- Around line 15-17: The code currently silently drops user-declared reserved
columns (defined by SYSTEM_COLUMN_NAMES) when building schemas; instead,
validate incoming schema.fields against SYSTEM_COLUMN_NAMES and raise a clear,
fast-failing exception (e.g., ValueError or a domain-specific SchemaError) if
any field name intersects with SYSTEM_COLUMN_NAMES; update the validation logic
that iterates schema.fields (the same area referenced around lines 32-40) to
perform this check before any mutation so callers receive an explicit error
mentioning the offending field name(s) and that `_id`/`_updated_at` are
reserved.
- Around line 130-143: The current id allocation using id_expr (MAX(`_id`) +
ROW_NUMBER()) is unsafe for concurrent writers (see id_expr, table_ref,
sys_vals, include_updated_at, sys_cols, data_cols, data_extractors) and must be
replaced with a serialized or collision-free allocator; change the code to
either use a BigQuery SEQUENCE (NEXTVAL(sequence_name) per row) or switch to a
collision-free ID like GENERATE_UUID() for _id, and update both occurrences
(this block and the repeated logic around lines 199-210) so inserts select
NEXTVAL(...) or GENERATE_UUID() instead of MAX(_id)+ROW_NUMBER(). Ensure the
chosen approach preserves the sys_vals construction (include_updated_at branch)
and works for batch UNNEST(`@rows`) inserts.

In `@datastore/infrastructure/engines/bigquery/metadata.py`:
- Around line 90-112: The insert() method can race and create multiple rows for
the same resource_id because it uses a plain INSERT; change it to an idempotent,
atomic operation (e.g., MERGE ... WHEN NOT MATCHED THEN INSERT or INSERT ...
SELECT WHERE NOT EXISTS) against self.table_ref so a second concurrent create
will be no-op rather than a duplicate; use the same parameters currently passed
into _run and _schema_params and keep the op string (e.g., "metadata INSERT")
while ensuring the query only inserts when resource_id is absent; also apply the
same MERGE/conditional-insert pattern to the analogous code referenced at the
151-172 range to preserve the one-row-per-resource invariant.

In `@datastore/infrastructure/engines/registry.py`:
- Line 67: The helper signatures need concrete types to avoid leaking Any:
annotate _build_engine(engine: str, mode: Mode, *, config: Mapping[str, Any],
context: Optional[Mapping[str, Any]] = None) (or replace Mapping[...] with your
project-specific Config/Context type if one exists) and similarly annotate
warmup_engines(config: Mapping[str, Any]) (and its return type, e.g., None or
Coroutine[...] if async). Import Optional and Mapping/Any from typing and use
the project Config type if present so strict mypy sees concrete parameter types
through the backend construction path.
- Around line 95-100: warmup_engines currently caches engines under
_INSTANCES[(engine, mode)] that may capture a per-request RequestContext and
cause auth/CKAN state leakage; change the registry so only context-free
instances are cached and any engine built with a RequestContext is not stored in
_INSTANCES. Specifically: ensure warmup_engines calls _build_engine with
context=None and stores only context-free engines; update get_datastore_engine
(and the similar block around the other occurrence) to detect when a
RequestContext is provided and avoid writing that instance into _INSTANCES
(instead return a transient engine or clone a context-free engine with the
request context applied), and rely on cached engines only when context is None.
Ensure references: warmup_engines, get_datastore_engine, _INSTANCES, and
_build_engine are updated accordingly.

In `@datastore/main.py`:
- Around line 44-47: The warmed read/write engines created by warmup_engines
aren't being attached to the FastAPI app state; modify the lifespan startup to
store concrete engine instances on app.state (e.g. app.state.rw_engine and
app.state.ro_engine) after calling warmup_engines and ensure the
reset_engine_cache callback clears those same app.state entries; update
warmup_engines (or its caller) to return or assign the actual engine objects so
the /ready endpoint (datastore/api/endpoints/health.py) can call
app.state.rw_engine.healthcheck() and app.state.ro_engine.healthcheck(),
returning 503 if either healthcheck fails.

In `@datastore/schemas/request.py`:
- Around line 280-286: The current model validator _require_resource_id_or_id
allows both resource_id and id to be set and silently prefers resource_id;
change it to enforce an exclusive-or: if both resource_id and id are provided
raise ValueError("only one of 'resource_id' or 'id' may be set"), if neither
provided raise the existing error, and if only id is provided normalize by
assigning self.resource_id = self.id before returning; apply the same XOR
enforcement and normalization change to the corresponding validator method in
DatastoreDeleteRequest so both request types reject ambiguous dual-field
requests and still normalize id -> resource_id when appropriate.

In `@datastore/schemas/validators.py`:
- Around line 186-207: The validate_frictionless_schema function must reject
Frictionless field types outside our canonical vocabulary before calling
Schema.from_descriptor; update validate_frictionless_schema to, when value is a
dict containing a "fields" list, iterate each field and verify its "type" is one
of the canonical types (integer, number, string, boolean, date, datetime, time,
object, array, geopoint, geojson, any) and raise ValueError listing offending
types if any are found; keep the existing Schema.from_descriptor call for full
descriptor validation and do not perform alias normalization here (that belongs
in storage code).
- Around line 117-132: fields_to_frictionless_schema() currently flattens
non-title/description info keys onto the top-level field object (see variables
info, extra, fr), but frictionless_schema_to_fields() only reads back extras
from field["info"], losing flattened keys like unit or notes; update
frictionless_schema_to_fields() so when rebuilding legacy field dicts it
collects any top-level keys on the frictionless field object that are not the
canonical keys (e.g., name, type, title, description, info) and merges them into
the restored info/extras before appending to fr_fields (mirror the inverse of
the extra merging done in fields_to_frictionless_schema()); apply the same logic
where similar reconstruction occurs in the 140-173 range.

In `@datastore/services/write.py`:
- Around line 83-96: The service delete_datastore currently returns a Pydantic
response model DatastoreDeleteResponse.Result; change it to return a plain
Python data structure (e.g., a dict) with the same fields so the service layer
is not coupled to response schemas; locate delete_datastore (and its use of
get_datastore_engine and engine.delete), remove the Pydantic model from the
return and instead return {"resource_id": resource_id, "filters": filters} (or
an equivalent tuple/dataclass) and ensure any callers expect the plain dict
rather than DatastoreDeleteResponse.Result.
- Around line 88-91: The code currently does `filters = data_dict.get("filters")
or None`, which converts empty dicts to None and can change delete semantics;
change it to preserve explicit empty filters by assigning `filters =
data_dict.get("filters")` (or use `data_dict["filters"]` with a key-existence
check) and pass that through to
`get_datastore_engine(...).delete(resource_id=resource_id, filters=filters)` so
only a missing key yields None while `{}` remains `{}`.

In `@tests/test_health.py`:
- Around line 20-23: The function _clean_engine_cache is untyped and fails
strict mypy; add explicit typing by importing typing.Iterator and annotating the
generator as def _clean_engine_cache() -> Iterator[None]: (or Generator[None,
None, None] if you prefer), and annotate any other untyped generator/fixture
defs in the file (the other defs flagged in the review) similarly; ensure you
add the necessary import (from typing import Iterator or Generator) and any
missing parameter/return type annotations to satisfy strict = true.

---

Outside diff comments:
In `@datastore/infrastructure/engines/bigquery/backend.py`:
- Around line 539-585: The search and search_sql methods currently return
hard-coded empty SearchResult objects; implement real BigQuery queries in search
and search_sql: in search(build a parameterised SELECT using the
resource_id/table name, filters, q, distinct, fields, sort, limit and offset),
execute via the BigQuery client (client.query(...)), use query_job.result() as a
lazy iterator to populate records, construct schema from the returned query
schema or provided fields, run a COUNT(*) when include_total is true to set
total, and set records_truncated=True if the iterator reached the limit; in
search_sql(execute the raw SQL with client.query(sql, job_config=...) , derive
schema from query_job.schema/result, yield rows lazily from query_job.result(),
and set records_truncated when the limit is hit). Ensure you reference and
update the SearchResult construction in both search and search_sql to use the
real schema, records iterator, total, and records_truncated values.

---

Minor comments:
In `@CLAUDE.md`:
- Around line 378-379: Replace the outdated status sentence "Engine business
logic is still placeholder (returns empty results / echoes inputs)" with an
accurate summary of current state: note that call path, validation, auth,
streaming and per-engine datastore_search_sql allow-list are implemented and
only the BigQuery adapter remains (referenced in §7); update the same phrasing
wherever it repeats to avoid misleading contributors and ensure the status
reflects what is implemented versus what remains.

In `@README.md`:
- Around line 56-63: README's project tree still calls bigquery/backend.py a
"DatastoreBackend subclass (placeholder)"; update the description to reflect
that the BigQuery backend is implemented (reference the concrete class
BigQueryBackend in bigquery/backend.py) and briefly list what the file provides
(implemented backend logic rather than a placeholder). Keep surrounding entries
(bigquery/client.py, lib.py, allowed_functions.txt) unchanged and ensure the
wording matches the actual implemented functionality.

---

Nitpick comments:
In `@tests/test_health.py`:
- Around line 39-47: Add a small smoke test that requests the OpenAPI UI so
regressions are caught alongside liveness checks: either extend
test_health_returns_ok or add a new test (e.g., test_docs_served) that calls
client.get("/docs") and asserts a 200 response and basic content (e.g.,
response.status_code == 200 and response.headers["content-type"] contains
"text/html" or response.text contains "Swagger" / "ReDoc") to ensure the docs
page loads without errors.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 40646cc2-c6d2-4774-90ba-512c1e517b04

📥 Commits

Reviewing files that changed from the base of the PR and between be3a91a and 2bf0577.

📒 Files selected for processing (44)
  • .env.example
  • .gitignore
  • CLAUDE.md
  • README.md
  • datastore/api/endpoints/datastore.py
  • datastore/api/endpoints/health.py
  • datastore/api/responses.py
  • datastore/core/config.py
  • datastore/core/constants.py
  • datastore/infrastructure/engines/base.py
  • datastore/infrastructure/engines/bigquery/__init__.py
  • datastore/infrastructure/engines/bigquery/backend.py
  • datastore/infrastructure/engines/bigquery/client.py
  • datastore/infrastructure/engines/bigquery/lib.py
  • datastore/infrastructure/engines/bigquery/metadata.py
  • datastore/infrastructure/engines/bigquery/types.py
  • datastore/infrastructure/engines/registry.py
  • datastore/main.py
  • datastore/schemas/request.py
  • datastore/schemas/responses.py
  • datastore/schemas/validators.py
  • datastore/services/read.py
  • datastore/services/streaming.py
  • datastore/services/write.py
  • example_payload/README.md
  • example_payload/datastore_create/with_schema.json
  • example_payload/datastore_delete/force_readonly.json
  • example_payload/datastore_delete/whole_table.json
  • example_payload/datastore_delete/with_filters.json
  • example_payload/datastore_info/basic.json
  • example_payload/datastore_info/with_id_alias.json
  • example_payload/datastore_search_sql/aggregate.json
  • example_payload/datastore_search_sql/basic.json
  • example_payload/datastore_search_sql/with_cte.json
  • tests/conftest.py
  • tests/test_bigquery_metadata.py
  • tests/test_bigquery_tables.py
  • tests/test_datastore_create.py
  • tests/test_datastore_delete.py
  • tests/test_datastore_info.py
  • tests/test_datastore_search.py
  • tests/test_datastore_search_sql.py
  • tests/test_health.py
  • tests/test_write_service.py

Comment thread .env.example
Comment on lines +10 to +18
# Selects the storage backend (must match a folder under
# `datastore/infrastructure/engines/`):
# bigquery — real BigQuery adapter (placeholder while being built).
# ducklake — Future planned
# ducklake — Future planned.
DATASTORE_ENGINE=bigquery
BQ_PROJECT=
BIGQUERY_PROJECT=
BIGQUERY_DATASET=
BIGQUERY_CREDENTIALS=
BIGQUERY_CREDENTIALS_RO=
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep .env.example aligned with the actual settings model.

HTTP_MAX_CONNECTIONS and HTTP_MAX_KEEPALIVE_CONNECTIONS are documented here, but datastore/core/config.py does not declare either field, so pydantic-settings will silently ignore them. INCLUDE_UPDATED_AT was added to Config as well, but operators still don't get an example for that toggle here.

Also applies to: 26-27

🧰 Tools
🪛 dotenv-linter (4.0.0)

[warning] 15-15: [UnorderedKey] The BIGQUERY_PROJECT key should go before the DATASTORE_ENGINE key

(UnorderedKey)


[warning] 16-16: [UnorderedKey] The BIGQUERY_DATASET key should go before the BIGQUERY_PROJECT key

(UnorderedKey)


[warning] 17-17: [UnorderedKey] The BIGQUERY_CREDENTIALS key should go before the BIGQUERY_DATASET key

(UnorderedKey)


[warning] 18-18: [UnorderedKey] The BIGQUERY_CREDENTIALS_RO key should go before the BIGQUERY_DATASET key

(UnorderedKey)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In @.env.example around lines 10 - 18, The .env.example is missing entries that
match the fields declared in datastore/core/config.py; add entries for
HTTP_MAX_CONNECTIONS, HTTP_MAX_KEEPALIVE_CONNECTIONS, and INCLUDE_UPDATED_AT
(with example values or blanks) using the exact variable names used in Config so
pydantic-settings doesn't ignore them, and update the other relevant section
mentioned in the comment to include the same variables for consistency.

Comment on lines +47 to +61
if failing:
return JSONResponse(
status_code=503,
content={
"help": str(request.url),
"success": False,
"error": {
"__type": "Service Unavailable",
"message": (
f"engine healthcheck failed for mode(s): "
f"{', '.join(failing)}"
),
},
},
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep /ready failure responses in CKAN health envelope shape.

The 503 branch returns an error object instead of a result object, which breaks the required health endpoint response contract.

As per coding guidelines "datastore/api/endpoints/health.py: Health endpoints (/, /health, /ready) must return CKAN envelope shape: {help, success, result: {...}}".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/api/endpoints/health.py` around lines 47 - 61, The 503 branch in
datastore/api/endpoints/health.py returns an "error" key instead of the CKAN
envelope "result" object; update the JSONResponse content for the failing branch
to use the CKAN envelope shape { "help": str(request.url), "success": False,
"result": { ... } } (move the existing "__type" and "message" fields currently
under "error" into "result"), keep status_code=503 and the same message building
that uses failing and request.url so the /ready response conforms to the
required contract.

Comment thread datastore/core/constants.py
Comment thread datastore/infrastructure/engines/base.py Outdated
Comment thread datastore/infrastructure/engines/bigquery/backend.py
Comment thread datastore/schemas/validators.py
Comment on lines +150 to +168
async def info_datastore(
context: RequestContext, data_dict: dict[str, Any]
) -> DatastoreInfoResponse.Result:
"""Look up table metadata for a single `resource_id`.

Endpoint authorizes the caller first (same gate as `search`). This
service just asks the read-only engine for its `InfoResult` and
re-shapes it as the response's typed `Result`. No streaming —
`info` responses are small enough for the standard `_success_response`
path.
"""
engine = get_datastore_engine(context, mode="ro")
result = engine.info(resource_id=data_dict["resource_id"])
fields, _ = frictionless_schema_to_fields(result.schema)
return DatastoreInfoResponse.Result(
meta=result.meta,
schema=result.schema,
fields=fields,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

info_datastore should return a plain dict/dataclass, not DatastoreInfoResponse.Result.

Line 152 and Line 164 couple the service layer to API response models. Keep response-model construction at the API boundary to preserve one-way layering.

As per coding guidelines: "{datastore/services,datastore/infrastructure/engines}/**/*.py: Use plain dicts, tuples, and dataclasses for data passed between services and engines—never Pydantic models."

Comment on lines +83 to +96
async def delete_datastore(
context: RequestContext, data_dict: dict[str, Any]
) -> DatastoreDeleteResponse.Result:
"""Delete rows matching `filters`, or drop the whole table."""
resource_id = data_dict["resource_id"]
filters = data_dict.get("filters") or None

engine = get_datastore_engine(context, mode="rw")
engine.delete(resource_id=resource_id, filters=filters)

return DatastoreDeleteResponse.Result(
resource_id=resource_id,
filters=filters,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Service layer should return plain data structures, not Pydantic Result models.

delete_datastore returns DatastoreDeleteResponse.Result, which couples services to response-schema models. That breaks the layer contract and makes services API-shape aware.

As per coding guidelines: "{datastore/services,datastore/infrastructure/engines}/**/*.py: Use plain dicts, tuples, and dataclasses for data passed between services and engines—never Pydantic models."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/services/write.py` around lines 83 - 96, The service
delete_datastore currently returns a Pydantic response model
DatastoreDeleteResponse.Result; change it to return a plain Python data
structure (e.g., a dict) with the same fields so the service layer is not
coupled to response schemas; locate delete_datastore (and its use of
get_datastore_engine and engine.delete), remove the Pydantic model from the
return and instead return {"resource_id": resource_id, "filters": filters} (or
an equivalent tuple/dataclass) and ensure any callers expect the plain dict
rather than DatastoreDeleteResponse.Result.

Comment thread datastore/services/write.py Outdated
Comment thread tests/test_health.py Outdated
sagargg and others added 2 commits May 21, 2026 14:34
… richer _links

- New `bigquery/search.py` builders: parameterised SELECT (typed filter binds, IN UNNEST for list filters, native `SEARCH()` for full-text on whole row or per-column), validated sort/projection, and a matching COUNT subquery for filtered/distinct totals.
- Backend dispatches search + count jobs before awaiting either, so they run in parallel — wall time ≈ max(both). Unfiltered+non-distinct totals fall back to the cheap row-count helper.
- Schema-level limit cap dropped; new `Config.SEARCH_RESULT_ROWS_MAX` (default 32000) is the env-driven ceiling enforced at the service boundary with a "paginate with offset" hint.
- `_links` now carries conditional `prev` / `next` plus `page_size`, `page`, and `total_pages`. `page` / `total_pages` are suppressed when the current page has no rows (empty resource or past-end) so a UI never sees an incoherent "page 5 of 4".

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
… defaults

- Don't silently coerce `filters={}` to None on delete; an empty dict now reaches the engine as-is so the contract distinguishes "no-WHERE delete" from "drop table".
- Reject `_id` / `_updated_at` in user-provided field/schema names instead of silently dropping them — a hidden drop would leave the response advertising columns the engine refuses to populate.
- Reject non-canonical Frictionless types (`duration`, `year`, `yearmonth`, …) at the request boundary and trim them from the type maps; the datastore commits to a narrower vocabulary.
- 400 when `resource_id` and `id` arrive with different values on `datastore_info` / `datastore_delete` (legacy clients echoing the same value still work).
- `/ready` 503 stays in the StatusResponse envelope (`result.status: "not_ready"`) instead of an error envelope; mode names no longer leak into the response.
- Mount `/health` and `/ready` under both `/` and `/api/3/action/` so k8s probes and CKAN clients both reach them; welcome stays root-only.
- `Config` and `.env.example` realigned: drop the orphan `HTTP_MAX_*` from the template, document `INCLUDE_UPDATED_AT` + `SEARCH_RESULT_ROWS_MAX`.
- `healthcheck()` fails when the dataset / metadata store is unconfigured (was passing on `SELECT 1` even when writes would silently no-op).
- Preserve all `info` keys on the legacy ⇄ Frictionless roundtrip (extras now nest under `info` instead of being flattened and lost).
- Type annotations across base.py, registry.py helpers, and test_health.py to stop leaking `Any` under strict mypy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
datastore/services/read.py (1)

104-105: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

datastore_search_sql ignores the configurable row cap.

search_sql_datastore still hardcodes 32000 via _SQL_DEFAULT_LIMIT (Line 104, Line 140, Line 149), so Config.SEARCH_RESULT_ROWS_MAX does not govern SQL search results.

Suggested fix
-_SQL_DEFAULT_LIMIT = 32000
-
-
 async def search_sql_datastore(
@@
-    result = engine.search_sql(
-        sql=data_dict["sql"], limit=_SQL_DEFAULT_LIMIT
-    )
+    max_limit = context.config.SEARCH_RESULT_ROWS_MAX
+    result = engine.search_sql(sql=data_dict["sql"], limit=max_limit)
@@
-        limit=_SQL_DEFAULT_LIMIT,
+        limit=max_limit,
@@
-        links=_build_pagination_links(
-            request_url, limit=_SQL_DEFAULT_LIMIT, offset=0, total=None,
-        ),
+        links=_build_pagination_links(
+            request_url, limit=max_limit, offset=0, total=None,
+        ),

Also applies to: 140-155

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/services/read.py` around lines 104 - 105, The SQL search path
currently uses the hardcoded _SQL_DEFAULT_LIMIT (32000) which bypasses the
runtime configuration; update datastore_search_sql to derive its limit from
Config.SEARCH_RESULT_ROWS_MAX (with a sensible fallback if unset) instead of
using _SQL_DEFAULT_LIMIT, and replace uses of _SQL_DEFAULT_LIMIT in the
query-building/limit-clamping logic so the effective LIMIT value is calculated
from Config.SEARCH_RESULT_ROWS_MAX (and still enforces any minimum/maximum
checks); ensure references in datastore_search_sql and any helper that currently
reference _SQL_DEFAULT_LIMIT are switched to the new config-driven value.
datastore/infrastructure/engines/bigquery/backend.py (2)

465-470: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

create() returns a dict instead of WriteResult.

The method signature declares -> WriteResult but returns a plain dict. This breaks the type contract and will fail strict type checking. The same issue exists in upsert() at lines 503-509 and 531-537.

Proposed fix for create()
-        return {
-            "schema": schema,
-            "records": records,
-            "include_total": include_total,
-            "total": len(records or []) if include_total else None,
-        }
+        return WriteResult(
+            rows_written=len(records or []),
+            total=len(records or []) if include_total else None,
+        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 465 - 470,
The create() and upsert() functions currently return a plain dict but declare ->
WriteResult; update both to return an actual WriteResult instance instead of a
dict: import or reference the WriteResult class/type and construct
WriteResult(schema=schema, records=records, include_total=include_total,
total=(len(records or []) if include_total else None)) (apply the same change in
both create() and the two upsert() return sites) so the return value matches the
declared type and passes static typing.

501-537: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

upsert() returns a dict instead of WriteResult.

Both placeholder-mode (lines 503-509) and normal (lines 531-537) return paths return plain dicts instead of WriteResult dataclass instances, breaking the declared return type.

Proposed fix
         if self.metadata is None:
             # Placeholder mode — echo (matches the create() pattern).
-            return {
-                "resource_id": resource_id,
-                "records": records,
-                "method": method,
-                "include_total": include_total,
-                "total": len(records or []),
-            }
+            return WriteResult(
+                rows_written=len(records or []),
+                total=len(records or []) if include_total else None,
+            )
         ...
-        return {
-            "resource_id": resource_id,
-            "records": records,
-            "method": method,
-            "include_total": include_total,
-            "total": len(rows) if include_total else None,
-        }
+        return WriteResult(
+            rows_written=len(rows),
+            total=len(rows) if include_total else None,
+        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 501 - 537,
The upsert() implementation currently returns plain dicts in both placeholder
mode and the normal path; change both return statements to construct and return
a WriteResult dataclass instance (instead of a dict) with the same fields
(resource_id, records, method, include_total, total). Locate the upsert method
and replace the first return block that handles placeholder mode and the final
return block after calling _insert_records/_merge_records/_update_records to
return WriteResult(...) with the appropriate field values (compute total as
len(rows) or len(records or []) as before). Ensure WriteResult is imported where
needed and that the object shape matches the declared return type.
♻️ Duplicate comments (2)
datastore/services/write.py (1)

83-103: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return plain data from the service instead of DatastoreDeleteResponse.Result.

Line 85 and Line 100 keep this service coupled to response-schema models. Return a plain dict (or dataclass) from delete_datastore, and let the API layer shape it into the response model.

Suggested minimal change
-async def delete_datastore(
-    context: RequestContext, data_dict: dict[str, Any]
-) -> DatastoreDeleteResponse.Result:
+async def delete_datastore(
+    context: RequestContext, data_dict: dict[str, Any]
+) -> dict[str, Any]:
@@
-    return DatastoreDeleteResponse.Result(
-        resource_id=resource_id,
-        filters=filters,
-    )
+    return {
+        "resource_id": resource_id,
+        "filters": filters,
+    }

As per coding guidelines, "datastore/{services,infrastructure}/**/*.py: Use Pydantic v2 for request/response validation at boundaries only; never pass Pydantic models between services or return them from engines."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/services/write.py` around lines 83 - 103, The service function
delete_datastore currently returns a Pydantic response model
DatastoreDeleteResponse.Result, coupling the service layer to the API schema;
change it to return plain data (a dict or simple dataclass) containing the same
fields (resource_id and filters) instead. Locate delete_datastore, keep the call
to get_datastore_engine(...) and engine.delete(resource_id=..., filters=...)
unchanged, then construct and return a plain Python dict (or dataclass instance)
with keys resource_id and filters instead of DatastoreDeleteResponse.Result so
the API layer can perform Pydantic v2 shaping.
datastore/schemas/validators.py (1)

133-134: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Extra metadata still spread onto field top-level, contradicting docstring and breaking round-trip.

The docstring at lines 104-109 states extras "stays nested under a custom info key", but line 134 spreads extra onto the top-level field object (fr = {**fr, **extra}). This means frictionless_schema_to_fields() (which only reads from field["info"]) cannot recover these extras, losing keys like unit, notes, etc.

Proposed fix
         if extra:
-            fr = {**fr, **extra}
+            fr["info"] = extra
         fr_fields.append(fr)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@datastore/schemas/validators.py` around lines 133 - 134, The extras dict is
being spread onto the field top-level (fr = {**fr, **extra}) which contradicts
the docstring and prevents frictionless_schema_to_fields() from round-tripping
extras from field["info"]; instead merge extras into the field's "info" sub-dict
so existing info is preserved (e.g., set fr["info"] = {**fr.get("info", {}),
**extra}) and remove the top-level spread; update the block around the function
that constructs fr to nest extra under "info" rather than merging into fr
directly.
🧹 Nitpick comments (1)
tests/test_bigquery_tables.py (1)

1060-1070: 💤 Low value

Test comment references __TABLES__ but implementation uses COUNT(*).

The comment at line 1062 mentions "__TABLES__/_count_rows path" but the actual _count_rows implementation now uses SELECT COUNT(*) AS n FROM <table>, not BigQuery's __TABLES__ metadata. Consider updating the comment for accuracy.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_bigquery_tables.py` around lines 1060 - 1070, The test docstring
mentions the `__TABLES__`/`_count_rows` path but the current `_count_rows`
implementation uses a `SELECT COUNT(*) AS n FROM <table>` query; update the
`test_needs_count_query_only_when_filtering_or_distinct` docstring (or inline
comment) to accurately describe that unfiltered + non-distinct searches use the
cheaper COUNT(*) path (or remove the `__TABLES__` reference) and keep references
to `needs_count_query` and `_count_rows` so reviewers can locate the related
logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@datastore/infrastructure/engines/bigquery/backend.py`:
- Around line 465-470: The create() and upsert() functions currently return a
plain dict but declare -> WriteResult; update both to return an actual
WriteResult instance instead of a dict: import or reference the WriteResult
class/type and construct WriteResult(schema=schema, records=records,
include_total=include_total, total=(len(records or []) if include_total else
None)) (apply the same change in both create() and the two upsert() return
sites) so the return value matches the declared type and passes static typing.
- Around line 501-537: The upsert() implementation currently returns plain dicts
in both placeholder mode and the normal path; change both return statements to
construct and return a WriteResult dataclass instance (instead of a dict) with
the same fields (resource_id, records, method, include_total, total). Locate the
upsert method and replace the first return block that handles placeholder mode
and the final return block after calling
_insert_records/_merge_records/_update_records to return WriteResult(...) with
the appropriate field values (compute total as len(rows) or len(records or [])
as before). Ensure WriteResult is imported where needed and that the object
shape matches the declared return type.

In `@datastore/services/read.py`:
- Around line 104-105: The SQL search path currently uses the hardcoded
_SQL_DEFAULT_LIMIT (32000) which bypasses the runtime configuration; update
datastore_search_sql to derive its limit from Config.SEARCH_RESULT_ROWS_MAX
(with a sensible fallback if unset) instead of using _SQL_DEFAULT_LIMIT, and
replace uses of _SQL_DEFAULT_LIMIT in the query-building/limit-clamping logic so
the effective LIMIT value is calculated from Config.SEARCH_RESULT_ROWS_MAX (and
still enforces any minimum/maximum checks); ensure references in
datastore_search_sql and any helper that currently reference _SQL_DEFAULT_LIMIT
are switched to the new config-driven value.

---

Duplicate comments:
In `@datastore/schemas/validators.py`:
- Around line 133-134: The extras dict is being spread onto the field top-level
(fr = {**fr, **extra}) which contradicts the docstring and prevents
frictionless_schema_to_fields() from round-tripping extras from field["info"];
instead merge extras into the field's "info" sub-dict so existing info is
preserved (e.g., set fr["info"] = {**fr.get("info", {}), **extra}) and remove
the top-level spread; update the block around the function that constructs fr to
nest extra under "info" rather than merging into fr directly.

In `@datastore/services/write.py`:
- Around line 83-103: The service function delete_datastore currently returns a
Pydantic response model DatastoreDeleteResponse.Result, coupling the service
layer to the API schema; change it to return plain data (a dict or simple
dataclass) containing the same fields (resource_id and filters) instead. Locate
delete_datastore, keep the call to get_datastore_engine(...) and
engine.delete(resource_id=..., filters=...) unchanged, then construct and return
a plain Python dict (or dataclass instance) with keys resource_id and filters
instead of DatastoreDeleteResponse.Result so the API layer can perform Pydantic
v2 shaping.

---

Nitpick comments:
In `@tests/test_bigquery_tables.py`:
- Around line 1060-1070: The test docstring mentions the
`__TABLES__`/`_count_rows` path but the current `_count_rows` implementation
uses a `SELECT COUNT(*) AS n FROM <table>` query; update the
`test_needs_count_query_only_when_filtering_or_distinct` docstring (or inline
comment) to accurately describe that unfiltered + non-distinct searches use the
cheaper COUNT(*) path (or remove the `__TABLES__` reference) and keep references
to `needs_count_query` and `_count_rows` so reviewers can locate the related
logic.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8d9d8da2-9761-48ea-9fe6-1b5c8ffa3d4b

📥 Commits

Reviewing files that changed from the base of the PR and between 2bf0577 and f13589c.

📒 Files selected for processing (22)
  • .env.example
  • datastore/api/endpoints/health.py
  • datastore/api/routes.py
  • datastore/core/config.py
  • datastore/core/constants.py
  • datastore/infrastructure/engines/base.py
  • datastore/infrastructure/engines/bigquery/backend.py
  • datastore/infrastructure/engines/bigquery/client.py
  • datastore/infrastructure/engines/bigquery/search.py
  • datastore/infrastructure/engines/registry.py
  • datastore/schemas/request.py
  • datastore/schemas/responses.py
  • datastore/schemas/validators.py
  • datastore/services/read.py
  • datastore/services/write.py
  • tests/test_bigquery_tables.py
  • tests/test_datastore_delete.py
  • tests/test_datastore_info.py
  • tests/test_datastore_search.py
  • tests/test_datastore_search_sql.py
  • tests/test_health.py
  • tests/test_read_service.py

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant