Python Sensor Fusion — Deep Dive

Kalman Filter from Scratch

The Kalman filter estimates a state vector x and covariance matrix P through predict-update cycles:

import numpy as np

class KalmanFilter:
    def __init__(self, state_dim, meas_dim):
        self.x = np.zeros(state_dim)           # state estimate
        self.P = np.eye(state_dim) * 1000      # state covariance (high initial uncertainty)
        self.F = np.eye(state_dim)              # state transition matrix
        self.H = np.zeros((meas_dim, state_dim))  # measurement matrix
        self.R = np.eye(meas_dim)               # measurement noise covariance
        self.Q = np.eye(state_dim)              # process noise covariance

    def predict(self):
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q

    def update(self, z):
        y = z - self.H @ self.x                 # innovation
        S = self.H @ self.P @ self.H.T + self.R  # innovation covariance
        K = self.P @ self.H.T @ np.linalg.inv(S)  # Kalman gain
        self.x = self.x + K @ y
        I = np.eye(len(self.x))
        self.P = (I - K @ self.H) @ self.P @ (I - K @ self.H).T + K @ self.R @ K.T
        # Joseph form above is numerically more stable than (I - KH)P

Tracking a Vehicle with Position and Velocity

def create_vehicle_tracker(dt=0.1):
    """Track vehicle position (x, y) and velocity (vx, vy)."""
    kf = KalmanFilter(state_dim=4, meas_dim=2)

    # State: [x, vx, y, vy]
    kf.F = np.array([
        [1, dt, 0,  0],
        [0,  1, 0,  0],
        [0,  0, 1, dt],
        [0,  0, 0,  1]
    ])

    # Measurement: [x, y] (position only)
    kf.H = np.array([
        [1, 0, 0, 0],
        [0, 0, 1, 0]
    ])

    # Measurement noise (GPS-like: ±2 meters)
    kf.R = np.diag([4.0, 4.0])

    # Process noise (acceleration uncertainty)
    q = 0.5  # m/s^2
    kf.Q = np.array([
        [dt**4/4, dt**3/2, 0, 0],
        [dt**3/2, dt**2,   0, 0],
        [0, 0, dt**4/4, dt**3/2],
        [0, 0, dt**3/2, dt**2]
    ]) * q**2

    return kf

Extended Kalman Filter (EKF)

When the motion or measurement model is nonlinear, the EKF linearizes using Jacobians:

class ExtendedKalmanFilter:
    def __init__(self, state_dim):
        self.x = np.zeros(state_dim)
        self.P = np.eye(state_dim) * 100

    def predict(self, f, F_jacobian, Q):
        """
        f: nonlinear state transition function x_new = f(x)
        F_jacobian: Jacobian of f evaluated at current x
        """
        self.x = f(self.x)
        F = F_jacobian(self.x)
        self.P = F @ self.P @ F.T + Q

    def update(self, z, h, H_jacobian, R):
        """
        z: measurement
        h: nonlinear measurement function z_predicted = h(x)
        H_jacobian: Jacobian of h evaluated at current x
        """
        z_pred = h(self.x)
        y = z - z_pred
        H = H_jacobian(self.x)
        S = H @ self.P @ H.T + R
        K = self.P @ H.T @ np.linalg.inv(S)
        self.x = self.x + K @ y
        I = np.eye(len(self.x))
        self.P = (I - K @ H) @ self.P

EKF for Radar Measurements

Radar reports range and bearing (polar coordinates), which have a nonlinear relationship to Cartesian position:

def radar_ekf_example():
    ekf = ExtendedKalmanFilter(state_dim=4)  # [x, vx, y, vy]
    ekf.x = np.array([10, 1, 5, 0.5])

    def h_radar(x):
        """Measurement model: radar reports (range, bearing, range_rate)."""
        px, vx, py, vy = x
        rng = np.sqrt(px**2 + py**2)
        bearing = np.arctan2(py, px)
        range_rate = (px*vx + py*vy) / rng
        return np.array([rng, bearing, range_rate])

    def H_radar(x):
        """Jacobian of radar measurement model."""
        px, vx, py, vy = x
        rng = np.sqrt(px**2 + py**2)
        rng2 = rng**2
        rng3 = rng**3

        H = np.zeros((3, 4))
        H[0, 0] = px / rng
        H[0, 2] = py / rng
        H[1, 0] = -py / rng2
        H[1, 2] = px / rng2
        H[2, 0] = py*(vx*py - vy*px) / rng3
        H[2, 1] = px / rng
        H[2, 2] = px*(vy*px - vx*py) / rng3
        H[2, 3] = py / rng
        return H

    R_radar = np.diag([0.09, 0.0009, 0.09])  # range, bearing, range_rate noise
    return ekf, h_radar, H_radar, R_radar

