Python Dead Letter Queues — Deep Dive
RabbitMQ Native DLQ Setup
RabbitMQ supports dead-lettering at the queue level through three mechanisms: message rejection (basic.nack or basic.reject without requeue), message TTL expiration, and queue length overflow.
Declaring DLQ Infrastructure with Pika
import pika
# Open a blocking connection to the broker on localhost and get a channel.
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare the dead letter exchange and queue.
# The exchange is declared durable to match the durable queues: if the
# exchange named in x-dead-letter-exchange does not exist when a message
# is dead-lettered (e.g. after a broker restart wiped a transient
# exchange), RabbitMQ silently drops the message.
channel.exchange_declare(
    exchange='dlx',
    exchange_type='direct',
    durable=True,
)
channel.queue_declare(queue='dead_letters', durable=True)
channel.queue_bind(
    queue='dead_letters',
    exchange='dlx',
    routing_key='failed',
)

# Declare the main queue with DLQ routing: anything dead-lettered from
# 'tasks' is published to 'dlx' with routing key 'failed', which the
# binding above delivers to 'dead_letters'.
channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed',
        'x-message-ttl': 60000,  # optional: 60s TTL
    },
)
When a consumer rejects or nacks a message from the tasks queue with requeue=False — or a message sits in the queue longer than its 60-second TTL — RabbitMQ automatically routes it to dead_letters via the dlx exchange. The dead-lettered message retains its original headers plus an additional x-death header containing the death reason, original queue, and timestamp.
Reading x-death Headers
def dlq_consumer(ch, method, properties, body):
    """Consumer callback for the dead letter queue.

    Prints the origin queue, death reason and death count taken from the
    first entry of the message's ``x-death`` header (when that header is
    present and non-empty), then acknowledges the delivery.

    Args:
        ch: Channel the message arrived on; used to ack.
        method: Delivery metadata; ``delivery_tag`` identifies the message.
        properties: Message properties; ``headers`` may be None.
        body: Raw message body (not inspected here).
    """
    x_death = (properties.headers or {}).get('x-death', [])

    if x_death:
        death_info = x_death[0]
        original_queue, reason, count = (
            death_info.get('queue'),
            death_info.get('reason'),
            death_info.get('count', 1),
        )
        print(f"Dead letter from {original_queue}, "
              f"reason={reason}, death_count={count}")

    # Process or store for investigation, then ack so the broker removes
    # the message from the dead letter queue.
    ch.basic_ack(delivery_tag=method.delivery_tag)
Celery DLQ Patterns
Using Error Callbacks
Celery’s on_failure handler and link_error callbacks let you route failed tasks to a DLQ after retries are exhausted.
import json
import traceback
from datetime import datetime, timezone

import redis
from celery import Celery
# Celery application named 'tasks' that uses Redis on localhost as broker.
app = Celery(
    'tasks',
    broker='redis://localhost',
)

# Redis client used to write dead letter records (constructed with the
# client library's default connection settings).
dlq_client = redis.Redis()

# Redis list key that dead letter records are pushed onto.
DLQ_KEY = 'celery:dead_letters'
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_order(self, order_id, payload):
    """Handle an order, retrying on failure and dead-lettering when exhausted.

    Any exception from ``handle_order`` triggers a Celery retry until
    ``max_retries`` is reached. Once retries are exhausted, a JSON record
    describing the failure is pushed onto the Redis list ``DLQ_KEY`` and
    the task returns normally instead of failing.

    Args:
        order_id: Identifier passed through to ``handle_order``.
        payload: Order data passed through to ``handle_order``.

    Returns:
        The result of ``handle_order``, or None when the task was
        dead-lettered.
    """
    try:
        # Business logic
        return handle_order(order_id, payload)
    except Exception as exc:
        # A single handler covers transient and other errors alike. With
        # separate clauses, the exception raised by self.retry() on
        # exhaustion escapes the clause it was raised in and never
        # reaches the dead letter branch of a sibling clause.
        retries_left = (
            self.max_retries is None  # None means "retry forever"
            or self.request.retries < self.max_retries
        )
        if retries_left:
            raise self.retry(exc=exc)

        # delivery_info can be None, e.g. when the task is run eagerly.
        delivery_info = self.request.delivery_info or {}
        dead_letter = {
            'task_id': self.request.id,
            'task_name': self.name,
            'args': [order_id, payload],
            'exception': str(exc),
            'traceback': traceback.format_exc(),
            'retries': self.request.retries,
            # Timezone-aware UTC timestamp (ISO 8601 with +00:00 offset).
            'dead_lettered_at': datetime.now(timezone.utc).isoformat(),
            'origin_queue': delivery_info.get('routing_key', 'unknown'),
        }
        # default=str: a payload value that is not JSON-serializable is
        # stored as its string form rather than making the dead-letter
        # write itself fail and lose the record.
        dlq_client.lpush(DLQ_KEY, json.dumps(dead_letter, default=str))
        return None  # Don't re-raise; task is "handled"
