Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ To install Skypilot with optional features, use one of the following commands:

You can also manually install Skypilot from https://skypilot.readthedocs.io/en/latest/getting-started/installation.html

If using DGX Cloud Lepton, use the following command to install the Lepton CLI:
If using DGX Cloud Lepton, install NeMo Run with the Lepton extra:

```bash
pip install leptonai
pip install "nemo_run[lepton]"
```

To authenticate with the DGX Cloud Lepton cluster, navigate to the **Settings > Tokens** page in the DGX Cloud Lepton UI and copy the ``lep login`` command shown on the page and run it in the terminal.
Expand Down
71 changes: 54 additions & 17 deletions nemo_run/core/execution/lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import base64
import logging
import os
Expand All @@ -22,36 +24,67 @@
import time
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any, List, Optional, Set, Type

from invoke.context import Context
from leptonai.api.v2.client import APIClient
from leptonai.api.v1.types.affinity import LeptonResourceAffinity
from leptonai.api.v1.types.common import Metadata, LeptonVisibility
from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
from leptonai.api.v1.types.deployment import (
EnvVar,
EnvValue,
LeptonContainer,
Mount,
)
from leptonai.api.v1.types.job import (
LeptonJob,
LeptonJobState,
LeptonJobUserSpec,
ReservationConfig,
)
from leptonai.api.v1.types.replica import Replica

from nemo_run.config import get_nemorun_home
from nemo_run.core.execution.base import Executor, ExecutorMacros
from nemo_run.core.packaging.base import Packager
from nemo_run.core.packaging.git import GitArchivePackager

_LEPTON_IMPORT_ERROR: ImportError | None = None
_LEPTON_AVAILABLE = False

try:
from leptonai.api.v1.types.affinity import LeptonResourceAffinity
from leptonai.api.v1.types.common import LeptonVisibility, Metadata
from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
from leptonai.api.v1.types.deployment import (
EnvVar,
EnvValue,
LeptonContainer,
Mount,
)
from leptonai.api.v1.types.job import (
LeptonJob,
LeptonJobState,
LeptonJobUserSpec,
ReservationConfig,
)
from leptonai.api.v1.types.replica import Replica
from leptonai.api.v2.client import APIClient

_LEPTON_AVAILABLE = True
except ImportError as e:
_LEPTON_IMPORT_ERROR = e

class LeptonJobState(Enum):
    """Fallback job-state enum used when the optional ``leptonai`` package is absent.

    NOTE(review): member names and values appear to mirror
    ``leptonai.api.v1.types.job.LeptonJobState`` — confirm they stay in sync
    with the upstream package when it is updated.
    """

    Starting = "Starting"
    Running = "Running"
    Failed = "Failed"
    Completed = "Completed"
    Deleting = "Deleting"
    Restarting = "Restarting"
    Archived = "Archived"
    Stopped = "Stopped"
    Stopping = "Stopping"
    Unknown = "Unknown"


logger = logging.getLogger(__name__)


def _require_leptonai() -> None:
    """Fail with an actionable ImportError when the optional ``leptonai`` package is missing.

    Chains the original import error (``_LEPTON_IMPORT_ERROR``) so the real
    cause stays visible in the traceback.
    """
    if _LEPTON_AVAILABLE:
        return
    raise ImportError(
        "leptonai package is required for LeptonExecutor. "
        'Install it with: pip install "nemo_run[lepton]"'
    ) from _LEPTON_IMPORT_ERROR


@dataclass(kw_only=True)
class LeptonExecutor(Executor):
"""
Expand Down Expand Up @@ -84,6 +117,9 @@ class LeptonExecutor(Executor):
head_resource_shape: Optional[str] = "" # Only used for LeptonRayCluster
ray_version: Optional[str] = None # Only used for LeptonRayCluster

def __post_init__(self) -> None:
    """Validate at construction time that the optional leptonai dependency is installed."""
    _require_leptonai()

def stop_job(self, job_id: str):
"""
Send a stop signal to the requested job
Expand Down Expand Up @@ -376,6 +412,7 @@ def cancel(self, job_id: str):

@classmethod
def logs(cls: Type["LeptonExecutor"], app_id: str, fallback_path: Optional[str]):
_require_leptonai()
client = APIClient()

# Get the first replica from the job which contains the job logs
Expand Down
61 changes: 44 additions & 17 deletions nemo_run/run/ray/lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,45 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import asyncio
import json
import logging
import sys
import time
import urllib3
import warnings
from dataclasses import dataclass
from ray.job_submission import JobSubmissionClient
from rich.pretty import pretty_repr
from typing import Any, Optional, TypeAlias

from leptonai.api.v1.types.affinity import LeptonResourceAffinity
from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
from leptonai.api.v1.types.deployment import EnvVar, EnvValue

from nemo_run.core.execution.lepton import LeptonExecutor
import urllib3
from rich.pretty import pretty_repr