Unscented Kalman Filter (UKF)

The UKF avoids Jacobian computation by using “sigma points” — carefully chosen sample points that capture the mean and covariance of the state distribution:

class UnscentedKalmanFilter:
    def __init__(self, state_dim, meas_dim, alpha=1e-3, beta=2, kappa=0):
        self.n = state_dim
        self.m = meas_dim
        self.x = np.zeros(state_dim)
        self.P = np.eye(state_dim) * 100

        # UKF parameters
        self.alpha = alpha
        self.beta = beta
        self.kappa = kappa
        self.lam = alpha**2 * (self.n + kappa) - self.n

        # Weights for mean and covariance
        self.Wm = np.full(2 * self.n + 1, 1 / (2 * (self.n + self.lam)))
        self.Wc = np.full(2 * self.n + 1, 1 / (2 * (self.n + self.lam)))
        self.Wm[0] = self.lam / (self.n + self.lam)
        self.Wc[0] = self.lam / (self.n + self.lam) + (1 - alpha**2 + beta)

    def _sigma_points(self):
        n = self.n
        sigma_pts = np.zeros((2 * n + 1, n))
        sigma_pts[0] = self.x

        sqrt_P = np.linalg.cholesky((n + self.lam) * self.P)
        for i in range(n):
            sigma_pts[i + 1] = self.x + sqrt_P[i]
            sigma_pts[n + i + 1] = self.x - sqrt_P[i]

        return sigma_pts

    def predict(self, f, Q):
        sigma_pts = self._sigma_points()

        # Transform sigma points through motion model
        transformed = np.array([f(sp) for sp in sigma_pts])

        # Weighted mean and covariance
        self.x = np.average(transformed, weights=self.Wm, axis=0)
        self.P = Q.copy()
        for i in range(len(transformed)):
            diff = transformed[i] - self.x
            self.P += self.Wc[i] * np.outer(diff, diff)

    def update(self, z, h, R):
        sigma_pts = self._sigma_points()

        # Transform through measurement model
        z_sigma = np.array([h(sp) for sp in sigma_pts])
        z_mean = np.average(z_sigma, weights=self.Wm, axis=0)

        # Innovation covariance and cross-covariance
        S = R.copy()
        Pxz = np.zeros((self.n, self.m))
        for i in range(len(sigma_pts)):
            z_diff = z_sigma[i] - z_mean
            x_diff = sigma_pts[i] - self.x
            S += self.Wc[i] * np.outer(z_diff, z_diff)
            Pxz += self.Wc[i] * np.outer(x_diff, z_diff)

        K = Pxz @ np.linalg.inv(S)
        self.x += K @ (z - z_mean)
        self.P -= K @ S @ K.T

Multi-Sensor Fusion Architecture

A production fusion system must handle sensors with different update rates, latencies, and coordinate frames:

class MultiSensorFusionTracker:
    """Asynchronous multi-sensor fusion with time alignment."""
    def __init__(self, dt=0.01):
        self.tracks = {}  # track_id -> EKF instance
        self.dt = dt
        self.current_time = 0

    def predict_to_time(self, track_id, target_time):
        """Advance a track's state to a target timestamp."""
        track = self.tracks[track_id]
        dt = target_time - track['last_update']
        if dt <= 0:
            return

        # Update transition matrix for this dt
        track['ekf'].F = self._constant_velocity_F(dt)
        track['ekf'].Q = self._process_noise_Q(dt, sigma_a=2.0)
        track['ekf'].predict()
        track['last_update'] = target_time

    def update_from_camera(self, track_id, timestamp, bbox_center, depth_est):
        """Camera provides (x, y) in image + estimated depth."""
        self.predict_to_time(track_id, timestamp)
        track = self.tracks[track_id]

        # Camera measurement model: position with moderate noise
        H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]])
        R = np.diag([1.0, 1.0])  # ±1 meter position uncertainty

        z = np.array([bbox_center[0], bbox_center[1]])
        track['ekf'].update(z, H, R)
        track['last_update'] = timestamp

    def update_from_radar(self, track_id, timestamp, range_m, bearing, range_rate):
        """Radar provides range, bearing, and radial velocity."""
        self.predict_to_time(track_id, timestamp)
        track = self.tracks[track_id]

        # Use EKF update with nonlinear radar model
        z = np.array([range_m, bearing, range_rate])
        R = np.diag([0.1, 0.01, 0.05])  # radar is very precise

        # Radar update uses nonlinear h and H_jacobian (see EKF section)
        track['ekf'].update_nonlinear(z, h_radar, H_radar_jacobian, R)
        track['last_update'] = timestamp

    def update_from_lidar(self, track_id, timestamp, cluster_center):
        """LiDAR provides 3D position of cluster centroid."""
        self.predict_to_time(track_id, timestamp)
        track = self.tracks[track_id]

        H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]])
        R = np.diag([0.05, 0.05])  # LiDAR is very precise in position

        z = np.array([cluster_center[0], cluster_center[1]])
        track['ekf'].update(z, H, R)
        track['last_update'] = timestamp

    def _constant_velocity_F(self, dt):
        return np.array([
            [1, dt, 0, 0],
            [0, 1,  0, 0],
            [0, 0,  1, dt],
            [0, 0,  0, 1]
        ])

    def _process_noise_Q(self, dt, sigma_a):
        q = sigma_a**2
        return np.array([
            [dt**4/4, dt**3/2, 0, 0],
            [dt**3/2, dt**2,   0, 0],
            [0, 0, dt**4/4, dt**3/2],
            [0, 0, dt**3/2, dt**2]
        ]) * q

