Python Poison Pill Handling — Deep Dive
A Generic Poison Pill Handler
import json
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Optional
logger = logging.getLogger(__name__)


class ErrorClassification(Enum):
    """How a processing failure should be routed."""

    TRANSIENT = "transient"  # Retry with backoff
    PERMANENT = "permanent"  # Send to DLQ immediately
    UNKNOWN = "unknown"  # Retry up to limit, then DLQ


@dataclass
class MessageMetadata:
    """Per-message retry bookkeeping passed to ``PoisonPillHandler``."""

    message_id: str
    attempt: int = 1  # attempt counter; compared against the retry limit
    max_attempts: int = 5  # per-message retry ceiling
    # The two fields below are not read by PoisonPillHandler; they are
    # available for callers to record failure context.
    first_failure_time: Optional[float] = None
    last_error: Optional[str] = None


@dataclass
class PoisonPillHandler:
    """Routes messages based on failure classification.

    ``error_classifier`` overrides the built-in exception-type classifier.
    ``on_dlq`` and ``on_retry`` are stored for callers to use; nothing in
    this class invokes them.
    """

    max_attempts: int = 5
    error_classifier: Optional[Callable[[Exception], ErrorClassification]] = None
    on_dlq: Optional[Callable[[Any, MessageMetadata, Exception], None]] = None
    on_retry: Optional[Callable[[Any, MessageMetadata, float], None]] = None

    def classify_error(self, exc: Exception) -> ErrorClassification:
        """Classify ``exc`` with the custom classifier, else the default."""
        if self.error_classifier:
            return self.error_classifier(exc)
        return self._default_classifier(exc)

    def _default_classifier(self, exc: Exception) -> ErrorClassification:
        """Classify ``exc`` by exception type.

        The permanent check runs first, so an exception matching both
        tuples is treated as permanent.
        """
        # Common permanent failures. UnicodeDecodeError and JSONDecodeError
        # are ValueError subclasses; they are listed for readability.
        permanent_types = (
            ValueError, TypeError, KeyError,
            UnicodeDecodeError, json.JSONDecodeError,
        )
        if isinstance(exc, permanent_types):
            return ErrorClassification.PERMANENT

        # Common transient failures. ConnectionError and TimeoutError are
        # OSError subclasses; they are listed for readability.
        transient_types = (
            ConnectionError, TimeoutError, OSError,
        )
        if isinstance(exc, transient_types):
            return ErrorClassification.TRANSIENT

        return ErrorClassification.UNKNOWN

    def should_retry(
        self,
        metadata: MessageMetadata,
        exc: Exception,
    ) -> bool:
        """Decide whether the message should be retried after ``exc``.

        Args:
            metadata: Retry state of the failing message.
            exc: The exception raised while processing it.

        Returns:
            ``True`` to retry, ``False`` to route the message to the DLQ.
        """
        classification = self.classify_error(exc)

        if classification is ErrorClassification.PERMANENT:
            logger.warning(
                "Permanent failure for message %s — routing to DLQ: %s",
                metadata.message_id, exc,
            )
            return False

        # Honour both ceilings: the handler-wide limit and the per-message
        # limit. Previously only the per-message limit was consulted, so
        # PoisonPillHandler(max_attempts=N) had no effect.
        limit = min(self.max_attempts, metadata.max_attempts)
        if metadata.attempt < limit:
            return True

        # Only transient exhaustion is logged; UNKNOWN falls through quietly.
        if classification is ErrorClassification.TRANSIENT:
            logger.error(
                "Transient failure exhausted retries for %s "
                "after %d attempts — routing to DLQ",
                metadata.message_id, metadata.attempt,
            )
        return False
