Python Structured Concurrency Patterns — Core Concepts

The Problem with Unstructured Concurrency

Traditional asyncio code creates tasks that float freely:

async def main():
    task1 = asyncio.create_task(download("a.txt"))
    task2 = asyncio.create_task(download("b.txt"))
    # What if task1 fails? task2 keeps running.
    # What if main() exits? Tasks might be orphaned.
    result1 = await task1  # If this raises, task2 is abandoned
    result2 = await task2

Problems compound: exceptions are lost, cleanup is missed, and shutdown becomes unpredictable.

TaskGroup: The Core Primitive

Python 3.11 introduced asyncio.TaskGroup, which enforces structured concurrency:

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(download("a.txt"))
        task2 = tg.create_task(download("b.txt"))
    # Here: both tasks are complete
    print(task1.result(), task2.result())

The rules are strict:

  1. All tasks must finish before the async with block exits
  2. If any task raises, all other tasks are cancelled and the group raises an ExceptionGroup
  3. No orphan tasks — every task has a clear parent scope

Error Handling with ExceptionGroup

When tasks fail inside a TaskGroup, errors are collected into an ExceptionGroup:

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(failing_task())
            tg.create_task(another_failing_task())
    except* ValueError as eg:
        # Handle ValueError instances
        for exc in eg.exceptions:
            print(f"ValueError: {exc}")
    except* ConnectionError as eg:
        # Handle ConnectionError instances
        for exc in eg.exceptions:
            print(f"ConnectionError: {exc}")

The except* syntax (Python 3.11+) lets you handle different exception types from the group separately.

Pattern: Fan-Out / Fan-In

The most common structured concurrency pattern — dispatch work in parallel and collect results:

async def fetch_all_pages(urls):
    results = {}
    async with asyncio.TaskGroup() as tg:
        tasks = {
            url: tg.create_task(fetch(url))
            for url in urls
        }
    return {url: task.result() for url, task in tasks.items()}

If any URL fails, all fetches are cancelled. No partial results leak out.

Pattern: Worker Pool with Bounded Concurrency

Limit how many tasks run simultaneously:

async def bounded_gather(coros, limit=10):
    semaphore = asyncio.Semaphore(limit)
    async def limited(coro):
        async with semaphore:
            return await coro

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(limited(c)) for c in coros]
    return [t.result() for t in tasks]

Pattern: First-Completed with Cleanup

Sometimes you want the first successful result and cancel the rest:

async def first_success(coros):
    winner = None
    async with asyncio.TaskGroup() as tg:
        event = asyncio.Event()
        async def race(coro):
            nonlocal winner
            result = await coro
            if not event.is_set():
                winner = result
                event.set()
                # Cancel sibling tasks
                for task in tg._tasks:
                    task.cancel()
        for c in coros:
            tg.create_task(race(c))
    return winner

Common Misconception: “TaskGroup Is Just gather() with Extra Steps”

asyncio.gather() and TaskGroup look similar but differ fundamentally:

  • gather() — returns results in order, can optionally swallow exceptions with return_exceptions=True. Doesn’t cancel siblings on failure unless you pass return_exceptions=False.
  • TaskGroup — enforces structured concurrency. Cancels all tasks on first failure. Collects errors into ExceptionGroup. Guarantees no orphans.

Use gather() for simple parallel awaits. Use TaskGroup when you need strong guarantees about task lifetime and error propagation.

When to Use Which

ScenarioTool
Run N things in parallel, need all resultsTaskGroup
Run N things, failures are independentgather(return_exceptions=True)
Run with concurrency limitTaskGroup + Semaphore
Long-running background workersTaskGroup in a context manager
Fire-and-forget (logging, metrics)create_task() with exception handler

One thing to remember: TaskGroup guarantees that when you exit the async with block, every task you created is either completed or cancelled — no orphans, no leaks, no silent failures.

pythonconcurrencyasynciostructured-concurrency

See Also

  • Python Actor Model Why treating each piece of your program like a person with their own mailbox makes concurrency way less scary.
  • Python Aiocache Caching aiocache remembers expensive answers so your async Python app doesn't waste time asking the same question twice.
  • Python Aiofiles Async Io aiofiles lets your async Python program read and write files without freezing — because normal file operations secretly block everything.
  • Python Aiohttp Understand Aiohttp through an everyday analogy so Python behavior feels intuitive, not random.
  • Python Anyio Portability AnyIO lets your async Python code work with any async library — write once, run on asyncio or Trio without changes.