BigQuery datastore backend: real CRUD + Frictionless schemas + lifespan health #1
sagargg wants to merge 14 commits into
- New GET /api/3/action/datastore_info. Returns
`result: { meta: dict, fields: [{...}] }` — `fields` is the column
schema, `meta` is a free-form dict (engine populates whatever extras
it has: row count, table size, etc.).
- Schema accepts either `resource_id` or `id`; model_validator requires
at least one and normalises `id` → `resource_id` so downstream code
reads a single field.
- New `InfoResult` dataclass on the engine ABC; `bigquery.info()` stub
fixed (previously a no-op that returned None) — now returns a real
InfoResult.
- Service `info_datastore` calls the read-only engine and reshapes
into `DatastoreInfoResponse.Result`. Endpoint authorizes the caller
(same gate as datastore_search) and uses the standard
`_success_response` envelope — info responses are small, no streaming.
- Also: re-add `.gitignore` entry for the local-only test engine
(the earlier follow-up commit was lost in a rebase / sync).
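The `id` → `resource_id` normalisation described above can be sketched in plain Python. This is a stand-in for the pydantic `model_validator`, not the PR's code; the function name `normalise_resource_id` is hypothetical.

```python
from typing import Any


def normalise_resource_id(payload: dict[str, Any]) -> dict[str, Any]:
    """Require `resource_id` or `id`, then fold `id` into `resource_id`."""
    resource_id = payload.get("resource_id") or payload.get("id")
    if not resource_id:
        raise ValueError("one of 'resource_id' or 'id' is required")
    # Return a copy without the alias so downstream code reads one field.
    normalised = {k: v for k, v in payload.items() if k != "id"}
    normalised["resource_id"] = resource_id
    return normalised
```

Downstream code then only ever looks up `resource_id`, whichever key the caller sent.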
- New POST /api/3/action/datastore_delete. Body accepts `resource_id` /
`id` (one required, normalised); optional `filters` (dict) — omit to
drop the whole table; optional `force` for read-only resources.
- Response echoes back the resource + filters (CKAN convention), with
`filters` excluded from the wire when omitted.
- Service `delete_datastore` calls the rw engine and reshapes into
DatastoreDeleteResponse.Result. Engine placeholder fixed (was a
no-op `{}` expression) — now returns a real WriteResult.
- Endpoint authorizes the caller with `permission="delete"` (same gate
pattern as create / upsert).
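A minimal sketch of the "echo back, but omit `filters` when it was not sent" response shape. The helper name `delete_result` is hypothetical; the real service builds a `DatastoreDeleteResponse.Result` instead of a dict.

```python
from typing import Any, Optional


def delete_result(
    resource_id: str, filters: Optional[dict[str, Any]] = None
) -> dict[str, Any]:
    """Echo the resource, adding `filters` only when the caller sent it."""
    result: dict[str, Any] = {"resource_id": resource_id}
    # Compare against None so an explicit empty dict is still echoed.
    if filters is not None:
        result["filters"] = filters
    return result
```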
…earch_sql
- CLAUDE.md + README.md: roadmap shows all six datastore_* actions now
wired end-to-end; "Next" trimmed to BigQuery adapter + readiness +
observability + DuckLake. §5.2 status table refreshed (no more 501
stubs); §3 services tree lists read.py + streaming.py.
- example_payload/: new folders for datastore_delete (with_filters,
whole_table, force_readonly), datastore_info (basic + id alias), and
datastore_search_sql (basic, aggregate, with_cte). All use a UUID-shaped
resource_id matching the real CKAN datastore table-naming convention.
- example_payload/README.md tree updated.
Engines are now built once during lifespan (`warmup_engines`) and probed by `/ready` via `engine.healthcheck()` — 503 with a Service Unavailable envelope if rw or ro fails. Registry imports a `Backend` alias from each engine package so the concrete class name stays engine-private; adding a backend remains "drop a folder". Renames BQ_* env vars to BIGQUERY_* and splits credentials into rw / ro. Docs refreshed.
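The build-once-then-probe flow can be sketched as follows. This is an illustration under assumptions, not the PR's registry: `warmup`, `probe_ready`, the `factory` callback, and the exact body shape are hypothetical stand-ins.

```python
from typing import Any, Callable, Protocol


class Engine(Protocol):
    def healthcheck(self) -> bool: ...


# Engines built once at startup, keyed by mode ("rw" / "ro").
_INSTANCES: dict[str, Engine] = {}


def warmup(factory: Callable[[str], Engine]) -> None:
    """Build one engine per mode and cache it for later probes."""
    for mode in ("rw", "ro"):
        _INSTANCES[mode] = factory(mode)


def probe_ready() -> tuple[int, dict[str, Any]]:
    """Return (status_code, body); 503 if any cached engine is unhealthy."""
    failing = []
    for mode, engine in _INSTANCES.items():
        try:
            healthy = engine.healthcheck()
        except Exception:
            # A raising healthcheck counts as not ready.
            healthy = False
        if not healthy:
            failing.append(mode)
    if failing:
        message = "engine not ready: " + ", ".join(failing)
        error = {"__type": "Service Unavailable", "message": message}
        return 503, {"success": False, "error": error}
    return 200, {"success": True}
```

Because the probe reuses the cached instances, a readiness check costs one `healthcheck()` call per mode rather than a fresh engine build.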
… fields
Make Frictionless Table Schema the native column shape for
`datastore_create` while keeping the legacy CKAN `fields` +
`primary_key` input working end-to-end for back-compat.
Request accepts exactly one of `schema` or legacy `fields`; the
validator folds the legacy pair into a canonical Frictionless schema so
the service and engine only ever see one shape.
Response surfaces both for callers on either side of the migration:
`schema` is canonical, `fields` / `primary_key` are mirrors marked
`deprecated` in OpenAPI / IDE tooltips. When legacy input is used, the
envelope grows a `warnings` array pulled directly from
`Field(deprecated=...)` on the model, the single source of truth for
the warning text.
Engine `create()` ABC takes a single `schema: dict` (no more separate
`fields` / `unique_keys`); BigQuery + bigquery_test backends updated.
Postgres <-> Frictionless type maps added in `core/constants.py`;
conversion helpers in `schemas/validators.py`.
README has a brief "Column definitions" section explaining the
migration goal, and `example_payload/datastore_create/with_schema.json`
provides a ready-to-curl Frictionless payload.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
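A rough sketch of folding the legacy pair into one Frictionless schema. The type map is an illustrative subset and `fold_legacy_fields` is a hypothetical name; the actual maps and helpers live in `core/constants.py` and `schemas/validators.py` and may differ.

```python
from typing import Any, Optional

# Illustrative subset of a Postgres -> Frictionless type map (assumed values).
POSTGRES_TO_FRICTIONLESS = {
    "text": "string",
    "int4": "integer",
    "int8": "integer",
    "float8": "number",
    "numeric": "number",
    "bool": "boolean",
    "date": "date",
    "timestamp": "datetime",
    "json": "object",
}


def fold_legacy_fields(
    fields: list[dict[str, Any]], primary_key: Optional[list[str]] = None
) -> dict[str, Any]:
    """Convert CKAN `fields` + `primary_key` into a Frictionless schema."""
    schema_fields = []
    for field in fields:
        pg_type = field.get("type", "text")
        # Unknown types fall back to "any" in this sketch.
        fr_type = POSTGRES_TO_FRICTIONLESS.get(pg_type, "any")
        schema_fields.append({"name": field["id"], "type": fr_type})
    schema: dict[str, Any] = {"fields": schema_fields}
    if primary_key:
        schema["primaryKey"] = list(primary_key)
    return schema
```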
…cy fields
Extend the same dual-shape pattern shipped for `datastore_create` to
`datastore_info`. The response now carries both `schema` (canonical
Frictionless Table Schema) and `fields` (legacy `{id, type, info}` list
marked `deprecated` in OpenAPI), so clients on either side of the
migration see what they expect.
Engine `InfoResult` grows a `schema: dict` field next to the existing
`fields` and `meta`. Both placeholder backends populate both shapes —
the local `bigquery_test` engine projects its `DEMO_SCHEMA` into the
Frictionless `{name, type}` form without disturbing `search`, which
still uses the legacy shape via the same `DEMO_SCHEMA` constant.
Service passes both through verbatim; tests pin the new envelope keys.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…gacy fields
Extend the dual-shape response pattern shipped for `datastore_create` /
`datastore_info` to the streaming endpoints `datastore_search` and
`datastore_search_sql`. The CKAN envelope now carries `schema` (canonical
Frictionless Table Schema, same shape `datastore_create` stores) ahead
of `fields` (legacy `{id, type}` list, marked `deprecated` in OpenAPI),
so clients on either side of the migration see what they expect.
Engine `SearchResult` swaps `fields` for `schema` — engines produce the
canonical shape, the service derives the legacy list via the existing
`frictionless_schema_to_fields` helper. Per the migration direction,
schema is what we'll persist; legacy fields is a derived view.
Streaming writers (objects / lists / csv / tsv) all thread the new
`schema` kwarg through `_stream_envelope` so it's emitted lazily into
the body before the `records` block — peak memory still ≈ 1 row.
BigQuery placeholder + local `bigquery_test` backend updated. Test
mock-helper still accepts the legacy `{id, type}` shape for caller
convenience but converts to Frictionless before constructing
`SearchResult`. `test_datastore_search_sql` pins the new envelope key.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
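The lazy envelope described in this commit can be approximated with a generator. This is only a sketch of the idea for the `objects` format; `stream_envelope` is a hypothetical name and the real `_stream_envelope` signature is not shown in the commit message.

```python
import json
from typing import Any, Iterable, Iterator


def stream_envelope(
    schema: dict[str, Any], records: Iterable[dict[str, Any]]
) -> Iterator[str]:
    """Yield a JSON envelope in chunks: schema first, then records."""
    yield '{"success": true, "result": {'
    yield '"schema": ' + json.dumps(schema) + ", "
    yield '"records": ['
    first = True
    for record in records:
        # One record is serialised at a time, so memory stays flat.
        if not first:
            yield ", "
        yield json.dumps(record)
        first = False
    yield "]}}"
```

Joining the chunks gives a valid JSON document, but a streaming response can send each chunk as soon as it is produced.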
…e tables
First real BigQuery write path. `datastore_create` now persists the
declared Frictionless schema and creates / migrates the per-resource
data table.
Layout:
- `metadata.py` — `BigQueryMetadataStore` implements the new
engine-agnostic `MetadataStore` Protocol added to `engines/base.py`.
Owns the `_table_metadata` table (resource_id, schema JSON,
created_at, updated_at) with `initialize` / `insert` / `update` /
`get` / `delete`. Future engines drop a sibling `metadata.py` to
satisfy the same Protocol.
- `types.py` — Frictionless → BigQuery type map + `can_widen` for
`ALTER COLUMN SET DATA TYPE` rules (INT64 → NUMERIC/BIGNUMERIC/
FLOAT64; DATE → DATETIME/TIMESTAMP; everything else rejected).
- `lib.py` — pure helpers (schema diff, ALTER-clause rendering,
JSON-column serialisation, error formatting).
- `backend.py` — orchestration. `create()` runs DDL → records insert
→ metadata write in that order so any failure short-circuits before
the metadata row is written; metadata never describes a state the
table doesn't match. Two branch helpers (`_apply_new_resource` /
`_apply_existing_resource`) make the per-path sequence explicit.
`ALTER TABLE` packs added columns + supported type widenings into
one atomic statement; unsupported transitions raise `ConflictError`
with a recreate-the-resource hint up-front. Records are
stream-inserted via `insert_rows_json`; `object` / `array` /
`geojson` field values are JSON-encoded first since BigQuery's
`JSON` type expects strings on the wire.
- Every `client.query` and `client.insert_rows_json` call is funnelled
through `_run_query` / `_run_insert_rows` so transport / SQL errors
surface as `ServerError` with `op` + `resource_id` baked in — never
as raw `google.api_core` exceptions. Metadata store does the same
via its own `_run`.
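The ordering and error-wrapping rules in the layout above can be illustrated with a small stand-alone sketch. The `ServerError` class, `run_step`, and `create_resource` here are hypothetical simplifications; the real backend calls BigQuery through `_run_query` / `_run_insert_rows`.

```python
from typing import Callable


class ServerError(Exception):
    """Wraps a lower-level failure with the operation and resource id."""

    def __init__(self, op: str, resource_id: str, cause: Exception) -> None:
        super().__init__(f"{op} failed for {resource_id}: {cause}")
        self.op = op
        self.resource_id = resource_id


def run_step(op: str, resource_id: str, step: Callable[[], None]) -> None:
    """Run one step, converting any exception into a ServerError."""
    try:
        step()
    except Exception as exc:
        raise ServerError(op, resource_id, exc) from exc


def create_resource(
    resource_id: str,
    ddl: Callable[[], None],
    insert: Callable[[], None],
    write_metadata: Callable[[], None],
) -> None:
    run_step("DDL", resource_id, ddl)
    run_step("insert", resource_id, insert)
    # Reached only when both data steps succeeded.
    run_step("metadata write", resource_id, write_metadata)
```

If the insert step raises, the metadata write never runs, which is the invariant the commit describes.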
Config: new `BIGQUERY_DATASET` env var (the dataset that holds
`_table_metadata` and all user data tables). Placeholder mode
(BIGQUERY_PROJECT or BIGQUERY_DATASET unset) keeps the unit suite
runnable without GCP creds — `create()` is a no-op echo there.
Tests: 46 new (`tests/test_bigquery_metadata.py` + `tests/test_bigquery_tables.py`)
covering SQL shape, parameter binding, the diff/widening rules, JSON
serialisation, atomicity (data-op failure must not write metadata), and
error wrapping at every layer.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
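The widening rules quoted for `types.py` can be written as a lookup table. This sketch encodes only the transitions listed in the commit message; `diff_columns` is a hypothetical helper showing how a schema diff might sort changes into added, widened, and rejected.

```python
# Transitions listed in the commit message; everything else is rejected.
_WIDENINGS: dict[str, frozenset[str]] = {
    "INT64": frozenset({"NUMERIC", "BIGNUMERIC", "FLOAT64"}),
    "DATE": frozenset({"DATETIME", "TIMESTAMP"}),
}


def can_widen(old_type: str, new_type: str) -> bool:
    """True if old_type -> new_type is an allowed widening."""
    allowed = _WIDENINGS.get(old_type.upper(), frozenset())
    return new_type.upper() in allowed


def diff_columns(old: dict[str, str], new: dict[str, str]):
    """Split changes into (added, widened, rejected) column maps."""
    added = {name: t for name, t in new.items() if name not in old}
    widened: dict[str, tuple[str, str]] = {}
    rejected: dict[str, tuple[str, str]] = {}
    for name, new_t in new.items():
        if name in old and old[name] != new_t:
            bucket = widened if can_widen(old[name], new_t) else rejected
            bucket[name] = (old[name], new_t)
    return added, widened, rejected
```

A non-empty `rejected` map is the case where the backend would raise `ConflictError` before touching the table.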
…rror mapping
Wire the upsert action end-to-end, with three modes handled by a single
backend method:
- "upsert" → MERGE keyed on schema.primaryKey (update existing,
insert new).
- "insert" → DML INSERT INTO ... SELECT FROM UNNEST(@rows).
Replaces the old streaming insert_rows_json — that API
parks rows in BigQuery's streaming buffer for 30-90 min
and silently blocks any follow-up MERGE/UPDATE on the
same table. DML writes go straight to storage, so a
datastore_create + immediate datastore_upsert now works.
- "update" → DML UPDATE keyed on primaryKey. Compares
num_dml_affected_rows against input row count and
raises NotFoundError if any row had no matching key
(UPDATE silently no-ops on misses otherwise).
Records ride as one JSON-array string parameter @rows for every path;
BigQuery unpacks via JSON_QUERY_ARRAY and type-aware extractors
(CAST(JSON_VALUE), PARSE_JSON(JSON_QUERY) for JSON columns). One
statement regardless of batch size.
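A sketch of how the per-column extractors might be rendered. The function names are hypothetical, column names are assumed to be simple identifiers (no escaping is attempted), and the SQL text is a plausible rendering of the pattern described above rather than the PR's actual output.

```python
import json
from typing import Any


def extractor(column: str, bq_type: str, row_alias: str = "r") -> str:
    """Render the SQL expression that pulls one column out of a JSON row."""
    path = f"'$.{column}'"
    if bq_type == "JSON":
        # JSON columns keep their structure via JSON_QUERY + PARSE_JSON.
        return f"PARSE_JSON(JSON_QUERY({row_alias}, {path}))"
    if bq_type == "STRING":
        return f"JSON_VALUE({row_alias}, {path})"
    return f"CAST(JSON_VALUE({row_alias}, {path}) AS {bq_type})"


def unnest_select(columns: dict[str, str]) -> str:
    """Render a SELECT that unpacks the @rows parameter into typed columns."""
    exprs = ",\n  ".join(f"{extractor(n, t)} AS `{n}`" for n, t in columns.items())
    return f"SELECT\n  {exprs}\nFROM UNNEST(JSON_QUERY_ARRAY(@rows)) AS r"


def rows_param(records: list[dict[str, Any]]) -> str:
    """Serialise the whole batch as the single JSON-array string parameter."""
    return json.dumps(records)
```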
BigQuery raw errors are mapped to clear ValidationErrors:
- "Scalar subquery produced more than one element" → "duplicated
rows with the same primary key".
- "Bad <type> value: <v>" / "Could not cast '<v>' to type <T>" /
"Could not parse '<v>' as <T>" → "Value <v> is not a valid
<integer|number|boolean|...>".
- "Value out of range for <T>: <v>" → "out of range" message.
- "Invalid <date|timestamp|datetime|time>: <v>" → typed message.
Refactor: _run_query and _run_dml merged — _run_query now returns the
QueryJob so DML callers can grab num_dml_affected_rows without a
second helper. Module docstring + section markers updated. Per-row
JSON-column serialiser removed (handled inside SQL via PARSE_JSON).
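The raw-error mapping listed earlier in this commit could be implemented as a table of regex rules. The sketch below covers a few of the listed patterns; the output wording is approximate (it simply lowercases the BigQuery type name instead of translating to Frictionless names), and `friendly_message` is a hypothetical name.

```python
import re
from typing import Optional

_RULES = [
    (
        re.compile(r"Scalar subquery produced more than one element"),
        lambda m: "duplicated rows with the same primary key",
    ),
    (
        re.compile(r"Could not cast '(?P<v>.*?)' to type (?P<t>\w+)"),
        lambda m: f"Value {m['v']} is not a valid {m['t'].lower()}",
    ),
    (
        re.compile(r"Value out of range for (?P<t>\w+): (?P<v>.+)"),
        lambda m: f"Value {m['v']} is out of range for {m['t'].lower()}",
    ),
    (
        re.compile(r"Invalid (?P<t>date|timestamp|datetime|time): (?P<v>.+)"),
        lambda m: f"Value {m['v']} is not a valid {m['t']}",
    ),
]


def friendly_message(raw: str) -> Optional[str]:
    """Return a clearer message for a known raw error, else None."""
    for pattern, render in _RULES:
        match = pattern.search(raw)
        if match:
            return render(match)
    return None
```

Returning `None` for unknown text lets the caller fall back to the generic `ServerError` path.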
Tests:
- Test file restructured to one test per behaviour (~22 essentials,
no duplicates).
- New autouse conftest fixture clears BIGQUERY_PROJECT/DATASET/creds
+ resets engine cache so the suite stays hermetic regardless of a
developer's .env. Restores the placeholder-echo branch for tests
that predate the upsert wiring.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
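The "update" path's missing-key check reduces to comparing two counts. A minimal sketch, assuming the affected-row count has already been read from the finished query job; `NotFoundError` and `ensure_all_matched` are local stand-ins, not the project's classes.

```python
from typing import Optional


class NotFoundError(Exception):
    pass


def ensure_all_matched(affected_rows: Optional[int], input_rows: int) -> None:
    """Raise if an UPDATE touched fewer rows than the caller supplied."""
    # The count may be None if the job reported nothing; treat that as 0.
    affected = affected_rows or 0
    if affected < input_rows:
        missing = input_rows - affected
        raise NotFoundError(
            f"{missing} of {input_rows} rows had no matching primary key"
        )
```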
…E_UPDATED_AT
- Always inject `_id` INT64; assign via inlined
`(SELECT MAX(_id) FROM tbl) + ROW_NUMBER()` so no extra round-trip.
- Add `_updated_at` TIMESTAMP behind `Config.INCLUDE_UPDATED_AT`
(default on); MERGE bumps it only when a non-PK column actually differs
(NULL-safe `IS DISTINCT FROM`, JSON canonicalised via `TO_JSON_STRING`).
- Trim verbose docstrings/comments in bigquery/lib.py; rename `user_*`
locals to `data_*` / `insert_*`.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
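The "bump `_updated_at` only on a real change" condition can be rendered from the non-PK columns. A sketch under assumptions: `changed_condition` and the `T` / `S` aliases are hypothetical, and column names are assumed to be safe identifiers.

```python
def changed_condition(
    columns: dict[str, str], target: str = "T", source: str = "S"
) -> str:
    """Render a NULL-safe 'any non-PK column differs' SQL predicate."""
    parts = []
    for name, bq_type in columns.items():
        t, s = f"{target}.`{name}`", f"{source}.`{name}`"
        if bq_type == "JSON":
            # JSON values are compared through their canonical string form.
            t, s = f"TO_JSON_STRING({t})", f"TO_JSON_STRING({s})"
        parts.append(f"{t} IS DISTINCT FROM {s}")
    return " OR ".join(parts)
```

`IS DISTINCT FROM` treats two NULLs as equal, so an unchanged NULL column does not count as a difference.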
📝 Walkthrough
This PR implements Frictionless Table Schema as the canonical datastore column representation, introduces a fully functional BigQuery backend with metadata persistence, refactors the engine plugin architecture for caching and warmup, and updates all API endpoints to expose both new and legacy schema formats while marking legacy fields as deprecated.
Changes
- Frictionless Schema Foundation & Configuration
- Engine Architecture & Registry
- BigQuery Backend Implementation
- API Endpoints & Service Layer
- Application Startup & Health
- Tests & Examples
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…w_count
- Backend reads the stored Frictionless schema from `_table_metadata`
so `primaryKey` and the per-field `info` data dictionary round-trip
exactly as declared; raises NotFoundError when undeclared.
- Row count comes from BigQuery's per-dataset `__TABLES__.row_count`
(metadata-only, no full-table scan); `COUNT(*)` would have billed bytes
per call.
- Drop `fields` from InfoResult; the service derives legacy fields via
`frictionless_schema_to_fields`, mirroring how `search` already works
(engine stays out of the schemas/ layer).
- Soft fallback: missing data table while metadata exists reports
total=0 with a warning, instead of 500-ing the call.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
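The metadata-only row count and its soft fallback might look like this. A sketch, not the PR's code: the helper names and warning text are hypothetical, and the query is assumed to run with a `@table_id` parameter bound to the resource's table name.

```python
def row_count_sql(project: str, dataset: str) -> str:
    """Render the metadata-only row-count query for one table."""
    return (
        f"SELECT row_count FROM `{project}.{dataset}.__TABLES__` "
        "WHERE table_id = @table_id"
    )


def total_from_rows(row_counts: list[int]) -> tuple[int, list[str]]:
    """Map the query result to (total, warnings)."""
    if not row_counts:
        # Metadata exists but the data table does not: report 0 and warn.
        return 0, ["data table is missing; reporting total=0"]
    return int(row_counts[0]), []
```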
Actionable comments posted: 20
Note
Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
datastore/infrastructure/engines/bigquery/backend.py (1)
539-585: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
`search` and `search_sql` are still hard-coded empty results.
Both methods ignore the actual table data and always return empty iterators, so every read path will behave as if the resource has zero rows. That is still a missing core backend implementation, not a stub that can ship behind the “real CRUD” PR scope.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 539 - 585, The search and search_sql methods currently return hard-coded empty SearchResult objects; implement real BigQuery queries in search and search_sql: in search(build a parameterised SELECT using the resource_id/table name, filters, q, distinct, fields, sort, limit and offset), execute via the BigQuery client (client.query(...)), use query_job.result() as a lazy iterator to populate records, construct schema from the returned query schema or provided fields, run a COUNT(*) when include_total is true to set total, and set records_truncated=True if the iterator reached the limit; in search_sql(execute the raw SQL with client.query(sql, job_config=...) , derive schema from query_job.schema/result, yield rows lazily from query_job.result(), and set records_truncated when the limit is hit). Ensure you reference and update the SearchResult construction in both search and search_sql to use the real schema, records iterator, total, and records_truncated values.
🟡 Minor comments (2)
README.md-56-63 (1)
56-63: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Refresh BigQuery backend status in the project tree.
`backend.py` is still described as placeholder here; that no longer matches the implemented backend and may confuse readers.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@README.md` around lines 56 - 63, README's project tree still calls bigquery/backend.py a "DatastoreBackend subclass (placeholder)"; update the description to reflect that the BigQuery backend is implemented (reference the concrete class BigQueryBackend in bigquery/backend.py) and briefly list what the file provides (implemented backend logic rather than a placeholder). Keep surrounding entries (bigquery/client.py, lib.py, allowed_functions.txt) unchanged and ensure the wording matches the actual implemented functionality.
CLAUDE.md-378-379 (1)
378-379: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Update status text that still says engine logic is placeholder.
These lines now read as outdated and can mislead contributors about what is already implemented in this PR.
Also applies to: 807-808
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@CLAUDE.md` around lines 378 - 379, Replace the outdated status sentence "Engine business logic is still placeholder (returns empty results / echoes inputs)" with an accurate summary of current state: note that call path, validation, auth, streaming and per-engine datastore_search_sql allow-list are implemented and only the BigQuery adapter remains (referenced in §7); update the same phrasing wherever it repeats to avoid misleading contributors and ensure the status reflects what is implemented versus what remains.
🧹 Nitpick comments (1)
tests/test_health.py (1)
39-47: ⚡ Quick win
Add a `/docs` smoke test in this health suite.
A tiny assertion for `GET /docs` would catch OpenAPI regressions early alongside liveness/readiness checks.
Based on learnings: Ensure OpenAPI documentation loads at `/docs` without errors.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_health.py` around lines 39 - 47, Add a small smoke test that requests the OpenAPI UI so regressions are caught alongside liveness checks: either extend test_health_returns_ok or add a new test (e.g., test_docs_served) that calls client.get("/docs") and asserts a 200 response and basic content (e.g., response.status_code == 200 and response.headers["content-type"] contains "text/html" or response.text contains "Swagger" / "ReDoc") to ensure the docs page loads without errors.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In @.env.example:
- Around line 10-18: The .env.example is missing entries that match the fields
declared in datastore/core/config.py; add entries for HTTP_MAX_CONNECTIONS,
HTTP_MAX_KEEPALIVE_CONNECTIONS, and INCLUDE_UPDATED_AT (with example values or
blanks) using the exact variable names used in Config so pydantic-settings
doesn't ignore them, and update the other relevant section mentioned in the
comment to include the same variables for consistency.
In `@datastore/api/endpoints/health.py`:
- Around line 47-61: The 503 branch in datastore/api/endpoints/health.py returns
an "error" key instead of the CKAN envelope "result" object; update the
JSONResponse content for the failing branch to use the CKAN envelope shape {
"help": str(request.url), "success": False, "result": { ... } } (move the
existing "__type" and "message" fields currently under "error" into "result"),
keep status_code=503 and the same message building that uses failing and
request.url so the /ready response conforms to the required contract.
In `@datastore/core/constants.py`:
- Around line 61-115: The POSTGRES_TO_FRICTIONLESS and FRICTIONLESS_TO_POSTGRES
maps currently introduce non-canonical Frictionless types (interval -> duration
and entries for duration, year, yearmonth); remove those non-canonical mappings
and normalize to the repo's canonical Frictionless subset. Concretely, change
POSTGRES_TO_FRICTIONLESS to map "interval" to a canonical type (e.g., "string"
or "any") instead of "duration", and remove the "duration", "year", and
"yearmonth" keys from FRICTIONLESS_TO_POSTGRES (do not add new non-canonical
keys); leave only mappings for the canonical types listed in the repo (integer,
number, string, boolean, date, datetime, time, object, array, geopoint, geojson,
any) so legacy fields round-trip into canonical schemas.
In `@datastore/infrastructure/engines/base.py`:
- Around line 17-18: Change all bare collection annotations to be parameterized
with typing.Any: import Any from typing and update schema: dict to schema:
dict[str, Any], records: Iterator[tuple] to records: Iterator[tuple[Any, ...]]
(or Iterator[tuple[int, ...]] if elements have known types), and similarly
replace any bare list/dict/tuple elsewhere (e.g., the other annotated blocks
referenced) with list[dict[str, Any]] / dict[str, Any] / tuple[Any, ...] or more
specific element types where known; ensure typing.Iterator/Sequence/Mapping are
used consistently and add the necessary typing imports.
In `@datastore/infrastructure/engines/bigquery/backend.py`:
- Around line 102-110: The readiness check currently returns healthy even when
BIGQUERY_DATASET is unset and metadata is disabled; modify BigQueryBackend so
initialize() records the disabled state (e.g., set self.metadata_available or
leave self.metadata as None) and update healthcheck() to return False when the
dataset/metadata layer is unavailable (check BIGQUERY_DATASET.strip() or the new
self.metadata_available flag), so /ready fails if create()/info() would be
disabled; apply the same change to the other identical dataset-check block later
in BigQueryBackend (the duplicate around the other initialization path).
- Around line 587-595: The delete() method in backend.py currently returns an
empty WriteResult and must be implemented to actually remove data: when filters
is None issue a DROP TABLE IF EXISTS <resource> via the BigQuery client (or use
client.delete_table on the fully-qualified table reference) and when filters is
provided build a parameterised DELETE FROM `<resource>` WHERE ... SQL (use
parameter binding to avoid injection) and execute it via client.query(), then
construct and return a WriteResult containing the row count/affected status;
update the delete() implementation to use the existing BigQuery client instance,
resource_id and filters parameters and populate WriteResult accordingly.
In `@datastore/infrastructure/engines/bigquery/client.py`:
- Around line 31-35: The RO credential path currently ignores
BIGQUERY_CREDENTIALS when BIGQUERY_CREDENTIALS_RO is blank; update the logic
that sets creds_raw (referencing config.BIGQUERY_CREDENTIALS_RO and
config.BIGQUERY_CREDENTIALS) so that when mode == "ro" and
BIGQUERY_CREDENTIALS_RO.strip() is empty it falls back to using
BIGQUERY_CREDENTIALS.strip() before falling back to ADC; ensure the resulting
creds_raw variable is the chosen string used downstream (same variable name) so
RO behaves like RW when only one service account is configured.
In `@datastore/infrastructure/engines/bigquery/lib.py`:
- Around line 15-17: The code currently silently drops user-declared reserved
columns (defined by SYSTEM_COLUMN_NAMES) when building schemas; instead,
validate incoming schema.fields against SYSTEM_COLUMN_NAMES and raise a clear,
fast-failing exception (e.g., ValueError or a domain-specific SchemaError) if
any field name intersects with SYSTEM_COLUMN_NAMES; update the validation logic
that iterates schema.fields (the same area referenced around lines 32-40) to
perform this check before any mutation so callers receive an explicit error
mentioning the offending field name(s) and that `_id`/`_updated_at` are
reserved.
- Around line 130-143: The current id allocation using id_expr (MAX(`_id`) +
ROW_NUMBER()) is unsafe for concurrent writers (see id_expr, table_ref,
sys_vals, include_updated_at, sys_cols, data_cols, data_extractors) and must be
replaced with a serialized or collision-free allocator; change the code to
either use a BigQuery SEQUENCE (NEXTVAL(sequence_name) per row) or switch to a
collision-free ID like GENERATE_UUID() for _id, and update both occurrences
(this block and the repeated logic around lines 199-210) so inserts select
NEXTVAL(...) or GENERATE_UUID() instead of MAX(_id)+ROW_NUMBER(). Ensure the
chosen approach preserves the sys_vals construction (include_updated_at branch)
and works for batch UNNEST(`@rows`) inserts.
In `@datastore/infrastructure/engines/bigquery/metadata.py`:
- Around line 90-112: The insert() method can race and create multiple rows for
the same resource_id because it uses a plain INSERT; change it to an idempotent,
atomic operation (e.g., MERGE ... WHEN NOT MATCHED THEN INSERT or INSERT ...
SELECT WHERE NOT EXISTS) against self.table_ref so a second concurrent create
will be no-op rather than a duplicate; use the same parameters currently passed
into _run and _schema_params and keep the op string (e.g., "metadata INSERT")
while ensuring the query only inserts when resource_id is absent; also apply the
same MERGE/conditional-insert pattern to the analogous code referenced at the
151-172 range to preserve the one-row-per-resource invariant.
In `@datastore/infrastructure/engines/registry.py`:
- Line 67: The helper signatures need concrete types to avoid leaking Any:
annotate _build_engine(engine: str, mode: Mode, *, config: Mapping[str, Any],
context: Optional[Mapping[str, Any]] = None) (or replace Mapping[...] with your
project-specific Config/Context type if one exists) and similarly annotate
warmup_engines(config: Mapping[str, Any]) (and its return type, e.g., None or
Coroutine[...] if async). Import Optional and Mapping/Any from typing and use
the project Config type if present so strict mypy sees concrete parameter types
through the backend construction path.
- Around line 95-100: warmup_engines currently caches engines under
_INSTANCES[(engine, mode)] that may capture a per-request RequestContext and
cause auth/CKAN state leakage; change the registry so only context-free
instances are cached and any engine built with a RequestContext is not stored in
_INSTANCES. Specifically: ensure warmup_engines calls _build_engine with
context=None and stores only context-free engines; update get_datastore_engine
(and the similar block around the other occurrence) to detect when a
RequestContext is provided and avoid writing that instance into _INSTANCES
(instead return a transient engine or clone a context-free engine with the
request context applied), and rely on cached engines only when context is None.
Ensure references: warmup_engines, get_datastore_engine, _INSTANCES, and
_build_engine are updated accordingly.
In `@datastore/main.py`:
- Around line 44-47: The warmed read/write engines created by warmup_engines
aren't being attached to the FastAPI app state; modify the lifespan startup to
store concrete engine instances on app.state (e.g. app.state.rw_engine and
app.state.ro_engine) after calling warmup_engines and ensure the
reset_engine_cache callback clears those same app.state entries; update
warmup_engines (or its caller) to return or assign the actual engine objects so
the /ready endpoint (datastore/api/endpoints/health.py) can call
app.state.rw_engine.healthcheck() and app.state.ro_engine.healthcheck(),
returning 503 if either healthcheck fails.
In `@datastore/schemas/request.py`:
- Around line 280-286: The current model validator _require_resource_id_or_id
allows both resource_id and id to be set and silently prefers resource_id;
change it to enforce an exclusive-or: if both resource_id and id are provided
raise ValueError("only one of 'resource_id' or 'id' may be set"), if neither
provided raise the existing error, and if only id is provided normalize by
assigning self.resource_id = self.id before returning; apply the same XOR
enforcement and normalization change to the corresponding validator method in
DatastoreDeleteRequest so both request types reject ambiguous dual-field
requests and still normalize id -> resource_id when appropriate.
In `@datastore/schemas/validators.py`:
- Around line 186-207: The validate_frictionless_schema function must reject
Frictionless field types outside our canonical vocabulary before calling
Schema.from_descriptor; update validate_frictionless_schema to, when value is a
dict containing a "fields" list, iterate each field and verify its "type" is one
of the canonical types (integer, number, string, boolean, date, datetime, time,
object, array, geopoint, geojson, any) and raise ValueError listing offending
types if any are found; keep the existing Schema.from_descriptor call for full
descriptor validation and do not perform alias normalization here (that belongs
in storage code).
- Around line 117-132: fields_to_frictionless_schema() currently flattens
non-title/description info keys onto the top-level field object (see variables
info, extra, fr), but frictionless_schema_to_fields() only reads back extras
from field["info"], losing flattened keys like unit or notes; update
frictionless_schema_to_fields() so when rebuilding legacy field dicts it
collects any top-level keys on the frictionless field object that are not the
canonical keys (e.g., name, type, title, description, info) and merges them into
the restored info/extras before appending to fr_fields (mirror the inverse of
the extra merging done in fields_to_frictionless_schema()); apply the same logic
where similar reconstruction occurs in the 140-173 range.
In `@datastore/services/write.py`:
- Around line 83-96: The service delete_datastore currently returns a Pydantic
response model DatastoreDeleteResponse.Result; change it to return a plain
Python data structure (e.g., a dict) with the same fields so the service layer
is not coupled to response schemas; locate delete_datastore (and its use of
get_datastore_engine and engine.delete), remove the Pydantic model from the
return and instead return {"resource_id": resource_id, "filters": filters} (or
an equivalent tuple/dataclass) and ensure any callers expect the plain dict
rather than DatastoreDeleteResponse.Result.
- Around line 88-91: The code currently does `filters = data_dict.get("filters")
or None`, which converts empty dicts to None and can change delete semantics;
change it to preserve explicit empty filters by assigning `filters =
data_dict.get("filters")` (or use `data_dict["filters"]` with a key-existence
check) and pass that through to
`get_datastore_engine(...).delete(resource_id=resource_id, filters=filters)` so
only a missing key yields None while `{}` remains `{}`.
In `@tests/test_health.py`:
- Around line 20-23: The function _clean_engine_cache is untyped and fails
strict mypy; add explicit typing by importing typing.Iterator and annotating the
generator as def _clean_engine_cache() -> Iterator[None]: (or Generator[None,
None, None] if you prefer), and annotate any other untyped generator/fixture
defs in the file (the other defs flagged in the review) similarly; ensure you
add the necessary import (from typing import Iterator or Generator) and any
missing parameter/return type annotations to satisfy strict = true.
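The `filters` comment on `datastore/services/write.py` above turns on a Python truthiness detail that is easy to miss. A small illustration, with hypothetical function names, of how `or None` collapses an explicit empty dict:

```python
from typing import Any, Optional


def filters_lossy(data_dict: dict[str, Any]) -> Optional[dict[str, Any]]:
    # `{}` is falsy, so `or None` turns an explicit empty dict into None.
    return data_dict.get("filters") or None


def filters_preserved(data_dict: dict[str, Any]) -> Optional[dict[str, Any]]:
    # Only a missing key yields None; `{}` stays `{}`.
    return data_dict.get("filters")
```

With the lossy form, a caller sending `"filters": {}` is indistinguishable from one omitting the key, which matters when "no filters" means "drop the whole table".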
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 40646cc2-c6d2-4774-90ba-512c1e517b04
📒 Files selected for processing (44)
- .env.example
- .gitignore
- CLAUDE.md
- README.md
- datastore/api/endpoints/datastore.py
- datastore/api/endpoints/health.py
- datastore/api/responses.py
- datastore/core/config.py
- datastore/core/constants.py
- datastore/infrastructure/engines/base.py
- datastore/infrastructure/engines/bigquery/__init__.py
- datastore/infrastructure/engines/bigquery/backend.py
- datastore/infrastructure/engines/bigquery/client.py
- datastore/infrastructure/engines/bigquery/lib.py
- datastore/infrastructure/engines/bigquery/metadata.py
- datastore/infrastructure/engines/bigquery/types.py
- datastore/infrastructure/engines/registry.py
- datastore/main.py
- datastore/schemas/request.py
- datastore/schemas/responses.py
- datastore/schemas/validators.py
- datastore/services/read.py
- datastore/services/streaming.py
- datastore/services/write.py
- example_payload/README.md
- example_payload/datastore_create/with_schema.json
- example_payload/datastore_delete/force_readonly.json
- example_payload/datastore_delete/whole_table.json
- example_payload/datastore_delete/with_filters.json
- example_payload/datastore_info/basic.json
- example_payload/datastore_info/with_id_alias.json
- example_payload/datastore_search_sql/aggregate.json
- example_payload/datastore_search_sql/basic.json
- example_payload/datastore_search_sql/with_cte.json
- tests/conftest.py
- tests/test_bigquery_metadata.py
- tests/test_bigquery_tables.py
- tests/test_datastore_create.py
- tests/test_datastore_delete.py
- tests/test_datastore_info.py
- tests/test_datastore_search.py
- tests/test_datastore_search_sql.py
- tests/test_health.py
- tests/test_write_service.py
    # Selects the storage backend (must match a folder under
    # `datastore/infrastructure/engines/`):
    # bigquery — real BigQuery adapter (placeholder while being built).
    # ducklake — Future planned
    # ducklake — Future planned.
    DATASTORE_ENGINE=bigquery
    BQ_PROJECT=
    BIGQUERY_PROJECT=
    BIGQUERY_DATASET=
    BIGQUERY_CREDENTIALS=
    BIGQUERY_CREDENTIALS_RO=
Keep .env.example aligned with the actual settings model.
HTTP_MAX_CONNECTIONS and HTTP_MAX_KEEPALIVE_CONNECTIONS are documented here, but datastore/core/config.py does not declare either field, so pydantic-settings will silently ignore them. INCLUDE_UPDATED_AT was added to Config as well, but operators still don't get an example for that toggle here.
Also applies to: 26-27
🧰 Tools
🪛 dotenv-linter (4.0.0)
[warning] 15-15: [UnorderedKey] The BIGQUERY_PROJECT key should go before the DATASTORE_ENGINE key
(UnorderedKey)
[warning] 16-16: [UnorderedKey] The BIGQUERY_DATASET key should go before the BIGQUERY_PROJECT key
(UnorderedKey)
[warning] 17-17: [UnorderedKey] The BIGQUERY_CREDENTIALS key should go before the BIGQUERY_DATASET key
(UnorderedKey)
[warning] 18-18: [UnorderedKey] The BIGQUERY_CREDENTIALS_RO key should go before the BIGQUERY_DATASET key
(UnorderedKey)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In @.env.example around lines 10 - 18, the template is out of step with the
settings model in datastore/core/config.py: HTTP_MAX_CONNECTIONS and
HTTP_MAX_KEEPALIVE_CONNECTIONS are documented but not declared on Config, so
pydantic-settings ignores them, while INCLUDE_UPDATED_AT is declared on Config
but has no example entry. Either declare the two HTTP_MAX_* fields on Config or
drop them from the template, add an INCLUDE_UPDATED_AT entry (with an example
value or blank) using the exact variable name, and apply the same alignment to
lines 26-27.
    if failing:
        return JSONResponse(
            status_code=503,
            content={
                "help": str(request.url),
                "success": False,
                "error": {
                    "__type": "Service Unavailable",
                    "message": (
                        f"engine healthcheck failed for mode(s): "
                        f"{', '.join(failing)}"
                    ),
                },
            },
        )
Keep /ready failure responses in CKAN health envelope shape.
The 503 branch returns an error object instead of a result object, which breaks the required health endpoint response contract.
As per coding guidelines "datastore/api/endpoints/health.py: Health endpoints (/, /health, /ready) must return CKAN envelope shape: {help, success, result: {...}}".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@datastore/api/endpoints/health.py` around lines 47 - 61, The 503 branch in
datastore/api/endpoints/health.py returns an "error" key instead of the CKAN
envelope "result" object; update the JSONResponse content for the failing branch
to use the CKAN envelope shape { "help": str(request.url), "success": False,
"result": { ... } } (move the existing "__type" and "message" fields currently
under "error" into "result"), keep status_code=503 and the same message building
that uses failing and request.url so the /ready response conforms to the
required contract.
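A minimal sketch of the envelope change described above, assuming a hypothetical `ready_envelope` helper that returns the status code and body instead of a framework response object. The success-branch payload (`{"status": "ok"}`) is an assumption for illustration; only the failure branch is taken from the comment.

```python
from __future__ import annotations

from typing import Any


def ready_envelope(url: str, failing: list[str]) -> tuple[int, dict[str, Any]]:
    """Build the /ready body in CKAN envelope shape: {help, success, result}."""
    if failing:
        # The details that used to sit under "error" now live under "result".
        return 503, {
            "help": url,
            "success": False,
            "result": {
                "__type": "Service Unavailable",
                "message": (
                    "engine healthcheck failed for mode(s): "
                    + ", ".join(failing)
                ),
            },
        }
    # Assumed success payload; the real endpoint may report more fields.
    return 200, {"help": url, "success": True, "result": {"status": "ok"}}
```

Both branches share the same three top-level keys, so a client can always read `result` without first checking which key is present.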
    async def info_datastore(
        context: RequestContext, data_dict: dict[str, Any]
    ) -> DatastoreInfoResponse.Result:
        """Look up table metadata for a single `resource_id`.

        Endpoint authorizes the caller first (same gate as `search`). This
        service just asks the read-only engine for its `InfoResult` and
        re-shapes it as the response's typed `Result`. No streaming —
        `info` responses are small enough for the standard `_success_response`
        path.
        """
        engine = get_datastore_engine(context, mode="ro")
        result = engine.info(resource_id=data_dict["resource_id"])
        fields, _ = frictionless_schema_to_fields(result.schema)
        return DatastoreInfoResponse.Result(
            meta=result.meta,
            schema=result.schema,
            fields=fields,
        )
info_datastore should return a plain dict/dataclass, not DatastoreInfoResponse.Result.
Line 152 and Line 164 couple the service layer to API response models. Keep response-model construction at the API boundary to preserve one-way layering.
As per coding guidelines: "{datastore/services,datastore/infrastructure/engines}/**/*.py: Use plain dicts, tuples, and dataclasses for data passed between services and engines—never Pydantic models."
    async def delete_datastore(
        context: RequestContext, data_dict: dict[str, Any]
    ) -> DatastoreDeleteResponse.Result:
        """Delete rows matching `filters`, or drop the whole table."""
        resource_id = data_dict["resource_id"]
        filters = data_dict.get("filters") or None

        engine = get_datastore_engine(context, mode="rw")
        engine.delete(resource_id=resource_id, filters=filters)

        return DatastoreDeleteResponse.Result(
            resource_id=resource_id,
            filters=filters,
        )
Service layer should return plain data structures, not Pydantic Result models.
delete_datastore returns DatastoreDeleteResponse.Result, which couples services to response-schema models. That breaks the layer contract and makes services API-shape aware.
As per coding guidelines: "{datastore/services,datastore/infrastructure/engines}/**/*.py: Use plain dicts, tuples, and dataclasses for data passed between services and engines—never Pydantic models."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@datastore/services/write.py` around lines 83 - 96, The service
delete_datastore currently returns a Pydantic response model
DatastoreDeleteResponse.Result; change it to return a plain Python data
structure (e.g., a dict) with the same fields so the service layer is not
coupled to response schemas; locate delete_datastore (and its use of
get_datastore_engine and engine.delete), remove the Pydantic model from the
return and instead return {"resource_id": resource_id, "filters": filters} (or
an equivalent tuple/dataclass) and ensure any callers expect the plain dict
rather than DatastoreDeleteResponse.Result.
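The layering this comment asks for can be sketched as follows, under stated assumptions: `DeleteOutcome`, `to_wire`, and the `delete_fn` callable are hypothetical stand-ins (for a plain service-level dataclass, the API-boundary shaping step, and `engine.delete`), and the service is shown synchronous to keep the example short.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class DeleteOutcome:
    """Plain data returned by the service; no response-model import needed."""

    resource_id: str
    filters: dict[str, Any] | None = None


def delete_datastore(
    data_dict: dict[str, Any], delete_fn: Callable[..., Any]
) -> DeleteOutcome:
    """Service layer: call the engine, return plain data."""
    resource_id = data_dict["resource_id"]
    filters = data_dict.get("filters")
    delete_fn(resource_id=resource_id, filters=filters)
    return DeleteOutcome(resource_id=resource_id, filters=filters)


def to_wire(outcome: DeleteOutcome) -> dict[str, Any]:
    """API boundary: shape the outcome, leaving out `filters` when omitted."""
    body: dict[str, Any] = {"resource_id": outcome.resource_id}
    if outcome.filters is not None:
        body["filters"] = outcome.filters
    return body
```

With this split, only the endpoint module needs to know about `DatastoreDeleteResponse`; the service can be unit-tested with a stub `delete_fn`.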
… richer _links
- New `bigquery/search.py` builders: parameterised SELECT (typed filter binds, IN UNNEST for list filters, native `SEARCH()` for full-text on whole row or per-column), validated sort/projection, and a matching COUNT subquery for filtered/distinct totals.
- Backend dispatches search + count jobs before awaiting either, so they run in parallel; wall time ≈ max(both). Unfiltered+non-distinct totals fall back to the cheap row-count helper.
- Schema-level limit cap dropped; new `Config.SEARCH_RESULT_ROWS_MAX` (default 32000) is the env-driven ceiling enforced at the service boundary with a "paginate with offset" hint.
- `_links` now carries conditional `prev` / `next` plus `page_size`, `page`, and `total_pages`. `page` / `total_pages` are suppressed when the current page has no rows (empty resource or past-end) so a UI never sees an incoherent "page 5 of 4".
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
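The `_links` rules in this commit message can be illustrated with a small sketch. `build_links` is a hypothetical helper, and it returns limit/offset pairs rather than full URLs, which is a simplification; the actual `_build_pagination_links` signature may differ.

```python
from __future__ import annotations

import math
from typing import Any


def build_links(
    limit: int, offset: int, total: int | None, rows_on_page: int
) -> dict[str, Any]:
    """Pagination metadata with conditional prev/next and page counters."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    links: dict[str, Any] = {"page_size": limit}
    if offset > 0:
        links["prev"] = {"limit": limit, "offset": max(offset - limit, 0)}
    if total is not None and offset + limit < total:
        links["next"] = {"limit": limit, "offset": offset + limit}
    # Suppress page counters on an empty page so a client never sees
    # something like "page 5 of 4".
    if rows_on_page > 0 and total is not None:
        links["page"] = offset // limit + 1
        links["total_pages"] = math.ceil(total / limit)
    return links
```

For a 35-row resource read 10 rows at a time, the first page gets `next`, `page` 1, and `total_pages` 4, while a request at offset 40 gets only `page_size` and `prev`.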
… defaults
- Don't silently coerce `filters={}` to None on delete; an empty dict now reaches the engine as-is so the contract distinguishes "no-WHERE delete" from "drop table".
- Reject `_id` / `_updated_at` in user-provided field/schema names instead of silently dropping them — a hidden drop would leave the response advertising columns the engine refuses to populate.
- Reject non-canonical Frictionless types (`duration`, `year`, `yearmonth`, …) at the request boundary and trim them from the type maps; the datastore commits to a narrower vocabulary.
- 400 when `resource_id` and `id` arrive with different values on `datastore_info` / `datastore_delete` (legacy clients echoing the same value still work).
- `/ready` 503 stays in the StatusResponse envelope (`result.status: "not_ready"`) instead of an error envelope; mode names no longer leak into the response.
- Mount `/health` and `/ready` under both `/` and `/api/3/action/` so k8s probes and CKAN clients both reach them; welcome stays root-only.
- `Config` and `.env.example` realigned: drop the orphan `HTTP_MAX_*` from the template, document `INCLUDE_UPDATED_AT` + `SEARCH_RESULT_ROWS_MAX`.
- `healthcheck()` fails when the dataset / metadata store is unconfigured (was passing on `SELECT 1` even when writes would silently no-op).
- Preserve all `info` keys on the legacy ⇄ Frictionless roundtrip (extras now nest under `info` instead of being flattened and lost).
- Type annotations across base.py, registry.py helpers, and test_health.py to stop leaking `Any` under strict mypy.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
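Two of the rules above (rejecting conflicting `resource_id` / `id` values, and keeping `filters={}` distinct from an omitted `filters`) can be sketched in plain Python. `normalise_resource_id` is a hypothetical function; in the PR this logic lives in a Pydantic `model_validator`, and the `ValueError` here stands in for whatever produces the 400.

```python
from __future__ import annotations

from typing import Any


def normalise_resource_id(data: dict[str, Any]) -> dict[str, Any]:
    """Fold the legacy `id` alias into `resource_id`, rejecting conflicts."""
    resource_id = data.get("resource_id")
    alias = data.get("id")
    if resource_id is None and alias is None:
        raise ValueError("one of 'resource_id' or 'id' is required")
    if resource_id is not None and alias is not None and resource_id != alias:
        raise ValueError("'resource_id' and 'id' must match when both are sent")
    out = {key: value for key, value in data.items() if key != "id"}
    out["resource_id"] = resource_id if resource_id is not None else alias
    # `filters` is passed through untouched: {} stays {}, and a missing
    # key stays missing, so the two cases remain distinguishable downstream.
    return out
```

A client that echoes the same value in both keys still passes, which matches the "legacy clients echoing the same value still work" note.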
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
datastore/services/read.py (1)
104-105: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
`datastore_search_sql` ignores the configurable row cap.
`search_sql_datastore` still hardcodes `32000` via `_SQL_DEFAULT_LIMIT` (Line 104, Line 140, Line 149), so `Config.SEARCH_RESULT_ROWS_MAX` does not govern SQL search results.
Suggested fix
-_SQL_DEFAULT_LIMIT = 32000
-
-
 async def search_sql_datastore(
@@
-    result = engine.search_sql(
-        sql=data_dict["sql"], limit=_SQL_DEFAULT_LIMIT
-    )
+    max_limit = context.config.SEARCH_RESULT_ROWS_MAX
+    result = engine.search_sql(sql=data_dict["sql"], limit=max_limit)
@@
-        limit=_SQL_DEFAULT_LIMIT,
+        limit=max_limit,
@@
-        links=_build_pagination_links(
-            request_url, limit=_SQL_DEFAULT_LIMIT, offset=0, total=None,
-        ),
+        links=_build_pagination_links(
+            request_url, limit=max_limit, offset=0, total=None,
+        ),
Also applies to: 140-155
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@datastore/services/read.py` around lines 104 - 105, The SQL search path currently uses the hardcoded _SQL_DEFAULT_LIMIT (32000) which bypasses the runtime configuration; update datastore_search_sql to derive its limit from Config.SEARCH_RESULT_ROWS_MAX (with a sensible fallback if unset) instead of using _SQL_DEFAULT_LIMIT, and replace uses of _SQL_DEFAULT_LIMIT in the query-building/limit-clamping logic so the effective LIMIT value is calculated from Config.SEARCH_RESULT_ROWS_MAX (and still enforces any minimum/maximum checks); ensure references in datastore_search_sql and any helper that currently reference _SQL_DEFAULT_LIMIT are switched to the new config-driven value.
datastore/infrastructure/engines/bigquery/backend.py (2)
465-470: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
`create()` returns a dict instead of `WriteResult`.
The method signature declares `-> WriteResult` but returns a plain dict. This breaks the type contract and will fail strict type checking. The same issue exists in `upsert()` at lines 503-509 and 531-537.
Proposed fix for create()
-        return {
-            "schema": schema,
-            "records": records,
-            "include_total": include_total,
-            "total": len(records or []) if include_total else None,
-        }
+        return WriteResult(
+            rows_written=len(records or []),
+            total=len(records or []) if include_total else None,
+        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 465 - 470, The create() and upsert() functions currently return a plain dict but declare -> WriteResult; update both to return an actual WriteResult instance instead of a dict: import or reference the WriteResult class/type and construct WriteResult(schema=schema, records=records, include_total=include_total, total=(len(records or []) if include_total else None)) (apply the same change in both create() and the two upsert() return sites) so the return value matches the declared type and passes static typing.
501-537: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
`upsert()` returns a dict instead of `WriteResult`.
Both placeholder-mode (lines 503-509) and normal (lines 531-537) return paths return plain dicts instead of `WriteResult` dataclass instances, breaking the declared return type.
Proposed fix
         if self.metadata is None:
             # Placeholder mode — echo (matches the create() pattern).
-            return {
-                "resource_id": resource_id,
-                "records": records,
-                "method": method,
-                "include_total": include_total,
-                "total": len(records or []),
-            }
+            return WriteResult(
+                rows_written=len(records or []),
+                total=len(records or []) if include_total else None,
+            )
 ...
-        return {
-            "resource_id": resource_id,
-            "records": records,
-            "method": method,
-            "include_total": include_total,
-            "total": len(rows) if include_total else None,
-        }
+        return WriteResult(
+            rows_written=len(rows),
+            total=len(rows) if include_total else None,
+        )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@datastore/infrastructure/engines/bigquery/backend.py` around lines 501 - 537, The upsert() implementation currently returns plain dicts in both placeholder mode and the normal path; change both return statements to construct and return a WriteResult dataclass instance (instead of a dict) with the same fields (resource_id, records, method, include_total, total). Locate the upsert method and replace the first return block that handles placeholder mode and the final return block after calling _insert_records/_merge_records/_update_records to return WriteResult(...) with the appropriate field values (compute total as len(rows) or len(records or []) as before). Ensure WriteResult is imported where needed and that the object shape matches the declared return type.
♻️ Duplicate comments (2)
datastore/services/write.py (1)
83-103: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Return plain data from the service instead of `DatastoreDeleteResponse.Result`.
Line 85 and Line 100 keep this service coupled to response-schema models. Return a plain dict (or dataclass) from `delete_datastore`, and let the API layer shape it into the response model.
Suggested minimal change
-async def delete_datastore(
-    context: RequestContext, data_dict: dict[str, Any]
-) -> DatastoreDeleteResponse.Result:
+async def delete_datastore(
+    context: RequestContext, data_dict: dict[str, Any]
+) -> dict[str, Any]:
@@
-    return DatastoreDeleteResponse.Result(
-        resource_id=resource_id,
-        filters=filters,
-    )
+    return {
+        "resource_id": resource_id,
+        "filters": filters,
+    }
As per coding guidelines, "`datastore/{services,infrastructure}/**/*.py`: Use Pydantic v2 for request/response validation at boundaries only; never pass Pydantic models between services or return them from engines."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@datastore/services/write.py` around lines 83 - 103, The service function delete_datastore currently returns a Pydantic response model DatastoreDeleteResponse.Result, coupling the service layer to the API schema; change it to return plain data (a dict or simple dataclass) containing the same fields (resource_id and filters) instead. Locate delete_datastore, keep the call to get_datastore_engine(...) and engine.delete(resource_id=..., filters=...) unchanged, then construct and return a plain Python dict (or dataclass instance) with keys resource_id and filters instead of DatastoreDeleteResponse.Result so the API layer can perform Pydantic v2 shaping.
datastore/schemas/validators.py (1)
133-134: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Extra metadata still spread onto field top-level, contradicting docstring and breaking round-trip.
The docstring at lines 104-109 states extras "stays nested under a custom `info` key", but line 134 spreads `extra` onto the top-level field object (`fr = {**fr, **extra}`). This means `frictionless_schema_to_fields()` (which only reads from `field["info"]`) cannot recover these extras, losing keys like `unit`, `notes`, etc.
Proposed fix
         if extra:
-            fr = {**fr, **extra}
+            fr["info"] = extra
         fr_fields.append(fr)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@datastore/schemas/validators.py` around lines 133 - 134, The extras dict is being spread onto the field top-level (fr = {**fr, **extra}) which contradicts the docstring and prevents frictionless_schema_to_fields() from round-tripping extras from field["info"]; instead merge extras into the field's "info" sub-dict so existing info is preserved (e.g., set fr["info"] = {**fr.get("info", {}), **extra}) and remove the top-level spread; update the block around the function that constructs fr to nest extra under "info" rather than merging into fr directly.
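A minimal sketch of the round trip this comment is protecting, assuming hypothetical `to_frictionless` / `to_legacy` helpers (the PR's real functions have different names and signatures) and skipping any type-name mapping between the two vocabularies.

```python
from __future__ import annotations

from typing import Any

_CORE_KEYS = {"id", "type", "info"}


def to_frictionless(fields: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Legacy fields -> Frictionless fields, nesting extras under `info`."""
    out = []
    for field in fields:
        fr: dict[str, Any] = {"name": field["id"], "type": field["type"]}
        extra = {k: v for k, v in field.items() if k not in _CORE_KEYS}
        # Merge rather than overwrite so an existing `info` dict survives.
        info = {**(field.get("info") or {}), **extra}
        if info:
            fr["info"] = info
        out.append(fr)
    return out


def to_legacy(fr_fields: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Frictionless fields -> legacy fields, reading extras only from `info`."""
    out = []
    for fr in fr_fields:
        field: dict[str, Any] = {"id": fr["name"], "type": fr["type"]}
        if fr.get("info"):
            field["info"] = dict(fr["info"])
        out.append(field)
    return out
```

Because the reverse direction only looks at `info`, any key spread onto the top level in the forward direction would be dropped; nesting keeps `unit`, `notes`, and similar keys recoverable.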
🧹 Nitpick comments (1)
tests/test_bigquery_tables.py (1)
1060-1070: 💤 Low value
Test comment references `__TABLES__` but implementation uses `COUNT(*)`.
The comment at line 1062 mentions "`__TABLES__` / `_count_rows` path" but the actual `_count_rows` implementation now uses `SELECT COUNT(*) AS n FROM <table>`, not BigQuery's `__TABLES__` metadata. Consider updating the comment for accuracy.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate.
In `@tests/test_bigquery_tables.py` around lines 1060 - 1070, The test docstring mentions the `__TABLES__`/`_count_rows` path but the current `_count_rows` implementation uses a `SELECT COUNT(*) AS n FROM <table>` query; update the `test_needs_count_query_only_when_filtering_or_distinct` docstring (or inline comment) to accurately describe that unfiltered + non-distinct searches use the cheaper COUNT(*) path (or remove the `__TABLES__` reference) and keep references to `needs_count_query` and `_count_rows` so reviewers can locate the related logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@datastore/infrastructure/engines/bigquery/backend.py`:
- Around line 465-470: The create() and upsert() functions currently return a
plain dict but declare -> WriteResult; update both to return an actual
WriteResult instance instead of a dict: import or reference the WriteResult
class/type and construct WriteResult(schema=schema, records=records,
include_total=include_total, total=(len(records or []) if include_total else
None)) (apply the same change in both create() and the two upsert() return
sites) so the return value matches the declared type and passes static typing.
- Around line 501-537: The upsert() implementation currently returns plain dicts
in both placeholder mode and the normal path; change both return statements to
construct and return a WriteResult dataclass instance (instead of a dict) with
the same fields (resource_id, records, method, include_total, total). Locate the
upsert method and replace the first return block that handles placeholder mode
and the final return block after calling
_insert_records/_merge_records/_update_records to return WriteResult(...) with
the appropriate field values (compute total as len(rows) or len(records or [])
as before). Ensure WriteResult is imported where needed and that the object
shape matches the declared return type.
In `@datastore/services/read.py`:
- Around line 104-105: The SQL search path currently uses the hardcoded
_SQL_DEFAULT_LIMIT (32000) which bypasses the runtime configuration; update
datastore_search_sql to derive its limit from Config.SEARCH_RESULT_ROWS_MAX
(with a sensible fallback if unset) instead of using _SQL_DEFAULT_LIMIT, and
replace uses of _SQL_DEFAULT_LIMIT in the query-building/limit-clamping logic so
the effective LIMIT value is calculated from Config.SEARCH_RESULT_ROWS_MAX (and
still enforces any minimum/maximum checks); ensure references in
datastore_search_sql and any helper that currently reference _SQL_DEFAULT_LIMIT
are switched to the new config-driven value.
---
Duplicate comments:
In `@datastore/schemas/validators.py`:
- Around line 133-134: The extras dict is being spread onto the field top-level
(fr = {**fr, **extra}) which contradicts the docstring and prevents
frictionless_schema_to_fields() from round-tripping extras from field["info"];
instead merge extras into the field's "info" sub-dict so existing info is
preserved (e.g., set fr["info"] = {**fr.get("info", {}), **extra}) and remove
the top-level spread; update the block around the function that constructs fr to
nest extra under "info" rather than merging into fr directly.
In `@datastore/services/write.py`:
- Around line 83-103: The service function delete_datastore currently returns a
Pydantic response model DatastoreDeleteResponse.Result, coupling the service
layer to the API schema; change it to return plain data (a dict or simple
dataclass) containing the same fields (resource_id and filters) instead. Locate
delete_datastore, keep the call to get_datastore_engine(...) and
engine.delete(resource_id=..., filters=...) unchanged, then construct and return
a plain Python dict (or dataclass instance) with keys resource_id and filters
instead of DatastoreDeleteResponse.Result so the API layer can perform Pydantic
v2 shaping.
---
Nitpick comments:
In `@tests/test_bigquery_tables.py`:
- Around line 1060-1070: The test docstring mentions the
`__TABLES__`/`_count_rows` path but the current `_count_rows` implementation
uses a `SELECT COUNT(*) AS n FROM <table>` query; update the
`test_needs_count_query_only_when_filtering_or_distinct` docstring (or inline
comment) to accurately describe that unfiltered + non-distinct searches use the
cheaper COUNT(*) path (or remove the `__TABLES__` reference) and keep references
to `needs_count_query` and `_count_rows` so reviewers can locate the related
logic.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8d9d8da2-9761-48ea-9fe6-1b5c8ffa3d4b
📒 Files selected for processing (22)
- .env.example
- datastore/api/endpoints/health.py
- datastore/api/routes.py
- datastore/core/config.py
- datastore/core/constants.py
- datastore/infrastructure/engines/base.py
- datastore/infrastructure/engines/bigquery/backend.py
- datastore/infrastructure/engines/bigquery/client.py
- datastore/infrastructure/engines/bigquery/search.py
- datastore/infrastructure/engines/registry.py
- datastore/schemas/request.py
- datastore/schemas/responses.py
- datastore/schemas/validators.py
- datastore/services/read.py
- datastore/services/write.py
- tests/test_bigquery_tables.py
- tests/test_datastore_delete.py
- tests/test_datastore_info.py
- tests/test_datastore_search.py
- tests/test_datastore_search_sql.py
- tests/test_health.py
- tests/test_read_service.py
Summary
Replaces the BigQuery engine placeholder with a working backend covering all six `datastore_*` actions, plus the surrounding wiring (Frictionless schemas as the canonical column shape, real `/ready` healthcheck, system columns).
Highlights
- `infrastructure/engines/bigquery/backend.py` now implements `datastore_create`, `upsert`, `delete`, `search`, `search_sql`, `info` against real BigQuery. Split into `client.py` (auth/Client construction), `lib.py` (pure SQL helpers: DDL, MERGE, INSERT, UPDATE), `metadata.py` (`_table_metadata` row store for per-resource Frictionless schemas), `types.py` (Frictionless ↔ BigQuery type map + widening rules).
- `datastore_create` accepts `schema` (Frictionless Table Schema); legacy `fields` + `primary_key` are auto-converted and emit `DeprecationWarning`. `datastore_info` / `datastore_search` / `datastore_search_sql` return both legacy `fields` and Frictionless `schema` so clients can migrate at their own pace.
- Writes go through DML (`INSERT INTO ... SELECT FROM UNNEST(JSON_QUERY_ARRAY(@rows))` / `MERGE` / `UPDATE … FROM`) to avoid BigQuery's 30–90 min streaming-buffer lockout that blocks follow-up MERGE/UPDATE on the same row.
- `_id` (INT64, monotonically increasing via inlined `(SELECT IFNULL(MAX(_id), 0) FROM tbl) + ROW_NUMBER()`) and, behind `Config.INCLUDE_UPDATED_AT` (default on), `_updated_at` TIMESTAMP. MERGE only bumps `_updated_at` when a non-PK column actually differs (NULL-safe `IS DISTINCT FROM`, JSON canonicalised via `TO_JSON_STRING`).
- `/ready`: engines are constructed once at startup and stashed on `app.state`; `/ready` calls `engine.healthcheck()` for both read and write engines and returns 503 when either fails.
- `ValidationError` / `ConflictError` / `ServerError` (duplicate-PK on MERGE, bad-type casts, out-of-range, invalid date/timestamp, etc.).
Layout (new files)
`MetadataStore` is a Protocol so future engines (DuckLake, Postgres) plug in without copying the backend.
Test plan
- `pytest`: all 180 tests green (engine-level tables tests, metadata store tests, endpoint tests, write service units)
- `rg "from (fastapi|starlette)" datastore/services datastore/infrastructure datastore/core` returns nothing
- `datastore_create` with Frictionless schema → table + `_table_metadata` row created
- `datastore_create` with legacy `fields` + `primary_key` → deprecation warning, same result
- `datastore_upsert` (method=upsert): new rows inserted, matching rows updated only when non-PK columns differ, `_updated_at` advances only on real change
- `datastore_upsert` (method=update): `NotFoundError` when any PK is unmatched
- `datastore_search` returns both `fields` and `schema`
- `datastore_search_sql` blocks functions outside `allowed_functions.txt`
- `datastore_info` round-trips the `info` data dictionary verbatim
- `ValidationError`, not a 500
- `"jk"` into a number column) surfaces as `ValidationError` with the column name
- `/ready` returns 503 when BigQuery credentials are bad
- `Config.INCLUDE_UPDATED_AT=false` → table created without `_updated_at`; MERGE/UPDATE SQL omits the column
🤖 Generated with Claude Code
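The "only bump `_updated_at` on real change" rule can be sketched as a predicate builder. This is an illustration under assumptions, not the PR's `lib.py`: `change_predicate` and the `T` / `S` aliases are hypothetical, and column names are assumed to be validated before they reach this function.

```python
from __future__ import annotations

from collections.abc import Iterable


def change_predicate(
    columns: Iterable[str],
    primary_key: Iterable[str],
    json_columns: Iterable[str] = (),
    target: str = "T",
    source: str = "S",
) -> str:
    """SQL condition that is true when any non-PK column differs."""
    pk = set(primary_key)
    as_json = set(json_columns)
    parts = []
    for col in columns:
        if col in pk:
            continue  # key columns define the match, not the change
        left, right = f"{target}.`{col}`", f"{source}.`{col}`"
        if col in as_json:
            # Compare JSON by its canonical string form.
            left, right = f"TO_JSON_STRING({left})", f"TO_JSON_STRING({right})"
        # IS DISTINCT FROM treats NULL vs NULL as "not different".
        parts.append(f"{left} IS DISTINCT FROM {right}")
    return " OR ".join(parts) if parts else "FALSE"
```

A MERGE could then use the result as the extra condition on its `WHEN MATCHED AND (...) THEN UPDATE` branch, so rows whose payload is unchanged keep their old `_updated_at`.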
Summary by CodeRabbit
New Features
Bug Fixes
Deprecations
Documentation
Tests