Python Coroutine Lifecycle — Deep Dive

The Coroutine Object Internals

When CPython compiles an async def function, it sets the CO_COROUTINE flag on the code object. Calling the function creates a PyCoroObject — essentially a generator with extra restrictions (you can’t use next() on it).

The coroutine object holds:

  • cr_frame — the execution frame with local variables and instruction pointer
  • cr_code — the compiled code object
  • cr_origin — traceback of where the coroutine was created (debug mode only)
  • cr_await — the object currently being awaited (None if not suspended)
async def example():
    await asyncio.sleep(1)
    return 42

coro = example()
print(coro.cr_frame)   # <frame at 0x...>
print(coro.cr_await)   # None (not yet running)
print(coro.cr_code)    # <code object example at 0x...>

The send/throw Protocol

Coroutines implement the same send/throw protocol as generators:

# Advance the coroutine (like next() but with a value)
result = coro.send(None)  # First call must send None

# Inject an exception at the suspension point
coro.throw(SomeError("failed"))

# Force the coroutine to exit (raises GeneratorExit inside)
coro.close()

When send() is called, CPython resumes execution at the last yield point (internally, await compiles down to yield from over __await__ iterators). The coroutine runs until it hits another await, returns, or raises.

What send() Returns

The value returned by send() is whatever the innermost awaitable yields. For asyncio futures, this is typically the future itself — the Task uses this to attach a completion callback.

Bytecode Deep Dive

Consider this coroutine:

async def fetch(url):
    response = await http_get(url)
    return response.json()

The bytecode (simplified from dis.dis(fetch)) looks like:

  LOAD_GLOBAL          http_get
  LOAD_FAST            url
  CALL_FUNCTION        1
  GET_AWAITABLE
  LOAD_CONST           None
  YIELD_FROM                    # <-- suspension point
  STORE_FAST           response
  LOAD_FAST            response
  LOAD_METHOD          json
  CALL_METHOD          0
  RETURN_VALUE

The critical instructions are GET_AWAITABLE (calls __await__() on the result) and YIELD_FROM (suspends the coroutine and propagates values up the chain).

In Python 3.11+, the bytecode changed with the introduction of SEND and END_SEND instructions, replacing YIELD_FROM for better performance and clearer semantics.

Task.__step — The State Machine Driver

The Task.__step method is where the lifecycle transitions happen:

def __step(self, exc=None):
    coro = self._coro
    self.__fut_waiter = None

    try:
        if exc is None:
            result = coro.send(None)
        else:
            result = coro.throw(type(exc), exc, exc.__traceback__)
    except StopIteration as e:
        # Coroutine completed normally
        super().set_result(e.value)
    except CancelledError as e:
        # Cancellation propagated
        super().cancel(msg=e.args[0] if e.args else None)
    except BaseException as e:
        # Coroutine raised an exception
        super().set_exception(e)
    else:
        # Coroutine suspended — result should be a Future
        if isinstance(result, futures._PyFuture):
            result.add_done_callback(self.__wakeup)
            self.__fut_waiter = result
        elif result is None:
            # A bare yield — reschedule for next iteration
            self._loop.call_soon(self.__step)
        else:
            # Invalid yield
            raise RuntimeError(f"Task got bad yield: {result!r}")

Notice the __fut_waiter attribute — it tracks which Future the Task is currently waiting for. This is used by Task.cancel() to propagate cancellation to the inner awaitable.

Coroutine Finalization

When a coroutine is garbage collected without completing, CPython calls coro.close(), which throws GeneratorExit into the coroutine. If the coroutine catches this and tries to yield again, a RuntimeError is raised.

In debug mode, asyncio also tracks “never awaited” coroutines using cr_origin:

# CPython sets this during creation when PYTHONASYNCIODEBUG is set
coro.cr_origin = traceback.extract_stack()[:limit]

This is how the RuntimeWarning: coroutine was never awaited message includes the creation site.

Nested Await Chains

When coroutine A awaits coroutine B which awaits a Future F, the chain looks like:

Task.__step
  → coro_A.send(None)
    → coro_B.send(None)        # via YIELD_FROM / SEND
      → future.__await__()
        → yields future itself  # propagates back up
Task stores future in __fut_waiter
Task attaches __wakeup to future

# When future resolves:
Task.__wakeup
  → Task.__step
    → coro_A.send(None)
      → coro_B.send(result)    # result propagates down
        → returns to coro_A

The entire chain suspends and resumes atomically from the Task’s perspective. The event loop only interacts with the outermost Task — the intermediate coroutines are driven by Python’s YIELD_FROM / SEND machinery.

Coroutine Wrapper Deprecation

In older Python versions (3.4-3.10), asyncio.coroutine decorated regular generators to make them act like coroutines. This used CO_ITERABLE_COROUTINE flag and types.coroutine(). As of Python 3.11, this is deprecated. Native async def coroutines are the only supported form.

Performance Considerations

Coroutine creation is cheap (~0.5μs) but not free. Each coroutine allocates a frame object with space for local variables. For very hot paths, you might see:

# Slower — creates a coroutine even for cache hits
async def get_cached(key):
    if key in cache:
        return cache[key]
    return await fetch(key)

# Faster — avoids coroutine overhead for cache hits
def get_cached(key):
    if key in cache:
        fut = asyncio.get_event_loop().create_future()
        fut.set_result(cache[key])
        return fut
    return fetch(key)

Python 3.12’s Task is implemented in C, reducing per-step overhead by roughly 40% compared to the pure Python version.

Practical Debugging

Track coroutine states across your application:

import asyncio
import sys

def coroutine_report():
    """Print all active tasks and their current await chain."""
    for task in asyncio.all_tasks():
        coro = task.get_coro()
        chain = []
        while coro is not None:
            chain.append(f"{coro.cr_code.co_name} at {coro.cr_code.co_filename}:{coro.cr_frame.f_lineno if coro.cr_frame else '?'}")
            coro = coro.cr_await
        print(f"Task {task.get_name()}: {' → '.join(chain)}")

This walks the cr_await chain to show exactly where each task is suspended — invaluable for diagnosing hangs in production.

One thing to remember: A coroutine is a frame-preserving state machine driven by send()/throw() — the Task class orchestrates these calls, and suspension happens at YIELD_FROM/SEND bytecode instructions where the coroutine yields a Future back to the event loop.

pythonconcurrencyasynciocoroutines

See Also

  • Python Actor Model Why treating each piece of your program like a person with their own mailbox makes concurrency way less scary.
  • Python Aiocache Caching aiocache remembers expensive answers so your async Python app doesn't waste time asking the same question twice.
  • Python Aiofiles Async Io aiofiles lets your async Python program read and write files without freezing — because normal file operations secretly block everything.
  • Python Aiohttp Understand Aiohttp through an everyday analogy so Python behavior feels intuitive, not random.
  • Python Anyio Portability AnyIO lets your async Python code work with any async library — write once, run on asyncio or Trio without changes.