Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/together/lib/cli/api/beta/clusters/_json_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations

import json
from typing import Any, cast
from pathlib import Path


def parse_json_object(value: str, parameter_name: str) -> dict[str, Any]:
parsed = _parse_json(value, parameter_name)
if not isinstance(parsed, dict):
raise ValueError(f"{parameter_name} must be a JSON object")
return cast(dict[str, Any], parsed)


def parse_json_array(value: str, parameter_name: str) -> list[Any]:
parsed = _parse_json(value, parameter_name)
if not isinstance(parsed, list):
raise ValueError(f"{parameter_name} must be a JSON array")
return cast(list[Any], parsed)


def _parse_json(value: str, parameter_name: str) -> Any:
if value.startswith("@"):
value = Path(value[1:]).read_text()
try:
return json.loads(value)
except json.JSONDecodeError as exc:
raise ValueError(f"{parameter_name} must be valid JSON") from exc
65 changes: 64 additions & 1 deletion src/together/lib/cli/api/beta/clusters/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
NumGpusParameter = Annotated[Optional[int], Parameter(help="Number of GPUs to allocate in the cluster")]
RegionParameter = Annotated[Optional[str], Parameter(help="Region to create the cluster in")]
BillingTypeParameter = Annotated[
Optional[Literal["RESERVED", "ON_DEMAND"]], Parameter(help="Billing type to use for the cluster")
Optional[Literal["RESERVED", "ON_DEMAND", "SCHEDULED_CAPACITY"]],
Parameter(help="Billing type to use for the cluster"),
]
NvidiaDriverVersionParameter = Annotated[Optional[str], Parameter(help="Nvidia driver version to use for the cluster")]
CudaVersionParameter = Annotated[Optional[str], Parameter(help="CUDA version to use for the cluster")]
Expand All @@ -27,6 +28,26 @@
]
ClusterTypeParameter = Annotated[Optional[Literal["KUBERNETES", "SLURM"]], Parameter(help="Cluster type")]
VolumeParameter = Annotated[Optional[str], Parameter(help="Storage volume ID to use for the cluster")]
AutoScaleParameter = Annotated[Optional[bool], Parameter(help="Enable cluster auto-scaling")]
AutoScaleMaxGpusParameter = Annotated[Optional[int], Parameter(help="Maximum GPUs for auto-scaling")]
AutoScaledParameter = Annotated[Optional[bool], Parameter(help="Enable workload-based GPU auto-scaling")]
CapacityPoolIDParameter = Annotated[Optional[str], Parameter(help="Capacity pool ID to use for the cluster")]
GpuNodeFailoverEnabledParameter = Annotated[
Optional[bool], Parameter(help="Enable automated GPU node failover for the cluster")
]
InstallTraefikParameter = Annotated[Optional[bool], Parameter(help="Install Traefik ingress controller")]
NumCapacityPoolGpusParameter = Annotated[
Optional[int], Parameter(help="Number of GPUs to allocate from a capacity pool")
]
NumPreemptibleGpusParameter = Annotated[Optional[int], Parameter(help="Number of preemptible GPUs to request")]
NumReservedGpusParameter = Annotated[Optional[int], Parameter(help="Number of prepaid reserved GPUs to request")]
ProjectIDParameter = Annotated[Optional[str], Parameter(help="Project ID for the cluster")]
ReservationEndTimeParameter = Annotated[Optional[str], Parameter(help="Reservation end time for scheduled capacity")]
ReservationStartTimeParameter = Annotated[
Optional[str], Parameter(help="Reservation start time for scheduled capacity")
]
SlurmImageParameter = Annotated[Optional[str], Parameter(help="Custom Slurm image for Slurm clusters")]
SlurmShmSizeGibParameter = Annotated[Optional[int], Parameter(help="Shared memory size in GiB for Slurm clusters")]


