Python Idempotency Patterns — Deep Dive

Idempotency Key Middleware in FastAPI

A reusable idempotency layer that intercepts requests, checks for duplicate keys, and returns cached responses:

from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import json
import hashlib

class IdempotencyMiddleware(BaseHTTPMiddleware):
    """Replay cached responses for requests that repeat an ``Idempotency-Key``.

    Only POST, PUT and PATCH requests carrying the header are intercepted.
    The first request for a key runs the downstream handler while holding a
    short-lived lock; a 2xx response is stored and replayed for later
    requests with the same key. Non-2xx responses are never stored, so the
    client can retry them.

    Args:
        app: The ASGI application being wrapped.
        store: Async key-value client (Redis or similar). The code relies on
            ``get(key)``, ``set(key, value, nx=..., ex=...)`` returning a
            falsy value when ``nx=True`` and the key already exists, and
            ``delete(key)``.
        ttl_seconds: Lifetime of a cached response, in seconds.
        lock_ttl_seconds: Lifetime of the in-progress lock, in seconds, so a
            crashed worker cannot block a key forever.
    """

    def __init__(self, app, store, ttl_seconds=86400, lock_ttl_seconds=30):
        super().__init__(app)
        self.store = store  # Redis or similar
        self.ttl = ttl_seconds
        self.lock_ttl = lock_ttl_seconds

    async def _replay(self, cache_key):
        """Return the stored response for ``cache_key``, or None if absent."""
        cached = await self.store.get(cache_key)
        if not cached:
            return None
        data = json.loads(cached)
        return Response(
            content=data["body"],
            status_code=data["status"],
            headers={"X-Idempotent-Replay": "true"},
            media_type="application/json",
        )

    async def dispatch(self, request: Request, call_next):
        """Serve a replay, reject a concurrent duplicate, or run the handler."""
        if request.method not in ("POST", "PUT", "PATCH"):
            return await call_next(request)

        idem_key = request.headers.get("Idempotency-Key")
        if not idem_key:
            return await call_next(request)

        cache_key = f"idempotency:{idem_key}"
        # Separate prefix: with "idempotency:lock:<key>" a client key of
        # "lock:<x>" would make its cache key equal the lock key of "<x>".
        lock_key = f"idempotency-lock:{idem_key}"

        # Fast path: the result is already stored.
        replay = await self._replay(cache_key)
        if replay is not None:
            return replay

        # Check for in-progress request (lock)
        acquired = await self.store.set(
            lock_key, "1", nx=True, ex=self.lock_ttl
        )
        if not acquired:
            return Response(
                content='{"error": "Request already in progress"}',
                status_code=409,
                media_type="application/json",
            )

        try:
            # Re-check under the lock: another request may have stored its
            # result and released the lock between our first lookup and our
            # acquisition. Without this the handler would run a second time.
            replay = await self._replay(cache_key)
            if replay is not None:
                return replay

            response = await call_next(request)

            # Only successful responses are cached; failures stay retryable.
            if not 200 <= response.status_code < 300:
                return response

            # The body is a stream, so drain it once and rebuild the response.
            body = b"".join([chunk async for chunk in response.body_iterator])

            await self.store.set(
                cache_key,
                json.dumps({"body": body.decode(), "status": response.status_code}),
                ex=self.ttl,
            )
            return Response(
                content=body,
                status_code=response.status_code,
                headers=dict(response.headers),
                media_type=response.media_type,
            )
        finally:
            await self.store.delete(lock_key)

Key design decisions:

  • Only cache successful responses — A failed request should be retryable
  • Use a lock for in-progress requests — Prevents concurrent processing of the same key
  • TTL on cached results — Keys don’t live forever; 24 hours is typical
  • 409 for concurrent duplicates — Tells the client to wait and retry

Database-Level Idempotency

Upsert Pattern with SQLAlchemy

from sqlalchemy.dialects.postgresql import insert

async def create_order(db: AsyncSession, order_data: dict, idempotency_key: str):
    """Insert an order at most once per idempotency key.

    Issues an INSERT that does nothing when ``idempotency_key`` conflicts,
    and falls back to selecting the existing row when no row comes back.

    Returns:
        A ``(order, is_new)`` tuple; ``is_new`` is False when the order was
        already present for this key.
    """
    new_row = {
        "idempotency_key": idempotency_key,
        "user_id": order_data["user_id"],
        "amount": order_data["amount"],
        "status": "pending",
    }
    insert_stmt = (
        insert(Order)
        .values(**new_row)
        .on_conflict_do_nothing(index_elements=["idempotency_key"])
        .returning(Order)
    )

    inserted = (await db.execute(insert_stmt)).scalar_one_or_none()
    if inserted is not None:
        return inserted, True

    # No row returned means the key already existed: load that order instead.
    lookup = select(Order).where(Order.idempotency_key == idempotency_key)
    return (await db.execute(lookup)).scalar_one(), False  # (order, is_new)

The database ensures atomicity. Even if two requests with the same key arrive simultaneously, only one insert succeeds.

Django with get_or_create

def process_webhook(event_id: str, payload: dict):
    """Handle a webhook event once, keyed by ``event_id``.

    The first delivery creates a ``pending`` record, runs ``handle_event``
    and stores ``completed`` or ``failed``. Any later delivery of the same
    ``event_id`` returns the stored record untouched, whatever its status.

    Raises:
        Exception: Whatever ``handle_event`` (or saving the result) raises,
            after the record has been marked ``failed``.
    """
    event, created = WebhookEvent.objects.get_or_create(
        event_id=event_id,
        defaults={"payload": payload, "status": "pending"},
    )

    def _record(outcome):
        # Persist the outcome of this delivery on the event row.
        event.status = outcome
        event.save()

    if created:
        try:
            handle_event(event)
            _record("completed")
        except Exception:
            _record("failed")
            raise

    # Not created: already processed or in progress, so hand back as-is.
    return event

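get_or_create performs a lookup followed by an insert, so on its own it is not atomic. It is only safe under concurrency when the event_id column has a unique index: the losing insert then fails with an IntegrityError, and Django falls back to fetching the row that the winning request created.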

State Machine for Complex Operations

For multi-step operations where partial completion is possible:

from enum import Enum

class PaymentStatus(str, Enum):
    """Persisted states of a payment as it moves through PaymentProcessor.

    The ``str`` mixin makes each member compare equal to its string value.
    """

    # Presumably the initial state of a new record; confirm against
    # _get_or_create, which is not shown here.
    CREATED = "created"
    # Committed just before the external provider is called.
    CHARGING = "charging"
    # Set once the provider call has returned a reference.
    CHARGED = "charged"
    # Set when the provider call raises.
    FAILED = "failed"

class PaymentProcessor:
    """Drive a payment through persisted states so retries do not re-charge.

    Expects ``self.provider`` (with async ``charge`` and ``get_charge``) and
    ``self._get_or_create`` to be supplied elsewhere; neither is defined in
    this class.
    """

    async def process(self, db: "AsyncSession", payment_id: str, amount: int):
        """Charge ``amount`` for ``payment_id`` unless it was already charged.

        Args:
            db: Session used to persist every state transition.
            payment_id: Identifier used to create or fetch the payment record.
            amount: Amount passed to the payment provider.

        Returns:
            The payment record in its latest known state.

        Raises:
            Exception: Whatever the provider's ``charge`` raises, after the
                payment has been marked FAILED.
        """
        # Step 1: Create or fetch payment record
        payment = await self._get_or_create(db, payment_id, amount)

        # Step 2: Check current state
        if payment.status == PaymentStatus.CHARGED:
            return payment  # Already done

        if payment.status == PaymentStatus.CHARGING:
            # A previous attempt stopped mid-flight; ask the provider
            # instead of charging again.
            return await self._reconcile(payment, db)

        # Step 3: Transition to charging (persisted before the external call)
        payment.status = PaymentStatus.CHARGING
        await db.commit()

        # Step 4: Call external payment provider. Only the provider call is
        # guarded: if the later CHARGED commit fails, the payment must not be
        # marked FAILED, because the money has already been taken and a retry
        # would charge twice. It stays CHARGING and is reconciled instead.
        try:
            provider_ref = await self.provider.charge(amount)
        except Exception:
            payment.status = PaymentStatus.FAILED
            await db.commit()
            raise

        payment.provider_reference = provider_ref
        payment.status = PaymentStatus.CHARGED
        await db.commit()

        return payment

    async def _reconcile(self, payment, db=None):
        """Check provider for in-flight charges.

        Args:
            payment: Payment record currently in the CHARGING state.
            db: Session used to commit the CHARGED transition. When omitted,
                ``self.db`` is used for backward compatibility.

        Returns:
            The payment, marked CHARGED if the provider reports success.
        """
        # NOTE(review): process() only stores provider_reference together
        # with the CHARGED status, so a payment found in CHARGING has no
        # reference recorded by this class. Confirm how the provider should
        # be queried in that case (e.g. by an idempotency key).
        result = await self.provider.get_charge(payment.provider_reference)
        if result and result.status == "succeeded":
            session = db if db is not None else self.db
            payment.status = PaymentStatus.CHARGED
            await session.commit()
        return payment

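Each state transition is persisted before the external call. If the process crashes after the CHARGING commit, whether before or after the provider call completes, a retry finds the payment in the CHARGING state and the reconciliation step asks the provider whether the charge went through instead of charging again.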

Distributed Locking for Idempotency

When multiple application instances process the same request:

import redis.asyncio as redis

class RedisLock:
    """Token-based lock stored under a single Redis key with an expiry.

    Args:
        client: Async Redis client providing ``set`` and ``eval``.
        key: Name of the resource to lock; stored as ``lock:<key>``.
        ttl: Expiry of the lock key in seconds.
    """

    # Atomic compare-and-delete: remove the key only if it holds our token.
    _RELEASE_SCRIPT = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        end
        return 0
        """

    def __init__(self, client: "redis.Redis", key: str, ttl: int = 30):
        self.client = client
        self.key = f"lock:{key}"
        self.ttl = ttl
        self.token = None  # set only while this instance holds the lock

    async def acquire(self) -> bool:
        """Try to take the lock once; return True if it was acquired."""
        import uuid

        candidate = str(uuid.uuid4())
        # SET with nx=True yields a falsy value when the key already exists;
        # coerce so the result matches the declared bool return type.
        acquired = bool(
            await self.client.set(self.key, candidate, nx=True, ex=self.ttl)
        )
        if acquired:
            # Keep the token only on success, so a failed attempt cannot
            # overwrite the token of a lock this instance already holds.
            self.token = candidate
        return acquired

    async def release(self) -> bool:
        """Release the lock if this instance owns it.

        Returns:
            True if the key was deleted, False if this instance held no
            token or the key no longer carried it.
        """
        if self.token is None:
            return False
        # Only release if we own the lock (compare token)
        deleted = await self.client.eval(
            self._RELEASE_SCRIPT, 1, self.key, self.token
        )
        self.token = None
        return bool(deleted)

The Lua script ensures atomic check-and-delete, preventing one process from releasing another’s lock.

Handling Idempotency in Message Queues

Queue consumers receive messages “at least once.” Deduplication strategies:

class IdempotentConsumer:
    """Queue consumer that skips messages it has already processed.

    Args:
        db: Session used for the ProcessedMessage lookup and insert.
        redis_client: Async Redis client used as a fast duplicate cache.
        dedup_ttl: Lifetime in seconds of the Redis "processed" marker.
    """

    def __init__(self, db: "AsyncSession", redis_client, dedup_ttl: int = 86400):
        self.db = db
        self.redis = redis_client
        self.dedup_ttl = dedup_ttl

    async def process_message(self, message):
        """Handle ``message`` unless its ``message-id`` was seen before.

        Raises:
            ValueError: If the message has no ``message-id`` header. Without
                an id every such message would share one marker key, and all
                but the first would be silently dropped as duplicates.
        """
        message_id = message.headers.get("message-id")
        if not message_id:
            raise ValueError(
                "message is missing a 'message-id' header; cannot deduplicate"
            )

        marker_key = f"processed:{message_id}"

        # Fast check with Redis
        if await self.redis.exists(marker_key):
            return  # Already handled

        # Database check (authoritative)
        exists = await self.db.scalar(
            select(ProcessedMessage.id)
            .where(ProcessedMessage.message_id == message_id)
        )
        if exists:
            # Refill the cache so the next duplicate stops at Redis.
            await self.redis.set(marker_key, "1", ex=self.dedup_ttl)
            return

        # Process the message
        # NOTE(review): nothing visible here stops two consumers from
        # handling the same new message at once; confirm that
        # ProcessedMessage.message_id is unique and that _handle tolerates it.
        await self._handle(message)

        # Record as processed
        self.db.add(ProcessedMessage(message_id=message_id))
        await self.db.commit()
        await self.redis.set(marker_key, "1", ex=self.dedup_ttl)

Two-tier deduplication: Redis for fast lookups (most duplicates caught here), database for durability.

Fingerprint-Based Deduplication

When clients don’t send idempotency keys, generate them from request content:

def generate_fingerprint(user_id: int, endpoint: str, body: bytes) -> str:
    """Return a SHA-256 hex digest identifying a user's request.

    The digest covers ``"<user_id>:<endpoint>:"`` followed by the raw body
    bytes. Hashing the bytes directly, rather than decoding and re-encoding
    them, gives the same digest for UTF-8 bodies and no longer raises
    ``UnicodeDecodeError`` for bodies that are not valid UTF-8.
    """
    prefix = f"{user_id}:{endpoint}:".encode()
    return hashlib.sha256(prefix + body).hexdigest()

@app.post("/orders")
async def create_order(request: Request, order: OrderCreate, user=Depends(get_user)):
    """Create an order, answering rapid identical re-submissions from cache.

    The request is fingerprinted from the user id, the endpoint and the raw
    body. A result stored under that fingerprint is returned with an
    ``X-Deduplicated`` header; otherwise the order is processed and its
    result stored for 60 seconds.
    """
    raw_body = await request.body()
    dedup_key = "fingerprint:" + generate_fingerprint(user.id, "/orders", raw_body)

    previous = await redis.get(dedup_key)
    if not previous:
        # First sighting of this fingerprint: do the work, then remember the
        # outcome with a short TTL (e.g., 60 seconds).
        outcome = await process_order(order, user)
        await redis.set(dedup_key, json.dumps(outcome), ex=60)
        return outcome

    return JSONResponse(json.loads(previous), headers={"X-Deduplicated": "true"})

Short TTL is important: you want to catch rapid double-submits, not prevent legitimate repeated orders days apart.

Stripe’s Approach (Industry Reference)

Stripe’s idempotency system:

  1. Client sends Idempotency-Key header with every POST request
  2. Stripe stores the key, request parameters, and response for 24 hours
  3. Replay requests must have identical parameters (different body with same key returns an error)
  4. Responses include Idempotent-Replayed: true header on replays

The parameter check is crucial: it prevents accidentally reusing an idempotency key from one operation for a different operation.

Testing Idempotent Endpoints

async def test_idempotent_payment():
    """Posting twice with one Idempotency-Key creates exactly one payment."""
    idem_headers = {"Idempotency-Key": str(uuid.uuid4())}

    async def submit():
        # Same body and same key on every call.
        return await client.post(
            "/payments", json={"amount": 1000}, headers=idem_headers
        )

    # The first submission creates the payment.
    first = await submit()
    assert first.status_code == 201

    # The second one is served from the stored result.
    replay = await submit()
    assert replay.status_code == 201
    assert replay.json() == first.json()
    assert replay.headers.get("X-Idempotent-Replay") == "true"

    # Exactly one payment was recorded.
    listing = await client.get("/payments")
    assert len(listing.json()["data"]) == 1

The one thing to remember: Production idempotency combines client-provided keys stored in Redis with database constraints as the safety net — always lock during processing, only cache successful responses, and use state machines for multi-step operations.

Tags: python, web, apis

See Also

  • Python Aiohttp Client: Understand Aiohttp Client through a practical analogy so your Python decisions become faster and clearer.
  • Python Api Client Design: Why building your own API client in Python is like creating a TV remote that only has the buttons you actually need.
  • Python Api Documentation Swagger: Swagger turns your Python API into an interactive playground where anyone can click buttons to try it out — no coding required.
  • Python Api Mocking Responses: Why testing with fake API responses is like rehearsing a play with stand-ins before the real actors show up.
  • Python Api Pagination Clients: Why APIs send data in pages, and how Python handles it — like reading a book one chapter at a time instead of swallowing the whole thing.