Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions src/components/localization/ekf_slam/data_association.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""
data_association.py

Nearest-neighbor data association with Mahalanobis distance gate.
"""

import numpy as np


def mahalanobis_distance(z_actual, z_predicted, S):
"""
Mahalanobis distance: (z_actual - z_predicted)^T @ inv(S) @ (z_actual - z_predicted)
z_actual, z_predicted: (2, 1) measurement vectors
S: (2, 2) innovation covariance
Returns: scalar distance
"""
diff = z_actual - z_predicted
# Normalize bearing difference to [-pi, pi]
diff[1, 0] = _normalize_angle(diff[1, 0])
try:
s_diff = np.linalg.solve(S, diff)
d_sq = (diff.T @ s_diff)[0, 0]
return np.sqrt(max(0, d_sq))
except np.linalg.LinAlgError:
return np.inf


def _normalize_angle(angle):
"""Normalize angle to [-pi, pi]."""
return (angle + np.pi) % (2 * np.pi) - np.pi


# Landmark id returned when observation should not be added as new (potential duplicate)
POTENTIAL_DUPLICATE = -2


def nearest_neighbor_association(observations, predicted_observations, innovation_covs,
landmark_ids, gate_threshold):
"""
For each observation, find nearest landmark within Mahalanobis gate.
observations: list of (2, 1) arrays - actual measurements
predicted_observations: list of (2, 1) arrays - one per known landmark
innovation_covs: list of (2, 2) arrays - S matrix per landmark
landmark_ids: list of landmark indices (0-based)
gate_threshold: max Mahalanobis distance to accept a match
Returns: list of (obs_index, landmark_id).
landmark_id >= 0: match to that landmark (state index).
landmark_id == -1: truly new landmark (add to state).
landmark_id == POTENTIAL_DUPLICATE (-2): within gate of an already-used landmark;
do NOT add as new (duplicate-landmark guard).
"""
num_obs = len(observations)
num_landmarks = len(landmark_ids)
matches = []
used_landmarks = set()
for o in range(num_obs):
z = observations[o]
best_landmark = -1
best_dist = gate_threshold + 1e-6
min_dist_any = np.inf # min distance to any landmark (including used)
for L in range(num_landmarks):
d = mahalanobis_distance(z, predicted_observations[L], innovation_covs[L])
min_dist_any = min(min_dist_any, d)
if landmark_ids[L] in used_landmarks:
continue
if d < best_dist:
best_dist = d
best_landmark = landmark_ids[L]
if best_landmark >= 0:
used_landmarks.add(best_landmark)
matches.append((o, best_landmark))
elif min_dist_any <= gate_threshold:
matches.append((o, POTENTIAL_DUPLICATE))
else:
matches.append((o, -1)) # truly new landmark
return matches
174 changes: 174 additions & 0 deletions src/components/localization/ekf_slam/ekf_slam_localizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""
ekf_slam_localizer.py

EKF-SLAM localizer component following the ExtendedKalmanFilterLocalizer pattern.
Jointly estimates robot pose [x, y, yaw] and landmark positions [lx, ly, ...]
using range-bearing observations.
"""

import numpy as np
from math import sqrt, pi

from motion_model import predict_robot_state, jacobian_F, jacobian_G
from observation_model import observe_landmark, build_H_matrix
from data_association import nearest_neighbor_association, POTENTIAL_DUPLICATE
from landmark_manager import augment_state, initialize_landmark_from_observation


class EKFSLAMLocalizer:
"""
EKF-SLAM localizer: simultaneous localization and mapping via Extended Kalman Filter.
Interface mirrors ExtendedKalmanFilterLocalizer where possible.
"""

def __init__(self, init_x=0.0, init_y=0.0, init_yaw=0.0,
sigma_r=0.2, sigma_phi=0.1, sigma_v=0.15, sigma_omega=0.08,
gate_threshold=2.0, max_range=12.0,
duplicate_position_threshold=4.5, color='r'):
self.R_obs = np.diag([sigma_r ** 2, sigma_phi ** 2])
self.Q_input = np.diag([sigma_v ** 2, sigma_omega ** 2])
self.sigma_r = sigma_r
self.sigma_phi = sigma_phi
self.gate_threshold = gate_threshold
self.duplicate_position_threshold = duplicate_position_threshold
self.max_range = max_range
self.DRAW_COLOR = color

