Python Asyncio Queues — Deep Dive

Queue Internals

asyncio.Queue is built on three primitives:

  1. collections.deque: the actual item storage. The deque itself is created unbounded regardless of maxsize; the queue enforces the limit by comparing qsize() against maxsize.
  2. Waiter deques: _getters and _putters are deques of asyncio.Future objects. When a consumer calls get() on an empty queue, a Future is appended to _getters; when a producer calls put() on a full queue, a Future is appended to _putters. When a producer puts an item, it pops the next Future from _getters and resolves it, waking that consumer.
  3. _unfinished_tasks counter: incremented by put(), decremented by task_done(), and paired with a _finished asyncio.Event that join() awaits.

The source code (CPython Lib/asyncio/queues.py) is only a few hundred lines long, which makes it one of the more readable stdlib modules.
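To make these pieces concrete, the sketch below peeks at the attributes named above. They are underscore-prefixed CPython implementation details, not public API, and may change between versions, so treat this as exploration code rather than something to rely on.

```python
import asyncio
import collections

async def inspect_internals():
    q = asyncio.Queue(maxsize=2)
    await q.put("a")
    await q.put("b")
    snapshot = {
        # Private attributes: CPython implementation details, not public API.
        "storage_is_deque": isinstance(q._queue, collections.deque),
        "deque_maxlen": q._queue.maxlen,       # None: the deque itself is unbounded
        "unfinished": q._unfinished_tasks,     # one per put()
        "finished_set": q._finished.is_set(),  # cleared while work is outstanding
    }
    for _ in range(2):
        q.get_nowait()
        q.task_done()
    snapshot["unfinished_after"] = q._unfinished_tasks
    snapshot["finished_after"] = q._finished.is_set()
    await q.join()  # returns immediately: nothing is outstanding
    return snapshot

if __name__ == "__main__":
    print(asyncio.run(inspect_internals()))
```

Note that get_nowait() alone does not touch the counter; only task_done() does, which is why join() tracks processed items rather than dequeued ones.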

Put/Get Flow

put(item):
  1. While the queue is full:
     → Create a Future, append it to _putters
     → await that Future (suspended until a consumer frees a slot), then re-check
  2. Append item to the deque and increment _unfinished_tasks
  3. If _getters has a waiting consumer:
     → Pop that consumer's Future and resolve it (the wake-up carries no item)

get():
  1. While the deque is empty:
     → Create a Future, append it to _getters
     → await that Future (suspended until a producer adds an item), then re-check
  2. Popleft from the deque
  3. If _putters has a waiting producer, resolve its Future (one slot is now free)

In current CPython there is no direct handoff: the item always passes through the deque, and the waiter's Future serves only as a wake-up signal. The woken consumer re-checks the queue and pops the item itself, which keeps the queue consistent if a waiter is cancelled between wake-up and resumption.
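The suspend-and-wake steps in this flow can be observed with a one-slot queue. This is a minimal sketch; the event labels are made up for illustration, and _getters is a private CPython attribute inspected only to show the parked Future.

```python
import asyncio

async def waiter_demo():
    events = []
    q = asyncio.Queue(maxsize=1)

    async def consumer():
        events.append("get: waiting")
        item = await q.get()           # queue is empty, so a Future is parked
        events.append(f"get: got {item}")

    async def producer():
        await q.put("x")               # fills the only slot, wakes the consumer
        events.append("put: x stored")
        await q.put("y")               # queue is full, so a Future is parked
        events.append("put: y stored")

    c = asyncio.create_task(consumer())
    await asyncio.sleep(0)             # let the consumer start and park
    parked_getters = len(q._getters)   # private attribute, CPython detail

    p = asyncio.create_task(producer())
    await asyncio.gather(c, p)
    return events, parked_getters, q.qsize()

if __name__ == "__main__":
    print(asyncio.run(waiter_demo()))
```

The second put() completes only after the consumer has removed "x", which is the backpressure mechanism in miniature.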

Bounded vs Unbounded Queues

Unbounded (maxsize=0, the default)

put() never blocks. This is convenient but dangerous:

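import asyncio

queue = asyncio.Queue()  # unbounded

async def fast_producer():
    while True:
        await queue.put(generate_data())  # generate_data() is a placeholder
        # put() never waits here, so the queue grows without limit.
        # It also never yields to the event loop, so yield explicitly:
        await asyncio.sleep(0)

async def slow_consumer():
    while True:
        item = await queue.get()
        await slow_processing(item)  # placeholder; takes 100x longer than producing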

If the producer outpaces the consumer, memory grows until the process is killed. Unbounded queues are a common source of memory-related incidents in asyncio services.
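The effect is easy to reproduce. In the sketch below (function and variable names are illustrative), the producer enqueues everything before the consumer processes a single item, because put() on an unbounded queue never suspends.

```python
import asyncio

async def unbounded_growth(n_items=1000):
    q = asyncio.Queue()                # unbounded
    consumed = []

    async def consumer():
        while True:
            consumed.append(await q.get())
            q.task_done()

    task = asyncio.create_task(consumer())
    for i in range(n_items):
        await q.put(i)                 # never suspends on an unbounded queue
    backlog = q.qsize()                # everything is still queued
    consumed_so_far = len(consumed)

    await q.join()                     # now let the consumer catch up
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return backlog, consumed_so_far, len(consumed)

if __name__ == "__main__":
    print(asyncio.run(unbounded_growth()))
```

With a real data source in place of range(), the backlog is limited only by available memory.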

Bounded (maxsize > 0)

await put() suspends the producer when the queue reaches capacity, creating natural backpressure:

queue = asyncio.Queue(maxsize=100)

Sizing the buffer: The maxsize should absorb temporary bursts without blocking the producer during steady-state operation. A good starting point: maxsize = consumer_count * 10. Monitor queue.qsize() in production to tune.
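To check a chosen maxsize, it can help to record the peak queue depth under a representative load. The following is a rough sketch; the 1 ms sleep is a stand-in for real processing and the item counts are arbitrary.

```python
import asyncio

async def measure_peak(maxsize, n_items=200, n_consumers=2):
    q = asyncio.Queue(maxsize=maxsize)
    peak = 0

    async def producer():
        nonlocal peak
        for i in range(n_items):
            await q.put(i)             # suspends whenever the queue is full
            peak = max(peak, q.qsize())

    async def consumer():
        while True:
            await q.get()
            await asyncio.sleep(0.001) # stand-in for slow processing
            q.task_done()

    workers = [asyncio.create_task(consumer()) for _ in range(n_consumers)]
    await producer()
    await q.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return peak

if __name__ == "__main__":
    print(asyncio.run(measure_peak(maxsize=20)))
```

However fast the producer is, the peak never exceeds maxsize; if it sits at maxsize constantly, the consumers are the bottleneck and a larger buffer will not help.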

Multi-Stage Pipelines

Chain queues to create processing pipelines:

import asyncio

# fetch(), parse_html(), save_to_db() and url_list are placeholders
# for application code.

async def stage_download(url_queue, raw_queue):
    while True:
        url = await url_queue.get()
        if url is None:
            break
        data = await fetch(url)
        await raw_queue.put((url, data))
        url_queue.task_done()

async def stage_parse(raw_queue, result_queue):
    while True:
        item = await raw_queue.get()
        if item is None:
            break
        url, data = item
        parsed = parse_html(data)
        await result_queue.put((url, parsed))
        raw_queue.task_done()

async def stage_store(result_queue):
    while True:
        item = await result_queue.get()
        if item is None:
            break
        url, parsed = item
        await save_to_db(url, parsed)
        result_queue.task_done()

async def stop_stage(queue, workers):
    # One sentinel per worker, then wait until every worker has exited.
    for _ in workers:
        await queue.put(None)
    await asyncio.gather(*workers)

async def main():
    url_q = asyncio.Queue(maxsize=50)
    raw_q = asyncio.Queue(maxsize=20)
    result_q = asyncio.Queue(maxsize=20)

    async with asyncio.TaskGroup() as tg:
        # 5 downloaders, 3 parsers, 2 storers
        downloaders = [tg.create_task(stage_download(url_q, raw_q)) for _ in range(5)]
        parsers = [tg.create_task(stage_parse(raw_q, result_q)) for _ in range(3)]
        storers = [tg.create_task(stage_store(result_q)) for _ in range(2)]

        # Feed URLs
        for url in url_list:
            await url_q.put(url)

        # Shut down stage by stage. A stage's sentinels are sent only after
        # the previous stage has finished, so no worker exits while upstream
        # work is still in flight (forwarding sentinels between stages with
        # different worker counts would stop workers too early).
        await stop_stage(url_q, downloaders)
        await stop_stage(raw_q, parsers)
        await stop_stage(result_q, storers)

Each stage has independently tuned concurrency and buffer sizes. Backpressure propagates upstream: if storage is slow, the result queue fills up, parsers block, the raw queue fills up, and downloaders slow down.

Graceful Shutdown Patterns

Sentinel Value

The simplest approach — put a special value (often None) to signal “no more items”:

# One sentinel per consumer
for _ in range(num_consumers):
    await queue.put(None)

Drawback: Requires knowing the consumer count at shutdown time.
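A small refinement worth considering is a dedicated sentinel object instead of None, so that None stays available as a legitimate work item. The sketch below assumes hypothetical names (STOP, worker, run) and simply collects items in place of real processing.

```python
import asyncio

STOP = object()  # unique sentinel, so None remains a valid work item

async def worker(queue, results):
    while True:
        item = await queue.get()
        try:
            if item is STOP:
                return
            results.append(item)       # stand-in for real processing
        finally:
            queue.task_done()          # also mark the sentinel as done

async def run(items, num_consumers=3):
    queue = asyncio.Queue(maxsize=10)
    results = []
    workers = [asyncio.create_task(worker(queue, results))
               for _ in range(num_consumers)]
    for item in items:
        await queue.put(item)
    for _ in range(num_consumers):     # one sentinel per consumer
        await queue.put(STOP)
    await asyncio.gather(*workers)
    return results

if __name__ == "__main__":
    print(asyncio.run(run([1, None, 2, 3])))
```

Comparing with `is` rather than `==` guarantees that no work item can be mistaken for the sentinel.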

Cancellation-Based

Use asyncio.TaskGroup or manual cancellation:

async def consumer(queue):
    try:
        while True:
            item = await queue.get()
            await process(item)
            queue.task_done()
    except asyncio.CancelledError:
        # Drain remaining items if needed
        while not queue.empty():
            item = queue.get_nowait()
            await process(item)
            queue.task_done()
        raise  # re-raise so the task is actually marked as cancelled
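One common way to drive this kind of shutdown, sketched here under the assumption that all work is enqueued up front, is to await queue.join() and only then cancel the consumers. At that point they are idle inside get(), so nothing is interrupted mid-item. The names run_until_drained and handled are illustrative.

```python
import asyncio

async def consumer(queue, handled):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0)     # stand-in for real processing
            handled.append(item)
        finally:
            queue.task_done()

async def run_until_drained(items, num_consumers=3):
    queue = asyncio.Queue()
    handled = []
    for item in items:
        queue.put_nowait(item)
    tasks = [asyncio.create_task(consumer(queue, handled))
             for _ in range(num_consumers)]
    await queue.join()                 # every item has been marked done
    for t in tasks:
        t.cancel()                     # consumers are idle in get(); safe to stop
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    return handled, outcomes

if __name__ == "__main__":
    print(asyncio.run(run_until_drained(list(range(10)))))
```

Unlike the sentinel approach, this does not need to know how many consumers exist when the queue is filled.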

Event-Based

shutdown = asyncio.Event()

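async def consumer(queue):
    while not shutdown.is_set():
        try:
            item = await asyncio.wait_for(queue.get(), timeout=1.0)
        except asyncio.TimeoutError:
            continue  # nothing arrived; re-check the shutdown flag
        # Process outside the try block so a TimeoutError raised by
        # process() is not mistaken for an empty queue.
        await process(item)
        queue.task_done()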
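The polling loop reacts to shutdown only after the current timeout expires. A possible variation, shown here as a sketch rather than a replacement, races queue.get() against shutdown.wait() with asyncio.wait so the consumer stops as soon as the event is set. The handled list stands in for real processing.

```python
import asyncio

async def consumer(queue, shutdown, handled):
    stop = asyncio.create_task(shutdown.wait())
    try:
        while True:
            get = asyncio.create_task(queue.get())
            await asyncio.wait({get, stop}, return_when=asyncio.FIRST_COMPLETED)
            if get.done():
                handled.append(get.result())   # stand-in for real processing
                queue.task_done()
            else:
                get.cancel()                   # shutdown won the race
                await asyncio.gather(get, return_exceptions=True)
                return
    finally:
        stop.cancel()

async def demo():
    queue, shutdown, handled = asyncio.Queue(), asyncio.Event(), []
    task = asyncio.create_task(consumer(queue, shutdown, handled))
    for i in range(3):
        await queue.put(i)
    await queue.join()
    shutdown.set()
    await asyncio.wait_for(task, timeout=1.0)
    return handled

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The cost is one extra task per get(), which is usually negligible next to the work done per item.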

Custom Queue Implementations

Drop-Oldest Queue

When the queue is full, discard the oldest item instead of blocking:

class DropOldestQueue(asyncio.Queue):
    def put_nowait(self, item):
        if self.full():
            try:
                self.get_nowait()  # discard oldest
            except asyncio.QueueEmpty:
                pass
        super().put_nowait(item)

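Useful for real-time telemetry where stale data is worthless. Two caveats: only put_nowait() drops items (await put() still waits while the queue is full), and a discarded item is never marked done, so join() will not return unless task_done() is also called for each dropped item.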
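A quick usage sketch shows the dropping behaviour. The class is repeated so the snippet runs on its own, and the integer readings are made-up sample data.

```python
import asyncio

class DropOldestQueue(asyncio.Queue):
    # Same class as above, repeated so this snippet is self-contained.
    def put_nowait(self, item):
        if self.full():
            try:
                self.get_nowait()  # discard oldest
            except asyncio.QueueEmpty:
                pass
        super().put_nowait(item)

async def demo():
    q = DropOldestQueue(maxsize=3)
    for reading in range(5):           # five readings into three slots
        q.put_nowait(reading)
    kept = [q.get_nowait() for _ in range(q.qsize())]
    return kept

if __name__ == "__main__":
    print(asyncio.run(demo()))         # the two oldest readings are gone
```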

Batching Queue

Collect items and yield them in batches:

class BatchQueue:
    def __init__(self, batch_size, timeout=1.0):
        self._queue = asyncio.Queue()
        self._batch_size = batch_size
        self._timeout = timeout
    
    async def put(self, item):
        await self._queue.put(item)
    
    async def get_batch(self):
        batch = []
        try:
            while len(batch) < self._batch_size:
                if batch:
                    item = await asyncio.wait_for(
                        self._queue.get(), timeout=self._timeout
                    )
                else:
                    item = await self._queue.get()
                batch.append(item)
        except asyncio.TimeoutError:
            pass
        return batch

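The first get() waits indefinitely; each subsequent get() waits at most timeout seconds. The timeout restarts for every item, so a slow trickle of items can delay a batch by up to (batch_size - 1) × timeout. The design trades some latency for throughput.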
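If a hard upper bound on batch latency matters, one option is a single deadline for the whole batch instead of a per-item timeout. The function below is a hypothetical variant written against a plain asyncio.Queue, not part of the BatchQueue class above.

```python
import asyncio

async def get_batch_with_deadline(queue, batch_size, timeout=1.0):
    loop = asyncio.get_running_loop()
    batch = [await queue.get()]                # first item: wait as long as needed
    deadline = loop.time() + timeout           # one deadline for the whole batch
    while len(batch) < batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), timeout=remaining))
        except asyncio.TimeoutError:
            break
    return batch

async def demo():
    q = asyncio.Queue()
    for i in range(5):
        q.put_nowait(i)
    full = await get_batch_with_deadline(q, batch_size=3, timeout=0.05)
    partial = await get_batch_with_deadline(q, batch_size=3, timeout=0.05)
    return full, partial

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here a batch is returned at most timeout seconds after its first item arrives, regardless of how slowly later items trickle in.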

Monitoring and Observability

Track queue health metrics in production:

import asyncio

async def monitor_queues(queues: dict, interval=10):
    # `metrics` is a placeholder for your metrics client
    while True:
        for name, q in queues.items():
            metrics.gauge(f"queue.{name}.size", q.qsize())
            metrics.gauge(f"queue.{name}.maxsize", q.maxsize or -1)  # -1 marks unbounded
            # Unbounded queues have no meaningful utilization; report 0
            utilization = q.qsize() / q.maxsize if q.maxsize else 0
            metrics.gauge(f"queue.{name}.utilization", utilization)
        await asyncio.sleep(interval)

Alert on sustained high utilization (>80%) — it means consumers can’t keep up and the pipeline is at risk of stalling.
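"Sustained" can be made precise by requiring several consecutive samples above the threshold, which avoids alerting on a single burst. The class below is an illustrative sketch; its name, window size, and interface are assumptions, not part of any metrics library.

```python
from collections import deque

class SustainedUtilization:
    """Fires only when every sample in the window exceeds the threshold."""

    def __init__(self, threshold=0.8, window=6):
        self.threshold = threshold
        self.samples = deque(maxlen=window)

    def observe(self, qsize, maxsize):
        # Unbounded queues (maxsize == 0) have no utilization; record 0.
        self.samples.append(qsize / maxsize if maxsize else 0.0)
        window_full = len(self.samples) == self.samples.maxlen
        return window_full and min(self.samples) > self.threshold
```

Calling observe(q.qsize(), q.maxsize) once per monitoring interval with window=6 and a 10-second interval would flag a queue that stayed above 80% for a full minute.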

Performance Considerations

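  • Every item passes through the internal deque, whose append and popleft are O(1); current CPython does not hand items directly to a waiting consumer. With balanced producer/consumer speeds and a small maxsize, most operations also wake a parked Future, which costs an extra event-loop iteration before the woken task runs.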
  • PriorityQueue uses heapq internally — O(log n) per put/get instead of O(1) for regular Queue.
  • Queue creation is cheap — creating per-request queues for scatter-gather patterns is fine.
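  • Thread safety: asyncio.Queue is NOT thread-safe. To put items from another thread, use loop.call_soon_threadsafe(queue.put_nowait, item), which raises QueueFull inside the loop if a bounded queue is full, or asyncio.run_coroutine_threadsafe(queue.put(item), loop), which preserves backpressure. The janus library provides a dual-mode (sync and async) queue.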
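For the thread-safety point, here is a minimal sketch of feeding a bounded queue from a plain thread with asyncio.run_coroutine_threadsafe. The function names and the None end marker are assumptions made for the example.

```python
import asyncio
import threading

def producer_thread(loop, queue, readings):
    # Runs in a plain thread: never touch the queue directly from here.
    for value in readings:
        # Schedule put() on the loop and block this thread until it completes,
        # so a full queue slows the thread down (backpressure is preserved).
        asyncio.run_coroutine_threadsafe(queue.put(value), loop).result()
    asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()  # end marker

async def main(readings):
    queue = asyncio.Queue(maxsize=2)
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=producer_thread, args=(loop, queue, readings))
    thread.start()
    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    # Join in a worker thread so the event loop keeps running meanwhile.
    await asyncio.to_thread(thread.join)
    return received

if __name__ == "__main__":
    print(asyncio.run(main([1, 2, 3, 4, 5])))
```

Joining the thread through asyncio.to_thread matters: calling thread.join() directly would block the event loop while the thread may still be waiting on that loop to deliver its last result.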

One thing to remember: Asyncio queues are coordination primitives, not just data structures — bounded queues with backpressure prevent memory explosions, task_done()/join() track completion, and multi-stage pipelines let you independently tune concurrency at each processing step.
