Fleet Management in Python — Deep Dive

Production fleet management combines high-throughput telematics ingestion, geospatial processing, time-series analytics, and machine learning into a system that monitors hundreds of vehicles in real time. This guide covers the technical implementation from GPS stream processing through predictive maintenance and driver scoring.

Telematics ingestion pipeline

Vehicles report positions via MQTT or HTTP, and the events land on a message broker (Kafka in this example). A Python consumer reads them and writes batches to a time-series database:

import json
from datetime import datetime
from dataclasses import dataclass
from kafka import KafkaConsumer
import psycopg2

@dataclass
class TelematicsEvent:
    vehicle_id: str
    timestamp: datetime
    lat: float
    lon: float
    speed_kmh: float
    heading: float
    fuel_level: float
    engine_rpm: int
    odometer_km: float
    dtc_codes: list[str]

def consume_telematics(kafka_brokers: str, topic: str, db_conn_str: str):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=kafka_brokers,
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
        group_id="fleet-processor",
        auto_offset_reset="latest",
    )
    conn = psycopg2.connect(db_conn_str)

    batch = []
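    for message in consumer:
        payload = dict(message.value)
        # JSON carries the timestamp as a string; this assumes ISO 8601 format
        payload["timestamp"] = datetime.fromisoformat(payload["timestamp"])
        event = TelematicsEvent(**payload)
        batch.append(event)

        # Flush in batches of 100 to amortize the commit cost.
        # A quiet topic can leave a partial batch unflushed, so a production
        # consumer would also flush on a timer.
        if len(batch) >= 100:
            flush_to_db(conn, batch)
            batch = []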

def flush_to_db(conn, events: list[TelematicsEvent]):
    with conn.cursor() as cur:
        args = [(e.vehicle_id, e.timestamp, e.lat, e.lon, e.speed_kmh,
                 e.heading, e.fuel_level, e.engine_rpm, e.odometer_km,
                 e.dtc_codes) for e in events]
        cur.executemany(
            """INSERT INTO telematics (vehicle_id, ts, lat, lon, speed, heading,
               fuel_level, engine_rpm, odometer, dtc_codes)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
            args,
        )
    conn.commit()
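
The consumer above flushes only when 100 events have accumulated. A minimal sketch of a batcher that also flushes on age is shown here; the class name `TimedBatcher`, the 5-second default, and the injectable `clock` are illustrative assumptions, not part of the original pipeline.

```python
import time
from typing import Callable, Optional


class TimedBatcher:
    """Collects items and flushes when the batch is full or too old."""

    def __init__(self, flush: Callable[[list], None], max_size: int = 100,
                 max_age_s: float = 5.0,
                 clock: Callable[[], float] = time.monotonic):
        self.flush = flush
        self.max_size = max_size
        self.max_age_s = max_age_s
        self.clock = clock  # injectable so the age rule can be tested
        self.items: list = []
        self.first_added: Optional[float] = None

    def add(self, item) -> None:
        if not self.items:
            self.first_added = self.clock()  # age is measured from the first item
        self.items.append(item)
        self.maybe_flush()

    def maybe_flush(self) -> None:
        """Flush if full or stale; call this periodically as well as from add()."""
        if not self.items:
            return
        too_old = self.clock() - self.first_added >= self.max_age_s
        if len(self.items) >= self.max_size or too_old:
            self.flush(self.items)
            self.items = []  # new list, so the flushed batch is not mutated
            self.first_added = None
```

In the consumer loop, `flush` would wrap `flush_to_db(conn, batch)`, and `maybe_flush()` would be called whenever the consumer poll returns no messages.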

For 500 vehicles reporting every 10 seconds, this produces 50 events/second — easily handled by a single Kafka partition. At 5,000+ vehicles, partition by vehicle_id hash for horizontal scaling.
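
The throughput arithmetic and the idea of hashing `vehicle_id` to a partition can be sketched as follows. This is an illustration only: Kafka clients apply their own key hashing when a message key is supplied, and CRC32 is used here simply because it is stable across processes. The function names are hypothetical.

```python
import zlib


def events_per_second(vehicles: int, report_interval_s: float) -> float:
    """Steady-state event rate for a fleet reporting at a fixed interval."""
    return vehicles / report_interval_s


def partition_for(vehicle_id: str, num_partitions: int) -> int:
    """Map a vehicle to a partition with a stable hash.

    Python's built-in hash() is salted per process for strings, so it would
    send the same vehicle to different partitions after a restart.
    """
    return zlib.crc32(vehicle_id.encode("utf-8")) % num_partitions


if __name__ == "__main__":
    print(events_per_second(500, 10))      # 50.0
    print(partition_for("truck-42", 8))    # same value on every run
```

Keeping all events for one vehicle on one partition preserves their order, which matters for stateful steps such as geofence enter and exit detection.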

Geofencing engine

Geofences trigger events when vehicles enter or exit defined polygons:

from shapely.geometry import Point, Polygon
from shapely.prepared import prep

class GeofenceEngine:
    def __init__(self):
        self.fences: dict[str, dict] = {}  # fence_id -> {polygon, name, type}
        self.vehicle_states: dict[str, set[str]] = {}  # vehicle_id -> set of fence_ids inside

    def add_fence(self, fence_id: str, name: str, coordinates: list[tuple[float, float]], fence_type: str = "zone"):
        polygon = Polygon(coordinates)
        self.fences[fence_id] = {
            "polygon": prep(polygon),
            "name": name,
            "type": fence_type,
        }

    def check_position(self, vehicle_id: str, lat: float, lon: float) -> list[dict]:
        point = Point(lon, lat)
        current_fences = set()
        events = []

        for fence_id, fence in self.fences.items():
            if fence["polygon"].contains(point):
                current_fences.add(fence_id)

        previous = self.vehicle_states.get(vehicle_id, set())

        # Enter events
        for fid in current_fences - previous:
            events.append({"vehicle_id": vehicle_id, "fence_id": fid,
                          "event": "enter", "fence_name": self.fences[fid]["name"]})

        # Exit events
        for fid in previous - current_fences:
            events.append({"vehicle_id": vehicle_id, "fence_id": fid,
                          "event": "exit", "fence_name": self.fences[fid]["name"]})

        self.vehicle_states[vehicle_id] = current_fences
        return events

Prepared geometries from Shapely provide fast point-in-polygon tests. For thousands of geofences, use an R-tree spatial index (shapely.strtree.STRtree) to filter candidates before exact tests.
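
The filter-then-test pattern can be shown without any dependencies. The sketch below is a stand-in for STRtree, not its implementation: it uses a uniform grid over bounding boxes to find candidate fences, then a ray-casting test for the exact check. The class name `GridFenceIndex` and the 0.01-degree cell size are assumptions made for illustration.

```python
from collections import defaultdict
from math import floor

Coord = tuple[float, float]  # (lon, lat), matching Point(lon, lat) above


def point_in_polygon(pt: Coord, ring: list[Coord]) -> bool:
    """Exact test via ray casting (even-odd rule)."""
    x, y = pt
    inside = False
    for (x1, y1), (x2, y2) in zip(ring, ring[1:] + ring[:1]):
        if (y1 > y) != (y2 > y):  # edge straddles the horizontal ray
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


class GridFenceIndex:
    def __init__(self, cell_deg: float = 0.01):
        self.cell_deg = cell_deg
        self.cells: dict[tuple[int, int], set[str]] = defaultdict(set)
        self.rings: dict[str, list[Coord]] = {}

    def _cell(self, x: float, y: float) -> tuple[int, int]:
        return (floor(x / self.cell_deg), floor(y / self.cell_deg))

    def add_fence(self, fence_id: str, ring: list[Coord]) -> None:
        """Register the fence in every grid cell its bounding box overlaps."""
        self.rings[fence_id] = ring
        xs = [p[0] for p in ring]
        ys = [p[1] for p in ring]
        cx0, cy0 = self._cell(min(xs), min(ys))
        cx1, cy1 = self._cell(max(xs), max(ys))
        for cx in range(cx0, cx1 + 1):
            for cy in range(cy0, cy1 + 1):
                self.cells[(cx, cy)].add(fence_id)

    def candidates(self, pt: Coord) -> set[str]:
        """Cheap filter: fences whose bounding box touches the point's cell."""
        return set(self.cells.get(self._cell(*pt), set()))

    def containing(self, pt: Coord) -> set[str]:
        """Exact test, run only on the candidates."""
        return {fid for fid in self.candidates(pt)
                if point_in_polygon(pt, self.rings[fid])}
```

With thousands of fences, most positions touch only a handful of candidates, so the exact test runs a few times per position instead of once per fence.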

ETA prediction

Simple ETA divides remaining distance by current speed. Production ETA accounts for traffic patterns:

from datetime import datetime, timedelta

def predict_eta(
    current_lat: float, current_lon: float,
    destination_lat: float, destination_lon: float,
    remaining_route_km: float,
    current_speed_kmh: float,
    historical_speeds: dict[int, float],  # hour_of_day -> avg speed on this route
) -> timedelta:
    now_hour = datetime.now().hour

    # Weighted average of current speed and historical pattern
    historical_speed = historical_speeds.get(now_hour, current_speed_kmh)
    if current_speed_kmh > 5:
        effective_speed = 0.6 * current_speed_kmh + 0.4 * historical_speed
    else:
        effective_speed = historical_speed  # vehicle likely stopped

    if effective_speed < 1:
        return timedelta(hours=99)  # stuck

    hours = remaining_route_km / effective_speed
    return timedelta(hours=hours)

For more accuracy, segment the remaining route by road type and apply a separate speed model to each segment, for example using per-segment speeds from a routing engine such as OSRM combined with your own historical data.
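
A minimal sketch of the per-segment approach, assuming the route has already been split into `(road_type, km)` pairs and that a speed per road type is available. The function name, the road-type labels, and the 40 km/h fallback are hypothetical.

```python
from datetime import timedelta


def segmented_eta(
    segments: list[tuple[str, float]],   # (road_type, length in km)
    speeds_kmh: dict[str, float],        # road_type -> expected speed
    default_speed_kmh: float = 40.0,     # assumed fallback for unknown types
) -> timedelta:
    """Sum travel time over segments instead of using one blended speed."""
    hours = 0.0
    for road_type, km in segments:
        speed = speeds_kmh.get(road_type, default_speed_kmh)
        hours += km / max(speed, 1.0)  # guard against zero or tiny speeds
    return timedelta(hours=hours)


if __name__ == "__main__":
    route = [("highway", 90.0), ("urban", 15.0)]
    print(segmented_eta(route, {"highway": 90.0, "urban": 30.0}))  # 1:30:00
```

A single blended speed would misestimate this route, because the slow urban segment contributes a third of the travel time while covering only a seventh of the distance.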

Predictive maintenance model

Train a classifier to predict breakdowns within the next 14 days:

import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import precision_score, recall_score

def build_maintenance_model(vehicle_features: pd.DataFrame) -> GradientBoostingClassifier:
    """
    Features per vehicle-week: avg_rpm, max_rpm, avg_coolant_temp, max_coolant_temp,
    fuel_consumption_rate, harsh_brake_count, total_km, days_since_last_service,
    km_since_last_service, active_dtc_count.
    Target: breakdown_within_14_days (0/1).
    """
    feature_cols = [
        "avg_rpm", "max_rpm", "avg_coolant_temp", "max_coolant_temp",
        "fuel_consumption_rate", "harsh_brake_count", "total_km",
        "days_since_last_service", "km_since_last_service", "active_dtc_count",
    ]
    X = vehicle_features[feature_cols]
    y = vehicle_features["breakdown_within_14_days"]

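    # Time-aware split to prevent leakage. Rows must already be sorted by time
    # (for example by week), because TimeSeriesSplit splits on row order.
    tscv = TimeSeriesSplit(n_splits=5)
    params = dict(
        n_estimators=200, max_depth=5, learning_rate=0.1,
        min_samples_leaf=20, subsample=0.8,
    )

    # Cross-validation estimates out-of-sample performance; it does not pick a model.
    for fold, (train_idx, test_idx) in enumerate(tscv.split(X), start=1):
        model = GradientBoostingClassifier(**params)
        model.fit(X.iloc[train_idx], y.iloc[train_idx])
        preds = model.predict(X.iloc[test_idx])
        recall = recall_score(y.iloc[test_idx], preds, zero_division=0)
        precision = precision_score(y.iloc[test_idx], preds, zero_division=0)
        print(f"fold {fold}: recall={recall:.2f} precision={precision:.2f}")

    # Fold models see different amounts of data and are scored on different
    # periods, so their recalls are not comparable. Refit on all rows instead.
    final_model = GradientBoostingClassifier(**params)
    final_model.fit(X, y)
    return final_model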

Prioritize recall over precision — missing a breakdown is costlier than a false alarm that triggers an unnecessary inspection.
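
One common way to act on that priority is to lower the decision threshold on the predicted probabilities rather than rely on the default 0.5 cut-off. The sketch below is one possible approach, not the author's method; the scores would come from something like `model.predict_proba(X)[:, 1]`, and both helper names are hypothetical.

```python
def threshold_for_recall(y_true: list[int], scores: list[float],
                         target_recall: float) -> float:
    """Return the highest threshold whose recall meets the target."""
    positives = sum(y_true)
    if positives == 0:
        raise ValueError("no positive examples to compute recall on")
    # Try each distinct score as a threshold, strictest first
    for threshold in sorted(set(scores), reverse=True):
        tp = sum(1 for y, s in zip(y_true, scores) if y == 1 and s >= threshold)
        if tp / positives >= target_recall:
            return threshold
    raise ValueError("target_recall must be at most 1.0")


def precision_at(y_true: list[int], scores: list[float], threshold: float) -> float:
    """Share of flagged vehicles that actually broke down."""
    flagged = [y for y, s in zip(y_true, scores) if s >= threshold]
    return sum(flagged) / len(flagged) if flagged else 0.0


if __name__ == "__main__":
    y = [0, 0, 1, 0, 1, 1]
    p = [0.1, 0.4, 0.35, 0.2, 0.8, 0.6]
    t = threshold_for_recall(y, p, target_recall=1.0)
    print(t, precision_at(y, p, t))  # 0.35 0.75
```

In this toy data, catching every breakdown costs one false alarm in four flagged vehicles. The threshold should be chosen on held-out data, not on the training set.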

Driver scoring system

from dataclasses import dataclass
from datetime import datetime

@dataclass
class DrivingEvent:
    event_type: str  # harsh_brake, rapid_accel, speeding, excessive_idle, hard_corner
    severity: float  # 0-1
    timestamp: datetime
    vehicle_id: str
    driver_id: str

def compute_driver_scores(
    events: list[DrivingEvent],
    distance_driven: dict[str, float],  # driver_id -> km
) -> dict[str, dict]:
    """Score drivers on a 0-100 scale (100 = perfect)."""
    penalties = {
        "harsh_brake": 3,
        "rapid_accel": 2,
        "speeding": 5,
        "excessive_idle": 1,
        "hard_corner": 2,
    }

    driver_deductions = {}
    for event in events:
        did = event.driver_id
        if did not in driver_deductions:
            driver_deductions[did] = 0
        driver_deductions[did] += penalties.get(event.event_type, 1) * event.severity

    scores = {}
    for driver_id, km in distance_driven.items():
        raw_deduction = driver_deductions.get(driver_id, 0)
        # Normalize by distance: severity-weighted penalty points per 100 km
        normalized = (raw_deduction / max(km, 1)) * 100
        score = max(0, 100 - normalized)
        scores[driver_id] = {
            "score": round(score, 1),
            # Weighted penalty points, not a raw event count
            "penalty_points": round(raw_deduction, 2),
            "km_driven": round(km, 1),
            "penalty_points_per_100km": round(normalized, 2),
        }

    return scores

Normalizing by distance ensures that drivers covering more kilometers are not unfairly penalized for having more total events.
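
A short worked example of the normalization, using made-up numbers for two hypothetical drivers. The helper names are illustrative and mirror the arithmetic in `compute_driver_scores`.

```python
def penalty_per_100km(penalty_points: float, km: float) -> float:
    """Severity-weighted penalty points per 100 km (distance floored at 1 km)."""
    return penalty_points / max(km, 1) * 100


def driver_score(penalty_points: float, km: float) -> float:
    """0-100 score, where 100 means no penalties."""
    return max(0.0, 100 - penalty_per_100km(penalty_points, km))


if __name__ == "__main__":
    # Long-haul driver: more total penalty points, but far more distance
    print(round(driver_score(30, 3000), 1))  # 99.0
    # Local driver: fewer total points, concentrated in a short distance
    print(round(driver_score(10, 200), 1))   # 95.0
```

The long-haul driver has three times the total penalty points but ends up with the higher score, because the rate per 100 km is five times lower.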

Fleet cost analytics

import pandas as pd

def compute_fleet_tco(vehicles: pd.DataFrame, months: int = 12) -> pd.DataFrame:
    """
    Columns: vehicle_id, acquisition_cost, monthly_lease, fuel_cost_per_km,
    maintenance_ytd, insurance_annual, km_driven_ytd, age_months.
    The *_ytd columns are assumed to cover the same `months` window.
    """
    v = vehicles.copy()
    v["fuel_cost"] = v["fuel_cost_per_km"] * v["km_driven_ytd"]
    # Cumulative declining-balance depreciation at 15% per year.
    # Reported for reference only; it is not added to total_cost below.
    v["depreciation"] = v["acquisition_cost"] * (1 - 0.85 ** (v["age_months"] / 12))
    v["total_cost"] = (
        v["monthly_lease"] * months
        + v["fuel_cost"]
        + v["maintenance_ytd"]
        + v["insurance_annual"] * months / 12  # pro-rate the annual premium
    )
    v["cost_per_km"] = v["total_cost"] / v["km_driven_ytd"].clip(lower=1)

    # Flag vehicles where maintenance exceeds 40% of total cost as replacement candidates
    v["maintenance_ratio"] = v["maintenance_ytd"] / v["total_cost"]
    v["replacement_candidate"] = v["maintenance_ratio"] > 0.4

    return v[["vehicle_id", "total_cost", "depreciation", "cost_per_km",
              "maintenance_ratio", "replacement_candidate"]]
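
The depreciation expression in `compute_fleet_tco` is a declining-balance formula: the vehicle keeps 85% of its value each year, so the cumulative loss after `age_months` is the acquisition cost times `1 - 0.85 ** (age_months / 12)`. A scalar sketch with an illustrative purchase price (the function name and the figures are assumptions):

```python
def cumulative_depreciation(acquisition_cost: float, age_months: float,
                            annual_rate: float = 0.15) -> float:
    """Value lost so far under declining-balance depreciation."""
    retained = (1 - annual_rate) ** (age_months / 12)
    return acquisition_cost * (1 - retained)


if __name__ == "__main__":
    # A 40,000 vehicle after two years retains 0.85 ** 2 = 72.25% of its value
    print(round(cumulative_depreciation(40_000, 24), 2))  # 11100.0
```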

Real-time dashboard architecture

A production fleet dashboard stacks:

  1. Kafka: ingests raw telematics.
  2. Python stream processor: enriches events (geofence checks, anomaly detection) and writes to TimescaleDB.
  3. FastAPI: serves REST endpoints for vehicle status, scores, and alerts.
  4. Redis: caches the latest position per vehicle for sub-10ms dashboard updates.
  5. WebSocket: pushes position updates to the map UI in real time.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import redis.asyncio as redis

app = FastAPI()
redis_client = redis.from_url("redis://localhost")

@app.websocket("/ws/fleet-map")
async def fleet_map_stream(ws: WebSocket):
    await ws.accept()
    pubsub = redis_client.pubsub()
    await pubsub.subscribe("vehicle_positions")

    try:
        # Relay every published position to this client
        async for message in pubsub.listen():
            if message["type"] == "message":
                await ws.send_text(message["data"].decode())
    except WebSocketDisconnect:
        pass  # client closed the map; fall through to cleanup
    finally:
        # Drop the subscription so closed sockets do not keep receiving messages
        await pubsub.unsubscribe("vehicle_positions")
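
The "latest position per vehicle" cache in the stack above needs one rule to stay correct: a late-arriving older event must not overwrite a newer one. The sketch below shows that rule with an in-memory dictionary standing in for Redis; the class name and field names are hypothetical, and timestamps are assumed to be epoch seconds.

```python
import json


class LatestPositionCache:
    """Keeps only the newest position per vehicle (last write wins by timestamp)."""

    def __init__(self):
        self._latest: dict[str, dict] = {}

    def update(self, vehicle_id: str, ts: float, lat: float, lon: float) -> bool:
        """Store the position; return False if it is older than what is cached."""
        current = self._latest.get(vehicle_id)
        if current is not None and current["ts"] >= ts:
            return False  # out-of-order or duplicate event, ignore it
        self._latest[vehicle_id] = {"vehicle_id": vehicle_id, "ts": ts,
                                    "lat": lat, "lon": lon}
        return True

    def get(self, vehicle_id: str):
        return self._latest.get(vehicle_id)

    def snapshot(self) -> str:
        """JSON payload a dashboard could load on first render."""
        return json.dumps(sorted(self._latest.values(),
                                 key=lambda p: p["vehicle_id"]))
```

Only updates that return `True` would need to be published to the `vehicle_positions` channel, which also keeps stale points off the map.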

Pitfalls

  • GPS drift indoors: vehicles inside warehouses or parking garages produce noisy positions. Filter out fixes with a low satellite count or a high HDOP (horizontal dilution of precision) value.
  • Ignoring time zones: a fleet spanning multiple time zones complicates hours-of-service (HOS) calculations and reporting. Store everything in UTC and convert for display.
  • Alert fatigue: too many notifications desensitize dispatchers. Prioritize alerts by severity and suppress repeated alerts for known ongoing issues.
  • Privacy concerns: tracking driver location raises legal issues in many jurisdictions. Consult local labor laws and be transparent with drivers about what data is collected and how it is used.
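
Two of the pitfalls above reduce to small guards in the stream processor. The sketch below shows a GPS quality filter and a per-vehicle alert cooldown. The thresholds (at least 4 satellites, HDOP at most 5.0, a 15-minute cooldown) are illustrative assumptions to tune per device and operation, and the names are hypothetical.

```python
def is_reliable_fix(satellites: int, hdop: float,
                    min_satellites: int = 4, max_hdop: float = 5.0) -> bool:
    """Reject fixes likely to be indoor drift (few satellites or poor geometry)."""
    return satellites >= min_satellites and hdop <= max_hdop


class AlertSuppressor:
    """Sends an alert at most once per cooldown window per (vehicle, alert type)."""

    def __init__(self, cooldown_s: float = 900.0):
        self.cooldown_s = cooldown_s
        self._last_sent: dict[tuple[str, str], float] = {}

    def should_send(self, vehicle_id: str, alert_type: str, now_s: float) -> bool:
        key = (vehicle_id, alert_type)
        last = self._last_sent.get(key)
        if last is not None and now_s - last < self.cooldown_s:
            return False  # same issue is still ongoing, stay quiet
        self._last_sent[key] = now_s
        return True
```

The cooldown is keyed on both vehicle and alert type, so an ongoing low-fuel alert does not hide a new engine fault on the same vehicle.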

The one thing to remember: Production fleet management pipelines chain Kafka telematics ingestion, geospatial processing with Shapely, predictive maintenance with scikit-learn, and real-time dashboards to give fleet operators actionable visibility across hundreds of vehicles simultaneously.
