1 change: 1 addition & 0 deletions .gitignore
@@ -6,6 +6,7 @@ climate-api.yaml
eo_api.egg-info/
data/downloads
data/artifacts
data/derived
data/pygeoapi
docs/internal/
site/
33 changes: 32 additions & 1 deletion climate_api/data/datasets/chirps3.yaml
@@ -7,9 +7,40 @@
sync_execution: append
sync_availability:
latest_available_function: climate_api.providers.availability.chirps3_daily_latest_available
ingestion:
function: dhis2eo.data.chc.chirps3.daily.download
units: mm
resolution: 5 km x 5 km
source: CHIRPS v3
source_url: https://www.chc.ucsb.edu/data/chirps3

- id: chirps3_precipitation_weekly
name: Total precipitation weekly (CHIRPS3)
short_name: Total precipitation weekly
variable: precip
period_type: weekly
sync_kind: derived
processing:
process_id: resample
source_dataset_id: chirps3_precipitation_daily
method: sum
week_start: monday
units: mm
resolution: 5 km x 5 km
source: CHIRPS v3
source_url: https://www.chc.ucsb.edu/data/chirps3

- id: chirps3_precipitation_monthly
name: Total precipitation monthly (CHIRPS3)
short_name: Total precipitation monthly
variable: precip
period_type: monthly
sync_kind: derived
processing:
process_id: resample
source_dataset_id: chirps3_precipitation_daily
method: sum
units: mm
resolution: 5 km x 5 km
source: CHIRPS v3
source_url: https://www.chc.ucsb.edu/data/chirps3
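The processing block is declarative: resample names the process, source_dataset_id points at the daily CHIRPS3 store, and method plus week_start pin down the reduction. A minimal xarray sketch of the intended semantics, assuming Zarr-backed stores; the path and variable handling here are illustrative, not the pipeline's actual code:

import xarray as xr

daily = xr.open_zarr("data/chirps3_precipitation_daily.zarr")  # hypothetical path

# week_start: monday -> bins covering [Monday, next Monday), labeled by that Monday
weekly = daily.resample(time="W-MON", closed="left", label="left").sum()

# monthly -> calendar-month bins, labeled by the month start
monthly = daily.resample(time="MS").sum()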
32 changes: 31 additions & 1 deletion climate_api/data/datasets/era5_land.yaml
@@ -28,7 +28,7 @@
sync_availability:
latest_available_function: climate_api.providers.availability.lagged_latest_available
lag_hours: 120
ingestion:
function: dhis2eo.data.destine.era5_land.hourly.download
default_params:
variables: ['tp']
@@ -38,3 +38,33 @@
resolution: 9 km x 9 km
source: ERA5-Land Reanalysis
source_url: https://earthdatahub.destine.eu/collections/era5/datasets/reanalysis-era5-land

- id: era5land_temperature_daily
name: Daily mean 2m temperature (ERA5-Land)
short_name: Daily mean 2m temperature
variable: t2m
period_type: daily
sync_kind: derived
processing:
process_id: resample
source_dataset_id: era5land_temperature_hourly
method: mean
units: degC
resolution: 9 km x 9 km
source: ERA5-Land Reanalysis
source_url: https://earthdatahub.destine.eu/collections/era5/datasets/reanalysis-era5-land

- id: era5land_precipitation_daily
name: Daily total precipitation (ERA5-Land)
short_name: Daily total precipitation
variable: tp
period_type: daily
sync_kind: derived
processing:
process_id: resample
source_dataset_id: era5land_precipitation_hourly
method: sum
units: mm
resolution: 9 km x 9 km
source: ERA5-Land Reanalysis
source_url: https://earthdatahub.destine.eu/collections/era5/datasets/reanalysis-era5-land
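Both ERA5-Land derivations reduce hourly stores to daily resolution; the method differs because t2m is an instantaneous field (averaged) while tp is a total (summed). An illustrative equivalent, with the caveat that whether ERA5-Land tp needs de-accumulating before a daily sum depends on how the hourly store is written, which this diff does not show:

import xarray as xr

# hypothetical store paths; the real layout is resolved via get_zarr_path
t2m_hourly = xr.open_zarr("data/era5land_temperature_hourly.zarr")
tp_hourly = xr.open_zarr("data/era5land_precipitation_hourly.zarr")

t2m_daily = t2m_hourly["t2m"].resample(time="1D").mean()  # method: mean
tp_daily = tp_hourly["tp"].resample(time="1D").sum()      # method: sum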
12 changes: 10 additions & 2 deletions climate_api/data_accessor/services/accessor.py
@@ -5,6 +5,7 @@
import tempfile
from typing import Any

import numpy as np
import xarray as xr

from ...data_manager.services.downloader import get_cache_files, get_zarr_path
@@ -135,8 +136,8 @@ def _coverage_from_dataset(*, ds: xr.Dataset, period_type: str) -> dict[str, Any
time_dim = get_time_dim(ds)
lon_dim, lat_dim = get_lon_lat_dims(ds)

start = numpy_datetime_to_period_string(ds[time_dim].min(), period_type) # type: ignore[arg-type]
end = numpy_datetime_to_period_string(ds[time_dim].max(), period_type) # type: ignore[arg-type]
start = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].min(), period_type)) # type: ignore[arg-type]
end = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].max(), period_type)) # type: ignore[arg-type]

xmin, xmax = ds[lon_dim].min().item(), ds[lon_dim].max().item()
ymin, ymax = ds[lat_dim].min().item(), ds[lat_dim].max().item()
@@ -150,6 +151,13 @@
}


def _period_string_scalar(value: Any) -> str:
"""Normalize a numpy scalar or 0-d array period string to plain Python str."""
if isinstance(value, np.ndarray):
return str(value.item())
return str(value)


def xarray_to_temporary_netcdf(ds: xr.Dataset) -> str:
"""Write a dataset to a temporary NetCDF file and return the path."""
fd = tempfile.NamedTemporaryFile(suffix=".nc", delete=False)
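The new helper guards against NumPy types leaking into JSON-facing metadata: ds[time_dim].min() is a 0-d array, so the formatted period string can come back as a NumPy scalar or 0-d ndarray rather than a plain str. A quick illustration of the normalization; the inputs are assumptions about what the formatter may return:

import numpy as np

assert _period_string_scalar(np.asarray("2024-07-01")) == "2024-07-01"  # 0-d ndarray
assert _period_string_scalar(np.str_("2024-07-01")) == "2024-07-01"     # numpy str scalar
assert _period_string_scalar("2024-07-01") == "2024-07-01"              # plain str passes through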
74 changes: 67 additions & 7 deletions climate_api/data_registry/services/datasets.py
@@ -15,8 +15,10 @@
# When set, only this directory is loaded (no built-ins, no config override).
CONFIGS_DIR: Path | None = None

SUPPORTED_SYNC_KINDS = {"temporal", "release", "static"}
SUPPORTED_SYNC_KINDS = {"temporal", "release", "static", "derived"}
SUPPORTED_SYNC_EXECUTIONS = {"append", "rematerialize"}
SUPPORTED_RESAMPLE_METHODS = {"mean", "sum", "min", "max"}
SUPPORTED_WEEK_STARTS = {"monday", "sunday"}


def list_datasets() -> list[dict[str, Any]]:
@@ -133,18 +135,76 @@ def _validate_dataset_template(dataset: object, *, source: str) -> None:
f"'{sync_execution}'. Supported values: {supported}"
)

ingestion = dataset.get("ingestion")
if not isinstance(ingestion, dict):
raise ValueError(f"Dataset template '{dataset_id}' in {source} must define an 'ingestion' block")
function = ingestion.get("function")
if not isinstance(function, str) or not function:
raise ValueError(f"Dataset template '{dataset_id}' in {source} must define ingestion.function")
if sync_kind != "derived":
ingestion = dataset.get("ingestion")
if not isinstance(ingestion, dict):
raise ValueError(f"Dataset template '{dataset_id}' in {source} must define an 'ingestion' block")
function = ingestion.get("function")
if not isinstance(function, str) or not function:
raise ValueError(f"Dataset template '{dataset_id}' in {source} must define ingestion.function")

processing = dataset.get("processing")
if sync_kind == "derived":
if processing is None:
raise ValueError(
f"Dataset template '{dataset_id}' in {source} must define processing when sync_kind is derived"
)
_validate_processing_config(processing, dataset=dataset, dataset_id=dataset_id, source=source)
elif processing is not None:
raise ValueError(
f"Dataset template '{dataset_id}' in {source} may only define processing when sync_kind is derived"
)

sync_availability = dataset.get("sync_availability")
if sync_availability is not None:
_validate_sync_availability(sync_availability, dataset_id=dataset_id, source=source)


def _validate_processing_config(processing: object, *, dataset: dict[str, Any], dataset_id: str, source: str) -> None:
"""Validate a processing block for a derived dataset."""
if not isinstance(processing, dict):
raise ValueError(f"Dataset template '{dataset_id}' in {source} has invalid processing block")
process_id = processing.get("process_id")
if not isinstance(process_id, str) or not process_id:
raise ValueError(f"Dataset template '{dataset_id}' in {source} must define processing.process_id")
if process_id == "resample":
_validate_resample_params(processing, dataset=dataset, dataset_id=dataset_id, source=source)
else:
raise ValueError(
f"Dataset template '{dataset_id}' in {source} has unsupported processing.process_id '{process_id}'"
)


def _validate_resample_params(params: dict[str, Any], *, dataset: dict[str, Any], dataset_id: str, source: str) -> None:
"""Validate the parameters for a resample processing block."""
source_dataset_id = params.get("source_dataset_id")
if not isinstance(source_dataset_id, str) or not source_dataset_id:
raise ValueError(f"Dataset template '{dataset_id}' in {source} must define processing.source_dataset_id")
method = params.get("method")
if not isinstance(method, str) or not method:
raise ValueError(f"Dataset template '{dataset_id}' in {source} must define processing.method")
if method not in SUPPORTED_RESAMPLE_METHODS:
supported = ", ".join(sorted(SUPPORTED_RESAMPLE_METHODS))
raise ValueError(
f"Dataset template '{dataset_id}' in {source} has unsupported processing.method '{method}'. "
f"Supported values: {supported}"
)
week_start = params.get("week_start")
period_type = dataset.get("period_type")
if week_start is not None:
if period_type != "weekly":
raise ValueError(
f"Dataset template '{dataset_id}' in {source} may only define processing.week_start "
"when period_type is weekly"
)
if week_start not in SUPPORTED_WEEK_STARTS:
supported = ", ".join(sorted(SUPPORTED_WEEK_STARTS))
raise ValueError(
f"Dataset template '{dataset_id}' in {source} has unsupported processing.week_start "
f"'{week_start}'. Supported values: {supported}"
)


def _validate_sync_availability(sync_availability: object, *, dataset_id: str, source: str) -> None:
"""Validate optional source availability policy metadata."""
if not isinstance(sync_availability, dict):
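Taken together, the validation now enforces three rules: derived datasets must carry a processing block and may omit ingestion, non-derived datasets must not carry processing, and week_start is only legal on weekly datasets. Two abbreviated templates run against _validate_resample_params; real templates also need the id/name/period_type fields checked earlier in _validate_dataset_template:

ok = {
    "period_type": "weekly",
    "processing": {"process_id": "resample", "source_dataset_id": "chirps3_precipitation_daily",
                   "method": "sum", "week_start": "monday"},
}
bad = {**ok, "period_type": "monthly"}

_validate_resample_params(ok["processing"], dataset=ok, dataset_id="demo", source="demo.yaml")   # passes
_validate_resample_params(bad["processing"], dataset=bad, dataset_id="demo", source="demo.yaml")
# -> ValueError: may only define processing.week_start when period_type is weekly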
1 change: 1 addition & 0 deletions climate_api/ingestions/schemas.py
@@ -26,6 +26,7 @@ class SyncKind(StrEnum):
TEMPORAL = "temporal"
RELEASE = "release"
STATIC = "static"
DERIVED = "derived"


class SyncAction(StrEnum):
90 changes: 90 additions & 0 deletions climate_api/ingestions/services.py
@@ -330,6 +330,56 @@ def mutate(records: list[ArtifactRecord]) -> ArtifactRecord:
return _mutate_records(mutate)


def store_materialized_zarr_artifact(
*,
dataset: dict[str, object],
start: str,
end: str | None,
extent_id: str | None,
bbox: list[float] | None,
zarr_path: Path,
overwrite: bool,
publish: bool,
) -> ArtifactRecord:
"""Store metadata for a locally materialized Zarr artifact."""
period_type = str(dataset["period_type"])
normalized_start = _normalize_request_period(start, period_type=period_type, field_name="start")
normalized_end = _normalize_optional_request_period(end, period_type=period_type, field_name="end")
request_scope = ArtifactRequestScope(
start=normalized_start,
end=normalized_end,
extent_id=extent_id,
bbox=(bbox[0], bbox[1], bbox[2], bbox[3]) if bbox is not None else None,
)
coverage_data = get_data_coverage_for_paths(dataset, zarr_path=str(zarr_path.resolve()))
if not coverage_data.get("has_data", True):
raise HTTPException(status_code=409, detail="Materialized artifact contains no data for the requested scope")
coverage = ArtifactCoverage(
temporal=CoverageTemporal(**coverage_data["coverage"]["temporal"]),
spatial=CoverageSpatial(**coverage_data["coverage"]["spatial"]),
)
request_scope = request_scope.model_copy(update={"end": coverage.temporal.end})

record = ArtifactRecord(
artifact_id=str(uuid4()),
dataset_id=str(dataset["id"]),
dataset_name=str(dataset["name"]),
variable=str(dataset["variable"]),
format=ArtifactFormat.ZARR,
path=str(zarr_path.resolve()),
asset_paths=[str(zarr_path.resolve())],
variables=[str(dataset["variable"])],
request_scope=request_scope,
coverage=coverage,
created_at=datetime.now(UTC),
publication=ArtifactPublication(),
)
stored_record = _upsert_artifact_record(record, prefer_zarr=True, publish=publish, overwrite=overwrite)
if publish and stored_record.publication.status != PublicationStatus.PUBLISHED:
return publish_artifact_record(stored_record.artifact_id)
return stored_record


def sync_dataset(
*,
dataset_id: str,
@@ -464,6 +514,44 @@ def mutate(records: list[ArtifactRecord]) -> ArtifactRecord:
return _mutate_records(mutate)


def _upsert_artifact_record(
record: ArtifactRecord,
*,
prefer_zarr: bool,
publish: bool,
overwrite: bool,
) -> ArtifactRecord:
"""Persist a new or replacement artifact record for the same logical request scope."""
if not overwrite:
return _store_artifact_record(record, prefer_zarr=prefer_zarr, publish=publish)

def mutate(records: list[ArtifactRecord]) -> ArtifactRecord:
existing = _find_existing_artifact_in_records(
records=records,
dataset_id=record.dataset_id,
request_scope=record.request_scope,
prefer_zarr=prefer_zarr,
)
if existing is None:
records.append(record)
return record

replacement = record.model_copy(
update={
"artifact_id": existing.artifact_id,
"publication": existing.publication,
}
)
for index, current in enumerate(records):
if current.artifact_id != existing.artifact_id:
continue
records[index] = replacement
return replacement
raise HTTPException(status_code=404, detail=f"Artifact '{existing.artifact_id}' not found")

return _mutate_records(mutate)


def _mutate_records(mutation: Callable[[list[ArtifactRecord]], ArtifactRecord]) -> ArtifactRecord:
"""Apply a read-modify-write mutation under an exclusive file lock."""
ensure_store()
@@ -567,6 +655,8 @@ def _default_request_end(period_type: str) -> str:
return datetime_to_period_string(utc_now(), period_type)
if period_type == "daily":
return utc_today().isoformat()
if period_type == "weekly":
return datetime_to_period_string(utc_now(), period_type)
if period_type == "monthly":
today = utc_today()
return f"{today.year:04d}-{today.month:02d}"
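store_materialized_zarr_artifact is the entry point a derived-materialization job would call once a Zarr store exists on disk: it computes coverage from the data itself, overwrites the request end with the store's actual temporal coverage, and, via _upsert_artifact_record, reuses the artifact_id of any prior artifact with the same request scope when overwrite is set. A hypothetical call, with the dataset id, period value, and path purely illustrative (import paths follow the file layout in this diff):

from pathlib import Path

from climate_api.data_registry.services.datasets import list_datasets
from climate_api.ingestions.services import store_materialized_zarr_artifact

dataset = next(d for d in list_datasets() if d["id"] == "era5land_temperature_daily")
record = store_materialized_zarr_artifact(
    dataset=dataset,
    start="2025-01-01",
    end=None,            # normalized, then replaced by the store's actual coverage end
    extent_id=None,
    bbox=None,
    zarr_path=Path("data/derived/era5land_temperature_daily.zarr"),  # hypothetical path
    overwrite=True,      # replace an existing artifact with the same request scope
    publish=False,
)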
22 changes: 21 additions & 1 deletion climate_api/ingestions/sync_engine.py
@@ -15,7 +15,7 @@
import inspect
import logging
from collections.abc import Callable
from datetime import date, timedelta
from datetime import date, datetime, time, timedelta
from typing import Any

from climate_api.ingestions.schemas import ArtifactRecord, SyncAction, SyncDetail, SyncKind, SyncResponse
@@ -25,6 +25,7 @@
datetime_to_period_string,
normalize_period_string,
parse_hourly_period_string,
parse_period_string_to_datetime,
utc_now,
utc_today,
)
@@ -75,6 +76,19 @@ def plan_sync(
target_end=current_end,
target_end_source="current_coverage",
)
if sync_kind == SyncKind.DERIVED:
return SyncDetail(
source_dataset_id=latest_artifact.dataset_id,
extent_id=latest_artifact.request_scope.extent_id,
sync_kind=sync_kind,
action=SyncAction.NOT_SYNCABLE,
reason="derived_sync_not_implemented",
message="This derived dataset does not support sync execution yet.",
current_start=current_start,
current_end=current_end,
target_end=current_end,
target_end_source="current_coverage",
)

period_type = str(source_dataset["period_type"])
normalized_requested_end = requested_end.strip() if isinstance(requested_end, str) else None
@@ -304,6 +318,10 @@ def _next_period_start(latest_period_end: str, *, period_type: str) -> str:
if period_type == "daily":
current = date.fromisoformat(latest_period_end)
return (current + timedelta(days=1)).isoformat()
if period_type == "weekly":
current = parse_period_string_to_datetime(latest_period_end).date()
next_week = datetime.combine(current + timedelta(days=7), time(0))
return datetime_to_period_string(next_week, period_type)
if period_type == "monthly":
current = date.fromisoformat(f"{latest_period_end}-01")
month = current.month + 1
@@ -322,6 +340,8 @@ def _default_target_end(*, period_type: str) -> str:
return datetime_to_period_string(utc_now(), period_type)
if period_type == "daily":
return today.isoformat()
if period_type == "weekly":
return datetime_to_period_string(utc_now(), period_type)
if period_type == "monthly":
return f"{today.year:04d}-{today.month:02d}"
if period_type == "yearly":
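For weekly periods, plan_sync's helpers advance coverage in whole weeks: _next_period_start parses the latest covered weekly period, steps forward seven days, and reformats. The period-string round-trip is owned by parse_period_string_to_datetime and datetime_to_period_string, neither shown in this diff, so only the date arithmetic is sketched here:

from datetime import date, datetime, time, timedelta

# assuming the parsed weekly period resolves to the week's start date
latest_week_start = date(2025, 6, 30)  # a Monday, per week_start: monday
next_start = datetime.combine(latest_week_start + timedelta(days=7), time(0))
# -> datetime(2025, 7, 7, 0, 0); formatting back to a weekly period string
#    is delegated to datetime_to_period_string(next_start, "weekly")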