Python Poison Pill Handling — Deep Dive

A Generic Poison Pill Handler

import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)

class ErrorClassification(Enum):
    """Categories a processing failure can fall into.

    The category decides whether a failed message is retried or sent to the
    dead letter queue (DLQ).
    """
    TRANSIENT = "transient"      # Retry with backoff
    PERMANENT = "permanent"      # Send to DLQ immediately
    UNKNOWN = "unknown"          # Retry up to limit, then DLQ

@dataclass
class MessageMetadata:
    """Per-message delivery bookkeeping consulted when deciding on a retry."""
    # Identifier of the message; used in log output.
    message_id: str
    # Number of the current processing attempt (defaults to 1).
    attempt: int = 1
    # Per-message cap that `attempt` is compared against by
    # PoisonPillHandler.should_retry.
    max_attempts: int = 5
    # NOTE(review): not read by any code visible in this file — presumably a
    # timestamp of the first failure; confirm the unit with whoever sets it.
    first_failure_time: Optional[float] = None
    # NOTE(review): not read by any code visible in this file — presumably
    # the text of the most recent error.
    last_error: Optional[str] = None

@dataclass
class PoisonPillHandler:
    """Decide whether a failed message is retried or routed to the DLQ.

    The decision is based on how the raised exception is classified and on
    how many attempts the message has already consumed.
    """
    # Handler-wide cap on processing attempts. The effective limit for a
    # message is the stricter of this value and the message's own
    # ``MessageMetadata.max_attempts``.
    max_attempts: int = 5
    # Optional replacement for the built-in exception classification.
    error_classifier: Optional[Callable[[Exception], ErrorClassification]] = None
    # Optional hooks kept for the caller's use; the methods of this class
    # do not invoke them.
    on_dlq: Optional[Callable[[Any, MessageMetadata, Exception], None]] = None
    on_retry: Optional[Callable[[Any, MessageMetadata, float], None]] = None

    def classify_error(self, exc: Exception) -> ErrorClassification:
        """Classify ``exc`` with the custom classifier, or the default one."""
        if self.error_classifier:
            return self.error_classifier(exc)
        return self._default_classifier(exc)

    def _default_classifier(self, exc: Exception) -> ErrorClassification:
        """Map common built-in exception types to a classification."""
        # Common permanent failures. ValueError already covers its
        # subclasses UnicodeDecodeError and json.JSONDecodeError, so they do
        # not need to be listed (and no json import is required here).
        permanent_types = (ValueError, TypeError, KeyError)
        if isinstance(exc, permanent_types):
            return ErrorClassification.PERMANENT

        # Common transient failures. ConnectionError and TimeoutError are
        # OSError subclasses; they are listed for readability.
        transient_types = (ConnectionError, TimeoutError, OSError)
        if isinstance(exc, transient_types):
            return ErrorClassification.TRANSIENT

        return ErrorClassification.UNKNOWN

    def _attempt_limit(self, metadata: MessageMetadata) -> int:
        """Return the effective attempt cap for ``metadata``."""
        # Honour both limits: previously only the per-message value was
        # read, so configuring PoisonPillHandler(max_attempts=N) had no
        # effect on the retry decision.
        return min(self.max_attempts, metadata.max_attempts)

    def should_retry(
        self,
        metadata: MessageMetadata,
        exc: Exception,
    ) -> bool:
        """Return True if the message should be retried, False to dead-letter it.

        Args:
            metadata: Attempt bookkeeping for the failed message.
            exc: The exception raised while processing the message.

        Returns:
            False for permanent failures and for any failure once the
            effective attempt limit has been reached; True otherwise.
        """
        classification = self.classify_error(exc)

        if classification == ErrorClassification.PERMANENT:
            logger.warning(
                "Permanent failure for message %s — routing to DLQ: %s",
                metadata.message_id, exc,
            )
            return False

        exhausted = metadata.attempt >= self._attempt_limit(metadata)

        if classification == ErrorClassification.TRANSIENT:
            if exhausted:
                logger.error(
                    "Transient failure exhausted retries for %s "
                    "after %d attempts — routing to DLQ",
                    metadata.message_id, metadata.attempt,
                )
                return False
            return True

        # UNKNOWN: retry up to limit
        return not exhausted

