Python Async Iterators — Deep Dive
Inside the Async Iteration Protocol
The async iteration protocol was defined in PEP 492 (Python 3.5) and extended by PEP 525 (asynchronous generators, Python 3.6). At the bytecode level in CPython, `async for` compiles to the `GET_AITER` and `GET_ANEXT` opcodes. `GET_AITER` calls `__aiter__` and uses its result directly; it is not awaited, as of Python 3.5.2. `GET_ANEXT` calls `__anext__`, and the awaitable it returns is then awaited.
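You can see these opcodes with the standard `dis` module. The sketch below disassembles a small hypothetical `consume` coroutine and checks for both opcode names. Opcode names are a CPython implementation detail, so this assumes a recent CPython (3.8 through 3.13 emit both).

```python
import dis

async def consume(source):
    # Any function containing `async for` will do; this one just prints items
    async for item in source:
        print(item)

# Collect the opcode names CPython emitted for the function body
opnames = {ins.opname for ins in dis.get_instructions(consume)}
print("GET_AITER" in opnames, "GET_ANEXT" in opnames)  # True True
```

Calling `dis.dis(consume)` prints the full listing, including the `END_ASYNC_FOR` instruction that handles `StopAsyncIteration` in Python 3.8+.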
The Full Protocol Handshake
```python
# What `async for item in source: body` compiles to (conceptually):
_iter = type(source).__aiter__(source)
while True:
    try:
        item = await type(_iter).__anext__(_iter)
    except StopAsyncIteration:
        break
    body  # the loop body runs here
```
The `type(x).__method__(x)` form is significant. Like other implicit special-method calls, `async for` looks up `__aiter__` and `__anext__` on the type, not the instance, so assigning either method as an instance attribute has no effect on the loop.
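The expansion can be checked directly. The sketch below runs the same source through a native `async for` and through a hand-written copy of the expansion. It then assigns `__anext__` on one instance to show that the loop ignores it. `Ticker`, `desugared`, and `fake_anext` are illustrative names, not part of any library.

```python
import asyncio

class Ticker:
    """Minimal class-based async iterator yielding 0..n-1."""
    def __init__(self, n):
        self.i, self.n = 0, n

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration
        self.i += 1
        return self.i - 1

async def desugared(source):
    # Hand-written version of the conceptual expansion
    items = []
    _iter = type(source).__aiter__(source)
    while True:
        try:
            item = await type(_iter).__anext__(_iter)
        except StopAsyncIteration:
            break
        items.append(item)
    return items

async def main():
    native = [x async for x in Ticker(3)]
    manual = await desugared(Ticker(3))

    patched = Ticker(3)
    async def fake_anext():
        return "patched"
    patched.__anext__ = fake_anext  # instance attribute: never consulted by async for
    from_patched = [x async for x in patched]
    return native, manual, from_patched

result = asyncio.run(main())
print(result)  # ([0, 1, 2], [0, 1, 2], [0, 1, 2])
```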
Async Generator Internals
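Async generators are more complex than they appear. An async generator object is its own type (`async_generator`), not a coroutine. Each `__anext__()`, `asend()`, or `athrow()` call returns an awaitable that resumes the generator's frame until its next `yield`, so local state survives between yields.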
Lifecycle of an Async Generator
```python
async def numbers():
    try:
        i = 0
        while True:
            yield i
            i += 1
    finally:
        print("Cleanup!")
```
- Creation: calling `numbers()` returns an async generator object. No code runs yet.
- First `__anext__()`: executes until the first `yield` and suspends; the awaited call returns the yielded value.
- Subsequent `__anext__()`: resumes after that `yield` and runs to the next one.
- Finalization: when the generator is closed explicitly with `aclose()` or garbage-collected, Python throws `GeneratorExit` into it at the paused `yield`, triggering the `finally` block.
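The lifecycle can be traced step by step. This sketch uses the same shape as `numbers()`, but it records events in a hypothetical `events` list instead of printing, so each stage is observable.

```python
import asyncio

events = []  # records what the generator body does, in order

async def numbers():
    # Same shape as numbers() above, but logs instead of printing
    events.append("started")
    try:
        i = 0
        while True:
            yield i
            i += 1
    finally:
        events.append("cleanup")

async def main():
    gen = numbers()                 # Creation: no code has run yet
    assert events == []
    first = await gen.__anext__()   # runs to the first yield
    second = await gen.__anext__()  # resumes after that yield, runs to the next one
    await gen.aclose()              # throws GeneratorExit in at the paused yield
    return first, second

result = asyncio.run(main())
print(result, events)  # (0, 1) ['started', 'cleanup']
```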
The Finalization Problem
Async generators have a notorious issue. Their `finally` blocks can contain `await`, but garbage collection is synchronous, so Python cannot await that cleanup from inside a finalizer.
```python
async def db_rows(conn):
    cursor = await conn.cursor("SELECT * FROM big_table")
    try:
        async for row in cursor:
            yield row
    finally:
        await cursor.close()  # This might not run during GC!
```
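Solutions:

- Explicit `aclose()`: always close async generators explicitly.

```python
gen = db_rows(conn)
try:
    async for row in gen:
        if found_enough:
            break
finally:
    await gen.aclose()
```

- `contextlib.aclosing()` (Python 3.10+):

```python
from contextlib import aclosing

async with aclosing(db_rows(conn)) as rows:
    async for row in rows:
        process(row)
```

- Shutdown hooks: `sys.set_asyncgen_hooks()` registers a `finalizer` that the interpreter calls when a started but unclosed async generator is about to be garbage-collected. Event loops install their own hooks. asyncio's finalizer schedules `aclose()` on the loop, and `loop.shutdown_asyncgens()` (called by `asyncio.run()`) closes any generators still open at shutdown.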
Building Advanced Async Iterators
Buffered Async Iterator
Fetch items in batches but yield them one at a time:
```python
class BufferedAsyncIterator:
    """Pull up to buffer_size items from source, then hand them out one at a time."""

    def __init__(self, source, buffer_size=100):
        # Get the iterator once (aiter() is Python 3.10+), so every refill resumes
        # where the previous one stopped instead of restarting an async iterable
        self.source = aiter(source)
        self.buffer_size = buffer_size
        self.buffer = []
        self.index = 0
        self.exhausted = False

    def __aiter__(self):
        return self

    async def _fill_buffer(self):
        self.buffer = []
        self.index = 0
        while len(self.buffer) < self.buffer_size:
            try:
                self.buffer.append(await anext(self.source))
            except StopAsyncIteration:
                self.exhausted = True  # source is done; serve what is buffered
                break

    async def __anext__(self):
        if self.index >= len(self.buffer):
            if self.exhausted:
                raise StopAsyncIteration
            await self._fill_buffer()
            if not self.buffer:  # the refill found nothing left
                raise StopAsyncIteration
        item = self.buffer[self.index]
        self.index += 1
        return item
```
Merge Multiple Async Iterators
Process items from multiple sources as they arrive:
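```python
import asyncio

async def merge_iterators(*aiters):
    """Yield items from multiple async iterators as they become available."""
    queue = asyncio.Queue()

    async def feed(aiter):
        # Each entry is (is_item, value); (False, ...) means this source has finished
        try:
            async for item in aiter:
                await queue.put((True, item))
        except Exception as exc:
            await queue.put((False, exc))  # forward the failure to the consumer
        else:
            await queue.put((False, None))  # clean end of this source

    # Plain tasks rather than a TaskGroup: yielding from inside a TaskGroup block
    # in an async generator can misroute cancellation and errors (see PEP 789)
    tasks = [asyncio.create_task(feed(aiter)) for aiter in aiters]
    remaining = len(tasks)
    try:
        while remaining:
            is_item, value = await queue.get()
            if is_item:
                yield value
            else:
                remaining -= 1
                if value is not None:
                    raise value
    finally:
        # Runs on exhaustion, on error, and on aclose(): stop feeders still running
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
```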
Async Iterator with Backpressure
When the producer is faster than the consumer:
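```python
import asyncio

_DONE = object()  # private end marker, so the source may legitimately yield None

class BackpressuredStream:
    def __init__(self, source, max_pending=10):
        self.source = source
        self.queue = asyncio.Queue(maxsize=max_pending)  # the bound is the backpressure
        self._producer_task = None
        self._error = None
        self._finished = False

    def __aiter__(self):
        if self._producer_task is None:  # start the producer only once
            self._producer_task = asyncio.create_task(self._produce())
        return self

    async def _produce(self):
        try:
            async for item in self.source:
                await self.queue.put(item)  # Blocks when queue is full
        except Exception as exc:
            self._error = exc  # re-raised by __anext__ after buffered items are consumed
        await self.queue.put(_DONE)

    async def __anext__(self):
        if self._finished:
            raise StopAsyncIteration
        item = await self.queue.get()
        if item is _DONE:
            self._finished = True
            if self._error is not None:
                raise self._error
            raise StopAsyncIteration
        return item

    async def aclose(self):
        # Stop the producer if the consumer quits early (works with contextlib.aclosing)
        if self._producer_task is not None:
            self._producer_task.cancel()
```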
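The mechanism behind this class is just `asyncio.Queue(maxsize=...)`: `put()` suspends while the queue is full. The standalone sketch below pairs a fast producer with a deliberately slow consumer and measures how far ahead the producer ever gets. The `demo` function and its parameters are illustrative only.

```python
import asyncio

async def demo(max_pending=3, total=10):
    queue = asyncio.Queue(maxsize=max_pending)
    produced = 0   # items the producer has managed to enqueue
    consumed = []
    max_ahead = 0  # largest observed gap between producer and consumer

    async def producer():
        nonlocal produced
        for i in range(total):
            await queue.put(i)  # suspends while the queue already holds max_pending items
            produced += 1

    task = asyncio.create_task(producer())
    for _ in range(total):
        item = await queue.get()
        max_ahead = max(max_ahead, produced - len(consumed))
        await asyncio.sleep(0.001)  # deliberately slow consumer
        consumed.append(item)
    await task
    return consumed, max_ahead

consumed, max_ahead = asyncio.run(demo())
print(consumed)
print(max_ahead)  # never more than max_pending (3)
```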
async for with else
Like regular for loops, async for supports an else clause:
```python
async for message in stream:
    if message.type == "shutdown":
        break
else:
    # Only runs if the loop completed without break
    print("Stream ended naturally")
```
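A runnable version of the snippet follows. It assumes a hypothetical `messages()` generator that yields plain strings in place of message objects with a `.type` attribute.

```python
import asyncio

async def messages(*types):
    # Hypothetical stream: plain strings stand in for message types
    for t in types:
        yield t

async def watch(stream):
    async for message in stream:
        if message == "shutdown":
            outcome = "shutdown requested"
            break
    else:
        outcome = "stream ended naturally"  # only reached when no break happened
    return outcome

async def main():
    return (await watch(messages("data", "shutdown", "data")),
            await watch(messages("data", "data")))

outcomes = asyncio.run(main())
print(outcomes)  # ('shutdown requested', 'stream ended naturally')
```

As with a regular `for`, the `else` branch also runs when the stream is empty.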
Performance Considerations
Overhead of Async vs Sync Iteration
Async iteration has measurable overhead compared to sync iteration, even when no actual I/O occurs:
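- Each `__anext__()` call creates and drives a fresh awaitable: a coroutine object for a class with `async def __anext__`, or an `async_generator_asend` object for an async generator.
- If `__anext__()` never actually suspends, control does not return to the event loop between items, so the cost is this per-item awaitable machinery rather than scheduling. Items that do suspend also pay for a trip through the event loop.
- For CPU-bound processing with little work per item, this overhead can make async iteration 5-10× slower than sync iteration.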
Rule of thumb: If you’re iterating over data that’s already in memory, use regular iteration. Async iteration only makes sense when there’s actual I/O between items.
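To check the overhead on your own machine, time the same in-memory loop both ways. This is a rough sketch with an arbitrary `N`. The exact ratio depends on hardware and Python version, so no particular number is implied.

```python
import asyncio
import time

N = 200_000

def sync_numbers(n):
    for i in range(n):
        yield i

async def async_numbers(n):
    for i in range(n):
        yield i  # no await: nothing here ever suspends

def time_sync():
    start = time.perf_counter()
    total = 0
    for i in sync_numbers(N):
        total += i
    return total, time.perf_counter() - start

async def time_async():
    start = time.perf_counter()
    total = 0
    async for i in async_numbers(N):
        total += i
    return total, time.perf_counter() - start

sync_total, sync_secs = time_sync()
async_total, async_secs = asyncio.run(time_async())
print(f"sync:  {sync_secs:.4f}s")
print(f"async: {async_secs:.4f}s ({async_secs / sync_secs:.1f}x)")
```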
Reducing Round Trips
Instead of:
```python
async for row in cursor:  # Can mean one network round trip per row, depending on the driver
    process(row)
```
Use batched fetching:
```python
while batch := await cursor.fetchmany(1000):  # One trip per 1000 rows
    for row in batch:
        process(row)
```
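Batched fetching does not have to cost you `async for row in ...` at the call site. Wrapping the `fetchmany()` loop in an async generator keeps per-row iteration while paying one round trip per batch. `FakeCursor` is a hypothetical in-memory stand-in that counts calls. Real drivers name and size their fetch methods differently.

```python
import asyncio

class FakeCursor:
    """Hypothetical stand-in for a driver cursor with an async fetchmany(size)."""
    def __init__(self, rows):
        self._rows = list(rows)
        self._pos = 0
        self.round_trips = 0

    async def fetchmany(self, size):
        self.round_trips += 1
        await asyncio.sleep(0)  # pretend to hit the network
        batch = self._rows[self._pos:self._pos + size]
        self._pos += len(batch)
        return batch

async def iter_rows(cursor, batch_size=1000):
    """Yield rows one at a time while fetching them batch_size at a time."""
    while batch := await cursor.fetchmany(batch_size):
        for row in batch:
            yield row

async def main():
    cursor = FakeCursor(range(2500))
    rows = [row async for row in iter_rows(cursor)]
    return len(rows), cursor.round_trips

result = asyncio.run(main())
print(result)  # (2500, 4): batches of 1000, 1000, 500, then an empty one
```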
Real-World Patterns
Paginated API Consumer
```python
async def paginated_fetch(client, endpoint, page_size=100):
    """Async iterator over paginated API results."""
    cursor = None
    while True:
        params = {"limit": page_size}
        if cursor:
            params["cursor"] = cursor
        response = await client.get(endpoint, params=params)
        data = response.json()
        for item in data["results"]:
            yield item
        cursor = data.get("next_cursor")
        if not cursor:
            break
```
Server-Sent Events Consumer
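```python
import codecs

import aiohttp

async def sse_events(url):
    """Async iterator over server-sent events."""
    # Incremental decoder: a network chunk can end mid-way through a multi-byte UTF-8 character
    decoder = codecs.getincrementaldecoder("utf-8")()
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            buffer = ""
            async for chunk in response.content.iter_any():
                buffer += decoder.decode(chunk)
                while "\n\n" in buffer:
                    event_str, buffer = buffer.split("\n\n", 1)
                    yield parse_sse(event_str)  # parse_sse: your own parser, not part of aiohttp
```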
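`parse_sse` is not defined above and is not an aiohttp function. A minimal, simplified parser for one event block might look like the sketch below, assuming you want a dict back. It follows the server-sent events field rules: `:` lines are comments, one leading space after the colon is dropped, repeated `data` lines are joined with newlines, and the default event type is `message`.

```python
def parse_sse(block: str) -> dict:
    """Parse one SSE event block (the text between blank lines) into a dict."""
    event = {"event": "message", "data": [], "id": None, "retry": None}
    for line in block.splitlines():
        if not line or line.startswith(":"):
            continue  # blank line or comment
        field, _, value = line.partition(":")  # no colon: whole line is the field name
        if value.startswith(" "):
            value = value[1:]  # drop a single leading space
        if field == "data":
            event["data"].append(value)
        elif field in ("event", "id"):
            event[field] = value
        elif field == "retry" and value.isascii() and value.isdigit():
            event["retry"] = int(value)
        # any other field name is ignored
    event["data"] = "\n".join(event["data"])
    return event

event = parse_sse("event: update\ndata: line1\ndata: line2\nid: 7")
print(event)
# {'event': 'update', 'data': 'line1\nline2', 'id': '7', 'retry': None}
```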
Database Change Stream
```python
async def watch_changes(collection, pipeline=None):
    """MongoDB change stream as an async iterator."""
    async with collection.watch(pipeline or []) as stream:
        async for change in stream:
            yield change
```
Type Annotations
```python
# collections.abc is preferred; typing.AsyncIterator etc. are deprecated aliases since 3.9
from collections.abc import AsyncGenerator, AsyncIterator

# For classes implementing the protocol
class MyAsyncIter:
    async def __anext__(self) -> int: ...
    def __aiter__(self) -> AsyncIterator[int]:
        return self

# For async generator functions
async def my_gen() -> AsyncGenerator[int, None]:
    yield 1
    yield 2

# In function signatures
async def consume(source: AsyncIterator[str]) -> list[str]:
    return [item async for item in source]
```
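The `collections.abc` classes used in these annotations also work at runtime, and they check structure. Any class with `__aiter__` and `__anext__` counts as an `AsyncIterator`. Only objects that also have `asend`, `athrow`, and `aclose` count as an `AsyncGenerator`. `Countdown` and `evens` are illustrative names.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterator

class Countdown:
    """Class-based async iterator: counts down from `start` to 1."""
    def __init__(self, start: int) -> None:
        self.current = start

    def __aiter__(self) -> "Countdown":
        return self

    async def __anext__(self) -> int:
        if self.current <= 0:
            raise StopAsyncIteration
        value = self.current
        self.current -= 1
        return value

async def evens(limit: int) -> AsyncGenerator[int, None]:
    for n in range(0, limit, 2):
        yield n

async def consume(source: AsyncIterator[int]) -> list[int]:
    return [item async for item in source]

async def main() -> tuple[list[int], list[int]]:
    return await consume(Countdown(3)), await consume(evens(7))

print(isinstance(Countdown(1), AsyncIterator))   # True
print(isinstance(Countdown(1), AsyncGenerator))  # False: no asend/athrow/aclose
print(isinstance(evens(1), AsyncGenerator))      # True
results = asyncio.run(main())
print(results)  # ([3, 2, 1], [0, 2, 4, 6])
```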
One thing to remember: async iterators shine for streaming I/O such as paginated APIs, database cursors, and event streams. Close async generators explicitly (for example with `aclosing()`), batch network round trips for performance, and keep in mind that async iteration is sequential. Use merging patterns or TaskGroups when you need concurrent consumption.