Celery Task Base Class for DLQ
For consistent DLQ handling across all tasks:
class DLQTask(app.Task):
    """Task base class that records every failed task in the Redis DLQ.

    Tasks declared with ``base=DLQTask`` get a JSON record pushed onto
    the Redis list ``DLQ_KEY`` whenever Celery invokes ``on_failure``.
    """

    abstract = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Push a dead letter record, then delegate to the parent handler.

        Args:
            exc: The exception the task raised.
            task_id: Id of the failed task.
            args: Positional arguments the task was called with.
            kwargs: Keyword arguments the task was called with.
            einfo: Exception info object; stored as its string form.
        """
        dead_letter = {
            'task_id': task_id,
            'task_name': self.name,
            'args': args,
            'kwargs': kwargs,
            'exception': str(exc),
            'traceback': str(einfo),
            # Timezone-aware UTC timestamp (ISO 8601 with +00:00 offset).
            'timestamp': datetime.now(timezone.utc).isoformat(),
        }
        # default=str: arguments that are not JSON-serializable are stored
        # as their string form so that writing the dead letter cannot
        # itself raise inside the failure handler and lose the record.
        dlq_client.lpush(DLQ_KEY, json.dumps(dead_letter, default=str))
        super().on_failure(exc, task_id, args, kwargs, einfo)
# autoretry_for makes Celery retry automatically on any exception; without
# it (or an explicit self.retry call) max_retries has no effect and the
# task would fail on the first error.
@app.task(base=DLQTask, autoretry_for=(Exception,), max_retries=3)
def fragile_task(data):
    """Process ``data``; retried up to 3 times before being dead-lettered."""
    # If this fails after 3 retries, on_failure routes to DLQ
    return process(data)
Custom Redis DLQ Implementation
For systems not using Celery, here’s a complete Redis-backed DLQ:
import redis
import json
import time
import uuid
from dataclasses import dataclass, asdict
from typing import Optional, Callable
@dataclass
class DeadLetter:
    """Typed record for a single dead-lettered message.

    NOTE(review): nothing visible in this file constructs DeadLetter; the
    field names mirror the keys of the dict that RedisQueueWithDLQ.nack
    pushes to the DLQ (with 'id' -> message_id) -- confirm intended use.
    """
    message_id: str          # presumably the message's unique id
    payload: dict            # presumably the original message payload
    error: str               # presumably the error description
    traceback: str           # presumably the formatted traceback text
    attempts: int            # presumably the delivery attempts made
    source_queue: str        # presumably the queue the message came from
    created_at: float        # presumably a time.time() epoch value -- confirm
    dead_lettered_at: float  # presumably a time.time() epoch value -- confirm
class RedisQueueWithDLQ:
    """Redis list-backed work queue with a processing list and a DLQ.

    Three Redis lists are derived from ``queue_name``:

    * ``<queue_name>``: pending messages (pushed left, popped right).
    * ``<queue_name>:processing``: messages handed to a consumer that
      have not been acked or nacked yet.
    * ``<queue_name>:dlq``: messages that failed ``max_retries`` times.

    Messages are JSON-encoded dicts with the keys ``id``, ``payload``,
    ``attempts`` and ``created_at``.
    """

    def __init__(self, redis_client, queue_name,
                 max_retries=3, dlq_ttl_days=30):
        """Store the client and derive the key names.

        Args:
            redis_client: Redis client used for all operations.
            queue_name: Name of the main queue list.
            max_retries: Failed attempts after which a message is
                dead-lettered.
            dlq_ttl_days: Retention of the DLQ key, in days.
        """
        self.r = redis_client
        self.queue = queue_name
        self.dlq = f"{queue_name}:dlq"
        self.processing = f"{queue_name}:processing"
        self.max_retries = max_retries
        self.dlq_ttl = dlq_ttl_days * 86400  # days -> seconds

    def enqueue(self, payload: dict) -> str:
        """Wrap ``payload`` in a new message, queue it, and return its id."""
        message_id = str(uuid.uuid4())
        message = {
            'id': message_id,
            'payload': payload,
            'attempts': 0,
            'created_at': time.time(),
        }
        self.r.lpush(self.queue, json.dumps(message))
        return message_id

    def dequeue(self, timeout=5) -> Optional[dict]:
        """Move the oldest message to the processing list and return it.

        Blocks for up to ``timeout`` seconds; returns None if nothing
        arrived in that time.
        """
        result = self.r.brpoplpush(
            self.queue, self.processing, timeout
        )
        if result:
            return json.loads(result)
        return None

    def ack(self, message: dict):
        """Remove a successfully handled message from the processing list.

        The entry is matched by re-serializing ``message``, so the dict
        must be unmodified since ``dequeue`` returned it.
        """
        self.r.lrem(self.processing, 1, json.dumps(message))

    def nack(self, message: dict, error: str,
             tb: str = ""):
        """Record a failed attempt; requeue the message or dead-letter it.

        Increments ``message['attempts']`` in place. When the count
        reaches ``max_retries`` the message, extended with ``error``,
        ``traceback``, ``dead_lettered_at`` and ``source_queue``, is
        pushed to the DLQ; otherwise it goes back on the main queue.

        Args:
            message: The dict returned by ``dequeue`` (unmodified).
            error: Error description stored with a dead letter.
            tb: Optional traceback text stored with a dead letter.
        """
        # Capture the processing-list entry before mutating the message,
        # otherwise LREM would no longer find a matching value.
        in_flight = json.dumps(message)
        message['attempts'] = message.get('attempts', 0) + 1

        # Remove and re-push inside one pipeline so a crash between the
        # two steps cannot lose the message.
        with self.r.pipeline() as pipe:
            pipe.lrem(self.processing, 1, in_flight)
            if message['attempts'] >= self.max_retries:
                dead_letter = {
                    **message,
                    'error': error,
                    'traceback': tb,
                    'dead_lettered_at': time.time(),
                    'source_queue': self.queue,
                }
                pipe.lpush(self.dlq, json.dumps(dead_letter))
                # The TTL applies to the whole list key, so refresh it on
                # every dead letter. Setting it only once would expire
                # recently added entries together with the first one.
                pipe.expire(self.dlq, self.dlq_ttl)
            else:
                # Re-enqueue for retry
                pipe.lpush(self.queue, json.dumps(message))
            pipe.execute()

    def dlq_size(self) -> int:
        """Return the number of messages currently in the DLQ."""
        return self.r.llen(self.dlq)

    def dlq_peek(self, count=10) -> list:
        """Return up to ``count`` dead letters, newest first, unremoved."""
        items = self.r.lrange(self.dlq, 0, count - 1)
        return [json.loads(i) for i in items]

    def dlq_replay(self, count=1) -> int:
        """Move up to ``count`` of the oldest dead letters back to the queue.

        Each replayed message has ``attempts`` reset to 0. Returns the
        number of messages actually replayed.
        """
        replayed = 0
        for _ in range(count):
            item = self.r.rpop(self.dlq)
            if not item:
                break
            message = json.loads(item)
            message['attempts'] = 0  # reset retry counter
            self.r.lpush(self.queue, json.dumps(message))
            replayed += 1
        return replayed
Automated DLQ Processing
Triage Automation
Not all dead letters need human intervention. Categorize failures and auto-handle known types:
class DLQProcessor:
    """Triage dead letters by matching their error text against handlers.

    Handlers are registered per substring pattern and return an action:
    ``'replay'`` moves the message back to the main queue, ``'discard'``
    deletes it from the DLQ, anything else leaves it in place and counts
    it as escalated.
    """

    def __init__(self, queue_with_dlq):
        """Wrap a queue exposing ``r``, ``queue``, ``dlq`` and ``dlq_peek``."""
        self.q = queue_with_dlq
        self.handlers = {}

    def register_handler(self, error_pattern: str,
                         handler: Callable):
        """Register ``handler`` for errors containing ``error_pattern``.

        ``handler`` is called with the dead letter dict and returns the
        action to take.
        """
        self.handlers[error_pattern] = handler

    def process_dlq(self, batch_size=50):
        """Triage up to ``batch_size`` dead letters and apply the actions.

        Returns:
            Dict with the number of messages ``replayed``, ``discarded``
            and ``escalated`` in this run. Escalated messages stay in the
            DLQ, so they are counted again on the next run.
        """
        stats = {'replayed': 0, 'discarded': 0,
                 'escalated': 0}
        for item in self.q.dlq_peek(batch_size):
            action = self._decide(item)
            if action == 'replay':
                self._replay(item)
                stats['replayed'] += 1
            elif action == 'discard':
                self._discard(item)
                stats['discarded'] += 1
            else:
                # No handler matched, or the handler returned an action
                # this processor does not know: leave it for a human.
                stats['escalated'] += 1
                # Send to alerting system
        return stats

    def _decide(self, item):
        """Return the action of the first handler whose pattern matches."""
        error = item.get('error', '')
        for pattern, handler in self.handlers.items():
            if pattern in error:
                return handler(item)
        return None

    def _replay(self, item):
        """Move ``item`` from the DLQ back to the main queue."""
        raw = json.dumps(item)  # the DLQ entry as originally serialized
        retry = {**item, 'attempts': 0}  # reset retry counter
        # Push before removing: a crash in between duplicates the message
        # instead of losing it.
        self.q.r.lpush(self.q.queue, json.dumps(retry))
        self.q.r.lrem(self.q.dlq, 1, raw)

    def _discard(self, item):
        """Delete ``item`` from the DLQ."""
        self.q.r.lrem(self.q.dlq, 1, json.dumps(item))
# Usage
# NOTE(review): `queue` is not defined in this snippet; presumably an
# existing RedisQueueWithDLQ instance created by the application -- confirm.
processor = DLQProcessor(queue)


def _replay_transient(item):
    """Transient failure: safe to retry."""
    return 'replay'


def _discard_invalid(item):
    """Data is bad and won't fix itself."""
    return 'discard'


