Python Structured Concurrency Patterns — Deep Dive

TaskGroup Internals

The asyncio.TaskGroup implementation (in Lib/asyncio/taskgroups.py) maintains a set of tasks and uses a carefully orchestrated shutdown sequence:

class TaskGroup:
    def __init__(self):
        self._tasks = set()
        self._errors = []
        self._base_task = None  # The task running the 'async with'
        self._aborting = False

    async def __aenter__(self):
        self._base_task = asyncio.current_task()
        return self

    def create_task(self, coro, *, name=None, context=None):
        task = self._base_task.get_loop().create_task(
            coro, name=name, context=context
        )
        self._tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task):
        self._tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            self._errors.append(exc)
            if not self._aborting:
                self._abort()

    def _abort(self):
        self._aborting = True
        for task in self._tasks:
            task.cancel()
        # Cancel the base task to unblock __aexit__
        self._base_task.cancel()

The critical insight: when a child task fails, _abort() cancels all sibling tasks and the base task. The base task’s cancellation forces the async with body to stop if it’s still running.

The aexit Dance

The exit sequence is where the complexity lives:

async def __aexit__(self, exc_type, exc_val, exc_tb):
    self._aborting = exc_type is not None

    # If the body raised, cancel all children
    if self._aborting:
        for task in self._tasks:
            task.cancel()

    # Wait for all tasks to finish
    while self._tasks:
        # Propagation might cause new tasks to error
        try:
            await asyncio.gather(*self._tasks, return_exceptions=True)
        except asyncio.CancelledError:
            continue  # Retry — more tasks might need settling

    # Uncancel the base task if we cancelled it during abort
    if self._aborting:
        self._base_task.uncancel()

    # Raise collected errors
    if self._errors:
        raise BaseExceptionGroup(
            "unhandled errors in a TaskGroup",
            self._errors
        )

The while self._tasks loop handles the case where cancelling one task causes side effects in others. The uncancel() call ensures the TaskGroup’s internal cancellation doesn’t leak to the caller.

ExceptionGroup Semantics

ExceptionGroup (PEP 654) has specific matching rules:

try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(raise_value_error())
        tg.create_task(raise_type_error())
        tg.create_task(raise_value_error())
except* ValueError as eg:
    # eg contains TWO ValueErrors
    print(len(eg.exceptions))  # 2
except* TypeError as eg:
    # eg contains ONE TypeError
    print(len(eg.exceptions))  # 1

Both except* clauses execute — unlike regular except, except* doesn’t short-circuit. Each clause handles a subset of the group.

Nesting ExceptionGroups

ExceptionGroups can nest. If a TaskGroup contains another TaskGroup, the inner group’s errors are wrapped in an outer group:

# Outer group raises ExceptionGroup containing:
#   - ExceptionGroup from inner group (containing the actual errors)

Use eg.derive() and eg.subgroup() to navigate nested groups:

except* ValueError as eg:
    # eg.subgroup(lambda e: isinstance(e, SpecificError))
    # Returns a new ExceptionGroup with matching exceptions only

Trio-Inspired Patterns

Python’s TaskGroup was inspired by Trio’s nurseries. Some patterns from Trio translate directly:

Cancel Scope with Deadline

async def with_deadline(deadline, coro):
    """Run coro with a hard deadline, structured style."""
    async with asyncio.TaskGroup() as tg:
        async def watchdog():
            delay = deadline - asyncio.get_event_loop().time()
            if delay > 0:
                await asyncio.sleep(delay)
            raise TimeoutError("Deadline exceeded")

        tg.create_task(watchdog())
        result_task = tg.create_task(coro)

    return result_task.result()

Supervised Workers

A pattern where failed workers are automatically restarted:

async def supervised_workers(worker_fn, count=4):
    async with asyncio.TaskGroup() as tg:
        async def supervised(name):
            while True:
                try:
                    await worker_fn(name)
                except Exception as e:
                    logging.error(f"Worker {name} crashed: {e}")
                    await asyncio.sleep(1)  # Backoff before restart
                    continue
                break  # Clean exit

        for i in range(count):
            tg.create_task(supervised(f"worker-{i}"))

Nested TaskGroups

TaskGroups compose naturally:

async def pipeline():
    async with asyncio.TaskGroup() as outer:
        async def stage(items):
            async with asyncio.TaskGroup() as inner:
                for item in items:
                    inner.create_task(process(item))
            # All items in this batch are done

        outer.create_task(stage(batch_1))
        outer.create_task(stage(batch_2))
    # All batches are done

Inner group failures propagate to the outer group. If batch_1 fails, batch_2 is also cancelled.

Building Custom Concurrency Structures

Rate-Limited TaskGroup

class RateLimitedGroup:
    def __init__(self, tg, rate=10, per=1.0):
        self._tg = tg
        self._semaphore = asyncio.Semaphore(rate)
        self._per = per / rate

    async def create_task(self, coro):
        async def throttled():
            async with self._semaphore:
                result = await coro
                await asyncio.sleep(self._per)
                return result
        return self._tg.create_task(throttled())

Ordered Results

TaskGroup tasks complete in arbitrary order. To preserve input order:

async def ordered_map(fn, items):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fn(item)) for item in items]
    return [task.result() for task in tasks]  # Original order preserved

Comparison: asyncio vs. Trio vs. anyio

Featureasyncio.TaskGrouptrio.open_nurseryanyio.create_task_group
Python version3.11+Any (third-party)Any (third-party)
Cancel scopesVia asyncio.timeoutBuilt-inBuilt-in
Strict modeAlways strictConfigurableAlways strict
Exception handlingExceptionGroupExceptionGroup (Trio 0.22+)ExceptionGroup
Shieldingasyncio.shield()trio.open_nursery(shield=True)anyio.move_on_after

Performance Characteristics

TaskGroup has minimal overhead over raw create_task():

  • Task creation: ~1μs additional for bookkeeping (set.add, callback registration)
  • Completion: ~0.5μs for the done callback
  • Error path: More expensive due to ExceptionGroup construction and uncancel() calls

For most applications, the safety guarantees far outweigh the nanosecond-level overhead.

One thing to remember: TaskGroup’s power comes from its strict invariant — when async with exits, every child task is settled — and its implementation achieves this through coordinated cancellation, uncancel() to prevent leaking, and ExceptionGroup to preserve all error information.

pythonconcurrencyasynciostructured-concurrency

See Also

  • Python Actor Model Why treating each piece of your program like a person with their own mailbox makes concurrency way less scary.
  • Python Aiocache Caching aiocache remembers expensive answers so your async Python app doesn't waste time asking the same question twice.
  • Python Aiofiles Async Io aiofiles lets your async Python program read and write files without freezing — because normal file operations secretly block everything.
  • Python Aiohttp Understand Aiohttp through an everyday analogy so Python behavior feels intuitive, not random.
  • Python Anyio Portability AnyIO lets your async Python code work with any async library — write once, run on asyncio or Trio without changes.