Python Async Debugging — Deep Dive

Beyond Debug Mode

While asyncio's debug mode (debug=True) is a great start, production async applications need deeper instrumentation. This guide covers techniques for debugging async code at scale.

Custom Task Factories for Tracing

You can intercept every task creation by installing a custom task factory:

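import asyncio
import traceback

_original_factory = None

def tracing_task_factory(loop, coro, **kwargs):
    # kwargs carries name/context (and any newer keyword) through untouched,
    # so the factory works across Python versions.
    if _original_factory is None:
        # No previous factory: build the task the way the loop would.
        # Note that asyncio.Task takes the loop as a keyword, not positionally.
        task = asyncio.Task(coro, loop=loop, **kwargs)
    else:
        task = _original_factory(loop, coro, **kwargs)
    task._creation_stack = traceback.format_stack()
    task._creation_time = loop.time()
    return task

def install_tracing():
    # Must be called from inside the running loop, e.g. at the top of main().
    global _original_factory
    loop = asyncio.get_running_loop()
    _original_factory = loop.get_task_factory()  # None if none was installed
    loop.set_task_factory(tracing_task_factory)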

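This attaches a creation stack trace and timestamp to every task created on that loop after install_tracing() runs. When investigating a stuck task, you can see exactly where and when it was created. Capturing a stack for every task has a cost, so measure it before enabling this on hot paths.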
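One use for the timestamp is an age check over pending tasks. The sketch below is self-contained and makes some assumptions: make_tracing_factory, old_tasks, and demo are hypothetical names, the factory is a simplified version that ignores any previously installed factory, and the thresholds are arbitrary.

```python
import asyncio
import traceback

def make_tracing_factory():
    """Build a task factory that stamps each task (simplified sketch)."""
    def factory(loop, coro, **kwargs):
        task = asyncio.Task(coro, loop=loop, **kwargs)
        task._creation_stack = traceback.format_stack()
        task._creation_time = loop.time()
        return task
    return factory

def old_tasks(loop, max_age):
    """Return (name, age) for pending tasks older than max_age seconds."""
    now = loop.time()
    result = []
    for task in asyncio.all_tasks(loop):
        created = getattr(task, '_creation_time', None)
        # Tasks created before the factory was installed carry no timestamp.
        if created is None or task.done():
            continue
        if now - created > max_age:
            result.append((task.get_name(), now - created))
    return result

async def demo():
    loop = asyncio.get_running_loop()
    loop.set_task_factory(make_tracing_factory())
    stuck = asyncio.create_task(asyncio.sleep(3600), name='stuck-worker')
    await asyncio.sleep(0.05)
    found = old_tasks(loop, max_age=0.01)
    stuck.cancel()
    return found

if __name__ == '__main__':
    for name, age in asyncio.run(demo()):
        print(f"{name} has been pending for {age:.2f}s")
```

The task running demo() itself is not reported, because it was created before the factory was installed.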

Walking the Await Chain

Every suspended coroutine has a cr_await attribute pointing to the next awaitable in the chain. Walking this chain reveals the full suspension path:

def get_await_chain(task):
    """Return the full chain of what a task is waiting for."""
    chain = []
    coro = task.get_coro()
    while coro is not None:
        if hasattr(coro, 'cr_code'):
            frame = coro.cr_frame
            if frame:
                chain.append({
                    'function': coro.cr_code.co_name,
                    'file': coro.cr_code.co_filename,
                    'line': frame.f_lineno,
                    'locals': {k: repr(v) for k, v in frame.f_locals.items()
                              if not k.startswith('_')}
                })
            coro = coro.cr_await
        elif hasattr(coro, 'gi_code'):
            # Generator-based coroutine
            frame = coro.gi_frame
            if frame:
                chain.append({
                    'function': coro.gi_code.co_name,
                    'file': coro.gi_code.co_filename,
                    'line': frame.f_lineno,
                })
            coro = coro.gi_yieldfrom
        else:
            chain.append({'awaitable': repr(coro)})
            break
    return chain

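This is more useful than task.print_stack(), which reports only a single frame for a suspended coroutine: the chain walk follows nested awaitables and includes local variables at each level. Local variables can contain secrets, so treat the output as sensitive.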
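To see what the walk produces, here is a reduced sketch that collects only function names. The names await_chain_names, outer, middle, inner, and demo are hypothetical, and the walk stops at the first object that is not a native coroutine.

```python
import asyncio

def await_chain_names(task):
    """Follow cr_await links and return the suspended coroutine names."""
    names = []
    awaitable = task.get_coro()
    while awaitable is not None and hasattr(awaitable, 'cr_code'):
        names.append(awaitable.cr_code.co_name)
        awaitable = awaitable.cr_await
    return names

async def inner():
    await asyncio.sleep(3600)

async def middle():
    await inner()

async def outer():
    await middle()

async def demo():
    task = asyncio.create_task(outer())
    await asyncio.sleep(0)  # let the task run until it suspends
    names = await_chain_names(task)
    task.cancel()
    return names

if __name__ == '__main__':
    print(' -> '.join(asyncio.run(demo())))
```

The chain starts with outer, middle, inner, followed by whatever asyncio.sleep() awaits internally.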

Signal-Based Debug Dump

In production, you usually can't attach a debugger. Instead, install a signal handler that dumps the state of all tasks:

import asyncio
import json
import signal
import sys

DUMP_PATH = '/tmp/asyncio-dump.json'

def dump_tasks(sig, frame):
    # Signal handlers run on the main thread, so the loop running there
    # (if any) is the one to inspect.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return  # no event loop is running yet

    report = []
    for task in asyncio.all_tasks(loop):
        info = {
            'name': task.get_name(),
            'state': 'done' if task.done() else 'pending',
            'cancelled': task.cancelled(),
        }
        # Set by the tracing task factory, when it is installed
        if hasattr(task, '_creation_time'):
            info['age_seconds'] = loop.time() - task._creation_time
        if not task.done():
            info['await_chain'] = get_await_chain(task)
        report.append(info)

    with open(DUMP_PATH, 'w') as f:
        json.dump(report, f, indent=2, default=str)
    print(f"Dumped {len(report)} tasks to {DUMP_PATH}", file=sys.stderr)

# SIGUSR1 exists on Unix only
signal.signal(signal.SIGUSR1, dump_tasks)

Send kill -USR1 <pid> to get an instant snapshot. This is invaluable for diagnosing production hangs.
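A variation on the handler above, assuming a Unix platform and a loop running on the main thread: loop.add_signal_handler() runs the callback as an ordinary loop callback, so it never interrupts a task in the middle of a step. The trade-off is that it cannot fire while the loop is blocked, which is a case where a raw signal.signal() handler can still run. The names summarize_tasks, snapshot, and demo are illustrative only.

```python
import asyncio
import os
import signal

def summarize_tasks(loop, sink):
    """Append a small record per task to sink (a plain list)."""
    for task in asyncio.all_tasks(loop):
        sink.append({'name': task.get_name(), 'done': task.done()})

async def demo():
    loop = asyncio.get_running_loop()
    snapshot = []
    # Extra positional arguments are passed on to the callback.
    loop.add_signal_handler(signal.SIGUSR1, summarize_tasks, loop, snapshot)
    worker = asyncio.create_task(asyncio.sleep(3600), name='worker')
    os.kill(os.getpid(), signal.SIGUSR1)  # same effect as kill -USR1 <pid>
    await asyncio.sleep(0.1)              # give the loop time to run the callback
    loop.remove_signal_handler(signal.SIGUSR1)
    worker.cancel()
    return snapshot

if __name__ == '__main__':
    for entry in asyncio.run(demo()):
        print(entry)
```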

Event Loop Monitoring with Callbacks

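Instrument the event loop to record iterations whose callbacks run too long. The time the loop spends waiting for I/O in the selector is idle time, so it has to be subtracted; otherwise every quiet period would be counted as a slow callback:

import asyncio
import time

class InstrumentedLoop(asyncio.SelectorEventLoop):
    """Records loop iterations whose callbacks exceed a threshold.

    Relies on CPython's private _run_once() and _selector, so re-check it
    when upgrading Python.
    """

    def __init__(self):
        super().__init__()
        self._callback_times = []
        self._slow_threshold = 0.05  # 50ms
        self._select_elapsed = 0.0
        # Time the selector separately so waiting for I/O is not counted.
        self._raw_select = self._selector.select
        self._selector.select = self._timed_select

    def _timed_select(self, timeout=None):
        start = time.monotonic()
        try:
            return self._raw_select(timeout)
        finally:
            self._select_elapsed = time.monotonic() - start

    def _run_once(self):
        start = time.monotonic()
        super()._run_once()
        # Whole iteration minus the selector wait = time spent in callbacks
        elapsed = time.monotonic() - start - self._select_elapsed
        if elapsed > self._slow_threshold:
            self._callback_times.append(elapsed)

    def get_stats(self):
        if not self._callback_times:
            return {'slow_callbacks': 0}
        return {
            'slow_callbacks': len(self._callback_times),
            'max_ms': max(self._callback_times) * 1000,
            'avg_ms': sum(self._callback_times) / len(self._callback_times) * 1000,
        }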
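Subclassing the loop leans on the private _run_once() method. Where that is not acceptable, asyncio's debug mode offers a public alternative: with debugging enabled, the loop logs a warning on the asyncio logger for any callback that runs longer than loop.slow_callback_duration. The sketch below captures those warnings in a list; ListHandler, blocking_step, demo, and run_demo are hypothetical names, and the 0.2 second sleep is only there to trigger the warning.

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Collects formatted log messages in memory."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def blocking_step():
    time.sleep(0.2)  # deliberately blocks the event loop

async def demo():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05  # seconds
    await asyncio.create_task(blocking_step())

def run_demo():
    handler = ListHandler()
    logger = logging.getLogger('asyncio')
    logger.addHandler(handler)
    try:
        asyncio.run(demo(), debug=True)
    finally:
        logger.removeHandler(handler)
    return handler.messages

if __name__ == '__main__':
    for message in run_demo():
        print(message)
```

Debug mode adds overhead of its own, so this is better suited to staging or to short diagnostic windows than to being always on.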

Detecting Resource Leaks

Unclosed connections, file handles, and tasks are common in async code. Track them:

import time
import traceback

class ConnectionTracker:
    def __init__(self):
        self._active = {}

    def opened(self, conn, stack=None):
        self._active[id(conn)] = {
            'conn': conn,
            'opened_at': time.monotonic(),
            'stack': stack or traceback.format_stack(),
        }

    def closed(self, conn):
        self._active.pop(id(conn), None)

    def report_leaks(self, max_age_seconds=300):
        now = time.monotonic()
        for info in self._active.values():
            age = now - info['opened_at']
            if age > max_age_seconds:
                print(f"Leaked connection (age: {age:.0f}s):")
                print(''.join(info['stack']))
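A tracker like this only helps if opened() and closed() are called reliably. One way to guarantee the pairing is an async context manager, sketched below under some assumptions: LeakTracker is a shortened stand-in for the tracker above, tracked is a hypothetical helper, and plain strings stand in for real connections.

```python
import asyncio
import contextlib
import time
import traceback

class LeakTracker:
    """Shortened stand-in for the connection tracker."""
    def __init__(self):
        self._active = {}

    def opened(self, resource):
        self._active[id(resource)] = {
            'resource': resource,
            'opened_at': time.monotonic(),
            'stack': traceback.format_stack(),
        }

    def closed(self, resource):
        self._active.pop(id(resource), None)

    def leaked(self, max_age_seconds):
        now = time.monotonic()
        return [info['resource'] for info in self._active.values()
                if now - info['opened_at'] > max_age_seconds]

@contextlib.asynccontextmanager
async def tracked(tracker, resource):
    """Register resource on entry and always unregister it on exit."""
    tracker.opened(resource)
    try:
        yield resource
    finally:
        tracker.closed(resource)

async def demo():
    tracker = LeakTracker()
    async with tracked(tracker, 'conn-a'):
        await asyncio.sleep(0)
    tracker.opened('conn-b')  # opened by hand and never closed
    await asyncio.sleep(0.02)
    return tracker.leaked(max_age_seconds=0.01)

if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Only the resource that bypassed the context manager is reported.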

Post-Mortem Analysis with asyncio.Task

When a task fails in production, capture its full context:

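import asyncio
import logging
import traceback

def exception_handler(loop, context):
    # asyncio stores the failing object under 'task' or 'future',
    # depending on where the error was detected.
    task = context.get('task') or context.get('future')
    exception = context.get('exception')

    report = {
        # Not named 'message': logging raises KeyError for extra keys
        # that clash with LogRecord attributes.
        'error_message': context.get('message', 'Unknown'),
        'exception': repr(exception),
        'task_name': getattr(task, 'get_name', lambda: 'N/A')(),
    }

    if hasattr(task, '_creation_stack'):
        report['creation_site'] = task._creation_stack

    if exception is not None:
        report['traceback'] = traceback.format_exception(
            type(exception), exception, exception.__traceback__
        )

    # Send to your error tracking service
    logging.error("Async task failure", extra=report)

def install_exception_handler():
    # Call from inside the running loop, e.g. at the top of main()
    asyncio.get_running_loop().set_exception_handler(exception_handler)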
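The context dict passed to the handler does not always contain a task or future. A small sketch, with hypothetical names failing_callback and demo, shows what arrives when a plain callback raises:

```python
import asyncio

def failing_callback():
    raise ValueError('boom')

async def demo():
    loop = asyncio.get_running_loop()
    seen = []

    def handler(loop, context):
        # Record which keys asyncio supplied for this failure
        seen.append({
            'message': context['message'],
            'exception': repr(context.get('exception')),
            'keys': sorted(context),
        })

    loop.set_exception_handler(handler)
    loop.call_soon(failing_callback)
    await asyncio.sleep(0)  # yield so the callback gets to run
    return seen

if __name__ == '__main__':
    for entry in asyncio.run(demo()):
        print(entry)
```

Here the context carries a handle rather than a future, which is why a handler should read every key with context.get() and tolerate missing ones.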

Using aiomonitor for Interactive Debugging

The aiomonitor library provides a telnet-based console for running async applications:

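import asyncio
import aiomonitor

async def main():
    # start_monitor needs the loop it should inspect
    loop = asyncio.get_running_loop()
    with aiomonitor.start_monitor(loop=loop, port=50101):
        # Your application runs here
        await run_server()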

# Connect from another terminal:
# $ python -m aiomonitor.cli --port 50101
# > ps            # List tasks
# > where <id>    # Stack trace of a task
# > cancel <id>   # Cancel a stuck task

Profiling Async Code

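Standard profilers such as cProfile handle async code poorly: a coroutine that suspends and resumes looks like several separate calls, and the time it spends awaiting is not attributed to it. yappi is aware of coroutines. With the wall clock it reports a coroutine's total time including its awaits, and with the CPU clock only the time spent executing: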

import asyncio
import yappi

yappi.set_clock_type("wall")  # or "cpu" for CPU-only
yappi.start()
asyncio.run(main())
yappi.stop()

# Restrict the report to the coroutine functions of interest
stats = yappi.get_func_stats(
    filter_callback=lambda stat: yappi.func_matches(stat, [main])
)
stats.print_all()

Structured Logging for Async

Attach task context to every log message using contextvars:

import asyncio
import contextvars
import logging

request_id = contextvars.ContextVar('request_id', default='unknown')

class AsyncContextFilter(logging.Filter):
    def filter(self, record):
        record.request_id = request_id.get()
        try:
            task = asyncio.current_task()
        except RuntimeError:
            task = None  # logging from outside a running event loop
        record.task_name = task.get_name() if task else 'no-task'
        return True

handler = logging.StreamHandler()
handler.addFilter(AsyncContextFilter())
formatter = logging.Formatter(
    '%(asctime)s [%(task_name)s] [%(request_id)s] %(message)s'
)
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)  # attach to the root logger
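This works because every task runs in its own copy of the context, so a value set in one request handler is invisible to the others. The sketch below demonstrates that isolation with two concurrent handlers; RequestIdFilter, handle, demo, run_demo, and the logger name async_demo are hypothetical, and output goes to an in-memory stream so it can be inspected.

```python
import asyncio
import contextvars
import io
import logging

request_id = contextvars.ContextVar('request_id', default='unknown')

class RequestIdFilter(logging.Filter):
    def filter(self, record):
        record.request_id = request_id.get()
        return True

async def handle(rid, logger):
    request_id.set(rid)     # visible only inside this task's context
    await asyncio.sleep(0)  # let the other handler run in between
    logger.info('handled')

async def demo(logger):
    # gather() wraps each coroutine in its own task
    await asyncio.gather(handle('req-1', logger), handle('req-2', logger))

def run_demo():
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.addFilter(RequestIdFilter())
    handler.setFormatter(logging.Formatter('[%(request_id)s] %(message)s'))
    logger = logging.getLogger('async_demo')
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)
    try:
        asyncio.run(demo(logger))
    finally:
        logger.removeHandler(handler)
    return stream.getvalue().splitlines()

if __name__ == '__main__':
    print('\n'.join(run_demo()))
```

Each log line carries the ID set by its own task, even though both handlers interleave on the same loop.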

Common Debugging Patterns Summary

Symptom | Likely Cause | Tool
Program hangs | Deadlock or forgotten await | all_tasks() + await chain walk
Silent failures | Unobserved task exception | Custom exception handler
Gradual slowdown | Resource leak | Connection/task tracker
Intermittent errors | Race condition in shared state | Structured logging with task IDs
High latency spikes | Blocking callback | Slow callback monitoring
Memory growth | Tasks never completing | Task factory with age tracking

One thing to remember: Production async debugging requires proactive instrumentation — install custom task factories, exception handlers, and signal-based dump handlers before problems occur, because you can’t attach a debugger to a stuck production event loop.

