Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/scenic/simulators/carla/model.scenic
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ from scenic.simulators.utils.colors import Color
# Sensor imports
from scenic.simulators.carla.sensors import CarlaRGBSensor as RGBSensor
from scenic.simulators.carla.sensors import CarlaSSSensor as SSSensor
from scenic.simulators.carla.sensors import CarlaCollisionSensor as CollisionSensor

try:
from scenic.simulators.carla.simulator import CarlaSimulator # for use in scenarios
Expand Down
69 changes: 69 additions & 0 deletions src/scenic/simulators/carla/sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,72 @@ def process(self, data):
array = array[:, :, 2] # Take only R

return array.copy()


class CarlaCollisionSensor(CallbackSensor):
"""Collision sensor that detects collisions with other actors/objects."""

blueprint = "sensor.other.collision"

def __init__(self):
super().__init__()
self.collision_history = []
self.has_collision = False
self.last_collision_intensity = 0.0
self.last_collision_data = None
self.frame = 0
self._initialized = False
self._is_event_based = True

self.offset = (0, 0, 0)
self.rotation = (0, 0, 0)
self.attributes = {}
self.convert = None
self.carla_sensor = None

def onData(self, event):
self.frame = event.frame
self._initialized = True

impulse = event.normal_impulse
intensity = np.sqrt(impulse.x**2 + impulse.y**2 + impulse.z**2)

collision_info = {
"frame": event.frame,
"timestamp": event.timestamp,
"intensity": intensity,
"other_actor": event.other_actor.type_id if event.other_actor else None,
"impulse": (impulse.x, impulse.y, impulse.z),
}

self.collision_history.append(collision_info)
self.has_collision = True
self.last_collision_intensity = intensity
self.last_collision_data = collision_info

def process(self, data):
return self.last_collision_data

def getObservation(self):
return {
"has_collision": self.has_collision,
"last_collision_intensity": self.last_collision_intensity,
"last_collision_data": self.last_collision_data,
"collision_history": self.collision_history,
}

def updateFrame(self, frame):
"""Advance frame number for event-based sensors that didn't fire this tick."""
if not self._initialized:
self._initialized = True
if self.frame < frame:
self.frame = frame

def reset(self):
"""Reset collision state between simulations."""
self.collision_history = []
self.has_collision = False
self.last_collision_intensity = 0.0
self.last_collision_data = None
self.frame = 0
self._initialized = False
20 changes: 17 additions & 3 deletions src/scenic/simulators/carla/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from scenic.core.simulators import SimulationCreationError
from scenic.domains.driving.simulators import DrivingSimulation, DrivingSimulator
from scenic.simulators.carla.blueprints import oldBlueprintNames
from scenic.simulators.carla.sensors import CarlaCollisionSensor
import scenic.simulators.carla.utils.utils as utils
import scenic.simulators.carla.utils.visuals as visuals
from scenic.syntax.veneer import verbosePrint
Expand Down Expand Up @@ -250,6 +251,11 @@ def createObjectInSimulator(self, obj):
)
obj.carlaController = controller

# Auto-attach a collision sensor to the ego so users can check
# ego.sensors['collision'].has_collision without explicit declaration.
if obj is self.scene.egoObject and "collision" not in obj.sensors:
obj.sensors["collision"] = CarlaCollisionSensor()

# Adding sensors if available
if obj.sensors:
for sensor_key, sensor in obj.sensors.items():
Expand Down Expand Up @@ -301,12 +307,18 @@ def step(self):
# Run simulation for one timestep
self.current_frame = self.world.tick()

# Wait for sensors to get updates
# Wait for sensors to deliver this frame's data. Event-based sensors
# (e.g. collision) don't fire every tick, so just bump their frame
# number rather than blocking on it.
for obj in self.objects:
if obj.sensors:
for sensor in obj.sensors.values():
while sensor.frame != self.current_frame:
pass
if getattr(sensor, "_is_event_based", False):
if hasattr(sensor, "updateFrame"):
sensor.updateFrame(self.current_frame)
else:
while sensor.frame != self.current_frame:
pass

# Render simulation
if self.render:
Expand Down Expand Up @@ -352,6 +364,8 @@ def destroy(self):
if sensor.carla_sensor is not None and sensor.carla_sensor.is_alive:
sensor.carla_sensor.stop()
sensor.carla_sensor.destroy()
if hasattr(sensor, "reset"):
sensor.reset()
if obj.carlaActor is not None:
if isinstance(obj.carlaActor, carla.Vehicle):
obj.carlaActor.set_autopilot(False, self.tm.get_port())
Expand Down
28 changes: 28 additions & 0 deletions src/scenic/simulators/carla/utils/visuals.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,25 @@ def tick(self, world, ego, clock, showLabels=True):
"Height: % 18.0f m" % t.location.z,
]

collision = (
getattr(ego, "sensors", {}).get("collision")
if hasattr(ego, "sensors")
else None
)
if collision is not None:
if getattr(collision, "has_collision", False):
intensity = getattr(collision, "last_collision_intensity", 0.0)
data = getattr(collision, "last_collision_data", None)
other = data.get("other_actor") if isinstance(data, dict) else None
value = (
f"hit {other} ({intensity:.0f})"
if other
else f"detected ({intensity:.0f})"
)
self._info_text.append(("Collision:", value, (255, 80, 80)))
else:
self._info_text.append(("Collision:", "none", (80, 220, 80)))

try:
_control_text = [
"",
Expand Down Expand Up @@ -158,6 +177,15 @@ def render(self, display):
item = None
v_offset += 18
elif isinstance(item, tuple):
if len(item) == 3 and isinstance(item[2], tuple):
label_surface = self._font_mono.render(item[0], True, (255, 255, 255))
display.blit(label_surface, (8, v_offset))
value_surface = self._font_mono.render(
" " + str(item[1]), True, item[2]
)
display.blit(value_surface, (8 + label_surface.get_width(), v_offset))
v_offset += 18
continue
if isinstance(item[1], bool):
rect = pygame.Rect((bar_h_offset, v_offset + 8), (6, 6))
pygame.draw.rect(display, (255, 255, 255), rect, 0 if item[1] else 1)
Expand Down
Loading