Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions tests/e2e/autoscaling_load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Workload used by E2E autoscaling tests.

This script is submitted via Ray Job submission to generate enough queued work
to trigger Ray in-tree autoscaling on a KinD cluster.
"""

import os
import time

import ray


def main():
    """Launch a batch of sleeping one-CPU Ray tasks and block until all finish.

    Configuration is read from the environment:

    * ``AUTOSCALING_TASKS`` - number of tasks to launch (default ``4``).
    * ``AUTOSCALING_TASK_SLEEP_S`` - whole seconds each task sleeps
      (default ``120``).

    Raises:
        ValueError: if either environment variable is not a valid integer.
    """
    # address="auto" attaches to an already-running Ray cluster instead of
    # starting a new local one, so this must run inside the cluster environment.
    ray.init(address="auto")

    task_count = int(os.environ.get("AUTOSCALING_TASKS", "4"))
    sleep_s = int(os.environ.get("AUTOSCALING_TASK_SLEEP_S", "120"))

    @ray.remote(num_cpus=1)
    def burn_cpu():
        # Each task requests one CPU and simply holds it for the sleep period.
        time.sleep(sleep_s)
        return True

    pending = []
    for _ in range(task_count):
        pending.append(burn_cpu.remote())

    # Block until every task has completed.
    ray.get(pending)


if __name__ == "__main__":
    main()
60 changes: 60 additions & 0 deletions tests/e2e/autoscaling_raycluster_sdk_kind_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pytest

from codeflare_sdk import Cluster, ClusterConfiguration

from support import *


@pytest.mark.kind
class TestRayClusterAutoscalingSDKKind:
    """E2E test: an autoscaling-enabled RayCluster scales up under load and back down."""

    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        delete_namespace(self)

    def test_autoscaling_scale_up_and_down_kind(self):
        """Create a 1..2 worker cluster, apply load, and check the worker count.

        Sequence: wait for exactly one worker, start the load script in the
        head pod, wait for at least two workers while the load is running,
        wait for the load to exit successfully, then wait for the worker
        count to return to one.
        """
        self.setup_method()
        create_namespace(self)

        cluster_name = f"autoscale-{random_choice()}"
        ray_image = get_ray_image()

        cluster = Cluster(
            ClusterConfiguration(
                name=cluster_name,
                namespace=self.namespace,
                enable_autoscaling=True,
                min_workers=1,
                max_workers=2,
                head_cpu_requests="500m",
                head_cpu_limits="500m",
                worker_cpu_requests="500m",
                worker_cpu_limits=1,
                worker_memory_requests=1,
                worker_memory_limits=4,
                image=ray_image,
                write_to_file=True,
                verify_tls=False,
            )
        )

        cluster.apply()
        cluster.wait_ready(timeout=600, dashboard_check=False)

        # Verify initial state: 1 worker (min_workers)
        wait_for_worker_count(self, cluster_name, lambda n: n == 1, timeout_s=300)

        # Trigger scale-up via load script in head pod (async)
        load_proc = run_autoscaling_load_in_head_pod(self, cluster_name)
        try:
            # Verify scale-up while load is still running
            wait_for_worker_count(
                self, cluster_name, lambda n: n >= 2, timeout_s=600
            )

            # Wait for load to finish before checking scale-down
            load_proc.wait(timeout=600)
        finally:
            # If a wait above raised, the kubectl child is still running;
            # kill and reap it so a failed test does not leak a process.
            if load_proc.poll() is None:
                load_proc.kill()
                load_proc.wait()

        assert load_proc.returncode == 0, "Load script failed"

        # Verify scale-down back to min_workers
        wait_for_worker_count(self, cluster_name, lambda n: n == 1, timeout_s=600)

        cluster.down()
65 changes: 65 additions & 0 deletions tests/e2e/autoscaling_raycluster_sdk_oauth_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import pytest

from codeflare_sdk import Cluster, ClusterConfiguration

from support import *


@pytest.mark.openshift
@pytest.mark.tier1
class TestRayClusterAutoscalingSDKOauth:
    """E2E test: an autoscaling-enabled RayCluster scales up under load and back down."""

    def setup_method(self):
        initialize_kubernetes_client(self)

    def teardown_method(self):
        # auth_instance is only set once the test body has authenticated,
        # so guard against a failure that happened before that point.
        if hasattr(self, "auth_instance"):
            cleanup_authentication(self.auth_instance)
        delete_namespace(self)

    @pytest.mark.timeout(1800)
    def test_autoscaling_scale_up_and_down_openshift_oauth(self):
        """Create a 1..2 worker cluster, apply load, and check the worker count.

        Sequence: wait for exactly one worker, start the load script in the
        head pod, wait for at least two workers while the load is running,
        wait for the load to exit successfully, then wait for the worker
        count to return to one.
        """
        self.setup_method()

        create_namespace(self)

        ray_image = get_ray_image()
        resources = get_platform_appropriate_resources()
        self.auth_instance = authenticate_for_tests()

        cluster_name = f"autoscale-{random_choice()}"

        cluster = Cluster(
            ClusterConfiguration(
                name=cluster_name,
                namespace=self.namespace,
                enable_autoscaling=True,
                min_workers=1,
                max_workers=2,
                image=ray_image,
                write_to_file=True,
                verify_tls=False,
                **resources,
            )
        )

        cluster.apply()
        wait_ready_with_stuck_detection(cluster, timeout=900, dashboard_check=False)

        # Verify initial state: 1 worker (min_workers)
        wait_for_worker_count(self, cluster_name, lambda n: n == 1, timeout_s=600)

        # Trigger scale-up via load script in head pod (async)
        load_proc = run_autoscaling_load_in_head_pod(
            self, cluster_name, tasks=2, sleep_s=180
        )
        try:
            # Verify scale-up while load is still running
            wait_for_worker_count(
                self, cluster_name, lambda n: n >= 2, timeout_s=900
            )

            # Wait for load to finish before checking scale-down
            load_proc.wait(timeout=600)
        finally:
            # If a wait above raised, the kubectl child is still running;
            # kill and reap it so a failed test does not leak a process.
            if load_proc.poll() is None:
                load_proc.kill()
                load_proc.wait()

        assert load_proc.returncode == 0, "Load script failed"

        # Verify scale-down back to min_workers
        wait_for_worker_count(self, cluster_name, lambda n: n == 1, timeout_s=900)

        cluster.down()
62 changes: 62 additions & 0 deletions tests/e2e/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import random
import string
import subprocess
import time
import warnings
from time import sleep

from codeflare_sdk import get_cluster
from kubernetes import client, config
from kubernetes.client import V1Toleration
Expand Down Expand Up @@ -344,6 +346,66 @@ def run_kubectl_command(args):
return None


def wait_for_worker_count(
    self, cluster_name, predicate, timeout_s=600, poll_interval_s=10
):
    """Wait until the number of worker pods for cluster_name satisfies predicate.

    Pods are listed in ``self.namespace`` through ``self.api_instance`` using
    the ``ray.io/node-type=worker`` and ``ray.io/cluster=<cluster_name>``
    labels. Every pod returned by that query is counted.

    Args:
        cluster_name: Value matched against the ``ray.io/cluster`` label.
        predicate: Callable taking the current pod count and returning a
            truthy value once the desired state is reached.
        timeout_s: Maximum number of seconds to keep polling. The pods are
            always listed at least once, even when this is zero or negative.
        poll_interval_s: Seconds to sleep between polls.

    Returns:
        The pod count that satisfied ``predicate``.

    Raises:
        TimeoutError: if ``predicate`` was not satisfied before the deadline.
    """
    label = f"ray.io/node-type=worker,ray.io/cluster={cluster_name}"
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_s
    while True:
        pods = self.api_instance.list_namespaced_pod(
            self.namespace, label_selector=label
        )
        # items may be None when nothing matches; treat that as zero pods.
        last = len(pods.items or [])
        if predicate(last):
            return last

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                f"Timed out waiting for worker count. cluster={cluster_name} last={last}"
            )
        # Never sleep past the deadline, and never pass a negative duration.
        sleep(max(0, min(poll_interval_s, remaining)))


def run_autoscaling_load_in_head_pod(self, cluster_name, tasks=2, sleep_s=120):
    """Copy autoscaling_load.py into the head pod and start it without blocking.

    The head pod is looked up in ``self.namespace`` by the
    ``ray.io/node-type=head`` and ``ray.io/cluster=<cluster_name>`` labels,
    and the first match is used. The script sitting next to this module is
    copied to ``/tmp/autoscaling_load.py`` with ``kubectl cp`` and then
    launched through ``kubectl exec``.

    Args:
        cluster_name: Value matched against the ``ray.io/cluster`` label.
        tasks: Exported to the script as ``AUTOSCALING_TASKS``.
        sleep_s: Exported to the script as ``AUTOSCALING_TASK_SLEEP_S``.

    Returns:
        The ``subprocess.Popen`` handle of the ``kubectl exec`` process, so
        the caller can check for scale-up while the workload is still running
        and wait on the handle afterwards.

    Raises:
        RuntimeError: if no head pod is found for ``cluster_name``.
        subprocess.CalledProcessError: if ``kubectl cp`` exits non-zero.
    """
    head_selector = f"ray.io/node-type=head,ray.io/cluster={cluster_name}"
    head_pods = self.api_instance.list_namespaced_pod(
        self.namespace, label_selector=head_selector
    ).items
    if not head_pods:
        raise RuntimeError(f"No head pod found for cluster {cluster_name}")
    head_pod = head_pods[0].metadata.name

    # The load script lives in the same directory as this support module.
    this_dir = os.path.dirname(os.path.abspath(__file__))
    load_script = os.path.join(this_dir, "autoscaling_load.py")
    remote_target = f"{self.namespace}/{head_pod}:/tmp/autoscaling_load.py"
    subprocess.check_call(["kubectl", "cp", load_script, remote_target])

    # Configuration reaches the script through environment variables set
    # inline on the shell command line.
    shell_command = (
        f"AUTOSCALING_TASKS={tasks} AUTOSCALING_TASK_SLEEP_S={sleep_s} "
        f"python /tmp/autoscaling_load.py"
    )
    exec_argv = ["kubectl", "exec", "-n", self.namespace, head_pod, "--"]
    exec_argv += ["bash", "-lc", shell_command]
    return subprocess.Popen(exec_argv)


def create_cluster_queue(self, cluster_queue, flavor):
cluster_queue_json = {
"apiVersion": "kueue.x-k8s.io/v1beta1",
Expand Down
Loading