Python Backpressure Handling — Core Concepts

What Backpressure Really Is

Backpressure is a feedback mechanism that slows producers when consumers can’t keep up. Without it, unbounded queues grow until memory runs out, latency spikes, and the application crashes.

In async Python, missing backpressure shows up in three ways:

  1. Memory growth — queues or buffers expand without limit
  2. Latency inflation — items wait longer and longer before processing
  3. OOM crashes — the process is killed when memory is exhausted

Bounded Queues: The Basic Tool

asyncio.Queue(maxsize=N) is the simplest backpressure mechanism:

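import asyncio

async def slow_process(item):
    await asyncio.sleep(0.1)      # Placeholder for real work

async def producer(queue):
    for i in range(1000):
        await queue.put(i)        # Suspends while the queue is full
        print(f"Produced {i}")

async def consumer(queue):
    while True:
        item = await queue.get()  # Suspends while the queue is empty
        await slow_process(item)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=20)
    async with asyncio.TaskGroup() as tg:  # Requires Python 3.11+
        worker = tg.create_task(consumer(queue))
        await producer(queue)     # Returns once every item is enqueued
        await queue.join()        # Waits until every item is processed
        worker.cancel()           # The consumer loops forever, so stop it explicitly

asyncio.run(main())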

When maxsize=20 and the queue is full, producer pauses at put(). This naturally throttles the producer to match the consumer’s speed.
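To see the throttling effect, the following minimal sketch records the largest queue depth a fast producer ever reaches. The function name run_pipeline and the 1 ms sleep that stands in for slow work are assumptions made for illustration only.

```python
import asyncio

async def run_pipeline(n_items=50, maxsize=5):
    """Return the largest queue depth seen while a fast producer feeds a slow consumer."""
    queue = asyncio.Queue(maxsize=maxsize)
    peak = 0

    async def producer():
        nonlocal peak
        for i in range(n_items):
            await queue.put(i)               # suspends while the queue is full
            peak = max(peak, queue.qsize())

    async def consumer():
        while True:
            await queue.get()
            await asyncio.sleep(0.001)       # stand-in for slow work
            queue.task_done()

    worker = asyncio.create_task(consumer())
    await producer()
    await queue.join()                       # wait until every item is processed
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return peak

if __name__ == "__main__":
    print(asyncio.run(run_pipeline()))
```

However many items the producer has to send, the recorded peak should never exceed maxsize, which is the memory bound the queue provides.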

Choosing the Right Queue Size

The queue size determines the trade-off between throughput and memory:

  • Too small (1-5): Producer idles frequently, lower throughput
  • Sweet spot (10-100): Absorbs bursts while limiting memory
  • Too large (10,000+): Basically unbounded — backpressure kicks in too late

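A reasonable starting point is maxsize = 2 × consumer_count × average_processing_time / average_production_time, with both times measured per item in the same unit. Treat the result as a heuristic and tune it against the queue depth you actually observe.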
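The starting-point formula can be written as a small helper. This is a sketch only: the name suggest_maxsize, the rounding up, and the floor of 1 are assumptions, not part of the formula itself.

```python
import math

def suggest_maxsize(consumer_count, avg_processing_time, avg_production_time):
    """Apply the heuristic 2 x consumers x processing time / production time.

    Both times are per item and must use the same unit.
    """
    if avg_production_time <= 0:
        raise ValueError("avg_production_time must be positive")
    raw = 2 * consumer_count * avg_processing_time / avg_production_time
    return max(1, math.ceil(raw))    # round up, never below 1 (assumed policy)

# 4 consumers, 50 ms to process an item, 10 ms to produce one
print(suggest_maxsize(4, 50, 10))    # 40
```

With those example numbers the suggestion lands inside the 10-100 range described above.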

Semaphore-Based Throttling

For cases where you don’t have a queue (e.g., launching HTTP requests), use a semaphore:

import asyncio
import httpx

async def fetch_all(urls, concurrency=10):
    semaphore = asyncio.Semaphore(concurrency)

    # httpx.get() is synchronous; async requests go through AsyncClient
    async with httpx.AsyncClient() as client:
        async def fetch_one(url):
            async with semaphore:  # At most `concurrency` fetches in flight
                return await client.get(url)

        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_one(url)) for url in urls]
    return [t.result() for t in tasks]

The semaphore limits how many tasks are actively doing I/O. The rest wait their turn.
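A quick way to check that the limit holds is to count how many jobs are inside the semaphore at once. In this sketch asyncio.sleep stands in for the network call, and run_limited and job are hypothetical names. It uses asyncio.gather so it also runs on Python versions without TaskGroup.

```python
import asyncio

async def run_limited(n_jobs=20, concurrency=3):
    """Return the highest number of jobs that held the semaphore at the same time."""
    semaphore = asyncio.Semaphore(concurrency)
    active = 0
    peak = 0

    async def job():
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.001)   # stand-in for a network request
            active -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak

if __name__ == "__main__":
    print(asyncio.run(run_limited()))
```

Note that every task object is still created up front. The semaphore bounds the work in flight, not the number of pending tasks, so for very large inputs a bounded queue feeding a fixed set of workers may be the better fit.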

Flow Control in asyncio Streams

TCP sockets have built-in backpressure via the OS send buffer. asyncio exposes this through the StreamWriter API:

async def send_data(writer, data_chunks):
    for chunk in data_chunks:
        writer.write(chunk)
        await writer.drain()  # Waits if the write buffer is too full

writer.drain() is the backpressure point. It returns immediately if the buffer is below the high-water mark. If the buffer exceeds the mark, it waits until it drops below the low-water mark.

# Configuring water marks
transport = writer.transport
transport.set_write_buffer_limits(high=64*1024, low=16*1024)
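The sketch below puts both pieces together against a throwaway local server, assuming loopback networking is available. The names demo_limits and handle, the chunk size, and the chunk count are illustrative choices. Note that get_write_buffer_limits() reports the marks as (low, high).

```python
import asyncio

async def demo_limits():
    async def handle(reader, writer):
        # Discard everything the client sends, then close on EOF.
        while await reader.read(65536):
            pass
        writer.close()

    # Port 0 asks the OS for any free port.
    server = await asyncio.start_server(handle, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.transport.set_write_buffer_limits(high=64 * 1024, low=16 * 1024)
    limits = writer.transport.get_write_buffer_limits()   # (low, high)

    for _ in range(32):
        writer.write(b"x" * 32 * 1024)
        await writer.drain()             # suspends only while the buffer is over the high mark

    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return limits

if __name__ == "__main__":
    print(asyncio.run(demo_limits()))
```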

Async Generator Backpressure

Async generators naturally provide backpressure because they produce one item at a time:

import aiofiles

async def read_large_file(path, chunk_size=8192):
    async with aiofiles.open(path, 'rb') as f:
        while chunk := await f.read(chunk_size):
            yield chunk  # Consumer pulls at its own pace

async def upload(path, dest):
    async for chunk in read_large_file(path):
        await dest.write(chunk)  # Upload at network speed

The producer only generates the next chunk when the consumer asks for it.
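The pull-based ordering is easy to observe without any file I/O. This sketch logs each step; numbered_chunks and consume are hypothetical names, and asyncio.sleep(0) stands in for slow consumption.

```python
import asyncio

async def numbered_chunks(n, log):
    for i in range(n):
        log.append(f"produce {i}")
        yield i                       # execution pauses here until the consumer asks again

async def consume(n=3):
    log = []
    async for item in numbered_chunks(n, log):
        await asyncio.sleep(0)        # stand-in for slow consumption
        log.append(f"consume {item}")
    return log

if __name__ == "__main__":
    for line in asyncio.run(consume()):
        print(line)
```

The log strictly alternates between produce and consume entries, so the generator is never more than one item ahead of its consumer.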

Common Misconception: “Just Use a Bigger Queue”

A bigger queue doesn’t solve backpressure — it delays it. If your producer consistently outpaces your consumer, even a queue of 1 million will eventually fill up. The only real solutions are:

  1. Slow down the producer (bounded queue, semaphore)
  2. Speed up the consumer (more workers, optimization)
  3. Drop items intentionally (load shedding)
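Options 1 and 2 appear elsewhere in this article. For option 3, one minimal way to shed load with asyncio.Queue is to use put_nowait() and treat QueueFull as the signal to drop. The helper name offer and the drop-newest policy are assumptions; other policies, such as dropping the oldest item, are equally valid.

```python
import asyncio

def offer(queue, item):
    """Try to enqueue without waiting. Return False if the item was dropped."""
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False                  # queue is full: shed this item

async def demo():
    queue = asyncio.Queue(maxsize=3)
    dropped = 0
    for i in range(10):               # no consumer is running, so only 3 items fit
        if not offer(queue, i):
            dropped += 1
    return queue.qsize(), dropped

if __name__ == "__main__":
    print(asyncio.run(demo()))        # (3, 7)
```

Counting the drops, as demo does, keeps the shedding visible in metrics instead of silently losing data.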

Pattern: Multiple Consumers

Scale the consumer side to match the producer:

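async def main():
    queue = asyncio.Queue(maxsize=50)
    async with asyncio.TaskGroup() as tg:
        # 5 consumers process items concurrently
        workers = [tg.create_task(consumer(queue)) for _ in range(5)]
        await producer(queue)
        await queue.join()       # Wait until every item is processed
        for worker in workers:
            worker.cancel()      # Consumers loop forever, so stop them explicitly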

Monitor the queue size to decide when to add or remove consumers.
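One simple signal to monitor is how full the queue is, which qsize() and maxsize expose directly. The sketch below turns that ratio into a scaling hint. The function names and the 0.8 / 0.2 thresholds are hypothetical and would need tuning for a real workload.

```python
import asyncio

def fill_ratio(queue):
    """Fraction of the queue currently occupied (0.0 for an unbounded queue)."""
    if queue.maxsize <= 0:
        return 0.0
    return queue.qsize() / queue.maxsize

def scaling_hint(queue, high=0.8, low=0.2):
    """Return 'add', 'remove', or 'hold' based on assumed thresholds."""
    ratio = fill_ratio(queue)
    if ratio >= high:
        return "add"                  # consumers are falling behind
    if ratio <= low:
        return "remove"               # consumers are mostly idle
    return "hold"

async def demo():
    queue = asyncio.Queue(maxsize=10)
    hints = [scaling_hint(queue)]     # empty queue
    for i in range(5):
        queue.put_nowait(i)
    hints.append(scaling_hint(queue)) # half full
    for i in range(4):
        queue.put_nowait(i)
    hints.append(scaling_hint(queue)) # 9 of 10 slots used
    return hints

if __name__ == "__main__":
    print(asyncio.run(demo()))        # ['remove', 'hold', 'add']
```

A single reading is noisy, so in practice it is safer to act on several consecutive samples than on one.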

One thing to remember: Backpressure is about making the fast part wait for the slow part. Bounded queues, semaphores, writer.drain(), and async generators are the tools that prevent async systems from drowning in their own output.