processor.register_handler('ConnectionTimeout', _replay_transient)
processor.register_handler('ValidationError', _discard_invalid)
Monitoring and Alerting
Essential DLQ metrics to track:
| Metric | Alert Threshold | Why |
|---|---|---|
| DLQ depth | > 0 for > 5 minutes | Any dead letters need attention |
| DLQ growth rate | > 10/minute | Indicates systemic failure |
| DLQ age (oldest item) | > 24 hours | Items are being ignored |
| Replay success rate | < 80% | Root cause not fixed |
| DLQ as % of total throughput | > 1% | Unusual failure rate |
# Prometheus metrics example
from prometheus_client import Gauge, Counter
# Current number of messages in a DLQ, labelled by source queue name.
dlq_depth = Gauge(
    'dlq_depth',
    'Dead letter queue depth',
    labelnames=['queue'],
)

# Running count of dead-lettered messages, labelled by queue and error type.
dlq_messages_total = Counter(
    'dlq_messages_total',
    'Total dead lettered messages',
    labelnames=['queue', 'error_type'],
)
def monitor_dlq(queue_name, redis_client):
    """Publish the current length of ``<queue_name>:dlq`` to ``dlq_depth``.

    Args:
        queue_name: Name of the source queue; also used as the ``queue``
            label value.
        redis_client: Client used to read the list length.
    """
    dlq_key = f"{queue_name}:dlq"
    gauge = dlq_depth.labels(queue=queue_name)
    gauge.set(redis_client.llen(dlq_key))
