Python Work Queue Patterns — Deep Dive

Reliable Queue with Redis

The basic BRPOP pattern loses messages if a worker crashes. Production systems need visibility timeouts — a mechanism to reclaim items from crashed workers.

import redis
import json
import time
import uuid

class ReliableWorkQueue:
    def __init__(self, redis_client, name,
                 visibility_timeout=300):
        self.r = redis_client
        self.name = name
        self.processing_key = f"{name}:processing"
        self.visibility_timeout = visibility_timeout

    def enqueue(self, payload):
        message = {
            'id': str(uuid.uuid4()),
            'payload': payload,
            'enqueued_at': time.time(),
        }
        self.r.lpush(self.name, json.dumps(message))
        return message['id']

    def dequeue(self, timeout=5):
        # Atomic: pop from queue, push to processing set
        raw = self.r.brpoplpush(
            self.name, self.processing_key, timeout
        )
        if raw is None:
            return None
        message = json.loads(raw)
        # Store processing start time
        self.r.hset(
            f"{self.name}:timestamps",
            message['id'],
            str(time.time())
        )
        return message

    def ack(self, message):
        raw = json.dumps(message)
        self.r.lrem(self.processing_key, 1, raw)
        self.r.hdel(f"{self.name}:timestamps",
                    message['id'])

    def nack(self, message):
        raw = json.dumps(message)
        self.r.lrem(self.processing_key, 1, raw)
        self.r.hdel(f"{self.name}:timestamps",
                    message['id'])
        # Re-enqueue at the front
        self.r.rpush(self.name, raw)

    def reclaim_stale(self):
        """Move timed-out items back to the queue."""
        now = time.time()
        timestamps = self.r.hgetall(
            f"{self.name}:timestamps"
        )
        reclaimed = 0
        processing_items = self.r.lrange(
            self.processing_key, 0, -1
        )
        for raw in processing_items:
            message = json.loads(raw)
            ts = timestamps.get(
                message['id'].encode(), b'0'
            )
            if now - float(ts) > self.visibility_timeout:
                self.r.lrem(self.processing_key, 1, raw)
                self.r.rpush(self.name, raw)
                self.r.hdel(f"{self.name}:timestamps",
                           message['id'])
                reclaimed += 1
        return reclaimed

Run reclaim_stale() periodically (via a scheduled task or heartbeat) to recover items from crashed workers.

Redis Streams as Work Queues

Redis Streams (5.0+) provide consumer groups — a built-in reliable work queue pattern:

import redis

class StreamWorkQueue:
    def __init__(self, redis_client, stream,
                 group, consumer_name):
        self.r = redis_client
        self.stream = stream
        self.group = group
        self.consumer = consumer_name
        self._ensure_group()

    def _ensure_group(self):
        try:
            self.r.xgroup_create(
                self.stream, self.group, id='0',
                mkstream=True
            )
        except redis.ResponseError:
            pass  # Group already exists

    def enqueue(self, payload):
        return self.r.xadd(self.stream, payload)

    def dequeue(self, count=1, block_ms=5000):
        results = self.r.xreadgroup(
            self.group, self.consumer,
            {self.stream: '>'},
            count=count, block=block_ms
        )
        if not results:
            return []
        return [(msg_id, fields)
                for _, messages in results
                for msg_id, fields in messages]

    def ack(self, message_id):
        self.r.xack(self.stream, self.group, message_id)

    def pending(self):
        """List unacknowledged messages."""
        return self.r.xpending(self.stream, self.group)

    def claim_stale(self, min_idle_ms=300000, count=10):
        """Claim messages idle for too long."""
        return self.r.xautoclaim(
            self.stream, self.group, self.consumer,
            min_idle_time=min_idle_ms, start_id='0-0',
            count=count
        )

Redis Streams advantages over list-based queues:

  • Built-in consumer groups with acknowledgment
  • Message persistence with configurable trimming
  • XAUTOCLAIM for automatic stale message recovery
  • Message ordering guarantees
  • Per-consumer pending entry tracking

Worker Pool Patterns

Dynamic Worker Scaling

Scale workers based on queue depth:

import asyncio
import os