self.mu = np.array([[init_x], [init_y], [init_yaw]])
self.Sigma = np.eye(3) * 0.1

def predict(self, control, dt):
"""
EKF prediction step.
control: (2,1) [v, omega]
dt: time step [s]
"""
n = self.mu.shape[0]
robot_state = self.mu[0:3]
robot_pred = predict_robot_state(robot_state, control, dt)
F = jacobian_F(robot_state, control, dt)
G = jacobian_G(robot_state, control, dt)
Sigma_rr = self.Sigma[0:3, 0:3]
Sigma_rr_new = F @ Sigma_rr @ F.T + G @ self.Q_input @ G.T
self.mu[0:3] = robot_pred
if n == 3:
self.Sigma[0:3, 0:3] = Sigma_rr_new
else:
Sigma_rl = self.Sigma[0:3, 3:].copy()
self.Sigma[0:3, 0:3] = Sigma_rr_new
self.Sigma[0:3, 3:] = F @ Sigma_rl
self.Sigma[3:, 0:3] = Sigma_rl.T @ F.T

def update(self, observations):
"""
EKF update step with data association and landmark augmentation.
observations: list of (2,1) [range, bearing] measurements
"""
if not observations:
return

n = self.mu.shape[0]
num_landmarks = (n - 3) // 2
pred_obs_list = []
S_list = []
for j in range(num_landmarks):
lx = self.mu[3 + 2 * j, 0]
ly = self.mu[3 + 2 * j + 1, 0]
z_pred = observe_landmark(self.mu[0:3], lx, ly)
pred_obs_list.append(z_pred)
H = build_H_matrix(self.mu[0:3], lx, ly, j, n)
S = H @ self.Sigma @ H.T + self.R_obs
S_list.append(S)
if not pred_obs_list:
matches = [(o, -1) for o in range(len(observations))]
else:
matches = nearest_neighbor_association(
observations, pred_obs_list, S_list,
list(range(num_landmarks)), self.gate_threshold
)