from leptonai.api.v2.client import APIClient
from leptonai.api.v1.types.raycluster import (
LeptonRayCluster as LeptonRayClusterSpec,
LeptonRayClusterUserSpec,
Metadata,
RayHeadGroupSpec,
RayWorkerGroupSpec,
)
from leptonai.cli.raycluster import DEFAULT_RAY_IMAGE
from nemo_run.core.execution.lepton import LeptonExecutor, _LEPTON_AVAILABLE, _require_leptonai

_RAY_IMPORT_ERROR: ImportError | None = None
_RAY_AVAILABLE = False
try:
from ray.job_submission import JobSubmissionClient

_RAY_AVAILABLE = True
except ImportError as e:
_RAY_IMPORT_ERROR = e
JobSubmissionClient = None

if _LEPTON_AVAILABLE:
from leptonai.api.v1.types.affinity import LeptonResourceAffinity
from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
from leptonai.api.v1.types.deployment import EnvVar, EnvValue
from leptonai.api.v1.types.raycluster import (
LeptonRayCluster as LeptonRayClusterSpec,
LeptonRayClusterUserSpec,
Metadata,
RayHeadGroupSpec,
RayWorkerGroupSpec,
)
from leptonai.api.v2.client import APIClient
from leptonai.cli.raycluster import DEFAULT_RAY_IMAGE

noquote: TypeAlias = str

Expand All @@ -49,6 +61,19 @@
RAY_NOT_READY_STATE = "Not Ready"


def _require_ray() -> None:
    """Fail with an actionable ImportError when ``ray`` is not importable.

    Chains the original import error (``_RAY_IMPORT_ERROR``) so the real
    cause stays visible in the traceback.
    """
    if _RAY_AVAILABLE:
        return
    raise ImportError(
        "ray is required for Lepton Ray helpers. "
        'Install it with: pip install "nemo_run[lepton]"'
    ) from _RAY_IMPORT_ERROR


def _require_lepton_ray() -> None:
    """Ensure both optional dependency groups (leptonai and ray) are available.

    Raises ImportError from the first missing dependency, preserving the
    individual checks' install-hint messages.
    """
    for check in (_require_leptonai, _require_ray):
        check()


@dataclass(kw_only=True)
class LeptonRayCluster:
EXECUTOR_CLS = LeptonExecutor
Expand All @@ -57,6 +82,7 @@ class LeptonRayCluster:
executor: LeptonExecutor

def __post_init__(self):
    """Check optional dependencies, then initialise the cluster-name cache."""
    _require_lepton_ray()
    # Maps logical cluster names to their Lepton-side identifiers.
    self.cluster_map: dict[str, str] = {}

def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup:
Expand Down Expand Up @@ -374,6 +400,7 @@ class LeptonRayJob:
# Internals
# ---------------------------------------------------------------------
def __post_init__(self):
    """Check optional dependencies; no job has been submitted yet."""
    _require_lepton_ray()
    self.submission_id = None

def _get_last_submission_id(self) -> Optional[int]:
Expand Down
9 changes: 4 additions & 5 deletions nemo_run/run/torchx_backend/schedulers/lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@

import fiddle as fdl
import fiddle._src.experimental.dataclasses as fdl_dc
from leptonai.api.v1.types.job import LeptonJobState
from torchx.schedulers.api import AppDryRunInfo, DescribeAppResponse, ListAppResponse, Scheduler
from torchx.specs import AppDef, AppState, ReplicaStatus, Role, RoleStatus, runopts

from nemo_run.config import get_nemorun_home
from nemo_run.core.execution.base import Executor
from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.core.execution.lepton import LeptonExecutor, LeptonJobState, _require_leptonai
from nemo_run.core.serialization.zlib_json import ZlibJSONSerializer
from nemo_run.run.torchx_backend.schedulers.api import SchedulerMixin

Expand Down Expand Up @@ -70,6 +69,7 @@ class LeptonRequest:
class LeptonScheduler(SchedulerMixin, Scheduler[dict[str, str]]): # type: ignore
def __init__(self, session_name: str) -> None:
super().__init__("lepton", session_name)
_require_leptonai()

def _run_opts(self) -> runopts:
opts = runopts()
Expand All @@ -86,9 +86,8 @@ def _submit_dryrun( # type: ignore
app: AppDef,
cfg: Executor,
) -> AppDryRunInfo[LeptonRequest]:
assert isinstance(cfg, LeptonExecutor), (
f"{cfg.__class__} not supported for Lepton scheduler."
)
if not isinstance(cfg, LeptonExecutor):
raise AssertionError(f"{cfg.__class__} not supported for Lepton scheduler.")
executor = cfg

assert len(app.roles) == 1, "Only single-role apps are supported."
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ dependencies = [
"jinja2>=3.1.4",
"networkx >= 3.3",
"omegaconf>=2.3.0",
"leptonai>=0.26.6",
"toml",
]
readme = "README.md"
Expand All @@ -50,6 +49,9 @@ skypilot_jobs = "nemo_run.run.torchx_backend.schedulers.skypilot_jobs:create_sch
kubeflow = "nemo_run.run.torchx_backend.schedulers.kubeflow:create_scheduler"

