TaskGroup and Structured Concurrency in Python — Deep Dive
Technical perspective
Structured concurrency fundamentally changes how Python programs manage concurrent lifetimes. The asyncio.TaskGroup implementation in CPython enforces a strict parent-child relationship between tasks, ensuring no task outlives its scope. This guarantee simplifies debugging, prevents resource leaks, and makes cancellation semantics predictable — but it requires rethinking patterns that relied on fire-and-forget task creation.
Internal mechanics
When you enter an async with asyncio.TaskGroup() block, the TaskGroup:
- Creates an internal set to track spawned tasks
- Installs a callback on each task that detects failures
- On first failure: sets an internal flag and calls cancel() on all remaining tasks
- On block exit (__aexit__): waits for all tasks to finish, collects exceptions, and raises ExceptionGroup if any occurred
The cancellation is cooperative — tasks receive asyncio.CancelledError at their next await point. Tasks that catch and suppress CancelledError will delay the group’s completion.
# Simplified TaskGroup pseudocode (illustrative; not the CPython source)
import asyncio

class TaskGroup:
    async def __aenter__(self):
        self._tasks = set()   # every task spawned through this group
        self._errors = []     # exceptions raised by child tasks
        return self

    def create_task(self, coro):
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_task_done)
        return task

    def _on_task_done(self, task):
        if task.cancelled():
            return
        if exc := task.exception():
            self._errors.append(exc)
            # A failure cancels the siblings that are still running
            for t in self._tasks:
                if not t.done():
                    t.cancel()

    async def __aexit__(self, *exc_info):
        # Wait for every child, including ones that were just cancelled
        await asyncio.gather(*self._tasks, return_exceptions=True)
        if self._errors:
            raise ExceptionGroup("tasks failed", self._errors)
The real implementation handles edge cases around the event loop, re-entrancy, and proper exception chaining, but this captures the core logic.
Pattern: fan-out with concurrency limits
TaskGroup doesn’t have built-in concurrency limiting. Combine it with asyncio.Semaphore to process large workloads without overwhelming downstream services:
import asyncio

import httpx

async def fetch_url(session: httpx.AsyncClient, url: str, sem: asyncio.Semaphore) -> dict:
    # The semaphore caps how many requests are in flight at once
    async with sem:
        resp = await session.get(url)
        return {"url": url, "status": resp.status_code}

async def crawl(urls: list[str], max_concurrent: int = 20) -> list[dict]:
    sem = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [
                tg.create_task(fetch_url(session, url, sem))
                for url in urls
            ]
    # Every task has finished once the TaskGroup block exits
    return [t.result() for t in tasks]
This pattern gives you bounded concurrency with clean cancellation: if any fetch raises an unhandled exception, the group cancels the remaining requests, including those still waiting on the semaphore.
Pattern: nested TaskGroups for partial failure tolerance
Sometimes you want some failures to be tolerable while others are fatal. Nest TaskGroups:
async def process_batch(items: list[dict]) -> list[dict]:
    results = []
    async with asyncio.TaskGroup() as outer:
        # Critical setup: a failure here cancels everything
        config_task = outer.create_task(load_config())
        schema_task = outer.create_task(load_schema())
        # Wait for both before use; result() on an unfinished task raises InvalidStateError
        config = await config_task
        schema = await schema_task
        # Non-critical processing: individual failures are OK
        for item in items:
            try:
                async with asyncio.TaskGroup() as inner:
                    validated = inner.create_task(validate(item, schema))
                    enriched = inner.create_task(enrich(item, config))
                results.append({
                    "validated": validated.result(),
                    "enriched": enriched.result(),
                })
            except* Exception as eg:
                results.append({"error": str(eg), "item": item})
    return results
The outer group handles critical dependencies: if config or schema loading fails, nothing proceeds. The inner groups handle per-item processing, where a failure is recorded in that item's result entry and the batch moves on to the next item.
Pattern: timeout integration
Combine TaskGroup with asyncio.timeout() (Python 3.11+) for deadline-based cancellation:
async def fetch_with_deadline(urls: list[str], deadline_seconds: float):
    try:
        async with asyncio.timeout(deadline_seconds):
            async with asyncio.TaskGroup() as tg:
                tasks = [tg.create_task(fetch(url)) for url in urls]
        return [t.result() for t in tasks]
    except TimeoutError:
        print(f"Deadline of {deadline_seconds}s exceeded")
        return []
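When the timeout fires, it cancels the task running the async with block; the TaskGroup then cancels all of its child tasks, and asyncio.timeout() turns the cancellation into a TimeoutError. This is cleaner than setting per-task timeouts because it enforces a total budget rather than individual limits.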
ExceptionGroup handling strategies
Strategy 1: type-based filtering with except*
try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(operation_a())
        tg.create_task(operation_b())
except* ConnectionError as eg:
    for exc in eg.exceptions:
        log_connection_failure(exc)
except* ValueError as eg:
    for exc in eg.exceptions:
        log_validation_error(exc)
Multiple except* clauses can fire for the same ExceptionGroup — each handles its matching subset.
Strategy 2: programmatic inspection
try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(operation_a())
        tg.create_task(operation_b())
except* Exception as eg:
    retryable = []
    fatal = []
    for exc in eg.exceptions:
        if isinstance(exc, (ConnectionError, TimeoutError)):
            retryable.append(exc)
        else:
            fatal.append(exc)
    if retryable and not fatal:
        await retry_operations(retryable)
    elif fatal:
        raise  # Re-raise the ExceptionGroup
Strategy 3: the split and subgroup methods
ExceptionGroup provides .split() and .subgroup() for filtering. split() returns a (matching, rest) pair, and either element is None when it would be empty:
except* Exception as eg:
    network_errors, other = eg.split(
        lambda e: isinstance(e, (ConnectionError, TimeoutError))
    )
    if network_errors:
        handle_network_issues(network_errors)
    if other:
        raise other
Graceful shutdown pattern
For long-running services, combine TaskGroup with signal handling:
import asyncio
import signal

async def serve():
    shutdown_event = asyncio.Event()

    def handle_signal():
        shutdown_event.set()

    loop = asyncio.get_running_loop()
    # add_signal_handler is only available on Unix event loops
    loop.add_signal_handler(signal.SIGTERM, handle_signal)
    loop.add_signal_handler(signal.SIGINT, handle_signal)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(http_server(shutdown_event))
        tg.create_task(background_worker(shutdown_event))
        tg.create_task(health_checker(shutdown_event))
        # Wait for shutdown signal
        await shutdown_event.wait()
        # No cancellation happens here: each task sees the event and returns on its own
Each task checks shutdown_event in its main loop and exits cleanly, causing the TaskGroup to complete naturally.
Migration from gather to TaskGroup
| gather pattern | TaskGroup equivalent |
|---|---|
| await gather(a(), b()) | async with TaskGroup() as tg: then create_task |
| return_exceptions=True | Wrap individual tasks in try/except, or use nested groups |
| Dynamic task list | Call create_task in a loop inside the async with block |
| Partial results | Nested groups per item with individual error handling |
Performance comparison
TaskGroup has negligible overhead compared to gather — the difference is a few task-tracking set operations. In benchmarks with 10,000 tasks, the overhead is under 1ms total.
The real performance difference is in failure scenarios: TaskGroup cancels immediately on first failure, while gather lets all tasks run to completion. For workloads where early cancellation matters (API calls with rate limits, database connections), TaskGroup can save significant resources.
Gotchas
- CancelledError suppression: If a task catches CancelledError and doesn't re-raise, the TaskGroup keeps waiting until that task finishes on its own, which may be never. Always re-raise or let it propagate.
- Synchronous exceptions in create_task: If the coroutine raises before its first await, the exception is still captured by the TaskGroup, so no special handling is needed.
- Mixing with raw asyncio.create_task: Tasks created outside the group are not managed by it. They can outlive the group and won't be cancelled on failure.
- ExceptionGroup vs Exception: A type-specific handler such as except ValueError won't match an ExceptionGroup that wraps a ValueError; only a broad handler like except Exception catches the group itself. Update type-specific except clauses to except* (or catch ExceptionGroup) when migrating to TaskGroup.
The one thing to remember: TaskGroup enforces that concurrent tasks have a defined lifetime, automatic cancellation on failure, and complete error reporting — the three properties that make async Python code production-safe rather than demo-safe.