Python Async Debugging — Deep Dive
Beyond Debug Mode
While debug=True is a great start, production async applications need deeper instrumentation. This guide covers techniques for debugging async code at scale.
Custom Task Factories for Tracing
You can intercept every task creation by installing a custom task factory:
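import asyncio
import traceback
_original_factory = None
def tracing_task_factory(loop, coro, **kwargs):
    # Forward name=, context= and any other keyword arguments that newer
    # Python versions pass to task factories.
    if _original_factory is None:
        # No previous factory: build a plain Task (coro first, loop keyword-only).
        task = asyncio.Task(coro, loop=loop, **kwargs)
    else:
        task = _original_factory(loop, coro, **kwargs)
    # Record where and when the task was created (minus this factory's frame).
    task._creation_stack = traceback.format_stack()[:-1]
    task._creation_time = loop.time()
    return task
def install_tracing():
    # Must be called from inside the running loop, e.g. at the top of main().
    global _original_factory
    loop = asyncio.get_running_loop()
    _original_factory = loop.get_task_factory()
    loop.set_task_factory(tracing_task_factory)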
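This attaches a creation stack trace and timestamp to every task created after install_tracing() runs. Call it at the top of your main coroutine; the main task itself is created by asyncio.run() before the factory is installed, so it is not traced. When investigating a stuck task, you can see exactly where and when it was created.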
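If you would rather not hang private attributes off Task objects, the same factory idea can keep its metadata in a weakref.WeakKeyDictionary, so finished tasks are still garbage-collected normally. The sketch below is a variant, not the code above: install_age_tracking(), old_tasks() and demo() are hypothetical names, and it assumes Python 3.8+ (for task names). old_tasks() answers the question you usually ask during an incident: which pending tasks have been alive longer than N seconds, and where were they created?

```python
import asyncio
import traceback
import weakref

# Creation metadata keyed by task. Weak keys mean this registry never keeps
# a finished task alive.
_task_info = weakref.WeakKeyDictionary()


def _make_factory(previous):
    """Wrap the loop's previous task factory (or plain asyncio.Task)."""
    def factory(loop, coro, **kwargs):
        if previous is None:
            task = asyncio.Task(coro, loop=loop, **kwargs)
        else:
            task = previous(loop, coro, **kwargs)
        # Store creation time and the creation stack minus this frame.
        _task_info[task] = (loop.time(), traceback.format_stack()[:-1])
        return task
    return factory


def install_age_tracking():
    """Install the factory on the running loop; call it from inside main()."""
    loop = asyncio.get_running_loop()
    loop.set_task_factory(_make_factory(loop.get_task_factory()))


def old_tasks(min_age_seconds):
    """Return (task, age_seconds, creation_stack) for old pending tasks."""
    loop = asyncio.get_running_loop()
    now = loop.time()
    found = []
    for task in asyncio.all_tasks(loop):
        info = _task_info.get(task)
        if info is not None and now - info[0] >= min_age_seconds:
            found.append((task, now - info[0], info[1]))
    return found


async def demo():
    install_age_tracking()
    stuck = asyncio.create_task(asyncio.sleep(10), name='stuck')
    await asyncio.sleep(0.05)
    names = [task.get_name() for task, _age, _stack in old_tasks(0.01)]
    stuck.cancel()
    try:
        await stuck
    except asyncio.CancelledError:
        pass
    return names
```

asyncio.run(demo()) lists the 'stuck' task; the main task is absent because it was created before the factory was installed.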
Walking the Await Chain
Every suspended coroutine has a cr_await attribute pointing to the next awaitable in the chain. Walking this chain reveals the full suspension path:
def get_await_chain(task):
    """Return the full chain of what a task is waiting for."""
    chain = []
    coro = task.get_coro()
    while coro is not None:
        if hasattr(coro, 'cr_code'):
            frame = coro.cr_frame
            if frame:
                chain.append({
                    'function': coro.cr_code.co_name,
                    'file': coro.cr_code.co_filename,
                    'line': frame.f_lineno,
                    'locals': {k: repr(v) for k, v in frame.f_locals.items()
                               if not k.startswith('_')}
                })
            coro = coro.cr_await
        elif hasattr(coro, 'gi_code'):
            # Generator-based coroutine
            frame = coro.gi_frame
            if frame:
                chain.append({
                    'function': coro.gi_code.co_name,
                    'file': coro.gi_code.co_filename,
                    'line': frame.f_lineno,
                })
            coro = coro.gi_yieldfrom
        else:
            chain.append({'awaitable': repr(coro)})
            break
    return chain
This is far more useful than task.print_stack(): for a suspended coroutine, print_stack() shows only the task's outermost frame, whereas walking cr_await follows every nested awaitable and also captures local variables.
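To see the walk in action without the bookkeeping, here is a stripped-down variant that records only function names, plus a small demo in which a task is parked three coroutines deep on an asyncio.Event. await_chain_names(), inner(), middle(), outer() and demo() are illustrative names, not part of any library.

```python
import asyncio


def await_chain_names(task):
    """Follow cr_await / gi_yieldfrom links and collect each function name."""
    names = []
    obj = task.get_coro()
    while obj is not None:
        code = getattr(obj, 'cr_code', None) or getattr(obj, 'gi_code', None)
        if code is None:
            break  # reached a Future iterator or another non-coroutine awaitable
        names.append(code.co_name)
        obj = getattr(obj, 'cr_await', None) or getattr(obj, 'gi_yieldfrom', None)
    return names


async def inner(event):
    await event.wait()


async def middle(event):
    await inner(event)


async def outer(event):
    await middle(event)


async def demo():
    event = asyncio.Event()
    task = asyncio.create_task(outer(event))
    await asyncio.sleep(0)  # let the task run until it blocks on the event
    names = await_chain_names(task)
    event.set()
    await task
    return names
```

asyncio.run(demo()) returns a list that starts with 'outer', 'middle', 'inner', followed by whatever internal asyncio frames the task is parked in: the suspension path that a single print_stack() frame hides.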
Signal-Based Debug Dump
In production, you can’t attach a debugger. Instead, install a signal handler that dumps the state of all tasks:
import asyncio
import json
import signal
import sys
DUMP_PATH = '/tmp/asyncio-dump.json'
def dump_tasks(loop):
    report = []
    # all_tasks() only returns tasks that have not finished yet.
    for task in asyncio.all_tasks(loop):
        info = {
            'name': task.get_name(),
            'state': 'done' if task.done() else 'pending',
            'cancelled': task.cancelled(),
        }
        # Present only when the tracing task factory is installed.
        if hasattr(task, '_creation_time'):
            info['age_seconds'] = loop.time() - task._creation_time
        if not task.done():
            info['await_chain'] = get_await_chain(task)  # defined above
        report.append(info)
    with open(DUMP_PATH, 'w') as f:
        json.dump(report, f, indent=2, default=str)
    print(f"Dumped {len(report)} tasks to {DUMP_PATH}", file=sys.stderr)
def install_dump_handler():
    # Call once from inside main(). Unix only: add_signal_handler() runs
    # dump_tasks as a normal loop callback, so it never fires halfway through
    # the loop's own bookkeeping the way a signal.signal() handler can.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGUSR1, dump_tasks, loop)
Send kill -USR1 <pid> to get an instant snapshot. This is invaluable for diagnosing production hangs.
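A Python-level handler such as dump_tasks can only run once the main thread gets back to executing Python code. As a complement, the standard library's faulthandler module writes every thread's Python stack directly from a C-level signal handler, so it still produces output while the loop thread is busy in a blocking call. A minimal sketch, Unix only; install_thread_dump() is a hypothetical name, and using SIGUSR2 (so it does not collide with the SIGUSR1 task dump) is an assumption.

```python
import faulthandler
import signal
import sys


def install_thread_dump(signum=signal.SIGUSR2, file=sys.stderr):
    """Dump the Python stack of every thread to `file` when `signum` arrives.

    faulthandler keeps a reference to `file`, which must stay open and have a
    real file descriptor. With chain=False (the default), the signal's default
    action (terminating the process) is not run.
    """
    faulthandler.register(signum, file=file, all_threads=True)
```

Call it once at startup, then run kill -USR2 <pid> when the loop looks frozen.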
Event Loop Monitoring with Callbacks
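Instrument the event loop to record iterations whose callbacks run longer than a threshold (on Python 3.12+, use it with asyncio.run(main(), loop_factory=InstrumentedLoop)):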
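import asyncio
import selectors
import time
class TimedSelector(selectors.DefaultSelector):
    idle_time = 0.0  # seconds spent blocked waiting for I/O
    def select(self, timeout=None):
        start = time.monotonic()
        try:
            return super().select(timeout)
        finally:
            self.idle_time += time.monotonic() - start
class InstrumentedLoop(asyncio.SelectorEventLoop):
    def __init__(self):
        self._timed_selector = TimedSelector()
        super().__init__(self._timed_selector)
        self._callback_times = []
        self._slow_threshold = 0.05  # 50ms
    def _run_once(self):
        # _run_once() is a private CPython method covering one iteration: a
        # select() wait plus the ready callbacks. Subtract the idle wait.
        idle_before = self._timed_selector.idle_time
        start = time.monotonic()
        super()._run_once()
        idle = self._timed_selector.idle_time - idle_before
        elapsed = time.monotonic() - start - idle
        if elapsed > self._slow_threshold:
            self._callback_times.append(elapsed)
    def get_stats(self):
        if not self._callback_times:
            return {'slow_callbacks': 0}
        return {
            'slow_callbacks': len(self._callback_times),
            'max_ms': max(self._callback_times) * 1000,
            'avg_ms': sum(self._callback_times) / len(self._callback_times) * 1000,
        }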
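asyncio also ships a coarser, built-in version of this check: in debug mode the loop logs a warning through the asyncio logger whenever a single callback runs longer than loop.slow_callback_duration (0.1 seconds by default). The sketch below swaps that stock mechanism in for the custom loop and collects the warnings in memory. It assumes the warning text starts with "Executing", which matches CPython's current message but is not a documented API; SlowCallbackCollector, blocking_work() and run_with_slow_callback_log() are hypothetical names.

```python
import asyncio
import logging
import time


class SlowCallbackCollector(logging.Handler):
    """Keeps asyncio's 'Executing ... took N seconds' warnings in memory."""

    def __init__(self):
        super().__init__(level=logging.WARNING)
        self.messages = []

    def emit(self, record):
        message = record.getMessage()
        if message.startswith('Executing '):
            self.messages.append(message)


async def blocking_work():
    time.sleep(0.2)  # deliberately blocks the event loop


async def main(threshold):
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = threshold  # seconds
    await blocking_work()


def run_with_slow_callback_log(threshold=0.05):
    collector = SlowCallbackCollector()
    logger = logging.getLogger('asyncio')
    logger.addHandler(collector)
    try:
        # The slow-callback check only runs when debug mode is on.
        asyncio.run(main(threshold), debug=True)
    finally:
        logger.removeHandler(collector)
    return collector.messages
```

The built-in check needs no private methods but only works in debug mode, which adds overhead; the custom loop above trades that for a dependency on CPython internals.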
Detecting Resource Leaks
Unclosed connections, file handles, and tasks are a common source of leaks in async code. Record where each resource was opened and report any that stay open too long:
import time
import traceback
class ConnectionTracker:
    def __init__(self):
        self._active = {}
    def opened(self, conn, stack=None):
        # id() keys are safe because each entry holds a reference to conn.
        self._active[id(conn)] = {
            'conn': conn,
            'opened_at': time.monotonic(),
            'stack': stack or traceback.format_stack(),
        }
    def closed(self, conn):
        self._active.pop(id(conn), None)
    def report_leaks(self, max_age_seconds=300):
        now = time.monotonic()
        for info in self._active.values():
            age = now - info['opened_at']
            if age > max_age_seconds:
                print(f"Leaked connection (age: {age:.0f}s):")
                print(''.join(info['stack']))
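Calling opened() and closed() by hand is exactly what goes wrong under exceptions and cancellation, so one option is to pair them in an async context manager whose finally block always runs. This is a sketch, not the tracker above: ResourceTracker is a trimmed, self-contained stand-in that returns leaks instead of printing them, and tracked() with its async close callable is hypothetical; in real code, close would be your client's own close coroutine.

```python
import asyncio
import contextlib
import time
import traceback


class ResourceTracker:
    """Trimmed tracker: remembers when and where each resource was opened."""

    def __init__(self):
        self._active = {}

    def opened(self, resource):
        self._active[id(resource)] = {
            'resource': resource,
            'opened_at': time.monotonic(),
            'stack': traceback.format_stack()[:-1],
        }

    def closed(self, resource):
        self._active.pop(id(resource), None)

    def leaks(self, max_age_seconds=300):
        now = time.monotonic()
        return [info for info in self._active.values()
                if now - info['opened_at'] > max_age_seconds]


@contextlib.asynccontextmanager
async def tracked(tracker, resource, close):
    """Register `resource` on entry; close and deregister it on any exit."""
    tracker.opened(resource)
    try:
        yield resource
    finally:
        # Runs on normal exit, on exceptions, and on task cancellation.
        await close(resource)
        tracker.closed(resource)


async def demo():
    tracker = ResourceTracker()

    async def fake_close(resource):
        await asyncio.sleep(0)

    async with tracked(tracker, object(), fake_close):
        pass  # closed and deregistered on exit
    tracker.opened(object())  # opened by hand and never closed
    await asyncio.sleep(0.05)
    return tracker.leaks(max_age_seconds=0.01)
```

asyncio.run(demo()) reports a single leak: the resource that was opened by hand and never closed.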
Post-Mortem Analysis with asyncio.Task
When a task fails in production, capture its full context:
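import asyncio
import logging
import traceback
def exception_handler(loop, context):
    # Tasks show up under 'future' ("exception was never retrieved")
    # or under 'task' ("Task was destroyed but it is pending!").
    task = context.get('future') or context.get('task')
    exception = context.get('exception')
    message = context.get('message', 'Unknown')
    report = {
        'message': message,
        'exception': repr(exception),
        'task_name': getattr(task, 'get_name', lambda: 'N/A')(),
    }
    if hasattr(task, '_creation_stack'):
        report['creation_site'] = task._creation_stack
    if exception:
        report['traceback'] = traceback.format_exception(
            type(exception), exception, exception.__traceback__
        )
    # Send to your error tracking service. 'message' is a reserved LogRecord
    # attribute, so nest the report under one key instead of spreading it into extra=.
    logging.error("Async task failure", extra={'async_report': report})
def install_exception_handler():
    # Call from inside the running loop, e.g. at the top of main().
    asyncio.get_running_loop().set_exception_handler(exception_handler)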
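An exception handler only runs when something has already gone wrong, so it is worth exercising it directly in a test. One way, sketched below, is to push a synthetic context through loop.call_exception_handler(), the same entry point asyncio uses for unretrieved task exceptions. The captured list is a stand-in for an error-tracking client (an assumption), collecting_handler() and demo() are illustrative names, and the handler forwards to loop.default_exception_handler() so the usual log line is not lost.

```python
import asyncio
import traceback

captured = []  # stand-in for an error-tracking client (assumption)


def collecting_handler(loop, context):
    """Record a structured report, then let asyncio log it as usual."""
    exc = context.get('exception')
    captured.append({
        'message': context.get('message'),
        'exception': repr(exc),
        'traceback': (traceback.format_exception(type(exc), exc, exc.__traceback__)
                      if exc is not None else None),
    })
    loop.default_exception_handler(context)


async def demo():
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(collecting_handler)
    try:
        raise ValueError('boom')
    except ValueError as exc:
        # Same entry point asyncio uses for unretrieved task exceptions.
        loop.call_exception_handler({'message': 'synthetic failure',
                                     'exception': exc})
```

After asyncio.run(demo()), captured holds one report whose 'exception' field is the repr of the ValueError.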
Using aiomonitor for Interactive Debugging
The aiomonitor library provides a telnet-based console for inspecting a running asyncio application:
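import asyncio
import aiomonitor
async def main():
    loop = asyncio.get_running_loop()
    # start_monitor() takes the event loop as its first argument
    with aiomonitor.start_monitor(loop, port=50101):
        # Your application runs here (run_server() stands in for your own code)
        await run_server()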
# Connect from another terminal:
# $ python -m aiomonitor.cli --port 50101
# > ps # List tasks
# > where <id> # Stack trace of a task
# > cancel <id> # Cancel a stuck task
Profiling Async Code
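Standard profilers such as cProfile don’t work well with async code: every time a coroutine resumes after an await, cProfile records it as a new call, so call counts are inflated and a coroutine's time is split into many short fragments. Use yappi, which is coroutine-aware; with the wall clock it attributes the time a coroutine spends awaiting to that coroutine: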
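import asyncio
import yappi
yappi.set_clock_type("wall")  # or "cpu" for CPU-only
yappi.start()
asyncio.run(main())  # main() is your application's entry coroutine
yappi.stop()
# Print stats only for functions from your own code
# ('myapp' is a placeholder for a path fragment of your package)
stats = yappi.get_func_stats(
    filter_callback=lambda stat: 'myapp' in stat.module
)
stats.print_all()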
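The standard-library cProfile shows the call-counting problem directly. In this sketch, a coroutine that awaits three times is started once and then resumed three more times, and cProfile counts each resumption as a separate call; work() and profile_call_count() are illustrative names.

```python
import asyncio
import cProfile
import pstats


async def work(steps):
    for _ in range(steps):
        await asyncio.sleep(0)  # suspend, then resume on the next iteration


def profile_call_count(steps=3):
    """Return how many calls cProfile records for a single run of work()."""
    profiler = cProfile.Profile()
    profiler.enable()
    asyncio.run(work(steps))
    profiler.disable()
    stats = pstats.Stats(profiler).stats
    # Keys are (filename, lineno, funcname); values start with (cc, nc, ...).
    for (_filename, _lineno, funcname), (_cc, nc, *_rest) in stats.items():
        if funcname == 'work':
            return nc
    return 0
```

profile_call_count(3) returns a count greater than 1 even though work() was called only once, which is why per-call averages from cProfile are misleading for coroutines.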
Structured Logging for Async
Attach task context to every log message using contextvars:
import asyncio
import contextvars
import logging
request_id = contextvars.ContextVar('request_id', default='unknown')
class AsyncContextFilter(logging.Filter):
    def filter(self, record):
        record.request_id = request_id.get()
        try:
            task = asyncio.current_task()
        except RuntimeError:
            task = None  # logging from outside a running event loop
        record.task_name = task.get_name() if task else 'no-task'
        return True
handler = logging.StreamHandler()
handler.addFilter(AsyncContextFilter())
formatter = logging.Formatter(
    '%(asctime)s [%(task_name)s] [%(request_id)s] %(message)s'
)
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)
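The filter works because asyncio copies the current contextvars context into every new task, so a request_id set inside one task never leaks into another task or back into the parent. The sketch below demonstrates that isolation with two interleaved requests; ListHandler, RequestIdFilter, handle_request(), run_demo() and the 'request-demo' logger name are hypothetical, and the filter is a trimmed version of the one above.

```python
import asyncio
import contextvars
import logging

request_id = contextvars.ContextVar('request_id', default='unknown')


class ListHandler(logging.Handler):
    """Keeps formatted records in memory so the demo can inspect them."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


class RequestIdFilter(logging.Filter):
    def filter(self, record):
        record.request_id = request_id.get()
        return True


async def handle_request(logger, rid):
    request_id.set(rid)      # only changes this task's copy of the context
    await asyncio.sleep(0)   # interleave with the other request
    logger.info('handled')


async def main(logger):
    await asyncio.gather(handle_request(logger, 'req-1'),
                         handle_request(logger, 'req-2'))
    logger.info('after gather')  # the parent context was never modified


def run_demo():
    handler = ListHandler()
    handler.addFilter(RequestIdFilter())
    handler.setFormatter(logging.Formatter('[%(request_id)s] %(message)s'))
    logger = logging.getLogger('request-demo')
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)
    try:
        asyncio.run(main(logger))
    finally:
        logger.removeHandler(handler)
    return handler.lines
```

run_demo() yields '[req-1] handled' and '[req-2] handled' for the two tasks, while the final 'after gather' line still carries the default '[unknown]' because main() itself never set request_id.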
Common Debugging Patterns Summary
| Symptom | Likely Cause | Tool |
|---|---|---|
| Program hangs | Deadlock or forgotten await | all_tasks() + await chain walk |
| Silent failures | Unobserved task exception | Custom exception handler |
| Gradual slowdown | Resource leak | Connection/task tracker |
| Intermittent errors | Race condition in shared state | Structured logging with task IDs |
| High latency spikes | Blocking callback | Slow callback monitoring |
| Memory growth | Tasks never completing | Task factory with age tracking |
One thing to remember: Production async debugging requires proactive instrumentation — install custom task factories, exception handlers, and signal-based dump handlers before problems occur, because you can’t attach a debugger to a stuck production event loop.