Python Asyncio Queues — Deep Dive
Queue Internals
asyncio.Queue is built on three primitives:
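- collections.deque: the actual item storage. It is always an unbounded deque, regardless of maxsize; the limit is enforced by the full() checks in put() and put_nowait().
- Waiter futures: _getters and _putters are deques of asyncio.Future objects. When a consumer calls get() on an empty queue, a Future is appended to _getters. When a producer put()s an item, it pops the next pending Future from _getters and resolves it, which wakes that consumer.
- Completion tracking: the _unfinished_tasks counter is incremented by put() and decremented by task_done(). When it drops to zero, the _finished asyncio.Event is set, and that event is what join() awaits.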
The source code (CPython Lib/asyncio/queues.py) is only a few hundred lines. It’s one of the most readable stdlib modules.
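The public API shows the split between storage and completion tracking. get() removes an item from the deque right away, but join() keeps waiting until task_done() brings the counter back to zero. Here is a small sketch; the function name counter_demo is only for illustration:

```python
import asyncio


async def counter_demo():
    queue = asyncio.Queue()
    queue.put_nowait("job")      # counter: 1, qsize: 1
    queue.get_nowait()           # qsize: 0, but the counter is still 1
    join_task = asyncio.create_task(queue.join())
    await asyncio.sleep(0)       # let join() start waiting on the _finished event
    still_waiting = not join_task.done()
    queue.task_done()            # counter drops to 0, so _finished is set
    await join_task              # join() now returns
    return queue.qsize(), still_waiting


print(asyncio.run(counter_demo()))  # (0, True)
```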
Put/Get Flow
put(item):
1. While the queue is full:
→ Create a Future, append it to _putters
→ await that Future (suspended until a consumer frees a slot), then re-check
2. Append item to the deque and increment _unfinished_tasks
3. If _getters has a waiting consumer:
→ Pop that consumer's Future and resolve it with None (a wake-up signal; the item stays in the deque)
get():
1. While the deque is empty:
→ Create a Future, append it to _getters
→ await that Future (suspended until a producer adds an item), then re-check
2. Popleft from the deque
3. If _putters has a waiting producer, resolve its Future (one slot is now free)
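There is no direct handoff. Every item passes through the deque, and a waiter's Future only signals "try again". The woken coroutine re-checks its condition in a loop. So a consumer woken by put() can find the queue empty again if another coroutine called get() first, and it then goes back to waiting.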
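Here is a stripped-down sketch of the same waiter pattern to make the flow concrete. It is not the CPython source. MiniQueue is a hypothetical name, and the sketch omits task_done()/join(), cleanup of cancelled waiters, and shutdown support, all of which the real Queue handles.

```python
import asyncio
from collections import deque


class MiniQueue:
    """Hypothetical, simplified version of asyncio.Queue's waiter logic."""

    def __init__(self, maxsize=0):
        self._maxsize = maxsize
        self._items = deque()     # storage: always an unbounded deque
        self._getters = deque()   # futures of consumers waiting for an item
        self._putters = deque()   # futures of producers waiting for a free slot

    def full(self):
        return 0 < self._maxsize <= len(self._items)

    @staticmethod
    def _wakeup_next(waiters):
        # Resolve the first pending waiter with None: a wake-up signal, not the item.
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    async def put(self, item):
        while self.full():
            putter = asyncio.get_running_loop().create_future()
            self._putters.append(putter)
            await putter          # woken by get(); the loop re-checks full()
        self._items.append(item)  # every item goes through the deque
        self._wakeup_next(self._getters)

    async def get(self):
        while not self._items:
            getter = asyncio.get_running_loop().create_future()
            self._getters.append(getter)
            await getter          # woken by put(); the loop re-checks emptiness
        item = self._items.popleft()
        self._wakeup_next(self._putters)
        return item


async def demo():
    q = MiniQueue(maxsize=1)
    consumer = asyncio.create_task(q.get())
    await asyncio.sleep(0)        # the consumer blocks on the empty queue
    await q.put("a")              # appends "a", then wakes the consumer
    return await consumer


print(asyncio.run(demo()))  # a
```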
Bounded vs Unbounded Queues
Unbounded (maxsize=0, the default)
put() never blocks. This is convenient but dangerous:
```python
import asyncio

queue = asyncio.Queue()  # unbounded

async def fast_producer():
    while True:
        await queue.put(generate_data())  # generate_data() is a placeholder
        # Never blocks: the queue grows without limit

async def slow_consumer():
    while True:
        item = await queue.get()
        await slow_processing(item)  # placeholder; takes 100x longer than producing
```
If the producer outpaces the consumer, memory grows until the process runs out of memory and is killed. In production, unbounded queues are a common cause of asyncio queue-related incidents.
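On an unbounded queue, put() has nothing to wait for, so "await queue.put(...)" returns without ever suspending. A producer like fast_producer, whose only await is the put, therefore never yields to the event loop, and the consumer does not run at all while it spins. The sketch below makes this visible; the function name is only for illustration:

```python
import asyncio


async def unbounded_put_never_yields(n_items=10_000):
    queue = asyncio.Queue()                  # unbounded (maxsize=0)
    consumed = 0

    async def consumer():
        nonlocal consumed
        while True:
            await queue.get()
            consumed += 1

    task = asyncio.create_task(consumer())   # scheduled, but not started yet
    for i in range(n_items):
        await queue.put(i)                   # full() is never true: no suspension
    backlog, seen = queue.qsize(), consumed  # sampled before this coroutine yields
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return backlog, seen


print(asyncio.run(unbounded_put_never_yields()))  # (10000, 0)
```

Adding an "await asyncio.sleep(0)" to the producer loop lets the consumer run, but only a bounded queue actually caps the backlog.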
Bounded (maxsize > 0)
put() blocks when the queue reaches capacity, creating natural backpressure:
```python
queue = asyncio.Queue(maxsize=100)
```
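Sizing the buffer: maxsize should be large enough to absorb temporary bursts without blocking the producer in steady-state operation. It should also be small enough that the backlog (and the memory it uses) stays bounded. A reasonable starting point is maxsize = consumer_count * 10. Monitor queue.qsize() in production and tune from there.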
Multi-Stage Pipelines
Chain queues to create processing pipelines:
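```python
import asyncio

# fetch(), parse_html(), save_to_db() and url_list are placeholders for your own code.


async def stage_download(url_queue, raw_queue):
    while True:
        url = await url_queue.get()
        if url is None:  # sentinel: no more URLs for this worker
            break
        data = await fetch(url)
        await raw_queue.put((url, data))
        url_queue.task_done()


async def stage_parse(raw_queue, result_queue):
    while True:
        item = await raw_queue.get()
        if item is None:
            break
        url, data = item
        parsed = parse_html(data)
        await result_queue.put((url, parsed))
        raw_queue.task_done()


async def stage_store(result_queue):
    while True:
        item = await result_queue.get()
        if item is None:
            break
        url, parsed = item
        await save_to_db(url, parsed)
        result_queue.task_done()


async def main():
    url_q = asyncio.Queue(maxsize=50)
    raw_q = asyncio.Queue(maxsize=20)
    result_q = asyncio.Queue(maxsize=20)

    async with asyncio.TaskGroup() as tg:
        # 5 downloaders, 3 parsers, 2 storers
        downloaders = [tg.create_task(stage_download(url_q, raw_q)) for _ in range(5)]
        parsers = [tg.create_task(stage_parse(raw_q, result_q)) for _ in range(3)]
        storers = [tg.create_task(stage_store(result_q)) for _ in range(2)]

        # Feed URLs, then one sentinel per downloader
        for url in url_list:
            await url_q.put(url)
        for _ in downloaders:
            await url_q.put(None)

        # Stop each later stage only after the stage before it has fully exited,
        # sending one sentinel per worker. If every worker forwarded its own
        # sentinel (5 downloaders -> 3 parsers), parsers could quit while
        # downloaders are still producing, leaving them blocked on a full raw_q.
        await asyncio.gather(*downloaders)
        for _ in parsers:
            await raw_q.put(None)
        await asyncio.gather(*parsers)
        for _ in storers:
            await result_q.put(None)
```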
Each stage has independently tuned concurrency and buffer sizes. Backpressure propagates upstream: if storage is slow, the result queue fills up, parsers block, the raw queue fills up, and downloaders slow down.
Graceful Shutdown Patterns
Sentinel Value
The simplest approach — put a special value (often None) to signal “no more items”:
```python
# One sentinel per consumer
for _ in range(num_consumers):
    await queue.put(None)
```
Drawback: Requires knowing the consumer count at shutdown time.
Cancellation-Based
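Cancel the consumer tasks, either manually with task.cancel() or by letting an enclosing asyncio.TaskGroup cancel them (it does so when a sibling task fails). A consumer can drain what is left before exiting, but it should re-raise CancelledError so the cancellation still completes:
```python
import asyncio

async def consumer(queue):
    try:
        while True:
            item = await queue.get()
            await process(item)  # process() is a placeholder
            queue.task_done()
    except asyncio.CancelledError:
        # Drain remaining items if needed
        while not queue.empty():
            item = queue.get_nowait()
            await process(item)
            queue.task_done()
        raise  # never swallow cancellation
```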
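Cancellation pairs naturally with task_done()/join(). Wait until every queued item has been marked done, then cancel the idle workers. This mirrors the worker example in the asyncio documentation. The doubling "work", the sleep, and the worker count below are placeholders.

```python
import asyncio


async def worker(queue, results):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.001)   # stand-in for real processing
            results.append(item * 2)
        finally:
            queue.task_done()            # count the item even if processing raises


async def shutdown_demo(n_items=20, n_workers=4):
    queue = asyncio.Queue()
    results = []
    for i in range(n_items):
        queue.put_nowait(i)
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(n_workers)]
    await queue.join()                   # every item has been marked done
    for w in workers:
        w.cancel()                       # workers are idle in get(); stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)


print(asyncio.run(shutdown_demo()))
```

Unlike sentinels, this does not require knowing the consumer count when producers finish. Only whoever cancels the workers needs their task handles.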
Event-Based
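Consumers check a shared asyncio.Event between items. The timeout on get() means an idle consumer notices the flag within about a second. Items still in the queue when the flag is set are left unprocessed.
```python
import asyncio

shutdown = asyncio.Event()

async def consumer(queue):
    while not shutdown.is_set():
        try:
            item = await asyncio.wait_for(queue.get(), timeout=1.0)
        except asyncio.TimeoutError:
            continue  # nothing arrived; re-check the shutdown flag
        await process(item)  # placeholder; kept outside the try so its errors surface
        queue.task_done()
```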
Custom Queue Implementations
Drop-Oldest Queue
When the queue is full, discard the oldest item instead of blocking:
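```python
import asyncio

class DropOldestQueue(asyncio.Queue):
    def put_nowait(self, item):
        if self.full():
            try:
                self.get_nowait()  # discard oldest
                self.task_done()   # never processed, so mark it done or join() hangs
            except asyncio.QueueEmpty:
                pass
        super().put_nowait(item)

    async def put(self, item):
        # Queue.put() waits while full(), so override it too to never block
        self.put_nowait(item)
```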
Useful for real-time telemetry where stale data is worthless.
Batching Queue
Collect items and yield them in batches:
```python
import asyncio

class BatchQueue:
    def __init__(self, batch_size, timeout=1.0):
        self._queue = asyncio.Queue()
        self._batch_size = batch_size
        self._timeout = timeout

    async def put(self, item):
        await self._queue.put(item)

    async def get_batch(self):
        loop = asyncio.get_running_loop()
        batch = [await self._queue.get()]  # block until the first item arrives
        # One deadline for the whole batch, measured from the first item
        deadline = loop.time() + self._timeout
        while len(batch) < self._batch_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                batch.append(
                    await asyncio.wait_for(self._queue.get(), timeout=remaining)
                )
            except asyncio.TimeoutError:
                break
        return batch
```
The first item blocks indefinitely; subsequent items time out after the deadline — balancing latency and throughput.
Monitoring and Observability
Track queue health metrics in production:
```python
import asyncio

async def monitor_queues(queues: dict, interval=10):
    # `metrics` is a placeholder for your metrics client
    while True:
        for name, q in queues.items():
            metrics.gauge(f"queue.{name}.size", q.qsize())
            metrics.gauge(f"queue.{name}.maxsize", q.maxsize or -1)  # -1 = unbounded
            utilization = q.qsize() / q.maxsize if q.maxsize else 0
            metrics.gauge(f"queue.{name}.utilization", utilization)
        await asyncio.sleep(interval)
```
Alert on sustained high utilization (>80%) — it means consumers can’t keep up and the pipeline is at risk of stalling.
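"Sustained" needs a definition before it can drive an alert. This is a minimal sketch that assumes one utilization sample per monitoring interval. The hypothetical UtilizationAlarm fires only when every sample in a sliding window exceeds the threshold, so a single burst does not trigger it. With interval=10 and window=6, that means roughly a minute above 80%.

```python
from collections import deque


class UtilizationAlarm:
    """Hypothetical helper: alert only on *sustained* high utilization."""

    def __init__(self, threshold=0.8, window=6):
        self.threshold = threshold
        self.samples = deque(maxlen=window)  # keeps only the newest `window` samples

    def observe(self, utilization):
        """Record one sample; return True if the alarm should fire."""
        self.samples.append(utilization)
        # Sustained = the window is full and every sample in it exceeds the threshold
        return len(self.samples) == self.samples.maxlen and all(
            s > self.threshold for s in self.samples
        )


alarm = UtilizationAlarm(window=3)
print([alarm.observe(u) for u in (0.9, 0.95, 0.85, 0.5)])  # [False, False, True, False]
```

In monitor_queues, you would keep one alarm per queue and call observe(utilization) on each pass.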
Performance Considerations
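- No direct handoff: every item is appended to the deque and popped by the consumer, even when a consumer is already waiting. Waking a waiting consumer costs a Future resolution plus one pass of the event loop before the consumer runs.
- PriorityQueue uses heapq internally: O(log n) per put/get instead of O(1) for the regular Queue.
- Queue creation is cheap, so creating per-request queues for scatter-gather patterns is fine.
- Thread safety: asyncio.Queue is NOT thread-safe. To put items from another thread, use loop.call_soon_threadsafe(queue.put_nowait, item). If the queue is bounded and the thread should wait for space, use asyncio.run_coroutine_threadsafe(queue.put(item), loop) and call .result() on the returned future. The janus library provides a dual-mode (sync and async) queue.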
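Here is a minimal sketch of the call_soon_threadsafe approach. A plain thread schedules queue.put_nowait on the event loop instead of touching the queue itself. The queue is unbounded here, so put_nowait cannot raise QueueFull. The function names are only for illustration.

```python
import asyncio
import threading


def produce_from_thread(loop, queue, n):
    # Runs in a plain OS thread: it never touches the queue directly,
    # it only asks the event loop to call put_nowait on its behalf.
    for i in range(n):
        loop.call_soon_threadsafe(queue.put_nowait, i)
    loop.call_soon_threadsafe(queue.put_nowait, None)   # sentinel


async def thread_demo(n=5):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()   # unbounded, so put_nowait cannot raise QueueFull
    thread = threading.Thread(target=produce_from_thread, args=(loop, queue, n))
    thread.start()
    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    await asyncio.to_thread(thread.join)   # join without blocking the event loop
    return received


print(asyncio.run(thread_demo()))  # [0, 1, 2, 3, 4]
```

Callbacks scheduled with call_soon_threadsafe run in the order they were scheduled, so items arrive in the order the thread produced them.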
One thing to remember: Asyncio queues are coordination primitives, not just data structures — bounded queues with backpressure prevent memory explosions, task_done()/join() track completion, and multi-stage pipelines let you independently tune concurrency at each processing step.