import json
Celery Integration
Celery has built-in retry support, but poison pill handling needs explicit configuration:
from celery import Celery, Task
from celery.exceptions import MaxRetriesExceededError, Reject
import json
app = Celery("tasks", broker="redis://localhost:6379")
class PoisonPillAwareTask(Task):
    """Celery base task that records failed tasks in the dead letter store.

    The class attributes configure automatic retries for connection and
    timeout errors; ``on_failure`` persists the failure via ``store_in_dlq``.
    """

    # Retry policy
    autoretry_for = (ConnectionError, TimeoutError)
    max_retries = 4
    default_retry_delay = 5
    retry_backoff = True
    retry_jitter = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log the failed task and store its details in the DLQ.

        Args:
            exc: The exception the task failed with.
            task_id: Id of the failed task.
            args: Positional arguments the task was called with.
            kwargs: Keyword arguments the task was called with.
            einfo: Exception info object; stored as its string form.
        """
        logger.error(
            "Task %s permanently failed (message to DLQ): %s",
            task_id, exc,
        )

        # Everything needed to investigate or replay the task later
        dlq_record = {
            "task_name": self.name,
            "task_id": task_id,
            "args": args,
            "kwargs": kwargs,
            "error": str(exc),
            "traceback": str(einfo),
        }
        store_in_dlq(**dlq_record)
@app.task(
    base=PoisonPillAwareTask,
    bind=True,
    acks_late=True,  # Don't ack until processing succeeds
)
def process_order(self, order_data: dict):
    """Validate an order and submit it to the payment service.

    Args:
        order_data: Raw order payload.

    Returns:
        Whatever ``submit_to_payment_service`` returns.

    Raises:
        Reject: The order failed validation. The failure is written to the
            DLQ first and the message is rejected without requeue.
    """
    import traceback

    try:
        validated = validate_order(order_data)
    except (ValueError, KeyError) as exc:
        # Permanent error — don't retry, go straight to DLQ
        logger.error("Invalid order data: %s", exc)
        # Raising Reject bypasses on_failure, and the Redis broker used by
        # this app has no dead letter exchange. Without this explicit write
        # the rejected message would simply be discarded.
        store_in_dlq(
            task_name=self.name,
            task_id=self.request.id,
            args=self.request.args,
            kwargs=self.request.kwargs,
            error=str(exc),
            traceback=traceback.format_exc(),
        )
        raise Reject(exc, requeue=False) from exc

    # ConnectionError / TimeoutError are deliberately not caught here. The
    # base class lists them in autoretry_for, so Celery retries them with
    # the configured backoff and jitter. An explicit self.retry() without a
    # countdown would use the fixed default_retry_delay instead.
    return submit_to_payment_service(validated)
def store_in_dlq(task_name, task_id, args, kwargs, error, traceback):
    """Store failed message metadata for later investigation.

    The entry is JSON-encoded, pushed onto the ``dlq:orders`` Redis list,
    and the ``dlq:orders:count`` counter is incremented.

    Args:
        task_name: Name of the failed task.
        task_id: Id of the failed task.
        args: Positional arguments of the failed call.
        kwargs: Keyword arguments of the failed call.
        error: String form of the exception.
        traceback: String form of the traceback.
    """
    import redis

    # default=str is deliberate. A task argument that JSON cannot encode
    # (datetime, Decimal, ...) must not make the DLQ write itself fail and
    # lose the record. Such values are stored as their string form.
    dlq_entry = json.dumps(
        {
            "task_name": task_name,
            "task_id": task_id,
            "args": args,
            "kwargs": kwargs,
            "error": error,
            "traceback": traceback,
            "failed_at": time.time(),
        },
        default=str,
    )

    r = redis.Redis()
    r.lpush("dlq:orders", dlq_entry)
    r.incr("dlq:orders:count")
RabbitMQ Dead Letter Exchange
RabbitMQ supports dead letter exchanges natively:
import pika
def setup_queues_with_dlq():
    """Declare the ``orders`` queue together with its dead letter routing.

    Declares the ``dlx`` exchange, the ``orders.dlq`` queue bound to it,
    and the ``orders`` queue configured to dead-letter into ``dlx``.

    Returns:
        The open channel. Its connection is left open for the caller.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters("localhost")
    )
    channel = connection.channel()

    # Declare the dead letter exchange. It is durable like the queues: a
    # transient exchange disappears on broker restart, after which dead
    # letters have nowhere to go.
    # NOTE: if a non-durable "dlx" already exists on the broker, delete it
    # first. Redeclaring with different durability is refused.
    channel.exchange_declare(
        exchange="dlx",
        exchange_type="direct",
        durable=True,
    )

    # Declare the dead letter queue
    channel.queue_declare(queue="orders.dlq", durable=True)
    channel.queue_bind(
        queue="orders.dlq",
        exchange="dlx",
        routing_key="orders",
    )

    # Declare the main queue with DLQ routing
    channel.queue_declare(
        queue="orders",
        durable=True,
        arguments={
            "x-dead-letter-exchange": "dlx",
            "x-dead-letter-routing-key": "orders",
            "x-message-ttl": 300000,  # 5 min TTL (optional)
        },
    )
    return channel
