Python Asyncio Queues — Core Concepts
What Asyncio Queues Solve
When you have coroutines producing data and other coroutines consuming it, you need a coordination mechanism. Directly coupling producers and consumers creates tight dependencies and makes it hard to control throughput.
asyncio.Queue decouples them: producers put() items, consumers get() items, and the queue handles the timing. Both operations are awaitable: when they have to wait, they suspend the calling coroutine without blocking the event loop.
Basic Producer-Consumer Pattern
import asyncio

async def producer(queue, items):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    await queue.put(None)  # sentinel to signal completion

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        await asyncio.sleep(0.1)  # simulate processing
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    await asyncio.gather(
        producer(queue, range(20)),
        consumer(queue)
    )

asyncio.run(main())
maxsize=10 provides backpressure: when the queue is full, put() suspends the producer coroutine until a consumer removes an item. This keeps a fast producer from filling memory with items that a slow consumer has not handled yet.
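To make the suspension visible, the following minimal sketch fills a tiny queue and wraps the next put() in a timeout. The function name demo_backpressure and the maxsize of 2 are illustrative choices, not part of the example above.

```python
import asyncio


async def demo_backpressure():
    queue = asyncio.Queue(maxsize=2)
    await queue.put("a")
    await queue.put("b")  # the queue is now full

    # A third put() cannot complete until an item is removed, so a
    # short timeout is used here to observe that it is suspended.
    try:
        await asyncio.wait_for(queue.put("c"), timeout=0.1)
        suspended = False
    except asyncio.TimeoutError:
        suspended = True

    first = queue.get_nowait()  # free one slot
    await queue.put("c")        # now completes without waiting
    return suspended, first, queue.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo_backpressure()))
```

In real code the producer would simply await put() and resume on its own once a consumer catches up; the timeout exists only for demonstration.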
Queue Types
asyncio.Queue — FIFO (First In, First Out)
Standard ordering. Items are consumed in the same order they were produced.
asyncio.PriorityQueue — Lowest Value First
Items are returned in sorted order. Wrap items in tuples with a priority:
queue = asyncio.PriorityQueue()
await queue.put((1, "urgent task"))
await queue.put((10, "low priority task"))
await queue.put((5, "medium task"))
item = await queue.get() # (1, "urgent task")
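Lower numbers mean higher priority. If two priorities are equal, the tuple comparison falls through to the items themselves, so the items must be comparable too; otherwise Python raises TypeError. A common fix is to wrap entries in a dataclass whose ordering (for example via __lt__) looks only at the priority.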
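One way to handle ties is sketched below, assuming payloads that cannot be compared (dicts here). The class name PrioritizedItem, the sequence field used as a tie-breaker, and the sample jobs are all illustrative.

```python
import asyncio
import itertools
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class PrioritizedItem:
    priority: int
    sequence: int                        # tie-breaker: insertion order
    payload: Any = field(compare=False)  # excluded from comparisons


async def demo_priority():
    queue = asyncio.PriorityQueue()
    counter = itertools.count()
    jobs = [(5, {"job": "b"}), (1, {"job": "a"}), (5, {"job": "c"})]
    for priority, payload in jobs:
        await queue.put(PrioritizedItem(priority, next(counter), payload))

    order = []
    while not queue.empty():
        item = await queue.get()
        order.append(item.payload["job"])
    return order


if __name__ == "__main__":
    print(asyncio.run(demo_priority()))
```

With plain (priority, dict) tuples, the two priority-5 entries would force Python to compare the dicts and fail. The sequence counter also keeps equal-priority items in insertion order.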
asyncio.LifoQueue — Last In, First Out (Stack)
Most recently added item comes out first. Useful for depth-first processing patterns.
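As a rough illustration of depth-first processing, the sketch below walks a small tree using a LifoQueue as the stack. The tree contents and the function name depth_first are made up for this example.

```python
import asyncio

# Hypothetical tree: each key maps to its list of children.
TREE = {"root": ["a", "b"], "a": ["a1", "a2"]}


async def depth_first(tree, root):
    stack = asyncio.LifoQueue()
    await stack.put(root)
    visited = []
    while not stack.empty():
        node = await stack.get()  # most recently added node comes out first
        visited.append(node)
        for child in tree.get(node, []):
            await stack.put(child)
    return visited


if __name__ == "__main__":
    print(asyncio.run(depth_first(TREE, "root")))
```

Because the last child pushed is the first one popped, each branch is followed to its leaves before the traversal returns to an earlier sibling.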
Multiple Consumers (Fan-Out)
Scale consumption by running multiple consumer coroutines:
import asyncio

async def consumer(name, queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"[{name}] Processing: {item}")
        await asyncio.sleep(0.5)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=20)
    # 1 producer (main itself), 3 consumers
    consumers = [
        asyncio.create_task(consumer(f"C{i}", queue))
        for i in range(3)
    ]
    # Produce items
    for i in range(30):
        await queue.put(i)
    # Wait for all items to be processed
    await queue.join()
    # Signal consumers to stop: one sentinel per consumer
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)

asyncio.run(main())
queue.join() blocks until every item that was put() has been matched with a task_done() call. This is the cleanest way to know “all work is finished.”
task_done() and join()
The task_done() / join() pair is a counting mechanism:
- Each put() increments an internal counter.
- Each task_done() decrements it.
- join() blocks until the counter reaches zero.
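The counting behaviour can be observed directly. This minimal sketch (the names demo_counter and join_finishes are illustrative) uses a short timeout to check whether join() would return, and shows that an extra task_done() raises ValueError.

```python
import asyncio


async def demo_counter():
    queue = asyncio.Queue()
    queue.put_nowait("x")  # counter: 1
    queue.put_nowait("y")  # counter: 2

    async def join_finishes(timeout=0.05):
        # True if join() returns within the timeout, False otherwise.
        try:
            await asyncio.wait_for(queue.join(), timeout)
            return True
        except asyncio.TimeoutError:
            return False

    results = [await join_finishes()]     # two unfinished items

    queue.get_nowait()
    queue.task_done()                     # counter: 1
    results.append(await join_finishes())

    queue.get_nowait()
    queue.task_done()                     # counter: 0
    results.append(await join_finishes())

    try:
        queue.task_done()                 # more calls than items put
        extra = "no error"
    except ValueError:
        extra = "ValueError"
    return results, extra


if __name__ == "__main__":
    print(asyncio.run(demo_counter()))
```

Note that get() alone does not change the counter; only task_done() does.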
Forgetting task_done() means join() hangs forever. It’s a common bug — always call it after processing an item, even if processing fails:
import logging

async def safe_consumer(queue):
    while True:
        item = await queue.get()
        try:
            await process(item)  # process() stands in for your own coroutine
        except Exception:
            logging.exception("Failed to process %r", item)
        finally:
            queue.task_done()  # always mark as done
Non-Blocking Operations
put_nowait() and get_nowait() never wait. If the queue is full or empty, they raise asyncio.QueueFull or asyncio.QueueEmpty, respectively, right away:
try:
    queue.put_nowait(item)
except asyncio.QueueFull:
    print("Queue is full, dropping item")
Useful for fire-and-forget patterns or when you want to implement custom backpressure (drop oldest, sample, etc.).
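A drop-oldest policy could look roughly like the sketch below. The helper put_drop_oldest is hypothetical (it is not part of asyncio), and it assumes it is called from code running on the event loop, so nothing else touches the queue between the two calls.

```python
import asyncio


def put_drop_oldest(queue, item):
    """Insert item; if the queue is full, evict the oldest entry first.

    Returns the evicted entry, or None if nothing was dropped.
    """
    dropped = None
    if queue.full():
        dropped = queue.get_nowait()
        # Balance the counter so a later join() is not left waiting
        # for an item that will never be processed.
        queue.task_done()
    queue.put_nowait(item)
    return dropped


async def demo_drop_oldest():
    queue = asyncio.Queue(maxsize=3)
    dropped = [put_drop_oldest(queue, i) for i in range(5)]
    remaining = [queue.get_nowait() for _ in range(3)]
    return dropped, remaining


if __name__ == "__main__":
    print(asyncio.run(demo_drop_oldest()))
```

The helper is a plain function with no await, so no other coroutine can run between the eviction and the insert. That is why put_nowait() cannot hit a full queue here.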
Common Misconception
“asyncio.Queue is the same as queue.Queue.” They serve similar purposes but are completely different classes. queue.Queue is for threads: its get() and put() block the calling thread. asyncio.Queue is for coroutines: its methods are awaitable and cooperate with the event loop. Calling the blocking get() or put() of a queue.Queue from a coroutine stalls the entire event loop until the call returns.
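The reverse direction has its own pitfall: asyncio.Queue is not thread-safe, so a worker thread should not call its methods directly. One possible way to hand items from a thread to a coroutine is sketched below using loop.call_soon_threadsafe(); the function names thread_producer and demo_bridge are illustrative.

```python
import asyncio
import threading


def thread_producer(loop, queue, items):
    # Runs in a worker thread. Each put_nowait() call is scheduled on
    # the event loop's own thread instead of being made from here.
    for item in items:
        loop.call_soon_threadsafe(queue.put_nowait, item)
    loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel


async def demo_bridge():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()  # unbounded, so put_nowait() cannot raise QueueFull
    worker = threading.Thread(
        target=thread_producer, args=(loop, queue, range(3))
    )
    worker.start()

    received = []
    while (item := await queue.get()) is not None:
        received.append(item)
    worker.join()  # the thread has already finished scheduling its items
    return received


if __name__ == "__main__":
    print(asyncio.run(demo_bridge()))
```

This sketch gives up backpressure by using an unbounded queue; a bounded variant would need a different hand-off, which is beyond this example.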
One thing to remember: asyncio.Queue is the bridge between producers and consumers in async code — set a maxsize for backpressure, use task_done() with join() for completion tracking, and never confuse it with the thread-safe queue.Queue.