Python Webhook Handlers — Deep Dive
System-level framing
A production webhook handler is a high-availability HTTP endpoint that receives event notifications from external services, verifies their authenticity, deduplicates deliveries, and dispatches processing to background workers. The handler must be fast (respond in under 5 seconds), resilient (never lose events), and idempotent (handle retries gracefully). It sits at the boundary between your system and external services, making it both a security surface and a reliability chokepoint.
FastAPI webhook handler structure
from fastapi import FastAPI, Request, HTTPException, Header, BackgroundTasks
from typing import Optional
import hashlib
import hmac
import json
import os
app = FastAPI()
WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]
@app.post("/webhooks/stripe")
async def stripe_webhook(
request: Request,
stripe_signature: Optional[str] = Header(None, alias="stripe-signature"),
):
body = await request.body()
if not verify_stripe_signature(body, stripe_signature):
raise HTTPException(status_code=401, detail="Invalid signature")
payload = json.loads(body)
event_id = payload.get("id")
event_type = payload.get("type")
if await is_duplicate(event_id):
return {"status": "already_processed"}
await store_raw_event(event_id, event_type, payload)
await enqueue_processing(event_id, event_type)
return {"status": "accepted"}
The handler follows the accept-and-defer pattern: verify, deduplicate, store, enqueue, respond. Processing happens elsewhere.
HMAC signature verification
Different services use different signing schemes. Here are the most common:
Generic HMAC-SHA256
def verify_hmac_sha256(payload: bytes, signature: str, secret: str) -> bool:
expected = hmac.new(
secret.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
Stripe verification
Stripe uses a timestamp + payload scheme to prevent replay attacks:
import time
def verify_stripe_signature(
payload: bytes, sig_header: str, tolerance: int = 300
) -> bool:
if not sig_header:
return False
elements = dict(
pair.split("=", 1) for pair in sig_header.split(",")
)
timestamp = elements.get("t")
signature = elements.get("v1")
if not timestamp or not signature:
return False
if abs(time.time() - int(timestamp)) > tolerance:
return False # Replay attack protection
signed_payload = f"{timestamp}.{payload.decode()}".encode()
expected = hmac.new(
WEBHOOK_SECRET.encode(),
signed_payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
GitHub verification
def verify_github_signature(payload: bytes, signature: str, secret: str) -> bool:
expected = hmac.new(
secret.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(f"sha256={expected}", signature)
Always use hmac.compare_digest instead of == to prevent timing attacks.
Idempotency layer
import redis.asyncio as redis
from datetime import timedelta
redis_client = redis.from_url(os.environ["REDIS_URL"])
async def is_duplicate(event_id: str) -> bool:
key = f"webhook:processed:{event_id}"
exists = await redis_client.exists(key)
return bool(exists)
async def mark_processed(event_id: str, ttl_days: int = 7):
key = f"webhook:processed:{event_id}"
await redis_client.setex(key, timedelta(days=ttl_days), "1")
Use Redis with a TTL for the deduplication store. Most webhook providers stop retrying after 24-72 hours, so a 7-day TTL provides a comfortable margin.
For database-backed idempotency (more durable but slower):
from sqlalchemy import Column, String, DateTime, JSON
from sqlalchemy.dialects.postgresql import insert
class WebhookEvent(Base):
__tablename__ = "webhook_events"
event_id = Column(String, primary_key=True)
event_type = Column(String, nullable=False)
payload = Column(JSON, nullable=False)
received_at = Column(DateTime, default=datetime.utcnow)
processed_at = Column(DateTime, nullable=True)
status = Column(String, default="pending")
async def store_raw_event(event_id: str, event_type: str, payload: dict):
stmt = insert(WebhookEvent).values(
event_id=event_id,
event_type=event_type,
payload=payload,
).on_conflict_do_nothing(index_elements=["event_id"])
await session.execute(stmt)
await session.commit()
The on_conflict_do_nothing ensures duplicate inserts are silently ignored.
Event routing and processing
from typing import Callable, Awaitable
EventHandler = Callable[[dict], Awaitable[None]]
class WebhookRouter:
def __init__(self):
self._handlers: dict[str, EventHandler] = {}
def register(self, event_type: str):
def decorator(func: EventHandler):
self._handlers[event_type] = func
return func
return decorator
async def dispatch(self, event_type: str, payload: dict):
handler = self._handlers.get(event_type)
if handler:
await handler(payload)
else:
logger.warning(f"No handler for event type: {event_type}")
router = WebhookRouter()
@router.register("payment.succeeded")
async def handle_payment_success(payload: dict):
order_id = payload["data"]["object"]["metadata"]["order_id"]
await fulfill_order(order_id)
await send_receipt_email(payload["data"]["object"]["receipt_email"])
@router.register("customer.subscription.deleted")
async def handle_subscription_cancelled(payload: dict):
customer_id = payload["data"]["object"]["customer"]
await revoke_access(customer_id)
await send_cancellation_email(customer_id)
Async processing with task queues
For reliable background processing, use Celery or a similar task queue:
from celery import Celery
celery_app = Celery("webhooks", broker=os.environ["CELERY_BROKER_URL"])
@celery_app.task(
bind=True,
max_retries=3,
default_retry_delay=60,
acks_late=True,
)
def process_webhook_event(self, event_id: str, event_type: str):
try:
event = db.get_event(event_id)
if event.status == "processed":
return # Already handled (idempotency)
router.dispatch_sync(event_type, event.payload)
event.status = "processed"
event.processed_at = datetime.utcnow()
db.commit()
except ExternalServiceError as exc:
self.retry(exc=exc)
except Exception:
event.status = "failed"
db.commit()
raise
async def enqueue_processing(event_id: str, event_type: str):
process_webhook_event.delay(event_id, event_type)
acks_late=True ensures the task is re-queued if the worker dies mid-processing.
Handling provider retries
Webhook providers retry with exponential backoff when they do not receive a 2xx response. Typical retry schedules:
| Provider | Retry attempts | Backoff |
|---|---|---|
| Stripe | 3 over 24 hours | Exponential |
| GitHub | 3 immediate, then hourly | Linear then fixed |
| Shopify | 19 over 48 hours | Exponential |
| Twilio | 1 retry after 5 minutes | Fixed |
Your endpoint should:
- Always return 200 for valid, verified requests — even if processing fails.
- Return 401 for invalid signatures (provider will not retry auth failures).
- Never return 500 for transient errors — store the event and process later.
Security hardening
from fastapi import Depends
from starlette.middleware.trustedhost import TrustedHostMiddleware
# 1. Restrict to HTTPS
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["api.yoursite.com"])
# 2. IP allowlisting (optional, if provider publishes IPs)
ALLOWED_IPS = {"54.187.174.169", "54.187.205.235"} # Example Stripe IPs
async def verify_source_ip(request: Request):
client_ip = request.client.host
forwarded = request.headers.get("x-forwarded-for", "").split(",")[0].strip()
source_ip = forwarded or client_ip
if ALLOWED_IPS and source_ip not in ALLOWED_IPS:
raise HTTPException(status_code=403, detail="Forbidden")
# 3. Request size limiting
@app.middleware("http")
async def limit_request_size(request: Request, call_next):
content_length = request.headers.get("content-length")
if content_length and int(content_length) > 1_000_000: # 1MB max
raise HTTPException(status_code=413, detail="Payload too large")
return await call_next(request)
Monitoring and alerting
import logging
from prometheus_client import Counter, Histogram
webhook_received = Counter(
"webhooks_received_total",
"Total webhook events received",
["provider", "event_type", "status"]
)
webhook_processing_duration = Histogram(
"webhook_processing_seconds",
"Time to process webhook events",
["provider", "event_type"]
)
logger = logging.getLogger("webhooks")
async def process_with_metrics(provider: str, event_type: str, event_id: str):
with webhook_processing_duration.labels(provider, event_type).time():
try:
await router.dispatch(event_type, await get_event_payload(event_id))
webhook_received.labels(provider, event_type, "success").inc()
except Exception as exc:
webhook_received.labels(provider, event_type, "error").inc()
logger.error(f"Webhook processing failed: {event_id}", exc_info=exc)
raise
Alert on:
- Spike in 401 responses — potential signature key rotation or attack.
- Processing queue depth growing — workers cannot keep up.
- Duplicate rate exceeding 20% — provider may be retrying excessively (check endpoint response times).
- Unknown event types — provider may have added new events you need to handle.
Testing webhook handlers
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_valid_webhook_accepted():
payload = {"id": "evt_123", "type": "payment.succeeded", "data": {}}
body = json.dumps(payload).encode()
signature = compute_test_signature(body)
async with AsyncClient(app=app, base_url="http://test") as client:
response = await client.post(
"/webhooks/stripe",
content=body,
headers={
"stripe-signature": signature,
"content-type": "application/json",
},
)
assert response.status_code == 200
@pytest.mark.asyncio
async def test_invalid_signature_rejected():
async with AsyncClient(app=app, base_url="http://test") as client:
response = await client.post(
"/webhooks/stripe",
json={"id": "evt_456", "type": "test"},
headers={"stripe-signature": "invalid"},
)
assert response.status_code == 401
@pytest.mark.asyncio
async def test_duplicate_event_idempotent():
payload = {"id": "evt_789", "type": "payment.succeeded", "data": {}}
body = json.dumps(payload).encode()
signature = compute_test_signature(body)
async with AsyncClient(app=app, base_url="http://test") as client:
r1 = await client.post("/webhooks/stripe", content=body,
headers={"stripe-signature": signature})
r2 = await client.post("/webhooks/stripe", content=body,
headers={"stripe-signature": signature})
assert r1.status_code == 200
assert r2.json()["status"] == "already_processed"
One thing to remember: A production webhook handler is a verified, idempotent, async event intake system. Verify the signature to block forgeries, deduplicate by event ID to handle retries, store the raw event for durability, return 200 immediately, and dispatch processing to a background queue. This pattern scales from handling 10 events per day to 10 million.
See Also
- Python Api Rate Limit Handling Why APIs tell your Python program to slow down, and how to handle it gracefully — explained so anyone can follow along.
- Python Proxy Rotation Why Python programs disguise their internet address when collecting data, and how proxy rotation works — explained without any tech jargon.
- Python Sse Client Consumption How Python programs listen to live data streams from servers — like a radio that never stops playing — explained for complete beginners.
- Python Web Scraping Ethics When is it okay to collect data from websites with Python, and when does it cross the line? The rules explained for everyone.
- Ci Cd Why big apps can ship updates every day without turning your phone into a glitchy mess — CI/CD is the behind-the-scenes quality gate and delivery truck.