Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions bluemath_tk/downloaders/ecmwf/OpenData/OpenData_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"datasets": {
"forecast_data": {
"description": "ECMWF OpenData Forecast Data",
"url": "https://confluence.ecmwf.int/display/DAC/ECMWF+open+data%3A+real-time+forecasts+from+IFS+and+AIFS",
"models": [
"ifs",
"aifs"
],
"resolutions": [
"0p25"
]
}
}
}
Empty file.
246 changes: 246 additions & 0 deletions bluemath_tk/downloaders/ecmwf/ecmwf_downloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
import json
import os
from typing import List, Union

import xarray as xr
from ecmwf.opendata import Client

from .._base_downloaders import BaseDownloader


class ECMWFDownloader(BaseDownloader):
    """
    Downloader for data published by the ECMWF.

    Attributes
    ----------
    products_configs : dict
        Class-level mapping from product name to its configuration, read from
        the JSON file bundled next to this module.
    product : str
        The product to download data from. Currently only OpenData is supported.
    product_config : dict
        The configuration for the product to download data from.
    client : ecmwf.opendata.Client or None
        The client to interact with the ECMWF API. It is only created when
        ``check`` is False; in check mode it is None.
    model : str
        The model requested in the constructor.
    resolution : str
        The resolution requested in the constructor.

    Examples
    --------
    .. jupyter-execute::

        from bluemath_tk.downloaders.ecmwf.ecmwf_downloader import ECMWFDownloader

        ecmwf_downloader = ECMWFDownloader(
            product="OpenData",
            base_path_to_download="/path/to/ECMWF/",  # Will be created if not available
            check=False,  # check=True only reports whether the file exists
        )
        dataset = ecmwf_downloader.download_data(
            load_data=True,
            param=["msl"],
            step=[0, 240],
            type="fc",
        )
        print(dataset)
    """

    # Read the bundled configuration once, at class-definition time. The
    # ``with`` block guarantees the file handle is closed again.
    with open(
        os.path.join(os.path.dirname(__file__), "OpenData", "OpenData_config.json"),
        encoding="utf-8",
    ) as _config_file:
        products_configs = {"OpenData": json.load(_config_file)}
    # Do not leave the closed file object behind as a class attribute.
    del _config_file

    def __init__(
        self,
        product: str,
        base_path_to_download: str,
        model: str = "ifs",
        resolution: str = "0p25",
        debug: bool = True,
        check: bool = True,
    ) -> None:
        """
        This is the constructor for the ECMWFDownloader class.

        Parameters
        ----------
        product : str
            The product to download data from. Currently only OpenData is supported.
        base_path_to_download : str
            The base path to download the data to.
        model : str, optional
            The model to download data from. Default is "ifs".
        resolution : str, optional
            The resolution to download data from. Default is "0p25".
        debug : bool, optional
            Whether to run in debug mode. Default is True.
        check : bool, optional
            Whether to just check the data. When True no client is created and
            nothing is downloaded. Default is True.

        Raises
        ------
        ValueError
            If the product configuration is not found, or (only when ``check``
            is False) if the model or the resolution is not listed in the
            product configuration.
        """

        super().__init__(
            base_path_to_download=base_path_to_download, debug=debug, check=check
        )
        self._product = product
        self._product_config = self.products_configs.get(product)
        if self._product_config is None:
            raise ValueError(f"{product} configuration not found")
        self.set_logger_name(
            f"ECMWFDownloader-{product}", level="DEBUG" if debug else "INFO"
        )

        # Always define the attribute so the ``client`` property never raises
        # AttributeError in check mode.
        self._client = None
        if self.check:
            self.logger.info("---- CHECKING DATA ----")
        else:
            forecast_config = self.product_config["datasets"]["forecast_data"]
            if model not in forecast_config["models"]:
                raise ValueError(f"Model {model} not supported for {self.product}")
            if resolution not in forecast_config["resolutions"]:
                raise ValueError(
                    f"Resolution {resolution} not supported for {self.product}"
                )
            self._client = Client(
                source="ecmwf",
                model=model,
                resol=resolution,
                preserve_request_order=False,
                infer_stream_keyword=True,
            )
            self.logger.info("---- DOWNLOADING DATA ----")

        # Set the model and resolution parameters
        self.model = model
        self.resolution = resolution

    @property
    def product(self) -> str:
        """Name of the product this downloader was created for."""
        return self._product

    @property
    def product_config(self) -> dict:
        """Configuration dictionary of the selected product."""
        return self._product_config

    @property
    def client(self) -> Union[Client, None]:
        """ECMWF client, or None when the downloader runs in check mode."""
        return self._client

    @staticmethod
    def _as_list(value) -> list:
        """Return ``value`` as a list, wrapping a scalar into a one-item list."""
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    def list_datasets(self) -> List[str]:
        """
        Lists the datasets available for the product.

        Returns
        -------
        List[str]
            The list of datasets available for the product.
        """

        return list(self.product_config["datasets"].keys())

    def download_data(
        self, load_data: bool = False, *args, **kwargs
    ) -> Union[str, xr.Dataset]:
        """
        Downloads the data for the product.

        Parameters
        ----------
        load_data : bool, optional
            Whether to load the data into an xarray.Dataset. Default is False.
        *args
            The arguments to pass to the download function.
        **kwargs
            The keyword arguments to pass to the download function.

        Returns
        -------
        Union[str, xr.Dataset]
            The path to the downloaded file if load_data is False, otherwise the xarray.Dataset.

        Raises
        ------
        ValueError
            If the product is not supported.
        """

        if self.product != "OpenData":
            raise ValueError(f"Download for product {self.product} not supported")

        downloaded_file_path = self.download_data_open_data(*args, **kwargs)
        if load_data:
            return xr.open_dataset(downloaded_file_path, engine="cfgrib")
        return downloaded_file_path

    def download_data_open_data(
        self,
        force: bool = False,
        **kwargs,
    ) -> str:
        """
        Downloads the data for the OpenData product.

        The target file name is built from the ``param``, ``step`` and ``type``
        keyword arguments, so the same request always maps to the same file.

        Parameters
        ----------
        force : bool, optional
            Whether to force the download even if the file already exists.
            Ignored in check mode. Default is False.
        **kwargs
            The keyword arguments to pass to the client ``retrieve`` call.
            ``param`` and ``step`` may be a single value or a list/tuple;
            ``type`` defaults to "fc" for the file name when not given.

        Returns
        -------
        str
            The path to the downloaded file.
        """

        # Normalise scalars so "msl" is not joined character by character.
        variables = self._as_list(kwargs.get("param", []))
        steps = self._as_list(kwargs.get("step", []))
        forecast_type = kwargs.get("type", "fc")

        variables_tag = "_".join(str(variable) for variable in variables)
        steps_tag = "_".join(str(step) for step in steps)
        output_grib_file = os.path.join(
            self.base_path_to_download,
            self.product,
            self.model,
            self.resolution,
            f"{variables_tag}_{steps_tag}_{forecast_type}.grib2",
        )
        if not self.check:
            os.makedirs(os.path.dirname(output_grib_file), exist_ok=True)

        # An existing file is reused unless a real download is being forced.
        if os.path.exists(output_grib_file) and (self.check or not force):
            self.logger.debug(f"{output_grib_file} already downloaded")
        elif self.check:
            self.logger.debug(f"{output_grib_file} not downloaded")
        else:
            self.logger.debug(f"Downloading: {output_grib_file}")
            self.client.retrieve(
                target=output_grib_file,
                **kwargs,
            )

        return output_grib_file
