Python Dead Letter Queues — Deep Dive

RabbitMQ Native DLQ Setup

RabbitMQ supports dead-lettering at the queue level through three mechanisms: message rejection (basic.nack or basic.reject without requeue), message TTL expiration, and queue length overflow.

Declaring DLQ Infrastructure with Pika

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare the dead letter exchange and queue
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letters', durable=True)
channel.queue_bind(queue='dead_letters', exchange='dlx',
                   routing_key='failed')

# Declare the main queue with DLQ routing
channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'failed',
        'x-message-ttl': 60000,  # optional: 60s TTL
    }
)

When a consumer nacks a message from the tasks queue, RabbitMQ automatically routes it to dead_letters via the dlx exchange. The dead-lettered message retains its original headers plus additional x-death headers containing death reason, original queue, and timestamp.

Reading x-death Headers

def dlq_consumer(ch, method, properties, body):
    headers = properties.headers or {}
    x_death = headers.get('x-death', [])
    if x_death:
        death_info = x_death[0]
        original_queue = death_info.get('queue')
        reason = death_info.get('reason')
        count = death_info.get('count', 1)
        print(f"Dead letter from {original_queue}, "
              f"reason={reason}, death_count={count}")
    # Process or store for investigation
    ch.basic_ack(delivery_tag=method.delivery_tag)

Celery DLQ Patterns

Using Error Callbacks

Celery’s on_failure handler and link_error callbacks let you route failed tasks to a DLQ after retries are exhausted.

from celery import Celery
import json
import redis
import traceback

app = Celery('tasks', broker='redis://localhost')
dlq_client = redis.Redis()

DLQ_KEY = 'celery:dead_letters'

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_order(self, order_id, payload):
    try:
        # Business logic
        result = handle_order(order_id, payload)
        return result
    except TransientError as exc:
        raise self.retry(exc=exc)
    except Exception as exc:
        # Max retries exceeded or non-retryable error
        if self.request.retries >= self.max_retries:
            dead_letter = {
                'task_id': self.request.id,
                'task_name': self.name,
                'args': [order_id, payload],
                'exception': str(exc),
                'traceback': traceback.format_exc(),
                'retries': self.request.retries,
                'dead_lettered_at': datetime.utcnow().isoformat(),
                'origin_queue': self.request.delivery_info.get(
                    'routing_key', 'unknown'
                ),
            }
            dlq_client.lpush(DLQ_KEY, json.dumps(dead_letter))
            return  # Don't re-raise; task is "handled"
        raise self.retry(exc=exc)

Celery Task Base Class for DLQ

For consistent DLQ handling across all tasks:

class DLQTask(app.Task):
    abstract = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        dead_letter = {
            'task_id': task_id,
            'task_name': self.name,
            'args': args,
            'kwargs': kwargs,
            'exception': str(exc),
            'traceback': str(einfo),
            'timestamp': datetime.utcnow().isoformat(),
        }
        dlq_client.lpush(DLQ_KEY, json.dumps(dead_letter))
        super().on_failure(exc, task_id, args, kwargs, einfo)

@app.task(base=DLQTask, max_retries=3)
def fragile_task(data):
    # If this fails after 3 retries, on_failure routes to DLQ
    return process(data)

Custom Redis DLQ Implementation

For systems not using Celery, here’s a complete Redis-backed DLQ:

import redis
import json
import time
import uuid
from dataclasses import dataclass, asdict
from typing import Optional, Callable

@dataclass
class DeadLetter:
    message_id: str
    payload: dict
    error: str
    traceback: str
    attempts: int
    source_queue: str
    created_at: float
    dead_lettered_at: float

