Python graphlib Topological Sort — Deep Dive
Algorithm: Kahn’s BFS topological sort
TopologicalSorter implements Kahn’s algorithm (1962), which works by repeatedly removing nodes with no incoming edges:
1. Compute the in-degree (number of predecessors) for each node.
2. Add all nodes with in-degree 0 to a “ready” set.
3. Remove a ready node, output it, and decrement the in-degree of its successors.
4. Any successor whose in-degree reaches 0 joins the ready set.
5. If all nodes are output, the sort succeeded. If not, there’s a cycle.
This is an O(V + E) algorithm where V is the number of nodes and E is the number of edges.
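The steps above can be sketched in a few lines of plain Python. This is a minimal illustration rather than CPython's code: the function name `kahn_toposort` and the choice of `ValueError` for cycles are assumptions made for the example, and the input uses the same node-to-predecessors mapping that graphlib accepts.

```python
from collections import deque


def kahn_toposort(graph):
    """Return one topological order; graph maps node -> iterable of predecessors."""
    in_degree = {}
    successors = {}
    for node, preds in graph.items():
        in_degree.setdefault(node, 0)
        successors.setdefault(node, [])
        for pred in dict.fromkeys(preds):  # de-duplicate, keep input order
            in_degree[node] += 1
            in_degree.setdefault(pred, 0)
            successors.setdefault(pred, []).append(node)

    # Seed the ready queue with every node that has no predecessors.
    ready = deque(n for n, deg in in_degree.items() if deg == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for succ in successors[node]:
            in_degree[succ] -= 1
            if in_degree[succ] == 0:
                ready.append(succ)

    # Nodes left over were never released, so they sit on or behind a cycle.
    if len(order) != len(in_degree):
        raise ValueError("graph contains a cycle")
    return order


print(kahn_toposort({"d": ["b", "c"], "b": ["a"], "c": ["a"]}))
# ['a', 'b', 'c', 'd']
```

Each node enters the queue once and each edge is decremented once, which is where the O(V + E) bound comes from.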
Why Kahn’s over DFS?
The alternative topological sort uses depth-first search with post-order numbering. Both are O(V + E), but Kahn’s has a key advantage for TopologicalSorter: it naturally produces layers of independent nodes (each pass through steps 2-4 yields one layer). This maps directly to the parallel scheduling API: each call to get_ready() returns every node that is ready now and has not yet been handed out.
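To see the layering in practice, the sketch below drives TopologicalSorter batch by batch and records each batch. The helper name `layers` and the example graph are made up for illustration. Each batch is sorted only to make the output deterministic.

```python
from graphlib import TopologicalSorter


def layers(graph):
    """Group nodes into batches, marking a whole batch done before asking again."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    result = []
    while ts.is_active():
        batch = ts.get_ready()         # tuple of nodes that are ready now
        result.append(sorted(batch))
        ts.done(*batch)                # release the successors of this batch
    return result


example = {"d": {"b", "c"}, "b": {"a"}, "c": {"a"}, "e": {"d"}}
print(layers(example))
# [['a'], ['b', 'c'], ['d'], ['e']]
```

Marking the whole batch done before calling get_ready() again is what produces clean layers. A scheduler that calls done() as individual tasks finish will see smaller, overlapping batches.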
Internal data structures
A simplified view of CPython’s implementation (Lib/graphlib.py, ~200 lines):

    class TopologicalSorter:
        def __init__(self, graph=None):
            self._node2info = {}  # node → _NodeInfo(npredecessors, successors)
            # ...

    class _NodeInfo:
        __slots__ = ("npredecessors", "successors")

        def __init__(self):
            self.npredecessors = 0
            self.successors = []

Each node stores:
- npredecessors: count of unfinished predecessors (the in-degree)
- successors: list of nodes that depend on this one
The prepare() method checks the graph for cycles (raising CycleError if it finds one), collects the nodes with npredecessors == 0, and initializes the ready queue. Each done(node) call decrements npredecessors for all successors and adds newly ready nodes to the queue.
State machine
TopologicalSorter has internal states:
- Unprepared: add() calls are allowed
- Prepared: add() raises ValueError; get_ready()/done() are allowed
- Finished: is_active() returns False
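Once prepare() is called, the graph is frozen. This keeps the in-degree bookkeeping consistent while nodes are being handed out; it does not by itself make the object safe to share between threads.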
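A short walk through the three states, using only the public API. The node names are arbitrary, and the variable names (`before_prepare`, `after_prepare`, `finished`) exist only for this example.

```python
from graphlib import TopologicalSorter

ts = TopologicalSorter()
ts.add("build", "compile")        # Unprepared: add() is allowed

try:
    ts.get_ready()                # not allowed before prepare()
except ValueError as exc:
    before_prepare = str(exc)

ts.prepare()                      # Prepared: the graph is now frozen

try:
    ts.add("deploy", "build")     # not allowed after prepare()
except ValueError as exc:
    after_prepare = str(exc)

while ts.is_active():             # drain the sorter
    ts.done(*ts.get_ready())

finished = not ts.is_active()     # Finished: nothing left to hand out
print(before_prepare, after_prepare, finished, sep="\n")
```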
Thread safety
The internal state modifications (done(), get_ready()) are not thread-safe. For concurrent execution, protect access with a lock:
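
    import threading
    import time
    from graphlib import TopologicalSorter

    # Placeholder inputs for illustration: node -> set of predecessors.
    graph = {"b": {"a"}, "c": {"a"}, "d": {"b", "c"}}

    def execute(task):
        print(f"running {task}")  # stand-in for real work

    lock = threading.Lock()
    ts = TopologicalSorter(graph)
    ts.prepare()

    def worker():
        while True:
            with lock:
                if not ts.is_active():
                    break
                batch = ts.get_ready()
            if not batch:
                time.sleep(0.01)  # nothing ready yet; avoid a hot spin
                continue
            for task in batch:
                execute(task)  # run outside the lock
                with lock:
                    ts.done(task)
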
A cleaner pattern keeps every TopologicalSorter call on one thread and hands the work to a thread pool:

    import concurrent.futures
    from graphlib import TopologicalSorter

    def parallel_execute(graph, executor_fn, max_workers=4):
        ts = TopologicalSorter(graph)
        ts.prepare()
        results = {}
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
            pending = {}
            while ts.is_active():
                for node in ts.get_ready():
                    future = pool.submit(executor_fn, node)
                    pending[future] = node
                if not pending:
                    break
                done_futures, _ = concurrent.futures.wait(
                    pending, return_when=concurrent.futures.FIRST_COMPLETED
                )
                for future in done_futures:
                    node = pending.pop(future)
                    results[node] = future.result()
                    ts.done(node)
        return results

Async parallel scheduling
For asyncio-based systems:

    import asyncio
    from graphlib import TopologicalSorter

    async def async_parallel_execute(graph, async_executor):
        ts = TopologicalSorter(graph)
        ts.prepare()
        results = {}
        while ts.is_active():
            ready = ts.get_ready()
            if not ready:
                break
            # Run the whole batch concurrently and wait for all of it.
            tasks = [async_executor(node) for node in ready]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            for node, result in zip(ready, batch_results):
                if isinstance(result, Exception):
                    raise result
                results[node] = result
                ts.done(node)
        return results

This executes each layer of independent tasks concurrently, then moves to the next layer. The maximum parallelism equals the width of the widest layer in the DAG.
Building a production task runner
Here’s a complete task runner with error handling, timeouts, and skipping of tasks downstream of a failure:

    import asyncio
    import time
    from dataclasses import dataclass
    from enum import Enum, auto
    from graphlib import CycleError, TopologicalSorter

    class TaskStatus(Enum):
        PENDING = auto()
        RUNNING = auto()
        SUCCESS = auto()
        FAILED = auto()
        SKIPPED = auto()

    @dataclass
    class TaskResult:
        node: str
        status: TaskStatus
        duration: float = 0.0
        error: str = ""

    class DAGRunner:
        def __init__(self, graph: dict, timeout: float = 300):
            self.graph = graph  # node -> set of predecessors
            self.timeout = timeout
            self.results: dict[str, TaskResult] = {}
            self._skip_descendants_of: set = set()

        async def run(self, executor):
            try:
                ts = TopologicalSorter(self.graph)
                ts.prepare()
            except CycleError as e:
                raise RuntimeError(f"Dependency cycle detected: {e}") from e
            while ts.is_active():
                ready = ts.get_ready()
                if not ready:
                    break
                tasks = []
                for node in ready:
                    if self._should_skip(node):
                        self.results[node] = TaskResult(
                            node, TaskStatus.SKIPPED
                        )
                        # Propagate the skip so transitive descendants are skipped too.
                        self._skip_descendants_of.add(node)
                    else:
                        tasks.append(self._run_node(node, executor))
                if tasks:
                    await asyncio.gather(*tasks)
                # Mark each node done exactly once; a second done() raises ValueError.
                for node in ready:
                    if node in self.results:
                        ts.done(node)
            return self.results

        def _should_skip(self, node):
            predecessors = self.graph.get(node, set())
            return any(p in self._skip_descendants_of for p in predecessors)

        async def _run_node(self, node, executor):
            start = time.monotonic()
            try:
                await asyncio.wait_for(
                    executor(node), timeout=self.timeout
                )
                self.results[node] = TaskResult(
                    node, TaskStatus.SUCCESS,
                    duration=time.monotonic() - start
                )
            except asyncio.TimeoutError:
                self.results[node] = TaskResult(
                    node, TaskStatus.FAILED,
                    duration=self.timeout,
                    error="Timeout"
                )
                self._skip_descendants_of.add(node)
            except Exception as e:
                self.results[node] = TaskResult(
                    node, TaskStatus.FAILED,
                    duration=time.monotonic() - start,
                    error=str(e)
                )
                self._skip_descendants_of.add(node)

Key design decisions:
- Failed tasks cause descendants to be skipped (fail-fast)
- Timeouts prevent hung tasks from blocking the pipeline
- Results include timing for observability
Advanced: dynamic graph modification
TopologicalSorter freezes the graph after prepare(). For dynamic workflows where tasks can add new dependencies, you need a custom implementation:

    from collections import defaultdict

    class DynamicTopoScheduler:
        def __init__(self):
            self.predecessors = defaultdict(set)
            self.successors = defaultdict(set)
            self.in_degree = defaultdict(int)
            self.done_nodes = set()

        def add(self, node, *deps):
            self.in_degree.setdefault(node, 0)
            for dep in deps:
                # Register the dependency as a node so get_ready() can return it.
                self.in_degree.setdefault(dep, 0)
                self.predecessors[node].add(dep)
                self.successors[dep].add(node)
            # In-degree counts only predecessors that have not finished yet.
            self.in_degree[node] = len([d for d in self.predecessors[node]
                                        if d not in self.done_nodes])

        def get_ready(self):
            # A node keeps appearing here until mark_done() is called for it.
            return [n for n, deg in self.in_degree.items()
                    if deg == 0 and n not in self.done_nodes]

        def mark_done(self, node):
            self.done_nodes.add(node)
            for succ in self.successors[node]:
                self.in_degree[succ] = max(0, self.in_degree[succ] - 1)

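This allows adding nodes after execution has started, which is useful for workflow engines where tasks discover new work at runtime. Unlike TopologicalSorter, this sketch does not detect cycles, and get_ready() keeps returning a node until mark_done() is called for it.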
Performance characteristics
| Operation | Time | Notes |
|---|---|---|
| add(node, *deps) | O(d) | d = number of dependencies |
| prepare() | O(V + E) | Full graph scan |
| static_order() | O(V + E) | Single pass |
| get_ready() | O(r) | r = number of ready nodes |
| done(node) | O(s) | s = number of successors |
For typical dependency graphs (hundreds to thousands of nodes), all operations are effectively instant. The practical bottleneck is almost always task execution time, not scheduling overhead.
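One way to check the scheduling cost on your own machine is to time static_order() on a synthetic graph. The generator `random_dag` and its parameters are invented for this sketch. Edges only point from lower-numbered to higher-numbered nodes, so the graph is acyclic by construction. No timing figure is quoted because it depends on hardware and Python version.

```python
import random
import time
from graphlib import TopologicalSorter


def random_dag(n_nodes, max_deps, seed=0):
    """Build node -> predecessors where every predecessor has a smaller id."""
    rng = random.Random(seed)
    graph = {}
    for i in range(n_nodes):
        k = min(i, rng.randint(0, max_deps))
        graph[i] = set(rng.sample(range(i), k))
    return graph


graph = random_dag(10_000, 10)
start = time.perf_counter()
order = list(TopologicalSorter(graph).static_order())
elapsed = time.perf_counter() - start
print(f"{len(order)} nodes sorted in {elapsed * 1000:.1f} ms")
```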
Memory usage
Each node stores its info object (~100 bytes) plus successor list entries. For a graph with 10,000 nodes and 50,000 edges, memory usage is roughly 5-10 MB — trivial for modern systems.
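The memory estimate can be checked empirically with tracemalloc. The sketch below is a rough measurement under stated assumptions: integer node ids, a fixed number of dependencies per node, and a hypothetical helper name `sorter_memory_bytes`. String or tuple nodes would add their own object sizes, and the input graph dictionary is not counted.

```python
import tracemalloc
from graphlib import TopologicalSorter


def sorter_memory_bytes(n_nodes, deps_per_node):
    """Approximate bytes allocated while building a TopologicalSorter."""
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()
    ts = TopologicalSorter()
    for i in range(n_nodes):
        # Depend on the few nodes just before this one (no cycles possible).
        deps = range(max(0, i - deps_per_node), i)
        ts.add(i, *deps)
    after, _ = tracemalloc.get_traced_memory()  # ts is still alive here
    tracemalloc.stop()
    return after - before


used = sorter_memory_bytes(10_000, 5)  # about 50,000 edges
print(f"approx. {used / 1_000_000:.1f} MB")
```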
Comparison with alternatives
| Feature | graphlib | NetworkX | custom DFS |
|---|---|---|---|
| Standard library | ✅ | ❌ | ❌ |
| Parallel scheduling | ✅ | ❌ | Manual |
| Cycle detection | ✅ | ✅ | Manual |
| Weighted edges | ❌ | ✅ | Manual |
| Dynamic modification | ❌ | ✅ | Manual |
| Visualization | ❌ | ✅ | ❌ |
| Dependencies | None | None required (NumPy, SciPy optional) | None |
Use graphlib for dependency resolution and task scheduling. Use NetworkX when you need graph algorithms beyond topological sort (shortest paths, centrality, community detection).
Real-world applications
Build systems: Make, Bazel, and Gradle all use topological sort to determine build order. A Makefile’s dependency declarations map directly to TopologicalSorter.add() calls.
Package managers: pip, npm, and cargo order installation or build steps topologically once dependency resolution has chosen versions. How cycles are handled varies by tool: some reject circular dependencies outright, others tolerate them.
Spreadsheet engines: Excel’s calculation order is a topological sort of cell dependencies. When you change A1, Excel determines which cells to recalculate and in what order.
CI/CD pipelines: GitHub Actions and GitLab CI jobs with needs: declarations form DAGs that are executed via topological scheduling. Jenkins pipelines express similar ordering through stages and parallel blocks.
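As an illustration of the build-system case, the sketch below feeds a few hypothetical Makefile rules into TopologicalSorter.add(), one call per rule. The file names are invented for the example.

```python
from graphlib import TopologicalSorter

# Hypothetical Makefile rules, written as target -> prerequisites:
#   app:    main.o util.o
#   main.o: main.c util.h
#   util.o: util.c util.h
rules = {
    "app": ["main.o", "util.o"],
    "main.o": ["main.c", "util.h"],
    "util.o": ["util.c", "util.h"],
}

ts = TopologicalSorter()
for target, prerequisites in rules.items():
    ts.add(target, *prerequisites)  # one add() call per rule

build_order = list(ts.static_order())
print(build_order)  # sources and headers first, "app" last
```

The exact order among independent items is not specified, but every prerequisite appears before the target that needs it.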
Pitfalls
- Forgetting prepare(). Calling get_ready() without prepare() raises ValueError. This is by design: it keeps graph construction and execution as separate phases.
- Adding nodes after prepare(). Raises ValueError. If you need dynamic graphs, use a custom scheduler or build a new TopologicalSorter from the updated graph and prepare that.
- Forgetting to call done(). If you process a node but don’t call done(), its dependents never become ready and is_active() returns True forever. Always call done(), even if the task failed.
- Non-hashable nodes. Nodes must be hashable (usable as dict keys). Lists and dicts can’t be nodes; use strings, tuples, or frozen dataclasses.
- CycleError reports only one cycle. The exception carries a single detected cycle as a list of nodes in its args (the second element), with the first and last node the same. If the graph contains several cycles, the others stay hidden until you fix the first. For debugging complex graphs, extract cycles with DFS or use NetworkX’s find_cycle().
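To inspect the cycle that CycleError carries, read the second element of the exception's args. The graph below is a made-up example in which a, b, and c form a cycle and d merely depends on it.

```python
from graphlib import CycleError, TopologicalSorter

# a -> b -> c -> a is a cycle; d depends on a but is not part of it.
graph = {"a": {"c"}, "b": {"a"}, "c": {"b"}, "d": {"a"}}

try:
    TopologicalSorter(graph).prepare()
except CycleError as exc:
    cycle = exc.args[1]  # list of nodes; first and last are the same
    print("cycle:", " -> ".join(cycle))
```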
The one thing to remember: graphlib.TopologicalSorter is a zero-dependency task scheduler hiding in the standard library — its prepare/get_ready/done API maps directly to parallel execution patterns used by production build systems and CI/CD pipelines.