Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ __pycache__/
.env
eo_api.egg-info/
data/downloads
data/derived
data/artifacts
data/pygeoapi
docs/internal/
29 changes: 29 additions & 0 deletions data/datasets/chirps3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,32 @@
resolution: 5 km x 5 km
source: CHIRPS v3
source_url: https://www.chc.ucsb.edu/data/chirps3

- id: chirps3_precipitation_weekly
name: Total precipitation weekly (CHIRPS3)
short_name: Total precipitation weekly
variable: precip
period_type: weekly
sync_kind: derived
resample:
source_dataset_id: chirps3_precipitation_daily
method: sum
week_start: monday
units: mm
resolution: 5 km x 5 km
source: CHIRPS v3
source_url: https://www.chc.ucsb.edu/data/chirps3

- id: chirps3_precipitation_monthly
name: Total precipitation monthly (CHIRPS3)
short_name: Total precipitation monthly
variable: precip
period_type: monthly
sync_kind: derived
resample:
source_dataset_id: chirps3_precipitation_daily
method: sum
units: mm
resolution: 5 km x 5 km
source: CHIRPS v3
source_url: https://www.chc.ucsb.edu/data/chirps3
28 changes: 28 additions & 0 deletions data/datasets/era5_land.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@
source: ERA5-Land Reanalysis
source_url: https://cds.climate.copernicus.eu/datasets/reanalysis-era5-land

- id: era5land_temperature_daily
name: Daily mean 2m temperature (ERA5-Land)
short_name: Daily mean 2m temperature
variable: t2m
period_type: daily
sync_kind: derived
resample:
source_dataset_id: era5land_temperature_hourly
method: mean
units: degC
resolution: 9 km x 9 km
source: ERA5-Land Reanalysis
source_url: https://cds.climate.copernicus.eu/datasets/reanalysis-era5-land

- id: era5land_precipitation_hourly
name: Total precipitation (ERA5-Land)
short_name: Total precipitation
Expand All @@ -38,3 +52,17 @@
resolution: 9 km x 9 km
source: ERA5-Land Reanalysis
source_url: https://cds.climate.copernicus.eu/datasets/reanalysis-era5-land

- id: era5land_precipitation_daily
name: Daily total precipitation (ERA5-Land)
short_name: Daily total precipitation
variable: tp
period_type: daily
sync_kind: derived
resample:
source_dataset_id: era5land_precipitation_hourly
method: sum
units: mm
resolution: 9 km x 9 km
source: ERA5-Land Reanalysis
source_url: https://cds.climate.copernicus.eu/datasets/reanalysis-era5-land
12 changes: 10 additions & 2 deletions src/climate_api/data_accessor/services/accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import tempfile
from typing import Any

import numpy as np
import xarray as xr

from ...data_manager.services.downloader import get_cache_files, get_zarr_path
Expand Down Expand Up @@ -135,8 +136,8 @@ def _coverage_from_dataset(*, ds: xr.Dataset, period_type: str) -> dict[str, Any
time_dim = get_time_dim(ds)
lon_dim, lat_dim = get_lon_lat_dims(ds)

start = numpy_datetime_to_period_string(ds[time_dim].min(), period_type) # type: ignore[arg-type]
end = numpy_datetime_to_period_string(ds[time_dim].max(), period_type) # type: ignore[arg-type]
start = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].min(), period_type)) # type: ignore[arg-type]
end = _period_string_scalar(numpy_datetime_to_period_string(ds[time_dim].max(), period_type)) # type: ignore[arg-type]

xmin, xmax = ds[lon_dim].min().item(), ds[lon_dim].max().item()
ymin, ymax = ds[lat_dim].min().item(), ds[lat_dim].max().item()
Expand All @@ -150,6 +151,13 @@ def _coverage_from_dataset(*, ds: xr.Dataset, period_type: str) -> dict[str, Any
}


def _period_string_scalar(value: Any) -> str:
"""Normalize a numpy scalar or 0-d array period string to plain Python str."""
if isinstance(value, np.ndarray):
return str(value.item())
return str(value)


def xarray_to_temporary_netcdf(ds: xr.Dataset) -> str:
"""Write a dataset to a temporary NetCDF file and return the path."""
fd = tempfile.NamedTemporaryFile(suffix=".nc", delete=False)
Expand Down
61 changes: 60 additions & 1 deletion src/climate_api/data_registry/services/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