class AutoscalingWorkerPool:
    def __init__(self, queue, handler,
                 min_workers=2, max_workers=20,
                 scale_threshold=50):
        self.queue = queue
        self.handler = handler
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.scale_threshold = scale_threshold
        self.workers = []

    async def worker(self, worker_id):
        while True:
            try:
                item = await asyncio.wait_for(
                    self.queue.get(), timeout=30
                )
                await self.handler(item)
                self.queue.task_done()
            except asyncio.TimeoutError:
                if len(self.workers) > self.min_workers:
                    return  # Scale down: worker exits
            except Exception as e:
                # Log error, don't crash the worker
                print(f"Worker {worker_id} error: {e}")

    async def scaler(self):
        while True:
            await asyncio.sleep(5)
            depth = self.queue.qsize()
            active = len([w for w in self.workers
                         if not w.done()])

            if depth > self.scale_threshold and \
               active < self.max_workers:
                new_id = len(self.workers)
                task = asyncio.create_task(
                    self.worker(new_id)
                )
                self.workers.append(task)

    async def start(self):
        # Start minimum workers
        for i in range(self.min_workers):
            task = asyncio.create_task(self.worker(i))
            self.workers.append(task)
        # Start the scaler
        asyncio.create_task(self.scaler())

Worker Health Checks

Workers should report their health to detect hung or deadlocked workers:

class HealthCheckedWorker:
    def __init__(self, worker_id, redis_client):
        self.worker_id = worker_id
        self.r = redis_client
        self.heartbeat_key = f"worker:{worker_id}:heartbeat"

    async def run(self, queue, handler):
        while True:
            # Update heartbeat before dequeue
            self.r.setex(self.heartbeat_key, 60, 'alive')
            item = await queue.get()

            self.r.setex(self.heartbeat_key, 60,
                        f'processing:{item.get("id")}')
            try:
                await handler(item)
                queue.task_done()
            except Exception:
                queue.task_done()
                raise

    @classmethod
    def check_health(cls, redis_client, worker_ids):
        stale = []
        for wid in worker_ids:
            val = redis_client.get(f"worker:{wid}:heartbeat")
            if val is None:
                stale.append(wid)
        return stale

Queue-Based Load Leveling

Work queues naturally level load between bursty producers and steady consumers. This is critical for protecting downstream services:

Traffic Spike          Work Queue           Database
  1000 req/s  ──►  [buffered tasks]  ──►  50 writes/s
                    (absorbs spike)        (constant rate)

Without the queue, the database sees 1000 writes/s during spikes. With the queue, the database always sees a steady 50 writes/s (or however many workers × throughput per worker).

Implementing Rate-Limited Consumption

import asyncio
import time

class RateLimitedConsumer:
    def __init__(self, max_per_second=50):
        self.interval = 1.0 / max_per_second
        self.last_call = 0

    async def consume(self, queue, handler):
        while True:
            item = await queue.get()
            # Rate limit
            now = time.monotonic()
            wait = self.interval - (now - self.last_call)
            if wait > 0:
                await asyncio.sleep(wait)
            self.last_call = time.monotonic()

            await handler(item)
            queue.task_done()

Multi-Queue Routing

Complex systems often have multiple queues with different characteristics:

class MultiQueueRouter:
    def __init__(self):
        self.queues = {}
        self.routing_rules = []

    def add_queue(self, name, queue, worker_count):
        self.queues[name] = {
            'queue': queue,
            'workers': worker_count,
        }

    def add_rule(self, predicate, queue_name):
        self.routing_rules.append((predicate, queue_name))

    def route(self, message):
        for predicate, queue_name in self.routing_rules:
            if predicate(message):
                self.queues[queue_name]['queue'].put_nowait(
                    message
                )
                return queue_name
        raise ValueError("No matching queue for message")

# Usage
router = MultiQueueRouter()
router.add_queue('critical', asyncio.Queue(100), 10)
router.add_queue('bulk', asyncio.Queue(1000), 3)
router.add_queue('analytics', asyncio.Queue(5000), 1)

router.add_rule(
    lambda m: m.get('priority') == 'critical', 'critical'
)
router.add_rule(
    lambda m: m.get('type') == 'analytics', 'analytics'
)
router.add_rule(lambda m: True, 'bulk')  # default

Monitoring Essentials

MetricWhat It Tells You
Queue depthBacklog size; growing = consumers can’t keep up
Enqueue rateProducer throughput
Dequeue rateConsumer throughput
Processing time (p50, p99)How long each item takes
Error ratePercentage of items failing
Worker count (active/idle)Scaling efficiency
Time in queue (p50, p99)Latency from enqueue to processing start

The most important alert: queue depth growing over a sustained period. This means your system is falling behind and will eventually run out of memory or breach SLAs.

One thing to remember: The production-grade work queue has three layers: a reliable broker (Redis Streams or RabbitMQ) for persistence and acknowledgment, a worker pool with health checks and autoscaling, and monitoring on queue depth and processing latency. Skip any layer and you’ll feel the pain at 3 AM.

pythonconcurrencypatterns

See Also