Python Async Iterators — Deep Dive

Inside the Async Iteration Protocol

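The async iteration protocol was defined in PEP 492 (Python 3.5) and extended by PEP 525 (async generators, Python 3.6). At the bytecode level, async for compiles to the GET_AITER and GET_ANEXT opcodes. GET_AITER calls __aiter__ and uses the result directly (since Python 3.7, __aiter__ must return an async iterator, not an awaitable). GET_ANEXT calls __anext__, and the awaitable it returns is then awaited to produce the next item.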
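To see those opcodes on your own interpreter, you can disassemble a small coroutine with the standard dis module. Treat this as a quick check rather than a specification: opcode names are a CPython implementation detail that can change between versions, and the function name consume is only for illustration.

```python
import dis

async def consume(source):
    # A minimal async for loop; dis inspects this function's own code object.
    async for item in source:
        print(item)

opnames = {instr.opname for instr in dis.get_instructions(consume)}
has_aiter = "GET_AITER" in opnames
has_anext = "GET_ANEXT" in opnames
print(has_aiter, has_anext)
```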

The Full Protocol Handshake

# What `async for item in source: body` compiles to (conceptually):
_iter = type(source).__aiter__(source)
while True:
    try:
        item = await type(_iter).__anext__(_iter)
    except StopAsyncIteration:
        break
    body

The type(x).__method__(x) form is significant: like other special methods, __aiter__ and __anext__ are looked up on the type, not the instance, so assigning a new __anext__ to an individual object does not change how async for iterates it.
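The handshake can be exercised by hand. In this sketch, Countdown is a hypothetical class-based iterator; desugared follows the conceptual expansion above step by step, and the patched instance shows that an instance-level __anext__ is ignored by async for.

```python
import asyncio

class Countdown:
    """Minimal class-based async iterator: yields start-1 down to 0."""

    def __init__(self, start):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current <= 0:
            raise StopAsyncIteration
        self.current -= 1
        return self.current

async def desugared(source):
    # Hand-written equivalent of `async for item in source: items.append(item)`.
    items = []
    it = type(source).__aiter__(source)
    while True:
        try:
            item = await type(it).__anext__(it)
        except StopAsyncIteration:
            break
        items.append(item)
    return items

async def main():
    patched = Countdown(3)

    async def never_called():
        raise AssertionError("instance-level __anext__ was used")

    # Stored in the instance __dict__; async for looks __anext__ up on the type.
    patched.__anext__ = never_called
    builtin = [item async for item in patched]
    return await desugared(Countdown(3)), builtin

manual, builtin = asyncio.run(main())
print(manual, builtin)  # [2, 1, 0] [2, 1, 0]
```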

Async Generator Internals

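Async generators are more complex than they appear. An async def function that contains yield produces an async generator object: like a coroutine it can await, and like a generator it suspends at each yield and keeps its local state until the next item is requested.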

Lifecycle of an Async Generator

async def numbers():
    try:
        i = 0
        while True:
            yield i
            i += 1
    finally:
        print("Cleanup!")

  1. Creation: Calling numbers() returns an async generator object. No code runs yet.
  2. First __anext__: Executes until the first yield, suspends, and returns the yielded value.
  3. Subsequent __anext__ calls: Resume just after the previous yield and run to the next one.
  4. Finalization: When the generator is closed explicitly with aclose(), Python throws GeneratorExit into it at the suspended yield, which runs the finally block. If it is garbage collected while suspended instead, that cleanup depends on the event loop's finalizer hook (see below).
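The four stages can be observed by driving a generator manually. This sketch calls __anext__ and aclose() directly instead of using async for, and records what has run in a log list (the logging is added purely for illustration).

```python
import asyncio

log = []

async def numbers():
    log.append("started")
    try:
        i = 0
        while True:
            yield i
            i += 1
    finally:
        log.append("cleanup")

async def main():
    gen = numbers()                  # 1. creation: the body has not started
    assert log == []
    first = await gen.__anext__()    # 2. runs to the first yield
    second = await gen.__anext__()   # 3. resumes after that yield
    await gen.aclose()               # 4. GeneratorExit at the yield, finally runs
    return first, second

values = asyncio.run(main())
print(values, log)  # (0, 1) ['started', 'cleanup']
```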

The Finalization Problem

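Async generators have a notorious issue: their finally blocks can contain await, but garbage collection is synchronous. The interpreter cannot run that async cleanup on the spot when the object is collected; at best the event loop's finalizer hook schedules aclose() to run later, and if the loop is already closed, the cleanup never runs.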

async def db_rows(conn):
    cursor = await conn.cursor("SELECT * FROM big_table")
    try:
        async for row in cursor:
            yield row
    finally:
        await cursor.close()  # This might not run during GC!

Solutions:

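  1. Explicit aclose(): Always close async generators explicitly, especially when you might stop iterating before they are exhausted.

    gen = db_rows(conn)
    try:
        async for row in gen:
            if found_enough:  # hypothetical stop condition
                break
    finally:
        await gen.aclose()

  2. contextlib.aclosing (Python 3.10+):

    from contextlib import aclosing
    
    async with aclosing(db_rows(conn)) as rows:
        async for row in rows:
            process(row)

  3. Finalizer hooks: sys.set_asyncgen_hooks(firstiter=..., finalizer=...) registers callbacks that the interpreter calls when an async generator is first iterated and when it is about to be garbage collected. asyncio installs its own hooks, so a collected generator gets aclose() scheduled on the loop, and loop.shutdown_asyncgens() (called automatically by asyncio.run()) closes any generators still open at shutdown. Application code rarely needs to call set_asyncgen_hooks() itself.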

Building Advanced Async Iterators

Buffered Async Iterator

Fetch items in batches but yield them one at a time:

class BufferedAsyncIterator:
    def __init__(self, source, buffer_size=100):
        # Take a single iterator up front so every refill resumes where the
        # last one stopped, even if `source` is a re-iterable object whose
        # __aiter__ would start over each time.
        self._source_iter = aiter(source)  # aiter()/anext() are built-ins since 3.10
        self.buffer_size = buffer_size
        self.buffer = []
        self.index = 0
        self.exhausted = False

    def __aiter__(self):
        return self

    async def _fill_buffer(self):
        self.buffer = []
        self.index = 0
        while len(self.buffer) < self.buffer_size:
            try:
                self.buffer.append(await anext(self._source_iter))
            except StopAsyncIteration:
                # Source ran dry: remember it so we never poll it again.
                self.exhausted = True
                break

    async def __anext__(self):
        if self.index >= len(self.buffer):
            if self.exhausted:
                raise StopAsyncIteration
            await self._fill_buffer()
            if not self.buffer:
                raise StopAsyncIteration
        item = self.buffer[self.index]
        self.index += 1
        return item

Merge Multiple Async Iterators

Process items from multiple sources as they arrive:

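import asyncio

async def merge_iterators(*sources):
    """Yield items from multiple async iterators as they become available."""
    queue = asyncio.Queue()
    done = object()  # each feeder puts this once its source is exhausted

    async def feed(source):
        async for item in source:
            await queue.put(item)
        await queue.put(done)

    # TaskGroup needs Python 3.11+. If you may stop consuming early, wrap the
    # merged stream in aclosing() so the TaskGroup cancels the feeders promptly.
    async with asyncio.TaskGroup() as tg:
        for source in sources:
            tg.create_task(feed(source))

        remaining = len(sources)
        while remaining:  # also ends immediately when no sources are given
            item = await queue.get()
            if item is done:
                remaining -= 1
            else:
                yield item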

Async Iterator with Backpressure

When the producer is faster than the consumer:

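import asyncio

_DONE = object()  # private sentinel, so the source may legitimately yield None

class BackpressuredStream:
    def __init__(self, source, max_pending=10):
        self.source = source
        # The bounded queue is the backpressure: put() waits while it is full.
        self.queue = asyncio.Queue(maxsize=max_pending)
        self._producer_task = None
        self._error = None
        self._finished = False

    def __aiter__(self):
        if self._producer_task is None:  # start the producer once, on first use
            self._producer_task = asyncio.create_task(self._produce())
        return self

    async def _produce(self):
        try:
            async for item in self.source:
                await self.queue.put(item)  # blocks when the queue is full
        except Exception as exc:
            self._error = exc  # re-raised in the consumer instead of being lost
        # Not reached on cancellation: CancelledError propagates past this line.
        await self.queue.put(_DONE)

    async def __anext__(self):
        if self._finished:
            raise StopAsyncIteration
        item = await self.queue.get()
        if item is _DONE:
            self._finished = True
            if self._error is not None:
                raise self._error
            raise StopAsyncIteration
        return item

    async def aclose(self):
        """Cancel the producer if the consumer stops early (works with aclosing())."""
        self._finished = True
        if self._producer_task is not None and not self._producer_task.done():
            self._producer_task.cancel()
            await asyncio.wait([self._producer_task])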
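The backpressure here comes entirely from the bounded asyncio.Queue. This standalone sketch (the names demo, producer and consumer are illustrative) pairs a fast producer with a consumer that yields to the loop after every item, and records how far the producer ever gets ahead; with maxsize=3 it can never have more than three unconsumed items outstanding.

```python
import asyncio

async def demo(maxsize=3, n_items=20):
    queue = asyncio.Queue(maxsize=maxsize)
    produced = consumed = max_lead = 0

    async def producer():
        nonlocal produced
        for i in range(n_items):
            await queue.put(i)  # suspends while the queue already holds maxsize items
            produced += 1
        await queue.put(None)  # end marker; safe here because every item is an int

    async def consumer():
        nonlocal consumed, max_lead
        while (await queue.get()) is not None:
            consumed += 1
            # Items produced but not yet consumed are exactly the ones in the queue.
            max_lead = max(max_lead, produced - consumed)
            await asyncio.sleep(0)  # simulate a slower consumer

    await asyncio.gather(producer(), consumer())
    return consumed, max_lead

consumed, lead = asyncio.run(demo())
print(consumed, lead)
```

Raising maxsize lets the producer run further ahead (more memory, fewer stalls); lowering it ties the producer more tightly to the consumer's pace.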

async for with else

Like regular for loops, async for supports an else clause:

async for message in stream:
    if message.type == "shutdown":
        break
else:
    # Only runs if the loop completed without break
    print("Stream ended naturally")
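A runnable version of that pattern, as a sketch: Messages is a hypothetical in-memory stream that yields bare strings rather than objects with a .type attribute. It is written as a class-based iterator so that breaking out early leaves no half-finished async generator waiting for finalization.

```python
import asyncio

class Messages:
    """Tiny in-memory async iterator standing in for a real message stream."""

    def __init__(self, *types):
        self._types = iter(types)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return next(self._types)
        except StopIteration:
            raise StopAsyncIteration from None

async def watch(stream):
    async for message_type in stream:
        if message_type == "shutdown":
            outcome = "shutdown requested"
            break
    else:
        # Reached only when the loop ran to completion without break.
        outcome = "stream ended naturally"
    return outcome

async def main():
    return (
        await watch(Messages("data", "data")),
        await watch(Messages("data", "shutdown", "data")),
    )

outcomes = asyncio.run(main())
print(outcomes)  # ('stream ended naturally', 'shutdown requested')
```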

Performance Considerations

Overhead of Async vs Sync Iteration

Async iteration has measurable overhead compared to sync iteration, even when no actual I/O occurs:

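  • Each __anext__ call creates a new awaitable object (for a class with async def __anext__, a fresh coroutine) that must then be awaited
  • If __anext__ actually suspends, control returns to the event loop and its scheduling machinery runs before the next item; even when nothing suspends, the await machinery still costs more than a plain next()
  • In tight loops with little work per item, this overhead can make async iteration several times slower than sync iteration (5-10× is a plausible range, but measure your own workload)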

Rule of thumb: If you’re iterating over data that’s already in memory, use regular iteration. Async iteration only makes sense when there’s actual I/O between items.
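A rough way to check the overhead on your own machine is to run the same summation through a sync generator and an async generator that never awaits anything, so only the iteration machinery differs. This is a minimal sketch rather than a rigorous benchmark (single run, no warm-up, and N = 200_000 is an arbitrary choice); the printed ratio will vary with Python version and hardware.

```python
import asyncio
import time

N = 200_000

def sync_numbers(n):
    yield from range(n)

async def async_numbers(n):
    for i in range(n):
        yield i  # no await here, so nothing ever suspends

def timed_sync_total():
    start = time.perf_counter()
    total = 0
    for i in sync_numbers(N):
        total += i
    return total, time.perf_counter() - start

async def timed_async_total():
    # Timed inside the coroutine so event loop startup is not counted.
    start = time.perf_counter()
    total = 0
    async for i in async_numbers(N):
        total += i
    return total, time.perf_counter() - start

sync_total, sync_secs = timed_sync_total()
async_total, async_secs = asyncio.run(timed_async_total())
print(f"sync {sync_secs:.4f}s, async {async_secs:.4f}s, "
      f"ratio {async_secs / sync_secs:.1f}x")
```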

Reducing Round Trips

Instead of:

async for row in cursor:  # may cost one network round trip per row, depending on the driver
    process(row)

If your driver offers DB-API-style batched fetching, use it:

while batch := await cursor.fetchmany(1000):  # one round trip per batch of up to 1000 rows
    for row in batch:
        process(row)
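To make the difference concrete, this sketch counts simulated round trips against FakeCursor, a hypothetical in-memory cursor with fetchone() and fetchmany() coroutines in the DB-API style; asyncio.sleep(0) stands in for network latency. Real drivers may prefetch rows, so the one-by-one count is a worst case.

```python
import asyncio

class FakeCursor:
    """Hypothetical in-memory cursor that counts simulated round trips."""

    def __init__(self, rows):
        self._rows = list(rows)
        self._pos = 0
        self.round_trips = 0

    async def fetchone(self):
        self.round_trips += 1
        await asyncio.sleep(0)  # stand-in for network latency
        if self._pos >= len(self._rows):
            return None
        row = self._rows[self._pos]
        self._pos += 1
        return row

    async def fetchmany(self, size):
        self.round_trips += 1
        await asyncio.sleep(0)
        batch = self._rows[self._pos:self._pos + size]
        self._pos += len(batch)
        return batch

async def main():
    one_by_one = FakeCursor(range(2500))
    while (await one_by_one.fetchone()) is not None:
        pass

    batched = FakeCursor(range(2500))
    processed = 0
    while batch := await batched.fetchmany(1000):
        processed += len(batch)

    return one_by_one.round_trips, batched.round_trips, processed

result = asyncio.run(main())
print(result)  # (2501, 4, 2500)
```

The batched loop needs only four calls: 1000, 1000 and 500 rows, then an empty batch that ends the walrus loop.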

Real-World Patterns

Paginated API Consumer

async def paginated_fetch(client, endpoint, page_size=100):
    """Async iterator over paginated API results."""
    cursor = None
    while True:
        params = {"limit": page_size}
        if cursor:
            params["cursor"] = cursor
        
        response = await client.get(endpoint, params=params)
        data = response.json()
        
        for item in data["results"]:
            yield item
        
        cursor = data.get("next_cursor")
        if not cursor:
            break

Server-Sent Events Consumer

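import codecs

import aiohttp

async def sse_events(url):
    """Async iterator over server-sent events (parse_sse is a helper not shown here)."""
    decoder = codecs.getincrementaldecoder("utf-8")()
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            buffer = ""
            async for chunk in response.content.iter_any():
                # Incremental decoding copes with a UTF-8 character split across
                # chunks; normalizing CRLF lets the blank-line check find events.
                buffer = (buffer + decoder.decode(chunk)).replace("\r\n", "\n")
                while "\n\n" in buffer:
                    event_str, buffer = buffer.split("\n\n", 1)
                    yield parse_sse(event_str)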
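The splitting logic can be tested without a network by feeding it any async iterable of byte chunks. In this simplified sketch, sse_data and fake_chunks are hypothetical names; it handles only data: fields and LF or CRLF line endings (not the full event-stream grammar), and the fake source deliberately splits the two-byte "é" across chunks.

```python
import asyncio
import codecs

async def sse_data(chunks):
    """Yield the data payload of each SSE event from byte chunks (simplified)."""
    decoder = codecs.getincrementaldecoder("utf-8")()
    buffer = ""
    async for chunk in chunks:
        # The decoder holds back a partial multi-byte character until it completes.
        buffer = (buffer + decoder.decode(chunk)).replace("\r\n", "\n")
        while "\n\n" in buffer:
            block, buffer = buffer.split("\n\n", 1)
            data_lines = [
                line[len("data:"):].removeprefix(" ")  # one leading space is dropped
                for line in block.split("\n")
                if line.startswith("data:")
            ]
            if data_lines:
                yield "\n".join(data_lines)

async def fake_chunks():
    raw = "data: caf\u00e9\r\n\r\ndata: line1\ndata: line2\n\n".encode("utf-8")
    for i in range(0, len(raw), 5):  # 5-byte chunks split the "é" in two
        yield raw[i:i + 5]

async def main():
    return [event async for event in sse_data(fake_chunks())]

events = asyncio.run(main())
print(events)  # ['café', 'line1\nline2']
```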

Database Change Stream

async def watch_changes(collection, pipeline=None):
    """MongoDB change stream as an async iterator."""
    async with collection.watch(pipeline or []) as stream:
        async for change in stream:
            yield change

Type Annotations

from collections.abc import AsyncGenerator, AsyncIterator  # preferred over the typing aliases since 3.9

# For classes implementing the protocol
class MyAsyncIter:
    async def __anext__(self) -> int: ...  # body elided
    def __aiter__(self) -> AsyncIterator[int]:
        return self

# For async generator functions
async def my_gen() -> AsyncGenerator[int, None]:
    yield 1
    yield 2

# In function signatures
async def consume(source: AsyncIterator[str]) -> list[str]:
    return [item async for item in source]
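The same ABCs also work at runtime. As a small sketch (Ticker is a hypothetical class), any class defining __aiter__ and __anext__ passes an isinstance check against AsyncIterator without inheriting from it, while AsyncGenerator additionally requires asend, athrow and aclose, which only real async generators (or classes that implement them) provide.

```python
from collections.abc import AsyncGenerator, AsyncIterator

class Ticker:
    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        raise StopAsyncIteration

async def gen():
    yield 1

is_iter = isinstance(Ticker(), AsyncIterator)         # structural check on the two methods
is_agen = isinstance(gen(), AsyncGenerator)           # async generator objects qualify
ticker_is_agen = isinstance(Ticker(), AsyncGenerator)  # no asend/athrow/aclose
print(is_iter, is_agen, ticker_is_agen)  # True True False
```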

One thing to remember: async iterators shine for streaming I/O such as paginated APIs, database cursors, and event streams. Close async generators explicitly (use aclosing()) whenever you might stop before exhausting them, batch network round trips for performance, and keep in mind that async iteration is sequential: use merging patterns or TaskGroups when you need concurrent consumption.
