Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dependencies = [
"starlette",
"typing_extensions",
"shapely",

]

[project.urls]
Expand Down
101 changes: 88 additions & 13 deletions stac_fastapi/eodag/extensions/data_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from eodag.api.product.metadata_mapping import ONLINE_STATUS, STAGING_STATUS, get_metadata_path_value
from eodag.utils.exceptions import EodagError
from fastapi import APIRouter, FastAPI, Path, Request
from fastapi.responses import RedirectResponse, StreamingResponse
from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse
from stac_fastapi.api.errors import NotFoundError
from stac_fastapi.api.routes import create_async_endpoint
from stac_fastapi.types.extension import ApiExtension
Expand All @@ -51,6 +51,25 @@
class BaseDataDownloadClient:
"""Defines a pattern for implementing the data download extension."""

def _try_presign_asset(
    self,
    product: EOProduct,
    asset_name: Optional[str],
    auth: Optional[dict],
) -> Optional[RedirectResponse]:
    """Build a redirect to a presigned URL for an asset, if one can be obtained.

    Presigning is only attempted when the product has a ``downloader_auth``
    and ``asset_name`` is set to something other than ``"downloadLink"`` or
    ``"zarr"``.

    :param product: product whose ``assets`` and ``downloader_auth`` are used
    :param asset_name: key of the requested asset in ``product.assets``
    :param auth: authentication object, only used in a log message here
    :returns: a 302 redirect to the presigned URL, or ``None`` when presigning
        is skipped, is not implemented by the authenticator, or fails with an
        ``EodagError``
    """
    authenticator = product.downloader_auth
    if not authenticator or not asset_name:
        return None
    if asset_name in ("downloadLink", "zarr"):
        return None

    asset = product.assets[asset_name]
    try:
        # Both the presign call and the response creation stay inside the try
        # so that the same exceptions are handled as before.
        redirect = RedirectResponse(authenticator.presign_url(asset), status_code=302)
    except NotImplementedError:
        logger.info("Presigned urls not supported for %s with auth %s", product.downloader, auth)
        return None
    except EodagError:
        logger.info("Presigned url could not be fetched for %s", asset_name)
        return None
    return redirect