class RedisQueueWithDLQ:
    def __init__(self, redis_client, queue_name,
                 max_retries=3, dlq_ttl_days=30):
        self.r = redis_client
        self.queue = queue_name
        self.dlq = f"{queue_name}:dlq"
        self.processing = f"{queue_name}:processing"
        self.max_retries = max_retries
        self.dlq_ttl = dlq_ttl_days * 86400

    def enqueue(self, payload: dict) -> str:
        message_id = str(uuid.uuid4())
        message = {
            'id': message_id,
            'payload': payload,
            'attempts': 0,
            'created_at': time.time(),
        }
        self.r.lpush(self.queue, json.dumps(message))
        return message_id

    def dequeue(self, timeout=5) -> Optional[dict]:
        result = self.r.brpoplpush(
            self.queue, self.processing, timeout
        )
        if result:
            return json.loads(result)
        return None

    def ack(self, message: dict):
        self.r.lrem(self.processing, 1, json.dumps(message))

    def nack(self, message: dict, error: str,
             tb: str = ""):
        self.r.lrem(self.processing, 1, json.dumps(message))
        message['attempts'] += 1

        if message['attempts'] >= self.max_retries:
            dead_letter = {
                **message,
                'error': error,
                'traceback': tb,
                'dead_lettered_at': time.time(),
                'source_queue': self.queue,
            }
            self.r.lpush(self.dlq, json.dumps(dead_letter))
            # Set TTL on DLQ if it doesn't have one
            if self.r.ttl(self.dlq) == -1:
                self.r.expire(self.dlq, self.dlq_ttl)
        else:
            # Re-enqueue for retry
            self.r.lpush(self.queue, json.dumps(message))

    def dlq_size(self) -> int:
        return self.r.llen(self.dlq)

    def dlq_peek(self, count=10) -> list:
        items = self.r.lrange(self.dlq, 0, count - 1)
        return [json.loads(i) for i in items]

    def dlq_replay(self, count=1) -> int:
        replayed = 0
        for _ in range(count):
            item = self.r.rpop(self.dlq)
            if not item:
                break
            message = json.loads(item)
            message['attempts'] = 0  # reset retry counter
            self.r.lpush(self.queue, json.dumps(message))
            replayed += 1
        return replayed

Automated DLQ Processing

Triage Automation

Not all dead letters need human intervention. Categorize failures and auto-handle known types:

class DLQProcessor:
    def __init__(self, queue_with_dlq):
        self.q = queue_with_dlq
        self.handlers = {}

    def register_handler(self, error_pattern: str,
                         handler: Callable):
        self.handlers[error_pattern] = handler

    def process_dlq(self, batch_size=50):
        items = self.q.dlq_peek(batch_size)
        stats = {'replayed': 0, 'discarded': 0,
                 'escalated': 0}

        for item in items:
            error = item.get('error', '')
            handled = False
            for pattern, handler in self.handlers.items():
                if pattern in error:
                    action = handler(item)
                    if action == 'replay':
                        stats['replayed'] += 1
                    elif action == 'discard':
                        stats['discarded'] += 1
                    handled = True
                    break
            if not handled:
                stats['escalated'] += 1
                # Send to alerting system
        return stats

# Usage
processor = DLQProcessor(queue)
processor.register_handler(
    'ConnectionTimeout',
    lambda item: 'replay'  # transient, safe to retry
)
processor.register_handler(
    'ValidationError',
    lambda item: 'discard'  # data is bad, won't fix itself
)

Monitoring and Alerting

Essential DLQ metrics to track:

MetricAlert ThresholdWhy
DLQ depth> 0 for > 5 minutesAny dead letters need attention
DLQ growth rate> 10/minuteIndicates systemic failure
DLQ age (oldest item)> 24 hoursItems are being ignored
Replay success rate< 80%Root cause not fixed
DLQ as % of total throughput> 1%Unusual failure rate
# Prometheus metrics example
from prometheus_client import Gauge, Counter

dlq_depth = Gauge('dlq_depth', 'Dead letter queue depth',
                  ['queue'])
dlq_messages_total = Counter('dlq_messages_total',
                             'Total dead lettered messages',
                             ['queue', 'error_type'])

def monitor_dlq(queue_name, redis_client):
    depth = redis_client.llen(f"{queue_name}:dlq")
    dlq_depth.labels(queue=queue_name).set(depth)

DLQ Design Decisions

Separate DLQ per Source Queue vs Shared DLQ

Separate (recommended): Each queue gets its own DLQ (orders:dlq, emails:dlq). Easier to reason about, alert on, and replay. Debugging is faster because you know exactly where the message originated.

Shared: One DLQ for everything. Simpler infrastructure but harder to triage. Only viable for very small systems.

DLQ Retention

Dead letters should not live forever. Set a retention policy:

  • 7 days for high-volume, low-value messages
  • 30 days for transactional data
  • 90 days for financial or compliance-related messages

Archive to cold storage (S3, database) before expiry if audit trail is needed.

Idempotent Replay

When replaying DLQ messages, the consumer must be idempotent. A message that failed after partially completing work will cause duplicate side effects on replay unless the handler checks for prior completion.

One thing to remember: A DLQ without monitoring is just a slow memory leak. The value isn’t in storing dead letters — it’s in the alert that fires when the first one arrives, the dashboard that shows the trend, and the automation that triages known failure patterns.

pythonmessagingreliability

See Also