Python API Caching Layers — Deep Dive

Technical foundation

Production caching is not just “put data in Redis.” It involves choosing eviction policies, preventing thundering herds, handling cache consistency across services, and instrumenting hit rates to know whether your cache is actually helping. This deep dive covers the patterns that separate toy caching from production caching.

In-process caching with async support

Python’s lru_cache works only with synchronous functions. For async APIs, use cachetools with manual management or aiocache:

from cachetools import TTLCache
import asyncio

class AsyncTTLCache:
    def __init__(self, maxsize: int = 1024, ttl: int = 300):
        self._cache = TTLCache(maxsize=maxsize, ttl=ttl)
        self._locks: dict[str, asyncio.Lock] = {}

    async def get_or_set(self, key: str, factory):
        if key in self._cache:
            return self._cache[key]
        
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()
        
        async with self._locks[key]:
            # Double-check after acquiring lock
            if key in self._cache:
                return self._cache[key]
            value = await factory()
            self._cache[key] = value
            return value

product_cache = AsyncTTLCache(maxsize=5000, ttl=120)

async def get_product(product_id: int):
    return await product_cache.get_or_set(
        f"product:{product_id}",
        lambda: db.get_product(product_id),
    )

The lock prevents the thundering herd problem: when a popular cache entry expires, only one coroutine fetches the new value while others wait. Without the lock, 100 concurrent requests for the same expired key would all hit the database simultaneously.

Redis caching patterns

Cache-aside (lazy loading)

The most common pattern — application checks cache first, falls back to database:

import redis.asyncio as aioredis
import orjson

redis = aioredis.Redis(host="localhost", port=6379, decode_responses=False)

async def get_user(user_id: int) -> dict:
    key = f"user:{user_id}"
    cached = await redis.get(key)
    if cached:
        return orjson.loads(cached)
    
    user = await db.get_user(user_id)
    if user:
        await redis.setex(key, 600, orjson.dumps(user))
    return user

Write-through cache

Updates write to both cache and database atomically:

async def update_user(user_id: int, data: dict) -> dict:
    user = await db.update_user(user_id, data)
    key = f"user:{user_id}"
    await redis.setex(key, 600, orjson.dumps(user))
    return user

The risk: if the cache write fails after the database write succeeds, the cache holds stale data. Mitigate by wrapping in a try/except that deletes the cache key on failure.

Cache stampede prevention with probabilistic early expiration

Even with TTLs, many keys expiring simultaneously (e.g., after a deployment) cause a stampede. The “probabilistic early expiration” pattern recomputes values slightly before they expire:

import random
import time

async def get_with_early_recompute(key: str, factory, ttl: int = 600, beta: float = 1.0):
    raw = await redis.get(key)
    if raw:
        data = orjson.loads(raw)
        expiry = data.get("_expiry", 0)
        delta = data.get("_delta", 0)
        
        # Probabilistically recompute before expiry
        now = time.time()
        if now - delta * beta * random.random() < expiry:
            return data["value"]
    
    start = time.time()
    value = await factory()
    delta = time.time() - start
    
    cache_data = {
        "value": value,
        "_expiry": time.time() + ttl,
        "_delta": delta,
    }
    await redis.setex(key, ttl + 60, orjson.dumps(cache_data))
    return value

This spreads recomputation across time, preventing all servers from fetching the same data simultaneously.

Redis pipeline for batch cache operations

Fetching multiple cache keys individually is slow. Use pipelines:

async def get_products_batch(product_ids: list[int]) -> list[dict]:
    keys = [f"product:{pid}" for pid in product_ids]
    
    async with redis.pipeline() as pipe:
        for key in keys:
            pipe.get(key)
        results = await pipe.execute()
    
    products = []
    missing_ids = []
    for pid, cached in zip(product_ids, results):
        if cached:
            products.append(orjson.loads(cached))
        else:
            missing_ids.append(pid)
            products.append(None)
    
    if missing_ids:
        db_products = await db.get_products(missing_ids)
        async with redis.pipeline() as pipe:
            for product in db_products:
                pipe.setex(f"product:{product['id']}", 600, orjson.dumps(product))
            await pipe.execute()
        
        # Fill in the gaps
        db_map = {p["id"]: p for p in db_products}
        products = [p if p else db_map.get(pid) for p, pid in zip(products, product_ids)]
    
    return products