DLQ Design Decisions
Separate DLQ per Source Queue vs Shared DLQ
Separate (recommended): Each queue gets its own DLQ (orders:dlq, emails:dlq). Easier to reason about, alert on, and replay. Debugging is faster because you know exactly where the message originated.
Shared: One DLQ for everything. Simpler infrastructure but harder to triage. Only viable for very small systems.
DLQ Retention
Dead letters should not live forever. Set a retention policy:
- 7 days for high-volume, low-value messages
- 30 days for transactional data
- 90 days for financial or compliance-related messages
Archive to cold storage (S3, database) before expiry if an audit trail is needed.
Idempotent Replay
When replaying DLQ messages, the consumer must be idempotent. A message that failed after partially completing work will cause duplicate side effects on replay unless the handler checks for prior completion.
One thing to remember: A DLQ without monitoring is just a slow memory leak. The value isn’t in storing dead letters — it’s in the alert that fires when the first one arrives, the dashboard that shows the trend, and the automation that triages known failure patterns.
See Also
- Python Delayed Task Execution: How Python programs schedule tasks to run later — like setting an alarm for your code.
- Python Distributed Locks: How Python programs take turns with shared resources — like a bathroom door lock, but for computers.
- Python Fan Out Fan In Pattern: How Python splits big jobs into small pieces, runs them all at once, then puts the results back together.
- Python Message Deduplication: Why computer messages sometimes get delivered twice — and how Python stops them from doing double damage.
- Python Priority Queue Patterns: Why some tasks cut the line in Python — and how priority queues decide who goes first.