Python Work Queue Patterns — Deep Dive
Reliable Queue with Redis
The basic BRPOP pattern loses messages if a worker crashes. Production systems need visibility timeouts — a mechanism to reclaim items from crashed workers.
import redis
import json
import time
import uuid
class ReliableWorkQueue:
    """Redis list-backed work queue with visibility-timeout recovery.

    Messages are JSON envelopes (``id``, ``payload``, ``enqueued_at``) pushed
    onto the list ``name``. ``dequeue`` moves an envelope onto the list
    ``"{name}:processing"`` and records when processing started in the hash
    ``"{name}:timestamps"``. ``ack``/``nack`` remove it again, and
    ``reclaim_stale`` re-queues entries whose start time is older than
    ``visibility_timeout`` seconds.
    """

    def __init__(self, redis_client, name,
                 visibility_timeout=300):
        """Bind the queue to a Redis client.

        Args:
            redis_client: Client exposing the command methods used by this
                class (``lpush``, ``rpush``, ``brpoplpush``, ``lrem``,
                ``lrange``, ``hset``, ``hdel``, ``hgetall``).
            name: Key of the pending list and prefix of the helper keys.
            visibility_timeout: Seconds a dequeued message may stay
                unacknowledged before ``reclaim_stale`` re-queues it.
        """
        self.r = redis_client
        self.name = name
        self.processing_key = f"{name}:processing"
        self.timestamps_key = f"{name}:timestamps"
        self.visibility_timeout = visibility_timeout

    def enqueue(self, payload):
        """Wrap *payload* in a new envelope, push it, and return its id."""
        message = {
            'id': str(uuid.uuid4()),
            'payload': payload,
            'enqueued_at': time.time(),
        }
        self.r.lpush(self.name, json.dumps(message))
        return message['id']

    def dequeue(self, timeout=5):
        """Pop the next message, or return ``None`` if none arrives in time.

        Args:
            timeout: Passed to ``brpoplpush`` as its blocking timeout.

        Returns:
            The decoded message dict, or ``None`` when nothing was popped.
        """
        # Atomic: pop from the queue and push onto the processing list.
        raw = self.r.brpoplpush(
            self.name, self.processing_key, timeout
        )
        if raw is None:
            return None
        message = json.loads(raw)
        # Record when processing started so reclaim_stale can age it out.
        self.r.hset(self.timestamps_key, message['id'], str(time.time()))
        return message

    def ack(self, message):
        """Mark *message* as done and drop its bookkeeping.

        The processing-list entry is matched by re-serialising *message*, so
        the dict must be unchanged since ``dequeue`` returned it.
        """
        raw = json.dumps(message)
        self.r.lrem(self.processing_key, 1, raw)
        self.r.hdel(self.timestamps_key, message['id'])

    def nack(self, message):
        """Give *message* back to the queue for another attempt.

        The message is re-queued only if it was still on the processing
        list. If it is already gone (acked, or already reclaimed), pushing
        it again would create a duplicate.
        """
        raw = json.dumps(message)
        removed = self.r.lrem(self.processing_key, 1, raw)
        self.r.hdel(self.timestamps_key, message['id'])
        if removed:
            # Consumers pop from the right, so this is the next item served.
            self.r.rpush(self.name, raw)

    def reclaim_stale(self):
        """Move timed-out items back to the queue.

        Returns:
            The number of messages this call moved back to the queue.
        """
        now = time.time()
        timestamps = self.r.hgetall(self.timestamps_key)
        reclaimed = 0
        for raw in self.r.lrange(self.processing_key, 0, -1):
            message = json.loads(raw)
            msg_id = message['id']
            # Hash fields come back as bytes by default and as str when the
            # client decodes responses; accept either form.
            started = timestamps.get(msg_id.encode())
            if started is None:
                # No recorded start time counts as timestamp 0, i.e. stale.
                started = timestamps.get(msg_id, 0)
            if now - float(started) <= self.visibility_timeout:
                continue
            # Re-queue only what this call actually removed; a zero result
            # means someone else acked or reclaimed the entry first.
            if self.r.lrem(self.processing_key, 1, raw):
                self.r.rpush(self.name, raw)
                self.r.hdel(self.timestamps_key, msg_id)
                reclaimed += 1
        return reclaimed