def consume_with_poison_pill_handling(channel):
    """Consume ``orders`` and dead-letter messages that keep failing.

    Permanent failures (``ValueError`` / ``KeyError``) are rejected without
    requeue, which routes them to the dead letter exchange. Any other
    failure is retried a bounded number of times by republishing the
    message with an incremented ``x-retry-count`` header.

    Args:
        channel: An open pika channel on which ``orders`` is declared.
    """
    handler = PoisonPillHandler(max_attempts=3)
    queue_name = "orders"
    retry_header = "x-retry-count"

    def dead_letter(ch, method):
        # requeue=False routes the message to the DLX
        ch.basic_reject(
            delivery_tag=method.delivery_tag,
            requeue=False,
        )

    def callback(ch, method, properties, body):
        headers = dict(properties.headers or {})

        # RabbitMQ tracks dead-lettering events in the x-death header
        x_death = headers.get("x-death") or []
        death_count = sum(d.get("count", 0) for d in x_death)

        if death_count >= handler.max_attempts:
            logger.error(
                "Message exceeded max attempts (%d) — "
                "will route to DLQ via reject",
                death_count,
            )
            dead_letter(ch, method)
            return

        try:
            process_message(json.loads(body))
        except (ValueError, KeyError) as exc:
            # Permanent failure — reject without requeue
            logger.warning("Permanent failure: %s", exc)
            dead_letter(ch, method)
        except Exception as exc:
            # Transient. A plain nack with requeue=True does not touch
            # x-death, so a message that always fails would loop forever.
            # Count attempts in our own header instead.
            attempt = int(headers.get(retry_header, 0)) + 1
            if attempt >= handler.max_attempts:
                logger.error(
                    "Transient failure exhausted retries after %d "
                    "attempts — routing to DLQ: %s",
                    attempt, exc,
                )
                dead_letter(ch, method)
                return

            logger.warning("Transient failure: %s", exc)
            headers[retry_header] = attempt
            properties.headers = headers
            # Publish the retry copy before acking the original. A crash
            # in between duplicates the message rather than losing it.
            ch.basic_publish(
                exchange="",
                routing_key=queue_name,
                body=body,
                properties=properties,
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(
        queue=queue_name,
        on_message_callback=callback,
        auto_ack=False,
    )
    channel.start_consuming()
Kafka Poison Pill Handling
Kafka doesn’t have built-in DLQ support, so you manage it at the consumer level:
from confluent_kafka import Consumer, Producer, KafkaError
import json
def create_kafka_consumer_with_dlq():
    """Consume ``orders`` from Kafka, routing poison pills to a DLQ topic.

    Runs until interrupted. Permanent failures (``ValueError`` /
    ``KeyError``) go to the DLQ immediately. Other failures are retried by
    rewinding the partition until ``max_attempts`` is reached, then sent to
    the DLQ. The consumer is closed when the loop exits.
    """
    from confluent_kafka import TopicPartition

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "order-processor",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    })
    dlq_producer = Producer({
        "bootstrap.servers": "localhost:9092",
    })
    consumer.subscribe(["orders"])
    handler = PoisonPillHandler(max_attempts=3)

    # Track per-message retry counts in memory
    # (for production, use a persistent store)
    retry_counts: dict[str, int] = {}

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                # Surface consumer errors instead of dropping them silently
                logger.warning("Kafka consumer error: %s", msg.error())
                continue

            message_key = f"{msg.topic()}-{msg.partition()}-{msg.offset()}"
            attempt = retry_counts.get(message_key, 0) + 1
            retry_counts[message_key] = attempt

            try:
                data = json.loads(msg.value())
                process_order(data)
            except (ValueError, KeyError) as exc:
                # Permanent — send to DLQ immediately
                send_to_dlq(
                    dlq_producer, msg,
                    error=str(exc),
                    classification="permanent",
                )
            except Exception as exc:
                if attempt < handler.max_attempts:
                    logger.warning(
                        "Attempt %d/%d for %s: %s",
                        attempt, handler.max_attempts, message_key, exc,
                    )
                    # Skipping the commit does not redeliver within a
                    # running session: poll() has already advanced past
                    # this offset. Rewind explicitly so the next poll
                    # returns the same message again.
                    consumer.seek(
                        TopicPartition(
                            msg.topic(), msg.partition(), msg.offset()
                        )
                    )
                    continue
                send_to_dlq(
                    dlq_producer, msg,
                    error=str(exc),
                    classification="exhausted",
                )

            # Reached on success and after DLQ routing: the message is done.
            consumer.commit(message=msg)
            retry_counts.pop(message_key, None)
    finally:
        # Leave the group cleanly so partitions are reassigned promptly
        consumer.close()
