Python Priority Queue Patterns — Deep Dive

Heap Internals

Python’s heapq implements a binary min-heap stored as a flat list. The parent of element at index i is at (i - 1) // 2. Children are at 2*i + 1 and 2*i + 2. This compact representation means no pointer overhead — just array indexing.

The heap invariant: heap[k] <= heap[2*k+1] and heap[k] <= heap[2*k+2] for all valid indices. heappush appends to the end and sifts up; heappop swaps root with the last element, removes it, and sifts down.

import heapq

# Under the hood, heappush does this:
def _sift_up(heap, pos):
    newitem = heap[pos]
    while pos > 0:
        parentpos = (pos - 1) >> 1
        parent = heap[parentpos]
        if newitem < parent:
            heap[pos] = parent
            pos = parentpos
        else:
            break
    heap[pos] = newitem

The CPython implementation is in pure Python (Lib/heapq.py) with a C accelerator (Modules/_heapqmodule.c). The C version is roughly 4-10x faster for push/pop operations.

The Mark-as-Invalid Pattern

Standard heaps don’t support updating priorities. The classic workaround: mark the old entry as invalid and insert a new one.

import heapq
import itertools

class UpdatablePriorityQueue:
    REMOVED = '<removed>'

    def __init__(self):
        self._heap = []
        self._entry_finder = {}
        self._counter = itertools.count()

    def add_task(self, task, priority=0):
        if task in self._entry_finder:
            self.remove_task(task)
        count = next(self._counter)
        entry = [priority, count, task]
        self._entry_finder[task] = entry
        heapq.heappush(self._heap, entry)

    def remove_task(self, task):
        entry = self._entry_finder.pop(task)
        entry[-1] = self.REMOVED

    def pop_task(self):
        while self._heap:
            priority, count, task = heapq.heappop(self._heap)
            if task is not self.REMOVED:
                del self._entry_finder[task]
                return task
        raise KeyError('pop from an empty priority queue')

This is the pattern used in Dijkstra’s algorithm implementations. The tradeoff: the heap accumulates dead entries, so worst-case size is O(total insertions) not O(live items). For long-running systems, periodic rebuilding with heapq.heapify() keeps memory bounded.

Async Priority Queues

asyncio.PriorityQueue mirrors asyncio.Queue but uses a heap internally. It supports maxsize for backpressure.

import asyncio

async def priority_worker(name, queue):
    while True:
        priority, task_id, payload = await queue.get()
        print(f"{name} processing priority-{priority} task {task_id}")
        await asyncio.sleep(0.1)  # simulate work
        queue.task_done()

async def main():
    pq = asyncio.PriorityQueue(maxsize=100)

    # Spawn workers
    workers = [asyncio.create_task(priority_worker(f"w-{i}", pq))
               for i in range(3)]

    # Enqueue mixed priorities
    for i, (pri, payload) in enumerate([
        (2, "low-email"), (0, "critical-alert"),
        (1, "standard-job"), (0, "critical-payment"),
    ]):
        await pq.put((pri, i, payload))

    await pq.join()
    for w in workers:
        w.cancel()

Priority Starvation

The danger: low-priority tasks never execute if high-priority tasks keep arriving. Solutions:

  1. Aging — increment priority of waiting items over time
  2. Fair-share — reserve a percentage of capacity for each priority level
  3. Priority bands with round-robin — treat each band as a separate FIFO, alternate between bands with weighted frequency
import time

class AgingPriorityQueue:
    def __init__(self, aging_rate=0.1):
        self._heap = []
        self._counter = itertools.count()
        self._aging_rate = aging_rate

    def push(self, item, priority):
        count = next(self._counter)
        entry = (priority, time.monotonic(), count, item)
        heapq.heappush(self._heap, entry)

    def pop(self):
        now = time.monotonic()
        # Recalculate effective priorities with aging
        aged = []
        for pri, timestamp, count, item in self._heap:
            age = now - timestamp
            effective = pri - (age * self._aging_rate)
            aged.append((effective, count, item))
        heapq.heapify(aged)
        effective_pri, count, item = heapq.heappop(aged)
        # Rebuild heap without the popped item
        self._heap = [(p, t, c, i) for p, t, c, i in self._heap
                      if c != count]
        heapq.heapify(self._heap)
        return item

Multi-Level Priority Queues

Production task systems often use discrete priority levels rather than continuous numbers. Celery, for example, supports priority levels 0-9 with separate queues per level.

from collections import deque

class MultiLevelQueue:
    def __init__(self, levels=4, weights=None):
        self.queues = [deque() for _ in range(levels)]
        self.weights = weights or list(range(levels, 0, -1))
        self._cycle_counts = [0] * levels

    def put(self, item, priority=0):
        priority = min(priority, len(self.queues) - 1)
        self.queues[priority].append(item)

    def get(self):
        for level, (queue, weight) in enumerate(
            zip(self.queues, self.weights)
        ):
            if queue and self._cycle_counts[level] < weight:
                self._cycle_counts[level] += 1
                return queue.popleft()
        # Reset cycle and retry
        self._cycle_counts = [0] * len(self.queues)
        for queue in self.queues:
            if queue:
                return queue.popleft()
        raise IndexError("all queues empty")

Benchmarks

Relative performance for 100,000 operations (insert + extract-min):

ApproachTimeThread-Safe
heapq on list~0.15sNo
queue.PriorityQueue~0.45sYes
asyncio.PriorityQueue~0.20sNo (single-thread async)
sortedcontainers.SortedList~0.35sNo

heapq wins on raw speed. Use queue.PriorityQueue when threads are involved. For async code, asyncio.PriorityQueue is the right choice.

Real-World Applications

Event Scheduling (Discrete Event Simulation)

SimPy and similar frameworks use priority queues keyed by simulation time. Each event has a scheduled time; the simulator pops the earliest event, advances the clock, and processes it.

Connection Pool Management

Database connection pools can prioritize requests. A transaction-critical query gets a connection before a reporting query. PgBouncer uses a similar concept internally.

A* Pathfinding

The open set in A* is a priority queue keyed on f(n) = g(n) + h(n). Python’s heapq with the mark-as-invalid pattern handles the “decrease key” operation needed when a shorter path to a node is found.

Production Considerations

  1. Serialization — if priority queue items cross process boundaries (Redis, RabbitMQ), the priority must be part of the serialized message, not just in-memory heap position
  2. Observability — track queue depth per priority level separately. A growing low-priority backlog is normal; a growing high-priority backlog is an incident
  3. Memory — unbounded priority queues with the mark-as-invalid pattern can grow indefinitely. Set hard limits and periodically compact
  4. Fairness testing — simulate burst traffic at each priority level and verify low-priority tasks still complete within SLA

One thing to remember: The mark-as-invalid pattern plus a monotonic tiebreaker counter is the production-grade recipe for priority queues in Python — it handles updates, maintains FIFO within equal priorities, and avoids comparison errors on custom objects.

pythondata-structuresconcurrency

See Also