Python Backpressure Handling — Core Concepts
What Backpressure Really Is
Backpressure is a feedback mechanism that slows producers when consumers can’t keep up. Without it, unbounded queues grow until memory runs out, latency spikes, and the application crashes.
In async Python, a lack of backpressure shows up in three ways:
- Memory growth — queues or buffers expand without limit
- Latency inflation — items wait longer and longer before processing
- OOM crashes — the process is killed when memory is exhausted
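The memory-growth symptom can be reproduced in a few lines. This is a minimal sketch, not a benchmark: the function name `unbounded_demo` and the sleep that stands in for slow processing are assumptions made for illustration.

```python
import asyncio

async def unbounded_demo(n=1000):
    queue = asyncio.Queue()  # maxsize=0 (the default) means no limit

    async def consumer():
        while True:
            await queue.get()
            await asyncio.sleep(0.001)  # stand-in for slow processing
            queue.task_done()

    worker = asyncio.create_task(consumer())
    for i in range(n):
        await queue.put(i)  # never waits, because the queue has no bound
    backlog = queue.qsize()  # items held in memory once the producer is done
    worker.cancel()
    return backlog

if __name__ == "__main__":
    print("backlog:", asyncio.run(unbounded_demo()))
```

Because nothing ever makes the producer wait, the backlog grows with `n` rather than with the consumer's capacity.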
Bounded Queues: The Basic Tool
asyncio.Queue(maxsize=N) is the simplest backpressure mechanism:
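import asyncio

async def slow_process(item):
    await asyncio.sleep(0.01)  # Hypothetical placeholder for real work

async def producer(queue):
    for i in range(1000):
        await queue.put(i)  # Suspends while the queue is full
        print(f"Produced {i}")

async def consumer(queue):
    while True:
        item = await queue.get()  # Suspends while the queue is empty
        await slow_process(item)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=20)
    async with asyncio.TaskGroup() as tg:  # Requires Python 3.11+
        worker = tg.create_task(consumer(queue))
        await producer(queue)
        await queue.join()  # Wait until every item has been processed
        worker.cancel()  # The consumer loops forever, so stop it explicitly

asyncio.run(main())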
When maxsize=20 and the queue is full, producer pauses at put(). This naturally throttles the producer to match the consumer’s speed.
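To observe the throttling, one option is to record the queue depth after every put(). The sketch below assumes a hypothetical `run_demo` helper and uses a short sleep in place of real processing.

```python
import asyncio

async def producer(queue, n, depths):
    for i in range(n):
        await queue.put(i)  # suspends while the queue is full
        depths.append(queue.qsize())  # depth right after each put

async def consumer(queue):
    while True:
        await queue.get()
        await asyncio.sleep(0.001)  # stand-in for slow processing
        queue.task_done()

async def run_demo(n=50, maxsize=5):
    queue = asyncio.Queue(maxsize=maxsize)
    depths = []
    worker = asyncio.create_task(consumer(queue))
    await producer(queue, n, depths)
    await queue.join()  # wait for the consumer to drain the queue
    worker.cancel()
    return max(depths)

if __name__ == "__main__":
    print("peak depth:", asyncio.run(run_demo()))
```

The peak depth never exceeds `maxsize`, however many items the producer wants to emit.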
Choosing the Right Queue Size
The queue size determines the trade-off between throughput and memory:
- Too small (1-5): Producer idles frequently, lower throughput
- Sweet spot (10-100): Absorbs bursts while limiting memory
- Too large (10,000+): Basically unbounded — backpressure kicks in too late
A good starting point: maxsize = 2 × consumer_count × average_processing_time / average_production_time.
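As a worked instance of this rule of thumb, here is a small helper. The name `suggest_maxsize` and the example timings are hypothetical, and the result is only a starting value to tune under real load.

```python
import math

def suggest_maxsize(consumer_count, avg_processing_time, avg_production_time):
    """Starting queue size from the rule of thumb.

    Both times must use the same unit (for example milliseconds per item).
    """
    raw = 2 * consumer_count * avg_processing_time / avg_production_time
    return max(1, math.ceil(raw))  # never suggest less than one slot

# Hypothetical numbers: 4 consumers, 50 ms to process an item, 10 ms to produce one.
print(suggest_maxsize(4, 50, 10))  # prints 40
```

With these numbers the consumers are collectively slower than the producer, so the queue will still fill; the size only decides how large a burst is absorbed first.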
Semaphore-Based Throttling
For cases where you don’t have a queue (e.g., launching HTTP requests), use a semaphore:
import asyncio
import httpx

async def fetch_all(urls, concurrency=10):
    semaphore = asyncio.Semaphore(concurrency)

    # httpx.get() is synchronous; awaiting requires an AsyncClient
    async with httpx.AsyncClient() as client:
        async def fetch_one(url):
            async with semaphore:  # At most `concurrency` concurrent fetches
                return await client.get(url)

        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_one(url)) for url in urls]
    return [t.result() for t in tasks]
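The semaphore limits how many tasks are actively doing I/O. The rest wait their turn. Note that every task is still created up front, so this bounds concurrency, not the number of pending tasks.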
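The limit can be checked without any network access. In this sketch the HTTP call is replaced by a short sleep, and `run_limited` is a hypothetical helper that tracks how many jobs hold the semaphore at once.

```python
import asyncio

async def run_limited(job_count=20, concurrency=3):
    semaphore = asyncio.Semaphore(concurrency)
    active = 0
    peak = 0

    async def job(i):
        nonlocal active, peak
        async with semaphore:  # waits here once `concurrency` jobs are running
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for an HTTP request
            active -= 1
        return i

    results = await asyncio.gather(*(job(i) for i in range(job_count)))
    return peak, results

if __name__ == "__main__":
    peak, results = asyncio.run(run_limited())
    print("peak concurrency:", peak)
```

`asyncio.gather` is used here instead of a task group only so that the sketch also runs on Python versions before 3.11.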
Flow Control in asyncio Streams
TCP sockets have built-in backpressure via the OS send buffer. asyncio exposes this through the StreamWriter API:
async def send_data(writer, data_chunks):
    for chunk in data_chunks:
        writer.write(chunk)
        await writer.drain()  # Waits if the write buffer is too full
writer.drain() is the backpressure point. It returns immediately if the buffer is below the high-water mark. If the buffer exceeds the mark, it waits until it drops below the low-water mark.
# Configuring water marks
transport = writer.transport
transport.set_write_buffer_limits(high=64*1024, low=16*1024)
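The pieces above can be combined into a loopback demo. This is a sketch under stated assumptions: the helper name `run_stream_demo`, the chunk sizes, and the server that simply counts bytes are all made up for illustration, and it needs a working 127.0.0.1 interface.

```python
import asyncio

async def run_stream_demo(chunk_count=50, chunk_size=64 * 1024):
    received = 0
    done = asyncio.Event()

    async def handle(srv_reader, srv_writer):
        nonlocal received
        # Read in small pieces until the client closes its side (EOF).
        while data := await srv_reader.read(16 * 1024):
            received += len(data)
        srv_writer.close()
        done.set()

    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]  # port 0 lets the OS pick one

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.transport.set_write_buffer_limits(high=64 * 1024, low=16 * 1024)
    for _ in range(chunk_count):
        writer.write(b"x" * chunk_size)
        await writer.drain()  # backpressure point: waits if the buffer is too full
    writer.close()
    await writer.wait_closed()

    await done.wait()  # the server has seen EOF and counted every byte
    server.close()
    await server.wait_closed()
    return received

if __name__ == "__main__":
    print("bytes received:", asyncio.run(run_stream_demo()))
```

Awaiting drain() after each write keeps the sender's in-process buffer near the configured limits instead of letting it grow with the total payload.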
Async Generator Backpressure
Async generators naturally provide backpressure because they produce one item at a time:
import aiofiles

async def read_large_file(path, chunk_size=8192):
    async with aiofiles.open(path, 'rb') as f:
        while chunk := await f.read(chunk_size):
            yield chunk  # Consumer pulls at its own pace

async def upload(path, dest):
    # dest is any object with an async write() method
    async for chunk in read_large_file(path):
        await dest.write(chunk)  # Upload at network speed
The producer only generates the next chunk when the consumer asks for it.
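The pull-based ordering can be made visible with a log shared between both sides. This sketch needs no files; `numbered_chunks` and `consume` are hypothetical names.

```python
import asyncio

async def numbered_chunks(count, log):
    for i in range(count):
        log.append(f"produce {i}")  # runs only when the consumer asks for an item
        yield i

async def consume(count=3):
    log = []
    async for item in numbered_chunks(count, log):
        await asyncio.sleep(0.001)  # slow consumer
        log.append(f"consume {item}")
    return log

if __name__ == "__main__":
    for line in asyncio.run(consume()):
        print(line)
```

The log strictly alternates between produce and consume entries, which shows that the generator never runs ahead of its consumer.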
Common Misconception: “Just Use a Bigger Queue”
A bigger queue doesn't solve an overload; it only delays the moment the problem shows up. If your producer consistently outpaces your consumer, even a queue of 1 million will eventually fill up. The only real solutions are:
- Slow down the producer (bounded queue, semaphore)
- Speed up the consumer (more workers, optimization)
- Drop items intentionally (load shedding)
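The third option, load shedding, can be sketched with `put_nowait()`, which raises `asyncio.QueueFull` instead of waiting. The helper names `offer` and `shed_demo` are assumptions for illustration; a real system would usually also count or log what it drops.

```python
import asyncio

def offer(queue, item):
    """Try to enqueue without waiting; return False if the item was shed."""
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False  # queue is full: drop the item instead of buffering it

async def shed_demo(total=10, maxsize=3):
    queue = asyncio.Queue(maxsize=maxsize)
    dropped = 0
    for i in range(total):
        if not offer(queue, i):
            dropped += 1
    return queue.qsize(), dropped

if __name__ == "__main__":
    kept, dropped = asyncio.run(shed_demo())
    print(f"kept={kept} dropped={dropped}")
```

Dropping the newest item is only one policy; dropping the oldest or sampling are alternatives, depending on which data matters most.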
Pattern: Multiple Consumers
Scale the consumer side to match the producer:
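async def main():
    queue = asyncio.Queue(maxsize=50)
    async with asyncio.TaskGroup() as tg:
        # 5 consumers process in parallel
        workers = [tg.create_task(consumer(queue)) for _ in range(5)]
        await producer(queue)
        await queue.join()  # Wait until every item has been processed
        for worker in workers:
            worker.cancel()  # Consumers loop forever, so stop them explicitly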
Monitor the queue size to decide when to add or remove consumers.
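One way to do that monitoring is to sample `queue.qsize()` periodically and compare the average depth with the queue's capacity. The 80% and 20% thresholds and the names `monitor` and `scaling_hint` below are hypothetical choices, not recommendations from a library.

```python
import asyncio

async def monitor(queue, samples, interval=0.01):
    """Record the queue depth every `interval` seconds until cancelled."""
    while True:
        samples.append(queue.qsize())
        await asyncio.sleep(interval)

def scaling_hint(samples, maxsize, high=0.8, low=0.2):
    """Turn recorded depths into a coarse suggestion (thresholds are assumptions)."""
    avg = sum(samples) / len(samples)
    if avg >= high * maxsize:
        return "add consumers"  # queue is mostly full: consumers are the bottleneck
    if avg <= low * maxsize:
        return "remove consumers"  # queue is mostly empty: consumers sit idle
    return "hold"

print(scaling_hint([45, 48, 50], maxsize=50))  # prints "add consumers"
```

The monitor can run as one more task next to the producer and consumers and be cancelled together with them.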
One thing to remember: Backpressure is about making the fast part wait for the slow part — bounded queues, semaphores, and writer.drain() are the three tools that prevent async systems from drowning in their own output.