[project.optional-dependencies]
lepton = [
"leptonai>=0.26.6",
]
skypilot = [
"skypilot[kubernetes]>=0.10.0",
]
Expand Down
13 changes: 8 additions & 5 deletions test/core/execution/test_lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@
from unittest.mock import MagicMock, mock_open, patch

import pytest
from leptonai.api.v1.types.common import LeptonVisibility, Metadata
from leptonai.api.v1.types.deployment import (

pytest.importorskip("leptonai")

from leptonai.api.v1.types.common import LeptonVisibility, Metadata # noqa: E402
from leptonai.api.v1.types.deployment import ( # noqa: E402
LeptonContainer,
LeptonResourceAffinity,
Mount,
EnvVar,
EnvValue,
)
from leptonai.api.v1.types.job import LeptonJob, LeptonJobUserSpec
from leptonai.api.v1.types.job import LeptonJob, LeptonJobUserSpec # noqa: E402

from nemo_run.core.execution.lepton import LeptonExecutor, LeptonJobState
from nemo_run.core.packaging.git import GitArchivePackager
from nemo_run.core.execution.lepton import LeptonExecutor, LeptonJobState # noqa: E402
from nemo_run.core.packaging.git import GitArchivePackager # noqa: E402


class MockLeptonJob:
Expand Down
93 changes: 93 additions & 0 deletions test/core/execution/test_lepton_optional_dependency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import subprocess
import sys
import textwrap
from pathlib import Path


REPO_ROOT = Path(__file__).parents[3]


def _run_with_blocked_leptonai(code: str) -> subprocess.CompletedProcess[str]:
    """Run *code* in a subprocess where importing ``leptonai`` is impossible.

    A ``MetaPathFinder`` is prepended to ``sys.meta_path`` inside the child
    process so that any import of ``leptonai`` (or a submodule) raises
    ``ModuleNotFoundError``, simulating an environment without the optional
    dependency. The child runs with ``cwd=REPO_ROOT`` so ``nemo_run`` resolves
    to the in-repo package — presumably; confirm the repo layout supports this.

    Returns the ``CompletedProcess``; callers inspect ``returncode``/``stderr``.
    """
    # The blocker script is kept flush-left: it is executed via ``python -c``,
    # so any uniform indentation would be a syntax error in the child.
    blocker = """
import importlib.abc
import sys


class BlockLeptonai(importlib.abc.MetaPathFinder):
    def find_spec(self, fullname, path=None, target=None):
        if fullname == "leptonai" or fullname.startswith("leptonai."):
            raise ModuleNotFoundError("No module named 'leptonai'")
        return None


sys.meta_path.insert(0, BlockLeptonai())
"""
    # Caller-provided code may be an indented triple-quoted literal; dedent it
    # before concatenating so the combined script parses.
    script = blocker + "\n" + textwrap.dedent(code)
    return subprocess.run(
        [sys.executable, "-c", script],
        cwd=REPO_ROOT,
        text=True,
        capture_output=True,
        check=False,  # callers assert on returncode themselves
    )


def test_nemo_run_import_without_leptonai() -> None:
    """``nemo_run`` must import and expose ``LeptonExecutor`` without leptonai installed.

    Instantiating ``LeptonExecutor`` in that environment must raise an
    ImportError whose message points at the ``nemo_run[lepton]`` extra.
    """
    result = _run_with_blocked_leptonai(
        """
import sys

import nemo_run as run
from nemo_run import LeptonExecutor as PublicLeptonExecutor
from nemo_run.core.execution import LeptonExecutor as ExecutionLeptonExecutor

assert run.LocalExecutor.__name__ == "LocalExecutor"
assert run.LeptonExecutor.__name__ == "LeptonExecutor"
assert PublicLeptonExecutor is run.LeptonExecutor
assert ExecutionLeptonExecutor is run.LeptonExecutor
# Importing nemo_run must not drag leptonai in as a side effect.
assert "leptonai" not in sys.modules

try:
    run.LeptonExecutor(container_image="image", nemo_run_dir="/nemo")
except ImportError as e:
    assert "nemo_run[lepton]" in str(e)
else:
    raise AssertionError("LeptonExecutor should require the lepton extra")
"""
    )

    # Surface the child's stderr on failure for easier debugging.
    assert result.returncode == 0, result.stderr


def test_scheduler_and_ray_modules_import_without_leptonai() -> None:
    """Scheduler and Ray helper modules must import cleanly without leptonai.

    Also checks that the torchx executor mapping still resolves ``"lepton"``
    to ``LeptonExecutor`` and that no module import pulled leptonai in.
    """
    result = _run_with_blocked_leptonai(
        """
import sys

from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.run.torchx_backend.schedulers.api import REVERSE_EXECUTOR_MAPPING
import nemo_run.run.ray.cluster
import nemo_run.run.ray.job
import nemo_run.run.ray.lepton

assert REVERSE_EXECUTOR_MAPPING["lepton"] is LeptonExecutor
assert "leptonai" not in sys.modules
"""
    )

    # Surface the child's stderr on failure for easier debugging.
    assert result.returncode == 0, result.stderr
Loading