Python Webhook Handlers — Deep Dive

System-level framing

A production webhook handler is a high-availability HTTP endpoint that receives event notifications from external services, verifies their authenticity, deduplicates deliveries, and dispatches processing to background workers. The handler must be fast (respond in under 5 seconds), resilient (never lose events), and idempotent (handle retries gracefully). It sits at the boundary between your system and external services, making it both a security surface and a reliability chokepoint.

FastAPI webhook handler structure

from fastapi import FastAPI, Request, HTTPException, Header, BackgroundTasks
from typing import Optional
import hashlib
import hmac
import json
import os

app = FastAPI()

WEBHOOK_SECRET = os.environ["WEBHOOK_SECRET"]

@app.post("/webhooks/stripe")
async def stripe_webhook(
    request: Request,
    stripe_signature: Optional[str] = Header(None, alias="stripe-signature"),
):
    body = await request.body()

    if not verify_stripe_signature(body, stripe_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

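    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

    event_id = payload.get("id")
    event_type = payload.get("type")
    if not event_id or not event_type:
        raise HTTPException(status_code=400, detail="Missing event id or type")

    if await is_duplicate(event_id):
        return {"status": "already_processed"}

    # Persist first so the event survives a crash, then hand off to workers.
    await store_raw_event(event_id, event_type, payload)
    await enqueue_processing(event_id, event_type)
    # Record the ID only once the event is stored and queued, so a failure
    # above still lets the provider's retry through.
    await mark_processed(event_id)

    return {"status": "accepted"}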

The handler follows the accept-and-defer pattern: verify, deduplicate, store, enqueue, respond. Processing happens elsewhere.

HMAC signature verification

Different services use different signing schemes. Here are the most common:

Generic HMAC-SHA256

def verify_hmac_sha256(payload: bytes, signature: str, secret: str) -> bool:
    expected = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)

Stripe verification

Stripe uses a timestamp + payload scheme to prevent replay attacks:

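import time

def verify_stripe_signature(
    payload: bytes, sig_header: Optional[str], tolerance: int = 300
) -> bool:
    if not sig_header:
        return False

    # Header format: "t=<unix timestamp>,v1=<hex signature>[,v1=...]".
    timestamp = None
    signatures = []
    for pair in sig_header.split(","):
        key, sep, value = pair.strip().partition("=")
        if not sep:
            return False  # Malformed element, e.g. a bare "invalid"
        if key == "t":
            timestamp = value
        elif key == "v1":
            # More than one v1 entry can appear while a secret is being rolled.
            signatures.append(value)

    if not timestamp or not signatures:
        return False

    try:
        age = abs(time.time() - int(timestamp))
    except ValueError:
        return False  # Non-numeric timestamp
    if age > tolerance:
        return False  # Replay attack protection

    # Sign the raw bytes; decoding the body first can fail on invalid UTF-8.
    signed_payload = timestamp.encode() + b"." + payload
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        signed_payload,
        hashlib.sha256
    ).hexdigest()

    return any(
        hmac.compare_digest(expected.encode(), sig.encode()) for sig in signatures
    )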

GitHub verification

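# GitHub sends the signature in the X-Hub-Signature-256 header as "sha256=<hex>".
def verify_github_signature(
    payload: bytes, signature: Optional[str], secret: str
) -> bool:
    if not signature:
        return False  # Header missing
    expected = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)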

Always use hmac.compare_digest instead of == to prevent timing attacks.
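
As a quick check of the generic scheme, the sketch below signs a payload and verifies it, then confirms that a tampered body and a missing header are both rejected. The secret and payloads are made-up values, and sign and verify are illustrative names rather than part of any provider's SDK.

```python
import hashlib
import hmac
from typing import Optional


def sign(payload: bytes, secret: str) -> str:
    """Return the 'sha256=<hex>' signature a sender would attach."""
    digest = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def verify(payload: bytes, signature: Optional[str], secret: str) -> bool:
    """Constant-time comparison; a missing header counts as a failure."""
    if not signature:
        return False
    expected = sign(payload, secret)
    # Compare as bytes so a non-ASCII header cannot raise TypeError.
    return hmac.compare_digest(expected.encode(), signature.encode())


secret = "whsec_example"           # made-up secret for illustration
body = b'{"id": "evt_1"}'
header = sign(body, secret)

print(verify(body, header, secret))                # genuine delivery
print(verify(b'{"id": "evt_2"}', header, secret))  # tampered body
print(verify(body, None, secret))                  # header missing
```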

Idempotency layer

import redis.asyncio as redis
from datetime import timedelta

redis_client = redis.from_url(os.environ["REDIS_URL"])

async def is_duplicate(event_id: str) -> bool:
    key = f"webhook:processed:{event_id}"
    exists = await redis_client.exists(key)
    return bool(exists)

async def mark_processed(event_id: str, ttl_days: int = 7):
    key = f"webhook:processed:{event_id}"
    await redis_client.setex(key, timedelta(days=ttl_days), "1")

Use Redis with a TTL for the deduplication store. Most webhook providers stop retrying after 24-72 hours, so a 7-day TTL provides a comfortable margin.
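
A separate exists check followed by a later write leaves a small window in which two concurrent deliveries of the same event can both pass the check. One common way to close it is a single SET with the NX and EX options, which checks and writes in one command. The sketch below assumes that approach; claim_event is a hypothetical name, and FakeRedis is only an in-memory stand-in that mimics the set(name, value, nx=..., ex=...) call of redis.asyncio so the example runs without a server.

```python
import asyncio


class FakeRedis:
    """In-memory stand-in mimicking redis.asyncio's set(..., nx=True, ex=...)."""

    def __init__(self):
        self._data = {}

    async def set(self, name, value, nx=False, ex=None):
        if nx and name in self._data:
            return None           # redis-py returns None when NX blocks the write
        self._data[name] = value  # the TTL (ex) is ignored by this stand-in
        return True


async def claim_event(client, event_id: str, ttl_seconds: int = 7 * 24 * 3600) -> bool:
    """Return True only for the first caller to claim event_id."""
    key = f"webhook:processed:{event_id}"
    # SET key value NX EX ttl: the existence check and the write are one command.
    result = await client.set(key, "1", nx=True, ex=ttl_seconds)
    return bool(result)


async def demo():
    client = FakeRedis()
    first = await claim_event(client, "evt_123")
    second = await claim_event(client, "evt_123")
    return first, second


print(asyncio.run(demo()))
```

The trade-off is that the ID is claimed before the event is stored, so if storing fails the key should be deleted again; otherwise the provider's retry would be treated as a duplicate.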

For database-backed idempotency (more durable but slower):

from datetime import datetime

from sqlalchemy import Column, String, DateTime, JSON
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class WebhookEvent(Base):
    __tablename__ = "webhook_events"
    event_id = Column(String, primary_key=True)
    event_type = Column(String, nullable=False)
    payload = Column(JSON, nullable=False)
    received_at = Column(DateTime, default=datetime.utcnow)
    processed_at = Column(DateTime, nullable=True)
    status = Column(String, default="pending")

# `session` is assumed to be an AsyncSession created elsewhere in the application.
async def store_raw_event(event_id: str, event_type: str, payload: dict):
    stmt = insert(WebhookEvent).values(
        event_id=event_id,
        event_type=event_type,
        payload=payload,
    ).on_conflict_do_nothing(index_elements=["event_id"])
    await session.execute(stmt)
    await session.commit()

The on_conflict_do_nothing ensures duplicate inserts are silently ignored.
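
The same conflict-ignoring insert can be tried without a PostgreSQL server. The sketch below uses the standard-library sqlite3 module, whose ON CONFLICT ... DO NOTHING clause behaves the same way for this purpose, and uses the affected-row count to tell a first delivery from a duplicate. The store_event name and the trimmed-down table are assumptions for illustration.

```python
import json
import sqlite3


def store_event(conn: sqlite3.Connection, event_id: str,
                event_type: str, payload: dict) -> bool:
    """Insert the event; return False if event_id was already stored."""
    cur = conn.execute(
        "INSERT INTO webhook_events (event_id, event_type, payload) "
        "VALUES (?, ?, ?) ON CONFLICT(event_id) DO NOTHING",
        (event_id, event_type, json.dumps(payload)),
    )
    conn.commit()
    # Zero affected rows means the conflict clause swallowed the insert.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE webhook_events ("
    "event_id TEXT PRIMARY KEY, event_type TEXT NOT NULL, payload TEXT NOT NULL)"
)

first = store_event(conn, "evt_1", "payment.succeeded", {"amount": 100})
second = store_event(conn, "evt_1", "payment.succeeded", {"amount": 100})
row_count = conn.execute("SELECT COUNT(*) FROM webhook_events").fetchone()[0]
print(first, second, row_count)
```

Because the insert itself reports whether the row was new, the database can serve as the deduplication check and the durable store in one step.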

Event routing and processing

import logging
from typing import Awaitable, Callable

logger = logging.getLogger("webhooks")

EventHandler = Callable[[dict], Awaitable[None]]

class WebhookRouter:
    def __init__(self):
        self._handlers: dict[str, EventHandler] = {}

    def register(self, event_type: str):
        def decorator(func: EventHandler):
            self._handlers[event_type] = func
            return func
        return decorator

    async def dispatch(self, event_type: str, payload: dict):
        handler = self._handlers.get(event_type)
        if handler:
            await handler(payload)
        else:
            logger.warning(f"No handler for event type: {event_type}")

router = WebhookRouter()

@router.register("payment.succeeded")
async def handle_payment_success(payload: dict):
    order_id = payload["data"]["object"]["metadata"]["order_id"]
    await fulfill_order(order_id)
    await send_receipt_email(payload["data"]["object"]["receipt_email"])

@router.register("customer.subscription.deleted")
async def handle_subscription_cancelled(payload: dict):
    customer_id = payload["data"]["object"]["customer"]
    await revoke_access(customer_id)
    await send_cancellation_email(customer_id)

Async processing with task queues

For reliable background processing, use Celery or a similar task queue:

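import asyncio
from datetime import datetime

from celery import Celery

celery_app = Celery("webhooks", broker=os.environ["CELERY_BROKER_URL"])

# `db` (a synchronous data-access helper) and ExternalServiceError (an
# application-defined exception) are assumed to exist elsewhere.
@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    acks_late=True,
)
def process_webhook_event(self, event_id: str, event_type: str):
    # Load the event outside the try block so `event` is always bound below.
    event = db.get_event(event_id)
    if event.status == "processed":
        return  # Already handled (idempotency)

    try:
        # Celery tasks are synchronous, so run the async router to completion.
        asyncio.run(router.dispatch(event_type, event.payload))

        event.status = "processed"
        event.processed_at = datetime.utcnow()
        db.commit()
    except ExternalServiceError as exc:
        # Transient failure: reschedule the task.
        raise self.retry(exc=exc)
    except Exception:
        event.status = "failed"
        db.commit()
        raise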

async def enqueue_processing(event_id: str, event_type: str):
    process_webhook_event.delay(event_id, event_type)

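acks_late=True delays the acknowledgement until the task finishes, so the broker can redeliver the message if the worker loses its connection mid-task. If a worker child process is killed outright, Celery still acknowledges the message unless task_reject_on_worker_lost is also enabled. Either way a task may run more than once, which is why the status check at the top of the task matters.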
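
With default_retry_delay=60 every retry waits the same fixed interval. If the downstream service is struggling, spacing retries out exponentially with some jitter is usually gentler. The helper below is a minimal sketch of that calculation; retry_countdown and its base and cap defaults are assumptions, not Celery settings.

```python
import random
from typing import Optional


def retry_countdown(retries: int, base: float = 60.0, cap: float = 3600.0,
                    rng: Optional[random.Random] = None) -> float:
    """Seconds to wait before the next attempt, given the retries so far."""
    rng = rng or random.Random()
    # Double the delay on every attempt, but never exceed the cap.
    delay = min(cap, base * (2 ** retries))
    # Jitter: pick a point in the upper half so retries do not align.
    return rng.uniform(delay / 2, delay)


rng = random.Random(0)
for attempt in range(4):
    print(attempt, round(retry_countdown(attempt, rng=rng), 1))
```

Inside the task, the result would be passed to Celery as raise self.retry(exc=exc, countdown=retry_countdown(self.request.retries)).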

Handling provider retries

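Most webhook providers retry, often with exponential backoff, when they do not receive a 2xx response. Typical retry schedules:

Provider   Retry attempts                                        Backoff
Stripe     Retries for up to 3 days (live mode)                  Exponential
GitHub     No automatic retries; redeliver manually or via API   None
Shopify    19 over 48 hours                                      Exponential
Twilio     1 retry after 5 minutes                               Fixed

Schedules change over time, so confirm the current values in each provider's documentation.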

Your endpoint should:

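  • Return 200 as soon as a valid, verified event has been durably stored, even if downstream processing later fails.
  • Return 401 for invalid signatures. Many providers, Stripe included, retry on any non-2xx response, so expect rejected deliveries to be re-sent.
  • Do not return 500 for transient processing errors: store the event and process it later. If the event cannot be stored at all, a 5xx is appropriate, because it is the only way to make the provider retry.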
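
The response rules above can be condensed into a small lookup. This is a sketch under assumptions: the Outcome names are hypothetical, and 503 is one reasonable choice for the case where the event could not be stored.

```python
from enum import Enum


class Outcome(Enum):
    BAD_SIGNATURE = "bad_signature"  # verification failed
    DUPLICATE = "duplicate"          # event ID already seen
    STORED = "stored"                # event persisted and queued
    STORE_FAILED = "store_failed"    # could not persist the event


def status_for(outcome: Outcome) -> int:
    """Map the intake outcome to the HTTP status returned to the provider."""
    if outcome is Outcome.BAD_SIGNATURE:
        return 401
    if outcome is Outcome.STORE_FAILED:
        # Nothing was saved, so ask the provider to send the event again.
        return 503
    # Stored or already seen: acknowledge so the provider stops retrying.
    return 200


for outcome in Outcome:
    print(outcome.name, status_for(outcome))
```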

Security hardening

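from fastapi import Depends
from fastapi.responses import JSONResponse
from starlette.middleware.trustedhost import TrustedHostMiddleware

# 1. Reject requests with an unexpected Host header. This does not enforce
#    HTTPS; terminate TLS at the proxy or add HTTPSRedirectMiddleware for that.
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["api.yoursite.com"])

# 2. IP allowlisting (optional, if provider publishes IPs)
ALLOWED_IPS = {"54.187.174.169", "54.187.205.235"}  # Example Stripe IPs
TRUSTED_PROXIES = {"10.0.0.1"}  # Assumed address of your own load balancer

async def verify_source_ip(request: Request):
    client_ip = request.client.host if request.client else ""
    source_ip = client_ip
    # X-Forwarded-For is client-controlled. Honour it only when the direct peer
    # is a trusted proxy, and take the last entry, which that proxy appended
    # (this assumes a single proxy in front of the app).
    if client_ip in TRUSTED_PROXIES:
        forwarded = request.headers.get("x-forwarded-for", "")
        if forwarded:
            source_ip = forwarded.split(",")[-1].strip()
    if ALLOWED_IPS and source_ip not in ALLOWED_IPS:
        raise HTTPException(status_code=403, detail="Forbidden")

# Attach with: @app.post("/webhooks/stripe", dependencies=[Depends(verify_source_ip)])

# 3. Request size limiting
@app.middleware("http")
async def limit_request_size(request: Request, call_next):
    content_length = request.headers.get("content-length", "")
    # An HTTPException raised in middleware bypasses FastAPI's exception
    # handlers, so build the 413 response directly.
    if content_length.isdigit() and int(content_length) > 1_000_000:  # 1MB max
        return JSONResponse(status_code=413, content={"detail": "Payload too large"})
    return await call_next(request)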
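
Providers that publish source addresses often list CIDR ranges rather than single IPs. The sketch below shows one way to check membership with the standard-library ipaddress module. The ranges are reserved documentation networks used as placeholders, and is_allowed is a hypothetical helper name.

```python
import ipaddress

# Placeholder ranges (reserved for documentation), not real provider addresses.
ALLOWED_NETWORKS = [
    ipaddress.ip_network("192.0.2.0/24"),
    ipaddress.ip_network("2001:db8::/32"),
]


def is_allowed(source_ip: str, networks=ALLOWED_NETWORKS) -> bool:
    """Return True if source_ip parses and falls inside an allowed network."""
    try:
        addr = ipaddress.ip_address(source_ip)
    except ValueError:
        return False  # Not a valid IP address at all
    # Membership tests across IPv4/IPv6 simply evaluate to False.
    return any(addr in net for net in networks)


print(is_allowed("192.0.2.10"))    # inside the IPv4 range
print(is_allowed("198.51.100.1"))  # outside every range
print(is_allowed("2001:db8::1"))   # inside the IPv6 range
print(is_allowed("not-an-ip"))     # unparseable input
```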

Monitoring and alerting

import logging
from prometheus_client import Counter, Histogram

webhook_received = Counter(
    "webhooks_received_total",
    "Total webhook events received",
    ["provider", "event_type", "status"]
)

webhook_processing_duration = Histogram(
    "webhook_processing_seconds",
    "Time to process webhook events",
    ["provider", "event_type"]
)

logger = logging.getLogger("webhooks")

async def process_with_metrics(provider: str, event_type: str, event_id: str):
    with webhook_processing_duration.labels(provider, event_type).time():
        try:
            await router.dispatch(event_type, await get_event_payload(event_id))
            webhook_received.labels(provider, event_type, "success").inc()
        except Exception as exc:
            webhook_received.labels(provider, event_type, "error").inc()
            logger.error(f"Webhook processing failed: {event_id}", exc_info=exc)
            raise

Alert on:

  • Spike in 401 responses — potential signature key rotation or attack.
  • Processing queue depth growing — workers cannot keep up.
  • Duplicate rate exceeding 20% — provider may be retrying excessively (check endpoint response times).
  • Unknown event types — provider may have added new events you need to handle.
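
The duplicate-rate alert can be prototyped in process before it is moved into a metrics system. The class below is a rough sketch that tracks the most recent deliveries in a fixed-size window; DuplicateRateMonitor and its default window size are assumptions, and in production the same ratio would more likely be computed from the Prometheus counters.

```python
from collections import deque


class DuplicateRateMonitor:
    """Tracks the share of duplicates among the last `window` deliveries."""

    def __init__(self, window: int = 1000, threshold: float = 0.20):
        self._outcomes = deque(maxlen=window)  # oldest entries fall off
        self.threshold = threshold

    def record(self, is_duplicate: bool) -> None:
        self._outcomes.append(is_duplicate)

    @property
    def rate(self) -> float:
        if not self._outcomes:
            return 0.0
        return sum(self._outcomes) / len(self._outcomes)

    def should_alert(self) -> bool:
        # Alert only when the rate strictly exceeds the threshold.
        return self.rate > self.threshold


monitor = DuplicateRateMonitor(window=100)
for _ in range(8):
    monitor.record(False)
for _ in range(2):
    monitor.record(True)
at_threshold = monitor.should_alert()   # 2 of 10 is exactly 20%

monitor.record(True)
above_threshold = monitor.should_alert()  # 3 of 11 is above 20%
print(at_threshold, above_threshold)
```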

Testing webhook handlers

import json

import pytest
from httpx import ASGITransport, AsyncClient

# `app` is the FastAPI instance defined earlier. `compute_test_signature` is a
# test helper that returns a valid stripe-signature header for the given body.

def make_client() -> AsyncClient:
    # Newer httpx releases no longer accept AsyncClient(app=...), so the app
    # is passed through an ASGI transport instead.
    return AsyncClient(transport=ASGITransport(app=app), base_url="http://test")

@pytest.mark.asyncio
async def test_valid_webhook_accepted():
    payload = {"id": "evt_123", "type": "payment.succeeded", "data": {}}
    body = json.dumps(payload).encode()
    signature = compute_test_signature(body)

    async with make_client() as client:
        response = await client.post(
            "/webhooks/stripe",
            content=body,
            headers={
                "stripe-signature": signature,
                "content-type": "application/json",
            },
        )
    assert response.status_code == 200

@pytest.mark.asyncio
async def test_invalid_signature_rejected():
    async with make_client() as client:
        response = await client.post(
            "/webhooks/stripe",
            json={"id": "evt_456", "type": "test"},
            headers={"stripe-signature": "invalid"},
        )
    assert response.status_code == 401

@pytest.mark.asyncio
async def test_duplicate_event_idempotent():
    payload = {"id": "evt_789", "type": "payment.succeeded", "data": {}}
    body = json.dumps(payload).encode()
    signature = compute_test_signature(body)

    async with make_client() as client:
        r1 = await client.post("/webhooks/stripe", content=body,
                                headers={"stripe-signature": signature})
        r2 = await client.post("/webhooks/stripe", content=body,
                                headers={"stripe-signature": signature})

    assert r1.status_code == 200
    assert r2.json()["status"] == "already_processed"
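
The tests call compute_test_signature without defining it. A minimal sketch of such a helper follows, assuming the timestamp-plus-payload scheme described in the Stripe verification section. TEST_SECRET is a placeholder that would need to match the WEBHOOK_SECRET the app loads in the test environment, and the optional timestamp parameter is an addition that makes the output deterministic when needed.

```python
import hashlib
import hmac
import time
from typing import Optional

TEST_SECRET = "whsec_test_secret"  # placeholder; must match the app's secret


def compute_test_signature(body: bytes, secret: str = TEST_SECRET,
                           timestamp: Optional[int] = None) -> str:
    """Build a 't=<ts>,v1=<hex>' header for the given request body."""
    ts = int(time.time()) if timestamp is None else timestamp
    # The signed payload is "<timestamp>.<raw body>", as in the verifier.
    signed_payload = f"{ts}.".encode() + body
    digest = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()
    return f"t={ts},v1={digest}"


header = compute_test_signature(b'{"id": "evt_123"}', timestamp=1700000000)
print(header.split(",")[0])
```

Passing an old timestamp is also a convenient way to test that stale deliveries are rejected by the tolerance check.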

One thing to remember: A production webhook handler is a verified, idempotent, async event intake system. Verify the signature to block forgeries, deduplicate by event ID to handle retries, store the raw event for durability, return 200 immediately, and dispatch processing to a background queue. This pattern scales from handling 10 events per day to 10 million.