import json

Celery Integration

Celery has built-in retry support, but poison pill handling needs explicit configuration:

from celery import Celery, Task
from celery.exceptions import MaxRetriesExceededError, Reject
import json

# Celery application named "tasks", using a local Redis instance as broker.
app = Celery("tasks", broker="redis://localhost:6379")

class PoisonPillAwareTask(Task):
    """Celery base task that records failed tasks in a dead letter store.

    The class attributes configure Celery's retry machinery; ``on_failure``
    persists whatever still fails.
    """
    # Exceptions Celery should retry on its own.
    autoretry_for = (ConnectionError, TimeoutError)
    # Retry budget and base delay (see the Celery task options reference).
    max_retries = 4
    default_retry_delay = 5
    # Back off between retries and add jitter to the delay.
    retry_backoff = True
    retry_jitter = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log a failed task and hand its inputs to ``store_in_dlq``.

        Celery runs this handler when the task fails; for exceptions that
        are retried, that is once the retries have been used up.
        """
        logger.error(
            "Task %s permanently failed (message to DLQ): %s",
            task_id, exc,
        )
        # Everything needed to investigate or replay the task later.
        dlq_record = {
            "task_name": self.name,
            "task_id": task_id,
            "args": args,
            "kwargs": kwargs,
            "error": str(exc),
            "traceback": str(einfo),
        }
        store_in_dlq(**dlq_record)

@app.task(
    base=PoisonPillAwareTask,
    bind=True,
    acks_late=True,  # Don't ack until processing succeeds
)
def process_order(self, order_data: dict):
    """Validate an order and submit it to the payment service.

    Args:
        order_data: Raw order payload.

    Returns:
        Whatever ``submit_to_payment_service`` returns.

    Raises:
        Reject: The order failed validation (permanent error). The failure
            is written to the DLQ store before the message is rejected.
        celery.exceptions.Retry: The payment service was unreachable and
            the task has been scheduled for another attempt.
    """
    try:
        validated = validate_order(order_data)
    except (ValueError, KeyError) as exc:
        import traceback

        # Permanent error — don't retry, go straight to DLQ
        logger.error("Invalid order data: %s", exc)
        # Raising Reject does not run the task's on_failure handler, and the
        # Redis broker has no dead letter exchange to catch the rejected
        # message, so the DLQ record has to be written here or the order
        # would be dropped without a trace.
        store_in_dlq(
            task_name=self.name,
            task_id=self.request.id,
            args=[order_data],
            kwargs={},
            error=str(exc),
            traceback=traceback.format_exc(),
        )
        raise Reject(exc, requeue=False) from exc

    try:
        return submit_to_payment_service(validated)
    except ConnectionError as exc:
        # Transient — retry with backoff
        raise self.retry(exc=exc)


def store_in_dlq(task_name, task_id, args, kwargs, error, traceback,
                 dlq_key="dlq:orders"):
    """Store failed message metadata for later investigation.

    Args:
        task_name: Name of the task that failed.
        task_id: Identifier of the failed task invocation.
        args: Positional arguments the task was called with.
        kwargs: Keyword arguments the task was called with.
        error: Error message text.
        traceback: Traceback text.
        dlq_key: Redis list that receives the entry; a counter is kept
            under ``"<dlq_key>:count"``. Defaults to ``"dlq:orders"``.

    Raises:
        TypeError: If ``args`` or ``kwargs`` are not JSON-serializable.
    """
    # `time` is not imported at module level in this file, so import it
    # locally; the original call to time.time() raised NameError.
    import time

    import redis

    r = redis.Redis()
    dlq_entry = json.dumps({
        "task_name": task_name,
        "task_id": task_id,
        "args": args,
        "kwargs": kwargs,
        "error": error,
        "traceback": traceback,
        "failed_at": time.time(),
    })
    # Newest entries go to the head of the list.
    r.lpush(dlq_key, dlq_entry)
    r.incr(f"{dlq_key}:count")

RabbitMQ Dead Letter Exchange

RabbitMQ supports dead letter exchanges natively:

import pika

def setup_queues_with_dlq():
    """Declare the "orders" queue with its dead letter exchange and queue.

    Opens a blocking connection to RabbitMQ on localhost and declares, in
    order: the "dlx" direct exchange, the durable "orders.dlq" queue bound
    to it, and the durable "orders" queue configured to dead-letter into
    "dlx".

    Returns:
        The channel on which everything was declared.
    """
    params = pika.ConnectionParameters("localhost")
    ch = pika.BlockingConnection(params).channel()

    # Dead letter exchange and the queue that collects dead-lettered messages.
    ch.exchange_declare(exchange="dlx", exchange_type="direct")
    ch.queue_declare(queue="orders.dlq", durable=True)
    ch.queue_bind(queue="orders.dlq", exchange="dlx", routing_key="orders")

    # Main queue: dead-lettered messages are routed to "dlx" under the
    # "orders" routing key.
    main_queue_arguments = {
        "x-dead-letter-exchange": "dlx",
        "x-dead-letter-routing-key": "orders",
        "x-message-ttl": 300000,  # 5 min TTL (optional)
    }
    ch.queue_declare(
        queue="orders",
        durable=True,
        arguments=main_queue_arguments,
    )

    return ch


def consume_with_poison_pill_handling(channel):
    """Consume "orders", dead-lettering poison pills after bounded retries.

    Permanent failures (``ValueError`` / ``KeyError``, which includes
    malformed JSON) are rejected straight to the dead letter exchange.
    Any other failure is retried until ``handler.max_attempts`` processing
    attempts have been made, then dead-lettered as well.

    Args:
        channel: An open pika channel on which the "orders" queue exists.

    This call blocks in ``channel.start_consuming()``.
    """
    handler = PoisonPillHandler(max_attempts=3)
    queue_name = "orders"
    # Header carrying the number of failed attempts across republishes.
    retry_header = "x-retry-count"

    def dead_letter(ch, method):
        # Reject without requeue so the broker routes the message to the DLX.
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

    def callback(ch, method, properties, body):
        headers = properties.headers or {}

        # RabbitMQ records dead-lettering events in the x-death header.
        # A plain requeue (nack with requeue=True) does NOT touch it, so it
        # cannot count retries by itself; the explicit retry header does.
        x_death = headers.get("x-death") or []
        death_count = sum(d.get("count", 0) for d in x_death)
        retry_count = int(headers.get(retry_header, 0))
        prior_failures = death_count + retry_count

        if prior_failures >= handler.max_attempts:
            logger.error(
                "Message exceeded max attempts (%d) — "
                "will route to DLQ via reject",
                prior_failures,
            )
            dead_letter(ch, method)
            return

        try:
            process_message(json.loads(body))
        except (ValueError, KeyError) as exc:
            # Permanent failure — reject without requeue
            logger.warning("Permanent failure: %s", exc)
            dead_letter(ch, method)
        except Exception as exc:
            if prior_failures + 1 >= handler.max_attempts:
                logger.error(
                    "Message exceeded max attempts (%d) — "
                    "will route to DLQ via reject",
                    prior_failures + 1,
                )
                dead_letter(ch, method)
                return

            # Transient — republish with an incremented retry counter and
            # ack the original. Requeueing in place would redeliver the
            # message unchanged, forever, because nothing would ever count
            # the attempts.
            logger.warning("Transient failure: %s", exc)
            properties.headers = {**headers, retry_header: retry_count + 1}
            ch.basic_publish(
                exchange="",  # default exchange routes by queue name
                routing_key=queue_name,
                body=body,
                properties=properties,
            )
            # Publish first, ack second: a crash in between produces a
            # duplicate rather than a lost message.
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(
        queue=queue_name,
        on_message_callback=callback,
        auto_ack=False,
    )
    channel.start_consuming()

Kafka Poison Pill Handling

Kafka doesn’t have built-in DLQ support, so you manage it at the consumer level:

from confluent_kafka import Consumer, Producer, KafkaError
import json

def create_kafka_consumer_with_dlq():
    """Consume the "orders" topic, routing poison pills to "orders.dlq".

    Permanent failures (``ValueError`` / ``KeyError``) are sent to the DLQ
    immediately. Other failures are retried by seeking back to the failed
    offset until ``handler.max_attempts`` attempts have been made, after
    which the message is sent to the DLQ too. Offsets are committed only
    once a message has been processed or dead-lettered.

    This function runs the poll loop until an exception propagates out of
    it; the consumer is closed on the way out.
    """
    # Needed only for seek(); imported locally so this function does not
    # depend on an edit to the import lines elsewhere in the file.
    from confluent_kafka import TopicPartition

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "order-processor",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    })

    dlq_producer = Producer({
        "bootstrap.servers": "localhost:9092",
    })

    consumer.subscribe(["orders"])
    handler = PoisonPillHandler(max_attempts=3)

    # Track per-message retry counts in memory
    # (for production, use a persistent store)
    retry_counts: dict[str, int] = {}

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                # End-of-partition is informational; anything else is worth
                # surfacing instead of being dropped silently.
                if msg.error().code() != KafkaError._PARTITION_EOF:
                    logger.error("Kafka consumer error: %s", msg.error())
                continue

            message_key = f"{msg.topic()}-{msg.partition()}-{msg.offset()}"
            retry_counts[message_key] = retry_counts.get(message_key, 0) + 1
            attempt = retry_counts[message_key]

            try:
                data = json.loads(msg.value())
                process_order(data)
                consumer.commit(msg)
                retry_counts.pop(message_key, None)

            except (ValueError, KeyError) as exc:
                # Permanent — send to DLQ immediately
                send_to_dlq(
                    dlq_producer, msg,
                    error=str(exc),
                    classification="permanent",
                )
                consumer.commit(msg)
                retry_counts.pop(message_key, None)

            except Exception as exc:
                if attempt >= handler.max_attempts:
                    send_to_dlq(
                        dlq_producer, msg,
                        error=str(exc),
                        classification="exhausted",
                    )
                    consumer.commit(msg)
                    retry_counts.pop(message_key, None)
                else:
                    logger.warning(
                        "Attempt %d/%d for %s: %s",
                        attempt, handler.max_attempts, message_key, exc,
                    )
                    # Skipping the commit is not enough: a running consumer
                    # keeps reading from its in-memory position, so the
                    # message would never come back. Rewind to the failed
                    # offset so the next poll() returns it again.
                    consumer.seek(
                        TopicPartition(
                            msg.topic(), msg.partition(), msg.offset(),
                        )
                    )
    finally:
        # Leave the consumer group cleanly.
        consumer.close()

def send_to_dlq(producer, original_msg, error, classification):
    """Publish a failed message, with failure context, to "<topic>.dlq".

    Args:
        producer: Object exposing ``produce(topic=, value=, key=)`` and
            ``flush()``.
        original_msg: The consumed message; its ``topic()``,
            ``partition()``, ``offset()``, ``key()`` and ``value()`` are read.
        error: Error message text.
        classification: Label describing why the message was dead-lettered.

    The DLQ record is a JSON document. Key and value are decoded as UTF-8
    with undecodable bytes replaced, and a missing key or value is stored
    as null, so that a malformed payload cannot make DLQ routing fail.
    """
    # `time` is not imported at module level in this file, so import it
    # locally; the original call to time.time() raised NameError.
    import time

    raw_key = original_msg.key()
    raw_value = original_msg.value()

    dlq_message = json.dumps({
        "original_topic": original_msg.topic(),
        "original_partition": original_msg.partition(),
        "original_offset": original_msg.offset(),
        "original_key": (
            raw_key.decode("utf-8", errors="replace")
            if raw_key else None
        ),
        # Poison pills are often exactly the messages that are not valid
        # UTF-8 (or have no value at all); a strict decode would raise here.
        "original_value": (
            raw_value.decode("utf-8", errors="replace")
            if raw_value is not None else None
        ),
        "error": error,
        "classification": classification,
        "failed_at": time.time(),
    })
    producer.produce(
        topic=f"{original_msg.topic()}.dlq",
        value=dlq_message.encode(),
        key=raw_key,
    )
    # Block until the DLQ record is delivered.
    producer.flush()

DLQ Replay Tool

Build tooling to inspect and replay DLQ messages:

import json
import redis

class DLQManager:
    """Inspect and replay messages from the dead letter queue.

    Entries are JSON documents stored in a Redis list. Replay takes entries
    from the tail of the list and puts them back there if replay fails.
    """

    def __init__(self, redis_client: "redis.Redis", dlq_key: str):
        """Bind the manager to a Redis client and the list key of the DLQ."""
        self.redis = redis_client
        self.dlq_key = dlq_key

    def count(self) -> int:
        """Return the number of entries currently in the DLQ."""
        return self.redis.llen(self.dlq_key)

    def peek(self, limit: int = 10) -> list[dict]:
        """Return up to ``limit`` entries from the head without removing them.

        A non-positive ``limit`` yields an empty list.
        """
        if limit <= 0:
            # LRANGE 0 -1 means "the whole list", so limit=0 must not be
            # passed through as an end index of -1.
            return []
        entries = self.redis.lrange(self.dlq_key, 0, limit - 1)
        return [json.loads(e) for e in entries]

    def replay_one(self, task_func: Callable) -> dict:
        """Pop one entry from the tail and resubmit it via ``task_func.delay``.

        Returns:
            ``{"status": "empty"}`` if the DLQ has no entries,
            ``{"status": "replayed", "task_id": ...}`` on success, or
            ``{"status": "error", "error": ...}`` if the entry could not be
            parsed or submitted, in which case it is put back on the list.
        """
        entry_raw = self.redis.rpop(self.dlq_key)
        if not entry_raw:
            return {"status": "empty"}

        try:
            # Parse and read every field before submitting: a corrupt or
            # incomplete entry must be put back, not lost, and must not be
            # submitted and then put back (which would duplicate it).
            entry = json.loads(entry_raw)
            task_id = entry["task_id"]
            args = entry["args"]
            kwargs = entry["kwargs"]
            task_func.delay(*args, **kwargs)
        except Exception as exc:
            # Put it back if replay submission fails
            self.redis.rpush(self.dlq_key, entry_raw)
            return {"status": "error", "error": str(exc)}
        return {"status": "replayed", "task_id": task_id}

    def replay_all(self, task_func: Callable) -> dict:
        """Replay the entries present when called, stopping at the first error.

        Returns:
            ``{"replayed": n, "failed": m}`` where ``m`` is 0 or 1.
        """
        replayed = 0
        failed = 0
        # Bound the loop by the starting depth so that entries arriving in
        # the DLQ during the replay (for example replayed tasks failing
        # again) cannot keep it running indefinitely.
        for _ in range(self.count()):
            status = self.replay_one(task_func)["status"]
            if status == "replayed":
                replayed += 1
            elif status == "error":
                failed += 1
                break
            else:
                # "empty": the list was drained in the meantime.
                break
        return {"replayed": replayed, "failed": failed}

    def purge(self) -> int:
        """Delete the DLQ and return the number of entries counted just before."""
        count = self.redis.llen(self.dlq_key)
        self.redis.delete(self.dlq_key)
        return count

Monitoring

from prometheus_client import Counter, Gauge

# Messages routed to the DLQ, labelled by queue and failure classification.
poison_pills_detected = Counter(
    name="poison_pills_total",
    documentation="Messages routed to DLQ",
    labelnames=["queue", "classification"],
)

# Current size of each dead letter queue.
dlq_depth = Gauge(
    name="dlq_depth",
    documentation="Number of messages in the dead letter queue",
    labelnames=["queue"],
)

# Retry attempts across all messages, labelled by queue.
message_retries = Counter(
    name="message_retries_total",
    documentation="Total message retry attempts",
    labelnames=["queue"],
)

Critical alerts:

  • DLQ depth > 0 and growing → new poison pills arriving, investigate immediately
  • DLQ depth > 100 → systemic issue (schema change? bug in consumer?)
  • Same message_id retried > max_attempts → retry logic bypass (configuration error)
  • Permanent error rate > 1% → producer validation is broken

Testing Poison Pill Handling

import pytest

def test_permanent_error_skips_retry():
    """A permanent error is never retried, even on the first attempt."""
    first_attempt = MessageMetadata(message_id="test-1", attempt=1)

    # ValueError is classified as permanent
    decision = PoisonPillHandler(max_attempts=5).should_retry(
        first_attempt, ValueError("bad data"),
    )

    assert decision is False

def test_transient_error_allows_retry():
    """A transient error below the attempt limit is retried."""
    first_attempt = MessageMetadata(message_id="test-2", attempt=1)

    decision = PoisonPillHandler(max_attempts=5).should_retry(
        first_attempt, ConnectionError("timeout"),
    )

    assert decision is True

def test_transient_exhausts_retries():
    """A transient error on the last permitted attempt is not retried."""
    handler = PoisonPillHandler(max_attempts=3)
    # Pin the per-message limit as well: should_retry consults
    # metadata.max_attempts, whose default of 5 would otherwise make
    # attempt 3 look retryable.
    metadata = MessageMetadata(
        message_id="test-3", attempt=3, max_attempts=3,
    )

    should_retry = handler.should_retry(metadata, ConnectionError("timeout"))
    assert should_retry is False

def test_dlq_replay_resubmits_message():
    """replay_one pops an entry and resubmits it through task_func.delay."""

    class FakeRedis:
        """In-memory stand-in for the Redis list commands DLQManager uses."""

        def __init__(self):
            self.lists = {}

        def lpush(self, key, value):
            self.lists.setdefault(key, []).insert(0, value)

        def rpush(self, key, value):
            self.lists.setdefault(key, []).append(value)

        def rpop(self, key):
            items = self.lists.get(key)
            return items.pop() if items else None

        def llen(self, key):
            return len(self.lists.get(key, []))

    class FakeTask:
        """Records the arguments of every delay() call."""

        def __init__(self):
            self.calls = []

        def delay(self, *args, **kwargs):
            self.calls.append((args, kwargs))

    # An in-memory fake keeps the test independent of a running Redis server.
    r = FakeRedis()
    r.lpush("test:dlq", json.dumps({
        "task_id": "abc",
        "args": [1],
        "kwargs": {"x": 2},
    }))
    dlq = DLQManager(r, "test:dlq")
    task = FakeTask()

    result = dlq.replay_one(task)

    assert result == {"status": "replayed", "task_id": "abc"}
    assert task.calls == [((1,), {"x": 2})]
    assert dlq.count() == 0
    assert dlq.replay_one(task) == {"status": "empty"}

One thing to remember: Every message queue will eventually encounter a poison pill. The difference between a production-ready system and a fragile one is whether that poison pill blocks the queue for hours or gets safely isolated in seconds. Classify errors, set retry limits, route to dead letter queues, and build replay tooling — because you will need it.

Tags: python, reliability, messaging

See Also