37 changes: 37 additions & 0 deletions tests/downloaders/test_ecmwf_downloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import tempfile
import unittest

import xarray as xr

from bluemath_tk.downloaders.ecmwf.ecmwf_downloader import ECMWFDownloader


class TestECMWFDownloader(unittest.TestCase):
    """Tests for ECMWFDownloader using the OpenData product."""

    def setUp(self):
        # Download into a throw-away directory that is removed after each test,
        # instead of leaving files behind in the working directory.
        self._temp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(self._temp_dir.cleanup)
        self.temp_dir = self._temp_dir.name
        self.downloader = ECMWFDownloader(
            product="OpenData",
            base_path_to_download=self.temp_dir,
            check=False,  # check=False creates the client and really downloads
        )

    def test_list_datasets(self):
        """The product configuration exposes at least one dataset."""
        datasets = self.downloader.list_datasets()
        self.assertIsInstance(datasets, list)
        self.assertGreater(len(datasets), 0)
        print(f"Available datasets: {datasets}")

    def test_download_data(self):
        """Downloading with load_data=True returns an xarray.Dataset."""
        dataset = self.downloader.download_data(
            load_data=True,
            param=["msl"],
            step=[0, 240],
            type="fc",
            force=False,
        )
        self.assertIsInstance(dataset, xr.Dataset)
        # Cleanups run last-in-first-out, so the dataset is closed before the
        # temporary directory holding its file is removed.
        self.addCleanup(dataset.close)
        print(dataset)


# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Loading