8 changes: 7 additions & 1 deletion bluemath_tk/core/decorators.py
@@ -142,7 +142,13 @@ def wrapper(
if not isinstance(normalize_data, bool):
raise TypeError("Normalize data must be a boolean")
return func(
self, data, directional_variables, custom_scale_factor, min_number_of_points
self,
data,
directional_variables,
custom_scale_factor,
min_number_of_points,
max_number_of_iterations,
normalize_data,
)

return wrapper
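For context, this hunk fixes a forwarding bug: the validation wrapper now passes the newer max_number_of_iterations and normalize_data arguments through to the wrapped method instead of dropping them. A minimal sketch of the same pattern, using a hypothetical decorator name (validate_fit_args) rather than the actual BlueMath decorator:

```python
from functools import wraps


def validate_fit_args(func):
    """Hypothetical validation decorator, sketched after bluemath_tk.core.decorators."""

    @wraps(func)
    def wrapper(
        self,
        data,
        directional_variables,
        custom_scale_factor,
        min_number_of_points,
        max_number_of_iterations,
        normalize_data,
    ):
        if not isinstance(normalize_data, bool):
            raise TypeError("Normalize data must be a boolean")
        # Every argument accepted by the wrapper must also be forwarded; otherwise the
        # wrapped method silently falls back to its defaults (the bug this hunk fixes).
        return func(
            self,
            data,
            directional_variables,
            custom_scale_factor,
            min_number_of_points,
            max_number_of_iterations,
            normalize_data,
        )

    return wrapper
```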
10 changes: 5 additions & 5 deletions bluemath_tk/core/models.py
@@ -49,15 +49,15 @@ def __init__(self) -> None:
bluemath_num_workers = os.environ.get("BLUEMATH_NUM_WORKERS", None)
omp_num_threads = os.environ.get("OMP_NUM_THREADS", None)
if bluemath_num_workers is not None:
self.logger.warning(
self.logger.info(
f"Setting self.num_workers to {bluemath_num_workers} due to BLUEMATH_NUM_WORKERS. \n"
"Change it using self.set_num_processors_to_use method. \n"
"Also setting OMP_NUM_THREADS to 1, to avoid conflicts with BlueMath parallel processing."
)
self.set_num_processors_to_use(num_processors=int(bluemath_num_workers))
self.set_omp_num_threads(num_threads=1)
elif omp_num_threads is not None:
self.logger.warning(
self.logger.info(
f"Changing variable OMP_NUM_THREADS from {omp_num_threads} to 1. \n"
f"And setting self.num_workers to {omp_num_threads}. \n"
"To avoid conflicts with BlueMath parallel processing."
@@ -66,7 +66,7 @@ def __init__(self) -> None:
self.set_num_processors_to_use(num_processors=int(omp_num_threads))
else:
self.num_workers = 1 # self.get_num_processors_available()
self.logger.warning(
self.logger.info(
f"Setting self.num_workers to {self.num_workers}. "
"Change it using self.set_num_processors_to_use method."
)
@@ -511,8 +511,8 @@ def set_num_processors_to_use(self, num_processors: int) -> None:
elif num_processors <= 0:
raise ValueError("Number of processors must be greater than 0")
elif (num_processors_available - num_processors) < 2:
self.logger.warning(
"Number of processors requested is less than 2 processors available"
self.logger.info(
"Number of processors requested leaves less than 2 processors available"
)

# Set the number of processors to use
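These messages are downgraded from warning to info because they describe normal start-up behaviour rather than a problem. A rough, standalone restatement of the worker-count precedence applied in BlueMathModel.__init__ (the real code also calls set_omp_num_threads(1); variable names here are illustrative only):

```python
import logging
import os

logging.basicConfig(level=logging.INFO)  # INFO must be enabled to still see these messages
logger = logging.getLogger("bluemath_sketch")

bluemath_num_workers = os.environ.get("BLUEMATH_NUM_WORKERS", None)
omp_num_threads = os.environ.get("OMP_NUM_THREADS", None)

if bluemath_num_workers is not None:
    # BLUEMATH_NUM_WORKERS takes precedence over everything else.
    num_workers = int(bluemath_num_workers)
    logger.info(f"Setting num_workers to {num_workers} due to BLUEMATH_NUM_WORKERS.")
elif omp_num_threads is not None:
    # Fall back to OMP_NUM_THREADS if only that is set.
    num_workers = int(omp_num_threads)
    logger.info(f"Setting num_workers to {num_workers} from OMP_NUM_THREADS.")
else:
    num_workers = 1
    logger.info(f"Setting num_workers to {num_workers} (default).")
```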
13 changes: 13 additions & 0 deletions bluemath_tk/waves/binwaves.py
@@ -1,3 +1,4 @@
import logging
import os
from typing import List, Tuple

@@ -123,6 +124,7 @@ def reconstruc_spectra(
num_workers: int = None,
memory_limit: float = 0.5,
chunk_sizes: dict = {"time": 24},
verbose: bool = False,
):
"""
Reconstruct the onshore spectra using offshore spectra and kp coefficients.
@@ -146,6 +148,17 @@ def reconstruc_spectra(
The reconstructed onshore spectra dataset.
"""

if not verbose:
# Suppress Dask logs
logging.getLogger("distributed").setLevel(logging.ERROR)
logging.getLogger("distributed.client").setLevel(logging.ERROR)
logging.getLogger("distributed.scheduler").setLevel(logging.ERROR)
logging.getLogger("distributed.worker").setLevel(logging.ERROR)
logging.getLogger("distributed.nanny").setLevel(logging.ERROR)
# Also suppress bokeh and tornado logs that Dask uses
logging.getLogger("bokeh").setLevel(logging.ERROR)
logging.getLogger("tornado").setLevel(logging.ERROR)

# Setup Dask client
if num_workers is None:
num_workers = os.environ.get("BLUEMATH_NUM_WORKERS", 4)
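The new verbose flag keeps Dask cluster chatter out of the reconstruction output. A small standalone helper in the same spirit (the name silence_dask_logs is made up; reconstruc_spectra inlines this logic directly):

```python
import logging


def silence_dask_logs(verbose: bool = False) -> None:
    """Drop distributed/bokeh/tornado log output below ERROR unless verbose is requested."""
    if verbose:
        return
    for name in (
        "distributed",
        "distributed.client",
        "distributed.scheduler",
        "distributed.worker",
        "distributed.nanny",
        "bokeh",    # dashboard served by Dask
        "tornado",  # web server used by Dask
    ):
        logging.getLogger(name).setLevel(logging.ERROR)


silence_dask_logs(verbose=False)
```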
1 change: 1 addition & 0 deletions bluemath_tk/wrappers/_base_wrappers.py
@@ -275,6 +275,7 @@ def set_cases_dirs_from_output_dir(self) -> None:
[
op.join(self.output_dir, case_dir)
for case_dir in os.listdir(self.output_dir)
if op.isdir(op.join(self.output_dir, case_dir))
]
)

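The added isdir check means stray files sitting next to the case folders (logs, notebooks, archives) no longer end up in cases_dirs. A standalone sketch of the filtered listing (the helper name list_case_dirs and the sorted ordering are assumptions, not the wrapper's exact code):

```python
import os
import os.path as op
from typing import List


def list_case_dirs(output_dir: str) -> List[str]:
    """Return only the subdirectories of output_dir, skipping plain files."""
    return sorted(
        op.join(output_dir, case_dir)
        for case_dir in os.listdir(output_dir)
        if op.isdir(op.join(output_dir, case_dir))
    )
```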
2 changes: 1 addition & 1 deletion bluemath_tk/wrappers/swash/swash_example.py
@@ -34,7 +34,7 @@ def build_case(
# Usage example
if __name__ == "__main__":
# Define the output directory
output_dir = "/home/tausiaj/GitHub-GeoOcean/BlueMath/test_cases/CHY" # CHANGE THIS TO YOUR DESIRED OUTPUT DIRECTORY!
output_dir = "/home/tausiaj/GitHub-GeoOcean/BlueMath_tk/test_cases/CHY" # CHANGE THIS TO YOUR DESIRED OUTPUT DIRECTORY!
# Templates directory
swash_file_path = op.dirname(inspect.getfile(SwashModelWrapper))
templates_dir = op.join(swash_file_path, "templates")
77 changes: 65 additions & 12 deletions bluemath_tk/wrappers/swash/swash_wrapper.py
@@ -1,6 +1,6 @@
import os
import re
from typing import List, Tuple
from typing import List, Tuple, Union

import numpy as np
import pandas as pd
@@ -341,32 +341,78 @@ def get_case_percentage_from_file(self, output_log_file: str) -> str:

return "0 %" # if no progress is found

def monitor_cases(self) -> pd.DataFrame:
def monitor_cases(self, value_counts: str = None) -> Union[pd.DataFrame, dict]:
"""
Monitor the cases and log relevant information.

Parameters
----------
value_counts : str, optional
How to aggregate the returned case statuses.
If "simple", it returns the counts of cases in each status.
If "cases", it returns a dictionary mapping each status to its list of cases.
Default is None, which returns the full DataFrame.

Returns
-------
pd.DataFrame
The cases percentage.
Union[pd.DataFrame, dict]
The case statuses as a pandas DataFrame, or aggregated information when value_counts is set.
"""

cases_percentage = {}

for case_dir in self.cases_dirs:
output_log_file = os.path.join(case_dir, "wrapper_out.log")
progress = self.get_case_percentage_from_file(
output_log_file=output_log_file
)
cases_percentage[os.path.basename(case_dir)] = progress
case_dir_name = os.path.basename(case_dir)
if os.path.exists(os.path.join(case_dir, "Errfile")):
cases_percentage[case_dir_name] = "Errfile"
elif os.path.exists(os.path.join(case_dir, "norm_end")):
cases_percentage[case_dir_name] = "END"
else:
run_tab_file = os.path.join(case_dir, "run.tab")
if os.path.exists(run_tab_file):
run_tab = self._read_tabfile(file_path=run_tab_file)
if run_tab.isnull().values.any():
cases_percentage[case_dir_name] = "NaN"
continue
else:
cases_percentage[case_dir_name] = "No run.tab"
continue
output_log_file = os.path.join(case_dir, "wrapper_out.log")
progress = self.get_case_percentage_from_file(
output_log_file=output_log_file
)
cases_percentage[case_dir_name] = progress

return pd.DataFrame(cases_percentage.items(), columns=["Case", "Percentage"])
full_monitorization_df = pd.DataFrame(
cases_percentage.items(), columns=["Case", "Percentage"]
)
if value_counts:
value_counts_df = full_monitorization_df.set_index("Case").value_counts()
if value_counts == "simple":
return value_counts_df
value_counts_unique_values = [
run_type[0] for run_type in value_counts_df.index.values
]
value_counts_dict = {
run_type: list(
full_monitorization_df.where(
full_monitorization_df["Percentage"] == run_type
)
.dropna()["Case"]
.values
)
for run_type in value_counts_unique_values
}
return value_counts_dict
else:
return full_monitorization_df

def postprocess_case(
self,
case_num: int,
case_dir: str,
output_vars: List[str] = None,
force: bool = False,
remove_tab: bool = False,
remove_nc: bool = False,
) -> xr.Dataset:
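The per-case status logic added above can be read in isolation: an Errfile marks a failed run, a norm_end file marks a normal end, a run.tab containing NaNs marks a blown-up run, and only the remaining cases fall back to the wrapper_out.log progress parser. A rough standalone restatement (plain pandas parsing is assumed here where the wrapper uses self._read_tabfile):

```python
import os

import pandas as pd


def case_status(case_dir: str) -> str:
    """Sketch of the status checks monitor_cases performs per case directory."""
    if os.path.exists(os.path.join(case_dir, "Errfile")):
        return "Errfile"
    if os.path.exists(os.path.join(case_dir, "norm_end")):
        return "END"
    run_tab_file = os.path.join(case_dir, "run.tab")
    if not os.path.exists(run_tab_file):
        return "No run.tab"
    # Assumption: run.tab is whitespace-separated with '*' comment lines.
    run_tab = pd.read_csv(run_tab_file, sep=r"\s+", comment="*")
    if run_tab.isnull().values.any():
        return "NaN"
    return "running"  # the wrapper parses wrapper_out.log for a percentage here
```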
@@ -379,6 +425,8 @@ def postprocess_case(
The case number.
case_dir : str
The case directory.
force : bool, optional
Force the postprocessing, re-creating the output.nc file. Default is False.
output_vars : list, optional
The output variables to postprocess. Default is None.
remove_tab : bool, optional
@@ -403,13 +451,15 @@ def postprocess_case(
output_vars = list(self.postprocess_functions.keys())

output_nc_path = os.path.join(case_dir, "output.nc")
if not os.path.exists(output_nc_path):
if not os.path.exists(output_nc_path) or force:
# Convert tab files to netCDF file
output_path = os.path.join(case_dir, "output.tab")
run_path = os.path.join(case_dir, "run.tab")
output_nc = self._convert_case_output_files_to_nc(
case_num=case_num, output_path=output_path, run_path=run_path
)
if os.path.exists(output_nc_path):
os.remove(output_nc_path)
output_nc.to_netcdf(output_nc_path)
else:
self.logger.info("Reading existing output.nc file.")
@@ -432,7 +482,10 @@ def postprocess_case(
ds = xr.merge(var_ds_list, compat="no_conflicts")

# Save Dataset to netCDF file
ds.to_netcdf(os.path.join(case_dir, "output_postprocessed.nc"))
processed_nc_path = os.path.join(case_dir, "output_postprocessed.nc")
if os.path.exists(processed_nc_path):
os.remove(processed_nc_path)
ds.to_netcdf(processed_nc_path)

# Remove raw files to save space
if remove_tab:
Expand Down