Python Message Deduplication — Deep Dive

Dedup Architecture Overview

A production deduplication system has four components:

  1. ID Generation — producing globally unique, deterministic identifiers
  2. Dedup Store — tracking which IDs have been processed
  3. Atomic Check-and-Process — ensuring the dedup check and processing happen as one logical unit
  4. Cleanup — expiring old entries to prevent unbounded growth

Getting any one of these wrong leads to either missed duplicates or false rejections.

ID Generation Strategies

UUID per Message

import uuid

def create_message(payload):
    return {
        'id': str(uuid.uuid4()),
        'payload': payload,
        'created_at': time.time(),
    }

Simple but has a gap: if the producer crashes after sending but before recording the ID, it generates a new UUID on retry — the duplicate gets a different ID and slips through.

Deterministic IDs from Content

import hashlib
import json

def deterministic_id(payload, context_keys=None):
    """Generate the same ID for the same logical operation."""
    # Include operation-specific fields that make this unique
    content = json.dumps(payload, sort_keys=True)
    if context_keys:
        content += '|'.join(str(context_keys.get(k, ''))
                           for k in sorted(context_keys))
    return hashlib.sha256(content.encode()).hexdigest()[:32]

# Same input always produces the same ID
msg_id = deterministic_id(
    {'user_id': 42, 'amount': 50.00},
    context_keys={'operation': 'charge', 'order': 'ORD-789'}
)

This is producer-retry-safe: resending the same operation produces the same ID, which the dedup layer catches.

Client-Provided Idempotency Keys

Stripe, AWS, and many APIs use this pattern. The client includes an Idempotency-Key header. The server uses it for deduplication. If the client retries with the same key, the server returns the cached response.

from fastapi import FastAPI, Header, HTTPException
import redis

app = FastAPI()
r = redis.Redis()

@app.post("/charges")
async def create_charge(
    amount: float,
    idempotency_key: str = Header(...),
):
    # Check if we've seen this key
    cached = r.get(f"idempotency:{idempotency_key}")
    if cached:
        return json.loads(cached)

    # Process the charge
    result = process_payment(amount)

    # Cache the result with 24h TTL
    r.setex(
        f"idempotency:{idempotency_key}",
        86400,
        json.dumps(result)
    )
    return result

Redis Dedup Implementations

Basic SET NX Pattern

import redis
import json

class RedisDeduplicator:
    def __init__(self, redis_client, prefix='dedup',
                 ttl_seconds=3600):
        self.r = redis_client
        self.prefix = prefix
        self.ttl = ttl_seconds

    def is_duplicate(self, message_id: str) -> bool:
        key = f"{self.prefix}:{message_id}"
        # SET NX returns True if key was set (new message)
        was_set = self.r.set(key, '1', nx=True, ex=self.ttl)
        return not was_set  # True if duplicate

    def process_if_new(self, message_id, handler, payload):
        if self.is_duplicate(message_id):
            return None  # Skip duplicate
        try:
            result = handler(payload)
            return result
        except Exception:
            # If processing fails, remove dedup key
            # so the message can be retried
            self.r.delete(f"{self.prefix}:{message_id}")
            raise

The key subtlety: if processing fails after the dedup key is set, you must remove the key. Otherwise the retry is treated as a duplicate and the message is permanently lost. This creates a small window for actual duplicates if the process crashes between handler() succeeding and the function returning — but that window is usually acceptable.

Lua Script for Atomic Dedup + Metadata

DEDUP_SCRIPT = """
local key = KEYS[1]
local ttl = tonumber(ARGV[1])
local metadata = ARGV[2]

if redis.call('EXISTS', key) == 1 then
    return 0  -- duplicate
end

redis.call('SET', key, metadata, 'EX', ttl)
return 1  -- new message
"""

class AtomicDeduplicator:
    def __init__(self, redis_client, ttl=3600):
        self.r = redis_client
        self.ttl = ttl
        self._script = self.r.register_script(DEDUP_SCRIPT)

    def check_and_mark(self, message_id, metadata=""):
        result = self._script(
            keys=[f"dedup:{message_id}"],
            args=[self.ttl, metadata]
        )
        return bool(result)  # True = new, False = duplicate

Bloom Filter Deduplication

For extremely high-volume streams where Redis memory is a concern, Bloom filters provide probabilistic deduplication with constant memory usage.

import mmh3
from bitarray import bitarray

