TaskGroup and Structured Concurrency in Python — Deep Dive

Technical perspective

Structured concurrency fundamentally changes how Python programs manage concurrent lifetimes. The asyncio.TaskGroup implementation in CPython enforces a strict parent-child relationship between tasks, ensuring no task outlives its scope. This guarantee simplifies debugging, prevents resource leaks, and makes cancellation semantics predictable — but it requires rethinking patterns that relied on fire-and-forget task creation.

Internal mechanics

When you enter an async with asyncio.TaskGroup() block, the TaskGroup:

  1. Creates an internal set to track spawned tasks
  2. Installs a callback on each task that detects failures
  3. On first failure: sets an internal flag and calls cancel() on all remaining tasks
  4. On block exit (__aexit__): waits for all tasks to finish, collects exceptions, and raises ExceptionGroup if any occurred

The cancellation is cooperative — tasks receive asyncio.CancelledError at their next await point. Tasks that catch and suppress CancelledError will delay the group’s completion.

# Simplified TaskGroup pseudocode
class TaskGroup:
    async def __aenter__(self):
        self._tasks = set()
        self._errors = []
        return self

    def create_task(self, coro):
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task):
        if task.cancelled():
            return
        if exc := task.exception():
            self._errors.append(exc)
            for t in self._tasks:
                t.cancel()

    async def __aexit__(self, *exc_info):
        await asyncio.gather(*self._tasks, return_exceptions=True)
        if self._errors:
            raise ExceptionGroup("tasks failed", self._errors)

The real implementation handles edge cases around the event loop, re-entrancy, and proper exception chaining, but this captures the core logic.

Pattern: fan-out with concurrency limits

TaskGroup doesn’t have built-in concurrency limiting. Combine it with asyncio.Semaphore to process large workloads without overwhelming downstream services:

import asyncio

async def fetch_url(session, url: str, sem: asyncio.Semaphore) -> dict:
    async with sem:
        resp = await session.get(url)
        return {"url": url, "status": resp.status_code}

async def crawl(urls: list[str], max_concurrent: int = 20):
    sem = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch_url(session, url, sem))
                for url in urls
            ]
    return [t.result() for t in tasks]

This pattern gives you bounded concurrency with clean cancellation — if one URL causes a fatal error, all remaining requests cancel immediately.

Pattern: nested TaskGroups for partial failure tolerance

Sometimes you want some failures to be tolerable while others are fatal. Nest TaskGroups:

async def process_batch(items: list[dict]) -> list[dict]:
    results = []

    async with asyncio.TaskGroup() as outer:
        # Critical setup — failure here cancels everything
        config = outer.create_task(load_config())
        schema = outer.create_task(load_schema())

    # Non-critical processing — individual failures are OK
    for item in items:
        try:
            async with asyncio.TaskGroup() as inner:
                validated = inner.create_task(
                    validate(item, schema.result())
                )
                enriched = inner.create_task(
                    enrich(item, config.result())
                )
            results.append({
                "validated": validated.result(),
                "enriched": enriched.result(),
            })
        except* Exception as eg:
            results.append({"error": str(eg), "item": item})

    return results

The outer group handles critical dependencies — if config or schema loading fails, nothing proceeds. The inner groups handle per-item processing where individual failures are logged and skipped.

Pattern: timeout integration

Combine TaskGroup with asyncio.timeout() (Python 3.11+) for deadline-based cancellation:

async def fetch_with_deadline(urls: list[str], deadline_seconds: float):
    try:
        async with asyncio.timeout(deadline_seconds):
            async with asyncio.TaskGroup() as tg:
                tasks = [tg.create_task(fetch(url)) for url in urls]
            return [t.result() for t in tasks]
    except TimeoutError:
        print(f"Deadline of {deadline_seconds}s exceeded")
        return []

When the timeout fires, it cancels all tasks in the group. This is cleaner than setting per-task timeouts because it enforces a total budget rather than individual limits.