def _file_to_stream(
self,
file_path: str,
Expand Down Expand Up @@ -94,14 +113,27 @@ def _read_file_chunks_and_delete(self, opened_file: BufferedReader, chunk_size:
yield data
yield data

def get_data_with_file(
    self,
    federation_backend: str,
    collection_id: str,
    item_id: str,
    asset_name: Optional[str],
    request: Request,
    file_path: str,
) -> Union[StreamingResponse, RedirectResponse, JSONResponse]:
    """Download a file addressed by a path below an asset.

    Thin wrapper that forwards every argument positionally to ``get_data``,
    with ``file_path`` passed as the last argument.

    :param federation_backend: federation backend name
    :param collection_id: collection ID
    :param item_id: item ID
    :param asset_name: asset name
    :param request: incoming request
    :param file_path: path of the requested file, forwarded as ``file_path``
    :returns: whatever ``get_data`` returns
    """
    response = self.get_data(
        federation_backend,
        collection_id,
        item_id,
        asset_name,
        request,
        file_path,
    )
    return response

def get_data(
self,
federation_backend: str,
collection_id: str,
item_id: str,
asset_name: Optional[str],
request: Request,
) -> Union[StreamingResponse, RedirectResponse]:
file_path: Optional[str] = None,
) -> Union[StreamingResponse, RedirectResponse, JSONResponse]:
"""Download an asset"""

dag = cast(EODataAccessGateway, request.app.state.dag) # type: ignore
Expand Down Expand Up @@ -141,7 +173,11 @@ def get_data(
raise NotFoundError(f"Item {item_id} does not exist. Please order it first") from e
raise NotFoundError(e) from e

auth = product.downloader_auth.authenticate() if product.downloader_auth else None
try:
auth = product.downloader_auth.authenticate() if product.downloader_auth else None
except Exception as e:
logger.error(f"Authentication failed: {e}")
raise NotAvailableError("Token not ready, authentication failed. Please try again later.") from e

if product.downloader is None:
logger.error("No downloader available for %s", product)
Expand Down Expand Up @@ -186,16 +222,28 @@ def get_data(
raise NotFoundError(f"Item {item_id} does not exist. Please order it first") from e
raise NotFoundError(e) from e

if product.downloader_auth and asset_name and asset_name != "downloadLink":
asset_values = product.assets[asset_name]
# return presigned url if available
try:
presigned_url = product.downloader_auth.presign_url(asset_values)
return RedirectResponse(presigned_url, status_code=302)
except NotImplementedError:
logger.info("Presigned urls not supported for %s with auth %s", product.downloader, auth)
except EodagError:
logger.info("Presigned url could not be fetched for %s", asset_name)
zarr_asset_name = next(
(name for name in product.assets if (name.endswith("zarr") and asset_name != "downloadLink")), None
)
if zarr_asset_name:
asset_values = product.assets[zarr_asset_name]
base_url = asset_values["href"]
target_url = f"{base_url.rstrip('/')}/{file_path.lstrip('/')}"

r = product.request_asset(url=target_url)

return StreamingResponse(
r.iter_content(chunk_size=1024 * 1024),
status_code=r.status_code,
media_type=r.headers.get("Content-Type", "application/octet-stream"),
headers={
k: v for k, v in r.headers.items() if k.lower() not in ["content-encoding", "transfer-encoding"]
},
)

presigned_response = self._try_presign_asset(product, asset_name, auth)
if presigned_response:
return presigned_response

try:
s = product.downloader.stream_download(
Expand Down Expand Up @@ -226,6 +274,17 @@ class DataDownloadUri(APIRequest):
asset_name: Annotated[str, Path(description="Item ID")] = attr.ib()


@attr.s
class DataDownloadUriWithFile(APIRequest):
"""Path parameters for downloading a file located under an asset.

Declares the same four path parameters as the plain download request
(backend, collection, item, asset) plus ``file_path``, described as the
file path within a zarr store.
"""

# Every field is declared as a FastAPI ``Path`` parameter with its own description.
federation_backend: Annotated[str, Path(description="Federation backend name")] = attr.ib()
collection_id: Annotated[str, Path(description="Collection ID")] = attr.ib()
item_id: Annotated[str, Path(description="Item ID")] = attr.ib()
asset_name: Annotated[str, Path(description="Asset name")] = attr.ib()
file_path: Annotated[str, Path(description="File path within zarr store")] = attr.ib()


@attr.s
class DataDownload(ApiExtension):
"""Data-download Extension.
Expand All @@ -250,6 +309,7 @@ def register(self, app: FastAPI) -> None:
:returns: None
"""
self.router.prefix = app.state.router_prefix
# Route for /data/{backend}/{collection}/{item}/{asset_name}
self.router.add_api_route(
name="Download data",
path="/data/{federation_backend}/{collection_id}/{item_id}/{asset_name}",
Expand All @@ -263,4 +323,19 @@ def register(self, app: FastAPI) -> None:
},
endpoint=create_async_endpoint(self.client.get_data, DataDownloadUri),
)

# Route for /data/{backend}/{collection}/{item}/{asset_name}/{file_path}
self.router.add_api_route(
name="Download data with file path",
path="/data/{federation_backend}/{collection_id}/{item_id}/{asset_name}/{file_path:path}",
methods=["GET"],
responses={
200: {
"content": {
"application/octet-stream": {},
},
}
},
endpoint=create_async_endpoint(self.client.get_data_with_file, DataDownloadUriWithFile),
)
app.include_router(self.router, tags=["Data download"])
4 changes: 3 additions & 1 deletion stac_fastapi/eodag/models/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ def create_stac_item(
feature["assets"][k]["alternate"] = {"origin": origin}

# TODO: remove downloadLink asset after EODAG assets rework
if download_link := product.properties.get("eodag:download_link"):
if (download_link := product.properties.get("eodag:download_link")) and not any(
"zarr" in key for key in product.assets
):
origin_href = download_link
if asset_proxy_url:
download_link = asset_proxy_url + "/downloadLink"
Expand Down
25 changes: 25 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from stac_fastapi.eodag.app import api, stac_metadata_model
from stac_fastapi.eodag.config import get_settings
from stac_fastapi.eodag.dag import init_dag
from stac_fastapi.eodag.extensions.data_download import BaseDataDownloadClient
from tests import TEST_RESOURCES_PATH


Expand Down Expand Up @@ -526,6 +527,30 @@ def mock_http_base_stream_download(mocker):
return mocker.patch.object(HTTPDownload, "stream_download")


@pytest.fixture(scope="function")
def mock_base_data_download_get_data(mocker):
    """Patch ``BaseDataDownloadClient.get_data`` and return the resulting mock."""
    patched_get_data = mocker.patch.object(BaseDataDownloadClient, "get_data")
    return patched_get_data


@pytest.fixture(scope="function")
def mock_data_download_requests_get(mocker):
    """Patch ``requests.get`` as looked up in ``eodag.api.product._product``.

    Returns the mock that replaces it, so tests can set a return value and
    inspect the calls.
    """
    patch_target = "eodag.api.product._product.requests.get"
    return mocker.patch(patch_target)


@pytest.fixture(scope="function")
def mock_item_get_settings(mocker):
    """Patch ``get_settings`` in ``stac_fastapi.eodag.models.item`` and return the mock."""
    patch_target = "stac_fastapi.eodag.models.item.get_settings"
    return mocker.patch(patch_target)


@pytest.fixture(scope="function")
def mock_order(mocker):
"""
Expand Down
138 changes: 138 additions & 0 deletions tests/test_zarr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# -*- coding: utf-8 -*-
# Copyright 2025, CS GROUP - France, https://www.cs-soprasteria.com
#
# This file is part of stac-fastapi-eodag project
# https://www.github.com/CS-SI/stac-fastapi-eodag
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Zarr tests."""

from unittest import mock

from eodag import SearchResult
from eodag.api.product import EOProduct
from eodag.config import PluginConfig
from eodag.plugins.download.http import HTTPDownload

from stac_fastapi.eodag.app import stac_metadata_model
from stac_fastapi.eodag.extensions.data_download import BaseDataDownloadClient
from stac_fastapi.eodag.models.item import create_stac_item


def test_get_data_with_file_delegates_to_get_data(mock_base_data_download_get_data):
    """``get_data_with_file`` forwards its arguments, file path included, to ``get_data``."""
    sentinel_response = object()
    mock_base_data_download_get_data.return_value = sentinel_response
    fake_request = mock.Mock()
    # The same tuple is used for the call and for the expectation: the
    # wrapper must pass everything through positionally and unchanged.
    call_args = (
        "desp_cache",
        "collection",
        "item",
        "example.zarr",
        fake_request,
        "group/foo.txt",
    )

    result = BaseDataDownloadClient().get_data_with_file(*call_args)

    assert result is sentinel_response
    mock_base_data_download_get_data.assert_called_once_with(*call_args)


def test_items_response_includes_zarr_asset(defaults, mock_search_result, mock_item_get_settings):
    """create_stac_item should expose a Zarr asset and leave out the downloadLink asset.

    A ``example.zarr`` asset is added to the first product of the mocked
    search result; the resulting STAC item must list it with an href pointing
    to the data download route, and must not contain the legacy download
    link asset.
    """
    product = mock_search_result[0]
    product.assets.update({"example.zarr": {"href": "https://data/cop_dataspace/example.zarr"}})
    request = mock.Mock()
    request.app.state.dag.collections_config = {}
    mock_item_get_settings.return_value = mock.Mock(
        download_base_url="http://testserver/",
        auto_order_whitelist=[],
        keep_origin_url=False,
        origin_url_blacklist=[],
    )

    item = create_stac_item(
        product,
        stac_metadata_model,
        lambda extension_name: extension_name == "DataDownload",
        request,
        extension_names=[],
    )

    assets = item["assets"]
    assert "example.zarr" in assets
    # The legacy asset is named "downloadLink" elsewhere in the code base;
    # checking only "download_link" could never fail, so both spellings are
    # asserted to be absent.
    assert "downloadLink" not in assets
    assert "download_link" not in assets
    assert (
        assets["example.zarr"]["href"]
        == f"http://testserver/data/cop_dataspace/{item['collection']}/{item['id']}/example.zarr"
    )


async def test_zarr_file_display(
    defaults,
    mock_data_download_requests_get,
):
    """``get_data_with_file`` streams a file located inside a ``.zarr`` asset.

    A product with a single ``example.zarr`` asset is returned by a mocked
    search; the upstream ``requests.get`` call is mocked as well. The test
    checks the response status and content type, and that the upstream URL is
    the asset href joined with the requested file path.
    """
    collection = defaults.collection
    item_id = "dummy_id"
    client = BaseDataDownloadClient()

    # Product exposing one zarr asset, downloadable over HTTP without auth.
    product_properties = {
        "geometry": "POINT (0 0)",
        "title": "dummy_product",
        "id": item_id,
    }
    zarr_product = EOProduct("cop_dataspace", product_properties, collection=collection)
    zarr_product.assets.update({"example.zarr": {"href": "https://data/cop_dataspace/example.zarr"}})
    plugin_config = PluginConfig()
    plugin_config.priority = 0
    http_downloader = HTTPDownload("cop_dataspace", plugin_config)
    zarr_product.register_downloader(downloader=http_downloader, authenticator=None)

    # Request whose application state carries a gateway returning that product.
    fake_dag = mock.Mock()
    fake_dag.search.return_value = SearchResult([zarr_product])
    fake_request = mock.Mock()
    fake_request.app.state.dag = fake_dag
    fake_request.base_url._url = "http://testserver/"

    # Upstream HTTP response served by the mocked ``requests.get``.
    upstream_response = mock.Mock(
        status_code=200,
        headers={"Content-Type": "text/plain"},
    )
    mock_data_download_requests_get.return_value = upstream_response
    upstream_response.iter_content.return_value = iter([b"hello"])

    response = client.get_data_with_file(
        "cop_dataspace",
        collection,
        item_id,
        "example.zarr",
        fake_request,
        "group/foo.txt",
    )

    assert response.status_code == 200
    assert response.headers["content-type"].startswith("text/plain")
    mock_data_download_requests_get.assert_called_once_with(
        "https://data/cop_dataspace/example.zarr/group/foo.txt",
        headers={},
        stream=True,
    )
Loading