Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config/evaluate/config_zarr2cf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ variables:
wg_unit: Pa
std_unit: Pa
level_type: sfc
tp:
var: tp
long: total_precipitation
std: precipitation_amount
wg_unit: kg/m^2
std_unit: kg/m^2
level_type: sfc
tp_imerg_0:
var: tp_imerg_0
long: imerg_total_precipitation
Expand Down
2 changes: 1 addition & 1 deletion config/evaluate/config_zarr2verif.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ variables:
var: tp
long: Total precipitation amount
wg_unit: {CERRA: kg/m^2,
MEPS: kg/m^2,
MEPS: m,
NORA3: kg/m^2,
ERA5: m,
DEFAULT: kg/m^2}
Expand Down
11 changes: 6 additions & 5 deletions packages/evaluate/src/weathergen/evaluate/export/export_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ def export_model_outputs(data_type: str, config: OmegaConf, **kwargs) -> None:
) as pool:
samples_written = 0

processed_samples = []

for batch_idx in range(n_batches):
batch_start = batch_idx * batch_size
batch_end = min(batch_start + batch_size, len(samples))
Expand Down Expand Up @@ -377,8 +379,6 @@ def export_model_outputs(data_type: str, config: OmegaConf, **kwargs) -> None:
desc=f" Batch {batch_idx + 1}/{n_batches}",
)

processed_samples = []

for sample, _fstep, data in pool.imap_unordered(
get_data_worker, batch_tasks, chunksize=1
):
Expand All @@ -403,9 +403,6 @@ def export_model_outputs(data_type: str, config: OmegaConf, **kwargs) -> None:
del sample_results[sample]
batch_written += 1

# Only save here if need to merge samples, otherwise saved in process_sample
if processed_samples[0] is not None:
parser.save(processed_samples)
pbar.close()

samples_written += batch_written
Expand All @@ -419,4 +416,8 @@ def export_model_outputs(data_type: str, config: OmegaConf, **kwargs) -> None:
# Free any remaining refs before next batch.
del sample_results

# Only save here if need to merge samples, otherwise saved in process_sample
if processed_samples[0] is not None:
parser.save(processed_samples)

_logger.info(f"Export complete. Wrote {samples_written}/{len(samples)} samples.")
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ def get_zarr_dt(
Time difference between source interval start and end in hours.
"""
zarr_dt = (source_interval_end - source_interval_start).astype("timedelta64[h]")

# TODO: pull default value from config
if zarr_dt == np.timedelta64(0, "h"):
zarr_dt = np.timedelta64(6, "h")
return zarr_dt

def get_output_filename(self, variable: str) -> Path:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def compute_mslp(obs: xr.DataArray, time: np.datetime64) -> np.typing.NDArray:


def compute_precip(
obs_data: xr.Dataset, zarr_dt: np.timedelta64, frt: np.datetime64
obs_data: xr.Dataset, zarr_dt: np.timedelta64, valid_time: np.datetime64
) -> np.typing.NDArray:
"""
Compute accumulated precipitation over the forecast time step.
Expand All @@ -264,7 +264,7 @@ def compute_precip(
Input data containing precipitation observations.
zarr_dt : np.timedelta64
Time difference between forecast steps in hours.
frt : np.datetime64
valid_time : np.datetime64
Forecast reference time for which to compute accumulated precipitation.
Returns
-------
Expand All @@ -274,13 +274,19 @@ def compute_precip(
obs_dt = obs_dt.astype("timedelta64[h]")

if obs_dt >= zarr_dt:
return obs_data["precipitation_amount_1h"].values
if obs_dt % zarr_dt == np.timedelta64(0):
return obs_data["precipitation_amount_1h"].sel(time=valid_time).values.squeeze()
else:
# return empty data
empty = np.empty(obs_data.location.shape[0])
empty[:] = np.nan
return empty
else:
accumulate = np.zeros(obs_data.location.shape[0])
int_factor = int(zarr_dt / obs_dt)

for i in range(int_factor):
back_time = frt - zarr_dt + (i + 1) * obs_dt
back_time = valid_time - zarr_dt + (i + 1) * obs_dt
accumulate += (
obs_data.data_vars["precipitation_amount_1h"].sel(time=back_time).squeeze()
)
Expand Down
Loading