ExceptionGroup handling strategies

Strategy 1: type-based filtering with except*

try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(operation_a())
        tg.create_task(operation_b())
except* ConnectionError as eg:
    for exc in eg.exceptions:
        log_connection_failure(exc)
except* ValueError as eg:
    for exc in eg.exceptions:
        log_validation_error(exc)

Multiple except* clauses can fire for the same ExceptionGroup — each handles its matching subset.

Strategy 2: programmatic inspection

try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(operation_a())
        tg.create_task(operation_b())
except* Exception as eg:
    retryable = []
    fatal = []
    for exc in eg.exceptions:
        if isinstance(exc, (ConnectionError, TimeoutError)):
            retryable.append(exc)
        else:
            fatal.append(exc)
    
    if retryable and not fatal:
        await retry_operations(retryable)
    elif fatal:
        raise  # Re-raise the ExceptionGroup

Strategy 3: the subgroup method

ExceptionGroup provides .subgroup() for filtering:

except* Exception as eg:
    network_errors, other = eg.split(
        lambda e: isinstance(e, (ConnectionError, TimeoutError))
    )
    if network_errors:
        handle_network_issues(network_errors)
    if other:
        raise other

Graceful shutdown pattern

For long-running services, combine TaskGroup with signal handling:

import signal

async def serve():
    shutdown_event = asyncio.Event()

    def handle_signal():
        shutdown_event.set()

    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, handle_signal)
    loop.add_signal_handler(signal.SIGINT, handle_signal)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(http_server(shutdown_event))
        tg.create_task(background_worker(shutdown_event))
        tg.create_task(health_checker(shutdown_event))

        # Wait for shutdown signal
        await shutdown_event.wait()
        # Cancellation propagates to all tasks via the event

Each task checks shutdown_event in its main loop and exits cleanly, causing the TaskGroup to complete naturally.

Migration from gather to TaskGroup

gather patternTaskGroup equivalent
await gather(a(), b())async with TaskGroup() as tg: then create_task
return_exceptions=TrueWrap individual tasks in try/except, or use nested groups
Dynamic task listCall create_task in a loop inside the async with block
Partial resultsNested groups per item with individual error handling

Performance comparison

TaskGroup has negligible overhead compared to gather — the difference is a few task-tracking set operations. In benchmarks with 10,000 tasks, the overhead is under 1ms total.

The real performance difference is in failure scenarios: TaskGroup cancels immediately on first failure, while gather lets all tasks run to completion. For workloads where early cancellation matters (API calls with rate limits, database connections), TaskGroup can save significant resources.

Gotchas

  • CancelledError suppression: If a task catches CancelledError and doesn’t re-raise, the TaskGroup waits indefinitely. Always re-raise or let it propagate.
  • Synchronous exceptions in create_task: If the coroutine raises before its first await, the exception is still captured by the TaskGroup — no special handling needed.
  • Mixing with raw asyncio.create_task: Tasks created outside the group are not managed by it. They can outlive the group and won’t be cancelled on failure.
  • ExceptionGroup vs Exception: Code that catches Exception won’t catch ExceptionGroup. Update bare except clauses when migrating to TaskGroup.

The one thing to remember: TaskGroup enforces that concurrent tasks have a defined lifetime, automatic cancellation on failure, and complete error reporting — the three properties that make async Python code production-safe rather than demo-safe.

pythonconcurrencypython311

See Also

  • Python Actor Model Why treating each piece of your program like a person with their own mailbox makes concurrency way less scary.
  • Python Aiocache Caching aiocache remembers expensive answers so your async Python app doesn't waste time asking the same question twice.
  • Python Aiofiles Async Io aiofiles lets your async Python program read and write files without freezing — because normal file operations secretly block everything.
  • Python Aiohttp Understand Aiohttp through an everyday analogy so Python behavior feels intuitive, not random.
  • Python Anyio Portability AnyIO lets your async Python code work with any async library — write once, run on asyncio or Trio without changes.