SCRIPT_DIR = Path(__file__).parent.resolve()
CONFIGS_DIR = SCRIPT_DIR.parent.parent.parent.parent / "data" / "datasets"
SUPPORTED_SYNC_KINDS = {"temporal", "release", "static"}
SUPPORTED_SYNC_KINDS = {"temporal", "release", "static", "derived"}
SUPPORTED_SYNC_EXECUTIONS = {"append", "rematerialize"}
SUPPORTED_RESAMPLE_METHODS = {"mean", "sum", "min", "max", "first", "last"}
SUPPORTED_WEEK_STARTS = {"monday", "sunday"}


def list_datasets() -> list[dict[str, Any]]:
Expand Down Expand Up @@ -71,6 +73,20 @@ def _validate_dataset_template(dataset: object, *, file_path: Path) -> None:
f"'{sync_execution}'. Supported values: {supported}"
)

resample = dataset.get("resample")
if sync_kind == "derived":
if resample is None:
raise ValueError(
f"Dataset template '{dataset_id}' in {file_path.name} must define resample when sync_kind is derived"
)
elif resample is not None:
raise ValueError(
f"Dataset template '{dataset_id}' in {file_path.name} may only define resample when sync_kind is derived"
)

if resample is not None:
_validate_resample(resample, dataset=dataset, dataset_id=dataset_id, file_path=file_path)

sync_availability = dataset.get("sync_availability")
if sync_availability is not None:
_validate_sync_availability(sync_availability, dataset_id=dataset_id, file_path=file_path)
Expand All @@ -89,3 +105,46 @@ def _validate_sync_availability(sync_availability: object, *, dataset_id: str, f
f"Dataset template '{dataset_id}' in {file_path.name} has invalid "
"sync_availability.latest_available_function"
)


def _validate_resample(
    resample: object,
    *,
    dataset: dict[str, Any],
    dataset_id: str,
    file_path: Path,
) -> None:
    """Validate optional derived resampling metadata.

    Raises ``ValueError`` when the ``resample`` mapping is malformed: missing or
    non-string ``source_dataset_id``/``method``, an unsupported aggregation
    method, or a ``week_start`` that is either used outside weekly datasets or
    not one of the supported weekday anchors.
    """
    context = f"Dataset template '{dataset_id}' in {file_path.name}"

    if not isinstance(resample, dict):
        raise ValueError(f"{context} has invalid resample")

    source_dataset_id = resample.get("source_dataset_id")
    if not (isinstance(source_dataset_id, str) and source_dataset_id):
        raise ValueError(f"{context} has invalid resample.source_dataset_id")

    method = resample.get("method")
    if not (isinstance(method, str) and method):
        raise ValueError(f"{context} has invalid resample.method")
    if method not in SUPPORTED_RESAMPLE_METHODS:
        allowed = ", ".join(sorted(SUPPORTED_RESAMPLE_METHODS))
        raise ValueError(
            f"{context} has unsupported resample.method '{method}'. Supported values: {allowed}"
        )

    week_start = resample.get("week_start")
    if week_start is None:
        return

    # week_start only makes sense for weekly aggregation windows.
    if dataset.get("period_type") != "weekly":
        raise ValueError(
            f"{context} may only define resample.week_start when period_type is weekly"
        )
    if not (isinstance(week_start, str) and week_start):
        raise ValueError(f"{context} has invalid resample.week_start")
    if week_start not in SUPPORTED_WEEK_STARTS:
        allowed = ", ".join(sorted(SUPPORTED_WEEK_STARTS))
        raise ValueError(
            f"{context} has unsupported resample.week_start '{week_start}'. Supported values: {allowed}"
        )
1 change: 1 addition & 0 deletions src/climate_api/ingestions/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class SyncKind(StrEnum):
TEMPORAL = "temporal"
RELEASE = "release"
STATIC = "static"
DERIVED = "derived"


class SyncAction(StrEnum):
Expand Down
90 changes: 90 additions & 0 deletions src/climate_api/ingestions/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,56 @@ def mutate(records: list[ArtifactRecord]) -> ArtifactRecord:
return _mutate_records(mutate)


