Python Message Deduplication — Deep Dive
Dedup Architecture Overview
A production deduplication system has four components:
- ID Generation — producing globally unique, deterministic identifiers
- Dedup Store — tracking which IDs have been processed
- Atomic Check-and-Process — ensuring the dedup check and processing happen as one logical unit
- Cleanup — expiring old entries to prevent unbounded growth
Getting any one of these wrong leads to either missed duplicates or false rejections.
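To make the four components concrete, here is a minimal single-process sketch. The class name `InMemoryDedup`, the injectable `clock` parameter, and the dict used as the dedup store are all illustrative assumptions; a production system would replace the dict with Redis or a database, as later sections describe.

```python
import hashlib
import json
import time


class InMemoryDedup:
    """Toy pipeline: a dict stands in for the dedup store."""

    def __init__(self, ttl_seconds=3600, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self.seen = {}  # message ID -> expiry timestamp

    @staticmethod
    def make_id(payload):
        # Component 1, ID generation: same payload gives the same ID.
        canonical = json.dumps(payload, sort_keys=True)
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def cleanup(self):
        # Component 4, cleanup: drop entries whose TTL has passed.
        now = self.clock()
        self.seen = {k: exp for k, exp in self.seen.items() if exp > now}

    def process_once(self, payload, handler):
        """Return (processed, result)."""
        self.cleanup()
        msg_id = self.make_id(payload)
        # Components 2 and 3: store lookup plus marking. This is only
        # atomic because the sketch is single-threaded.
        if msg_id in self.seen:
            return False, None
        self.seen[msg_id] = self.clock() + self.ttl
        try:
            return True, handler(payload)
        except Exception:
            del self.seen[msg_id]  # allow a retry after a failure
            raise


fake_now = [0.0]  # controllable clock for the demonstration
dedup = InMemoryDedup(ttl_seconds=10, clock=lambda: fake_now[0])
first = dedup.process_once({"order": 1}, lambda p: "ok")
second = dedup.process_once({"order": 1}, lambda p: "ok")
fake_now[0] = 11.0  # past the TTL, so the entry has expired
third = dedup.process_once({"order": 1}, lambda p: "ok")
print(first, second, third)  # (True, 'ok') (False, None) (True, 'ok')
```

The third call is processed again because its entry expired, which is why the TTL must be at least as long as the longest plausible redelivery delay.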
ID Generation Strategies
UUID per Message
import time
import uuid

def create_message(payload):
    return {
        'id': str(uuid.uuid4()),  # fresh random ID on every call
        'payload': payload,
        'created_at': time.time(),
    }
This is simple, but it has a gap: if the producer crashes after sending but before recording the ID, it generates a new UUID on retry. The duplicate then carries a different ID and slips through.
Deterministic IDs from Content
import hashlib
import json

def deterministic_id(payload, context_keys=None):
    """Generate the same ID for the same logical operation."""
    content = json.dumps(payload, sort_keys=True)
    if context_keys:
        # Include operation-specific fields that make this unique.
        # The leading '|' and the key=value form keep the payload and
        # the context fields from running together ambiguously.
        content += '|' + '|'.join(f"{k}={context_keys[k]}"
                                  for k in sorted(context_keys))
    # 32 hex characters = the first 128 bits of the SHA-256 digest
    return hashlib.sha256(content.encode()).hexdigest()[:32]

# Same input always produces the same ID
msg_id = deterministic_id(
    {'user_id': 42, 'amount': 50.00},
    context_keys={'operation': 'charge', 'order': 'ORD-789'}
)
This is producer-retry-safe: resending the same operation produces the same ID, which the dedup layer catches.
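Content-derived IDs are only as stable as the serialization behind them. The sketch below uses a hypothetical helper, `canonical_id`, to show one thing that is safe (key order) and one thing that is not (numeric type). Storing money as integer cents, as in `amount_cents`, is an assumption of this example rather than something the original code does.

```python
import hashlib
import json


def canonical_id(payload):
    """Hash a canonical JSON encoding of the payload."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Key order does not matter because of sort_keys=True.
a = canonical_id({"user_id": 42, "amount_cents": 5000})
b = canonical_id({"amount_cents": 5000, "user_id": 42})

# Numeric type does matter: 50 serializes as "50", 50.0 as "50.0".
c = canonical_id({"user_id": 42, "amount": 50})
d = canonical_id({"user_id": 42, "amount": 50.0})

print(a == b)  # True
print(c == d)  # False
```

If two producers can represent the same amount as `50` and `50.0`, normalize the value before hashing, or the retry will hash to a different ID.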
Client-Provided Idempotency Keys
Stripe, AWS, and many other APIs use this pattern. The client sends an Idempotency-Key header, and the server uses that key for deduplication: if the client retries with the same key, the server returns the cached response instead of repeating the operation.
import json

import redis
from fastapi import FastAPI, Header, HTTPException

app = FastAPI()
r = redis.Redis()

@app.post("/charges")
async def create_charge(
    amount: float,
    idempotency_key: str = Header(...),
):
    # Check if we've seen this key
    cached = r.get(f"idempotency:{idempotency_key}")
    if cached:
        return json.loads(cached)
    # Process the charge. process_payment is assumed to be defined
    # elsewhere and to return a JSON-serializable dict.
    # Caveat: GET followed by SETEX is not atomic, so two concurrent
    # requests with the same key can both reach this point.
    result = process_payment(amount)
    # Cache the result with 24h TTL
    r.setex(
        f"idempotency:{idempotency_key}",
        86400,
        json.dumps(result)
    )
    return result
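A GET followed by a later SETEX leaves a gap in which two concurrent requests carrying the same key can both run the payment. One common way to close it is to claim the key atomically with `SET ... NX` before processing. The sketch below is an assumption-laden illustration: `charge_once`, the `IN_PROGRESS` marker, and the response shape are hypothetical, and `FakeStore` is an in-memory stand-in that mimics only the small part of the redis-py client used here. With a real client, create it with `decode_responses=True` so that `get` returns `str` rather than `bytes`.

```python
import json

IN_PROGRESS = "__in_progress__"  # hypothetical marker value


class FakeStore:
    """In-memory stand-in for the redis-py calls used below (TTL ignored)."""

    def __init__(self):
        self.data = {}

    def set(self, name, value, nx=False, ex=None):
        if nx and name in self.data:
            return None  # redis-py returns None when NX blocks the write
        self.data[name] = value
        return True

    def get(self, name):
        return self.data.get(name)

    def delete(self, name):
        self.data.pop(name, None)


def charge_once(store, idempotency_key, amount, process_payment, ttl=86400):
    key = f"idempotency:{idempotency_key}"
    # Atomically claim the key; only one caller can win this write.
    if store.set(key, IN_PROGRESS, nx=True, ex=ttl):
        try:
            result = process_payment(amount)
        except Exception:
            store.delete(key)  # release the claim so the client can retry
            raise
        store.set(key, json.dumps(result), ex=ttl)
        return {"status": "processed", "result": result}
    cached = store.get(key)
    if cached is None or cached == IN_PROGRESS:
        # The first request is still running (or the key just expired).
        return {"status": "in_progress"}
    return {"status": "replayed", "result": json.loads(cached)}


store = FakeStore()
payments = []


def fake_payment(amount):
    payments.append(amount)
    return {"charged": amount}


first = charge_once(store, "key-1", 50, fake_payment)
second = charge_once(store, "key-1", 50, fake_payment)
print(first["status"], second["status"], payments)  # processed replayed [50]
```

A caller that receives `in_progress` would typically be answered with a conflict-style response and asked to retry later; the right status code is an API design choice.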
Redis Dedup Implementations
Basic SET NX Pattern
import redis
import json

class RedisDeduplicator:
    def __init__(self, redis_client, prefix='dedup',
                 ttl_seconds=3600):
        self.r = redis_client
        self.prefix = prefix
        self.ttl = ttl_seconds

    def is_duplicate(self, message_id: str) -> bool:
        key = f"{self.prefix}:{message_id}"
        # SET NX returns True if key was set (new message)
        was_set = self.r.set(key, '1', nx=True, ex=self.ttl)
        return not was_set  # True if duplicate

    def process_if_new(self, message_id, handler, payload):
        if self.is_duplicate(message_id):
            return None  # Skip duplicate
        try:
            result = handler(payload)
            return result
        except Exception:
            # If processing fails, remove dedup key
            # so the message can be retried
            self.r.delete(f"{self.prefix}:{message_id}")
            raise
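The key subtlety: if the handler raises after the dedup key is set, you must remove the key. Otherwise the retry is treated as a duplicate and the message is effectively lost until the key's TTL expires. Two windows remain. If the process crashes while handler() is running, the except block never executes, the key stays in place, and the redelivered message is skipped until the TTL expires. If handler() raises after it has already applied some of its side effects, deleting the key lets the retry apply them a second time. Both windows are small and usually acceptable; where they are not, keep the dedup record in the same transaction as the business logic.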
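The effect of releasing the key on failure is easy to see with a handler that fails once and then succeeds. This is a sketch under stated assumptions: `FakeRedis` is an in-memory stand-in for the two redis-py calls involved, the function returns status strings instead of re-raising, and the `release_on_error` flag exists only to show both behaviors side by side.

```python
class FakeRedis:
    """In-memory stand-in for redis-py's set(nx=True) and delete."""

    def __init__(self):
        self.keys = set()

    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.keys:
            return None
        self.keys.add(key)
        return True

    def delete(self, key):
        self.keys.discard(key)


def process_if_new(r, message_id, handler, payload, release_on_error=True):
    key = f"dedup:{message_id}"
    if not r.set(key, "1", nx=True, ex=3600):
        return "skipped"
    try:
        handler(payload)
        return "processed"
    except Exception:
        if release_on_error:
            r.delete(key)  # let the retry through
        return "failed"


def make_flaky_handler():
    """Build a handler that raises on its first call only."""
    state = {"calls": 0}

    def handler(payload):
        state["calls"] += 1
        if state["calls"] == 1:
            raise RuntimeError("transient failure")

    return handler


r1, h1 = FakeRedis(), make_flaky_handler()
with_release = [process_if_new(r1, "m1", h1, {}) for _ in range(2)]

r2, h2 = FakeRedis(), make_flaky_handler()
without_release = [
    process_if_new(r2, "m1", h2, {}, release_on_error=False) for _ in range(2)
]

print(with_release)     # ['failed', 'processed']
print(without_release)  # ['failed', 'skipped']
```

The second list is what a process crash during the handler looks like from the outside: the key is still set, so the redelivery is dropped.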
Lua Script for Atomic Dedup + Metadata
DEDUP_SCRIPT = """
local key = KEYS[1]
local ttl = tonumber(ARGV[1])
local metadata = ARGV[2]
if redis.call('EXISTS', key) == 1 then
    return 0  -- duplicate
end
redis.call('SET', key, metadata, 'EX', ttl)
return 1  -- new message
"""

class AtomicDeduplicator:
    def __init__(self, redis_client, ttl=3600):
        self.r = redis_client
        self.ttl = ttl
        self._script = self.r.register_script(DEDUP_SCRIPT)

    def check_and_mark(self, message_id, metadata=""):
        result = self._script(
            keys=[f"dedup:{message_id}"],
            args=[self.ttl, metadata]
        )
        return bool(result)  # True = new, False = duplicate
Bloom Filter Deduplication
For extremely high-volume streams where Redis memory is a concern, Bloom filters provide probabilistic deduplication with a fixed memory footprint: the filter is sized up front and does not grow as IDs are added.
import math

import mmh3
from bitarray import bitarray

class BloomDeduplicator:
    def __init__(self, expected_items=1_000_000,
                 fp_rate=0.001):
        self.size = self._optimal_size(expected_items,
                                       fp_rate)
        self.hash_count = self._optimal_hashes(
            self.size, expected_items
        )
        self.bits = bitarray(self.size)
        self.bits.setall(0)
        self.count = 0

    def _optimal_size(self, n, p):
        # m = -n * ln(p) / (ln 2)^2 bits
        return int(-n * math.log(p) / (math.log(2) ** 2))

    def _optimal_hashes(self, m, n):
        # k = (m / n) * ln 2, rounded to the nearest integer, at least 1
        return max(1, round((m / n) * math.log(2)))

    def _get_positions(self, item):
        positions = []
        for seed in range(self.hash_count):
            # mmh3.hash can return a negative int; Python's % still
            # yields a non-negative index
            pos = mmh3.hash(item, seed) % self.size
            positions.append(pos)
        return positions

    def is_duplicate(self, message_id: str) -> bool:
        positions = self._get_positions(message_id)
        if all(self.bits[p] for p in positions):
            return True  # probably seen (may be false positive)
        for p in positions:
            self.bits[p] = 1
        self.count += 1
        return False  # definitely new
Tradeoffs:
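- Pro: Fixed memory that is sized up front (about 14 bits per ID at a 0.1% false-positive rate), and each check costs a constant number of hash computations
- Con: False positives (the filter reports a duplicate when the message is new, so that message is dropped), and individual entries can’t be removed or expired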
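The memory figure follows from the standard Bloom filter sizing formulas, m = -n ln(p) / (ln 2)^2 bits and k = (m / n) ln 2 hash functions. The helper name `bloom_parameters` below is hypothetical; the arithmetic is worked for one million IDs at a 0.1% false-positive rate.

```python
import math


def bloom_parameters(expected_items, fp_rate):
    """Return (bits, hash_count) from the standard sizing formulas."""
    bits = math.ceil(-expected_items * math.log(fp_rate) / (math.log(2) ** 2))
    # Round to the nearest integer; truncating here would give 9, not 10.
    hashes = max(1, round((bits / expected_items) * math.log(2)))
    return bits, hashes


bits, hashes = bloom_parameters(1_000_000, 0.001)
megabytes = bits / 8 / 1_000_000
# About 14.4 million bits (roughly 1.8 MB) and 10 hash functions.
print(bits, hashes, round(megabytes, 2))
```

Halving the false-positive rate adds only about 1.44 bits per item, so tightening the rate is cheap compared with storing the IDs themselves.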
For time-windowed dedup, use rotating Bloom filters: maintain two filters (current window and previous window), swap periodically.
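A rotating pair can be sketched as follows. Everything here is illustrative: `SimpleBloom` uses `hashlib` and a `bytearray` so the example has no third-party dependencies, the double-hashing scheme is one common choice rather than the only one, and the `clock` parameter is injected so the rotation can be demonstrated without waiting.

```python
import hashlib
import time


class SimpleBloom:
    def __init__(self, size_bits=1 << 20, hash_count=7):
        self.size = size_bits
        self.hash_count = hash_count
        self.bits = bytearray(size_bits // 8 + 1)

    def _positions(self, item):
        # Double hashing: derive k positions from two halves of one digest.
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.size for i in range(self.hash_count)]

    def add(self, item):
        for p in self._positions(item):
            self.bits[p >> 3] |= 1 << (p & 7)

    def __contains__(self, item):
        return all(
            self.bits[p >> 3] & (1 << (p & 7)) for p in self._positions(item)
        )


class RotatingBloomDedup:
    def __init__(self, window_seconds=3600, clock=time.monotonic, **bloom_kwargs):
        self.window = window_seconds
        self.clock = clock
        self.bloom_kwargs = bloom_kwargs
        self.current = SimpleBloom(**bloom_kwargs)
        self.previous = SimpleBloom(**bloom_kwargs)
        self.window_start = clock()

    def _rotate_if_due(self):
        elapsed = self.clock() - self.window_start
        if elapsed >= 2 * self.window:
            # Idle for two full windows: both filters are stale.
            self.previous = SimpleBloom(**self.bloom_kwargs)
            self.current = SimpleBloom(**self.bloom_kwargs)
            self.window_start = self.clock()
        elif elapsed >= self.window:
            # Swap: current becomes previous, and a fresh filter takes over.
            self.previous = self.current
            self.current = SimpleBloom(**self.bloom_kwargs)
            self.window_start += self.window

    def is_duplicate(self, message_id):
        self._rotate_if_due()
        if message_id in self.current or message_id in self.previous:
            return True
        self.current.add(message_id)
        return False


now = [0.0]  # controllable clock for the demonstration
dedup = RotatingBloomDedup(
    window_seconds=10, clock=lambda: now[0], size_bits=1024, hash_count=3
)
results = [dedup.is_duplicate("a"), dedup.is_duplicate("a")]
now[0] = 10.0  # first rotation: "a" is still in the previous filter
results.append(dedup.is_duplicate("a"))
now[0] = 20.0  # second rotation: "a" has aged out of both filters
results.append(dedup.is_duplicate("a"))
print(results)  # [False, True, True, False]
```

With this scheme an ID is remembered for between one and two windows, so choose the window length from the shortest retention you can tolerate.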
Redis Bloom (Production)
Redis Stack includes a native Bloom filter module:
# Requires Redis Stack with RedisBloom module
import redis

class RedisBloomDedup:
    def __init__(self, redis_client, key='dedup:bloom',
                 capacity=1_000_000, error_rate=0.001):
        self.r = redis_client
        self.key = key
        # Create if not exists
        try:
            self.r.execute_command(
                'BF.RESERVE', key, error_rate, capacity
            )
        except redis.ResponseError:
            # Assumed to mean the filter already exists. Note that this
            # also hides any other error BF.RESERVE might return.
            pass

    def is_duplicate(self, message_id: str) -> bool:
        # BF.ADD returns 1 if the item was newly added, 0 if it may
        # already have existed (subject to the filter's error rate)
        result = self.r.execute_command(
            'BF.ADD', self.key, message_id
        )
        return result == 0
Database-Level Deduplication
For critical paths (payments, account mutations), use database constraints as the dedup mechanism:
from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError

engine = create_engine('postgresql://...')

def process_with_db_dedup(message_id, payload):
    try:
        # engine.begin() commits on normal exit and rolls back if an
        # exception escapes the block
        with engine.begin() as conn:
            # Insert dedup record and process in same tx
            conn.execute(text("""
                INSERT INTO processed_messages
                (message_id, processed_at)
                VALUES (:id, NOW())
            """), {'id': message_id})
            # Business logic within same transaction
            conn.execute(text("""
                UPDATE accounts
                SET balance = balance + :amount
                WHERE id = :account_id
            """), {
                'amount': payload['amount'],
                'account_id': payload['account_id'],
            })
            # Both succeed or both fail (atomic)
    except IntegrityError:
        # Duplicate key: message already processed. This relies on a
        # unique or primary-key constraint on message_id.
        return False
    return True
This is the strongest guarantee: the dedup check and the business operation happen in the same database transaction. Either both commit or neither does.
Transactional Outbox Pattern
The outbox pattern solves the dual-write problem: you need to update your database AND publish a message, but you can’t do both atomically across two systems.
import json
import uuid

# Order and OutboxMessage are assumed to be SQLAlchemy ORM models
# defined elsewhere.
def create_order_with_outbox(session, order_data):
    # Step 1: Write order + outbox entry in same transaction
    order = Order(**order_data)
    session.add(order)
    session.flush()  # assigns order.id if the database generates it
    outbox_entry = OutboxMessage(
        id=str(uuid.uuid4()),
        aggregate_type='Order',
        aggregate_id=order.id,
        event_type='OrderCreated',
        payload=json.dumps(order_data),
    )
    session.add(outbox_entry)
    session.commit()  # Atomic: both or neither
    # Step 2: Separate process reads outbox and publishes
    # (with its own dedup on the consumer side)
A separate poller reads the outbox table and publishes to the message broker. The consumer deduplicates using the outbox entry’s ID.
Performance Comparison
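Approximate dedup throughput and memory for 1 million messages (illustrative figures; real numbers depend on hardware, network latency, and configuration):
| Method | Throughput | Memory | Durability |
|---|---|---|---|
| In-memory set | 2M ops/s | ~50MB/1M IDs | None (lost on process exit) |
| Redis SET NX | 100K ops/s | ~80MB/1M IDs | Survives restarts if Redis persistence is enabled |
| Redis Bloom | 200K ops/s | ~2MB/1M IDs | Survives restarts if Redis persistence is enabled |
| PostgreSQL unique constraint | 15K ops/s | ~40MB/1M rows | Full |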
Choose based on your durability needs and throughput requirements. Layer them: Bloom filter as a fast first pass, Redis for the window, database for critical operations.
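One way to read the layering advice: the Bloom filter answers "definitely new" cheaply, and only "possibly seen" answers pay for a lookup in the exact store. The sketch below assumes the Bloom filter has seen every ID that the exact store holds (for example, it is rebuilt on startup); otherwise a "definitely new" answer cannot be trusted. `TinyBloom` and `LayeredDedup` are hypothetical names, and a Python `set` stands in for Redis or a database table.

```python
import hashlib


class TinyBloom:
    def __init__(self, size_bits=4096, hash_count=3):
        self.size = size_bits
        self.hash_count = hash_count
        self.bits = bytearray(size_bits // 8 + 1)

    def _positions(self, item):
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.size for i in range(self.hash_count)]

    def add(self, item):
        for p in self._positions(item):
            self.bits[p >> 3] |= 1 << (p & 7)

    def __contains__(self, item):
        return all(
            self.bits[p >> 3] & (1 << (p & 7)) for p in self._positions(item)
        )


class LayeredDedup:
    def __init__(self, bloom, exact_store):
        self.bloom = bloom
        self.exact = exact_store  # set-like stand-in for Redis or a DB table
        self.exact_lookups = 0

    def is_duplicate(self, message_id):
        if message_id not in self.bloom:
            # Bloom filters have no false negatives: definitely new.
            self.bloom.add(message_id)
            self.exact.add(message_id)
            return False
        # Possibly seen: confirm against the exact store.
        self.exact_lookups += 1
        if message_id in self.exact:
            return True
        self.exact.add(message_id)  # Bloom false positive, actually new
        return False


layered = LayeredDedup(TinyBloom(), set())
outcomes = [layered.is_duplicate(m) for m in ["a", "b", "a", "c", "b"]]
print(outcomes)  # [False, False, True, False, True]
```

The exact store still receives a write for every new ID; what the Bloom layer saves is the read for messages that are certainly new, which is the common case in most streams.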
One thing to remember: The safest dedup strategy combines a deterministic message ID (same operation always produces the same ID) with a database unique constraint in the same transaction as your business logic. Everything else is an optimization on top of this foundation.