ETag Caching — Deep Dive

ETag middleware in FastAPI

A middleware approach computes ETags automatically for all GET responses:

from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
import hashlib

app = FastAPI()

class ETagMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)

        if request.method != "GET" or response.status_code != 200:
            return response

        # Read response body
        body = b""
        async for chunk in response.body_iterator:
            body += chunk if isinstance(chunk, bytes) else chunk.encode()

        # Generate ETag from content hash
        etag = f'"{hashlib.md5(body).hexdigest()}"'

        # Check If-None-Match
        if_none_match = request.headers.get("if-none-match")
        if if_none_match and if_none_match == etag:
            return Response(status_code=304, headers={"ETag": etag})

        return Response(
            content=body,
            status_code=response.status_code,
            headers={**dict(response.headers), "ETag": etag},
            media_type=response.media_type,
        )

app.add_middleware(ETagMiddleware)

This middleware has a limitation: it reads the entire response body to compute the hash, which means the server still does all the work. The 304 saves bandwidth, not computation.

Efficient ETags with version counters

To skip computation entirely, use a version field in your database:

from sqlalchemy import Column, Integer, String, DateTime, event
from sqlalchemy.orm import Session

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)
    version = Column(Integer, default=1, nullable=False)
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

@event.listens_for(User, "before_update")
def increment_version(mapper, connection, target):
    target.version += 1

Now the endpoint can check the ETag without building the full response:

from fastapi import FastAPI, Header, HTTPException

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    if_none_match: str | None = Header(None),
    db: Session = Depends(get_db),
):
    # Quick version check — no serialization needed
    if if_none_match:
        current_version = db.query(User.version).filter(
            User.id == user_id
        ).scalar()

        if current_version is not None:
            etag = f'"user-{user_id}-v{current_version}"'
            if if_none_match == etag:
                return Response(status_code=304, headers={"ETag": etag})

    # Full fetch only if ETag doesn't match
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404)

    etag = f'"user-{user.id}-v{user.version}"'
    return Response(
        content=UserResponse.from_orm(user).json(),
        media_type="application/json",
        headers={"ETag": etag},
    )

The version check query (SELECT version FROM users WHERE id = ?) is far cheaper than a full select with joins and serialization. For a user profile with 15 fields and 3 joined tables, this can reduce response time from 50ms to 2ms on cache hits.

Optimistic concurrency with If-Match

Preventing lost updates using ETags:

@app.put("/users/{user_id}")
async def update_user(
    user_id: int,
    user_data: UserUpdate,
    if_match: str = Header(...),  # Required for updates
    db: Session = Depends(get_db),
):
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404)

    expected_etag = f'"user-{user.id}-v{user.version}"'
    if if_match != expected_etag:
        raise HTTPException(
            status_code=412,
            detail={
                "error": "PRECONDITION_FAILED",
                "message": "Resource has been modified. Refetch and retry.",
                "current_etag": expected_etag,
            },
        )

    # Safe to update — no concurrent modification
    for field, value in user_data.dict(exclude_unset=True).items():
        setattr(user, field, value)

    db.commit()
    db.refresh(user)

    new_etag = f'"user-{user.id}-v{user.version}"'
    return Response(
        content=UserResponse.from_orm(user).json(),
        media_type="application/json",
        headers={"ETag": new_etag},
    )

The If-Match header is required (not optional) to enforce that clients must acknowledge the version they’re updating. This prevents accidental overwrites from clients that don’t implement conditional requests.

Race condition protection

The version check above has a TOCTOU (time-of-check-time-of-use) race condition. Between checking the version and committing the update, another request could modify the row. Fix this with a database-level atomic check:

from sqlalchemy import update

@app.put("/users/{user_id}")
async def update_user_safe(
    user_id: int,
    user_data: UserUpdate,
    if_match: str = Header(...),
    db: Session = Depends(get_db),
):
    # Extract version from ETag
    expected_version = int(if_match.strip('"').split("-v")[1])

    # Atomic update: only succeeds if version matches
    stmt = (
        update(User)
        .where(User.id == user_id, User.version == expected_version)
        .values(
            **user_data.dict(exclude_unset=True),
            version=expected_version + 1,
        )
        .returning(User)
    )

    result = db.execute(stmt)
    updated = result.fetchone()

    if updated is None:
        # Either user doesn't exist or version mismatch
        existing = db.query(User.version).filter(User.id == user_id).scalar()
        if existing is None:
            raise HTTPException(status_code=404)
        raise HTTPException(status_code=412, detail="Resource modified concurrently")

    db.commit()
    new_etag = f'"user-{user_id}-v{expected_version + 1}"'
    return Response(
        content=json.dumps(dict(updated)),
        media_type="application/json",
        headers={"ETag": new_etag},
    )

The WHERE version = expected_version clause makes the update atomic at the database level. If another transaction incremented the version between the read and write, the WHERE clause matches zero rows.

Django ETag support

Django has built-in ETag middleware:

# settings.py
MIDDLEWARE = [
    'django.middleware.http.ConditionalGetMiddleware',
    # ... other middleware
]

# This middleware automatically:
# 1. Computes ETags from response content (MD5 hash)
# 2. Handles If-None-Match → 304 responses
# 3. Handles If-Modified-Since → 304 responses