async def create(
Expand All @@ -40,6 +61,20 @@ async def create(
gpu_type: GpuTypeParameter = None,
cluster_type: ClusterTypeParameter = None,
volume: VolumeParameter = None,
auto_scale: AutoScaleParameter = None,
auto_scale_max_gpus: AutoScaleMaxGpusParameter = None,
auto_scaled: AutoScaledParameter = None,
capacity_pool_id: CapacityPoolIDParameter = None,
gpu_node_failover_enabled: GpuNodeFailoverEnabledParameter = None,
install_traefik: InstallTraefikParameter = None,
num_capacity_pool_gpus: NumCapacityPoolGpusParameter = None,
num_preemptible_gpus: NumPreemptibleGpusParameter = None,
num_reserved_gpus: NumReservedGpusParameter = None,
project_id: ProjectIDParameter = None,
reservation_end_time: ReservationEndTimeParameter = None,
reservation_start_time: ReservationStartTimeParameter = None,
slurm_image: SlurmImageParameter = None,
slurm_shm_size_gib: SlurmShmSizeGibParameter = None,
*,
config: CLIConfigParameter,
) -> None:
Expand All @@ -57,6 +92,34 @@ async def create(
)
if volume:
params["volume_id"] = volume
if auto_scale is not None:
params["auto_scale"] = auto_scale
if auto_scale_max_gpus is not None:
params["auto_scale_max_gpus"] = auto_scale_max_gpus
if auto_scaled is not None:
params["auto_scaled"] = auto_scaled
if capacity_pool_id:
params["capacity_pool_id"] = capacity_pool_id
if gpu_node_failover_enabled is not None:
params["gpu_node_failover_enabled"] = gpu_node_failover_enabled
if install_traefik is not None:
params["install_traefik"] = install_traefik
if num_capacity_pool_gpus is not None:
params["num_capacity_pool_gpus"] = num_capacity_pool_gpus
if num_preemptible_gpus is not None:
params["num_preemptible_gpus"] = num_preemptible_gpus
if num_reserved_gpus is not None:
params["num_reserved_gpus"] = num_reserved_gpus
if project_id:
params["project_id"] = project_id
if reservation_end_time:
params["reservation_end_time"] = reservation_end_time
if reservation_start_time:
params["reservation_start_time"] = reservation_start_time
if slurm_image:
params["slurm_image"] = slurm_image
if slurm_shm_size_gib is not None:
params["slurm_shm_size_gib"] = slurm_shm_size_gib

# JSON Mode skips hand holding through the argument setup
if not config.json and not config.non_interactive:
Expand Down
8 changes: 7 additions & 1 deletion src/together/lib/cli/api/beta/clusters/storage/create.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

from typing import Annotated
from typing import Optional, Annotated

from cyclopts import Parameter

from together import omit
from together._utils._json import openapi_dumps
from together.lib.cli.utils.config import CLIConfigParameter
from together.lib.cli.utils._console import console
Expand All @@ -13,6 +14,10 @@ async def create(
region: Annotated[str, Parameter(help="Region to create the storage volume in")],
size_tib: Annotated[int, Parameter(help="Size of the storage volume in TiB")],
volume_name: Annotated[str, Parameter(help="Name of the storage volume")],
is_lifecycle_independent: Annotated[
Optional[bool],
Parameter(help="Keep the storage volume after cluster decommissioning"),
] = None,
*,
config: CLIConfigParameter,
) -> None:
Expand All @@ -21,6 +26,7 @@ async def create(
region=region,
size_tib=size_tib,
volume_name=volume_name,
is_lifecycle_independent=is_lifecycle_independent if is_lifecycle_independent is not None else omit,
)

if config.json:
Expand Down
7 changes: 4 additions & 3 deletions src/together/lib/cli/api/beta/clusters/storage/update.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
from __future__ import annotations

from typing import Annotated
from typing import Optional, Annotated

from cyclopts import Parameter

from together import omit
from together._utils._json import openapi_dumps
from together.lib.cli.utils.config import CLIConfigParameter
from together.lib.cli.utils._console import console


async def update(
volume_id: str,
size_tib: Annotated[int, Parameter(help="New size of the storage volume in TiB")],
size_tib: Annotated[Optional[int], Parameter(help="New size of the storage volume in TiB")] = None,
*,
config: CLIConfigParameter,
) -> None:
"""Update a storage volume (resize)."""
response = await config.client.beta.clusters.storage.update(
volume_id=volume_id,
size_tib=size_tib,
size_tib=size_tib if size_tib is not None else omit,
)

if config.json:
Expand Down
15 changes: 15 additions & 0 deletions src/together/lib/cli/api/beta/clusters/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ async def update(
cluster_type: Annotated[
Optional[Literal["KUBERNETES", "SLURM"]], Parameter(help="Type of cluster to update")
] = None,
num_preemptible_gpus: Annotated[
Optional[int],
Parameter(help="Desired number of preemptible GPUs for the cluster"),
] = None,
num_reserved_gpus: Annotated[
Optional[int],
Parameter(help="Desired number of reserved GPUs for the cluster"),
] = None,
reservation_end_time: Annotated[
Optional[str],
Parameter(help="Timestamp at which the cluster should be decommissioned"),
] = None,
*,
config: CLIConfigParameter,
) -> None:
Expand All @@ -29,6 +41,9 @@ async def update(
cluster_id,
num_gpus=num_gpus if num_gpus is not None else omit,
cluster_type=cluster_type if cluster_type is not None else omit,
num_preemptible_gpus=num_preemptible_gpus if num_preemptible_gpus is not None else omit,
num_reserved_gpus=num_reserved_gpus if num_reserved_gpus is not None else omit,
reservation_end_time=reservation_end_time or omit,
),
)