class BloomDeduplicator:
    def __init__(self, expected_items=1_000_000,
                 fp_rate=0.001):
        self.size = self._optimal_size(expected_items,
                                        fp_rate)
        self.hash_count = self._optimal_hashes(
            self.size, expected_items
        )
        self.bits = bitarray(self.size)
        self.bits.setall(0)
        self.count = 0

    def _optimal_size(self, n, p):
        import math
        return int(-n * math.log(p) / (math.log(2) ** 2))

    def _optimal_hashes(self, m, n):
        import math
        return int((m / n) * math.log(2))

    def _get_positions(self, item):
        positions = []
        for seed in range(self.hash_count):
            pos = mmh3.hash(item, seed) % self.size
            positions.append(pos)
        return positions

    def is_duplicate(self, message_id: str) -> bool:
        positions = self._get_positions(message_id)
        if all(self.bits[p] for p in positions):
            return True  # probably seen (may be false positive)
        for p in positions:
            self.bits[p] = 1
        self.count += 1
        return False  # definitely new

Tradeoffs:

  • Pro: O(1) memory per check, very fast
  • Con: False positives (says duplicate when it isn’t), can’t remove entries, can’t expire individual entries

For time-windowed dedup, use rotating Bloom filters: maintain two filters (current window and previous window), swap periodically.

Redis Bloom (Production)

Redis Stack includes a native Bloom filter module:

# Requires Redis Stack with RedisBloom module
class RedisBoomDedup:
    def __init__(self, redis_client, key='dedup:bloom',
                 capacity=1_000_000, error_rate=0.001):
        self.r = redis_client
        self.key = key
        # Create if not exists
        try:
            self.r.execute_command(
                'BF.RESERVE', key, error_rate, capacity
            )
        except redis.ResponseError:
            pass  # Already exists

    def is_duplicate(self, message_id: str) -> bool:
        # BF.ADD returns 0 if item already existed
        result = self.r.execute_command(
            'BF.ADD', self.key, message_id
        )
        return result == 0

Database-Level Deduplication

For critical paths (payments, account mutations), use database constraints as the dedup mechanism:

from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError

engine = create_engine('postgresql://...')

def process_with_db_dedup(message_id, payload):
    with engine.begin() as conn:
        try:
            # Insert dedup record and process in same tx
            conn.execute(text("""
                INSERT INTO processed_messages
                    (message_id, processed_at)
                VALUES (:id, NOW())
            """), {'id': message_id})

            # Business logic within same transaction
            conn.execute(text("""
                UPDATE accounts
                SET balance = balance + :amount
                WHERE id = :account_id
            """), {
                'amount': payload['amount'],
                'account_id': payload['account_id'],
            })
            # Both succeed or both fail — atomic
        except IntegrityError:
            # Duplicate key — message already processed
            return None

This is the strongest guarantee: the dedup check and the business operation happen in the same database transaction. Either both commit or neither does.

Transactional Outbox Pattern

The outbox pattern solves the dual-write problem: you need to update your database AND publish a message, but you can’t do both atomically across two systems.

def create_order_with_outbox(session, order_data):
    # Step 1: Write order + outbox entry in same transaction
    order = Order(**order_data)
    session.add(order)

    outbox_entry = OutboxMessage(
        id=str(uuid.uuid4()),
        aggregate_type='Order',
        aggregate_id=order.id,
        event_type='OrderCreated',
        payload=json.dumps(order_data),
    )
    session.add(outbox_entry)
    session.commit()  # Atomic: both or neither

    # Step 2: Separate process reads outbox and publishes
    # (with its own dedup on the consumer side)

A separate poller reads the outbox table and publishes to the message broker. The consumer deduplicates using the outbox entry’s ID.

Performance Comparison

Dedup throughput for 1 million messages:

MethodThroughputMemoryDurability
In-memory set2M ops/s~50MB/1M IDsNone
Redis SET NX100K ops/s~80MB/1M IDsRestarts
Redis Bloom200K ops/s~2MB/1M IDsRestarts
PostgreSQL unique constraint15K ops/s~40MB/1M rowsFull

Choose based on your durability needs and throughput requirements. Layer them: Bloom filter as a fast first pass, Redis for the window, database for critical operations.

One thing to remember: The safest dedup strategy combines a deterministic message ID (same operation always produces the same ID) with a database unique constraint in the same transaction as your business logic. Everything else is an optimization on top of this foundation.

pythonmessagingreliability

See Also