Python Structured Concurrency Patterns — Deep Dive
TaskGroup Internals
The real asyncio.TaskGroup implementation lives in Lib/asyncio/taskgroups.py. The simplified sketch below shows how it maintains a set of tasks and uses a carefully orchestrated shutdown sequence:
    class TaskGroup:
        def __init__(self):
            self._tasks = set()
            self._errors = []
            self._base_task = None  # The task running the 'async with'
            self._aborting = False

        async def __aenter__(self):
            self._base_task = asyncio.current_task()
            return self

        def create_task(self, coro, *, name=None, context=None):
            task = self._base_task.get_loop().create_task(
                coro, name=name, context=context
            )
            self._tasks.add(task)
            task.add_done_callback(self._on_task_done)
            return task

        def _on_task_done(self, task):
            self._tasks.discard(task)
            if task.cancelled():
                return
            exc = task.exception()
            if exc is not None:
                self._errors.append(exc)
                if not self._aborting:
                    self._abort()

        def _abort(self):
            self._aborting = True
            for task in self._tasks:
                task.cancel()
            # Cancel the base task to unblock __aexit__
            self._base_task.cancel()
The critical insight: when a child task fails, _abort() cancels all sibling tasks and the base task. The base task’s cancellation forces the async with body to stop if it’s still running.
The __aexit__ Dance
The exit sequence is where the complexity lives:
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._aborting = exc_type is not None
        # If the body raised, cancel all children
        if self._aborting:
            for task in self._tasks:
                task.cancel()
        # Wait for all tasks to finish
        while self._tasks:
            # Propagation might cause new tasks to error
            try:
                await asyncio.gather(*self._tasks, return_exceptions=True)
            except asyncio.CancelledError:
                continue  # Retry: more tasks might need settling
        # Uncancel the base task if we cancelled it during abort
        if self._aborting:
            self._base_task.uncancel()
        # Raise collected errors
        if self._errors:
            raise BaseExceptionGroup(
                "unhandled errors in a TaskGroup",
                self._errors
            )
The while self._tasks loop handles the case where cancelling one task causes side effects in others. The uncancel() call ensures the TaskGroup’s internal cancellation doesn’t leak to the caller.
ExceptionGroup Semantics
ExceptionGroup (PEP 654) has specific matching rules:
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(raise_value_error())
            tg.create_task(raise_type_error())
            tg.create_task(raise_value_error())
    except* ValueError as eg:
        # eg contains TWO ValueErrors
        print(len(eg.exceptions))  # 2
    except* TypeError as eg:
        # eg contains ONE TypeError
        print(len(eg.exceptions))  # 1
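Both except* clauses execute: unlike regular except, except* doesn’t short-circuit. Each clause handles the matching subset of the group, and any exceptions that no clause matches are re-raised in a new group. The counts above assume each coroutine raises before a sibling’s failure cancels it; a child that is cancelled first contributes no exception.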
Nesting ExceptionGroups
ExceptionGroups can nest. If a TaskGroup contains another TaskGroup, the inner group’s errors are wrapped in an outer group:
    # Outer group raises ExceptionGroup containing:
    #   - ExceptionGroup from inner group (containing the actual errors)
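Use eg.subgroup() and eg.split() to navigate nested groups. Both recurse into nested groups and preserve the nesting structure; eg.derive() is the hook they call to construct the resulting group, which matters mainly when you subclass ExceptionGroup: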
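    try:
        ...  # code that may raise a (possibly nested) ExceptionGroup
    except* ValueError as eg:
        # subgroup() returns a new ExceptionGroup with matching exceptions only,
        # or None when nothing matches
        specific = eg.subgroup(lambda e: isinstance(e, SpecificError))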
Trio-Inspired Patterns
Python’s TaskGroup was inspired by Trio’s nurseries. Some patterns from Trio translate directly:
Cancel Scope with Deadline
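    async def with_deadline(deadline, coro):
        """Run coro with a hard deadline (in loop.time() units), structured style."""
        async with asyncio.TaskGroup() as tg:
            async def watchdog():
                delay = deadline - asyncio.get_running_loop().time()
                if delay > 0:
                    await asyncio.sleep(delay)
                raise TimeoutError("Deadline exceeded")
            watchdog_task = tg.create_task(watchdog())
            result_task = tg.create_task(coro)
            # Wait for coro to settle, then stop the watchdog so the group can exit
            await asyncio.wait([result_task])
            watchdog_task.cancel()
        # On timeout the group raises an ExceptionGroup wrapping the TimeoutError
        return result_task.result()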
Supervised Workers
A pattern where failed workers are automatically restarted:
    import asyncio
    import logging

    async def supervised_workers(worker_fn, count=4):
        async with asyncio.TaskGroup() as tg:
            async def supervised(name):
                while True:
                    try:
                        await worker_fn(name)
                    except Exception as e:
                        # CancelledError is a BaseException, so cancellation still propagates
                        logging.error("Worker %s crashed: %s", name, e)
                        await asyncio.sleep(1)  # Backoff before restart
                        continue
                    break  # Clean exit
            for i in range(count):
                tg.create_task(supervised(f"worker-{i}"))
Nested TaskGroups
TaskGroups compose naturally:
    async def pipeline():
        async with asyncio.TaskGroup() as outer:
            async def stage(items):
                async with asyncio.TaskGroup() as inner:
                    for item in items:
                        inner.create_task(process(item))
                # All items in this batch are done
            outer.create_task(stage(batch_1))
            outer.create_task(stage(batch_2))
        # All batches are done
Inner group failures propagate to the outer group. If batch_1 fails, batch_2 is also cancelled.
Building Custom Concurrency Structures
Rate-Limited TaskGroup
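    class RateLimitedGroup:
        """Start at most `rate` wrapped coroutines per `per` seconds."""
        def __init__(self, tg, rate=10, per=1.0):
            self._tg = tg
            self._semaphore = asyncio.Semaphore(rate)
            self._per = per  # each slot is held for at least `per` seconds

        def create_task(self, coro):  # plain def, like TaskGroup.create_task
            async def throttled():
                async with self._semaphore:
                    result = await coro
                    # Keep the slot occupied so it admits at most one start per `per` seconds
                    await asyncio.sleep(self._per)
                    return result
            return self._tg.create_task(throttled())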
Ordered Results
TaskGroup tasks complete in arbitrary order. To preserve input order:
    async def ordered_map(fn, items):
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fn(item)) for item in items]
        # The group has exited, so every task is done; the list keeps input order
        return [task.result() for task in tasks]
Comparison: asyncio vs. Trio vs. anyio
| Feature | asyncio.TaskGroup | trio.open_nursery | anyio.create_task_group |
|---|---|---|---|
| Python version | 3.11+ | Any (third-party) | Any (third-party) |
| Cancel scopes | Via asyncio.timeout | Built-in | Built-in |
| Strict mode | Always strict | Configurable | Always strict |
| Exception handling | ExceptionGroup | ExceptionGroup (Trio 0.22+) | ExceptionGroup |
| Shielding | asyncio.shield() | trio.CancelScope(shield=True) | anyio.CancelScope(shield=True) |
Performance Characteristics
TaskGroup has minimal overhead over raw create_task(). The figures below are rough and vary with hardware and Python version:
- Task creation: ~1μs additional for bookkeeping (set.add, callback registration)
- Completion: ~0.5μs for the done callback
- Error path: More expensive due to ExceptionGroup construction and uncancel() calls
For most applications, the safety guarantees far outweigh the microsecond-level overhead.
One thing to remember: TaskGroup’s power comes from its strict invariant — when async with exits, every child task is settled — and its implementation achieves this through coordinated cancellation, uncancel() to prevent leaking, and ExceptionGroup to preserve all error information.