For view-level control, use the condition decorator:

from django.views.decorators.http import condition
from django.http import JsonResponse

def user_etag(request, user_id):
    """Compute ETag without building full response."""
    try:
        version = User.objects.values_list("version", flat=True).get(id=user_id)
        return f'"user-{user_id}-v{version}"'
    except User.DoesNotExist:
        return None

def user_last_modified(request, user_id):
    try:
        return User.objects.values_list("updated_at", flat=True).get(id=user_id)
    except User.DoesNotExist:
        return None

@condition(etag_func=user_etag, last_modified_func=user_last_modified)
def user_detail(request, user_id):
    user = User.objects.get(id=user_id)
    return JsonResponse({
        "id": user.id,
        "name": user.name,
        "email": user.email,
    })

Django’s condition decorator is elegant: the etag_func runs first. If the ETag matches If-None-Match, Django returns 304 without ever calling the view function. This gives you computation savings, not just bandwidth savings.

ETags for collection endpoints

Generating ETags for list endpoints requires a different strategy. You can’t hash every item — that defeats the purpose. Options:

@app.get("/users")
async def list_users(
    if_none_match: str | None = Header(None),
    db: Session = Depends(get_db),
):
    # Option 1: Use max version across all users
    max_version = db.query(func.max(User.version)).scalar() or 0
    count = db.query(func.count(User.id)).scalar()
    collection_etag = f'"users-v{max_version}-n{count}"'

    if if_none_match == collection_etag:
        return Response(status_code=304, headers={"ETag": collection_etag})

    # Only fetch if ETag doesn't match
    users = db.query(User).all()
    return Response(
        content=json.dumps([UserResponse.from_orm(u).dict() for u in users]),
        media_type="application/json",
        headers={"ETag": collection_etag},
    )

The composite ETag (max_version + count) changes whenever any user is updated, added, or deleted. It’s not perfectly precise (updating user A and then user B generates two ETags even though separate queries might return the same list), but it’s fast and correct enough for most use cases.

Cache-Control interaction

ETags work best alongside Cache-Control headers:

@app.get("/products/{product_id}")
async def get_product(product_id: int, if_none_match: str | None = Header(None)):
    product = await fetch_product(product_id)
    etag = f'"product-{product.id}-v{product.version}"'

    if if_none_match == etag:
        return Response(
            status_code=304,
            headers={
                "ETag": etag,
                "Cache-Control": "private, max-age=60",
            },
        )

    return Response(
        content=product.json(),
        media_type="application/json",
        headers={
            "ETag": etag,
            "Cache-Control": "private, max-age=60",
        },
    )

max-age=60 tells the browser “don’t even ask for 60 seconds.” After that, the browser sends a conditional request with If-None-Match. This combination minimizes both requests (for 60 seconds) and bandwidth (after 60 seconds, only re-download if changed).

For shared caches (CDNs):

Cache-Control: public, max-age=300, stale-while-revalidate=60

This tells the CDN to serve the cached version for 5 minutes, then revalidate in the background for another minute while still serving stale content. The ETag ensures revalidation is cheap when nothing changed.

Benchmarking ETag savings

Measure the real impact of ETag caching:

import httpx
import time

async def measure_etag_savings(url: str, iterations: int = 100):
    async with httpx.AsyncClient() as client:
        # First request — full response
        resp = await client.get(url)
        etag = resp.headers.get("etag")
        full_size = len(resp.content)
        full_time = resp.elapsed.total_seconds()

        # Conditional requests
        total_bytes = 0
        total_time = 0
        hits_304 = 0

        for _ in range(iterations):
            start = time.perf_counter()
            resp = await client.get(url, headers={"If-None-Match": etag})
            elapsed = time.perf_counter() - start
            total_time += elapsed
            total_bytes += len(resp.content)
            if resp.status_code == 304:
                hits_304 += 1

        print(f"Full response: {full_size} bytes, {full_time*1000:.1f}ms")
        print(f"304 responses: {hits_304}/{iterations}")
        print(f"Avg conditional: {total_bytes/iterations:.0f} bytes, "
              f"{total_time/iterations*1000:.1f}ms")
        print(f"Bandwidth saved: {(1 - total_bytes/(full_size*iterations))*100:.1f}%")

Typical results for a 5KB API response: 98% bandwidth reduction on cache hits, 40-60% latency reduction (the server skips serialization with version-based ETags).

One thing to remember: The best ETag implementation avoids computing the full response on cache hits — use version counters or timestamps that can be checked with a lightweight database query before doing expensive work.

pythonwebhttpcachingperformancefastapidjango

See Also

  • Python Aiohttp Client Understand Aiohttp Client through a practical analogy so your Python decisions become faster and clearer.
  • Python Api Client Design Why building your own API client in Python is like creating a TV remote that only has the buttons you actually need.
  • Python Api Documentation Swagger Swagger turns your Python API into an interactive playground where anyone can click buttons to try it out — no coding required.
  • Python Api Mocking Responses Why testing with fake API responses is like rehearsing a play with stand-ins before the real actors show up.
  • Python Api Pagination Clients Why APIs send data in pages, and how Python handles it — like reading a book one chapter at a time instead of swallowing the whole thing.