Logistics Scheduling in Python — Deep Dive
Production logistics scheduling solves tightly constrained assignment problems under time pressure. The solver must produce feasible, near-optimal schedules in seconds — and re-solve when disruptions hit. This guide implements dock scheduling, driver assignment with hours-of-service compliance, and reactive re-planning using OR-Tools CP-SAT.
Dock appointment scheduling
Warehouses with limited dock doors need to schedule inbound and outbound trucks so they do not queue excessively:
from ortools.sat.python import cp_model
def schedule_dock_appointments(
trucks: list[dict], # each: {id, earliest, latest, duration, dock_preference}
num_docks: int,
horizon: int = 1440, # minutes in a day
) -> dict:
model = cp_model.CpModel()
starts = {}
ends = {}
intervals = {}
dock_assignments = {}
for t in trucks:
tid = t["id"]
starts[tid] = model.NewIntVar(t["earliest"], t["latest"] - t["duration"], f"start_{tid}")
ends[tid] = model.NewIntVar(t["earliest"] + t["duration"], t["latest"], f"end_{tid}")
intervals[tid] = model.NewIntervalVar(starts[tid], t["duration"], ends[tid], f"interval_{tid}")
dock_assignments[tid] = model.NewIntVar(0, num_docks - 1, f"dock_{tid}")
# No overlap per dock: create optional intervals per dock
for d in range(num_docks):
dock_intervals = []
for t in trucks:
tid = t["id"]
is_on_dock = model.NewBoolVar(f"{tid}_on_dock_{d}")
model.Add(dock_assignments[tid] == d).OnlyEnforceIf(is_on_dock)
model.Add(dock_assignments[tid] != d).OnlyEnforceIf(is_on_dock.Not())
opt_interval = model.NewOptionalIntervalVar(
starts[tid], t["duration"], ends[tid], is_on_dock, f"opt_{tid}_{d}"
)
dock_intervals.append(opt_interval)
model.AddNoOverlap(dock_intervals)
# Respect dock preferences (soft constraint)
preference_penalty = []
for t in trucks:
tid = t["id"]
if "dock_preference" in t and t["dock_preference"] is not None:
matches = model.NewBoolVar(f"pref_{tid}")
model.Add(dock_assignments[tid] == t["dock_preference"]).OnlyEnforceIf(matches)
model.Add(dock_assignments[tid] != t["dock_preference"]).OnlyEnforceIf(matches.Not())
preference_penalty.append(matches.Not())
# Minimize total wait time + preference violations
total_wait = sum(starts[t["id"]] - t["earliest"] for t in trucks)
model.Minimize(total_wait + 100 * sum(preference_penalty))
solver = cp_model.CpSolver()
solver.parameters.max_time_in_seconds = 10
status = solver.Solve(model)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
schedule = {}
for t in trucks:
tid = t["id"]
schedule[tid] = {
"dock": solver.Value(dock_assignments[tid]),
"start": solver.Value(starts[tid]),
"end": solver.Value(ends[tid]),
"wait_minutes": solver.Value(starts[tid]) - t["earliest"],
}
return schedule
return None
Driver assignment with hours-of-service
European HOS rules: maximum 9 hours driving (extendable to 10 twice per week), 45-minute break after 4.5 hours, 11 hours daily rest.
def schedule_drivers(
tasks: list[dict], # {id, start_time, duration, location, requires_cdl}
drivers: list[dict], # {id, has_cdl, available_from, max_drive_hours, current_driven}
max_daily_hours: int = 9 * 60, # 540 minutes
break_after: int = 270, # 4.5 hours in minutes
break_duration: int = 45,
) -> dict:
model = cp_model.CpModel()
horizon = 1440
# Assignment variables: which driver handles which task
assignments = {}
for t in tasks:
for d in drivers:
key = (t["id"], d["id"])
assignments[key] = model.NewBoolVar(f"assign_{t['id']}_{d['id']}")
# Each task assigned to exactly one driver
for t in tasks:
model.AddExactlyOne(assignments[(t["id"], d["id"])] for d in drivers)
# CDL requirement
for t in tasks:
if t.get("requires_cdl"):
for d in drivers:
if not d["has_cdl"]:
model.Add(assignments[(t["id"], d["id"])] == 0)
# Driving time limit per driver
for d in drivers:
driver_tasks = []
for t in tasks:
scaled_duration = model.NewIntVar(0, horizon, f"dur_{t['id']}_{d['id']}")
model.Add(scaled_duration == t["duration"]).OnlyEnforceIf(assignments[(t["id"], d["id"])])
model.Add(scaled_duration == 0).OnlyEnforceIf(assignments[(t["id"], d["id"])].Not())
driver_tasks.append(scaled_duration)
remaining = max_daily_hours - d.get("current_driven", 0)
model.Add(sum(driver_tasks) <= remaining)
# Minimize makespan (latest task end)
task_ends = []
for t in tasks:
task_ends.append(t["start_time"] + t["duration"])
makespan = model.NewIntVar(0, horizon, "makespan")
model.AddMaxEquality(makespan, task_ends)
model.Minimize(makespan)
solver = cp_model.CpSolver()
solver.parameters.max_time_in_seconds = 15
status = solver.Solve(model)
if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
result = {}
for t in tasks:
for d in drivers:
if solver.Value(assignments[(t["id"], d["id"])]):
result[t["id"]] = d["id"]
return result
return None
Reactive re-scheduling
When disruptions occur mid-day, the scheduler must adapt without starting from scratch:
def reschedule_after_disruption(
original_schedule: dict,
completed_tasks: set[str],
disruption: dict, # {type, affected_resource, available_after}
remaining_tasks: list[dict],
drivers: list[dict],
) -> dict:
"""Re-schedule only incomplete tasks, fixing completed assignments."""
# Filter to only tasks not yet done
tasks_to_schedule = [t for t in remaining_tasks if t["id"] not in completed_tasks]
# Update driver availability based on what they have already done
for d in drivers:
completed_by_driver = [
t for t in remaining_tasks
if t["id"] in completed_tasks and original_schedule.get(t["id"]) == d["id"]
]
d["current_driven"] = sum(t["duration"] for t in completed_by_driver)
d["available_from"] = max(
d["available_from"],
max((t["start_time"] + t["duration"] for t in completed_by_driver), default=0),
)
# If a resource is disrupted, mark it unavailable
if disruption["type"] == "driver_unavailable":
drivers = [d for d in drivers if d["id"] != disruption["affected_resource"]]
elif disruption["type"] == "dock_closed":
# Adjust task time windows that depend on the closed dock
for t in tasks_to_schedule:
if t.get("dock") == disruption["affected_resource"]:
t["start_time"] = max(t["start_time"], disruption["available_after"])
return schedule_drivers(tasks_to_schedule, drivers)
Key design decisions for reactive scheduling:
- Stability — minimize changes from the original schedule. Drivers and customers dislike frequent changes. Add a penalty for reassigning tasks from their original driver.
- Speed — re-scheduling must complete in under 60 seconds. Reduce the CP-SAT time limit and warm-start from the original solution when possible.
- Rolling horizon — only re-schedule the next 2-4 hours. Tasks further out will be re-scheduled again before execution.
Multi-objective scheduling
Real logistics scheduling balances competing objectives:
- Minimize total cost (fuel + labor + penalties).
- Maximize on-time delivery percentage.
- Minimize driver overtime.
- Balance workload fairness.
CP-SAT handles this through weighted combination:
cost_weight = 1
tardiness_weight = 10
fairness_weight = 5
model.Minimize(
cost_weight * total_cost
+ tardiness_weight * total_tardiness
+ fairness_weight * workload_variance
)
Tuning weights requires business input. A premium logistics provider might set tardiness_weight very high; a cost-focused carrier emphasizes cost_weight.
Integration patterns
Production scheduling systems fit into the logistics stack as:
- Batch planner — runs nightly, produces the next day’s schedule. Dispatchers review and adjust before releasing to drivers.
- Real-time API — FastAPI endpoint that accepts new orders or disruptions and returns updated schedules within seconds.
- Event-driven pipeline — Kafka or Redis streams feed order events to the scheduler, which publishes updated assignments. Drivers receive push notifications.
from fastapi import FastAPI
app = FastAPI()
@app.post("/reschedule")
async def handle_disruption(disruption: dict):
current = await load_current_schedule()
completed = await get_completed_tasks()
remaining = await get_remaining_tasks()
drivers = await get_available_drivers()
new_schedule = reschedule_after_disruption(
current, completed, disruption, remaining, drivers
)
await publish_schedule_updates(new_schedule)
return {"status": "rescheduled", "affected_tasks": len(new_schedule)}
Pitfalls
- Ignoring travel time between tasks — a driver finishing a delivery across town cannot instantly start loading at the warehouse. Include inter-task travel in the model.
- Over-constraining — too many hard constraints produce infeasible models. Convert nice-to-have rules into soft constraints with penalties.
- Deterministic planning for stochastic operations — task durations vary. Build buffer time (10-15%) into estimated durations rather than planning to 100% utilization.
- Not versioning schedules — when re-scheduling happens multiple times per day, keep an audit trail. Drivers need to see what changed and why.
The one thing to remember: Production logistics scheduling combines CP-SAT constraint models for dock, driver, and task assignment with a reactive re-scheduling layer that adapts to disruptions in real time while maintaining schedule stability.
See Also
- Python Adaptive Learning Systems How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
- Python Airflow Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
- Python Altair Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
- Python Automated Grading How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
- Python Batch Vs Stream Processing Batch processing is like doing laundry once a week; stream processing is like a self-cleaning shirt that cleans itself constantly.