for (obs_idx, land_id) in matches:
n = self.mu.shape[0]
z = observations[obs_idx]
if land_id >= 0:
j = land_id
lx = self.mu[3 + 2 * j, 0]
ly = self.mu[3 + 2 * j + 1, 0]
z_pred = observe_landmark(self.mu[0:3], lx, ly)
if z_pred[0, 0] < 0.5:
continue
H = build_H_matrix(self.mu[0:3], lx, ly, j, n)
S = H @ self.Sigma @ H.T + self.R_obs
try:
# K = Sigma @ H.T @ inv(S); inv() is clear for beginners; np.linalg.solve is preferred in production
K = self.Sigma @ H.T @ np.linalg.inv(S)
except np.linalg.LinAlgError:
continue
innov = z - z_pred
innov[1, 0] = (innov[1, 0] + pi) % (2 * pi) - pi
self.mu = self.mu + K @ innov
# Standard EKF covariance update (Joseph form can be used for better numerical stability)
IKH = np.eye(n) - K @ H
self.Sigma = IKH @ self.Sigma
elif land_id == POTENTIAL_DUPLICATE:
continue
else:
# Cross-time duplicate check: skip if would-be position is near an existing landmark
lm_new, _, _ = initialize_landmark_from_observation(
self.mu[0:3], z, None, None
)
lx_new, ly_new = float(lm_new[0, 0]), float(lm_new[1, 0])
is_duplicate = False
for j in range((self.mu.shape[0] - 3) // 2):
lx_j = self.mu[3 + 2 * j, 0]
ly_j = self.mu[3 + 2 * j + 1, 0]
d = sqrt((lx_new - lx_j) ** 2 + (ly_new - ly_j) ** 2)
if d < self.duplicate_position_threshold:
is_duplicate = True
break
if is_duplicate:
continue
self.mu, self.Sigma = augment_state(
self.mu, self.Sigma, z, self.mu[0:3], self.R_obs
)

def get_robot_state(self):
"""Returns (3,1) array [x, y, yaw]."""
return self.mu[0:3].copy()

def get_estimated_landmarks(self):
"""Returns list of (lx, ly) tuples for all mapped landmarks."""
n = self.mu.shape[0]
if n <= 3:
return []
return [
(self.mu[3 + 2 * j, 0], self.mu[3 + 2 * j + 1, 0])
for j in range((n - 3) // 2)
]

def draw(self, axes, elems, pose):
"""
Draw uncertainty circle and estimated landmarks.
axes: Axes object
elems: list of plot elements
pose: (3,1) [x, y, yaw] for circle center
"""
est_lm = self.get_estimated_landmarks()
if est_lm:
lx_est = [p[0] for p in est_lm]
ly_est = [p[1] for p in est_lm]
p, = axes.plot(lx_est, ly_est, "rx", markersize=6, label="Est. landmarks")
elems.append(p)

# Simple uncertainty circle (radius ~ 3-sigma from trace of pose covariance)
Sigma_xy = self.Sigma[0:2, 0:2]
try:
trace_xy = np.trace(Sigma_xy) / 2.0
r = 3.0 * sqrt(max(0.0, trace_xy))
t = np.arange(0, 2 * pi + 0.1, 0.1)
cx = pose[0, 0] + r * np.cos(t)
cy = pose[1, 0] + r * np.sin(t)
p, = axes.plot(cx, cy, color=self.DRAW_COLOR, linewidth=0.8)
elems.append(p)
except (np.linalg.LinAlgError, ValueError):
pass
61 changes: 61 additions & 0 deletions src/components/localization/ekf_slam/landmark_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
landmark_manager.py

State vector augmentation and new landmark initialization for EKF-SLAM.
"""

import numpy as np
from math import cos, sin


def initialize_landmark_from_observation(robot_state, z, G_robot, G_landmark):
"""
Compute landmark position (lx, ly) from range-bearing observation and robot pose.
robot_state: (3, 1) [x, y, yaw]
z: (2, 1) [range, bearing]
G_robot: (2, 3) Jacobian of [lx, ly] w.r.t. robot state (for covariance)
G_landmark: (2, 2) Jacobian of [lx, ly] w.r.t. [r, phi]
Returns: (2, 1) landmark position, and G_robot, G_landmark for covariance update.
"""
x, y, yaw = robot_state[0, 0], robot_state[1, 0], robot_state[2, 0]
r, phi = z[0, 0], z[1, 0]
lx = x + r * cos(yaw + phi)
ly = y + r * sin(yaw + phi)
# Jacobian of (lx, ly) w.r.t. (x, y, yaw): dlx/dx=1, dlx/dy=0, dlx/dyaw=-r*sin(yaw+phi)
G_robot = np.array([
[1, 0, -r * sin(yaw + phi)],
[0, 1, r * cos(yaw + phi)]
])
G_landmark = np.array([
[cos(yaw + phi), -r * sin(yaw + phi)],
[sin(yaw + phi), r * cos(yaw + phi)]
])
return np.array([[lx], [ly]]), G_robot, G_landmark


def augment_state(mu, Sigma, z, robot_state, R_obs):
"""
Augment state vector and covariance with a new landmark from observation z.
mu: (n, 1) state vector [x, y, yaw, l1_x, l1_y, ...]
Sigma: (n, n) covariance
z: (2, 1) [range, bearing]
robot_state: (3, 1) current robot pose from mu
R_obs: (2, 2) observation noise covariance
Returns: mu_new (n+2, 1), Sigma_new (n+2, n+2)
"""
lm, G_robot, G_landmark = initialize_landmark_from_observation(robot_state, z, None, None)
n = mu.shape[0]
# Cross covariance: Sigma_xy = Sigma[0:3, :] for robot; we need Sigma_robot_rest
Sigma_rr = Sigma[0:3, 0:3]
Sigma_xr = Sigma[:, 0:3] # (n, 3)
Sigma_rx = Sigma[0:3, :] # (3, n)
# New landmark covariance and cross terms
sigma_new_lm = Sigma_xr @ G_robot.T # (n, 2)
sigma_new_lm_T = sigma_new_lm.T # (2, n)
Sigma_ll = G_robot @ Sigma_rr @ G_robot.T + G_landmark @ R_obs @ G_landmark.T
mu_new = np.vstack([mu, lm])
Sigma_new = np.block([
[Sigma, sigma_new_lm],
[sigma_new_lm_T, Sigma_ll]
])
return mu_new, Sigma_new
80 changes: 80 additions & 0 deletions src/components/localization/ekf_slam/motion_model.py
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Motion model of the vehicle has already been implemented in the following state class.
https://github.com/ShisatoYano/AutonomousVehicleControlBeginnersGuide/blob/main/src/components/state/state.py
Please refactor your code by using the above existing motion model.

Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
motion_model.py

Velocity motion model and Jacobians for EKF-SLAM prediction step.
Robot state: [x, y, yaw]. Control input: [v, omega].
"""

import numpy as np
from math import cos, sin


def predict_robot_state(robot_state, control, dt):
"""
Predict next robot pose using velocity motion model.
robot_state: (3, 1) array [x, y, yaw]
control: (2, 1) array [v, omega]
dt: time step [s]
Returns: (3, 1) predicted state
"""
x, y, yaw = robot_state[0, 0], robot_state[1, 0], robot_state[2, 0]
v, omega = control[0, 0], control[1, 0]
yaw_new = yaw + omega * dt
# avoid singularities when omega is near zero
if abs(omega) < 1e-6:
x_new = x + v * cos(yaw) * dt
y_new = y + v * sin(yaw) * dt
else:
x_new = x + (v / omega) * (sin(yaw_new) - sin(yaw))
y_new = y + (v / omega) * (-cos(yaw_new) + cos(yaw))
return np.array([[x_new], [y_new], [yaw_new]])


def jacobian_F(robot_state, control, dt):
"""
Jacobian of motion model w.r.t. robot state [x, y, yaw].
robot_state: (3, 1), control: (2, 1), dt: float
Returns: (3, 3) Jacobian F
"""
yaw = robot_state[2, 0]
v, omega = control[0, 0], control[1, 0]
if abs(omega) < 1e-6:
dx_dyaw = -v * sin(yaw) * dt
dy_dyaw = v * cos(yaw) * dt
else:
yaw_new = yaw + omega * dt
dx_dyaw = (v / omega) * (cos(yaw_new) - cos(yaw))
dy_dyaw = (v / omega) * (sin(yaw_new) - sin(yaw))
F = np.array([
[1, 0, dx_dyaw],
[0, 1, dy_dyaw],
[0, 0, 1]
])
return F


def jacobian_G(robot_state, control, dt):
"""
Jacobian of motion model w.r.t. control [v, omega].
robot_state: (3, 1), control: (2, 1), dt: float
Returns: (3, 2) Jacobian G
"""
yaw = robot_state[2, 0]
v, omega = control[0, 0], control[1, 0]
if abs(omega) < 1e-6:
dx_dv = cos(yaw) * dt
dy_dv = sin(yaw) * dt
dx_domega = 0.0
dy_domega = 0.0
else:
yaw_new = yaw + omega * dt
dx_dv = (sin(yaw_new) - sin(yaw)) / omega
dy_dv = (-cos(yaw_new) + cos(yaw)) / omega
dx_domega = (v / (omega ** 2)) * (sin(yaw) - sin(yaw_new) + omega * cos(yaw_new) * dt)
dy_domega = (v / (omega ** 2)) * (cos(yaw_new) - cos(yaw) + omega * sin(yaw_new) * dt)
G = np.array([
[dx_dv, dx_domega],
[dy_dv, dy_domega],
[0, dt]
])
return G
Loading