Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion nemo_run/run/ray/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ def start(
)

def stop(self, wait: bool = False) -> None:
self.backend.stop(wait=wait) # type: ignore[attr-defined]
if isinstance(self.backend, KubeRayJob):
self.backend.stop() # type: ignore[attr-defined]
else:
self.backend.stop(wait=wait) # type: ignore[attr-defined]

def status(self, display: bool = True):
return self.backend.status(display=display) # type: ignore[attr-defined]
Expand Down
30 changes: 28 additions & 2 deletions nemo_run/run/ray/kuberay.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,20 @@ class KubeRayCluster:
def __post_init__(self) -> None: # noqa: D401 – simple verb is fine
"""Initialise Kubernetes API clients once the instance is created."""
# Load local kube-config once; the function returns *None* so we don't store it.
config.load_kube_config()
try:
config.load_kube_config()
except Exception as kube_config_error:
logger.error(
"Error loading kube-config: %s, trying with incluster config", kube_config_error
)
try:
config.load_incluster_config()
except Exception as incluster_config_error:
logger.error(
"Error loading incluster config: %s, raising original error",
incluster_config_error,
)
raise kube_config_error from incluster_config_error

# The dedicated clients are what we interact with throughout the class
# – separating CoreV1 for pods/services from CustomObjects for CRDs.
Expand Down Expand Up @@ -732,7 +745,20 @@ class KubeRayJob:
executor: KubeRayExecutor

def __post_init__(self):
config.load_kube_config()
try:
config.load_kube_config()
except Exception as kube_config_error:
logger.error(
"Error loading kube-config: %s, trying with incluster config", kube_config_error
)
try:
config.load_incluster_config()
except Exception as incluster_config_error:
logger.error(
"Error loading incluster config: %s, raising original error",
incluster_config_error,
)
raise kube_config_error from incluster_config_error

# Lazily create K8s API clients if not supplied
self.api = client.CustomObjectsApi()
Expand Down
131 changes: 131 additions & 0 deletions test/run/ray/test_kuberay.py
Original file line number Diff line number Diff line change
Expand Up @@ -2074,3 +2074,134 @@ def test_cluster_create_without_lifecycle_kwargs(self, mock_k8s_clients):
# Should create lifecycle_kwargs and succeed
assert hasattr(executor, "lifecycle_kwargs")
assert mock_api.create_namespaced_custom_object.called


class TestKubeConfigLoadingFallback:
"""Test kube config loading with fallback to incluster config."""

def test_kuberay_cluster_kube_config_success(self):
"""Test KubeRayCluster when kube config loads successfully."""
with patch("nemo_run.run.ray.kuberay.config.load_kube_config") as mock_load_kube:
with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster:
with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"):
with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"):
with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
executor = KubeRayExecutor(namespace="test-namespace")
# Create cluster to trigger __post_init__ which loads config
_ = KubeRayCluster(name="test-cluster", executor=executor)

# Verify kube config was loaded and incluster was NOT called
assert mock_load_kube.call_count >= 1
# incluster should not be called when kube config succeeds
mock_incluster.assert_not_called()

def test_kuberay_cluster_fallback_to_incluster(self):
"""Test KubeRayCluster falls back to incluster config when kube config fails."""
kube_error = Exception("Kube config file not found")

with patch(
"nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error
) as mock_load_kube:
with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster:
with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"):
with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"):
with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
executor = KubeRayExecutor(namespace="test-namespace")
# Create cluster to trigger __post_init__ which loads config
_ = KubeRayCluster(name="test-cluster", executor=executor)

# Verify both were called
assert mock_load_kube.call_count >= 1
assert mock_incluster.call_count >= 1

def test_kuberay_cluster_both_configs_fail(self):
"""Test KubeRayCluster raises original error when both configs fail."""
kube_error = Exception("Kube config file not found")
incluster_error = Exception("Not running inside a cluster")

with patch("nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error):
with patch(
"nemo_run.run.ray.kuberay.config.load_incluster_config",
side_effect=incluster_error,
):
with pytest.raises(Exception) as exc_info:
with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
executor = KubeRayExecutor(namespace="test-namespace")
KubeRayCluster(name="test-cluster", executor=executor)

# Should raise the original kube config error (not the incluster error)
assert exc_info.value == kube_error
assert "Kube config file not found" in str(exc_info.value)

def test_kuberay_job_kube_config_success(self):
"""Test KubeRayJob when kube config loads successfully."""
with patch("nemo_run.run.ray.kuberay.config.load_kube_config") as mock_load_kube:
with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster:
with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"):
with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"):
with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
executor = KubeRayExecutor(namespace="test-namespace")
# Create job to trigger __post_init__ which loads config
_ = KubeRayJob(name="test-job", executor=executor)

# Verify kube config was loaded
assert mock_load_kube.call_count >= 1
# incluster should not be called when kube config succeeds
mock_incluster.assert_not_called()

def test_kuberay_job_fallback_to_incluster(self):
"""Test KubeRayJob falls back to incluster config when kube config fails."""
kube_error = Exception("Kube config file not found")

with patch(
"nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error
) as mock_load_kube:
with patch("nemo_run.run.ray.kuberay.config.load_incluster_config") as mock_incluster:
with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"):
with patch("nemo_run.run.ray.kuberay.client.CoreV1Api"):
with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
executor = KubeRayExecutor(namespace="test-namespace")
# Create job to trigger __post_init__ which loads config
_ = KubeRayJob(name="test-job", executor=executor)

# Verify both were called
assert mock_load_kube.call_count >= 1
assert mock_incluster.call_count >= 1

def test_kuberay_job_both_configs_fail(self):
"""Test KubeRayJob raises original error when both configs fail."""
kube_error = Exception("Kube config file not found")
incluster_error = Exception("Not running inside a cluster")

with patch("nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error):
with patch(
"nemo_run.run.ray.kuberay.config.load_incluster_config",
side_effect=incluster_error,
):
with pytest.raises(Exception) as exc_info:
with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
executor = KubeRayExecutor(namespace="test-namespace")
KubeRayJob(name="test-job", executor=executor)

# Should raise the original kube config error (not the incluster error)
assert exc_info.value == kube_error
assert "Kube config file not found" in str(exc_info.value)

def test_error_chaining_preserved(self):
"""Test that error chaining is preserved (raise X from Y)."""
kube_error = Exception("Kube config file not found")
incluster_error = Exception("Not running inside a cluster")

with patch("nemo_run.run.ray.kuberay.config.load_kube_config", side_effect=kube_error):
with patch(
"nemo_run.run.ray.kuberay.config.load_incluster_config",
side_effect=incluster_error,
):
with pytest.raises(Exception) as exc_info:
with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"):
executor = KubeRayExecutor(namespace="test-namespace")
KubeRayJob(name="test-job", executor=executor)

# Verify error chaining (raise kube_error from incluster_error)
assert exc_info.value == kube_error
assert exc_info.value.__cause__ == incluster_error
Loading