-
-
Notifications
You must be signed in to change notification settings - Fork 220
Localization: EKF SLAM #65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
kunj1302
wants to merge
9
commits into
ShisatoYano:main
Choose a base branch
from
kunj1302:main
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
bfe74b2
Added motion model
kunj1302 0b27f90
Added observation model
kunj1302 e8f8f51
Added landmark manager
kunj1302 8c99402
Added data association code
kunj1302 c831190
Added ekf slam main
kunj1302 810f0ad
Added ekf slam unit test
kunj1302 f7d2286
Modified code structure - moved helper functions to components directory
kunj1302 9c39194
Changed number of ladmarks and finetuned hyperparameters
kunj1302 6d3490e
Code clean up
kunj1302 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| """ | ||
| data_association.py | ||
|
|
||
| Nearest-neighbor data association with Mahalanobis distance gate. | ||
| """ | ||
|
|
||
| import numpy as np | ||
|
|
||
|
|
||
| def mahalanobis_distance(z_actual, z_predicted, S): | ||
| """ | ||
| Mahalanobis distance: (z_actual - z_predicted)^T @ inv(S) @ (z_actual - z_predicted) | ||
| z_actual, z_predicted: (2, 1) measurement vectors | ||
| S: (2, 2) innovation covariance | ||
| Returns: scalar distance | ||
| """ | ||
| diff = z_actual - z_predicted | ||
| # Normalize bearing difference to [-pi, pi] | ||
| diff[1, 0] = _normalize_angle(diff[1, 0]) | ||
| try: | ||
| s_diff = np.linalg.solve(S, diff) | ||
| d_sq = (diff.T @ s_diff)[0, 0] | ||
| return np.sqrt(max(0, d_sq)) | ||
| except np.linalg.LinAlgError: | ||
| return np.inf | ||
|
|
||
|
|
||
| def _normalize_angle(angle): | ||
| """Normalize angle to [-pi, pi].""" | ||
| return (angle + np.pi) % (2 * np.pi) - np.pi | ||
|
|
||
|
|
||
| # Landmark id returned when observation should not be added as new (potential duplicate) | ||
| POTENTIAL_DUPLICATE = -2 | ||
|
|
||
|
|
||
| def nearest_neighbor_association(observations, predicted_observations, innovation_covs, | ||
| landmark_ids, gate_threshold): | ||
| """ | ||
| For each observation, find nearest landmark within Mahalanobis gate. | ||
| observations: list of (2, 1) arrays - actual measurements | ||
| predicted_observations: list of (2, 1) arrays - one per known landmark | ||
| innovation_covs: list of (2, 2) arrays - S matrix per landmark | ||
| landmark_ids: list of landmark indices (0-based) | ||
| gate_threshold: max Mahalanobis distance to accept a match | ||
| Returns: list of (obs_index, landmark_id). | ||
| landmark_id >= 0: match to that landmark (state index). | ||
| landmark_id == -1: truly new landmark (add to state). | ||
| landmark_id == POTENTIAL_DUPLICATE (-2): within gate of an already-used landmark; | ||
| do NOT add as new (duplicate-landmark guard). | ||
| """ | ||
| num_obs = len(observations) | ||
| num_landmarks = len(landmark_ids) | ||
| matches = [] | ||
| used_landmarks = set() | ||
| for o in range(num_obs): | ||
| z = observations[o] | ||
| best_landmark = -1 | ||
| best_dist = gate_threshold + 1e-6 | ||
| min_dist_any = np.inf # min distance to any landmark (including used) | ||
| for L in range(num_landmarks): | ||
| d = mahalanobis_distance(z, predicted_observations[L], innovation_covs[L]) | ||
| min_dist_any = min(min_dist_any, d) | ||
| if landmark_ids[L] in used_landmarks: | ||
| continue | ||
| if d < best_dist: | ||
| best_dist = d | ||
| best_landmark = landmark_ids[L] | ||
| if best_landmark >= 0: | ||
| used_landmarks.add(best_landmark) | ||
| matches.append((o, best_landmark)) | ||
| elif min_dist_any <= gate_threshold: | ||
| matches.append((o, POTENTIAL_DUPLICATE)) | ||
| else: | ||
| matches.append((o, -1)) # truly new landmark | ||
| return matches |
174 changes: 174 additions & 0 deletions
174
src/components/localization/ekf_slam/ekf_slam_localizer.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,174 @@ | ||
| """ | ||
| ekf_slam_localizer.py | ||
|
|
||
| EKF-SLAM localizer component following the ExtendedKalmanFilterLocalizer pattern. | ||
| Jointly estimates robot pose [x, y, yaw] and landmark positions [lx, ly, ...] | ||
| using range-bearing observations. | ||
| """ | ||
|
|
||
| import numpy as np | ||
| from math import sqrt, pi | ||
|
|
||
| from motion_model import predict_robot_state, jacobian_F, jacobian_G | ||
| from observation_model import observe_landmark, build_H_matrix | ||
| from data_association import nearest_neighbor_association, POTENTIAL_DUPLICATE | ||
| from landmark_manager import augment_state, initialize_landmark_from_observation | ||
|
|
||
|
|
||
| class EKFSLAMLocalizer: | ||
| """ | ||
| EKF-SLAM localizer: simultaneous localization and mapping via Extended Kalman Filter. | ||
| Interface mirrors ExtendedKalmanFilterLocalizer where possible. | ||
| """ | ||
|
|
||
| def __init__(self, init_x=0.0, init_y=0.0, init_yaw=0.0, | ||
| sigma_r=0.2, sigma_phi=0.1, sigma_v=0.15, sigma_omega=0.08, | ||
| gate_threshold=2.0, max_range=12.0, | ||
| duplicate_position_threshold=4.5, color='r'): | ||
| self.R_obs = np.diag([sigma_r ** 2, sigma_phi ** 2]) | ||
| self.Q_input = np.diag([sigma_v ** 2, sigma_omega ** 2]) | ||
| self.sigma_r = sigma_r | ||
| self.sigma_phi = sigma_phi | ||
| self.gate_threshold = gate_threshold | ||
| self.duplicate_position_threshold = duplicate_position_threshold | ||
| self.max_range = max_range | ||
| self.DRAW_COLOR = color | ||
|
|
||
| self.mu = np.array([[init_x], [init_y], [init_yaw]]) | ||
| self.Sigma = np.eye(3) * 0.1 | ||
|
|
||
| def predict(self, control, dt): | ||
| """ | ||
| EKF prediction step. | ||
| control: (2,1) [v, omega] | ||
| dt: time step [s] | ||
| """ | ||
| n = self.mu.shape[0] | ||
| robot_state = self.mu[0:3] | ||
| robot_pred = predict_robot_state(robot_state, control, dt) | ||
| F = jacobian_F(robot_state, control, dt) | ||
| G = jacobian_G(robot_state, control, dt) | ||
| Sigma_rr = self.Sigma[0:3, 0:3] | ||
| Sigma_rr_new = F @ Sigma_rr @ F.T + G @ self.Q_input @ G.T | ||
| self.mu[0:3] = robot_pred | ||
| if n == 3: | ||
| self.Sigma[0:3, 0:3] = Sigma_rr_new | ||
| else: | ||
| Sigma_rl = self.Sigma[0:3, 3:].copy() | ||
| self.Sigma[0:3, 0:3] = Sigma_rr_new | ||
| self.Sigma[0:3, 3:] = F @ Sigma_rl | ||
| self.Sigma[3:, 0:3] = Sigma_rl.T @ F.T | ||
|
|
||
| def update(self, observations): | ||
| """ | ||
| EKF update step with data association and landmark augmentation. | ||
| observations: list of (2,1) [range, bearing] measurements | ||
| """ | ||
| if not observations: | ||
| return | ||
|
|
||
| n = self.mu.shape[0] | ||
| num_landmarks = (n - 3) // 2 | ||
| pred_obs_list = [] | ||
| S_list = [] | ||
| for j in range(num_landmarks): | ||
| lx = self.mu[3 + 2 * j, 0] | ||
| ly = self.mu[3 + 2 * j + 1, 0] | ||
| z_pred = observe_landmark(self.mu[0:3], lx, ly) | ||
| pred_obs_list.append(z_pred) | ||
| H = build_H_matrix(self.mu[0:3], lx, ly, j, n) | ||
| S = H @ self.Sigma @ H.T + self.R_obs | ||
| S_list.append(S) | ||
| if not pred_obs_list: | ||
| matches = [(o, -1) for o in range(len(observations))] | ||
| else: | ||
| matches = nearest_neighbor_association( | ||
| observations, pred_obs_list, S_list, | ||
| list(range(num_landmarks)), self.gate_threshold | ||
| ) | ||
|
|
||
| for (obs_idx, land_id) in matches: | ||
| n = self.mu.shape[0] | ||
| z = observations[obs_idx] | ||
| if land_id >= 0: | ||
| j = land_id | ||
| lx = self.mu[3 + 2 * j, 0] | ||
| ly = self.mu[3 + 2 * j + 1, 0] | ||
| z_pred = observe_landmark(self.mu[0:3], lx, ly) | ||
| if z_pred[0, 0] < 0.5: | ||
| continue | ||
| H = build_H_matrix(self.mu[0:3], lx, ly, j, n) | ||
| S = H @ self.Sigma @ H.T + self.R_obs | ||
| try: | ||
| # K = Sigma @ H.T @ inv(S); inv() is clear for beginners; np.linalg.solve is preferred in production | ||
| K = self.Sigma @ H.T @ np.linalg.inv(S) | ||
| except np.linalg.LinAlgError: | ||
| continue | ||
| innov = z - z_pred | ||
| innov[1, 0] = (innov[1, 0] + pi) % (2 * pi) - pi | ||
| self.mu = self.mu + K @ innov | ||
| # Standard EKF covariance update (Joseph form can be used for better numerical stability) | ||
| IKH = np.eye(n) - K @ H | ||
| self.Sigma = IKH @ self.Sigma | ||
| elif land_id == POTENTIAL_DUPLICATE: | ||
| continue | ||
| else: | ||
| # Cross-time duplicate check: skip if would-be position is near an existing landmark | ||
| lm_new, _, _ = initialize_landmark_from_observation( | ||
| self.mu[0:3], z, None, None | ||
| ) | ||
| lx_new, ly_new = float(lm_new[0, 0]), float(lm_new[1, 0]) | ||
| is_duplicate = False | ||
| for j in range((self.mu.shape[0] - 3) // 2): | ||
| lx_j = self.mu[3 + 2 * j, 0] | ||
| ly_j = self.mu[3 + 2 * j + 1, 0] | ||
| d = sqrt((lx_new - lx_j) ** 2 + (ly_new - ly_j) ** 2) | ||
| if d < self.duplicate_position_threshold: | ||
| is_duplicate = True | ||
| break | ||
| if is_duplicate: | ||
| continue | ||
| self.mu, self.Sigma = augment_state( | ||
| self.mu, self.Sigma, z, self.mu[0:3], self.R_obs | ||
| ) | ||
|
|
||
| def get_robot_state(self): | ||
| """Returns (3,1) array [x, y, yaw].""" | ||
| return self.mu[0:3].copy() | ||
|
|
||
| def get_estimated_landmarks(self): | ||
| """Returns list of (lx, ly) tuples for all mapped landmarks.""" | ||
| n = self.mu.shape[0] | ||
| if n <= 3: | ||
| return [] | ||
| return [ | ||
| (self.mu[3 + 2 * j, 0], self.mu[3 + 2 * j + 1, 0]) | ||
| for j in range((n - 3) // 2) | ||
| ] | ||
|
|
||
| def draw(self, axes, elems, pose): | ||
| """ | ||
| Draw uncertainty circle and estimated landmarks. | ||
| axes: Axes object | ||
| elems: list of plot elements | ||
| pose: (3,1) [x, y, yaw] for circle center | ||
| """ | ||
| est_lm = self.get_estimated_landmarks() | ||
| if est_lm: | ||
| lx_est = [p[0] for p in est_lm] | ||
| ly_est = [p[1] for p in est_lm] | ||
| p, = axes.plot(lx_est, ly_est, "rx", markersize=6, label="Est. landmarks") | ||
| elems.append(p) | ||
|
|
||
| # Simple uncertainty circle (radius ~ 3-sigma from trace of pose covariance) | ||
| Sigma_xy = self.Sigma[0:2, 0:2] | ||
| try: | ||
| trace_xy = np.trace(Sigma_xy) / 2.0 | ||
| r = 3.0 * sqrt(max(0.0, trace_xy)) | ||
| t = np.arange(0, 2 * pi + 0.1, 0.1) | ||
| cx = pose[0, 0] + r * np.cos(t) | ||
| cy = pose[1, 0] + r * np.sin(t) | ||
| p, = axes.plot(cx, cy, color=self.DRAW_COLOR, linewidth=0.8) | ||
| elems.append(p) | ||
| except (np.linalg.LinAlgError, ValueError): | ||
| pass |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| """ | ||
| landmark_manager.py | ||
|
|
||
| State vector augmentation and new landmark initialization for EKF-SLAM. | ||
| """ | ||
|
|
||
| import numpy as np | ||
| from math import cos, sin | ||
|
|
||
|
|
||
| def initialize_landmark_from_observation(robot_state, z, G_robot, G_landmark): | ||
| """ | ||
| Compute landmark position (lx, ly) from range-bearing observation and robot pose. | ||
| robot_state: (3, 1) [x, y, yaw] | ||
| z: (2, 1) [range, bearing] | ||
| G_robot: (2, 3) Jacobian of [lx, ly] w.r.t. robot state (for covariance) | ||
| G_landmark: (2, 2) Jacobian of [lx, ly] w.r.t. [r, phi] | ||
| Returns: (2, 1) landmark position, and G_robot, G_landmark for covariance update. | ||
| """ | ||
| x, y, yaw = robot_state[0, 0], robot_state[1, 0], robot_state[2, 0] | ||
| r, phi = z[0, 0], z[1, 0] | ||
| lx = x + r * cos(yaw + phi) | ||
| ly = y + r * sin(yaw + phi) | ||
| # Jacobian of (lx, ly) w.r.t. (x, y, yaw): dlx/dx=1, dlx/dy=0, dlx/dyaw=-r*sin(yaw+phi) | ||
| G_robot = np.array([ | ||
| [1, 0, -r * sin(yaw + phi)], | ||
| [0, 1, r * cos(yaw + phi)] | ||
| ]) | ||
| G_landmark = np.array([ | ||
| [cos(yaw + phi), -r * sin(yaw + phi)], | ||
| [sin(yaw + phi), r * cos(yaw + phi)] | ||
| ]) | ||
| return np.array([[lx], [ly]]), G_robot, G_landmark | ||
|
|
||
|
|
||
| def augment_state(mu, Sigma, z, robot_state, R_obs): | ||
| """ | ||
| Augment state vector and covariance with a new landmark from observation z. | ||
| mu: (n, 1) state vector [x, y, yaw, l1_x, l1_y, ...] | ||
| Sigma: (n, n) covariance | ||
| z: (2, 1) [range, bearing] | ||
| robot_state: (3, 1) current robot pose from mu | ||
| R_obs: (2, 2) observation noise covariance | ||
| Returns: mu_new (n+2, 1), Sigma_new (n+2, n+2) | ||
| """ | ||
| lm, G_robot, G_landmark = initialize_landmark_from_observation(robot_state, z, None, None) | ||
| n = mu.shape[0] | ||
| # Cross covariance: Sigma_xy = Sigma[0:3, :] for robot; we need Sigma_robot_rest | ||
| Sigma_rr = Sigma[0:3, 0:3] | ||
| Sigma_xr = Sigma[:, 0:3] # (n, 3) | ||
| Sigma_rx = Sigma[0:3, :] # (3, n) | ||
| # New landmark covariance and cross terms | ||
| sigma_new_lm = Sigma_xr @ G_robot.T # (n, 2) | ||
| sigma_new_lm_T = sigma_new_lm.T # (2, n) | ||
| Sigma_ll = G_robot @ Sigma_rr @ G_robot.T + G_landmark @ R_obs @ G_landmark.T | ||
| mu_new = np.vstack([mu, lm]) | ||
| Sigma_new = np.block([ | ||
| [Sigma, sigma_new_lm], | ||
| [sigma_new_lm_T, Sigma_ll] | ||
| ]) | ||
| return mu_new, Sigma_new |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| """ | ||
| motion_model.py | ||
|
|
||
| Velocity motion model and Jacobians for EKF-SLAM prediction step. | ||
| Robot state: [x, y, yaw]. Control input: [v, omega]. | ||
| """ | ||
|
|
||
| import numpy as np | ||
| from math import cos, sin | ||
|
|
||
|
|
||
| def predict_robot_state(robot_state, control, dt): | ||
| """ | ||
| Predict next robot pose using velocity motion model. | ||
| robot_state: (3, 1) array [x, y, yaw] | ||
| control: (2, 1) array [v, omega] | ||
| dt: time step [s] | ||
| Returns: (3, 1) predicted state | ||
| """ | ||
| x, y, yaw = robot_state[0, 0], robot_state[1, 0], robot_state[2, 0] | ||
| v, omega = control[0, 0], control[1, 0] | ||
| yaw_new = yaw + omega * dt | ||
| # avoid singularities when omega is near zero | ||
| if abs(omega) < 1e-6: | ||
| x_new = x + v * cos(yaw) * dt | ||
| y_new = y + v * sin(yaw) * dt | ||
| else: | ||
| x_new = x + (v / omega) * (sin(yaw_new) - sin(yaw)) | ||
| y_new = y + (v / omega) * (-cos(yaw_new) + cos(yaw)) | ||
| return np.array([[x_new], [y_new], [yaw_new]]) | ||
|
|
||
|
|
||
| def jacobian_F(robot_state, control, dt): | ||
| """ | ||
| Jacobian of motion model w.r.t. robot state [x, y, yaw]. | ||
| robot_state: (3, 1), control: (2, 1), dt: float | ||
| Returns: (3, 3) Jacobian F | ||
| """ | ||
| yaw = robot_state[2, 0] | ||
| v, omega = control[0, 0], control[1, 0] | ||
| if abs(omega) < 1e-6: | ||
| dx_dyaw = -v * sin(yaw) * dt | ||
| dy_dyaw = v * cos(yaw) * dt | ||
| else: | ||
| yaw_new = yaw + omega * dt | ||
| dx_dyaw = (v / omega) * (cos(yaw_new) - cos(yaw)) | ||
| dy_dyaw = (v / omega) * (sin(yaw_new) - sin(yaw)) | ||
| F = np.array([ | ||
| [1, 0, dx_dyaw], | ||
| [0, 1, dy_dyaw], | ||
| [0, 0, 1] | ||
| ]) | ||
| return F | ||
|
|
||
|
|
||
| def jacobian_G(robot_state, control, dt): | ||
| """ | ||
| Jacobian of motion model w.r.t. control [v, omega]. | ||
| robot_state: (3, 1), control: (2, 1), dt: float | ||
| Returns: (3, 2) Jacobian G | ||
| """ | ||
| yaw = robot_state[2, 0] | ||
| v, omega = control[0, 0], control[1, 0] | ||
| if abs(omega) < 1e-6: | ||
| dx_dv = cos(yaw) * dt | ||
| dy_dv = sin(yaw) * dt | ||
| dx_domega = 0.0 | ||
| dy_domega = 0.0 | ||
| else: | ||
| yaw_new = yaw + omega * dt | ||
| dx_dv = (sin(yaw_new) - sin(yaw)) / omega | ||
| dy_dv = (-cos(yaw_new) + cos(yaw)) / omega | ||
| dx_domega = (v / (omega ** 2)) * (sin(yaw) - sin(yaw_new) + omega * cos(yaw_new) * dt) | ||
| dy_domega = (v / (omega ** 2)) * (cos(yaw_new) - cos(yaw) + omega * sin(yaw_new) * dt) | ||
| G = np.array([ | ||
| [dx_dv, dx_domega], | ||
| [dy_dv, dy_domega], | ||
| [0, dt] | ||
| ]) | ||
| return G |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Motion model of the vehicle has already been implemented in the following state class.
https://github.com/ShisatoYano/AutonomousVehicleControlBeginnersGuide/blob/main/src/components/state/state.py
Please refactor your code by using the above existing motion model.