Run reclaim_stale() periodically (via a scheduled task or heartbeat) to recover items from crashed workers.
Redis Streams as Work Queues
Redis Streams (5.0+) provide consumer groups — a built-in reliable work queue pattern:
import redis
class StreamWorkQueue:
    """Work queue built on a Redis stream read through a consumer group."""

    def __init__(self, redis_client, stream,
                 group, consumer_name):
        """Store the connection details and make sure the group exists.

        Args:
            redis_client: Client exposing the stream command methods used
                here (``xgroup_create``, ``xadd``, ``xreadgroup``, ``xack``,
                ``xpending``, ``xautoclaim``).
            stream: Key of the stream.
            group: Consumer group name.
            consumer_name: Name this instance reads and claims under.
        """
        self.r = redis_client
        self.stream = stream
        self.group = group
        self.consumer = consumer_name
        self._ensure_group()

    def _ensure_group(self):
        """Create the consumer group (and stream) if it is not there yet.

        Raises:
            redis.ResponseError: For any server error other than the
                "group already exists" (BUSYGROUP) reply.
        """
        try:
            self.r.xgroup_create(
                self.stream, self.group, id='0',
                mkstream=True
            )
        except redis.ResponseError as exc:
            # Only "group already exists" is expected here. Anything else
            # (e.g. the key holds a non-stream type) must not be hidden.
            if 'BUSYGROUP' not in str(exc):
                raise

    def enqueue(self, payload):
        """Append *payload* to the stream and return what ``xadd`` returns."""
        return self.r.xadd(self.stream, payload)

    def dequeue(self, count=1, block_ms=5000):
        """Read up to *count* new messages for this consumer.

        Args:
            count: Maximum number of entries requested from ``xreadgroup``.
            block_ms: Blocking time passed to ``xreadgroup`` as ``block``.

        Returns:
            A flat list of ``(message_id, fields)`` pairs; empty when the
            read returned nothing.
        """
        results = self.r.xreadgroup(
            self.group, self.consumer,
            {self.stream: '>'},  # '>' asks for entries not yet delivered
            count=count, block=block_ms
        )
        if not results:
            return []
        # The reply is grouped per stream; flatten to one list of entries.
        return [(msg_id, fields)
                for _, messages in results
                for msg_id, fields in messages]

    def ack(self, message_id):
        """Acknowledge *message_id* for this group."""
        self.r.xack(self.stream, self.group, message_id)

    def pending(self):
        """Return the result of ``xpending`` for this stream and group.

        NOTE(review): called without a range, so this is presumably the
        summary form rather than a per-message listing; confirm against the
        client in use.
        """
        return self.r.xpending(self.stream, self.group)

    def claim_stale(self, min_idle_ms=300000, count=10):
        """Claim messages idle for too long.

        Args:
            min_idle_ms: Minimum idle time, passed as ``min_idle_time``.
            count: Maximum number of entries to claim in one call.

        Returns:
            Whatever ``xautoclaim`` returns; the scan always starts at
            ``'0-0'``.
        """
        return self.r.xautoclaim(
            self.stream, self.group, self.consumer,
            min_idle_time=min_idle_ms, start_id='0-0',
            count=count
        )
