Logistics Scheduling in Python — Deep Dive

Production logistics scheduling solves tightly constrained assignment problems under time pressure. The solver must produce feasible, near-optimal schedules in seconds and re-solve when disruptions hit. This guide implements dock scheduling, driver assignment with hours-of-service limits, and reactive re-planning using OR-Tools CP-SAT.

Dock appointment scheduling

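Warehouses with limited dock doors need to schedule inbound and outbound trucks so they do not queue excessively. Each truck has a time window in minutes from midnight (earliest start, latest finish), a service duration, and an optional preferred dock. The model assigns every truck a dock and a start time, minimizing total waiting plus a penalty for each missed dock preference: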

from ortools.sat.python import cp_model

def schedule_dock_appointments(
    trucks: list[dict],  # each: {id, earliest, latest, duration, dock_preference}
    num_docks: int,
    horizon: int = 1440,  # minutes in a day
) -> dict:
    model = cp_model.CpModel()

    starts = {}
    ends = {}
    intervals = {}
    dock_assignments = {}

    for t in trucks:
        tid = t["id"]
        starts[tid] = model.NewIntVar(t["earliest"], t["latest"] - t["duration"], f"start_{tid}")
        ends[tid] = model.NewIntVar(t["earliest"] + t["duration"], t["latest"], f"end_{tid}")
        intervals[tid] = model.NewIntervalVar(starts[tid], t["duration"], ends[tid], f"interval_{tid}")
        dock_assignments[tid] = model.NewIntVar(0, num_docks - 1, f"dock_{tid}")

    # No overlap per dock: create optional intervals per dock
    for d in range(num_docks):
        dock_intervals = []
        for t in trucks:
            tid = t["id"]
            is_on_dock = model.NewBoolVar(f"{tid}_on_dock_{d}")
            model.Add(dock_assignments[tid] == d).OnlyEnforceIf(is_on_dock)
            model.Add(dock_assignments[tid] != d).OnlyEnforceIf(is_on_dock.Not())
            opt_interval = model.NewOptionalIntervalVar(
                starts[tid], t["duration"], ends[tid], is_on_dock, f"opt_{tid}_{d}"
            )
            dock_intervals.append(opt_interval)
        model.AddNoOverlap(dock_intervals)

    # Respect dock preferences (soft constraint)
    preference_penalty = []
    for t in trucks:
        tid = t["id"]
        if "dock_preference" in t and t["dock_preference"] is not None:
            matches = model.NewBoolVar(f"pref_{tid}")
            model.Add(dock_assignments[tid] == t["dock_preference"]).OnlyEnforceIf(matches)
            model.Add(dock_assignments[tid] != t["dock_preference"]).OnlyEnforceIf(matches.Not())
            preference_penalty.append(matches.Not())

    # Minimize total wait time + preference violations
    total_wait = sum(starts[t["id"]] - t["earliest"] for t in trucks)
    model.Minimize(total_wait + 100 * sum(preference_penalty))

    solver = cp_model.CpSolver()
    solver.parameters.max_time_in_seconds = 10
    status = solver.Solve(model)

    if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        schedule = {}
        for t in trucks:
            tid = t["id"]
            schedule[tid] = {
                "dock": solver.Value(dock_assignments[tid]),
                "start": solver.Value(starts[tid]),
                "end": solver.Value(ends[tid]),
                "wait_minutes": solver.Value(starts[tid]) - t["earliest"],
            }
        return schedule
    return None
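
A returned schedule can be sanity-checked independently of the solver, which also helps after dispatchers adjust it by hand. The following is a minimal sketch, assuming the schedule has the same shape as the return value above (dock, start, and end per truck id). The helper name find_dock_conflicts is hypothetical and not part of OR-Tools.

```python
from collections import defaultdict


def find_dock_conflicts(schedule: dict) -> list:
    """Return (dock, truck_a, truck_b) for neighbouring appointments that overlap.

    An empty list means no two appointments on the same dock overlap.
    Appointments that merely touch (one ends when the next starts) are allowed.
    """
    by_dock = defaultdict(list)
    for tid, slot in schedule.items():
        by_dock[slot["dock"]].append((slot["start"], slot["end"], tid))

    conflicts = []
    for dock, slots in by_dock.items():
        # Sort by start time; any overlap then shows up between neighbours.
        slots.sort(key=lambda s: (s[0], s[1]))
        for (_, end_a, a), (start_b, _, b) in zip(slots, slots[1:]):
            if start_b < end_a:
                conflicts.append((dock, a, b))
    return conflicts
```

Only neighbouring pairs are reported, so a long appointment that overlaps several later ones appears once per adjacent pair rather than once per overlapping pair.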

Driver assignment with hours-of-service

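European HOS rules: a maximum of 9 hours of daily driving (extendable to 10 hours twice per week), a 45-minute break after 4.5 hours of driving, and 11 hours of daily rest. The function below enforces only the daily driving limit; the break parameters are accepted but not used in the model: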

def schedule_drivers(
    tasks: list[dict],      # {id, start_time, duration, location, requires_cdl}
    drivers: list[dict],    # {id, has_cdl, available_from, max_drive_hours, current_driven}
    max_daily_hours: int = 9 * 60,  # 540 minutes
    break_after: int = 270,          # 4.5 hours in minutes
    break_duration: int = 45,
) -> dict:
    model = cp_model.CpModel()
    horizon = 1440

    # Assignment variables: which driver handles which task
    assignments = {}
    for t in tasks:
        for d in drivers:
            key = (t["id"], d["id"])
            assignments[key] = model.NewBoolVar(f"assign_{t['id']}_{d['id']}")

    # Each task assigned to exactly one driver
    for t in tasks:
        model.AddExactlyOne(assignments[(t["id"], d["id"])] for d in drivers)

    # CDL requirement
    for t in tasks:
        if t.get("requires_cdl"):
            for d in drivers:
                if not d["has_cdl"]:
                    model.Add(assignments[(t["id"], d["id"])] == 0)

    # Driving time limit per driver. Durations are constants, so each driver's
    # load is a linear expression over the assignment Booleans.
    loads = []
    for d in drivers:
        load = sum(t["duration"] * assignments[(t["id"], d["id"])] for t in tasks)
        remaining = max_daily_hours - d.get("current_driven", 0)
        model.Add(load <= remaining)
        loads.append(load)

    # Task start times are fixed inputs, so the latest task end is a constant
    # and minimizing it would not influence the assignment. Balance the
    # workload instead by minimizing the heaviest driver load.
    total_minutes = sum(t["duration"] for t in tasks)
    max_load = model.NewIntVar(0, total_minutes, "max_load")
    for load in loads:
        model.Add(load <= max_load)
    model.Minimize(max_load)

    solver = cp_model.CpSolver()
    solver.parameters.max_time_in_seconds = 15
    status = solver.Solve(model)

    if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        result = {}
        for t in tasks:
            for d in drivers:
                if solver.Value(assignments[(t["id"], d["id"])]):
                    result[t["id"]] = d["id"]
        return result
    return None
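
The function above accepts break_after and break_duration but constrains only total driving time. One option is to verify the break rule on each driver's task sequence after solving. A minimal sketch, assuming each task is one uninterrupted driving segment given as (start, end) in minutes, that any gap of at least break_duration counts as a full break, and ignoring finer points of the regulation such as split breaks. The name violates_break_rule is hypothetical.

```python
def violates_break_rule(segments, break_after=270, break_duration=45):
    """Return True if driving exceeds break_after minutes without a qualifying break.

    segments: iterable of (start, end) driving intervals in minutes.
    """
    driven = 0          # minutes driven since the last qualifying break
    prev_end = None
    for start, end in sorted(segments):
        if prev_end is not None and start - prev_end >= break_duration:
            driven = 0  # the gap is long enough to count as a break
        driven += end - start
        if driven > break_after:
            return True
        prev_end = end
    return False
```

Assignments that fail this check could be rejected and re-solved with the offending pair of tasks forbidden for that driver, at the cost of an extra solve.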

Reactive re-scheduling

When disruptions occur mid-day, the scheduler must adapt without starting from scratch:

def reschedule_after_disruption(
    original_schedule: dict,
    completed_tasks: set[str],
    disruption: dict,  # {type, affected_resource, available_after}
    remaining_tasks: list[dict],
    drivers: list[dict],
) -> dict:
    """Re-schedule only incomplete tasks, fixing completed assignments."""
    # Filter to only tasks not yet done
    tasks_to_schedule = [t for t in remaining_tasks if t["id"] not in completed_tasks]

    # Update driver availability based on what they have already done
    for d in drivers:
        completed_by_driver = [
            t for t in remaining_tasks
            if t["id"] in completed_tasks and original_schedule.get(t["id"]) == d["id"]
        ]
        d["current_driven"] = sum(t["duration"] for t in completed_by_driver)
        d["available_from"] = max(
            d["available_from"],
            max((t["start_time"] + t["duration"] for t in completed_by_driver), default=0),
        )

    # If a resource is disrupted, mark it unavailable
    if disruption["type"] == "driver_unavailable":
        drivers = [d for d in drivers if d["id"] != disruption["affected_resource"]]
    elif disruption["type"] == "dock_closed":
        # Adjust task time windows that depend on the closed dock
        for t in tasks_to_schedule:
            if t.get("dock") == disruption["affected_resource"]:
                t["start_time"] = max(t["start_time"], disruption["available_after"])

    return schedule_drivers(tasks_to_schedule, drivers)

Key design decisions for reactive scheduling:

  1. Stability — minimize changes from the original schedule. Drivers and customers dislike frequent changes. Add a penalty for reassigning tasks from their original driver.
  2. Speed — re-scheduling must complete in under 60 seconds. Reduce the CP-SAT time limit and warm-start from the original solution when possible.
  3. Rolling horizon — only re-schedule the next 2-4 hours. Tasks further out will be re-scheduled again before execution.

Multi-objective scheduling

Real logistics scheduling balances competing objectives:

  • Minimize total cost (fuel + labor + penalties).
  • Maximize on-time delivery percentage.
  • Minimize driver overtime.
  • Balance workload fairness.

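CP-SAT handles this through a weighted sum of integer objective terms (the terms below are placeholders for expressions defined elsewhere in the model):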

cost_weight = 1
tardiness_weight = 10
fairness_weight = 5

model.Minimize(
    cost_weight * total_cost
    + tardiness_weight * total_tardiness
    + fairness_weight * workload_variance
)

Tuning weights requires business input. A premium logistics provider might set tardiness_weight very high; a cost-focused carrier emphasizes cost_weight.

Integration patterns

Production scheduling systems fit into the logistics stack as:

  1. Batch planner: runs nightly and produces the next day’s schedule. Dispatchers review and adjust before releasing to drivers.
  2. Real-time API: a FastAPI endpoint that accepts new orders or disruptions and returns updated schedules within seconds.
  3. Event-driven pipeline: Kafka or Redis streams feed order events to the scheduler, which publishes updated assignments. Drivers receive push notifications.

A sketch of the real-time API pattern, where the awaited helpers (load_current_schedule and the others) stand in for application code:

from fastapi import FastAPI

app = FastAPI()

@app.post("/reschedule")
async def handle_disruption(disruption: dict):
    current = await load_current_schedule()
    completed = await get_completed_tasks()
    remaining = await get_remaining_tasks()
    drivers = await get_available_drivers()

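    # The solve is CPU-bound and blocks the event loop while it runs; a
    # production service would typically move it to a worker thread or process.
    new_schedule = reschedule_after_disruption(
        current, completed, disruption, remaining, drivers
    )
    if new_schedule is None:
        # No feasible schedule was found within the time limit.
        return {"status": "infeasible", "affected_tasks": 0}
    await publish_schedule_updates(new_schedule)
    return {"status": "rescheduled", "affected_tasks": len(new_schedule)}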

Pitfalls

  • Ignoring travel time between tasks — a driver finishing a delivery across town cannot instantly start loading at the warehouse. Include inter-task travel in the model.
  • Over-constraining — too many hard constraints produce infeasible models. Convert nice-to-have rules into soft constraints with penalties.
  • Deterministic planning for stochastic operations — task durations vary. Build buffer time (10-15%) into estimated durations rather than planning to 100% utilization.
  • Not versioning schedules — when re-scheduling happens multiple times per day, keep an audit trail. Drivers need to see what changed and why.
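
The travel-time and buffer pitfalls can be handled as a preprocessing step when task start times are fixed, as they are in schedule_drivers. A minimal sketch, assuming tasks carry the id, start_time, duration, and location fields used earlier and that travel times come from a lookup keyed by (from_location, to_location); pad_duration, incompatible_pairs, and travel_minutes are hypothetical names.

```python
from itertools import combinations


def pad_duration(minutes, buffer_pct=15):
    """Inflate an estimated duration by a percentage buffer, rounding up."""
    return -((-minutes * (100 + buffer_pct)) // 100)


def incompatible_pairs(tasks, travel_minutes, buffer_pct=15):
    """Return (earlier_id, later_id) pairs that one driver cannot serve in turn.

    A pair is incompatible when the padded end of the earlier task plus the
    travel time to the later task's location exceeds the later start time.
    Missing travel entries are treated as zero minutes.
    """
    ordered = sorted(tasks, key=lambda t: t["start_time"])
    pairs = []
    for a, b in combinations(ordered, 2):
        end_a = a["start_time"] + pad_duration(a["duration"], buffer_pct)
        travel = travel_minutes.get((a["location"], b["location"]), 0)
        if end_a + travel > b["start_time"]:
            pairs.append((a["id"], b["id"]))
    return pairs
```

Each returned pair could then become a constraint that prevents both tasks from going to the same driver, for example an at-most-one constraint over the two assignment Booleans for every driver.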

The one thing to remember: Production logistics scheduling combines CP-SAT constraint models for dock, driver, and task assignment with a reactive re-scheduling layer that adapts to disruptions in real time while maintaining schedule stability.
