2 changes: 1 addition & 1 deletion .gitignore
@@ -49,7 +49,6 @@ coverage.xml
*.py,cover
.hypothesis/
.pytest_cache/
tests_data/

# mypy
.mypy_cache/
@@ -99,4 +98,5 @@ venv.bak/
# Local notebooks
notebooks/
test_cases/
test_data/
TODO.md
269 changes: 227 additions & 42 deletions bluemath_tk/wrappers/_base_wrappers.py
@@ -1,11 +1,13 @@
import os
import copy
import itertools
from abc import abstractmethod
from typing import List
import subprocess
import numpy as np
from jinja2 import Environment, FileSystemLoader
from ..core.models import BlueMathModel
from ._utils_wrappers import write_array_in_file, copy_files
from concurrent.futures import ThreadPoolExecutor, as_completed


class BaseModelWrapper(BlueMathModel):
@@ -14,6 +16,10 @@ class BaseModelWrapper(BlueMathModel):

Attributes
----------
available_launchers : List[str]
The available launchers.
available_schedulers : List[str]
The available schedulers.
templates_dir : str
The directory where the templates are stored.
templates_name : List[str]
@@ -35,6 +41,10 @@ class BaseModelWrapper(BlueMathModel):
Check if the parameters have the correct type.
_exec_bash_commands(str_cmd, out_file=None, err_file=None)
Execute bash commands.
list_available_launchers()
List the available launchers.
list_available_schedulers()
List the available schedulers.
create_cases_context_one_by_one()
Create an array of dictionaries with the combinations of values from the
input dictionary, one by one.
@@ -47,14 +57,26 @@ class BaseModelWrapper(BlueMathModel):
Write an array in a file.
copy_files(src, dst)
Copy file(s) from source to destination.
build_cases(mode="all_combinations")
build_cases(mode="one_by_one")
Create the cases folders and render the input files.
run_cases()
Run the cases.
run_model(case_dir)
Run the model for a specific case (abstract method).
run_case(case_dir, launcher=None, script=None, params=None)
Run a single case based on the launcher, script, and parameters.
run_cases(launcher=None, script=None, params=None, parallel=False)
Run the cases based on the launcher, script, and parameters.
Parallel execution is optional.
run_cases_with_scheduler(scheduler, script, params=None)
Run the cases based on the scheduler, script, and parameters.
run_model()
Run the model for a specific case.
run_model_with_apptainer()
Run the model for a specific case using Apptainer.
run_model_with_docker()
Run the model for a specific case using Docker.
"""

available_launchers = ["bash", "sh", "./", "apptainer", "docker", "qsub"]
available_schedulers = ["sbatch"]

def __init__(
self,
templates_dir: str,
@@ -176,6 +198,30 @@ def _exec_bash_commands(
_stderr.flush()
_stderr.close()

def list_available_launchers(self) -> List[str]:
"""
List the available launchers.

Returns
-------
list
A list with the available launchers.
"""

return self.available_launchers

def list_available_schedulers(self) -> List[str]:
"""
List the available schedulers.

Returns
-------
list
A list with the available schedulers.
"""

return self.available_schedulers

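As a minimal sketch (MyModelWrapper is a hypothetical subclass, and any constructor arguments beyond templates_dir are omitted), the two getters simply expose the class-level lists defined above:

# MyModelWrapper is a hypothetical subclass; a sketch of such a subclass
# appears at the end of this diff.
wrapper = MyModelWrapper(templates_dir="templates/")
print(wrapper.list_available_launchers())   # ['bash', 'sh', './', 'apptainer', 'docker', 'qsub']
print(wrapper.list_available_schedulers())  # ['sbatch']
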
def create_cases_context_one_by_one(self) -> List[dict]:
"""
Create an array of dictionaries with the combinations of values from the
@@ -264,15 +310,7 @@ def write_array_in_file(self, array: np.ndarray, filename: str) -> None:
The name of the file.
"""

with open(filename, "w") as f:
if array.ndim == 1:
for item in array:
f.write(f"{item}\n")
elif array.ndim == 2:
for row in array:
f.write(" ".join(map(str, row)) + "\n")
else:
raise ValueError("Only 1D and 2D arrays are supported")
write_array_in_file(array=array, filename=filename)

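Based on the inline implementation removed above, the delegated helper writes 1D arrays one value per line and 2D arrays as space-separated rows; the file names below are illustrative only:

import numpy as np

wrapper.write_array_in_file(np.array([1.0, 2.0, 3.0]), "waves_1d.txt")   # 1.0\n2.0\n3.0\n
wrapper.write_array_in_file(np.array([[1, 2], [3, 4]]), "waves_2d.txt")  # 1 2\n3 4\n
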
def copy_files(self, src: str, dst: str) -> None:
"""
@@ -286,20 +324,9 @@ def copy_files(self, src: str, dst: str) -> None:
The destination file.
"""

if os.path.isdir(src):
os.makedirs(dst, exist_ok=True)
for file in os.listdir(src):
with open(file, "r") as f:
content = f.read()
with open(os.path.join(dst, file), "w") as f:
f.write(content)
else:
with open(src, "r") as f:
content = f.read()
with open(dst, "w") as f:
f.write(content)
copy_files(src=src, dst=dst)

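A short usage sketch with hypothetical paths; as in the removed inline version, a directory source copies every file it contains into dst, while a single file is copied directly:

wrapper.copy_files(src="templates/common/", dst="output/case_0000/")                       # whole directory
wrapper.copy_files(src="templates/bathymetry.dat", dst="output/case_0000/bathymetry.dat")  # single file
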
def build_cases(self, mode: str = "all_combinations") -> None:
def build_cases(self, mode: str = "one_by_one") -> None:
"""
Create the cases folders and render the input files.

@@ -331,28 +358,186 @@ def build_cases(self, mode: str = "all_combinations") -> None:
f"{len(self.cases_dirs)} cases created in {mode} mode and saved in {self.output_dir}"
)

def run_cases(self) -> None:
def run_case(
self,
case_dir: str,
launcher: str = None,
script: str = None,
params: str = None,
) -> None:
"""
Run the cases.
Run a single case based on the launcher, script, and parameters.

Parameters
----------
case_dir : str
The case directory.
launcher : str, optional
The launcher to run the case. Default is None.
script : str, optional
The script to run the case. Default is None.
params : str, optional
The parameters passed to the launcher along with the script. Default is None.

Notes
-----
- If launcher is None, the subclass run_model method is called.
- If launcher is "apptainer" or "docker", run_model_with_apptainer or run_model_with_docker is called.
- Any other launcher is executed through _exec_bash_commands as "<launcher> <params> <script>".
"""

if self.cases_dirs:
for case_dir in self.cases_dirs:
self.logger.info(f"Running case in {case_dir}")
self.run_model(case_dir=case_dir)
self.logger.info("All cases ran successfully.")
self.logger.info(f"Running case in {case_dir}")
if launcher is None:
self.run_model(case_dir=case_dir)
elif launcher == "apptainer":
self.run_model_with_apptainer(case_dir=case_dir)
elif launcher == "docker":
self.run_model_with_docker(case_dir=case_dir)
else:
raise ValueError("No cases to run.")
self._exec_bash_commands(str_cmd=f"{launcher} {params} {script}")

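A sketch of the dispatch above; the paths, script name, and parameters are placeholders:

# launcher=None: delegate to the subclass run_model implementation.
wrapper.run_case(case_dir="output/case_0000")
# "apptainer"/"docker": delegate to run_model_with_apptainer / run_model_with_docker.
wrapper.run_case(case_dir="output/case_0000", launcher="docker")
# Any other launcher is executed as "<launcher> <params> <script>" via _exec_bash_commands.
wrapper.run_case(case_dir="output/case_0000", launcher="bash", script="run.sh", params="-e")
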
@abstractmethod
def run_model(self, case_dir: str) -> None:
def run_cases(
self,
launcher: str = None,
script: str = None,
params: str = None,
parallel: bool = False,
cases_to_run: List[int] = None,
) -> None:
"""
Run the model.
Run the cases based on the launcher, script, and parameters.
Parallel execution is optional.

Parameters
----------
case_dir : str
The directory of the case.
launcher : str, optional
The launcher to run the cases. Default is None.
script : str, optional
The script to run the cases. Default is None.
params : str, optional
The parameters to run the cases. Default is None.
parallel : bool, optional
If True, the cases will be run in parallel. Default is False.
cases_to_run : List[int], optional
The list with the cases to run. Default is None.

Raises
------
ValueError
If the launcher is not in the available launchers.
"""

pass
if launcher is not None:
if launcher not in self.available_launchers:
raise ValueError(
f"Invalid launcher: {launcher}, not in {self.available_launchers}."
)
else:
self.logger.warning(
"Launcher is None, so the method run_model will be called."
)

if cases_to_run is not None:
self.logger.warning(
f"cases_to_run was specified, so only cases {cases_to_run} will be run."
)
cases_dir_to_run = [self.cases_dirs[case] for case in cases_to_run]
else:
cases_dir_to_run = copy.deepcopy(self.cases_dirs)

if parallel:
num_threads = self.get_num_processors_available()
self.logger.info(
f"Running cases in parallel with launcher={launcher}. Number of threads: {num_threads}."
)
with ThreadPoolExecutor(max_workers=num_threads) as executor:
future_to_case = {
executor.submit(
self.run_case, case_dir, launcher, script, params
): case_dir
for case_dir in cases_dir_to_run
}
for future in as_completed(future_to_case):
case_dir = future_to_case[future]
try:
future.result()
except Exception as exc:
self.logger.error(
f"Case {case_dir} generated an exception: {exc}."
)
else:
self.logger.info(f"Running cases sequentially with launcher={launcher}.")
for case_dir in cases_dir_to_run:
try:
self.run_case(
case_dir=case_dir,
launcher=launcher,
script=script,
params=params,
)
except Exception as exc:
self.logger.error(f"Case {case_dir} generated an exception: {exc}.")

if launcher == "docker":
# Remove stopped containers after running all cases
remove_stopped_containers_cmd = 'docker ps -a --filter "ancestor=tausiaj/swash-image:latest" -q | xargs docker rm'
self._exec_bash_commands(str_cmd=remove_stopped_containers_cmd)
self.logger.info("All cases ran successfully.")

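A usage sketch for the batch runner; indices in cases_to_run refer to positions in cases_dirs:

# Run every case sequentially through the subclass run_model implementation.
wrapper.run_cases()

# Run only the first two cases, in parallel, through Docker; stopped
# containers are removed afterwards as shown above.
wrapper.run_cases(launcher="docker", parallel=True, cases_to_run=[0, 1])
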
def run_cases_with_scheduler(
self,
scheduler: str,
script: str,
params: str = None,
) -> None:
"""
Run the cases based on the scheduler, script, and parameters.

Parameters
----------
scheduler : str
The scheduler to run the cases.
script : str
The script to run the cases.
params : str, optional
The parameters to run the cases. Default is None.

Raises
------
ValueError
If the scheduler is not recognized or the script does not exist.
"""

if scheduler not in self.available_schedulers:
raise ValueError(
f"Invalid scheduler: {scheduler}, not in {self.available_schedulers}."
)
if not os.path.exists(script):
raise ValueError(f"Script {script} does not exist.")
self.logger.info(f"Running cases with scheduler={scheduler}.")
self._exec_bash_commands(str_cmd=f"{scheduler} {params} {script}")

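A sketch assuming a SLURM submission script named launch_cases.sh exists; both the script name and the --array flag are illustrative:

# Ends up calling _exec_bash_commands with "sbatch --array=0-9 launch_cases.sh".
wrapper.run_cases_with_scheduler(
    scheduler="sbatch",
    script="launch_cases.sh",
    params="--array=0-9",
)
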
@staticmethod
def run_model() -> None:
"""
Run the model.
"""

raise NotImplementedError("The method run_model must be implemented.")

@staticmethod
def run_model_with_apptainer() -> None:
"""
Run the model for the specified case using Apptainer.
"""

raise NotImplementedError(
"The method run_model_with_apptainer must be implemented."
)

@staticmethod
def run_model_with_docker() -> None:
"""
Run the model for the specified case using Docker.
"""

raise NotImplementedError(
"The method run_model_with_docker must be implemented."
)
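
Because the three run_model* hooks only raise NotImplementedError, a concrete wrapper is expected to override them. A minimal hypothetical subclass sketch, matching the case_dir keyword that run_case passes (the class name and the run.sh script are assumptions, not part of this diff):

import os

from bluemath_tk.wrappers._base_wrappers import BaseModelWrapper


class MyModelWrapper(BaseModelWrapper):
    """Hypothetical subclass used only to illustrate the expected override."""

    def run_model(self, case_dir: str) -> None:
        # Called by run_case when launcher is None; how the model binary is
        # launched inside case_dir is entirely up to the subclass.
        self._exec_bash_commands(str_cmd=f"bash {os.path.join(case_dir, 'run.sh')}")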