Redis Streams advantages over list-based queues:
- Built-in consumer groups with acknowledgment
- Message persistence with configurable trimming
- `XAUTOCLAIM` for automatic stale message recovery (Redis 6.2+)
- Message ordering guarantees
- Per-consumer pending entry tracking
Worker Pool Patterns
Dynamic Worker Scaling
Scale workers based on queue depth:
import asyncio
import os
class AutoscalingWorkerPool:
    """Pool of asyncio worker tasks that grows and shrinks with queue depth."""

    def __init__(self, queue, handler,
                 min_workers=2, max_workers=20,
                 scale_threshold=50):
        """Configure the pool; no tasks are started until ``start()``.

        Args:
            queue: Queue the workers consume (``get``, ``task_done``,
                ``qsize`` are used).
            handler: Awaitable callable invoked with each item.
            min_workers: Workers started up front; idle workers only exit
                while more than this many are alive.
            max_workers: Upper bound the scaler will not exceed.
            scale_threshold: Queue depth above which the scaler adds a worker.
        """
        self.queue = queue
        self.handler = handler
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.scale_threshold = scale_threshold
        self.workers = []
        self._next_worker_id = 0
        # Keep a reference: the event loop only holds tasks weakly.
        self._scaler_task = None

    def _prune_finished(self):
        """Drop finished tasks so ``self.workers`` holds only live workers."""
        self.workers[:] = [task for task in self.workers if not task.done()]

    def _spawn_worker(self):
        """Start one worker task with a fresh id and track it."""
        worker_id = self._next_worker_id
        self._next_worker_id += 1
        task = asyncio.create_task(self.worker(worker_id))
        self.workers.append(task)
        return task

    def _retire_if_surplus(self):
        """Return True if the calling worker may exit without underflowing."""
        self._prune_finished()
        if len(self.workers) <= self.min_workers:
            return False
        # Deregister immediately so other workers timing out in the same
        # loop iteration see the reduced count and do not all exit together.
        current = asyncio.current_task()
        if current in self.workers:
            self.workers.remove(current)
        return True

    async def worker(self, worker_id):
        """Consume items until this worker is idle and the pool has surplus.

        Handler exceptions are printed and the worker keeps running.
        """
        while True:
            try:
                item = await asyncio.wait_for(
                    self.queue.get(), timeout=30
                )
            except asyncio.TimeoutError:
                if self._retire_if_surplus():
                    return  # Scale down: worker exits
                continue
            try:
                await self.handler(item)
            except Exception as e:
                # Log error, don't crash the worker
                print(f"Worker {worker_id} error: {e}")
            finally:
                # Always balance the get(), otherwise queue.join() would
                # hang forever after a failed item.
                self.queue.task_done()

    async def scaler(self):
        """Every 5 seconds add one worker if the backlog is above threshold."""
        while True:
            await asyncio.sleep(5)
            self._prune_finished()
            if (self.queue.qsize() > self.scale_threshold
                    and len(self.workers) < self.max_workers):
                self._spawn_worker()

    async def start(self):
        """Start ``min_workers`` workers and the background scaler."""
        for _ in range(self.min_workers):
            self._spawn_worker()
        self._scaler_task = asyncio.create_task(self.scaler())
Worker Health Checks
Workers should report their health to detect hung or deadlocked workers:
class HealthCheckedWorker:
    """Queue worker that publishes an expiring heartbeat key in Redis."""

    def __init__(self, worker_id, redis_client,
                 heartbeat_ttl=60, idle_refresh=None):
        """Set up the worker's heartbeat key.

        Args:
            worker_id: Identifier embedded in the heartbeat key.
            redis_client: Client exposing ``setex`` (and ``get`` for
                ``check_health``).
            heartbeat_ttl: Expiry, in seconds, given to every heartbeat write.
            idle_refresh: Seconds to wait for an item before re-writing the
                heartbeat; defaults to half of *heartbeat_ttl*.
        """
        self.worker_id = worker_id
        self.r = redis_client
        self.heartbeat_key = f"worker:{worker_id}:heartbeat"
        self.heartbeat_ttl = heartbeat_ttl
        self.idle_refresh = (
            heartbeat_ttl / 2 if idle_refresh is None else idle_refresh
        )

    async def run(self, queue, handler):
        """Process items forever, refreshing the heartbeat as it goes.

        The heartbeat is written as ``'alive'`` before each wait and as
        ``'processing:<id>'`` once an item is taken. A handler exception
        propagates and ends the loop, after the item is marked done.
        """
        while True:
            # Update heartbeat before dequeue
            self.r.setex(self.heartbeat_key, self.heartbeat_ttl, 'alive')
            try:
                # Bounded wait: an idle worker must come back to refresh the
                # key before it expires, or it would be reported as stale.
                item = await asyncio.wait_for(
                    queue.get(), timeout=self.idle_refresh
                )
            except asyncio.TimeoutError:
                continue
            self.r.setex(self.heartbeat_key, self.heartbeat_ttl,
                         f'processing:{item.get("id")}')
            try:
                await handler(item)
            finally:
                queue.task_done()

    @classmethod
    def check_health(cls, redis_client, worker_ids):
        """Return the ids from *worker_ids* whose heartbeat key is missing."""
        return [
            wid for wid in worker_ids
            if redis_client.get(f"worker:{wid}:heartbeat") is None
        ]
Queue-Based Load Leveling
Work queues naturally level load between bursty producers and steady consumers. This is critical for protecting downstream services:
Traffic Spike         Work Queue            Database
1000 req/s   ──►   [buffered tasks]   ──►   50 writes/s
                   (absorbs spike)          (constant rate)
