Python Idempotency Patterns — Deep Dive
Idempotency Key Middleware in FastAPI
A reusable idempotency layer that intercepts requests, checks for duplicate keys, and returns cached responses:
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import json
import hashlib
class IdempotencyMiddleware(BaseHTTPMiddleware):
    """Replay cached responses for repeated requests that carry the same Idempotency-Key.

    Only POST/PUT/PATCH requests with an ``Idempotency-Key`` header are handled;
    everything else passes straight through. Successful (2xx) responses are cached
    in ``store`` for ``ttl_seconds`` and replayed with ``X-Idempotent-Replay: true``.
    While a request is being processed, a lock key makes concurrent requests with
    the same key receive HTTP 409.

    ``store`` must expose an async Redis-like API: ``get(key)``,
    ``set(key, value, nx=..., ex=...)`` (truthy when NX succeeds) and ``delete(key)``.
    """

    def __init__(self, app, store, ttl_seconds=86400, lock_ttl_seconds=30):
        super().__init__(app)
        self.store = store  # Redis or similar
        self.ttl = ttl_seconds
        # Should exceed the slowest handler: if the lock expires mid-request, a
        # concurrent duplicate can acquire it and run the handler a second time.
        self.lock_ttl = lock_ttl_seconds

    async def dispatch(self, request: Request, call_next):
        if request.method not in ("POST", "PUT", "PATCH"):
            return await call_next(request)
        idem_key = request.headers.get("Idempotency-Key")
        if not idem_key:
            return await call_next(request)

        cache_key = f"idempotency:{idem_key}"
        lock_key = f"idempotency:lock:{idem_key}"

        # Fast path: a completed result already exists.
        replay = await self._load_replay(cache_key)
        if replay is not None:
            return replay

        # Only one request per key may run at a time.
        acquired = await self.store.set(lock_key, "1", nx=True, ex=self.lock_ttl)
        if not acquired:
            return Response(
                content='{"error": "Request already in progress"}',
                status_code=409,
                media_type="application/json",
            )

        try:
            # Re-check under the lock: another request may have finished and
            # cached its result between our first lookup and acquiring the lock.
            replay = await self._load_replay(cache_key)
            if replay is not None:
                return replay

            response = await call_next(request)
            # Failed requests are not cached so the client can retry them.
            if not 200 <= response.status_code < 300:
                return response

            body = b"".join([chunk async for chunk in response.body_iterator])
            headers = dict(response.headers)
            await self.store.set(
                cache_key,
                json.dumps(
                    {
                        # latin-1 maps every byte 1:1 to a code point, so any body
                        # (including non-UTF-8 payloads) survives the JSON round trip.
                        "body": body.decode("latin-1"),
                        "encoding": "latin-1",
                        "status": response.status_code,
                        "headers": headers,
                        "media_type": response.media_type,
                    }
                ),
                ex=self.ttl,
            )
            return Response(
                content=body,
                status_code=response.status_code,
                headers=headers,
                media_type=response.media_type,
            )
        finally:
            await self.store.delete(lock_key)

    async def _load_replay(self, cache_key):
        """Rebuild the cached Response stored at ``cache_key``, or return None on a miss."""
        cached = await self.store.get(cache_key)
        if not cached:
            return None
        data = json.loads(cached)
        # Entries written before headers/encoding were stored fall back to the
        # old behaviour: UTF-8 body, JSON media type, no original headers.
        headers = dict(data.get("headers", {}))
        headers["X-Idempotent-Replay"] = "true"
        return Response(
            content=data["body"].encode(data.get("encoding", "utf-8")),
            status_code=data["status"],
            headers=headers,
            media_type=data.get("media_type", "application/json"),
        )
Key design decisions:
- Only cache successful responses — A failed request should be retryable
- Use a lock for in-progress requests — Prevents concurrent processing of the same key
- TTL on cached results — Keys don’t live forever; 24 hours is typical
- 409 for concurrent duplicates — Tells the client to wait and retry
Database-Level Idempotency
Upsert Pattern with SQLAlchemy
from sqlalchemy.dialects.postgresql import insert
async def create_order(db: AsyncSession, order_data: dict, idempotency_key: str):
    """Insert an order keyed by ``idempotency_key``, or fetch the one that already exists.

    Returns ``(order, is_new)``: ``is_new`` is True when this call inserted the row,
    False when a row with the same key was already present.

    Requires a unique constraint/index on ``Order.idempotency_key``; PostgreSQL's
    ON CONFLICT needs it as the conflict target. This function does not commit;
    committing is left to the caller.
    """
    from sqlalchemy import select  # needed for the duplicate-key lookup below

    stmt = (
        insert(Order)
        .values(
            idempotency_key=idempotency_key,
            user_id=order_data["user_id"],
            amount=order_data["amount"],
            status="pending",
        )
        .on_conflict_do_nothing(index_elements=["idempotency_key"])
        .returning(Order)
    )
    result = await db.execute(stmt)
    order = result.scalar_one_or_none()
    if order is not None:
        return order, True

    # ON CONFLICT DO NOTHING returns no row for a duplicate key, so fetch the
    # order created by the earlier request.
    existing = await db.execute(
        select(Order).where(Order.idempotency_key == idempotency_key)
    )
    return existing.scalar_one(), False  # (order, is_new)
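The database ensures atomicity. This requires a unique constraint or index on `idempotency_key`, which ON CONFLICT uses as its conflict target. Even if two requests with the same key arrive simultaneously, only one insert succeeds. The other inserts nothing and falls through to fetching the existing row.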
Django with get_or_create
def process_webhook(event_id: str, payload: dict):
    """Handle a webhook event once per ``event_id``, allowing failed attempts to be retried.

    Returns the ``WebhookEvent`` row. If ``handle_event`` raises, the event is
    marked "failed" and the exception is re-raised, so a later redelivery of the
    same event can process it again.
    """
    event, created = WebhookEvent.objects.get_or_create(
        event_id=event_id,
        defaults={
            "payload": payload,
            "status": "pending",
        },
    )
    if not created:
        # "completed" or "pending" (in progress elsewhere): nothing to do.
        # A "failed" event is re-claimed with a conditional UPDATE. The database
        # applies it atomically, so only one of several concurrent retries
        # gets a row count of 1 and goes on to reprocess.
        # NOTE(review): a worker that crashes inside handle_event leaves the row
        # "pending" forever; add a timeout/sweeper if that matters.
        reclaimed = WebhookEvent.objects.filter(pk=event.pk, status="failed").update(
            status="pending"
        )
        if not reclaimed:
            return event
        event.status = "pending"

    try:
        handle_event(event)
        event.status = "completed"
        event.save()
    except Exception:
        event.status = "failed"
        event.save()
        raise
    return event
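get_or_create is not atomic on its own: it runs a SELECT and, if nothing is found, an INSERT. When two workers race, the loser's INSERT raises IntegrityError, and Django catches it and looks up the winner's row instead. That recovery depends on the database rejecting the duplicate, so the event_id column must have a unique index for this to be safe under concurrency.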
State Machine for Complex Operations
For multi-step operations where partial completion is possible:
from enum import Enum
class PaymentStatus(str, Enum):
    """States of a payment as driven by PaymentProcessor.

    Because the enum also subclasses ``str``, members compare equal to their
    plain string values (e.g. ``PaymentStatus.CHARGED == "charged"``).
    """

    CREATED = "created"  # initial state; presumably set by _get_or_create — confirm
    CHARGING = "charging"  # committed just before calling the provider; outcome not yet recorded
    CHARGED = "charged"  # provider charge recorded; process() returns early on this state
    FAILED = "failed"  # set when the provider charge call raised
class PaymentProcessor:
    """Move a payment through CREATED -> CHARGING -> CHARGED / FAILED.

    Each state transition is committed before the next external call, so after
    a crash the stored status shows how far the payment got.

    NOTE(review): ``self.provider`` and ``_get_or_create`` are not defined in this
    snippet; they are expected to be supplied elsewhere (subclass or setup code).
    """

    async def process(self, db: AsyncSession, payment_id: str, amount: int):
        # Step 1: Create or fetch payment record
        payment = await self._get_or_create(db, payment_id, amount)

        # Step 2: Check current state
        if payment.status == PaymentStatus.CHARGED:
            return payment  # Already done
        if payment.status == PaymentStatus.CHARGING:
            # An earlier attempt committed CHARGING but never recorded the outcome.
            return await self._reconcile(payment, db)

        # Step 3: Transition to charging (persisted before the external call).
        # NOTE(review): two concurrent calls can both read CREATED/FAILED and both
        # reach this point; a row lock or conditional UPDATE is needed to prevent
        # a double charge under concurrency.
        payment.status = PaymentStatus.CHARGING
        await db.commit()

        # Step 4: Call external payment provider. Only the provider call sits in
        # the try block: if a commit after a *successful* charge fails, the row
        # must stay CHARGING (to be reconciled), not be marked FAILED and charged
        # again on retry.
        try:
            provider_ref = await self.provider.charge(amount)
        except Exception:
            # NOTE(review): a timeout here may hide a charge that actually went
            # through; passing payment_id as the provider's idempotency key makes
            # retrying a FAILED payment safe.
            payment.status = PaymentStatus.FAILED
            await db.commit()
            raise

        payment.provider_reference = provider_ref
        payment.status = PaymentStatus.CHARGED
        await db.commit()
        return payment

    async def _reconcile(self, payment, db=None):
        """Check the provider for a charge left in flight; mark it CHARGED if it succeeded.

        ``db`` is the session used to commit the update. It falls back to
        ``self.db`` for callers that set that attribute.
        """
        if db is None:
            db = self.db
        if payment.provider_reference is None:
            # The reference is only stored after charge() returns, so there is
            # nothing to look up; leave the payment in CHARGING for manual or
            # provider-side (idempotency-key based) reconciliation.
            return payment
        result = await self.provider.get_charge(payment.provider_reference)
        if result and result.status == "succeeded":
            payment.status = PaymentStatus.CHARGED
            await db.commit()
        return payment
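Each state transition is persisted before external calls. If the process crashes after the CHARGING commit, the next call sees CHARGING and goes to the reconciliation step instead of charging again. Reconciliation can only look up the charge if a provider reference was saved, however, and provider_reference is written only after charge() returns. To close that crash window, pass a stable idempotency key (such as payment_id) to the payment provider. A retried or reconciled charge then can never bill the customer twice.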
Distributed Locking for Idempotency
When multiple application instances process the same request:
import redis.asyncio as redis
class RedisLock:
    """Distributed mutex stored in a single Redis key (``lock:<key>``).

    ``acquire`` writes a random token with SET NX EX; ``release`` deletes the key
    only while it still holds that token. An owner whose lock already expired
    therefore cannot delete a lock that another process now holds.
    """

    # Atomic compare-and-delete: runs entirely inside Redis, so no other client
    # can take the lock between the GET and the DEL.
    _RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
end
return 0
"""

    def __init__(self, client: redis.Redis, key: str, ttl: int = 30):
        self.client = client
        self.key = f"lock:{key}"
        self.ttl = ttl  # seconds; should exceed the duration of the protected work
        self.token = None  # set only while this instance holds the lock

    async def acquire(self) -> bool:
        """Try once to take the lock; return True if this instance now holds it."""
        import uuid

        token = str(uuid.uuid4())
        # redis-py returns True on success but None (not False) when NX fails.
        acquired = bool(await self.client.set(self.key, token, nx=True, ex=self.ttl))
        self.token = token if acquired else None
        return acquired

    async def release(self) -> bool:
        """Release the lock if this instance still owns it; return True if a key was deleted."""
        if self.token is None:
            # Never acquired (or already released): nothing of ours to delete.
            return False
        deleted = await self.client.eval(self._RELEASE_SCRIPT, 1, self.key, self.token)
        self.token = None
        return bool(deleted)
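The Lua script ensures atomic check-and-delete, preventing one process from releasing another’s lock. Choose a TTL longer than the work the lock protects. If the TTL expires mid-operation, a second process can acquire the lock while the first is still running.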
Handling Idempotency in Message Queues
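Most message queues deliver “at least once,” so a consumer may receive the same message more than once (for example, after a crash before acknowledging it). Deduplication strategies: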
class IdempotentConsumer:
    """Handle each queue message once, even though the queue may redeliver it.

    Redis keys ``processed:<message-id>`` act as a fast, expiring cache of handled
    IDs. The ``ProcessedMessage`` table is the authoritative record.

    NOTE(review): the table only protects against concurrent consumers if
    ``ProcessedMessage.message_id`` has a unique constraint — confirm in the schema.
    """

    def __init__(self, db: AsyncSession, redis_client, processed_ttl: int = 86400):
        self.db = db
        self.redis = redis_client
        self.processed_ttl = processed_ttl  # seconds the Redis fast-path marker lives

    async def process_message(self, message):
        """Process ``message`` unless its message-id was already handled.

        Raises ValueError if the message has no ``message-id`` header.
        """
        message_id = message.headers.get("message-id")
        if not message_id:
            # Without an ID every such message would map to "processed:None",
            # and all but the first would be silently dropped as duplicates.
            raise ValueError("message has no 'message-id' header; cannot deduplicate")
        redis_key = f"processed:{message_id}"

        # Fast check with Redis
        if await self.redis.exists(redis_key):
            return  # Already handled

        # Database check (authoritative)
        existing_id = await self.db.scalar(
            select(ProcessedMessage.id).where(ProcessedMessage.message_id == message_id)
        )
        if existing_id is not None:
            await self.redis.set(redis_key, "1", ex=self.processed_ttl)
            return

        try:
            await self._handle(message)
            # Record as processed in the same commit as any writes _handle made
            # through self.db (presumably the case — confirm).
            self.db.add(ProcessedMessage(message_id=message_id))
            await self.db.commit()
        except Exception:
            # The session is shared across messages: discard pending state so a
            # failure here cannot leak into the next message's commit.
            await self.db.rollback()
            raise

        await self.redis.set(redis_key, "1", ex=self.processed_ttl)
Two-tier deduplication: Redis for fast lookups (most duplicates caught here), database for durability.
Fingerprint-Based Deduplication
When clients don’t send idempotency keys, generate them from request content:
def generate_fingerprint(user_id: int, endpoint: str, body: bytes) -> str:
    """Return a SHA-256 hex digest identifying the triple (user_id, endpoint, body).

    The raw body bytes are hashed directly instead of being decoded to text first,
    so non-UTF-8 payloads no longer raise UnicodeDecodeError. For UTF-8 bodies the
    digest is identical to hashing ``f"{user_id}:{endpoint}:{body.decode()}"``.

    NOTE: fields are joined with ':' without escaping, so an endpoint containing
    ':' could collide with a different endpoint/body split. Callers pass fixed
    endpoints such as "/orders", and the format is kept to leave digests unchanged.
    """
    hasher = hashlib.sha256(f"{user_id}:{endpoint}:".encode())
    hasher.update(body)
    return hasher.hexdigest()
@app.post("/orders")
async def create_order(request: Request, order: OrderCreate, user=Depends(get_user)):
    """Create an order, collapsing rapid double-submits of the same request body.

    The fingerprint (user + endpoint + raw body) serves as an implicit idempotency
    key for 60 seconds:
    - a duplicate arriving after the first request finished gets the cached
      result with ``X-Deduplicated: true``;
    - a duplicate arriving while the first is still running gets HTTP 409.

    NOTE(review): ``redis`` here must be an async client instance, not the
    ``redis.asyncio`` module — confirm how it is bound in the real app.
    """
    body = await request.body()
    fingerprint = generate_fingerprint(user.id, "/orders", body)
    cache_key = f"fingerprint:{fingerprint}"
    lock_key = f"fingerprint:lock:{fingerprint}"
    # Short on purpose: catch double-clicks, not legitimate repeat orders.
    ttl = 60

    cached = await redis.get(cache_key)
    if cached:
        return JSONResponse(json.loads(cached), headers={"X-Deduplicated": "true"})

    # Claim the fingerprint atomically. A plain get-then-set lets two
    # simultaneous submits both miss the cache and both create an order,
    # which is exactly the double-submit this endpoint is meant to stop.
    if not await redis.set(lock_key, "1", nx=True, ex=ttl):
        return JSONResponse(
            {"error": "Duplicate request already in progress"}, status_code=409
        )
    try:
        # Re-check under the claim: a twin may have finished in the meantime.
        cached = await redis.get(cache_key)
        if cached:
            return JSONResponse(json.loads(cached), headers={"X-Deduplicated": "true"})
        result = await process_order(order, user)
        await redis.set(cache_key, json.dumps(result), ex=ttl)
    finally:
        await redis.delete(lock_key)
    return result
Short TTL is important: you want to catch rapid double-submits, not prevent legitimate repeated orders days apart.
Stripe’s Approach (Industry Reference)
Stripe’s idempotency system:
- The client sends an `Idempotency-Key` header with every POST request
- Stripe stores the key, the request parameters, and the response for 24 hours
- Replay requests must have identical parameters (reusing a key with a different body returns an error)
- Responses include an `Idempotent-Replayed: true` header on replays
The parameter check is crucial: it prevents accidentally reusing an idempotency key from one operation for a different operation.
Testing Idempotent Endpoints
async def test_idempotent_payment():
    """Repeating a POST with the same Idempotency-Key replays the first result exactly once.

    NOTE(review): relies on an async HTTP ``client`` bound to the app (e.g. an
    httpx.AsyncClient fixture) and an async-capable test runner such as
    pytest-asyncio; neither is shown here.
    """
    import uuid

    key = str(uuid.uuid4())
    headers = {"Idempotency-Key": key}

    # First request creates the payment and is not a replay
    r1 = await client.post("/payments", json={"amount": 1000}, headers=headers)
    assert r1.status_code == 201
    assert r1.headers.get("X-Idempotent-Replay") is None

    # Second request returns cached result
    r2 = await client.post("/payments", json={"amount": 1000}, headers=headers)
    assert r2.status_code == 201
    assert r2.json() == r1.json()
    assert r2.headers.get("X-Idempotent-Replay") == "true"

    # Only one payment exists
    payments = await client.get("/payments")
    assert len(payments.json()["data"]) == 1
The one thing to remember: Production idempotency combines client-provided keys stored in Redis with database constraints as the safety net — always lock during processing, only cache successful responses, and use state machines for multi-step operations.
See Also
- Python Aiohttp Client — Understand Aiohttp Client through a practical analogy so your Python decisions become faster and clearer.
- Python Api Client Design — Why building your own API client in Python is like creating a TV remote that only has the buttons you actually need.
- Python Api Documentation Swagger — Swagger turns your Python API into an interactive playground where anyone can click buttons to try it out — no coding required.
- Python Api Mocking Responses — Why testing with fake API responses is like rehearsing a play with stand-ins before the real actors show up.
- Python Api Pagination Clients — Why APIs send data in pages, and how Python handles it — like reading a book one chapter at a time instead of swallowing the whole thing.