Commit 6f63116

ko3n1g and claude committed
feat: add KubeflowExecutor for Kubeflow Training Operator on Kubernetes
Introduces `KubeflowExecutor` and a matching TorchX scheduler so users can deploy distributed training jobs to any Kubernetes cluster running the Kubeflow Training Operator via `run.run()` / `run.Experiment`.

Supported job kinds (toggled via the `job_kind` field):

- PyTorchJob (Training Operator v1, `kubeflow.org/v1`)
- TrainJob (Training Operator v2, `trainer.kubeflow.org/v1alpha1`)

Key features:

- Kubernetes config loaded automatically (local kubeconfig → in-cluster fallback)
- PyTorchJob: builds Master + Worker replica specs with `nprocPerNode`
- TrainJob: builds `spec.trainer` and merges all pod-level config (volumes, tolerations, affinity, imagePullSecrets, resourceClaims, etc.) into a single `podTemplateOverrides` entry targeting "node"
- `env_list` field supports full env var dicts (`valueFrom` / `secretKeyRef`)
- `pod_spec_overrides` merges arbitrary extra fields into the pod spec
- `launch(wait=True)` polls until RUNNING / SUCCEEDED / FAILED
- `cancel(wait=True)` polls until the CR is gone and all pods are terminated
- TorchX scheduler persists job state in `~/.nemo_run/.kubeflow_jobs.json` and maps KubeflowJobState → AppState (UNKNOWN/None → PENDING to avoid false failures on transient API errors)

Co-Authored-By: Claude Sonnet 4.6 &lt;noreply@anthropic.com&gt;
Signed-off-by: oliver könig &lt;okoenig@nvidia.com&gt;
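The TrainJob merging behavior described above can be sketched as follows. This is an illustrative reimplementation, not the actual NeMo Run code; the exact override schema (the `targetJobs` key and field names beyond those the commit message lists) is an assumption:

```python
# Sketch: collect all pod-level config into a single podTemplateOverrides
# entry targeting the "node" job, as the commit message describes for
# TrainJob. Illustrative only -- not the actual KubeflowExecutor code.

def build_pod_template_overrides(pod_config: dict) -> list[dict]:
    """Merge non-empty pod-level fields into one override targeting 'node'."""
    spec = {k: v for k, v in pod_config.items() if v}  # drop empty fields
    return [{"targetJobs": [{"name": "node"}], "spec": spec}]


overrides = build_pod_template_overrides({
    "volumes": [{"name": "model-cache",
                 "persistentVolumeClaim": {"claimName": "my-pvc"}}],
    "tolerations": [],  # empty, so it is omitted from the override
    "imagePullSecrets": [{"name": "ngc-secret"}],
})
```

Emitting a single override entry (rather than one per field) keeps the generated CR compact and makes the pod-level configuration easy to inspect in one place.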
1 parent e04fe9d commit 6f63116

3 files changed

Lines changed: 47 additions & 1 deletion


.gitignore

Lines changed: 1 addition & 0 deletions
```diff
@@ -183,3 +183,4 @@ _version.py

 # NeMo Run
 .nemo_run/
+local/
```

docs/guides/execution.md

Lines changed: 44 additions & 0 deletions
````diff
@@ -53,6 +53,7 @@ The packager support matrix is described below:
 | SkypilotExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager |
 | DGXCloudExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager |
 | LeptonExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager |
+| KubeflowExecutor | run.Packager |

 `run.Packager` is a passthrough base packager.

@@ -293,6 +294,49 @@ def your_dgx_executor(nodes: int, gpus_per_node: int, container_image: str):

 For a complete end-to-end example using DGX Cloud with NeMo, refer to the [NVIDIA DGX Cloud NeMo End-to-End Workflow Example](https://docs.nvidia.com/dgx-cloud/run-ai/latest/nemo-e2e-example.html).

+#### KubeflowExecutor
+
+The `KubeflowExecutor` integrates with the [Kubeflow Training Operator](https://github.com/kubeflow/training-operator) to run distributed training jobs on any Kubernetes cluster. It submits CRDs directly via the Kubernetes API — no `kubectl` required.
+
+Two job kinds are supported via the `job_kind` parameter:
+
+- **`"PyTorchJob"`** (default) — Training Operator v1 (`kubeflow.org/v1`)
+- **`"TrainJob"`** — Training Operator v2 (`trainer.kubeflow.org/v1alpha1`)
+
+Kubernetes configuration is loaded automatically: local kubeconfig is tried first, falling back to in-cluster config when running inside a pod.
+
+Here's an example configuration:
+
+```python
+# PyTorchJob (default)
+executor = run.KubeflowExecutor(
+    namespace="runai-nemo-ci",
+    image="nvcr.io/nvidian/nemo:nightly",
+    num_nodes=3,  # total pods: 1 Master + (num_nodes-1) Workers
+    gpus_per_node=8,  # also sets nproc_per_node unless overridden explicitly
+    cpu_requests="16",
+    memory_requests="64Gi",
+    volumes=[
+        {"name": "model-cache", "persistentVolumeClaim": {"claimName": "nemo-ci-datasets-project-nkf5l"}}
+    ],
+    volume_mounts=[{"name": "model-cache", "mountPath": "/nemo-workspace"}],
+    labels={"app": "nemo-ci-training"},
+    env_vars={"NCCL_DEBUG": "INFO"},
+)
+
+# TrainJob (Training Operator v2)
+executor = run.KubeflowExecutor(
+    job_kind="TrainJob",
+    runtime_ref="torch-distributed",  # name of the ClusterTrainingRuntime
+    namespace="runai-nemo-ci",
+    image="nvcr.io/nvidian/nemo:nightly",
+    num_nodes=3,
+    gpus_per_node=8,
+)
+```
+
+`cancel(wait=True)` polls until both the CR and all associated pods are fully terminated before returning.
+
 #### LeptonExecutor

 The `LeptonExecutor` integrates with an NVIDIA DGX Cloud Lepton cluster's Python SDK to launch distributed jobs. It uses API calls behind the Lepton SDK to authenticate, identify the target node group and resource shapes, and submit the job specification which will be launched as a batch job on the cluster.
````

nemo_run/run/torchx_backend/schedulers/kubeflow.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -194,7 +194,8 @@ def _cancel_existing(self, app_id: str) -> None:
             return None
         executor.cancel(job_name)

-    def list(self) -> list[ListAppResponse]: ...
+    def list(self) -> list[ListAppResponse]:
+        return []

     def _validate(self, app: AppDef, scheduler: str) -> None:
         pass
```
