Python Sensor Fusion — Deep Dive
Kalman Filter from Scratch
The Kalman filter estimates a state vector x and covariance matrix P through predict-update cycles:
import numpy as np
class KalmanFilter:
def __init__(self, state_dim, meas_dim):
self.x = np.zeros(state_dim) # state estimate
self.P = np.eye(state_dim) * 1000 # state covariance (high initial uncertainty)
self.F = np.eye(state_dim) # state transition matrix
self.H = np.zeros((meas_dim, state_dim)) # measurement matrix
self.R = np.eye(meas_dim) # measurement noise covariance
self.Q = np.eye(state_dim) # process noise covariance
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
def update(self, z):
y = z - self.H @ self.x # innovation
S = self.H @ self.P @ self.H.T + self.R # innovation covariance
K = self.P @ self.H.T @ np.linalg.inv(S) # Kalman gain
self.x = self.x + K @ y
I = np.eye(len(self.x))
self.P = (I - K @ self.H) @ self.P @ (I - K @ self.H).T + K @ self.R @ K.T
# Joseph form above is numerically more stable than (I - KH)P
Tracking a Vehicle with Position and Velocity
def create_vehicle_tracker(dt=0.1):
"""Track vehicle position (x, y) and velocity (vx, vy)."""
kf = KalmanFilter(state_dim=4, meas_dim=2)
# State: [x, vx, y, vy]
kf.F = np.array([
[1, dt, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, dt],
[0, 0, 0, 1]
])
# Measurement: [x, y] (position only)
kf.H = np.array([
[1, 0, 0, 0],
[0, 0, 1, 0]
])
# Measurement noise (GPS-like: ±2 meters)
kf.R = np.diag([4.0, 4.0])
# Process noise (acceleration uncertainty)
q = 0.5 # m/s^2
kf.Q = np.array([
[dt**4/4, dt**3/2, 0, 0],
[dt**3/2, dt**2, 0, 0],
[0, 0, dt**4/4, dt**3/2],
[0, 0, dt**3/2, dt**2]
]) * q**2
return kf
Extended Kalman Filter (EKF)
When the motion or measurement model is nonlinear, the EKF linearizes using Jacobians:
class ExtendedKalmanFilter:
def __init__(self, state_dim):
self.x = np.zeros(state_dim)
self.P = np.eye(state_dim) * 100
def predict(self, f, F_jacobian, Q):
"""
f: nonlinear state transition function x_new = f(x)
F_jacobian: Jacobian of f evaluated at current x
"""
self.x = f(self.x)
F = F_jacobian(self.x)
self.P = F @ self.P @ F.T + Q
def update(self, z, h, H_jacobian, R):
"""
z: measurement
h: nonlinear measurement function z_predicted = h(x)
H_jacobian: Jacobian of h evaluated at current x
"""
z_pred = h(self.x)
y = z - z_pred
H = H_jacobian(self.x)
S = H @ self.P @ H.T + R
K = self.P @ H.T @ np.linalg.inv(S)
self.x = self.x + K @ y
I = np.eye(len(self.x))
self.P = (I - K @ H) @ self.P
EKF for Radar Measurements
Radar reports range and bearing (polar coordinates), which have a nonlinear relationship to Cartesian position:
def radar_ekf_example():
ekf = ExtendedKalmanFilter(state_dim=4) # [x, vx, y, vy]
ekf.x = np.array([10, 1, 5, 0.5])
def h_radar(x):
"""Measurement model: radar reports (range, bearing, range_rate)."""
px, vx, py, vy = x
rng = np.sqrt(px**2 + py**2)
bearing = np.arctan2(py, px)
range_rate = (px*vx + py*vy) / rng
return np.array([rng, bearing, range_rate])
def H_radar(x):
"""Jacobian of radar measurement model."""
px, vx, py, vy = x
rng = np.sqrt(px**2 + py**2)
rng2 = rng**2
rng3 = rng**3
H = np.zeros((3, 4))
H[0, 0] = px / rng
H[0, 2] = py / rng
H[1, 0] = -py / rng2
H[1, 2] = px / rng2
H[2, 0] = py*(vx*py - vy*px) / rng3
H[2, 1] = px / rng
H[2, 2] = px*(vy*px - vx*py) / rng3
H[2, 3] = py / rng
return H
R_radar = np.diag([0.09, 0.0009, 0.09]) # range, bearing, range_rate noise
return ekf, h_radar, H_radar, R_radar
Unscented Kalman Filter (UKF)
The UKF avoids Jacobian computation by using “sigma points” — carefully chosen sample points that capture the mean and covariance of the state distribution:
class UnscentedKalmanFilter:
def __init__(self, state_dim, meas_dim, alpha=1e-3, beta=2, kappa=0):
self.n = state_dim
self.m = meas_dim
self.x = np.zeros(state_dim)
self.P = np.eye(state_dim) * 100
# UKF parameters
self.alpha = alpha
self.beta = beta
self.kappa = kappa
self.lam = alpha**2 * (self.n + kappa) - self.n
# Weights for mean and covariance
self.Wm = np.full(2 * self.n + 1, 1 / (2 * (self.n + self.lam)))
self.Wc = np.full(2 * self.n + 1, 1 / (2 * (self.n + self.lam)))
self.Wm[0] = self.lam / (self.n + self.lam)
self.Wc[0] = self.lam / (self.n + self.lam) + (1 - alpha**2 + beta)
def _sigma_points(self):
n = self.n
sigma_pts = np.zeros((2 * n + 1, n))
sigma_pts[0] = self.x
sqrt_P = np.linalg.cholesky((n + self.lam) * self.P)
for i in range(n):
sigma_pts[i + 1] = self.x + sqrt_P[i]
sigma_pts[n + i + 1] = self.x - sqrt_P[i]
return sigma_pts
def predict(self, f, Q):
sigma_pts = self._sigma_points()
# Transform sigma points through motion model
transformed = np.array([f(sp) for sp in sigma_pts])
# Weighted mean and covariance
self.x = np.average(transformed, weights=self.Wm, axis=0)
self.P = Q.copy()
for i in range(len(transformed)):
diff = transformed[i] - self.x
self.P += self.Wc[i] * np.outer(diff, diff)
def update(self, z, h, R):
sigma_pts = self._sigma_points()
# Transform through measurement model
z_sigma = np.array([h(sp) for sp in sigma_pts])
z_mean = np.average(z_sigma, weights=self.Wm, axis=0)
# Innovation covariance and cross-covariance
S = R.copy()
Pxz = np.zeros((self.n, self.m))
for i in range(len(sigma_pts)):
z_diff = z_sigma[i] - z_mean
x_diff = sigma_pts[i] - self.x
S += self.Wc[i] * np.outer(z_diff, z_diff)
Pxz += self.Wc[i] * np.outer(x_diff, z_diff)
K = Pxz @ np.linalg.inv(S)
self.x += K @ (z - z_mean)
self.P -= K @ S @ K.T
Multi-Sensor Fusion Architecture
A production fusion system must handle sensors with different update rates, latencies, and coordinate frames:
class MultiSensorFusionTracker:
"""Asynchronous multi-sensor fusion with time alignment."""
def __init__(self, dt=0.01):
self.tracks = {} # track_id -> EKF instance
self.dt = dt
self.current_time = 0
def predict_to_time(self, track_id, target_time):
"""Advance a track's state to a target timestamp."""
track = self.tracks[track_id]
dt = target_time - track['last_update']
if dt <= 0:
return
# Update transition matrix for this dt
track['ekf'].F = self._constant_velocity_F(dt)
track['ekf'].Q = self._process_noise_Q(dt, sigma_a=2.0)
track['ekf'].predict()
track['last_update'] = target_time
def update_from_camera(self, track_id, timestamp, bbox_center, depth_est):
"""Camera provides (x, y) in image + estimated depth."""
self.predict_to_time(track_id, timestamp)
track = self.tracks[track_id]
# Camera measurement model: position with moderate noise
H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]])
R = np.diag([1.0, 1.0]) # ±1 meter position uncertainty
z = np.array([bbox_center[0], bbox_center[1]])
track['ekf'].update(z, H, R)
track['last_update'] = timestamp
def update_from_radar(self, track_id, timestamp, range_m, bearing, range_rate):
"""Radar provides range, bearing, and radial velocity."""
self.predict_to_time(track_id, timestamp)
track = self.tracks[track_id]
# Use EKF update with nonlinear radar model
z = np.array([range_m, bearing, range_rate])
R = np.diag([0.1, 0.01, 0.05]) # radar is very precise
# Radar update uses nonlinear h and H_jacobian (see EKF section)
track['ekf'].update_nonlinear(z, h_radar, H_radar_jacobian, R)
track['last_update'] = timestamp
def update_from_lidar(self, track_id, timestamp, cluster_center):
"""LiDAR provides 3D position of cluster centroid."""
self.predict_to_time(track_id, timestamp)
track = self.tracks[track_id]
H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]])
R = np.diag([0.05, 0.05]) # LiDAR is very precise in position
z = np.array([cluster_center[0], cluster_center[1]])
track['ekf'].update(z, H, R)
track['last_update'] = timestamp
def _constant_velocity_F(self, dt):
return np.array([
[1, dt, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, dt],
[0, 0, 0, 1]
])
def _process_noise_Q(self, dt, sigma_a):
q = sigma_a**2
return np.array([
[dt**4/4, dt**3/2, 0, 0],
[dt**3/2, dt**2, 0, 0],
[0, 0, dt**4/4, dt**3/2],
[0, 0, dt**3/2, dt**2]
]) * q
Cross-Sensor Calibration
Precise calibration between sensors is critical. LiDAR-to-camera calibration finds the rigid transformation (rotation + translation) between sensor coordinate frames:
def calibrate_lidar_camera(lidar_points_3d, image_points_2d, K, dist_coeffs):
"""Find extrinsic transform between LiDAR and camera.
lidar_points_3d: (N, 3) array of 3D points in LiDAR frame
image_points_2d: (N, 2) corresponding pixel coordinates
K: camera intrinsic matrix
"""
success, rvec, tvec = cv2.solvePnP(
lidar_points_3d.astype(np.float32),
image_points_2d.astype(np.float32),
K, dist_coeffs,
flags=cv2.SOLVEPNP_ITERATIVE
)
R, _ = cv2.Rodrigues(rvec)
T_lidar_to_cam = np.eye(4)
T_lidar_to_cam[:3, :3] = R
T_lidar_to_cam[:3, 3] = tvec.flatten()
return T_lidar_to_cam
def project_lidar_to_image(points_3d, T_lidar_to_cam, K):
"""Project LiDAR points onto camera image."""
points_hom = np.hstack([points_3d, np.ones((len(points_3d), 1))])
points_cam = (T_lidar_to_cam @ points_hom.T).T
points_cam = points_cam[points_cam[:, 2] > 0] # only in front of camera
pixels = (K @ points_cam[:, :3].T).T
pixels = pixels[:, :2] / pixels[:, 2:3]
return pixels
Using FilterPy
FilterPy provides production-tested implementations:
from filterpy.kalman import KalmanFilter, UnscentedKalmanFilter, MerweScaledSigmaPoints
# Linear KF for GPS+IMU fusion
kf = KalmanFilter(dim_x=4, dim_z=2)
kf.F = np.array([[1, 0.1, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0.1], [0, 0, 0, 1]])
kf.H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]])
kf.R *= 5 # GPS noise
kf.Q = Q_discrete_white_noise(dim=2, dt=0.1, var=0.5, block_size=2)
# UKF for nonlinear fusion
points = MerweScaledSigmaPoints(n=4, alpha=0.1, beta=2., kappa=-1)
ukf = UnscentedKalmanFilter(dim_x=4, dim_z=3, fx=motion_model, hx=radar_model,
dt=0.1, points=points)
One thing to remember: Multi-sensor fusion in production requires handling different update rates and latencies through time-aligned prediction-update cycles, using EKF or UKF filters that weight each sensor’s contribution by its noise characteristics — with cross-sensor calibration being the single most important factor for accuracy.
See Also
- Python Behavior Trees Robotics How robots make decisions using a tree-shaped rulebook that keeps them organized, like a flowchart that tells a robot what to do in every situation.
- Python Bluetooth Ble How Python connects to fitness trackers, smart locks, and wireless sensors using the invisible radio signals all around you.
- Python Circuitpython Hardware Why CircuitPython makes wiring up LEDs, sensors, and motors as easy as plugging in a USB drive.
- Python Computer Vision Autonomous How self-driving cars use cameras and Python to see the road, spot pedestrians, read signs, and understand traffic — like giving a car human eyes and a brain.
- Python Home Assistant Automation How Python turns your home into a smart home that reacts to you automatically, like a helpful invisible butler.