Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions packages/climate-ref/src/climate_ref/executor/hpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import os
import re
import resource
import time
from typing import Annotated, Any, Literal
from collections.abc import Callable
from typing import Annotated, Any, Literal, TypeVar, cast

import parsl
from loguru import logger
Expand All @@ -44,6 +46,8 @@
from .local import ExecutionFuture, process_result
from .pbs_scheduler import SmartPBSProvider

F = TypeVar("F", bound=Callable[..., Any])


class SlurmConfig(BaseModel):
"""Slurm Configurations"""
Expand All @@ -61,7 +65,7 @@ class SlurmConfig(BaseModel):
validation: StrictBool = False
walltime: str = "00:30:00"
scheduler_options: str = ""
retries: Annotated[int, Field(strict=True, ge=1, le=3)] = 2
retries: Annotated[int, Field(strict=True, ge=0, le=3)] = 2
max_blocks: Annotated[int, Field(strict=True, ge=1)] = 1 # one block mean one job?
worker_init: str = ""
overrides: str = ""
Expand Down Expand Up @@ -111,7 +115,23 @@ def _validate_walltime(cls, v: str) -> str:
return v


def with_memory_limit(limit_gb: float) -> Callable[[F], F]:
"""Set memory limit for a parsl worker"""

def decorator(func: F) -> F:
def wrapper(*args: Any, **kwargs: Any) -> Any:
bytes_limit = int(limit_gb * 1024 * 1024 * 1024)
soft, hard = bytes_limit, bytes_limit
resource.setrlimit(resource.RLIMIT_AS, (soft, hard))
return func(*args, **kwargs)

return cast(F, wrapper)

return decorator


@python_app
@with_memory_limit(7.0)
def _process_run(definition: ExecutionDefinition, log_level: str) -> ExecutionResult:
"""Run the function on computer nodes"""
# This is a catch-all for any exceptions that occur in the process and need to raise for
Expand Down
Loading