Python Priority Queue Patterns — Deep Dive
Heap Internals
Python’s heapq implements a binary min-heap stored as a flat list. The parent of the element at index i is at (i - 1) // 2, and its children are at 2*i + 1 and 2*i + 2. This compact layout needs no node objects or pointers: moving around the tree is just index arithmetic.
The heap invariant: heap[k] <= heap[2*k+1] and heap[k] <= heap[2*k+2] for all valid indices. heappush appends to the end and sifts up; heappop swaps root with the last element, removes it, and sifts down.
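```python
# Under the hood, heappush does roughly this. (CPython names its version of
# this helper _siftdown, because the item moves toward index 0.)
def _sift_up(heap, pos):
    newitem = heap[pos]
    # Walk toward the root, shifting larger parents down one level
    while pos > 0:
        parentpos = (pos - 1) >> 1  # same as (pos - 1) // 2
        parent = heap[parentpos]
        if newitem < parent:
            heap[pos] = parent
            pos = parentpos
        else:
            break
    heap[pos] = newitem  # drop the new item into the hole left behind


def heappush(heap, item):
    heap.append(item)              # the new item starts as the last leaf
    _sift_up(heap, len(heap) - 1)  # restore the heap invariant
```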
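The CPython implementation is in pure Python (Lib/heapq.py) with a C accelerator (Modules/_heapqmodule.c). Lib/heapq.py tries to import the C functions from _heapq and uses them whenever that module is available, so a plain `import heapq` already gets the fast versions. The C version is roughly 4-10x faster for push/pop operations.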
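heappop runs the mirror-image operation: detach the last leaf, put it in the root slot, and sink it until both children are no smaller. The sketch below is the textbook version, written to show the invariant at work; it is not a copy of Lib/heapq.py, whose helper sinks the item all the way to a leaf and then sifts it back up to save comparisons. The names `_sift_down`, `heappop`, and `is_heap` here are illustrative.

```python
import heapq


def _sift_down(heap, pos):
    """Move heap[pos] toward the leaves until both children are >= it."""
    n = len(heap)
    item = heap[pos]
    while True:
        child = 2 * pos + 1                  # left child
        if child >= n:
            break                            # pos is a leaf
        right = child + 1
        if right < n and heap[right] < heap[child]:
            child = right                    # pick the smaller child
        if heap[child] < item:
            heap[pos] = heap[child]          # move that child up one level
            pos = child
        else:
            break
    heap[pos] = item


def heappop(heap):
    """Remove and return the smallest item (IndexError if empty)."""
    last = heap.pop()                        # detach the last leaf
    if not heap:
        return last                          # it was the only item
    smallest = heap[0]
    heap[0] = last                           # move the last leaf to the root...
    _sift_down(heap, 0)                      # ...and sink it into place
    return smallest


def is_heap(heap):
    """Check that every element is >= its parent at (i - 1) // 2."""
    return all(heap[(i - 1) // 2] <= heap[i] for i in range(1, len(heap)))


if __name__ == "__main__":
    data = [5, 3, 8, 1, 9, 2]
    heapq.heapify(data)                      # build with the stdlib...
    print([heappop(data) for _ in range(len(data))])  # ...drain with the sketch: [1, 2, 3, 5, 8, 9]
```

Draining a heap this way yields its items in sorted order, which is an easy property to test against `sorted()`.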
The Mark-as-Invalid Pattern
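heapq has no operation for changing an existing entry’s priority: finding the entry is an O(n) scan, and editing it in place breaks the heap invariant. The classic workaround, described in the heapq documentation, is to mark the old entry as invalid and push a new one.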
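```python
import heapq
import itertools


class UpdatablePriorityQueue:
    REMOVED = '<removed>'  # placeholder written over the task of a dead entry

    def __init__(self):
        self._heap = []                     # [priority, count, task] entries
        self._entry_finder = {}             # task -> its live entry
        self._counter = itertools.count()   # unique tiebreaker, keeps FIFO order

    def add_task(self, task, priority=0):
        """Add a new task or update the priority of an existing one."""
        if task in self._entry_finder:
            self.remove_task(task)
        count = next(self._counter)
        # A list, not a tuple, so remove_task can overwrite the task slot
        entry = [priority, count, task]
        self._entry_finder[task] = entry
        heapq.heappush(self._heap, entry)

    def remove_task(self, task):
        """Mark an existing task as REMOVED. Raise KeyError if not found."""
        entry = self._entry_finder.pop(task)
        entry[-1] = self.REMOVED

    def pop_task(self):
        """Remove and return the task with the smallest priority value."""
        while self._heap:
            priority, count, task = heapq.heappop(self._heap)
            # Skip entries invalidated by remove_task or a priority update
            if task is not self.REMOVED:
                del self._entry_finder[task]
                return task
        raise KeyError('pop from an empty priority queue')
```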
This pattern is commonly used in Dijkstra’s algorithm implementations. The tradeoff: the heap accumulates dead entries, so its worst-case size is O(total insertions), not O(live items). For long-running systems, periodically rebuilding the heap (drop the REMOVED entries, then call heapq.heapify()) keeps memory bounded.
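A minimal sketch of that periodic rebuild, assuming the entry layout used by UpdatablePriorityQueue (the task in the last slot, overwritten by a sentinel when removed). The helpers `compact` and `maybe_compact` and the "rebuild once dead entries outnumber live ones" policy are hypothetical choices for illustration, not part of heapq.

```python
import heapq


def compact(heap, removed):
    """Drop entries whose task slot holds the `removed` sentinel, then re-heapify.

    Mutates `heap` in place, so other references to the same list stay valid.
    """
    heap[:] = [entry for entry in heap if entry[-1] is not removed]
    heapq.heapify(heap)                      # O(n) rebuild of the invariant


def maybe_compact(heap, live_count, removed, max_dead_ratio=1.0):
    """Hypothetical policy: compact once dead entries outnumber live ones."""
    dead = len(heap) - live_count
    if dead > max_dead_ratio * max(live_count, 1):
        compact(heap, removed)
        return True
    return False
```

In UpdatablePriorityQueue the live count is simply `len(self._entry_finder)`, so a call such as `maybe_compact(self._heap, len(self._entry_finder), self.REMOVED)` at the end of `remove_task` would keep the heap at roughly twice the live size, and each O(n) rebuild is paid for by the removals that preceded it.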
Async Priority Queues
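asyncio.PriorityQueue is a subclass of asyncio.Queue that stores entries in a heap and hands out the lowest one first. Its maxsize argument provides backpressure: once the queue holds maxsize items, `await put()` suspends until a consumer frees a slot. Like the rest of asyncio, it is not thread-safe.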
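```python
import asyncio


async def priority_worker(name, queue):
    while True:
        # Lowest tuple first: priority, then the enqueue index as a tiebreaker
        priority, task_id, payload = await queue.get()
        try:
            print(f"{name} processing priority-{priority} task {task_id}: {payload}")
            await asyncio.sleep(0.1)  # simulate work
        finally:
            queue.task_done()  # always mark done, or join() never returns


async def main():
    pq = asyncio.PriorityQueue(maxsize=100)  # put() waits once 100 items are queued
    # Spawn workers
    workers = [asyncio.create_task(priority_worker(f"w-{i}", pq))
               for i in range(3)]
    # Enqueue mixed priorities; the index i means payloads are never compared
    for i, (pri, payload) in enumerate([
        (2, "low-email"), (0, "critical-alert"),
        (1, "standard-job"), (0, "critical-payment"),
    ]):
        await pq.put((pri, i, payload))
    await pq.join()  # wait until every item has been marked task_done()
    for w in workers:
        w.cancel()
    # Let the cancellations complete before the event loop shuts down
    await asyncio.gather(*workers, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())
```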
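The worker example never fills its queue, so maxsize does nothing there. A small sketch of what backpressure looks like when it kicks in, using only the standard asyncio.Queue API (`put`, `put_nowait`, `get`, `asyncio.QueueFull`); the function name `demo_backpressure` and the 0.01 s consumer delay are made up for illustration.

```python
import asyncio


async def demo_backpressure():
    pq = asyncio.PriorityQueue(maxsize=2)
    await pq.put((1, 0, "routine"))
    await pq.put((0, 1, "urgent"))          # the queue is now full

    # The non-blocking put fails fast instead of waiting
    try:
        pq.put_nowait((2, 2, "bulk"))
        rejected = False
    except asyncio.QueueFull:
        rejected = True

    async def slow_consumer():
        await asyncio.sleep(0.01)           # simulate a busy worker
        return await pq.get()               # frees one slot

    consumer = asyncio.create_task(slow_consumer())
    # The blocking put suspends here until the consumer frees a slot
    await asyncio.wait_for(pq.put((2, 2, "bulk")), timeout=1.0)
    served_first = await consumer
    return rejected, served_first, pq.qsize()


if __name__ == "__main__":
    print(asyncio.run(demo_backpressure()))
```

The producer is throttled to the consumer's pace, and the consumer still receives the urgent item first even though it was enqueued second. Choosing between `put_nowait` (shed load) and `await put()` (slow the producer) is the main design decision once a queue is bounded.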
Priority Starvation
The danger: low-priority tasks never execute if high-priority tasks keep arriving. Solutions:
- Aging: gradually raise the effective priority of waiting items over time (with heapq’s min-ordering, that means lowering their number)
- Fair-share: reserve a percentage of capacity for each priority level
- Priority bands with round-robin: treat each band as a separate FIFO and alternate between bands with weighted frequency
```python
import heapq
import itertools
import time


class AgingPriorityQueue:
    """Lower numbers pop first; waiting items drift toward lower numbers."""

    def __init__(self, aging_rate=0.1):
        self._heap = []
        self._counter = itertools.count()
        self._aging_rate = aging_rate  # priority units gained per second waited

    def push(self, item, priority):
        count = next(self._counter)
        entry = (priority, time.monotonic(), count, item)
        heapq.heappush(self._heap, entry)

    def pop(self):
        if not self._heap:
            raise IndexError("pop from an empty queue")
        now = time.monotonic()
        # Recalculate effective priorities with aging: the longer an item
        # has waited, the smaller (more urgent) its effective priority
        aged = []
        for pri, timestamp, count, item in self._heap:
            age = now - timestamp
            effective = pri - (age * self._aging_rate)
            aged.append((effective, count, item))
        heapq.heapify(aged)
        effective_pri, count, item = heapq.heappop(aged)
        # Rebuild heap without the popped item (O(n) per pop)
        self._heap = [entry for entry in self._heap if entry[2] != count]
        heapq.heapify(self._heap)
        return item
```
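The AgingPriorityQueue above rescans every entry on each pop, which is O(n). When every item ages at the same linear rate, that rescan can be avoided: effective priority is priority - rate * (now - enqueued_at), which equals (priority + rate * enqueued_at) - rate * now, and the last term is identical for all items. Ordering by the fixed key priority + rate * enqueued_at therefore gives the same pop order at any moment, so an ordinary heap works. A sketch under that assumption; the class name `LinearAgingQueue` and the injectable `clock` parameter (handy for tests) are hypothetical.

```python
import heapq
import itertools
import time


class LinearAgingQueue:
    """Aging with O(log n) push/pop, assuming one shared linear aging rate.

    effective(t) = priority - rate * (t - enqueued_at)
                 = (priority + rate * enqueued_at) - rate * t
    The final term is the same for every item, so sorting by the constant
    key priority + rate * enqueued_at matches sorting by effective(t).
    """

    def __init__(self, aging_rate=0.1, clock=time.monotonic):
        self._heap = []
        self._counter = itertools.count()   # FIFO tiebreak; items never compared
        self._aging_rate = aging_rate
        self._clock = clock

    def push(self, item, priority):
        key = priority + self._aging_rate * self._clock()
        heapq.heappush(self._heap, (key, next(self._counter), item))

    def pop(self):
        if not self._heap:
            raise IndexError("pop from an empty queue")
        return heapq.heappop(self._heap)[-1]

    def __len__(self):
        return len(self._heap)
```

The trick only holds for a uniform linear rate; per-item rates or non-linear aging curves bring back the need to recompute, as in the original class.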
Multi-Level Priority Queues
Production task systems often use discrete priority levels rather than continuous numbers. Celery with a Redis broker, for example, accepts priorities 0-9 and implements them as separate lists per priority step.
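```python
from collections import deque


class MultiLevelQueue:
    """Weighted service over discrete levels (0 = highest priority)."""

    def __init__(self, levels=4, weights=None):
        self.queues = [deque() for _ in range(levels)]
        # Default per-cycle quotas favor higher levels: [levels, ..., 2, 1]
        self.weights = weights or list(range(levels, 0, -1))
        if len(self.weights) != levels or any(w < 1 for w in self.weights):
            raise ValueError("need one positive weight per level")
        self._cycle_counts = [0] * levels

    def put(self, item, priority=0):
        # Clamp into [0, levels - 1]; a negative index would silently wrap
        priority = max(0, min(priority, len(self.queues) - 1))
        self.queues[priority].append(item)

    def get(self):
        # At most two passes: the second runs only after a cycle reset
        for _ in range(2):
            for level, (queue, weight) in enumerate(zip(self.queues, self.weights)):
                if queue and self._cycle_counts[level] < weight:
                    self._cycle_counts[level] += 1
                    return queue.popleft()
            # Every non-empty level has used its quota: start a new cycle
            self._cycle_counts = [0] * len(self.queues)
        raise IndexError("all queues empty")
```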
Benchmarks
Approximate timings for 100,000 operations (insert + extract-min). Absolute numbers depend on hardware and Python version, so read them as relative:
| Approach | Time | Thread-Safe |
|---|---|---|
| heapq on list | ~0.15s | No |
| queue.PriorityQueue | ~0.45s | Yes |
| asyncio.PriorityQueue | ~0.20s | No (single-thread async) |
| sortedcontainers.SortedList | ~0.35s | No |
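heapq wins on raw speed. queue.PriorityQueue runs the same heap operations behind a lock, which is the price of thread safety; use it when threads are involved. For async code, asyncio.PriorityQueue is the right choice. sortedcontainers.SortedList earns its place when you also need ordered iteration or removal of arbitrary items.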
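To check these numbers on your own machine, a timeit harness along these lines is enough. It assumes "operations" means N pushes followed by N pops of random floats, which may not match how the table was produced, and it leaves out sortedcontainers because that is a third-party package. The function names are illustrative.

```python
import heapq
import queue
import random
import timeit

N = 100_000
DATA = [random.random() for _ in range(N)]


def bench_heapq():
    heap = []
    for x in DATA:
        heapq.heappush(heap, x)
    return [heapq.heappop(heap) for _ in range(N)]


def bench_queue_priority_queue():
    pq = queue.PriorityQueue()      # every put/get acquires an internal lock
    for x in DATA:
        pq.put(x)
    return [pq.get() for _ in range(N)]


if __name__ == "__main__":
    for fn in (bench_heapq, bench_queue_priority_queue):
        seconds = timeit.timeit(fn, number=3) / 3
        print(f"{fn.__name__}: {seconds:.3f}s per run")
```

Both functions return the data in sorted order, so the comparison measures only the queue overhead, not different work.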
Real-World Applications
Event Scheduling (Discrete Event Simulation)
SimPy and similar frameworks use priority queues keyed by simulation time. Each event has a scheduled time; the simulator pops the earliest event, advances the clock, and processes it.
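The pop-earliest, advance-clock, process loop fits in a few lines of heapq. This is a generic sketch, not SimPy’s API: `run_simulation`, the `(time, name, handler)` event shape, and the handler signature `handler(now, schedule)` are hypothetical.

```python
import heapq
import itertools


def run_simulation(initial_events, until=float("inf")):
    """Process (time, name, handler) events in time order; return [(time, name)]."""
    events = []
    seq = itertools.count()          # breaks ties so equal times run FIFO
    now = 0.0
    log = []

    def schedule(delay, name, handler):
        # Handlers call this to add events relative to the current clock
        heapq.heappush(events, (now + delay, next(seq), name, handler))

    for at, name, handler in initial_events:
        heapq.heappush(events, (at, next(seq), name, handler))

    while events and events[0][0] <= until:
        now, _, name, handler = heapq.heappop(events)  # earliest event
        log.append((now, name))                        # clock jumps to it
        handler(now, schedule)
    return log


def customer_arrives(now, schedule):
    # Hypothetical model: every customer leaves 2.0 time units after arriving
    schedule(2.0, "departure", lambda now, schedule: None)


if __name__ == "__main__":
    print(run_simulation([(0.0, "arrival-1", customer_arrives),
                          (1.0, "arrival-2", customer_arrives)]))
```

The clock never ticks through idle periods; it jumps straight to the next event’s timestamp, which is what makes discrete event simulation cheap.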
Connection Pool Management
Database connection pools can prioritize requests. A transaction-critical query gets a connection before a reporting query. PgBouncer uses a similar concept internally.
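One way to express "critical queries get the next free connection" is to keep waiters in a heap instead of a FIFO. The sketch below is a hypothetical asyncio `PriorityPool`, not the API of any real pool library; connections are plain placeholder objects, and a production pool would also need timeouts and handling for a waiter cancelled right after it was handed a connection.

```python
import asyncio
import heapq
import itertools


class PriorityPool:
    """Hypothetical pool: waiters with smaller priority numbers are served first."""

    def __init__(self, connections):
        self._free = list(connections)
        self._waiters = []              # heap of (priority, seq, future)
        self._seq = itertools.count()   # FIFO among equal priorities

    async def acquire(self, priority=0):
        if self._free and not self._waiters:
            return self._free.pop()
        fut = asyncio.get_running_loop().create_future()
        heapq.heappush(self._waiters, (priority, next(self._seq), fut))
        return await fut

    def release(self, conn):
        # Hand the connection straight to the most urgent waiter still waiting
        while self._waiters:
            _, _, fut = heapq.heappop(self._waiters)
            if not fut.done():          # skip waiters that were cancelled
                fut.set_result(conn)
                return
        self._free.append(conn)


async def scenario():
    pool = PriorityPool(["conn-1"])
    held = await pool.acquire()
    order = []

    async def client(name, priority):
        conn = await pool.acquire(priority)
        order.append(name)
        pool.release(conn)

    report = asyncio.create_task(client("report", 5))
    txn = asyncio.create_task(client("txn", 0))
    await asyncio.sleep(0)              # let both clients start waiting
    pool.release(held)
    await asyncio.gather(report, txn)
    return order


if __name__ == "__main__":
    print(asyncio.run(scenario()))
```

The reporting client queued first, but the transaction gets the connection first because release consults the heap rather than arrival order.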
A* Pathfinding
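The open set in A* is a priority queue keyed on f(n) = g(n) + h(n), where g(n) is the cost of the best path found so far from the start to n and h(n) is a heuristic estimate of the remaining cost to the goal. Python’s heapq with the mark-as-invalid pattern handles the “decrease key” operation needed when a shorter path to a node is found.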
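A compact sketch of A* that uses the same mark-as-invalid bookkeeping for decrease-key. The grid format (equal-length strings with "#" for walls), unit step costs, 4-way movement, and the Manhattan heuristic are assumptions chosen for illustration.

```python
import heapq
import itertools

REMOVED = object()  # sentinel stored in invalidated heap entries


def astar(grid, start, goal):
    """Return the shortest path length from start to goal, or None if unreachable."""
    rows, cols = len(grid), len(grid[0])

    def h(cell):
        # Manhattan distance never overestimates on a unit-cost 4-way grid
        return abs(cell[0] - goal[0]) + abs(cell[1] - goal[1])

    g = {start: 0}                  # best known cost from start
    open_heap = []                  # entries: [f, count, cell]
    entry_finder = {}               # cell -> its live entry
    counter = itertools.count()

    def push(cell):
        # "Decrease key": invalidate the old entry (if any), push a fresh one
        old = entry_finder.pop(cell, None)
        if old is not None:
            old[-1] = REMOVED
        entry = [g[cell] + h(cell), next(counter), cell]
        entry_finder[cell] = entry
        heapq.heappush(open_heap, entry)

    push(start)
    while open_heap:
        _, _, cell = heapq.heappop(open_heap)
        if cell is REMOVED:
            continue                # stale entry superseded by a shorter path
        del entry_finder[cell]
        if cell == goal:
            return g[cell]
        r, c = cell
        for nxt in ((r + 1, c), (r - 1, c), (r, c + 1), (r, c - 1)):
            nr, nc = nxt
            if 0 <= nr < rows and 0 <= nc < cols and grid[nr][nc] != "#":
                tentative = g[cell] + 1
                if tentative < g.get(nxt, float("inf")):
                    g[nxt] = tentative
                    push(nxt)
    return None
```

The counter in each entry means cells (and the sentinel) are never compared, so the heap works for any hashable node type.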
Production Considerations
- Serialization — if priority queue items cross process boundaries (Redis, RabbitMQ), the priority must be part of the serialized message, not just in-memory heap position
- Observability — track queue depth per priority level separately. A growing low-priority backlog is normal; a growing high-priority backlog is an incident
- Memory — unbounded priority queues with the mark-as-invalid pattern can grow indefinitely. Set hard limits and periodically compact
- Fairness testing — simulate burst traffic at each priority level and verify low-priority tasks still complete within SLA
One thing to remember: the mark-as-invalid pattern plus a monotonic tiebreaker counter is the production-grade recipe for priority queues in Python. It handles updates, maintains FIFO order within equal priorities, and avoids comparison errors on custom objects.