Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions tools/ncore_vis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

load("@ncore_pip_deps//:requirements.bzl", "requirement")
load("@rules_python//python:defs.bzl", "py_binary", "py_library")
load("//bazel/pytest:defs.bzl", "pytest_test")

package(default_visibility = ["//visibility:public"])

Expand All @@ -31,12 +32,35 @@ py_library(
],
)

py_library(
name = "pylib_tracks",
srcs = [
"tracks.py",
],
deps = [
"//ncore/impl/common:pylib_transformations",
"//ncore/impl/data:pylib_types",
requirement("numpy"),
],
)

pytest_test(
name = "pytest_tracks",
srcs = ["tracks_test.py"],
deps = [
":pylib_tracks",
"//ncore/impl/data:pylib_types",
requirement("numpy"),
],
)

py_library(
name = "pylib_data_loader",
srcs = [
"data_loader.py",
],
deps = [
":pylib_tracks",
"//ncore/impl/common:pylib_transformations",
"//ncore/impl/data:pylib_compat",
"//ncore/impl/data:pylib_types",
Expand Down
120 changes: 77 additions & 43 deletions tools/ncore_vis/components/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from scipy.spatial.transform import Rotation as RotLib

from ncore.impl.common.transformations import HalfClosedInterval, se3_inverse, transform_point_cloud
from ncore.impl.common.transformations import HalfClosedInterval, transform_point_cloud
from ncore.impl.data.types import FrameTimepoint, LabelSource
from ncore.impl.sensors.camera import CameraModel
from tools.ncore_vis.components.base import VisualizationComponent, register_component
Expand Down Expand Up @@ -443,11 +443,13 @@ def _update_camera(self, camera_id: str) -> None:
if label := self._labels.pop(camera_id, None):
label.remove()

frame = self._frame_sliders[camera_id].value
frame_idx = self._frame_sliders[camera_id].value
visible = self._visible[camera_id]

cam = self.data_loader.get_camera_sensor(camera_id)
T_camera_world = cam.get_frames_T_sensor_target(self.data_loader.world_frame_id, frame, FrameTimepoint.END)
T_camera_world = cam.get_frames_T_sensor_target(
self.data_loader.world_frame_id, frame_idx, FrameTimepoint.END
)
position, wxyz = se3_to_position_wxyz(T_camera_world)

# Pose frame
Expand All @@ -462,25 +464,25 @@ def _update_camera(self, camera_id: str) -> None:
self._poses[camera_id] = pose_handle

# Image (with optional overlays)
image = cam.get_frame_image_array(frame)
image = cam.get_frame_image_array(frame_idx)

if self._project_lidar:
try:
image = self._overlay_lidar_projection(camera_id, frame, image)
image = self._overlay_lidar_projection(camera_id, frame_idx, image)
except Exception:
logger.debug("Lidar projection overlay failed for %s frame %d", camera_id, frame, exc_info=True)
logger.debug("Lidar projection overlay failed for %s frame %d", camera_id, frame_idx, exc_info=True)

if self._overlay_cuboids:
try:
image = self._overlay_cuboids_on_image(camera_id, frame, image)
image = self._overlay_cuboids_on_image(camera_id, frame_idx, image)
except Exception:
logger.debug("Cuboid overlay failed for %s frame %d", camera_id, frame, exc_info=True)
logger.debug("Cuboid overlay failed for %s frame %d", camera_id, frame_idx, exc_info=True)

if self._show_mask:
try:
image = self._overlay_mask(camera_id, image)
except Exception:
logger.debug("Mask overlay failed for %s frame %d", camera_id, frame, exc_info=True)
logger.debug("Mask overlay failed for %s frame %d", camera_id, frame_idx, exc_info=True)

frustum_handle = self.client.scene.add_camera_frustum(
f"/cameras/{camera_id}/pose/frustum",
Expand Down Expand Up @@ -557,12 +559,12 @@ def _overlay_mask(self, camera_id: str, image: np.ndarray) -> np.ndarray:
# Lidar projection overlay
# ------------------------------------------------------------------

def _overlay_lidar_projection(self, camera_id: str, frame: int, image: np.ndarray) -> np.ndarray:
def _overlay_lidar_projection(self, camera_id: str, frame_idx: int, image: np.ndarray) -> np.ndarray:
"""Project a lidar point cloud onto the camera image with range-based coloring.

Args:
camera_id: Camera sensor to project onto.
frame: Camera frame index.
frame_idx: Camera frame index.
image: RGB image array (H, W, 3), uint8.

Returns:
Expand All @@ -577,19 +579,21 @@ def _overlay_lidar_projection(self, camera_id: str, frame: int, image: np.ndarra
camera_model = self._camera_models[camera_id]

# Find closest lidar frame to the camera frame (by center-of-frame timestamp)
cam_interval = self.data_loader.get_sensor_frame_interval_us(camera_id, frame)
cam_interval = self.data_loader.get_sensor_frame_interval_us(camera_id, frame_idx)
cam_center_us = cam_interval.start + (cam_interval.end - cam_interval.start) // 2
lidar_frame = lidar_sensor.get_closest_frame_index(cam_center_us, relative_frame_time=0.5)
lidar_frame_idx = lidar_sensor.get_closest_frame_index(cam_center_us, relative_frame_time=0.5)

# Load point cloud and transform to world coordinates
pc_sensor = lidar_sensor.get_frame_point_cloud(lidar_frame, motion_compensation=True, with_start_points=False)
pc_sensor = lidar_sensor.get_frame_point_cloud(
lidar_frame_idx, motion_compensation=True, with_start_points=False
)
world_id = self.data_loader.world_frame_id
T_lidar_world = lidar_sensor.get_frames_T_sensor_target(world_id, lidar_frame, FrameTimepoint.END)
T_lidar_world = lidar_sensor.get_frames_T_sensor_target(world_id, lidar_frame_idx, FrameTimepoint.END)
pc_world = transform_point_cloud(pc_sensor.xyz_m_end, T_lidar_world)

# Get camera world-to-sensor transforms (T_world_sensor = inverse of T_sensor_world)
T_world_camera_start = se3_inverse(cam.get_frames_T_sensor_target(world_id, frame, FrameTimepoint.START))
T_world_camera_end = se3_inverse(cam.get_frames_T_sensor_target(world_id, frame, FrameTimepoint.END))
# Get camera world-to-sensor transforms (T_world_camera)
T_world_camera_start = cam.get_frames_T_source_sensor(world_id, frame_idx, FrameTimepoint.START)
T_world_camera_end = cam.get_frames_T_source_sensor(world_id, frame_idx, FrameTimepoint.END)

# Project world points to image coordinates
mode = self._project_mode
Expand Down Expand Up @@ -635,6 +639,7 @@ def _project_points(
T_world_camera_start: np.ndarray,
T_world_camera_end: np.ndarray,
mode: str,
return_all_projections: bool = False,
) -> CameraModel.WorldPointsToImagePointsReturn:
"""Project world points using the specified projection mode."""
if mode == "rolling-shutter":
Expand All @@ -644,6 +649,7 @@ def _project_points(
T_world_camera_end,
return_valid_indices=True,
return_T_world_sensors=True,
return_all_projections=return_all_projections,
)
if mode == "mean":
return camera_model.world_points_to_image_points_mean_pose(
Expand All @@ -652,35 +658,41 @@ def _project_points(
T_world_camera_end,
return_valid_indices=True,
return_T_world_sensors=True,
return_all_projections=return_all_projections,
)
if mode == "start":
return camera_model.world_points_to_image_points_static_pose(
pc_world,
T_world_camera_start,
return_valid_indices=True,
return_T_world_sensors=True,
return_all_projections=return_all_projections,
)
# "end"
return camera_model.world_points_to_image_points_static_pose(
pc_world,
T_world_camera_end,
return_valid_indices=True,
return_T_world_sensors=True,
return_all_projections=return_all_projections,
)

# ------------------------------------------------------------------
# Cuboid overlay projection
# ------------------------------------------------------------------

def _overlay_cuboids_on_image(self, camera_id: str, frame: int, image: np.ndarray) -> np.ndarray:
"""Project 3D cuboid edges onto the camera image using rolling-shutter-aware projection.
def _overlay_cuboids_on_image(self, camera_id: str, frame_idx: int, image: np.ndarray) -> np.ndarray:
"""Project 3D cuboid edges onto the camera image, interpolated to the mid-of-frame time.

Cuboid observations are queried at the scene's reference timestamp in world
coordinates and projected onto the camera image.
Each cuboid track is interpolated to the camera frame's mid-of-frame timestamp so
that the projected box position reflects the object's estimated location at the
moment the camera was actually exposing. The interpolated observation is then
transformed to world coordinates and projected using the shared projection mode
(rolling-shutter / mean / start / end).

Args:
camera_id: Camera sensor to project onto.
frame: Camera frame index.
frame_idx: Current frame index for this camera.
image: RGB image array (H, W, 3), uint8.

Returns:
Expand All @@ -690,50 +702,72 @@ def _overlay_cuboids_on_image(self, camera_id: str, frame: int, image: np.ndarra
camera_model = self._camera_models[camera_id]

world_id = self.data_loader.world_frame_id
T_world_sensor_start = cam.get_frames_T_source_sensor(world_id, frame, FrameTimepoint.START)
T_world_sensor_end = cam.get_frames_T_source_sensor(world_id, frame, FrameTimepoint.END)
timestamp_start_us = cam.get_frame_timestamp_us(frame, FrameTimepoint.START)
timestamp_end_us = cam.get_frame_timestamp_us(frame, FrameTimepoint.END)

cuboid_source = self._cuboid_source
pose_graph = self.data_loader.pose_graph

output_image = image.copy()
image_height, image_width = output_image.shape[:2]
image_rect = (0, 0, image_width, image_height)

# Query cuboid observations in world coordinates for the reference frame's time window.
interval_us = self.renderer.reference_frame_interval_us
observations = self.data_loader.get_cuboid_observations_in_world(interval_us, cuboid_source)
# Approximate the track / camera association with mid-frame interpolation
timestamp_start_us = cam.get_frame_timestamp_us(frame_idx, FrameTimepoint.START)
timestamp_end_us = cam.get_frame_timestamp_us(frame_idx, FrameTimepoint.END)
mid_timestamp_us = (timestamp_start_us + timestamp_end_us) // 2

# Use the reference-time range as the clamp boundary so tracks are
# currently selected remain visible at the scene boundary
ref_interval = self.renderer.reference_frame_interval_us
max_clamp_us = ref_interval.stop - ref_interval.start

# Camera poses at start/end of frame for rolling-shutter-aware projection
T_world_camera_start = cam.get_frames_T_source_sensor(world_id, frame_idx, FrameTimepoint.START)
T_world_camera_end = cam.get_frames_T_source_sensor(world_id, frame_idx, FrameTimepoint.END)

# Iterate over all tracks; interpolate each to the mid-frame time.
for track in self.data_loader.get_cuboid_tracks():
# Filter by label source
if track.source.name != self._cuboid_source:
continue

if (obs := track.interpolate_at(mid_timestamp_us, max_clamp_us=max_clamp_us)) is None:
continue

for obs in observations:
# Transform the interpolated observation at mid-of-frame time to world coordinates at mid-frame time
obs = obs.transform(
target_frame_id=world_id,
target_frame_timestamp_us=mid_timestamp_us,
pose_graph=pose_graph,
)
bbox = obs.bbox3
# Observations are in world coordinates — compute corners directly.

# Compute 8 corners in world coordinates
dimensions = np.array(bbox.dim, dtype=np.float32)
corners_local = _UNIT_CUBE_CORNERS * dimensions
rotation = RotLib.from_euler("XYZ", bbox.rot).as_matrix().astype(np.float32)
translation = np.array(bbox.centroid, dtype=np.float32)
corners_world = (corners_local @ rotation.T + translation).astype(np.float32)

projection = camera_model.world_points_to_image_points_shutter_pose(
torch.from_numpy(corners_world),
T_world_sensor_start,
T_world_sensor_end,
start_timestamp_us=int(timestamp_start_us),
end_timestamp_us=int(timestamp_end_us),
return_valid_indices=True,
# Project using the shared projection mode (rolling-shutter / mean / start / end)
projection = self._project_points(
camera_model,
corners_world,
T_world_camera_start,
T_world_camera_end,
self._project_mode,
return_all_projections=True,
)

if projection.valid_indices is None or projection.image_points.shape[0] == 0:
continue

projected_pts = projection.image_points.cpu().numpy().astype(np.float32)
projected_pts = projection.image_points.cpu().numpy()
valid_mask = np.zeros(projected_pts.shape[0], dtype=bool)
valid_mask[projection.valid_indices.cpu().numpy().astype(np.int32)] = True
valid_mask[projection.valid_indices.cpu().numpy()] = True

# Deterministic color per class
line_color = self.renderer.get_class_color(obs.class_id)

# Draw the 12 edges of the cuboid if either corner is valid (visible);
# use OpenCV's clipLine to handle partially visible edges
for corner_a, corner_b in _CUBOID_EDGES:
if not (valid_mask[corner_a] or valid_mask[corner_b]):
continue
Expand Down
4 changes: 3 additions & 1 deletion tools/ncore_vis/components/cuboids.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ def _update_cuboids(self) -> None:
visible = self._enabled
show_labels = self._show_labels

observations = self.data_loader.get_cuboid_observations_in_world(interval_us, source_filter=source)
observations = self.data_loader.get_cuboid_observations_in_world(
interval_us, "end-of-interval", source_filter=source
)

# Observations are already in world coordinates.
for i, obs in enumerate(observations):
Expand Down
Loading
Loading