6 changes: 4 additions & 2 deletions examples/kubeflow/hello_kubeflow.py
@@ -108,8 +108,10 @@
],
# Sync the generated launch script to the pod via PVC before launch.
# Required whenever you use a custom launcher (e.g. run.Torchrun()).
workdir_pvc=args.pvc,
workdir_pvc_path="/nemo-workspace",
# Must match the workdir volume name and mountPath above.
workdir_volume_mount=(
{"name": "workdir", "mountPath": "/nemo-workspace"} if args.pvc else None
),
labels={"app": JOB_NAME},
)

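Note on the example above: the new ``workdir_volume_mount`` must match a workdir volume declared earlier in the example, which falls outside this hunk. A minimal, hypothetical sketch of what that declaration could look like, with "my-shared-pvc" standing in as a placeholder for ``args.pvc``:

pvc_name = "my-shared-pvc"  # placeholder standing in for args.pvc
# PVC-backed volume plus the mount used by the trainer container; the name and
# mountPath must match the workdir_volume_mount passed to the executor.
volumes = [{"name": "workdir", "persistentVolumeClaim": {"claimName": pvc_name}}]
volume_mounts = [{"name": "workdir", "mountPath": "/nemo-workspace"}]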
111 changes: 70 additions & 41 deletions nemo_run/core/execution/kubeflow.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import getpass
import logging
import os
@@ -92,10 +93,15 @@ class KubeflowExecutor(Executor):
image_pull_secrets: list[str] = field(default_factory=list)
spec_kwargs: dict[str, Any] = field(default_factory=dict)
container_kwargs: dict[str, Any] = field(default_factory=dict)
# Workdir sync: if set, package() rsyncs job_dir → PVC before launch and
# pull_results() rsyncs the PVC back to job_dir after the job completes.
workdir_pvc: Optional[str] = None
workdir_pvc_path: str = "/nemo_run"
# Workdir sync: when ``workdir_volume_mount`` is set, package() rsyncs job_dir → PVC
# before launch and pull_results() rsyncs back. You must declare the matching Volume
# in ``volumes`` (typically persistentVolumeClaim). On construction,
# ``__post_init__`` validates that layout and merges work/data mounts into ``volume_mounts``
# when the same ``(name, mountPath, subPath)`` is not already listed.
workdir_volume_mount: Optional[dict[str, Any]] = None
# Optional subdirectory appended to ``mountPath`` to form ``code_dir``.
# Defaults to the OS username. Set to ``None`` or ``""`` to use ``mountPath``.
workdir_subdir: Optional[str] = field(default_factory=lambda: getpass.getuser())
# Optional local directory whose contents are merged into job_dir before
# the PVC sync. Use this to include local scripts/files that are not
# generated by the packager (e.g. a hand-written training script).
@@ -132,12 +138,53 @@ def nnodes(self) -> int:

@property
def code_dir(self) -> str:
"""Subdirectory on the PVC where user code (launch.sh, scripts) is synced.

Scoped to ``<workdir_pvc_path>/<username>/code`` so multiple users sharing
the same PVC never clobber each other's files.
"""Directory inside the trainer pod where user code (launch.sh, scripts) lives.

Computed as ``mountPath[/workdir_subdir]``. ``workdir_subdir`` defaults to
the OS username so that multiple users sharing a PVC don't collide; set it to
``None`` or ``""`` when ``subPath`` or ``mountPath`` already provides scoping.
When ``workdir_volume_mount`` is unset, falls back to ``/nemo_run`` for local
planning only (no PVC sync). When ``workdir_volume_mount`` is set, ``mountPath``
must be present or a ``ValueError`` is raised.
"""
return f"{self.workdir_pvc_path.rstrip('/')}/{getpass.getuser()}/code"
root = self._code_mount_root()
if self.workdir_subdir:
return f"{root.rstrip('/')}/{self.workdir_subdir.strip('/')}"
return root

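To make the path arithmetic concrete, here is a standalone sketch that mirrors the join performed by ``code_dir`` above; it is illustrative only and skips the ``mountPath`` validation done in ``_code_mount_root``.

from typing import Any, Optional

def compute_code_dir(
    workdir_volume_mount: Optional[dict[str, Any]], workdir_subdir: Optional[str]
) -> str:
    # Mirrors code_dir: mountPath[/workdir_subdir], with /nemo_run as the
    # local-planning fallback when no workdir mount is configured.
    if not workdir_volume_mount:
        return "/nemo_run"
    root = str(workdir_volume_mount["mountPath"])
    if workdir_subdir:
        return f"{root.rstrip('/')}/{workdir_subdir.strip('/')}"
    return root

assert compute_code_dir({"name": "workdir", "mountPath": "/nemo-workspace"}, "alice") == "/nemo-workspace/alice"
assert compute_code_dir({"name": "workdir", "mountPath": "/nemo-workspace"}, None) == "/nemo-workspace"
assert compute_code_dir(None, "alice") == "/nemo_run"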
def _code_mount_root(self) -> str:
wm = self.workdir_volume_mount
if not wm:
return "/nemo_run"
mp = wm.get("mountPath")
if not mp:
raise ValueError(
"workdir_volume_mount must include mountPath "
"(or omit workdir_volume_mount when not using PVC code sync)"
)
return str(mp)

def _get_volume_spec_copy_by_name(self, volume_name: str) -> dict[str, Any]:
"""Get a copy of a volume spec by name, e.g. the name used in a mount"""
for v in self.volumes:
if v.get("name") == volume_name:
return copy.deepcopy(dict(v))
raise ValueError(
f"volumes must include an entry named {volume_name!r} referenced by "
"workdir_volume_mount (needed for TrainJob pod and data-mover pod)"
)

@staticmethod
def _volume_mount_identity(vm: dict[str, Any]) -> tuple[Any, ...]:
"""For checking if a volume mount is already in the list of volume mounts."""
sp = vm.get("subPath")
return (vm.get("name"), vm.get("mountPath"), sp)

def _append_volume_mount_if_missing(self, spec: dict[str, Any]) -> None:
want = self._volume_mount_identity(spec)
if any(self._volume_mount_identity(vm) == want for vm in self.volume_mounts):
return
self.volume_mounts.append(dict(spec))

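A small, self-contained illustration of the deduplication key used above, with placeholder mount dicts; an existing mount that omits ``subPath`` matches a candidate whose ``subPath`` is ``None``, so the append is skipped.

def mount_identity(vm: dict) -> tuple:
    # Same key as _volume_mount_identity: (name, mountPath, subPath).
    return (vm.get("name"), vm.get("mountPath"), vm.get("subPath"))

existing = [{"name": "workdir", "mountPath": "/nemo-workspace"}]
candidate = {"name": "workdir", "mountPath": "/nemo-workspace", "subPath": None}
# Identical identity, so _append_volume_mount_if_missing would not add a duplicate.
assert mount_identity(candidate) in {mount_identity(vm) for vm in existing}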
def nproc_per_node(self) -> int:
"""Return processes per node: nprocs_per_node → gpus_per_node → 1."""
@@ -449,30 +496,25 @@ def _data_mover_pod_name(self, job_name: str) -> str:
return f"{job_name}-data-mover"

def _start_data_mover_pod(self, pod_name: str, timeout: int = 120) -> None:
"""Spin up a throw-away Alpine pod that mounts workdir_pvc and blocks until Running.
"""Spin up a throw-away Alpine pod that mounts the work PVC and blocks until Running.

Uses ``kubectl cp`` (tar-based, built into Alpine — no internet needed) for data
transfer. The pod inherits tolerations, affinity, and imagePullSecrets from the
main workload so it can be scheduled on the same nodes (required when the PVC is
zone- or node-local).
"""
vol_name = "nemo-run-workdir"
volume_spec = self._get_volume_spec_copy_by_name(self.workdir_volume_mount["name"])
pod_spec: dict[str, Any] = {
"restartPolicy": "Never",
"containers": [
{
"name": "mover",
"image": self.data_mover_image,
"command": ["sleep", "infinity"],
"volumeMounts": [{"name": vol_name, "mountPath": self.workdir_pvc_path}],
}
],
"volumes": [
{
"name": vol_name,
"persistentVolumeClaim": {"claimName": self.workdir_pvc},
"volumeMounts": [self.workdir_volume_mount],
}
],
"volumes": [volume_spec],
}
if self.tolerations:
pod_spec["tolerations"] = self.tolerations
@@ -586,13 +628,13 @@ def materialize_launch_script(self, cmd: list[str], max_retries: int = 0) -> None:
logger.info("Wrote launch script to %s", launch_script_path)

def package(self, packager: Packager, job_name: str) -> None:
"""Sync job_dir to the workdir PVC via a temporary data-mover pod before launch.
"""Sync job_dir to the PVC declared for ``workdir_volume_mount`` via a data-mover pod.

Does nothing when ``workdir_pvc`` is unset. If ``workdir_local_path`` is set,
Does nothing when ``workdir_volume_mount`` is unset. If ``workdir_local_path`` is set,
its contents are first rsynced into ``job_dir`` so hand-written scripts are
included alongside generated files such as ``launch.sh``.
"""
if not self.workdir_pvc:
if not self.workdir_volume_mount:
return
# Merge extra local files (e.g. training scripts) into job_dir so they
# are included alongside generated files like launch.sh.
@@ -608,33 +650,18 @@ def package(self, packager: Packager, job_name: str) -> None:
)
logger.info("Merged '%s' into job_dir '%s'", self.workdir_local_path, self.job_dir)

# Sync job_dir to <workdir_pvc_path>/<username>/code on the PVC via a
# throw-away data-mover pod. Scoping to a user subdirectory means we
# never clobber other data already on the shared volume.
# Sync job_dir to ``code_dir`` on the PVC via a throw-away data-mover pod.
pod_name = self._data_mover_pod_name(job_name)
self._start_data_mover_pod(pod_name)
try:
self._rsync_to_pod(pod_name, self.job_dir, self.code_dir)
finally:
self._delete_data_mover_pod(pod_name)

# Mount the PVC so the training container can reach code_dir.
# If the PVC is already declared (e.g. explicitly by the caller for data),
# reuse that existing volume rather than adding a duplicate entry.
already_mounted = any(
v.get("persistentVolumeClaim", {}).get("claimName") == self.workdir_pvc
for v in self.volumes
)
if not already_mounted:
vol_name = "nemo-run-workdir"
self.volumes.append(
{"name": vol_name, "persistentVolumeClaim": {"claimName": self.workdir_pvc}}
)
if not any(vm.get("mountPath") == self.workdir_pvc_path for vm in self.volume_mounts):
self.volume_mounts.append({"name": vol_name, "mountPath": self.workdir_pvc_path})
self._append_volume_mount_if_missing(self.workdir_volume_mount)

def pull_results(self, job_name: str, dest_dir: Optional[str] = None) -> None:
"""Sync workdir_pvc_path back to a local directory after the job completes.
"""Sync the remote code subtree (``code_dir``) back to a local directory.

Args:
job_name: The job name used when the job was launched.
@@ -643,8 +670,10 @@ def pull_results(self, job_name: str, dest_dir: Optional[str] = None) -> None:
persisted job state in ``~/.nemo_run/.kubeflow_jobs.json`` to
find the original ``job_dir``.
"""
if not self.workdir_pvc:
logger.warning("pull_results called but workdir_pvc is not set — nothing to sync")
if not self.workdir_volume_mount:
logger.warning(
"pull_results called but workdir_volume_mount is not set — nothing to sync"
)
return

local_path = dest_dir or getattr(self, "job_dir", "") or ""
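Before leaving this file: the ``workdir_local_path`` merge described in ``package()`` above amounts to copying extra local files into ``job_dir`` ahead of the PVC sync. A rough, library-agnostic sketch of that step under that reading; the executor's actual implementation may use rsync and differ in details.

import shutil

def merge_local_into_job_dir(workdir_local_path: str, job_dir: str) -> None:
    # Copy hand-written scripts next to generated files such as launch.sh,
    # keeping anything already present in job_dir.
    shutil.copytree(workdir_local_path, job_dir, dirs_exist_ok=True)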
47 changes: 47 additions & 0 deletions nemo_run/core/execution/kuberay.py
@@ -15,6 +15,7 @@
# Based on https://github.com/ray-project/kuberay/blob/master/clients/python-client/python_client/utils/kuberay_cluster_utils.py

import copy
import getpass
import logging
import os
import re
@@ -92,6 +93,13 @@ class KubeRayExecutor(Executor):
spec_kwargs: dict[str, Any] = field(default_factory=dict)
container_kwargs: dict[str, Any] = field(default_factory=dict)
lifecycle_kwargs: dict[str, Any] = field(default_factory=dict)
# Workdir sync: when ``workdir_volume_mount`` is set, the local workdir is synced
# into ``code_dir`` on the PVC via a throw-away data-mover pod.
# You must declare the matching Volume in ``volumes``.
workdir_volume_mount: Optional[dict[str, Any]] = None
# Optional subdirectory appended to ``mountPath`` to form ``code_dir``.
# Defaults to the OS username. Set to ``None`` or ``""`` to use ``mountPath``.
workdir_subdir: Optional[str] = field(default_factory=lambda: getpass.getuser())

def __post_init__(self):
# Set default image based on ray_version if not provided
Expand All @@ -103,6 +111,45 @@ def __post_init__(self):
worker_group.volumes = copy.deepcopy(self.volumes)
worker_group.volume_mounts = copy.deepcopy(self.volume_mounts)

def _code_mount_root(self) -> str:
wm = self.workdir_volume_mount
if not wm:
raise ValueError(
"workdir_volume_mount is not set — cannot compute code_dir. "
"Set workdir_volume_mount on the executor, or omit workdir when starting a job."
)
mp = wm.get("mountPath")
if not mp:
raise ValueError(
"workdir_volume_mount must include mountPath "
"(or omit workdir_volume_mount when not using PVC code sync)"
)
return str(mp)

@property
def code_dir(self) -> str:
"""Remote directory where workdir contents are synced on the PVC.

Computed as ``mountPath[/workdir_subdir]``. ``workdir_subdir`` defaults to
the OS username so that multiple users sharing a PVC don't collide; set it to
``None`` or ``""`` when ``subPath`` or ``mountPath`` already provides scoping.
Raises ``ValueError`` when ``workdir_volume_mount`` is not configured.
"""
root = self._code_mount_root()
if self.workdir_subdir:
return f"{root.rstrip('/')}/{self.workdir_subdir.strip('/')}"
return root

def _get_volume_spec_copy_by_name(self, volume_name: str) -> dict[str, Any]:
"""Return a deep copy of the volume spec matching *volume_name*."""
for v in self.volumes:
if v.get("name") == volume_name:
return copy.deepcopy(dict(v))
raise ValueError(
f"volumes must include an entry named {volume_name!r} referenced by "
"workdir_volume_mount (needed for the data-mover pod)"
)

def get_cluster_body(self, name: str) -> dict[str, Any]:
"""
Get the body for the Ray cluster custom resource.
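As with the Kubeflow executor, the data-mover pod needs the full volume spec behind the mount name, which is why ``workdir_volume_mount["name"]`` must appear in ``volumes``. A standalone sketch of that lookup with placeholder values, not taken from this PR:

import copy

volumes = [{"name": "workdir", "persistentVolumeClaim": {"claimName": "ray-pvc"}}]
workdir_volume_mount = {"name": "workdir", "mountPath": "/workspace"}

# Same idea as _get_volume_spec_copy_by_name: find the declared volume whose
# name the mount references and deep-copy it for the throw-away data-mover pod.
volume_spec = next(
    copy.deepcopy(dict(v)) for v in volumes if v.get("name") == workdir_volume_mount["name"]
)
assert volume_spec["persistentVolumeClaim"]["claimName"] == "ray-pvc"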
32 changes: 16 additions & 16 deletions nemo_run/run/ray/kuberay.py
@@ -856,9 +856,10 @@ def status(self, display: bool = True) -> Dict[str, Any]:
[vm.get("mountPath", "N/A") for vm in self.executor.volume_mounts]
)

# Construct workdir paths based on standard patterns
# Note: These are estimates based on the naming conventions in the code
user_workspace_base = f"{self.executor.volume_mounts[0]['mountPath']}/{self.user}/code"
if self.executor.workdir_volume_mount:
user_workspace_base = self.executor.code_dir
else:
user_workspace_base = "N/A (no workdir_volume_mount configured)"

logger.info(
f"""
@@ -1029,26 +1030,26 @@ def start(
user_workspace_path = None

if workdir:
if not executor.volumes or not executor.volume_mounts:
if not executor.workdir_volume_mount:
raise ValueError(
"`workdir` specified but executor has no volumes/volume_mounts to mount it."
"`workdir` specified but executor has no `workdir_volume_mount` configured."
)

user_workspace_path = os.path.join(
executor.volume_mounts[0]["mountPath"], self.user, "code", Path(workdir).name
)
# Add user-based scoping to pod name and workspace path
user_workspace_path = f"{executor.code_dir.rstrip('/')}/{Path(workdir).name}"
pod_name = f"{self.job_name}-data-mover"

if not dryrun:
volume_spec = executor._get_volume_spec_copy_by_name(
executor.workdir_volume_mount["name"]
)
sync_workdir_via_pod(
pod_name=pod_name,
namespace=namespace,
user_workspace_path=user_workspace_path,
workdir=workdir,
core_v1_api=self.core_v1_api,
volumes=executor.volumes,
volume_mounts=executor.volume_mounts,
volumes=[volume_spec],
volume_mounts=[executor.workdir_volume_mount],
)
logger.info(f"Synced workdir {workdir} to {user_workspace_path}")

@@ -1066,12 +1067,11 @@ def start(
ray_cluster_spec = ray_cluster_body.get("spec", {})

# Ensure consistent workingDir inside all Ray containers so that relative
# paths in `ray job submit` resolve as expected.
# paths in `ray job submit` resolve as expected. When a workdir was synced
# this is the same path used by sync_workdir_via_pod so files are found.
container_workdir = "/workspace"
if workdir:
container_workdir = os.path.join(
executor.volume_mounts[0]["mountPath"], Path(workdir).name
)
if workdir and executor.workdir_volume_mount:
container_workdir = f"{executor.code_dir.rstrip('/')}/{Path(workdir).name}"

def _apply_workdir(pod_template: dict):
try:
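To show how the synced workspace and the container's workingDir line up after this change, a small illustrative computation; ``/workspace/alice`` and ``./my_proj`` are placeholders for ``executor.code_dir`` and the local workdir.

from pathlib import Path

code_dir = "/workspace/alice"  # placeholder for executor.code_dir
workdir = "./my_proj"          # placeholder local working directory

user_workspace_path = f"{code_dir.rstrip('/')}/{Path(workdir).name}"
container_workdir = f"{code_dir.rstrip('/')}/{Path(workdir).name}"

# sync_workdir_via_pod copies into user_workspace_path; the Ray containers start
# in container_workdir, so relative paths in `ray job submit` resolve to the
# same synced files.
assert user_workspace_path == container_workdir == "/workspace/alice/my_proj"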
4 changes: 2 additions & 2 deletions nemo_run/run/torchx_backend/schedulers/kubeflow.py
@@ -95,10 +95,10 @@ def _submit_dryrun( # type: ignore

cmd = [role.entrypoint] + role.args

# When workdir_pvc is configured, materialise a launch.sh from the
# When workdir_volume_mount is configured, materialise a launch.sh from the
# Jinja2 template (env vars + training command) and point the job at
# it so torchrun / launcher details stay out of the manifest.
if executor.workdir_pvc and getattr(executor, "job_dir", None):
if executor.workdir_volume_mount and getattr(executor, "job_dir", None):
# Rewrite any local workdir_local_path references in the cmd.
if executor.workdir_local_path:
local_prefix = executor.workdir_local_path.rstrip(os.sep)
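The hunk is truncated here, but the comment about rewriting ``workdir_local_path`` references suggests a prefix substitution along these lines; the paths and names below are illustrative assumptions, not the scheduler's actual code.

import os

workdir_local_path = "/home/alice/scripts"  # placeholder
code_dir = "/nemo-workspace/alice"          # placeholder for executor.code_dir
cmd = ["python", "/home/alice/scripts/train.py", "--epochs", "2"]

# Remap args that point at the local workdir so they resolve inside the pod.
local_prefix = workdir_local_path.rstrip(os.sep)
cmd = [arg.replace(local_prefix, code_dir) if isinstance(arg, str) else arg for arg in cmd]
assert cmd[1] == "/nemo-workspace/alice/train.py"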