Python Structured Concurrency Patterns — Core Concepts
The Problem with Unstructured Concurrency
Traditional asyncio code creates tasks that float freely:
    import asyncio

    async def main():
        task1 = asyncio.create_task(download("a.txt"))
        task2 = asyncio.create_task(download("b.txt"))
        # What if task1 fails? task2 keeps running.
        # What if main() exits? Tasks might be orphaned.
        result1 = await task1  # If this raises, task2 is abandoned
        result2 = await task2
Problems compound: exceptions are lost, cleanup is missed, and shutdown becomes unpredictable.
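To make the failure mode concrete, here is a minimal runnable sketch. The `download` coroutine, its `delay` and `fail` parameters, and the `log` list are hypothetical stand-ins for real I/O, not part of any library.

```python
import asyncio

log = []  # records which downloads ran to completion


async def download(name, delay, fail=False):
    # Hypothetical stand-in for real network I/O.
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError(f"{name} failed")
    log.append(f"{name} finished")
    return name


async def main():
    task1 = asyncio.create_task(download("a.txt", 0.01, fail=True))
    task2 = asyncio.create_task(download("b.txt", 0.05))
    still_running = False
    try:
        await task1
    except ConnectionError:
        # task1 has failed, yet nothing has told task2 to stop.
        still_running = not task2.done()
    # Awaited here only so the demo exits cleanly; real code that lets
    # the exception propagate would never reach this line.
    await task2
    return still_running


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this sketch `task2` is still running at the moment `task1` fails, and it goes on to finish its work even though its result is no longer wanted.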
TaskGroup: The Core Primitive
Python 3.11 introduced asyncio.TaskGroup, which enforces structured concurrency:
    async def main():
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(download("a.txt"))
            task2 = tg.create_task(download("b.txt"))
        # Here: both tasks are complete
        print(task1.result(), task2.result())
The rules are strict:
- All tasks must finish before the async with block exits
- If any task raises, all other tasks are cancelled and the group raises an ExceptionGroup
- No orphan tasks: every task has a clear parent scope
Error Handling with ExceptionGroup
When tasks fail inside a TaskGroup, errors are collected into an ExceptionGroup:
    async def main():
        try:
            async with asyncio.TaskGroup() as tg:
                tg.create_task(failing_task())
                tg.create_task(another_failing_task())
        except* ValueError as eg:
            # Handle ValueError instances
            for exc in eg.exceptions:
                print(f"ValueError: {exc}")
        except* ConnectionError as eg:
            # Handle ConnectionError instances
            for exc in eg.exceptions:
                print(f"ConnectionError: {exc}")
The except* syntax (Python 3.11+) lets you handle different exception types from the group separately.
Pattern: Fan-Out / Fan-In
The most common structured concurrency pattern — dispatch work in parallel and collect results:
    async def fetch_all_pages(urls):
        async with asyncio.TaskGroup() as tg:
            tasks = {
                url: tg.create_task(fetch(url))
                for url in urls
            }
        # Reached only if every fetch succeeded
        return {url: task.result() for url, task in tasks.items()}
If any URL fails, all fetches are cancelled. No partial results leak out.
Pattern: Worker Pool with Bounded Concurrency
Limit how many tasks run simultaneously:
    async def bounded_gather(coros, limit=10):
        semaphore = asyncio.Semaphore(limit)
        async def limited(coro):
            # At most `limit` coroutines are inside this block at once
            async with semaphore:
                return await coro
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(limited(c)) for c in coros]
        return [t.result() for t in tasks]
Pattern: First-Completed with Cleanup
Sometimes you want the first successful result and cancel the rest:
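    async def first_success(coros):
        winner = None
        event = asyncio.Event()
        tasks = []
        async def race(coro):
            nonlocal winner
            try:
                result = await coro
            except Exception:
                return  # a failed attempt must not abort the whole group
            if not event.is_set():
                winner = result
                event.set()
                # Cancel siblings through our own list, not the private tg._tasks
                current = asyncio.current_task()
                for task in tasks:
                    if task is not current:
                        task.cancel()
        async with asyncio.TaskGroup() as tg:
            for c in coros:
                tasks.append(tg.create_task(race(c)))
        return winner  # stays None if every coroutine failed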
Common Misconception: “TaskGroup Is Just gather() with Extra Steps”
asyncio.gather() and TaskGroup look similar but differ fundamentally:
- gather(): returns results in argument order and can optionally return exceptions as results with return_exceptions=True. With the default return_exceptions=False, the first exception propagates to the caller, but the other awaitables are not cancelled and keep running.
- TaskGroup: enforces structured concurrency. Cancels the remaining tasks on first failure. Collects errors into an ExceptionGroup. Guarantees no orphans.
Use gather() for simple parallel awaits. Use TaskGroup when you need strong guarantees about task lifetime and error propagation.
When to Use Which
| Scenario | Tool |
|---|---|
| Run N things in parallel, need all results | TaskGroup |
| Run N things, failures are independent | gather(return_exceptions=True) |
| Run with concurrency limit | TaskGroup + Semaphore |
| Long-running background workers | TaskGroup in a context manager |
| Fire-and-forget (logging, metrics) | create_task() with exception handler |
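For the fire-and-forget row, one possible shape is sketched below. The helper name `fire_and_forget`, its `on_error` parameter, and the `background_tasks` set are assumptions made for illustration. The set exists because the event loop keeps only weak references to tasks, so a task with no other reference can be garbage-collected before it finishes.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

# Strong references to tasks that are still running.
background_tasks = set()


def fire_and_forget(coro, on_error=None):
    """Schedule coro without awaiting it and report any failure."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)

    def _on_done(finished_task):
        background_tasks.discard(finished_task)
        if finished_task.cancelled():
            return
        exc = finished_task.exception()
        if exc is None:
            return
        if on_error is not None:
            on_error(exc)
        else:
            logger.error("background task failed: %r", exc)

    task.add_done_callback(_on_done)
    return task


async def demo():
    errors = []

    async def send_metric(ok):
        # Hypothetical side-effect task such as logging or metrics.
        await asyncio.sleep(0)
        if not ok:
            raise RuntimeError("metrics endpoint down")

    fire_and_forget(send_metric(True), on_error=errors.append)
    fire_and_forget(send_metric(False), on_error=errors.append)
    await asyncio.sleep(0.01)  # let the background tasks finish
    return errors


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Reading `exception()` in the callback also marks the error as retrieved, which avoids the "Task exception was never retrieved" warning at shutdown.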
One thing to remember: TaskGroup guarantees that when you exit the async with block, every task you created is either completed or cancelled — no orphans, no leaks, no silent failures.