def send_to_dlq(producer, original_msg, error, classification):
    """Publish a failed Kafka message to ``<topic>.dlq`` and flush.

    Args:
        producer: Producer exposing ``produce(topic=, value=, key=)`` and
            ``flush()``.
        original_msg: The failed message; its ``topic()``, ``partition()``,
            ``offset()``, ``key()`` and ``value()`` are read.
        error: String form of the failure.
        classification: Label stored with the entry (e.g. ``"permanent"``).
    """
    topic = original_msg.topic()
    raw_key = original_msg.key()

    dlq_message = json.dumps({
        "original_topic": topic,
        "original_partition": original_msg.partition(),
        "original_offset": original_msg.offset(),
        # An empty or missing key is recorded as None
        "original_key": _decode_for_dlq(raw_key) if raw_key else None,
        "original_value": _decode_for_dlq(original_msg.value()),
        "error": error,
        "classification": classification,
        "failed_at": time.time(),
    })

    producer.produce(
        topic=f"{topic}.dlq",
        value=dlq_message.encode(),
        key=raw_key,
    )
    producer.flush()


def _decode_for_dlq(raw):
    """Decode bytes as UTF-8 without ever raising; ``None`` stays ``None``."""
    if raw is None:
        return None
    # A poison pill is often exactly the payload that is not valid UTF-8.
    # Escape undecodable bytes as \xNN rather than crash the DLQ routing.
    return raw.decode("utf-8", errors="backslashreplace")
DLQ Replay Tool
Build tooling to inspect and replay DLQ messages:
import json
import redis
class DLQManager:
    """Inspect and replay messages from the dead letter queue."""

    def __init__(self, redis_client: redis.Redis, dlq_key: str):
        """Bind the manager to a Redis client and the DLQ list key."""
        self.redis = redis_client
        self.dlq_key = dlq_key

    def count(self) -> int:
        """Return the number of entries currently in the DLQ list."""
        return self.redis.llen(self.dlq_key)

    def peek(self, limit: int = 10) -> list[dict]:
        """Return up to ``limit`` decoded entries without removing them."""
        # lrange(0, -1) means "the whole list", so a limit of 0 would
        # otherwise return every entry instead of none.
        if limit <= 0:
            return []
        entries = self.redis.lrange(self.dlq_key, 0, limit - 1)
        return [json.loads(e) for e in entries]

    def replay_one(self, task_func: Callable) -> dict:
        """Pop one entry and resubmit it through ``task_func.delay``.

        Returns:
            ``{"status": "empty"}`` when the list is empty,
            ``{"status": "replayed", "task_id": ...}`` on success, or
            ``{"status": "error", "error": ...}`` when decoding or
            submission fails. On error the entry is pushed back.
        """
        entry_raw = self.redis.rpop(self.dlq_key)
        if not entry_raw:
            return {"status": "empty"}

        try:
            # Decoding is inside the try so that a corrupt entry is put
            # back instead of being popped and lost.
            entry = json.loads(entry_raw)
            task_func.delay(*entry["args"], **entry["kwargs"])
        except Exception as exc:
            # Put it back if replay submission fails
            self.redis.rpush(self.dlq_key, entry_raw)
            return {"status": "error", "error": str(exc)}

        # .get(): a missing task_id must not fail an entry that has
        # already been resubmitted.
        return {"status": "replayed", "task_id": entry.get("task_id")}

    def replay_all(self, task_func: Callable) -> dict:
        """Replay the entries present at call time, stopping on first error.

        Returns:
            ``{"replayed": <count>, "failed": <0 or 1>}``.
        """
        replayed = 0
        failed = 0
        # Bound the loop by the starting depth. Replayed tasks that fail
        # again re-enter the DLQ, and looping "until empty" would then
        # never terminate.
        for _ in range(self.count()):
            status = self.replay_one(task_func)["status"]
            if status == "replayed":
                replayed += 1
            elif status == "error":
                failed += 1
                break
            else:
                # "empty": the list was drained by someone else
                break
        return {"replayed": replayed, "failed": failed}

    def purge(self) -> int:
        """Delete the DLQ list and return how many entries it held."""
        count = self.redis.llen(self.dlq_key)
        self.redis.delete(self.dlq_key)
        return count
