Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/source/overview/sim/sim_manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ sim_config = SimulationManagerCfg(
)
```

### multiple instance
- current instance number of `SimulationManager`: `SimulationManager.get_n_instances()`
- get specific instance: `SimulationManager.get_instance(instance_id)`, `instance_id` < `SimulationManager.get_n_instances()`

### Configuration Parameters

| Parameter | Type | Default | Description |
Expand Down
86 changes: 54 additions & 32 deletions embodichain/lab/sim/sim_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,41 +158,52 @@ class SimulationManager:
- physics simulation management, eg. time step, manual update, etc.
- interactive control via gizmo and window callbacks events.

This class implements the singleton pattern to ensure only one instance exists at a time.

Args:
sim_config (SimulationManagerCfg, optional): simulation configuration. Defaults to SimulationManagerCfg().
"""

_instance = None
_is_initialized = False
_instances = {}

SUPPORTED_SENSOR_TYPES = {
"Camera": Camera,
"StereoCamera": StereoCamera,
"ContactSensor": ContactSensor,
}

def __new__(cls, sim_config: SimulationManagerCfg = SimulationManagerCfg()):
"""Create or return the singleton instance."""
if cls._instance is None:
cls._instance = super(SimulationManager, cls).__new__(cls)
return cls._instance
def __new__(
cls,
sim_config: SimulationManagerCfg = SimulationManagerCfg()
):
"""Create or return the instance based on instance_id."""
n_instance = len(list(cls._instances.keys()))
instance = super(SimulationManager, cls).__new__(cls)
# Store sim_config in the instance for use in __init__ or elsewhere
instance.sim_config = sim_config
cls._instances[str(n_instance + 1)] = instance
return instance

def __init__(
self, sim_config: SimulationManagerCfg = SimulationManagerCfg()
self,
sim_config: SimulationManagerCfg = SimulationManagerCfg()
) -> None:
instance_id = SimulationManager.get_n_instances() + 1
# Skip initialization if already initialized
if self._is_initialized:
if hasattr(self, "_is_initialized") and self._is_initialized:
logger.log_warning(
"SimulationManager is already initialized. Skipping re-initialization. "
"Use SimulationManager.get_instance() to get the existing instance or "
"SimulationManager.reset() to create a new instance."
f"SimulationManager (id={instance_id}) is already initialized. Skipping re-initialization. "
"Use SimulationManager.get_instance(instance_id) to get the existing instance or "
"SimulationManager.reset(instance_id) to create a new instance."
)
return

if sim_config.enable_rt and instance_id > 0:
logger.log_error(
f"Ray Tracing rendering backend is only supported for single instance (instance_id=0). "
)

# Mark as initialized
SimulationManager._is_initialized = True
self._is_initialized = True
self.instance_id = instance_id
# Cache paths
self._sim_cache_dir = SIM_CACHE_DIR
self._material_cache_dir = MATERIAL_CACHE_DIR
Expand Down Expand Up @@ -278,41 +289,52 @@ def __init__(
self._build_multiple_arenas(sim_config.num_envs)

@classmethod
def get_instance(cls) -> SimulationManager:
"""Get the singleton instance of SimulationManager.
def get_instance(cls, instance_id: int = 0) -> SimulationManager:
"""Get the instance of SimulationManager by id.

Args:
instance_id (int): The instance id. Defaults to 0.

Returns:
SimulationManager: The singleton instance.
SimulationManager: The instance.

Raises:
RuntimeError: If the instance has not been created yet.
"""
if cls._instance is None:
if instance_id not in cls._instances:
logger.log_error(
"SimulationManager has not been instantiated yet. "
"Create an instance first using SimulationManager(sim_config)."
f"SimulationManager (id={instance_id}) has not been instantiated yet. "
f"Create an instance first using SimulationManager(sim_config, instance_id={instance_id})."
)
return cls._instance
return cls._instances[instance_id]

@classmethod
def get_n_instances(cls) -> int:
"""Get the number of instantiated SimulationManager instances.

Returns:
int: The number of instances.
"""
return len(cls._instances)

@classmethod
def reset(cls) -> None:
"""Reset the singleton instance.
def reset(cls, instance_id: int = 0) -> None:
"""Reset the instance.

This allows creating a new instance with different configuration.
"""
if cls._instance is not None:
logger.log_info("Resetting SimulationManager singleton instance.")
cls._instance = None
cls._is_initialized = False
if instance_id in cls._instances:
logger.log_info(f"Resetting SimulationManager instance {instance_id}.")
del cls._instances[instance_id]

@classmethod
def is_instantiated(cls) -> bool:
"""Check if the singleton instance has been created.
def is_instantiated(cls, instance_id: int = 0) -> bool:
"""Check if the instance has been created.

Returns:
bool: True if the instance exists, False otherwise.
"""
return cls._instance is not None
return instance_id in cls._instances

@property
def num_envs(self) -> int:
Expand Down Expand Up @@ -1560,4 +1582,4 @@ def destroy(self) -> None:
self._env.clean()
self._world.quit()

self.reset()
SimulationManager.reset(self.instance_id)