Cross-Sensor Calibration

Precise calibration between sensors is critical. LiDAR-to-camera calibration finds the rigid transformation (rotation + translation) between sensor coordinate frames:

def calibrate_lidar_camera(lidar_points_3d, image_points_2d, K, dist_coeffs):
    """Find extrinsic transform between LiDAR and camera.

    lidar_points_3d: (N, 3) array of 3D points in LiDAR frame
    image_points_2d: (N, 2) corresponding pixel coordinates
    K: camera intrinsic matrix
    """
    success, rvec, tvec = cv2.solvePnP(
        lidar_points_3d.astype(np.float32),
        image_points_2d.astype(np.float32),
        K, dist_coeffs,
        flags=cv2.SOLVEPNP_ITERATIVE
    )

    R, _ = cv2.Rodrigues(rvec)
    T_lidar_to_cam = np.eye(4)
    T_lidar_to_cam[:3, :3] = R
    T_lidar_to_cam[:3, 3] = tvec.flatten()

    return T_lidar_to_cam

def project_lidar_to_image(points_3d, T_lidar_to_cam, K):
    """Project LiDAR points onto camera image."""
    points_hom = np.hstack([points_3d, np.ones((len(points_3d), 1))])
    points_cam = (T_lidar_to_cam @ points_hom.T).T
    points_cam = points_cam[points_cam[:, 2] > 0]  # only in front of camera
    pixels = (K @ points_cam[:, :3].T).T
    pixels = pixels[:, :2] / pixels[:, 2:3]
    return pixels

Using FilterPy

FilterPy provides production-tested implementations:

from filterpy.kalman import KalmanFilter, UnscentedKalmanFilter, MerweScaledSigmaPoints

# Linear KF for GPS+IMU fusion
kf = KalmanFilter(dim_x=4, dim_z=2)
kf.F = np.array([[1, 0.1, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0.1], [0, 0, 0, 1]])
kf.H = np.array([[1, 0, 0, 0], [0, 0, 1, 0]])
kf.R *= 5  # GPS noise
kf.Q = Q_discrete_white_noise(dim=2, dt=0.1, var=0.5, block_size=2)

# UKF for nonlinear fusion
points = MerweScaledSigmaPoints(n=4, alpha=0.1, beta=2., kappa=-1)
ukf = UnscentedKalmanFilter(dim_x=4, dim_z=3, fx=motion_model, hx=radar_model,
                            dt=0.1, points=points)

One thing to remember: Multi-sensor fusion in production requires handling different update rates and latencies through time-aligned prediction-update cycles, using EKF or UKF filters that weight each sensor’s contribution by its noise characteristics — with cross-sensor calibration being the single most important factor for accuracy.

pythonsensor-fusionroboticsautonomous-vehicleskalman-filter

See Also

  • Python Behavior Trees Robotics How robots make decisions using a tree-shaped rulebook that keeps them organized, like a flowchart that tells a robot what to do in every situation.
  • Python Bluetooth Ble How Python connects to fitness trackers, smart locks, and wireless sensors using the invisible radio signals all around you.
  • Python Circuitpython Hardware Why CircuitPython makes wiring up LEDs, sensors, and motors as easy as plugging in a USB drive.
  • Python Computer Vision Autonomous How self-driving cars use cameras and Python to see the road, spot pedestrians, read signs, and understand traffic — like giving a car human eyes and a brain.
  • Python Home Assistant Automation How Python turns your home into a smart home that reacts to you automatically, like a helpful invisible butler.