def store_materialized_zarr_artifact(
    *,
    dataset: dict[str, object],
    start: str,
    end: str | None,
    extent_id: str | None,
    bbox: list[float] | None,
    zarr_path: Path,
    overwrite: bool,
    publish: bool,
) -> ArtifactRecord:
    """Store metadata for a locally materialized Zarr artifact.

    Normalizes the requested period bounds, derives spatio-temporal coverage
    from the Zarr store on disk, and upserts an ``ArtifactRecord`` pointing at
    the resolved path. Raises 409 when the store holds no data for the scope;
    publishes the stored record when ``publish`` is set and it is not already
    published.
    """
    period_type = str(dataset["period_type"])
    start = _normalize_request_period(start, period_type=period_type, field_name="start")
    end = _normalize_optional_request_period(end, period_type=period_type, field_name="end")

    scope = ArtifactRequestScope(
        start=start,
        end=end,
        extent_id=extent_id,
        bbox=None if bbox is None else (bbox[0], bbox[1], bbox[2], bbox[3]),
    )

    # Resolve once; the same absolute path is reused for coverage and record fields.
    resolved_path = str(zarr_path.resolve())
    coverage_payload = get_data_coverage_for_paths(dataset, zarr_path=resolved_path)
    if not coverage_payload.get("has_data", True):
        raise HTTPException(status_code=409, detail="Materialized artifact contains no data for the requested scope")

    coverage = ArtifactCoverage(
        temporal=CoverageTemporal(**coverage_payload["coverage"]["temporal"]),
        spatial=CoverageSpatial(**coverage_payload["coverage"]["spatial"]),
    )
    # Clamp the requested end to what the materialized data actually covers.
    scope = scope.model_copy(update={"end": coverage.temporal.end})

    variable = str(dataset["variable"])
    record = ArtifactRecord(
        artifact_id=str(uuid4()),
        dataset_id=str(dataset["id"]),
        dataset_name=str(dataset["name"]),
        variable=variable,
        format=ArtifactFormat.ZARR,
        path=resolved_path,
        asset_paths=[resolved_path],
        variables=[variable],
        request_scope=scope,
        coverage=coverage,
        created_at=datetime.now(UTC),
        publication=ArtifactPublication(),
    )

    stored_record = _upsert_artifact_record(record, prefer_zarr=True, publish=publish, overwrite=overwrite)
    if publish and stored_record.publication.status != PublicationStatus.PUBLISHED:
        return publish_artifact_record(stored_record.artifact_id)
    return stored_record