Without the queue, the database sees 1000 writes/s during spikes. With the queue, the database sees a steady rate of at most 50 writes/s (more precisely, the number of workers × the throughput of each worker), and the backlog drains once the spike is over.
Implementing Rate-Limited Consumption
import asyncio
import time
class RateLimitedConsumer:
    """Queue consumer that starts at most ``max_per_second`` items a second."""

    def __init__(self, max_per_second=50):
        """Derive the minimum spacing between handler invocations.

        Args:
            max_per_second: Maximum handler starts per second; must be > 0.

        Raises:
            ValueError: If *max_per_second* is zero or negative.
        """
        if max_per_second <= 0:
            raise ValueError("max_per_second must be positive")
        self.interval = 1.0 / max_per_second
        self.last_call = 0

    async def consume(self, queue, handler):
        """Take items from *queue* forever, spacing out calls to *handler*.

        A handler exception propagates and ends the loop, after the item is
        marked done so ``queue.join()`` cannot hang on it.
        """
        while True:
            item = await queue.get()
            try:
                # Sleep off whatever is left of the interval since the
                # previous handler start.
                wait = self.interval - (time.monotonic() - self.last_call)
                if wait > 0:
                    await asyncio.sleep(wait)
                self.last_call = time.monotonic()
                await handler(item)
            finally:
                queue.task_done()
Multi-Queue Routing
Complex systems often have multiple queues with different characteristics:
class MultiQueueRouter:
    """Dispatch messages to named queues using ordered predicate rules."""

    def __init__(self):
        """Start with no queues and no rules."""
        self.queues = dict()
        self.routing_rules = list()

    def add_queue(self, name, queue, worker_count):
        """Register *queue* under *name* together with its worker count."""
        entry = dict(queue=queue, workers=worker_count)
        self.queues[name] = entry

    def add_rule(self, predicate, queue_name):
        """Append a rule; rules are tried in the order they were added."""
        rule = (predicate, queue_name)
        self.routing_rules.append(rule)

    def route(self, message):
        """Put *message* on the queue of the first matching rule.

        Returns:
            The name of the queue the message was placed on.

        Raises:
            ValueError: If no rule's predicate accepts the message.
        """
        for rule in self.routing_rules:
            accepts, target = rule
            if not accepts(message):
                continue
            destination = self.queues[target]['queue']
            destination.put_nowait(message)
            return target
        raise ValueError("No matching queue for message")
# Usage: three bounded queues; rules are checked top to bottom, so the
# catch-all rule goes last.
router = MultiQueueRouter()
router.add_queue(
    name='critical', queue=asyncio.Queue(maxsize=100), worker_count=10
)
router.add_queue(
    name='bulk', queue=asyncio.Queue(maxsize=1000), worker_count=3
)
router.add_queue(
    name='analytics', queue=asyncio.Queue(maxsize=5000), worker_count=1
)
router.add_rule(
    predicate=lambda msg: msg.get('priority') == 'critical',
    queue_name='critical',
)
router.add_rule(
    predicate=lambda msg: msg.get('type') == 'analytics',
    queue_name='analytics',
)
router.add_rule(predicate=lambda _msg: True, queue_name='bulk')  # default
Monitoring Essentials
| Metric | What It Tells You |
|---|---|
| Queue depth | Backlog size; growing = consumers can’t keep up |
| Enqueue rate | Producer throughput |
| Dequeue rate | Consumer throughput |
| Processing time (p50, p99) | How long each item takes |
| Error rate | Percentage of items failing |
| Worker count (active/idle) | Scaling efficiency |
| Time in queue (p50, p99) | Latency from enqueue to processing start |
The most important alert: queue depth growing over a sustained period. This means your system is falling behind and will eventually run out of memory or breach SLAs.
One thing to remember: A production-grade work queue has three layers: a reliable broker (Redis Streams or RabbitMQ) for persistence and acknowledgment, a worker pool with health checks and autoscaling, and monitoring on queue depth and processing latency. Skip any layer and you’ll feel the pain at 3 AM.
See Also
- Python Dead Letter Queues: What happens to messages that can't be delivered — and why Python systems need a lost-and-found box.
- Python Delayed Task Execution: How Python programs schedule tasks to run later — like setting an alarm for your code.
- Python Distributed Locks: How Python programs take turns with shared resources — like a bathroom door lock, but for computers.
- Python Fan Out Fan In Pattern: How Python splits big jobs into small pieces, runs them all at once, then puts the results back together.
- Python Message Deduplication: Why computer messages sometimes get delivered twice — and how Python stops them from doing double damage.