Python graphlib Topological Sort — Deep Dive

Algorithm: Kahn’s BFS topological sort

TopologicalSorter implements Kahn’s algorithm (1962), which works by repeatedly removing nodes with no incoming edges:

  1. Compute the in-degree (number of predecessors) for each node.
  2. Add all nodes with in-degree 0 to a “ready” set.
  3. Remove a ready node, output it, and decrement the in-degree of its successors.
  4. Any successor whose in-degree reaches 0 joins the ready set.
  5. If all nodes are output, the sort succeeded. If not, there’s a cycle.

This is an O(V + E) algorithm where V is the number of nodes and E is the number of edges.
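The five steps above can be sketched in plain Python. This is an illustrative version, not CPython's code: the function name `kahn_order` is made up for the example, and the input format (each node mapped to its predecessors) is assumed to match what TopologicalSorter accepts.

```python
from collections import deque

def kahn_order(graph):
    """Return one topological order of `graph` (node -> iterable of predecessors)."""
    in_degree = {}
    successors = {}
    # Step 1: compute in-degrees and the reverse (successor) edges.
    for node, preds in graph.items():
        in_degree.setdefault(node, 0)
        successors.setdefault(node, [])
        for pred in set(preds):
            in_degree.setdefault(pred, 0)
            successors.setdefault(pred, []).append(node)
            in_degree[node] += 1

    # Step 2: every node with no predecessors starts out ready.
    ready = deque(n for n, deg in in_degree.items() if deg == 0)
    order = []
    while ready:
        # Step 3: output a ready node and release its successors.
        node = ready.popleft()
        order.append(node)
        for succ in successors[node]:
            in_degree[succ] -= 1
            # Step 4: a successor with no remaining predecessors becomes ready.
            if in_degree[succ] == 0:
                ready.append(succ)

    # Step 5: nodes left over can only be part of (or behind) a cycle.
    if len(order) != len(in_degree):
        raise ValueError("graph contains a cycle")
    return order
```

Each node and each edge is touched a constant number of times, which is where the O(V + E) bound comes from.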

Why Kahn’s over DFS?

The alternative topological sort uses depth-first search with post-order numbering. Both are O(V + E), but Kahn’s has a key advantage for TopologicalSorter: the ready set is explicit at every step, and the nodes in it are independent of one another. This maps directly to the parallel scheduling API: each call to get_ready() returns the nodes that are ready now and have not been handed out yet. If you mark a whole batch done before calling get_ready() again, the batches are exactly the layers of the DAG.
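A small sketch of that layering, assuming every batch is fully marked done before the next get_ready() call. The helper name `layers` and the diamond-shaped graph are made up for illustration.

```python
from graphlib import TopologicalSorter

def layers(graph):
    """Group nodes into batches by finishing each batch before asking for the next."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    result = []
    while ts.is_active():
        batch = ts.get_ready()        # tuple of nodes whose predecessors are all done
        result.append(sorted(batch))  # sorted only to make the output stable
        ts.done(*batch)               # finishing the whole batch exposes the next layer
    return result

diamond = {"d": {"b", "c"}, "b": {"a"}, "c": {"a"}}
print(layers(diamond))  # [['a'], ['b', 'c'], ['d']]
```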

Internal data structures

Looking at CPython’s implementation (Lib/graphlib.py, ~200 lines):

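# Simplified from Lib/graphlib.py; _NodeInfo is a module-level helper class.
class _NodeInfo:
    __slots__ = ("node", "npredecessors", "successors")

    def __init__(self, node):
        self.node = node          # the node this record describes
        self.npredecessors = 0    # predecessors not yet marked done
        self.successors = []      # nodes that depend on this one

class TopologicalSorter:
    def __init__(self, graph=None):
        self._node2info = {}      # node → _NodeInfo
        self._ready_nodes = None  # becomes a list once prepare() runs
        # ...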

Each node stores:

  • npredecessors: count of unfinished predecessors (the in-degree)
  • successors: list of nodes that depend on this one

The prepare() method scans all nodes, collects those with npredecessors == 0 into the ready list, and then checks the graph for cycles, raising CycleError if it finds one. Each done(node) call decrements npredecessors for every successor of that node and appends the successors that reach 0 to the ready list.
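To see the counters at work through the public API, here is a short sketch with two independent chains (the graph is invented for the example). Marking one node done releases only its own successor, without waiting for the rest of the batch.

```python
from graphlib import TopologicalSorter

# Two independent chains: a -> c and b -> d (written as node: predecessors).
ts = TopologicalSorter({"c": {"a"}, "d": {"b"}})
ts.prepare()

first = set(ts.get_ready())   # both roots start with npredecessors == 0
ts.done("a")                  # only "a" finishes; "b" is still in progress
second = ts.get_ready()       # "c" is released without waiting for "b"

print(sorted(first))  # ['a', 'b']
print(second)         # ('c',)
```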

State machine

TopologicalSorter has internal states:

  • Unprepared: add() calls are allowed
  • Prepared: add() raises ValueError; get_ready()/done() are allowed
  • Finished: is_active() returns False

Once prepare() is called, the graph is frozen, so the sorter never has to cope with edges appearing while nodes are being handed out. Freezing does not make the object thread-safe by itself.
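The three states can be observed directly. The sketch below uses a two-node graph invented for the example and records which calls are rejected in which state.

```python
from graphlib import TopologicalSorter

ts = TopologicalSorter()
ts.add("b", "a")                 # unprepared: add() is allowed

try:
    ts.get_ready()               # unprepared: scheduling calls are rejected
    early_get_ready = "allowed"
except ValueError:
    early_get_ready = "ValueError"

ts.prepare()                     # prepared: the graph is now frozen

try:
    ts.add("c", "b")
    late_add = "allowed"
except ValueError:
    late_add = "ValueError"

while ts.is_active():            # drain the sorter to reach the finished state
    ts.done(*ts.get_ready())

print(early_get_ready, late_add, ts.is_active())  # ValueError ValueError False
```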

Thread safety

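TopologicalSorter does no locking of its own, so concurrent calls to get_ready(), done(), and is_active() are not thread-safe. When several worker threads share one sorter, guard every call with a lock. Using a condition variable as that lock also lets idle workers sleep until new nodes are released, instead of spinning:

import threading
from graphlib import TopologicalSorter

def run_threaded(graph, execute, num_workers=4):
    ts = TopologicalSorter(graph)
    ts.prepare()
    cond = threading.Condition()  # guards every call on ts

    def worker():
        while True:
            with cond:
                # Sleep until there is work or the sort has finished.
                while ts.is_active() and not (batch := ts.get_ready()):
                    cond.wait()
                if not ts.is_active():
                    return
            for task in batch:
                execute(task)          # run the task outside the lock
                with cond:
                    ts.done(task)
                    cond.notify_all()  # successors may be ready now

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()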

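A cleaner pattern keeps every TopologicalSorter call on the calling thread and hands only the task bodies to a thread pool, so no lock is needed: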

import concurrent.futures
from graphlib import TopologicalSorter

def parallel_execute(graph, executor_fn, max_workers=4):
    ts = TopologicalSorter(graph)
    ts.prepare()
    results = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {}

        while ts.is_active():
            for node in ts.get_ready():
                future = pool.submit(executor_fn, node)
                pending[future] = node

            if not pending:
                break

            done_futures, _ = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED
            )

            for future in done_futures:
                node = pending.pop(future)
                results[node] = future.result()
                ts.done(node)

    return results

Async parallel scheduling

For asyncio-based systems:

import asyncio
from graphlib import TopologicalSorter

async def async_parallel_execute(graph, async_executor):
    ts = TopologicalSorter(graph)
    ts.prepare()
    results = {}

    while ts.is_active():
        ready = ts.get_ready()
        if not ready:
            break

        tasks = [async_executor(node) for node in ready]
        batch_results = await asyncio.gather(*tasks, return_exceptions=True)

        for node, result in zip(ready, batch_results):
            if isinstance(result, Exception):
                raise result
            results[node] = result
            ts.done(node)

    return results

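This runs each batch of ready tasks concurrently and waits for the whole batch before asking for the next one. Parallelism is therefore capped by the width of the widest batch, and one slow task delays everything scheduled after its batch, including tasks that do not depend on it.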
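The per-batch barrier is a property of the loop, not of the API. A possible variant, sketched here under the assumption that `async_executor` is a coroutine function taking a node, releases successors as soon as any single task finishes. The name `run_as_completed` is made up for this example.

```python
import asyncio
from graphlib import TopologicalSorter

async def run_as_completed(graph, async_executor):
    """Run nodes as soon as their predecessors finish, with no per-batch barrier."""
    ts = TopologicalSorter(graph)
    ts.prepare()
    results = {}
    running = {}  # asyncio.Task -> node

    while ts.is_active():
        # Start everything that is ready right now.
        for node in ts.get_ready():
            running[asyncio.create_task(async_executor(node))] = node

        # is_active() is True, so at least one task is in flight here.
        finished, _ = await asyncio.wait(
            running, return_when=asyncio.FIRST_COMPLETED
        )
        for task in finished:
            node = running.pop(task)
            results[node] = task.result()  # re-raises if the task failed
            ts.done(node)                  # may make successors ready

    return results
```

If a task raises, the exception propagates out of the function while the other tasks are still in flight. A fuller version would cancel or await them first.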

Building a production task runner

Here’s a task runner with error handling, timeouts, and skipping of downstream tasks after a failure:

import asyncio
import time
from dataclasses import dataclass
from graphlib import TopologicalSorter, CycleError
from enum import Enum, auto

class TaskStatus(Enum):
    PENDING = auto()
    RUNNING = auto()
    SUCCESS = auto()
    FAILED = auto()
    SKIPPED = auto()

@dataclass
class TaskResult:
    node: str
    status: TaskStatus
    duration: float = 0.0
    error: str = ""

class DAGRunner:
    def __init__(self, graph: dict, timeout: float = 300):
        self.graph = graph
        self.timeout = timeout
        self.results: dict[str, TaskResult] = {}
        self._skip_descendants_of: set = set()

    async def run(self, executor):
        try:
            ts = TopologicalSorter(self.graph)
            ts.prepare()
        except CycleError as e:
            raise RuntimeError(f"Dependency cycle detected: {e}") from e

        while ts.is_active():
            ready = ts.get_ready()
            if not ready:
                break

            to_run = []
            for node in ready:
                if self._should_skip(node):
                    self.results[node] = TaskResult(
                        node, TaskStatus.SKIPPED
                    )
                    # Mark skipped nodes too, so indirect descendants are skipped.
                    self._skip_descendants_of.add(node)
                    ts.done(node)
                else:
                    to_run.append(node)

            if to_run:
                await asyncio.gather(
                    *(self._run_node(node, executor) for node in to_run)
                )
                # Call done() exactly once per node; skipped nodes were released above.
                for node in to_run:
                    ts.done(node)

        return self.results

    def _should_skip(self, node):
        predecessors = self.graph.get(node, set())
        return any(p in self._skip_descendants_of for p in predecessors)

    async def _run_node(self, node, executor):
        start = time.monotonic()
        try:
            await asyncio.wait_for(
                executor(node), timeout=self.timeout
            )
            self.results[node] = TaskResult(
                node, TaskStatus.SUCCESS,
                duration=time.monotonic() - start
            )
        except asyncio.TimeoutError:
            self.results[node] = TaskResult(
                node, TaskStatus.FAILED,
                duration=self.timeout,
                error="Timeout"
            )
            self._skip_descendants_of.add(node)
        except Exception as e:
            self.results[node] = TaskResult(
                node, TaskStatus.FAILED,
                duration=time.monotonic() - start,
                error=str(e)
            )
            self._skip_descendants_of.add(node)

Key design decisions:

  • Failed tasks cause their descendants to be skipped, while independent branches keep running
  • Timeouts prevent hung tasks from blocking the pipeline
  • Results include timing for observability
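DAGRunner as written runs each node once. If retries are wanted, one option is to wrap the executor before handing it to run(). The following is a minimal sketch: `with_retries`, `attempts`, and `delay` are hypothetical names, and the linear backoff is an arbitrary choice.

```python
import asyncio

def with_retries(executor, attempts=3, delay=0.1):
    """Wrap an async executor so each node is tried up to `attempts` times."""
    async def wrapped(node):
        for attempt in range(1, attempts + 1):
            try:
                return await executor(node)
            except Exception:
                if attempt == attempts:
                    raise  # out of attempts: let the caller record the failure
                await asyncio.sleep(delay * attempt)  # simple linear backoff
    return wrapped
```

It would be used as `await DAGRunner(graph).run(with_retries(my_executor))`, where `my_executor` stands for your own coroutine function. Note that the runner's timeout then covers all attempts for a node together, not each attempt separately.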

Advanced: dynamic graph modification

TopologicalSorter freezes the graph after prepare(). For dynamic workflows where tasks can add new dependencies, you need a custom implementation:

from collections import defaultdict

class DynamicTopoScheduler:
    def __init__(self):
        self.predecessors = defaultdict(set)
        self.successors = defaultdict(set)
        self.in_degree = {}       # node -> number of unfinished predecessors
        self.done_nodes = set()
        self.passed_out = set()   # nodes already returned by get_ready()

    def add(self, node, *deps):
        for dep in deps:
            # A dependency is a node in its own right, even without its own add() call.
            self.in_degree.setdefault(dep, 0)
            self.predecessors[node].add(dep)
            self.successors[dep].add(node)
        # Recount, because some predecessors may already be finished.
        self.in_degree[node] = sum(
            1 for d in self.predecessors[node] if d not in self.done_nodes
        )

    def get_ready(self):
        # Hand each node out only once, like TopologicalSorter.get_ready().
        ready = [n for n, deg in self.in_degree.items()
                 if deg == 0 and n not in self.passed_out]
        self.passed_out.update(ready)
        return ready

    def mark_done(self, node):
        if node in self.done_nodes:
            raise ValueError(f"node {node!r} was already marked done")
        self.done_nodes.add(node)
        for succ in self.successors[node]:
            self.in_degree[succ] -= 1

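This allows adding nodes after execution has started, which is useful for workflow engines where tasks discover new work at runtime. The trade-offs: get_ready() rescans every node on each call, and nothing here detects cycles, so a dependency loop added at runtime leaves its nodes waiting forever.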

Performance characteristics

Operation          Time       Notes
add(node, *deps)   O(d)       d = number of dependencies
prepare()          O(V + E)   Full graph scan
static_order()     O(V + E)   Single pass
get_ready()        O(r)       r = number of ready nodes
done(node)         O(s)       s = number of successors

For typical dependency graphs (hundreds to thousands of nodes), all operations are effectively instant. The practical bottleneck is always task execution time, not scheduling overhead.
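To check the scheduling overhead on your own machine, a rough timing sketch follows. The synthetic graph shape (each node depending on up to `fan_in` lower-numbered nodes) and the helper name `time_static_order` are assumptions made for the example, and the numbers will vary by hardware and Python version.

```python
import time
from graphlib import TopologicalSorter

def time_static_order(n_nodes=10_000, fan_in=5):
    """Build a synthetic DAG and time a full static_order() pass."""
    # Node i depends only on lower-numbered nodes, so no cycle is possible.
    graph = {i: set(range(max(0, i - fan_in), i)) for i in range(n_nodes)}
    start = time.perf_counter()
    order = list(TopologicalSorter(graph).static_order())
    elapsed = time.perf_counter() - start
    return len(order), elapsed

count, seconds = time_static_order()
print(f"sorted {count} nodes in {seconds * 1000:.1f} ms")
```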

Memory usage

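Each node stores a small info object (on the order of 100 bytes with __slots__) plus one list entry per outgoing edge. For a graph with 10,000 nodes and 50,000 edges, that works out to a few megabytes at most, not counting the node objects themselves, which is trivial for modern systems.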
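The estimate can be checked with tracemalloc. This is a rough sketch: `sorter_memory` is a made-up helper, the synthetic graph is an assumption, and the result depends on the Python version and platform.

```python
import tracemalloc
from graphlib import TopologicalSorter

def sorter_memory(n_nodes=10_000, fan_in=5):
    """Return the bytes allocated while building and preparing a sorter."""
    # The input graph is built before tracing starts, so it is not counted.
    graph = {i: set(range(max(0, i - fan_in), i)) for i in range(n_nodes)}
    tracemalloc.start()
    before, _ = tracemalloc.get_traced_memory()
    ts = TopologicalSorter(graph)
    ts.prepare()
    after, _ = tracemalloc.get_traced_memory()  # ts is still alive here
    tracemalloc.stop()
    return after - before

print(f"{sorter_memory() / 1_000_000:.2f} MB")
```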

Comparison with alternatives

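Feature                graphlib   NetworkX                                custom DFS
Standard library       Yes        No                                      n/a (your own code)
Parallel scheduling    Yes        No                                      Manual
Cycle detection        Yes        Yes                                     Manual
Weighted edges         No         Yes                                     Manual
Dynamic modification   No         Yes                                     Manual
Visualization          No         Yes                                     No
Dependencies           None       None required (NumPy, SciPy optional)   None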

Use graphlib for dependency resolution and task scheduling. Use NetworkX when you need graph algorithms beyond topological sort (shortest paths, centrality, community detection).

Real-world applications

Build systems: Make, Bazel, and Gradle all use topological sort to determine build order. A Makefile’s dependency declarations map directly to TopologicalSorter.add() calls.

Package managers: pip, npm, and cargo resolve installation order using topological sort. Cycle detection catches circular dependencies before installation begins.

Spreadsheet engines: Excel’s calculation order is a topological sort of cell dependencies. When you change A1, Excel determines which cells to recalculate and in what order.

CI/CD pipelines: GitHub Actions and GitLab CI jobs linked by needs: declarations form a DAG that is executed by topological scheduling. Jenkins pipelines express comparable ordering through stages and parallel blocks.
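As an illustration of the build-system case, here is how a tiny, hypothetical Makefile could be fed to TopologicalSorter. The file names are invented for the example.

```python
from graphlib import TopologicalSorter

# Hypothetical Makefile rules, written as target: prerequisites.
#   app:    main.o util.o
#   main.o: main.c
#   util.o: util.c
ts = TopologicalSorter()
ts.add("app", "main.o", "util.o")
ts.add("main.o", "main.c")
ts.add("util.o", "util.c")

build_order = list(ts.static_order())
print(build_order)  # every prerequisite appears before its target; "app" is last
```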

Pitfalls

  1. Forgetting prepare(). Calling get_ready() without prepare() raises ValueError. This is by design — it prevents accidental graph modification during execution.

  2. Adding nodes after prepare(). Raises ValueError. TopologicalSorter has no reset method, so if you need dynamic graphs, use a custom scheduler or build a new TopologicalSorter from the updated graph.

  3. Forgetting to call done(). If you process a node but don’t call done(), its dependents never become ready. is_active() returns True forever. Always call done() even if the task failed.

  4. Non-hashable nodes. Nodes must be hashable (usable as dict keys). Lists and dicts can’t be nodes; use strings, tuples, or frozen dataclasses.

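  5. CycleError reports only one cycle. The detected cycle is available as the second element of the exception’s args, as a list of nodes whose first and last entries are the same. If the graph contains several cycles, only one of them is reported, and which one is unspecified. To list every cycle in a complex graph, run your own DFS or use NetworkX’s simple_cycles().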
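For the last pitfall, a short sketch of reading the reported cycle from the exception. The graph is invented for the example, and node "d" is deliberately outside the cycle.

```python
from graphlib import TopologicalSorter, CycleError

# a, b, and c form a cycle; d merely depends on a.
graph = {"a": {"c"}, "b": {"a"}, "c": {"b"}, "d": {"a"}}

try:
    TopologicalSorter(graph).prepare()
except CycleError as exc:
    cycle = exc.args[1]  # list of nodes; first and last entries are the same
    print(" -> ".join(cycle))
```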

The one thing to remember: graphlib.TopologicalSorter is a zero-dependency task scheduler hiding in the standard library — its prepare/get_ready/done API maps directly to parallel execution patterns used by production build systems and CI/CD pipelines.
