Python Work Stealing Scheduler — Deep Dive

The theory behind work stealing

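Work stealing was analyzed rigorously by Blumofe and Leiserson (1999) in the context of the Cilk system. The key theorem: for a fully strict computation with work W (total operations) and span S (critical-path length), a randomized work-stealing scheduler on P processors runs in expected time W/P + O(S). Since no schedule can finish faster than max(W/P, S), this is within a constant factor of optimal, and the speedup is nearly linear whenever there’s enough parallelism (W >> S·P).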
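Plugging illustrative numbers into the bound makes the W >> S·P condition concrete. The values of W, S, and P below are made up purely for the arithmetic; they are not measurements:

```latex
\mathbb{E}[T_P] \le \frac{W}{P} + O(S), \qquad T_P \ge \max\!\left(\frac{W}{P},\; S\right)

% Illustrative values: W = 10^8, S = 10^3, P = 8
\frac{W}{P} = \frac{10^{8}}{8} = 1.25 \times 10^{7}, \qquad \frac{W}{S} = 10^{5} \gg P = 8
```

Here W/P exceeds S by a factor of about 10^4, so the O(S) term is negligible (up to its hidden constant) and the expected speedup is close to the ideal factor of 8.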

The randomized stealing strategy is crucial: a thief picks a victim uniformly at random. This avoids herding (all thieves targeting the same busy worker), and the randomness is what the probabilistic time bound above relies on.
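The Worker class later in this article builds a list of the other workers and calls random.choice on it. The same uniform choice can be made without allocating a list. pick_victim below is a hypothetical helper, not part of the scheduler in this article:

```python
import random


def pick_victim(self_id: int, num_workers: int, rng: random.Random) -> int:
    """Return a worker id chosen uniformly from every worker except self_id.

    Drawing from num_workers - 1 slots and shifting ids at or above
    self_id up by one skips the caller without building a list of victims.
    """
    if num_workers < 2:
        raise ValueError("stealing needs at least two workers")
    v = rng.randrange(num_workers - 1)
    return v + 1 if v >= self_id else v
```

Each of the other num_workers - 1 ids maps to exactly one draw, so the distribution stays uniform.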

The Chase-Lev work-stealing deque

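The classic data structure is the Chase-Lev deque (Chase and Lev, 2005): the owner works lock-free at the bottom end, and thieves take tasks from the top end with an atomic compare-and-swap (CAS). The version below is a simplified, lock-based stand-in with the same interface: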

import threading
from collections import deque
from typing import Any, Optional

class WorkStealingDeque:
    """Simplified work-stealing deque for demonstration.

    In production, this would use atomic operations
    instead of a lock for the steal path.
    """

    def __init__(self):
        self._items: deque = deque()
        self._lock = threading.Lock()

    def push_bottom(self, task: Any) -> None:
        """Owner pushes a task (no contention expected)."""
        with self._lock:
            self._items.append(task)

    def pop_bottom(self) -> Optional[Any]:
        """Owner pops a task from their end."""
        with self._lock:
            if self._items:
                return self._items.pop()
            return None

    def steal_top(self) -> Optional[Any]:
        """Thief steals from the opposite end."""
        with self._lock:
            if self._items:
                return self._items.popleft()
            return None

    def size(self) -> int:
        return len(self._items)

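In a real C implementation, push_bottom needs no atomic read-modify-write because only the owner writes the bottom index, pop_bottom needs a CAS only when it races a thief for the last remaining task, and steal_top claims a task with a single CAS on the top index. In CPython, the GIL keeps individual operations such as deque.append and deque.popleft from corrupting the deque, but compound check-then-act sequences (testing if self._items and then popping) can still interleave between threads, which is why the sketch keeps a lock. The owner/thief separation is what matters for understanding.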
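To show the protocol itself rather than a lock, here is a sketch of the Chase-Lev algorithm in Python. It rests on several assumptions: CAS is emulated by a tiny lock inside _AtomicInt (a hypothetical helper, since the standard library has no atomic integers), the buffer has a fixed capacity instead of growing, and correctness relies on CPython’s GIL making plain loads and stores happen in program order. A C version would need explicit memory fences instead, most importantly between the owner’s write of bottom and its read of top in pop_bottom.

```python
import threading
from typing import Any, List, Optional


class _AtomicInt:
    """Hypothetical stand-in for a hardware atomic integer.

    Plain reads of .value are single loads; only compare_and_swap takes a
    lock, to emulate one atomic CAS instruction.
    """

    def __init__(self, value: int = 0) -> None:
        self.value = value
        self._lock = threading.Lock()

    def compare_and_swap(self, expected: int, new: int) -> bool:
        with self._lock:
            if self.value == expected:
                self.value = new
                return True
            return False


class ChaseLevDeque:
    """Fixed-capacity sketch of the Chase-Lev protocol (no buffer growth)."""

    def __init__(self, capacity: int = 1024) -> None:
        self._buf: List[Any] = [None] * capacity
        self._cap = capacity
        self._top = _AtomicInt(0)  # advanced via CAS by thieves (and the owner)
        self._bottom = 0           # written only by the owner

    def push_bottom(self, task: Any) -> None:
        """Owner only: no CAS needed, since thieves never write bottom."""
        b = self._bottom
        t = self._top.value
        if b - t >= self._cap:
            raise OverflowError("deque full (this sketch does not grow its buffer)")
        self._buf[b % self._cap] = task
        self._bottom = b + 1  # publish the task only after it is stored

    def pop_bottom(self) -> Optional[Any]:
        """Owner only: take the newest task (LIFO)."""
        b = self._bottom - 1
        self._bottom = b      # reserve the slot before reading top
        t = self._top.value
        if b < t:             # deque was already empty: undo the reservation
            self._bottom = t
            return None
        task = self._buf[b % self._cap]
        if b > t:             # other tasks remain, so no thief can reach slot b
            return task
        # Exactly one task left: race any thieves for it with a CAS on top.
        if not self._top.compare_and_swap(t, t + 1):
            task = None       # a thief got it first
        self._bottom = t + 1
        return task

    def steal_top(self) -> Optional[Any]:
        """Any thread: take the oldest task (FIFO), or None if empty or contended."""
        t = self._top.value   # read top before bottom
        b = self._bottom
        if t >= b:
            return None
        task = self._buf[t % self._cap]
        if not self._top.compare_and_swap(t, t + 1):
            return None       # lost the race; caller may retry or pick another victim
        return task
```

A failed steal returns None even if the deque still holds work, which is why thieves typically loop over random victims rather than treating one None as "no work anywhere".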

Building a complete work-stealing scheduler

import random
import threading
import time
import traceback
from typing import Callable, List, Optional

class Task:
    def __init__(self, fn: Callable, *args):
        self.fn = fn
        self.args = args

    def execute(self, scheduler: "WorkStealingScheduler"):
        # Task functions receive the scheduler first so they can spawn children
        return self.fn(scheduler, *self.args)


class Worker(threading.Thread):
    def __init__(self, worker_id: int, scheduler: "WorkStealingScheduler"):
        super().__init__(daemon=True)
        self.worker_id = worker_id
        self.scheduler = scheduler
        self.deque = WorkStealingDeque()
        self.tasks_executed = 0
        self.tasks_stolen = 0
        self._running = True

    def run(self):
        while self._running:
            # Try own deque first (bottom end: newest, most cache-friendly task)
            task = self.deque.pop_bottom()
            if task is None:
                # Try stealing from a random victim (top end: oldest task)
                task = self._try_steal()
            if task is not None:
                try:
                    task.execute(self.scheduler)
                except Exception:
                    # A failing task must not kill the worker thread
                    traceback.print_exc()
                self.tasks_executed += 1
            else:
                # Backoff to avoid spinning
                time.sleep(0.0001)

    def _try_steal(self) -> Optional[Task]:
        workers = self.scheduler.workers
        if len(workers) <= 1:
            return None
        # Random victim selection
        victims = [w for w in workers if w is not self]
        victim = random.choice(victims)
        task = victim.deque.steal_top()
        if task is not None:
            self.tasks_stolen += 1
        return task

    def stop(self):
        self._running = False


class WorkStealingScheduler:
    def __init__(self, num_workers: int = 4):
        self.workers: List[Worker] = []
        self._next_worker = 0
        self._lock = threading.Lock()

        for i in range(num_workers):
            w = Worker(i, self)
            self.workers.append(w)

    def start(self):
        for w in self.workers:
            w.start()

    def submit(self, task: Task, worker_id: Optional[int] = None):
        """Submit a task to a specific worker or round-robin."""
        if worker_id is not None:
            self.workers[worker_id].deque.push_bottom(task)
        else:
            with self._lock:
                idx = self._next_worker % len(self.workers)
                self._next_worker += 1
            self.workers[idx].deque.push_bottom(task)

    def spawn(self, task: Task):
        """Called from within a task to spawn child tasks.
        Pushes onto the current thread's worker deque."""
        current = threading.current_thread()
        # Use the local deque only if this thread is one of this scheduler's workers
        if isinstance(current, Worker) and current.scheduler is self:
            current.deque.push_bottom(task)
        else:
            self.submit(task)

    def shutdown(self, wait: bool = True):
        for w in self.workers:
            w.stop()
        if wait:
            for w in self.workers:
                w.join(timeout=2)

    def stats(self) -> dict:
        return {
            f"worker-{w.worker_id}": {
                "executed": w.tasks_executed,
                "stolen": w.tasks_stolen,
                "pending": w.deque.size(),
            }
            for w in self.workers
        }

Fork-join with work stealing

The classic use case is divide-and-conquer algorithms where tasks spawn subtasks:

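result_store = {}
result_lock = threading.Lock()
pending = 0                   # tasks submitted or spawned but not yet finished
all_done = threading.Event()  # set when pending drops back to zero

def track(delta):
    global pending
    with result_lock:
        pending += delta
        if pending == 0:
            all_done.set()

def sum_task(scheduler, data, lo, hi, task_id):
    # Entry point for each queued task: do the work, then report completion
    parallel_sum(scheduler, data, lo, hi, task_id)
    track(-1)

def parallel_sum(scheduler, data, lo, hi, task_id):
    # Work on data[lo:hi] by index so no level of recursion copies the list
    if hi - lo <= 100:
        # Base case: compute directly
        total = sum(data[lo:hi])
        with result_lock:
            result_store[task_id] = total
        return
    mid = (lo + hi) // 2
    # Spawn left half as a new task (may be stolen); count it before pushing
    track(+1)
    scheduler.spawn(Task(sum_task, data, lo, mid, f"{task_id}_L"))
    # Execute right half directly (locality)
    parallel_sum(scheduler, data, mid, hi, f"{task_id}_R")

scheduler = WorkStealingScheduler(num_workers=4)
scheduler.start()
data = list(range(100_000))
track(+1)  # count the root task
scheduler.submit(Task(sum_task, data, 0, len(data), "root"))
all_done.wait()
print(sum(result_store.values()))  # 4999950000
scheduler.shutdown()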

The key insight: the spawned task goes onto the local deque and may be stolen by an idle worker. The current worker continues with the other half. This naturally distributes work across available processors.
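Why thieves take from the top while owners pop from the bottom shows up in a small, single-threaded simulation of parallel_sum’s split pattern. spawn_left_halves is a hypothetical helper that only records the (lo, hi) ranges one worker would push; no threads run:

```python
from collections import deque
from typing import Deque, Tuple

Range = Tuple[int, int]


def spawn_left_halves(lo: int, hi: int, grain: int) -> Deque[Range]:
    """Simulate one worker: push the left half, keep splitting the right half
    inline, and stop once a range is at most `grain` items long.
    The returned deque's left end is the top (where thieves steal)."""
    d: Deque[Range] = deque()
    while hi - lo > grain:
        mid = (lo + hi) // 2
        d.append((lo, mid))  # push_bottom: the spawned left half
        lo = mid             # continue with the right half on this worker
    return d


d = spawn_left_halves(0, 1024, grain=128)
print(list(d))  # [(0, 512), (512, 768), (768, 896)]
```

The oldest entry at the top is the largest chunk, so one steal moves half of the remaining range, while the owner keeps popping the small, recently split ranges from the bottom.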

How Dask implements work stealing

Dask’s distributed scheduler maintains a task graph where each task has dependencies and a target worker. Work stealing activates when:

  1. A worker finishes all assigned tasks
  2. The scheduler detects imbalanced queue lengths
  3. Task transfer cost (data movement) is less than the expected compute savings

Dask’s stealing is cost-aware: it considers the size of task inputs that would need to be transferred. A task requiring a 10GB DataFrame is unlikely to be stolen because the network transfer would negate any benefit.
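Dask’s actual cost model lives in its work-stealing extension and is more involved than this. The function below is only a hypothetical illustration of the trade-off, assuming 100 MB/s of bandwidth and 1 ms of fixed latency per transfer:

```python
def worth_stealing(input_bytes: int, compute_seconds: float,
                   bandwidth_bytes_per_s: float = 100e6,
                   latency_s: float = 0.001) -> bool:
    """Hypothetical rule: move a task only if shipping its inputs to the
    thief costs less time than the compute it takes off the busy worker."""
    transfer_seconds = latency_s + input_bytes / bandwidth_bytes_per_s
    return transfer_seconds < compute_seconds
```

Under these assumptions a 10 GB input needs about 100 s just to move, so only a task expected to run longer than that would be worth stealing, while a task with 1 MB of input (about 11 ms to move) is a good candidate if it computes for half a second.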

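# Work stealing is part of Dask's distributed scheduler, so start a Client
import dask.bag as db
from dask.distributed import Client

def expensive_function(x):
    # Placeholder for real per-item work
    return sum(i * i for i in range(x % 1000))

if __name__ == "__main__":
    client = Client()  # local cluster; work stealing is enabled by default
    # These tasks can be rebalanced across the cluster's workers
    results = (
        db.from_sequence(range(10000), npartitions=100)
        .map(lambda x: x ** 2 + expensive_function(x))
        .compute()
    )
    client.close()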

Ray’s scheduling approach

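Ray takes a related but different approach. Instead of per-worker deques, the design described in the original Ray paper uses a bottom-up, two-level scheduling hierarchy:

  • Global scheduler: assigns tasks to nodes based on resource requirements, load, and data locality
  • Local scheduler (per node): runs tasks submitted on its node on that node’s worker processes whenever it can
  • Spillback: if a node is overloaded, it forwards tasks to the global scheduler for reassignment

Later Ray releases reworked this hierarchy (the Ray architecture documentation describes the current design), but the goal it shares with work stealing remains: keep work where it was created and move it only when load is unbalanced.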
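
import ray

def heavy_computation(data):
    # Placeholder for real CPU-bound work
    return sum(x * x for x in data)

@ray.remote
def process_chunk(data):
    # Ray decides which worker process (and which node) runs each call
    return heavy_computation(data)

ray.init()  # initialize Ray before submitting tasks
chunks = [list(range(i, i + 1000)) for i in range(0, 100_000, 1000)]
# Launch many tasks; Ray spreads them across the available workers
futures = [process_chunk.remote(chunk) for chunk in chunks]
results = ray.get(futures)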

Performance characteristics and tuning

| Parameter | Effect | Recommendation |
|---|---|---|
| Number of workers | More workers give more parallelism but also more steal attempts | Match the number of CPU cores |
| Task granularity | Too fine and scheduling overhead dominates; too coarse and load balance suffers | Roughly 1-10 ms of work per task |
| Steal backoff | Exponential backoff cuts wasted CPU and lock contention when idle workers keep failing to find work | Start at 100 μs, cap at 10 ms |
| Victim selection | Random selection avoids herding and is what the provable Cilk bound assumes | Use random selection |
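The backoff row needs only a little state per worker. A minimal sketch, where StealBackoff is a hypothetical helper that uses the table’s starting delay and cap:

```python
class StealBackoff:
    """Exponential backoff for an idle worker: 100 us, 200 us, 400 us, ...
    capped at 10 ms, and reset as soon as the worker finds work."""

    def __init__(self, base: float = 100e-6, cap: float = 10e-3) -> None:
        self.base = base
        self.cap = cap
        self.delay = base

    def next_delay(self) -> float:
        """Return how long to sleep after a failed steal, then double it."""
        d = self.delay
        self.delay = min(self.cap, self.delay * 2)
        return d

    def reset(self) -> None:
        """Call after a successful pop or steal."""
        self.delay = self.base
```

In the Worker.run loop shown earlier, the fixed time.sleep(0.0001) could become time.sleep(backoff.next_delay()) after a failed steal, with backoff.reset() whenever a task runs. Production runtimes often add random jitter as well, so idle workers do not wake in lockstep.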

Steal-half optimization

Instead of stealing one task at a time, steal half of the victim’s deque. This reduces the number of steal operations and amortizes the synchronization cost:

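# Method to add to WorkStealingDeque
def steal_half(self) -> list:
    """Steal the older half of the tasks (rounded up) from the top of the deque."""
    with self._lock:
        # Round up so a thief can still take the last remaining task
        count = (len(self._items) + 1) // 2
        # The lock is held, so no emptiness check is needed inside the loop
        return [self._items.popleft() for _ in range(count)]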

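Go’s runtime scheduler uses a batch variant of this idea: a processor that runs out of goroutines steals half of another processor’s run queue. Batch stealing is particularly effective when task counts are high and individual tasks are small.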
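A thief that steals a batch still has to put the extra tasks somewhere. steal_half_into below is a hypothetical helper that works on plain collections.deque objects without any locking, so it shows only the bookkeeping: take the older half (rounded up) from the victim’s top, run the newest stolen task right away, and push the rest onto the thief’s own deque.

```python
from collections import deque
from typing import Any, Deque, List, Optional


def steal_half_into(victim: Deque[Any], thief: Deque[Any]) -> Optional[Any]:
    """Move the older half (rounded up) of victim's tasks to the thief.

    Returns one task to run now and pushes the remaining stolen tasks onto
    the thief's deque (right end = bottom). Single-threaded sketch: a real
    scheduler would hold the victim's lock or use atomics here.
    """
    count = (len(victim) + 1) // 2
    if count == 0:
        return None
    stolen: List[Any] = [victim.popleft() for _ in range(count)]
    thief.extend(stolen[:-1])  # oldest stolen task ends up at the thief's top
    return stolen[-1]          # run the newest stolen task immediately
```

Pushing the remaining tasks in their original order keeps the oldest, typically largest, task at the top of the thief’s deque, where other idle workers can still steal it.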

Limitations in Python

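The GIL limits the effectiveness of thread-based work stealing in CPU-bound Python code. Only one thread executes Python bytecode at a time, so both task execution and the scheduler’s own bookkeeping (deque operations, lock handling, steal attempts) are serialized: CPU-bound tasks gain no speedup and pay the scheduling overhead on top. Solutions: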

  • Use multiprocessing for CPU-bound work (each process has its own GIL)
  • Use work stealing primarily for I/O-bound tasks where threads yield the GIL
  • Move hot paths into Cython or C extensions that release the GIL while they compute

For CPU-bound parallelism in Python, Dask and Ray sidestep the GIL by running tasks in separate worker processes, while still balancing load across those processes along the lines described above.

The one thing to remember: work stealing achieves near-optimal load balancing by giving each worker a private deque and letting idle workers steal from busy ones. The randomized stealing strategy and per-worker deques minimize contention, making it the algorithm of choice for parallel runtimes such as Cilk and Java’s ForkJoinPool, and its ideas carry over to how Python frameworks like Dask and Ray balance load.