def sync_dataset(
*,
dataset_id: str,
Expand Down Expand Up @@ -455,6 +505,44 @@ def mutate(records: list[ArtifactRecord]) -> ArtifactRecord:
return _mutate_records(mutate)


def _upsert_artifact_record(
    record: ArtifactRecord,
    *,
    prefer_zarr: bool,
    publish: bool,
    overwrite: bool,
) -> ArtifactRecord:
    """Persist a new or replacement artifact record for the same logical request scope.

    Without ``overwrite`` this delegates straight to the normal store path.
    With ``overwrite`` it replaces any existing record matching the dataset and
    request scope in place, preserving the original artifact id and publication
    state so downstream references stay valid.
    """
    if not overwrite:
        return _store_artifact_record(record, prefer_zarr=prefer_zarr, publish=publish)

    def replace_or_append(records: list[ArtifactRecord]) -> ArtifactRecord:
        existing = _find_existing_artifact_in_records(
            records=records,
            dataset_id=record.dataset_id,
            request_scope=record.request_scope,
            prefer_zarr=prefer_zarr,
        )
        if existing is None:
            # Nothing to overwrite: store the fresh record as-is.
            records.append(record)
            return record

        # Keep the prior identity and publication state on the replacement.
        replacement = record.model_copy(
            update={
                "artifact_id": existing.artifact_id,
                "publication": existing.publication,
            }
        )
        for position, candidate in enumerate(records):
            if candidate.artifact_id == existing.artifact_id:
                records[position] = replacement
                return replacement
        # Defensive: _find_existing_artifact_in_records returned a record not in the list.
        raise HTTPException(status_code=404, detail=f"Artifact '{existing.artifact_id}' not found")

    return _mutate_records(replace_or_append)


def _mutate_records(mutation: Callable[[list[ArtifactRecord]], ArtifactRecord]) -> ArtifactRecord:
"""Apply a read-modify-write mutation under an exclusive file lock."""
ensure_store()
Expand Down Expand Up @@ -558,6 +646,8 @@ def _default_request_end(period_type: str) -> str:
return datetime_to_period_string(utc_now(), period_type)
if period_type == "daily":
return utc_today().isoformat()
if period_type == "weekly":
return datetime_to_period_string(utc_now(), period_type)
if period_type == "monthly":
today = utc_today()
return f"{today.year:04d}-{today.month:02d}"
Expand Down
22 changes: 21 additions & 1 deletion src/climate_api/ingestions/sync_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import inspect
import logging
from collections.abc import Callable
from datetime import date, timedelta
from datetime import date, datetime, timedelta
from typing import Any

from climate_api.ingestions.schemas import ArtifactRecord, SyncAction, SyncDetail, SyncKind, SyncResponse
Expand All @@ -25,6 +25,7 @@
datetime_to_period_string,
normalize_period_string,
parse_hourly_period_string,
parse_period_string_to_datetime,
utc_now,
utc_today,
)
Expand Down Expand Up @@ -75,6 +76,19 @@ def plan_sync(
target_end=current_end,
target_end_source="current_coverage",
)
if sync_kind == SyncKind.DERIVED:
return SyncDetail(
source_dataset_id=latest_artifact.dataset_id,
extent_id=latest_artifact.request_scope.extent_id,
sync_kind=sync_kind,
action=SyncAction.NOT_SYNCABLE,
reason="derived_sync_not_implemented",
message="This derived dataset does not support sync execution yet.",
current_start=current_start,
current_end=current_end,
target_end=current_end,
target_end_source="current_coverage",
)

period_type = str(source_dataset["period_type"])
normalized_requested_end = requested_end.strip() if isinstance(requested_end, str) else None
Expand Down Expand Up @@ -304,6 +318,10 @@ def _next_period_start(latest_period_end: str, *, period_type: str) -> str:
if period_type == "daily":
current = date.fromisoformat(latest_period_end)
return (current + timedelta(days=1)).isoformat()
if period_type == "weekly":
current = parse_period_string_to_datetime(latest_period_end).date()
next_week = datetime.combine(current + timedelta(days=7), datetime.min.time())
return datetime_to_period_string(next_week, period_type)
if period_type == "monthly":
current = date.fromisoformat(f"{latest_period_end}-01")
month = current.month + 1
Expand All @@ -322,6 +340,8 @@ def _default_target_end(*, period_type: str) -> str:
return datetime_to_period_string(utc_now(), period_type)
if period_type == "daily":
return today.isoformat()
if period_type == "weekly":
return datetime_to_period_string(utc_now(), period_type)
if period_type == "monthly":
return f"{today.year:04d}-{today.month:02d}"
if period_type == "yearly":
Expand Down
2 changes: 2 additions & 0 deletions src/climate_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from climate_api.data_registry import routes as dataset_template_routes
from climate_api.extents import routes as extent_routes
from climate_api.ingestions import routes as ingestion_routes
from climate_api.processing import routes as processing_routes
from climate_api.pygeoapi_app import mount_pygeoapi
from climate_api.stac import routes as stac_routes
from climate_api.system import routes as system_routes
Expand Down Expand Up @@ -79,6 +80,7 @@ async def add_zarr_browser_access_headers(
app.include_router(stac_routes.router, prefix="/stac", tags=["STAC"])
app.include_router(extent_routes.router, prefix="/extents", tags=["Extents"])
app.include_router(dataset_template_routes.router, prefix="/dataset-templates", tags=["Dataset templates"])
app.include_router(processing_routes.router, prefix="/processes", tags=["Processes"])
app.include_router(ingestion_routes.datasets_router, prefix="/datasets", tags=["Datasets"])
app.include_router(ingestion_routes.ingestions_router, prefix="/ingestions", tags=["Ingestions"])
app.include_router(ingestion_routes.zarr_router, prefix="/zarr", tags=["Zarr"])
Expand Down
1 change: 1 addition & 0 deletions src/climate_api/processing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Derived data processing services."""
Loading
Loading