Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ requires-python = ">=3.10"
"import-linter~=2.10",
"pytest-deadfixtures~=3.1",
"taplo~=0.9.3",
"gymnasium~=1.2",
]
rl = ["gymnasium~=1.2"]
docs = [
"sphinx~=8.1",
"nvidia-sphinx-theme~=0.0.8",
Expand Down
34 changes: 31 additions & 3 deletions src/cloudai/cli/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ def prepare_installation(
return installables, installer


def _run_custom_training_loop(agent: object, agent_type: str) -> int:
"""Delegate to an agent's own training loop (e.g. RLlib PPO)."""
logging.info(f"Agent {agent_type} uses a custom training loop, delegating to agent.train()")
try:
agent.train() # type: ignore[union-attr]
return 0
except Exception as e:
logging.error(f"Agent training failed for {agent_type}: {e}", exc_info=True)
return 1
finally:
if hasattr(agent, "shutdown"):
agent.shutdown() # type: ignore[union-attr]


def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:
registry = Registry()

Expand Down Expand Up @@ -151,15 +165,29 @@ def handle_dse_job(runner: Runner, args: argparse.Namespace) -> int:

agent = agent_class(env, agent_config)

if getattr(agent, "HAS_CUSTOM_TRAINING_LOOP", False):
err |= _run_custom_training_loop(agent, agent_type)
continue

observation, _ = env.reset()

for step in range(agent.max_steps):
result = agent.select_action()
result = agent.select_action(observation=observation)
if result is None:
break
step, action = result
env.test_run.step = step
logging.info(f"Running step {step} (of {agent.max_steps}) with action {action}")
observation, reward, *_ = env.step(action)
feedback = {"trial_index": step, "value": reward}
prev_obs = observation
observation, reward, done, *_ = env.step(action)
feedback = {
"trial_index": step,
"value": reward,
"observation": observation,
"prev_observation": prev_obs,
"action": action,
"done": done,
}
agent.update_policy(feedback)
logging.info(f"Step {step}: Observation: {[round(obs, 4) for obs in observation]}, Reward: {reward:.4f}")

Expand Down
2 changes: 2 additions & 0 deletions src/cloudai/configurator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
from .base_gym import BaseGym
from .cloudai_gym import CloudAIGymEnv, TrajectoryEntry
from .grid_search import GridSearchAgent
from .gymnasium_adapter import GymnasiumAdapter

__all__ = [
"BaseAgent",
"BaseGym",
"CloudAIGymEnv",
"GridSearchAgent",
"GymnasiumAdapter",
"TrajectoryEntry",
]
7 changes: 5 additions & 2 deletions src/cloudai/configurator/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Any, Dict, Literal
from typing import Any, Dict, Literal, Optional

from pydantic import BaseModel, ConfigDict

Expand Down Expand Up @@ -68,10 +68,13 @@ def configure(self, config: dict[str, Any]) -> None:
pass

@abstractmethod
def select_action(self) -> tuple[int, dict[str, Any]]:
def select_action(self, observation: Optional[list] = None) -> tuple[int, dict[str, Any]]:
"""
Select an action from the action space.

Args:
observation: Optional environment observation from the previous step.

Returns:
Tuple[int, Dict[str, Any]]: The current step index and a dictionary mapping action keys to selected values.
"""
Expand Down
7 changes: 4 additions & 3 deletions src/cloudai/configurator/cloudai_gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ def define_observation_space(self) -> list:
Define the observation space for the environment.

Returns:
list: The observation space.
list: One float per agent metric, giving the correct shape.
"""
return [0.0]
n_metrics = max(len(self.test_run.test.agent_metrics), 1)
return [0.0] * n_metrics

def reset(
self,
Expand All @@ -97,7 +98,7 @@ def reset(
if seed is not None:
lazy.np.random.seed(seed)
self.test_run.current_iteration = 0
observation = [0.0]
observation = self.define_observation_space()
info = {}
return observation, info

Expand Down
4 changes: 2 additions & 2 deletions src/cloudai/configurator/grid_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.

import itertools
from typing import Any, Dict, List, Tuple
from typing import Any, Dict, List, Optional, Tuple

from .base_agent import BaseAgent, BaseAgentConfig
from .cloudai_gym import CloudAIGymEnv
Expand Down Expand Up @@ -71,7 +71,7 @@ def get_all_combinations(self) -> List[Dict[str, Any]]:
keys = list(self.action_space.keys())
return [dict(zip(keys, combination, strict=True)) for combination in self.action_combinations]

def select_action(self) -> Tuple[int, Dict[str, Any]]:
def select_action(self, observation: Optional[list] = None) -> Tuple[int, Dict[str, Any]]:
"""
Select the next action from the grid.

Expand Down
95 changes: 95 additions & 0 deletions src/cloudai/configurator/gymnasium_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import Any, ClassVar

from .base_gym import BaseGym


def _import_gymnasium():
"""Import gymnasium lazily; raise a clear error when it is absent."""
try:
import gymnasium
from gymnasium import spaces

return gymnasium, spaces
except ImportError as exc:
raise ImportError("gymnasium is required for GymnasiumAdapter. Install it with: pip install gymnasium") from exc


class GymnasiumAdapter:
    """
    Wrap a CloudAI BaseGym environment as a standard gymnasium.Env.

    gymnasium is imported lazily so it remains an optional dependency.

    NOTE(review): because the import is lazy, this class cannot statically
    inherit from gymnasium.Env, so ``isinstance(adapter, gymnasium.Env)``
    checks performed by some RL libraries will fail — confirm downstream
    consumers rely only on the Env method protocol (reset/step/render).
    """

    # Advertised render modes; render() delegates to the wrapped env.
    metadata: ClassVar[dict[str, Any]] = {"render_modes": ["human"]}

    def __init__(self, env: BaseGym) -> None:
        """Build gymnasium action/observation spaces from the wrapped env.

        Args:
            env: CloudAI environment supplying define_action_space(),
                define_observation_space(), reset(), step(), and render().
        """
        import numpy as np

        gymnasium, spaces = _import_gymnasium()

        # Explicit base-class init for parity with gymnasium conventions,
        # even though this class is not a gymnasium.Env subclass (see NOTE
        # in the class docstring).
        gymnasium.Env.__init__(self)

        self._np = np
        self._env = env

        # Parameters with a single candidate value are treated as fixed (not
        # part of the tunable action space); only multi-valued parameters are
        # exposed to the RL agent.
        raw_action_space = env.define_action_space()
        self._tunable_params: dict[str, list] = {k: v for k, v in raw_action_space.items() if len(v) > 1}
        self._fixed_params: dict[str, Any] = {k: v[0] for k, v in raw_action_space.items() if len(v) == 1}

        # Each tunable parameter becomes a Discrete index into its candidate
        # list; decode_action() maps indices back to concrete values.
        self.action_space = spaces.Dict({k: spaces.Discrete(len(v)) for k, v in self._tunable_params.items()})

        # Observation space shape is derived from the env's declared
        # observation vector; bounds are unconstrained.
        obs = env.define_observation_space()
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(len(obs),), dtype=np.float32)

    def reset(
        self,
        *,
        seed: int | None = None,
        options: dict[str, Any] | None = None,
    ) -> tuple[Any, dict[str, Any]]:
        """Reset the environment and return (observation, info)."""
        obs, info = self._env.reset(seed=seed, options=options)
        # gymnasium expects a float32 ndarray matching observation_space.
        return self._np.asarray(obs, dtype=self._np.float32), info

    def step(self, action: dict[str, int]) -> tuple[Any, float, bool, bool, dict[str, Any]]:
        """Execute one step and return the gymnasium 5-tuple."""
        # Merge fixed single-value parameters with the decoded tunable ones so
        # the wrapped env always receives a complete parameter dictionary.
        decoded = {**self._fixed_params, **self.decode_action(action)}
        obs, reward, done, info = self._env.step(decoded)
        # Widen the wrapped env's 4-tuple to gymnasium's 5-tuple
        # (terminated, truncated); truncated is always reported as False.
        return self._np.asarray(obs, dtype=self._np.float32), float(reward), bool(done), False, info

    def step_raw(self, param_dict: dict[str, Any]) -> tuple[Any, float, bool, bool, dict[str, Any]]:
        """Execute one step with a pre-decoded parameter dictionary.

        Unlike step(), no index decoding or fixed-parameter merging is done:
        the caller supplies the full parameter dict passed to the env as-is.
        """
        obs, reward, done, info = self._env.step(param_dict)
        return self._np.asarray(obs, dtype=self._np.float32), float(reward), bool(done), False, info

    def decode_action(self, action: dict[str, int]) -> dict[str, Any]:
        """Map discrete indices back to the original parameter values.

        Raises KeyError for keys that are not tunable parameters and
        IndexError for indices outside a parameter's candidate list.
        """
        return {k: self._tunable_params[k][idx] for k, idx in action.items()}

    def render(self) -> None:
        """Render the underlying environment."""
        self._env.render()

    @property
    def unwrapped(self) -> BaseGym:
        """Return the wrapped CloudAI BaseGym instance."""
        return self._env
Loading
Loading