Python Trio Concurrency — Deep Dive
How Trio’s Scheduler Works
Trio uses a run-to-completion, single-threaded scheduler similar to asyncio — but with tighter invariants. The scheduler maintains:
- Run queue: Tasks ready to execute, processed in FIFO order.
- I/O waiting set: Tasks blocked on file descriptor readiness (via the OS selector).
- Deadline heap: Cancel scopes sorted by their deadline, checked every iteration.
Each scheduler iteration:
- Check expired deadlines → cancel affected scopes → reschedule affected tasks.
- Run all tasks in the run queue until empty (each task runs until it hits a checkpoint).
- Poll the OS selector for I/O readiness, with timeout = earliest unexpired deadline.
A critical difference from asyncio: Trio’s scheduler explicitly tracks which cancel scope every task is inside. When a scope’s deadline expires, the scheduler walks the scope tree and delivers Cancelled exceptions to every task within that scope.
The Scope Tree
Trio maintains a tree of cancel scopes. Every task exists inside at least one scope (the nursery’s implicit scope). Scopes can be nested:
async def complex_operation():
with trio.fail_after(30): # outer: 30s total
async with trio.open_nursery() as nursery:
with trio.fail_after(5): # inner: 5s per task
nursery.start_soon(subtask_a)
with trio.fail_after(5):
nursery.start_soon(subtask_b)
When the inner scope expires, only that task is cancelled. When the outer scope expires, everything — nursery and all tasks — gets cancelled. This nesting behavior makes timeout composition predictable.
Nursery Internals
A nursery is implemented as a cancel scope plus bookkeeping:
Nursery:
├── CancelScope (shields tasks from external cancellation if desired)
├── Set[Task] — currently running child tasks
├── waiting_for_children: Event — signaled when Set[Task] is empty
└── exception_list: List[BaseException] — collected errors
When the async with block’s body finishes, the nursery enters “waiting” mode. It awaits waiting_for_children. When the last child task completes, the nursery checks exception_list. If non-empty, it raises an ExceptionGroup (or re-raises a single exception if there’s only one).
Shielding
Sometimes you need a task to survive cancellation (e.g., flushing a buffer on shutdown):
async def graceful_cleanup():
with trio.CancelScope(shield=True):
await flush_buffer() # won't be cancelled even if parent scope is
Shielded scopes ignore cancellation from parent scopes but respect their own deadlines.
Advanced Patterns
Producer-Consumer with Memory Channels
Trio’s channels are the equivalent of asyncio.Queue, but designed for structured concurrency:
async def producer(send_channel):
async with send_channel:
for i in range(100):
await send_channel.send(i)
await trio.sleep(0.01)
async def consumer(receive_channel):
async with receive_channel:
async for item in receive_channel:
await process(item)
async def main():
send_ch, recv_ch = trio.open_memory_channel(max_buffer_size=10)
async with trio.open_nursery() as nursery:
nursery.start_soon(producer, send_ch)
nursery.start_soon(consumer, recv_ch)
The max_buffer_size provides backpressure — the producer blocks when the buffer is full. Setting it to 0 creates a rendezvous channel (synchronous handoff).
Service Startup with start()
For tasks that need initialization before they’re “ready”:
async def database_pool(task_status=trio.TASK_STATUS_IGNORED):
pool = await create_pool()
task_status.started(pool) # signal "I'm ready"
await trio.sleep_forever() # keep running until cancelled
async def main():
async with trio.open_nursery() as nursery:
pool = await nursery.start(database_pool)
# pool is guaranteed to be initialized here
await do_queries(pool)
nursery.start() suspends the parent until the child calls task_status.started(). This eliminates race conditions during initialization.
Limiting Concurrency with CapacityLimiter
limiter = trio.CapacityLimiter(10) # max 10 concurrent
async def rate_limited_fetch(url):
async with limiter:
return await fetch(url)
async def main():
async with trio.open_nursery() as nursery:
for url in thousands_of_urls:
nursery.start_soon(rate_limited_fetch, url)
Testing Trio Code
Trio ships with testing utilities that make async tests deterministic:
import trio.testing
async def test_timeout_behavior():
clock = trio.testing.MockClock(autojump_threshold=0)
async def sleeper():
await trio.sleep(3600) # "sleeps" for an hour
# With MockClock, this completes instantly
with trio.move_on_after(7200):
await sleeper()
MockClock with autojump_threshold=0 skips forward to the next scheduled deadline automatically. Tests that involve timeouts run in milliseconds instead of waiting real time.
Property-Based Testing with Hypothesis
Trio integrates with Hypothesis for testing concurrent interleavings:
from hypothesis import given
from hypothesis import strategies as st
@given(st.lists(st.integers()))
async def test_producer_consumer(items):
results = []
send_ch, recv_ch = trio.open_memory_channel(0)
async with trio.open_nursery() as nursery:
async def producer():
async with send_ch:
for item in items:
await send_ch.send(item)
async def consumer():
async with recv_ch:
async for item in recv_ch:
results.append(item)
nursery.start_soon(producer)
nursery.start_soon(consumer)
assert results == items
Trio vs asyncio: Architectural Tradeoffs
| Aspect | Trio | asyncio |
|---|---|---|
| Task spawning | Only inside nurseries | Anywhere via create_task |
| Cancellation | Scope-tree based, automatic | Manual via task.cancel() |
| Error propagation | ExceptionGroup, automatic | Must check task.exception() |
| Checkpoint model | Explicit, enforced | Implicit |
| Ecosystem | Smaller (anyio bridges gap) | Vast |
| Learning curve | Steeper initially | Lower entry |
Bridging with anyio
anyio is an abstraction layer that works with both Trio and asyncio:
import anyio
async def portable_task():
async with anyio.create_task_group() as tg:
tg.start_soon(fetch, "url-a")
tg.start_soon(fetch, "url-b")
# Run with either backend
anyio.run(portable_task, backend="trio")
anyio.run(portable_task, backend="asyncio")
Libraries built on anyio (like Starlette, httpx) work with both runtimes. This is the pragmatic path for teams that want Trio’s safety guarantees without cutting off the asyncio ecosystem entirely.
One thing to remember: Trio’s structured concurrency model — nurseries + cancel scopes + mandatory checkpoints — eliminates entire classes of async bugs by making task lifetimes explicit and error propagation automatic. The tradeoff is a smaller ecosystem, mitigated by the anyio bridge.
See Also
- Python Actor Model Why treating each piece of your program like a person with their own mailbox makes concurrency way less scary.
- Python Aiocache Caching aiocache remembers expensive answers so your async Python app doesn't waste time asking the same question twice.
- Python Aiofiles Async Io aiofiles lets your async Python program read and write files without freezing — because normal file operations secretly block everything.
- Python Aiohttp Understand Aiohttp through an everyday analogy so Python behavior feels intuitive, not random.
- Python Anyio Portability AnyIO lets your async Python code work with any async library — write once, run on asyncio or Trio without changes.