Expand Down
112 changes: 111 additions & 1 deletion tests/cli/test_beta_clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,75 @@ def test_create_non_interactive_posts_expected_body(self, respx_mock: MockRouter
assert body["billing_type"] == "ON_DEMAND"
assert result.exit_code == 0

@pytest.mark.respx(base_url=base_url)
def test_create_accepts_new_cluster_params(self, respx_mock: MockRouter, cli_runner: CliRunner) -> None:
created = _cluster_body("new-id", "scheduled")
route = respx_mock.post("/compute/clusters").mock(return_value=httpx.Response(200, json=created))
result = cli_runner.invoke(
[
"beta",
"clusters",
"create",
"--non-interactive",
"--cluster-type",
"SLURM",
"--gpu-type",
"H100_SXM",
"--nvidia-driver-version",
"565",
"--cuda-version",
"12.6",
"--region",
"us-central-8",
"--num-gpus",
"8",
"--billing-type",
"SCHEDULED_CAPACITY",
"--name",
"scheduled",
"--auto-scale",
"--auto-scale-max-gpus",
"16",
"--capacity-pool-id",
"pool-1",
"--gpu-node-failover-enabled",
"--install-traefik",
"--num-capacity-pool-gpus",
"8",
"--num-preemptible-gpus",
"8",
"--num-reserved-gpus",
"8",
"--project-id",
"proj-1",
"--reservation-start-time",
"2026-06-01T00:00:00Z",
"--reservation-end-time",
"2026-06-02T00:00:00Z",
"--slurm-image",
"slurm:latest",
"--slurm-shm-size-gib",
"32",
],
)

body = json.loads(cast(Call, route.calls[0]).request.content.decode())
assert body["billing_type"] == "SCHEDULED_CAPACITY"
assert body["auto_scale"] is True
assert body["auto_scale_max_gpus"] == 16
assert body["capacity_pool_id"] == "pool-1"
assert body["gpu_node_failover_enabled"] is True
assert body["install_traefik"] is True
assert body["num_capacity_pool_gpus"] == 8
assert body["num_preemptible_gpus"] == 8
assert body["num_reserved_gpus"] == 8
assert body["project_id"] == "proj-1"
assert body["reservation_start_time"] == "2026-06-01T00:00:00Z"
assert body["reservation_end_time"] == "2026-06-02T00:00:00Z"
assert body["slurm_image"] == "slurm:latest"
assert body["slurm_shm_size_gib"] == 32
assert result.exit_code == 0


class TestBetaClustersUpdate:
@pytest.mark.respx(base_url=base_url)
Expand All @@ -176,6 +245,31 @@ def test_update_json_triggers_put_and_second_get(self, respx_mock: MockRouter, c
assert put_body["cluster_type"] == "SLURM"
assert result.exit_code == 0

@pytest.mark.respx(base_url=base_url)
def test_update_accepts_new_cluster_params(self, respx_mock: MockRouter, cli_runner: CliRunner) -> None:
updated = _cluster_body("c1", num_gpus=16)
put = respx_mock.put("/compute/clusters/c1").mock(return_value=httpx.Response(200, json=updated))
result = cli_runner.invoke(
[
"beta",
"clusters",
"update",
"c1",
"--num-preemptible-gpus",
"8",
"--num-reserved-gpus",
"16",
"--reservation-end-time",
"2026-06-02T00:00:00Z",
],
)

put_body = json.loads(cast(Call, put.calls[0]).request.content.decode())
assert put_body["num_preemptible_gpus"] == 8
assert put_body["num_reserved_gpus"] == 16
assert put_body["reservation_end_time"] == "2026-06-02T00:00:00Z"
assert result.exit_code == 0


class TestBetaClustersDelete:
@pytest.mark.respx(base_url=base_url)
Expand Down Expand Up @@ -234,13 +328,29 @@ def test_storage_create_json(self, respx_mock: MockRouter, cli_runner: CliRunner
"1",
"--volume-name",
"test-volume",
"--is-lifecycle-independent",
"--json",
],
)
out = json.loads(result.output)
assert out["volume_id"] == "vol-1"
raw = cast(Call, route.calls[0]).request.content.decode()
assert json.loads(raw) == {"region": "us-east-1", "size_tib": 1, "volume_name": "test-volume"}
assert json.loads(raw) == {
"region": "us-east-1",
"size_tib": 1,
"volume_name": "test-volume",
"is_lifecycle_independent": True,
}
assert result.exit_code == 0

@pytest.mark.respx(base_url=base_url)
def test_storage_update_allows_omitting_size(self, respx_mock: MockRouter, cli_runner: CliRunner) -> None:
route = respx_mock.put("/compute/clusters/storage/volumes").mock(
return_value=httpx.Response(200, json=_VOLUME_BODY)
)
result = cli_runner.invoke(["beta", "clusters", "storage", "update", "vol-1", "--json"])

assert json.loads(cast(Call, route.calls[0]).request.content.decode()) == {"volume_id": "vol-1"}
assert result.exit_code == 0

@pytest.mark.respx(base_url=base_url)
Expand Down
Loading