This makes exactly 2 Redis round-trips and 1 database query regardless of how many products are requested.

HTTP conditional requests

Implement ETag and If-None-Match for bandwidth-efficient caching:

import hashlib
from fastapi import Request, Response

@app.get("/products/{product_id}")
async def get_product(product_id: int, request: Request, response: Response):
    product = await cached_get_product(product_id)
    
    # Generate ETag from content
    content_bytes = orjson.dumps(product)
    etag = f'"{hashlib.md5(content_bytes).hexdigest()}"'
    
    # Check if client has current version
    if_none_match = request.headers.get("if-none-match")
    if if_none_match == etag:
        return Response(status_code=304)  # Not Modified, zero payload
    
    response.headers["ETag"] = etag
    response.headers["Cache-Control"] = "public, max-age=60"
    return product

A 304 response saves bandwidth and client-side parsing time. The client reuses its local copy.

Stale-while-revalidate pattern

Serve stale data immediately while refreshing in the background:

async def get_with_stale_refresh(key: str, factory, ttl: int = 300, stale_ttl: int = 600):
    raw = await redis.get(key)
    if raw:
        data = orjson.loads(raw)
        age = time.time() - data["_stored_at"]
        
        if age < ttl:
            return data["value"]  # Fresh
        
        if age < stale_ttl:
            # Return stale data, refresh in background
            asyncio.create_task(refresh_cache(key, factory, ttl, stale_ttl))
            return data["value"]  # Stale but acceptable
    
    # Cache miss — fetch synchronously
    return await refresh_cache(key, factory, ttl, stale_ttl)

async def refresh_cache(key: str, factory, ttl: int, stale_ttl: int):
    value = await factory()
    cache_data = {"value": value, "_stored_at": time.time()}
    await redis.setex(key, stale_ttl + 60, orjson.dumps(cache_data))
    return value

HTTP supports this natively: Cache-Control: max-age=60, stale-while-revalidate=300 tells the browser to serve stale content for up to 300 extra seconds while fetching fresh data in the background.

Cache invalidation across services

In microservice architectures, one service updates data and other services cache it. Event-driven invalidation keeps caches consistent:

# Publishing service (writes data)
async def update_product(product_id: int, data: dict):
    await db.update_product(product_id, data)
    await redis.publish("cache_invalidation", orjson.dumps({
        "entity": "product",
        "id": product_id,
        "action": "updated",
    }))

# Consuming service (caches data)
async def listen_for_invalidations():
    pubsub = redis.pubsub()
    await pubsub.subscribe("cache_invalidation")
    async for message in pubsub.listen():
        if message["type"] == "message":
            event = orjson.loads(message["data"])
            key = f"{event['entity']}:{event['id']}"
            await redis.delete(key)
            local_cache.pop(key, None)

Monitoring cache effectiveness

A cache with a 10% hit rate is wasted infrastructure. Track:

from prometheus_client import Counter

cache_hits = Counter("cache_hits_total", "Cache hits", ["layer", "entity"])
cache_misses = Counter("cache_misses_total", "Cache misses", ["layer", "entity"])

async def get_product_instrumented(product_id: int):
    # Check in-process cache
    local = local_cache.get(f"product:{product_id}")
    if local:
        cache_hits.labels(layer="local", entity="product").inc()
        return local
    cache_misses.labels(layer="local", entity="product").inc()
    
    # Check Redis
    cached = await redis.get(f"product:{product_id}")
    if cached:
        cache_hits.labels(layer="redis", entity="product").inc()
        return orjson.loads(cached)
    cache_misses.labels(layer="redis", entity="product").inc()
    
    # Database fallback
    return await db.get_product(product_id)

Target hit rates: in-process cache >90% for hot data, Redis >80% for entity caches. Below these thresholds, reconsider your TTL, cache key design, or whether the data is actually cacheable.

Tradeoffs

Every caching layer adds complexity: more infrastructure to operate, more failure modes to handle, and more stale data risk. The payoff is response times measured in microseconds instead of milliseconds and database load reduced by orders of magnitude. Start with the simplest layer that solves your bottleneck, measure its effectiveness, and add layers only when data justifies the complexity.

The one thing to remember: Production caching requires stampede prevention (locks or probabilistic early expiration), event-driven invalidation for consistency, per-layer hit rate monitoring, and the discipline to measure before adding complexity.

pythonapicachingperformanceredis

See Also