Python Cooperative Scheduling — Deep Dive
Event loop internals
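Python’s asyncio event loop is implemented in pure Python (asyncio/base_events.py plus a platform-specific selector or proactor loop); the _asyncio C module accelerates Task and Future rather than the loop itself. The loop maintains several data structures: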
- Ready queue: tasks that can run right now (FIFO deque)
- Scheduled callbacks: timer-based callbacks sorted by deadline (heap)
- I/O watchers: file descriptors registered with the OS selector (epoll, kqueue, or select; on Windows the default proactor loop uses IOCP completion ports instead)
Each iteration of the loop:
# Simplified event loop pseudocode
def _run_once(self):
    # 1. Calculate how long we can block in the selector
    timeout = self._calculate_timeout()
    # 2. Poll for I/O events
    events = self._selector.select(timeout)
    # 3. Process I/O events → add callbacks to ready queue
    for key, mask in events:
        self._process_event(key, mask)
    # 4. Process scheduled callbacks whose time has come
    now = time.monotonic()
    while self._scheduled and self._scheduled[0].when <= now:
        handle = heapq.heappop(self._scheduled)
        self._ready.append(handle)
    # 5. Run all ready callbacks
    ntodo = len(self._ready)
    for _ in range(ntodo):
        handle = self._ready.popleft()
        handle._run()
The key insight: step 5 runs a fixed number of callbacks (the ones that were ready at the start of this iteration). Callbacks added during execution go to the back of the queue and wait for the next iteration, so a callback that keeps rescheduling itself cannot prevent the loop from polling I/O and timers.
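The ordering rule can be observed with plain callbacks. The sketch below is only an illustration, not asyncio's internals: the function and callback names are made up for the example, and it relies on nothing beyond loop.call_soon(), run_forever(), and stop().

```python
import asyncio


def demo_ready_queue_order():
    """Show that a callback scheduled mid-iteration runs after those already queued."""
    order = []
    loop = asyncio.new_event_loop()

    def first():
        order.append("first")
        # Scheduled while the loop is already draining the ready queue,
        # so it lands behind `second`.
        loop.call_soon(late)

    def second():
        order.append("second")

    def late():
        order.append("late")
        loop.stop()

    loop.call_soon(first)
    loop.call_soon(second)
    try:
        loop.run_forever()
    finally:
        loop.close()
    return order


order = demo_ready_queue_order()
print(order)  # ['first', 'second', 'late']
```

Even though first() schedules late() before second() has run, second() still goes first because it was already in the queue.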
Task states and transitions
              ┌─────────┐
create ──────▶│ PENDING │
              └────┬────┘
                   │ event loop picks it up
              ┌────▼────┐  coroutine returns  ┌─────────┐
         ┌───▶│ RUNNING │────────────────────▶│  DONE   │
         │    └────┬────┘                     └─────────┘
         │         │ hits await
         │    ┌────▼────┐
         │    │ WAITING │  (suspended on I/O, timer, etc.)
         │    └────┬────┘
         │         │ event fires
         └─────────┘
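A task in RUNNING state has exclusive access to the event loop thread. No other task can be RUNNING simultaneously. This single-thread guarantee makes code between two await points atomic with respect to other tasks, so it needs no locks. State that must stay consistent across an await can still be changed by another task in the meantime and needs an asyncio.Lock or a redesign.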
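The guarantee stops at the first await. As a small sketch (the helper names and the shared dictionary are invented for this example), the read-modify-write below is split by an await, so concurrent tasks overwrite each other's updates unless an asyncio.Lock serializes them.

```python
import asyncio


async def run_increments(n, use_lock):
    """Run n concurrent increments of a shared counter and return the final value."""
    state = {"counter": 0}
    lock = asyncio.Lock()

    async def unsafe_increment():
        current = state["counter"]
        # Suspension point: other tasks run here and read the same stale value.
        await asyncio.sleep(0)
        state["counter"] = current + 1

    async def locked_increment():
        # The lock keeps the whole read-await-write sequence exclusive.
        async with lock:
            await unsafe_increment()

    worker = locked_increment if use_lock else unsafe_increment
    await asyncio.gather(*(worker() for _ in range(n)))
    return state["counter"]


lost = asyncio.run(run_increments(100, use_lock=False))
kept = asyncio.run(run_increments(100, use_lock=True))
print(lost, kept)  # the unlocked run loses updates; the locked run reaches 100
```

Removing the await from unsafe_increment() would also make the counter reach 100, because the increment would then complete within a single uninterrupted step.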
How await actually works
Under the hood, await builds on Python’s generator protocol. A coroutine object supports the same send() and throw() methods as a generator, and it suspends by yielding a Future up to the event loop:
# What the interpreter roughly does with:
#     result = await some_coroutine()
# (conceptual sketch of "yield from"; throw()/close() forwarding omitted)
_coro = some_coroutine()
try:
    _yielded = _coro.send(None)
    while True:
        # _yielded is a Future (or None for a bare yield such as sleep(0))
        # The event loop suspends this task and schedules
        # a callback to resume it when _yielded resolves
        _result = yield _yielded  # suspend point
        _yielded = _coro.send(_result)
except StopIteration as e:
    result = e.value
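The object yielded by the innermost awaitable propagates up through the chain of awaits until it reaches the Task driving the outermost coroutine. The Task attaches a wake-up callback to that Future; the I/O watcher or timer that eventually resolves the Future was registered by the code that created it.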
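The propagation can be seen by driving a coroutine by hand, without any event loop. This is a minimal sketch: Suspend is a hypothetical awaitable written for the example, and the manual send() calls stand in for what a Task would do.

```python
class Suspend:
    """Hypothetical awaitable that yields one marker to whoever drives the coroutine."""

    def __init__(self, marker):
        self.marker = marker

    def __await__(self):
        # A generator-based __await__: the yield is the actual suspension point.
        received = yield self.marker
        return received


async def inner():
    x = await Suspend("need-x")
    return x * 2


async def outer():
    value = await inner()
    return value + 1


coro = outer()
first = coro.send(None)  # runs until the yield inside Suspend.__await__
try:
    coro.send(20)        # resumes at the yield; 20 becomes the await result
except StopIteration as stop:
    final = stop.value   # return value of outer()

print(first, final)  # need-x 41
```

The marker surfaces from two levels of await unchanged, which is the same path a Future takes on its way to the Task.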
Starvation detection
A common production issue: a coroutine blocks the event loop without awaiting. You can detect this by monitoring loop iteration time:
import asyncio
import logging
import time

logger = logging.getLogger(__name__)

class SlowCallbackMonitor:
    """Detects coroutines that block the event loop too long."""

    def __init__(self, threshold_ms: float = 100):
        self.threshold = threshold_ms / 1000  # seconds

    def install(self, loop: asyncio.AbstractEventLoop):
        # asyncio has built-in slow callback logging
        loop.slow_callback_duration = self.threshold
        # Slow callbacks are only reported in debug mode
        loop.set_debug(True)

async def cpu_heavy_task():
    # Stand-in for blocking work: holds the loop without awaiting
    time.sleep(2.5)

# Simpler approach: use asyncio's built-in debug mode
async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.1  # 100ms threshold
    # This will log a warning like:
    # "Executing <Task ...> took 2.500 seconds"
    await cpu_heavy_task()

asyncio.run(main(), debug=True)
Debug mode adds overhead, so in production prefer lightweight metrics: track the 99th percentile of callback durations (or of event loop lag) and alert when it exceeds your latency budget.
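One lightweight way to get such a number is a heartbeat task that measures how late its own wake-ups are. The following is a sketch under assumptions: the function names, the 10 ms interval, and the deliberately blocking offender are all invented for the example, and a real deployment would export the samples to a metrics system rather than return a list.

```python
import asyncio
import time


async def measure_loop_lag(interval, samples):
    """Record how late each wake-up is relative to the requested sleep interval."""
    lags = []
    for _ in range(samples):
        start = time.monotonic()
        await asyncio.sleep(interval)
        overshoot = time.monotonic() - start - interval
        lags.append(max(0.0, overshoot))
    return lags


async def blocking_offender():
    await asyncio.sleep(0.02)
    time.sleep(0.2)  # deliberately blocks the event loop without awaiting


async def demo():
    monitor = asyncio.create_task(measure_loop_lag(0.01, 10))
    await blocking_offender()
    return await monitor


lags = asyncio.run(demo())
worst = max(lags)
print(f"worst lag: {worst * 1000:.0f} ms")
```

The heartbeat that was sleeping while time.sleep() held the thread wakes up far later than requested, and that overshoot is the loop lag other tasks experienced too.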
Voluntary yielding for CPU-bound cooperation
When you must do CPU work in the event loop, insert yield points:
import asyncio

async def chunked_processing(data, chunk_size=1000):
    """Process data in chunks, yielding between each."""
    results = []
    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]
        # Process one chunk (CPU-bound)
        result = [transform(item) for item in chunk]
        results.extend(result)
        # Yield to let other tasks run
        await asyncio.sleep(0)
    return results

def transform(item):
    return item ** 2 + 1
await asyncio.sleep(0) suspends the current task for one pass of the loop: the task is rescheduled at the back of the ready queue and control returns to the event loop, which can run other ready tasks and poll for I/O before resuming it.
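The effect is easiest to see with two tasks that each yield after every step. This is a small sketch with made-up names (worker, log); it only shows the interleaving and does no real work.

```python
import asyncio


async def worker(name, steps, log):
    for i in range(steps):
        log.append(f"{name}{i}")
        # Hand control back; the other worker is next in the ready queue.
        await asyncio.sleep(0)


async def demo():
    log = []
    await asyncio.gather(worker("A", 3, log), worker("B", 3, log))
    return log


interleaved = asyncio.run(demo())
print(interleaved)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
```

Without the sleep(0), worker A would append all of its entries before worker B ran at all.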
Building a custom cooperative scheduler
You can build a scheduler without asyncio for educational purposes:
from collections import deque
from typing import Generator

class CooperativeScheduler:
    """Simple round-robin cooperative scheduler using generators."""

    def __init__(self):
        self._tasks: deque[Generator] = deque()
        self._completed: list = []

    def spawn(self, task: Generator):
        self._tasks.append(task)

    def run(self):
        while self._tasks:
            task = self._tasks.popleft()
            try:
                # Run until next yield
                next(task)
                # Task yielded: put it back in the queue
                self._tasks.append(task)
            except StopIteration as e:
                # Generator returned; its return value rides on the exception
                self._completed.append(e.value)

    def results(self):
        return self._completed

# Tasks are generators that yield to cooperate
def counting_task(name, n):
    total = 0
    for i in range(n):
        total += i
        if i % 100 == 0:
            yield  # cooperate: let other tasks run
    return total

scheduler = CooperativeScheduler()
scheduler.spawn(counting_task("A", 1000))
scheduler.spawn(counting_task("B", 500))
scheduler.spawn(counting_task("C", 750))
scheduler.run()
print(scheduler.results())  # [124750, 280875, 499500] (completion order: B, C, A)
This demonstrates the core mechanism: tasks run until they yield, then go to the back of the queue. No preemption, no interrupts, no locks needed.
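The same idea extends to timers. As a sketch only (TimedScheduler and napper are hypothetical names, and this is not how asyncio is implemented), tasks can yield a delay in seconds, and the scheduler parks them in a heap ordered by wake-up time, much like the scheduled-callbacks heap described earlier.

```python
import heapq
import time
from collections import deque


class TimedScheduler:
    """Round-robin generator scheduler where a task may yield a delay in seconds."""

    def __init__(self):
        self._ready = deque()
        self._sleeping = []  # heap of (wake_time, sequence, task)
        self._sequence = 0   # tie-breaker so generators are never compared
        self.finished = []

    def spawn(self, task):
        self._ready.append(task)

    def run(self):
        while self._ready or self._sleeping:
            if not self._ready:
                # Nothing runnable: block until the earliest timer is due.
                wake_time = self._sleeping[0][0]
                time.sleep(max(0.0, wake_time - time.monotonic()))
            now = time.monotonic()
            while self._sleeping and self._sleeping[0][0] <= now:
                self._ready.append(heapq.heappop(self._sleeping)[2])
            # Run only the tasks that were ready at the start of this pass.
            for _ in range(len(self._ready)):
                task = self._ready.popleft()
                try:
                    delay = next(task)
                except StopIteration as stop:
                    self.finished.append(stop.value)
                    continue
                if delay:
                    self._sequence += 1
                    entry = (time.monotonic() + delay, self._sequence, task)
                    heapq.heappush(self._sleeping, entry)
                else:
                    self._ready.append(task)


def napper(name, delay):
    yield delay  # ask the scheduler to park this task for `delay` seconds
    return name


sched = TimedScheduler()
sched.spawn(napper("slow", 0.05))
sched.spawn(napper("fast", 0.01))
sched.run()
print(sched.finished)  # ['fast', 'slow']
```

The task spawned second finishes first because the heap, not spawn order, decides which sleeper wakes next.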
Trio’s structured concurrency model
Trio takes cooperative scheduling further with strict guarantees:
import trio

async def parent():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(child, "A")
        nursery.start_soon(child, "B")
    # Both children are guaranteed to be done here

async def child(name):
    await trio.sleep(1)
    print(f"Child {name} done")

trio.run(parent)
Trio’s scheduler enforces that:
- Tasks can only be spawned in nurseries (no orphan tasks)
- The nursery block waits for all children before exiting
- Cancellation propagates through the nursery tree
- Checkpoints (yield points) must happen regularly: every async function in Trio’s own API is one, and Trio’s instrumentation hooks can be used to flag a task that runs too long without reaching one
Cancellation in cooperative scheduling
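Since tasks can’t be interrupted, cancellation is cooperative too. asyncio.Task.cancel() does not stop the task immediately: it requests cancellation, and the event loop throws CancelledError into the coroutine at the await where it is suspended (or at its next await if it is currently running):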
import asyncio

async def cleanup():
    # Placeholder for real cleanup work (closing connections, flushing buffers)
    await asyncio.sleep(0)

async def long_running():
    try:
        while True:
            await asyncio.sleep(1)
            print("Still running...")
    except asyncio.CancelledError:
        print("Cleaning up...")
        await cleanup()
        raise  # re-raise to confirm cancellation

async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(3)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(main())
A task that never awaits can never be cancelled — another reason to avoid long CPU-bound stretches in coroutines.
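Timeouts are built on the same mechanism. In the sketch below (fetch_slowly and the log list are invented for the example), asyncio.wait_for() cancels the awaited coroutine when the deadline passes, the coroutine's finally block runs at its suspended await, and only then does the caller see the timeout.

```python
import asyncio


async def fetch_slowly(log):
    try:
        await asyncio.sleep(10)  # cancellation is delivered at this await
    finally:
        log.append("cleanup ran")


async def demo():
    log = []
    try:
        await asyncio.wait_for(fetch_slowly(log), timeout=0.05)
    except asyncio.TimeoutError:
        log.append("timed out")
    return log


log = asyncio.run(demo())
print(log)  # ['cleanup ran', 'timed out']
```

If fetch_slowly() blocked with time.sleep(10) instead, the timeout could not fire until the blocking call returned.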
Performance: cooperative vs preemptive
| Metric | Cooperative (asyncio) | Preemptive (threads) |
|---|---|---|
| Context switch cost | ~1-5 μs (rough, workload-dependent) | ~10-50 μs (rough, workload-dependent) |
| Memory per task | ~2 KB (coroutine frame) | Up to ~8 MB of reserved stack (typical Linux default, mostly virtual) |
| Max concurrent tasks | 100K+ easily | ~1K-10K practical |
| Shared state safety | Safe between awaits | Needs locks around shared mutable state |
| CPU-bound fairness | Poor (tasks must yield) | Enforced by the OS scheduler |
| I/O throughput | Excellent | Good (but heavier per connection) |
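For I/O-heavy servers (web APIs, chat systems, proxies), cooperative scheduling with asyncio can sustain far more concurrent connections than a thread-per-connection design, mainly because each idle connection costs a small coroutine object rather than a thread stack.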
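The concurrency claim is easy to try on your own machine. The following sketch uses asyncio.sleep() as a stand-in for an idle connection; the function names and the figure of 10,000 tasks are arbitrary choices for the example, and the elapsed time will vary by hardware.

```python
import asyncio
import time


async def idle_connection(delay):
    # Stand-in for a connection that spends its life waiting on I/O.
    await asyncio.sleep(delay)
    return 1


async def simulate(n, delay):
    start = time.perf_counter()
    results = await asyncio.gather(*(idle_connection(delay) for _ in range(n)))
    return sum(results), time.perf_counter() - start


completed, elapsed = asyncio.run(simulate(10_000, 0.1))
print(f"{completed} tasks finished in {elapsed:.2f}s")
```

All the waits overlap, so the total time stays close to a single delay plus scheduling overhead rather than growing with the number of tasks.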
The one thing to remember: cooperative scheduling gives each task complete control over when it yields, creating natural atomicity between await points. This eliminates most data races and enables massive concurrency for I/O workloads. The discipline required — yielding regularly and never blocking — is the price you pay for simplicity and performance.