Monitoring
from prometheus_client import Counter, Gauge
# Messages routed to the DLQ, labelled by source queue and failure class.
poison_pills_detected = Counter(
    name="poison_pills_total",
    documentation="Messages routed to DLQ",
    labelnames=["queue", "classification"],
)

# Current number of messages in each dead letter queue.
dlq_depth = Gauge(
    name="dlq_depth",
    documentation="Number of messages in the dead letter queue",
    labelnames=["queue"],
)

# Retry attempts, labelled by queue.
message_retries = Counter(
    name="message_retries_total",
    documentation="Total message retry attempts",
    labelnames=["queue"],
)
Critical alerts:
- DLQ depth > 0 and growing → new poison pills arriving, investigate immediately
- DLQ depth > 100 → systemic issue (schema change? bug in consumer?)
- Same message_id retried more than max_attempts times → the retry limit is being bypassed (usually a configuration error)
- Permanent error rate > 1% → producer validation is broken
Testing Poison Pill Handling
import pytest
def test_permanent_error_skips_retry():
    """A ValueError is classified as permanent, so no retry is requested."""
    first_attempt = MessageMetadata(message_id="test-1", attempt=1)
    bad_payload = ValueError("bad data")

    decision = PoisonPillHandler(max_attempts=5).should_retry(
        first_attempt, bad_payload
    )

    assert decision is False
def test_transient_error_allows_retry():
    """A ConnectionError on the first attempt is retried."""
    first_attempt = MessageMetadata(message_id="test-2", attempt=1)
    network_error = ConnectionError("timeout")

    decision = PoisonPillHandler(max_attempts=5).should_retry(
        first_attempt, network_error
    )

    assert decision is True
def test_transient_exhausts_retries():
    """A transient error on the final allowed attempt is not retried."""
    handler = PoisonPillHandler(max_attempts=3)
    # Set the limit on the metadata too. MessageMetadata defaults to
    # max_attempts=5, so attempt=3 alone would not count as exhausted
    # against the per-message limit.
    metadata = MessageMetadata(
        message_id="test-3", attempt=3, max_attempts=3
    )
    should_retry = handler.should_retry(metadata, ConnectionError("timeout"))
    assert should_retry is False
def test_dlq_replay_resubmits_message():
    """Replaying an entry resubmits its args/kwargs and removes it."""

    class FakeRedis:
        """In-memory stand-in for the list commands DLQManager uses."""

        def __init__(self):
            self.lists = {}

        def rpush(self, key, value):
            self.lists.setdefault(key, []).append(value)

        def rpop(self, key):
            items = self.lists.get(key)
            return items.pop() if items else None

        def llen(self, key):
            return len(self.lists.get(key, []))

    class RecordingTask:
        """Records each delay() call instead of enqueueing a task."""

        def __init__(self):
            self.calls = []

        def delay(self, *args, **kwargs):
            self.calls.append((args, kwargs))

    # No real Redis server is needed: the fake keeps the test hermetic.
    r = FakeRedis()
    r.rpush(
        "test:dlq",
        json.dumps({
            "task_id": "t-1",
            "args": [{"order_id": 42}],
            "kwargs": {"priority": "high"},
        }),
    )
    dlq = DLQManager(r, "test:dlq")
    task = RecordingTask()

    result = dlq.replay_one(task)

    assert result == {"status": "replayed", "task_id": "t-1"}
    assert task.calls == [(({"order_id": 42},), {"priority": "high"})]
    assert dlq.count() == 0
    assert dlq.replay_one(task) == {"status": "empty"}
One thing to remember: Every message queue will eventually encounter a poison pill. The difference between a production-ready system and a fragile one is whether that poison pill blocks the queue for hours or gets safely isolated in seconds. Classify errors, set retry limits, route to dead letter queues, and build replay tooling — because you will need it.
See Also
- Python Aggregate Pattern — Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts — Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern — Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern — How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture — Why your Python app should look like an onion — and how that saves you from painful rewrites.