Python API Caching Layers — Deep Dive
Technical foundation
Production caching is not just “put data in Redis.” It involves choosing eviction policies, preventing thundering herds, handling cache consistency across services, and instrumenting hit rates to know whether your cache is actually helping. This deep dive covers the patterns that separate toy caching from production caching.
In-process caching with async support
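functools.lru_cache is designed for synchronous functions: applied to an async def function, it caches the coroutine object rather than its result, and a coroutine can be awaited only once. For async APIs, use cachetools with manual management, or aiocache: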
import asyncio

from cachetools import TTLCache


class AsyncTTLCache:
    def __init__(self, maxsize: int = 1024, ttl: int = 300):
        self._cache = TTLCache(maxsize=maxsize, ttl=ttl)
        self._locks: dict[str, asyncio.Lock] = {}

    async def get_or_set(self, key: str, factory):
        # try/except instead of "if key in cache": the entry can expire
        # between the membership test and the lookup
        try:
            return self._cache[key]
        except KeyError:
            pass
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            # Double-check after acquiring lock
            try:
                return self._cache[key]
            except KeyError:
                pass
            value = await factory()
            self._cache[key] = value
            # Drop the per-key lock so _locks does not grow without bound
            self._locks.pop(key, None)
            return value


product_cache = AsyncTTLCache(maxsize=5000, ttl=120)


async def get_product(product_id: int):
    # db is the application's async data-access object (not shown)
    return await product_cache.get_or_set(
        f"product:{product_id}",
        lambda: db.get_product(product_id),
    )
The lock prevents the thundering herd problem: when a popular cache entry expires, only one coroutine fetches the new value while others wait. Without the lock, 100 concurrent requests for the same expired key would all hit the database simultaneously.
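To make that concrete, here is a minimal sketch that uses only the standard library. It is an illustration under stated assumptions, not the class above: a plain dict stands in for TTLCache (no TTL, no eviction), and `SingleFlightCache`, `run_demo`, and `slow_fetch` are hypothetical names. It fires 100 concurrent lookups at a cold key and counts how often the factory actually runs.

```python
import asyncio


class SingleFlightCache:
    """Toy cache: a plain dict plus one asyncio.Lock per key (no TTL, no eviction)."""

    def __init__(self):
        self._cache = {}
        self._locks = {}

    async def get_or_set(self, key, factory):
        if key in self._cache:
            return self._cache[key]
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            if key in self._cache:  # filled while we waited for the lock
                return self._cache[key]
            value = await factory()
            self._cache[key] = value
            return value


async def run_demo(concurrency: int = 100) -> int:
    calls = 0

    async def slow_fetch():
        # Hypothetical stand-in for a database query
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # simulate database latency
        return {"id": 42}

    cache = SingleFlightCache()
    results = await asyncio.gather(
        *(cache.get_or_set("product:42", slow_fetch) for _ in range(concurrency))
    )
    assert all(r == {"id": 42} for r in results)
    return calls


factory_calls = asyncio.run(run_demo())
print(factory_calls)  # 1
```

The lock only coordinates coroutines inside one process. With several worker processes, each one can still issue its own fetch for the same key.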
Redis caching patterns
Cache-aside (lazy loading)
The most common pattern. The application checks the cache first and falls back to the database on a miss:
import orjson
import redis.asyncio as aioredis

redis = aioredis.Redis(host="localhost", port=6379, decode_responses=False)


async def get_user(user_id: int) -> dict:
    key = f"user:{user_id}"
    cached = await redis.get(key)
    if cached is not None:
        return orjson.loads(cached)
    # Cache miss: load from the database and populate the cache
    user = await db.get_user(user_id)
    if user:
        await redis.setex(key, 600, orjson.dumps(user))
    return user
Write-through cache
Updates write to the database and then to the cache in the same code path, so readers see the new value immediately. The two writes are not atomic:
async def update_user(user_id: int, data: dict) -> dict:
    user = await db.update_user(user_id, data)
    key = f"user:{user_id}"
    await redis.setex(key, 600, orjson.dumps(user))
    return user
The risk: if the cache write fails after the database write succeeds, the cache holds stale data. Mitigate by wrapping in a try/except that deletes the cache key on failure.
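A minimal sketch of that mitigation follows. Everything in it is an assumption made for illustration: `FlakyCache` is a hypothetical in-memory stand-in for a Redis client, `save_to_db` stands in for the database write, and the broad `except Exception` would normally be narrowed to the cache client's own error type (for redis-py, `redis.exceptions.RedisError`).

```python
import asyncio
import json


class FlakyCache:
    """Hypothetical in-memory stand-in for a Redis client whose writes can fail."""

    def __init__(self, fail_writes: bool = False):
        self.store = {}
        self.fail_writes = fail_writes

    async def setex(self, key, ttl, value):
        if self.fail_writes:
            raise ConnectionError("cache write failed")
        self.store[key] = value

    async def delete(self, key):
        self.store.pop(key, None)


async def save_to_db(user_id, data):
    # Hypothetical database write; returns the updated row.
    return {"id": user_id, **data}


async def update_user(cache, user_id, data, ttl=600):
    user = await save_to_db(user_id, data)  # the database is the source of truth
    key = f"user:{user_id}"
    try:
        await cache.setex(key, ttl, json.dumps(user))
    except Exception:
        # Cache write failed: remove the old entry so readers fall back to
        # the database instead of seeing the pre-update value.
        try:
            await cache.delete(key)
        except Exception:
            pass  # cache unreachable; the old entry's TTL bounds the staleness
    return user


async def demo():
    cache = FlakyCache()
    await update_user(cache, 1, {"name": "old"})  # cache now holds user:1
    cache.fail_writes = True
    await update_user(cache, 1, {"name": "new"})  # write fails, key is deleted
    return cache.store


remaining = asyncio.run(demo())
print(remaining)  # {}
```

If the delete fails as well (for example, the cache is unreachable), the stale entry survives until its TTL runs out, which is one reason to keep TTLs even on write-through keys.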
Cache stampede prevention with probabilistic early expiration
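Even with TTLs, a hot key that expires while many servers are reading it, or many keys expiring simultaneously (e.g., after a deployment), causes a stampede. The “probabilistic early expiration” pattern has each request recompute the value slightly before it expires, with a probability that rises as expiry approaches: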
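import math
import random
import time


async def get_with_early_recompute(key: str, factory, ttl: int = 600, beta: float = 1.0):
    raw = await redis.get(key)
    if raw:
        data = orjson.loads(raw)
        expiry = data.get("_expiry", 0)
        delta = data.get("_delta", 0)
        # Probabilistically recompute before expiry. log() of a number in
        # (0, 1] is <= 0, so subtracting it pushes "now" forward by a random
        # amount that scales with the recompute cost (delta) and with beta.
        now = time.time()
        if now - delta * beta * math.log(1.0 - random.random()) < expiry:
            return data["value"]

    # Miss, or this request was chosen to refresh early
    start = time.time()
    value = await factory()
    delta = time.time() - start
    cache_data = {
        "value": value,
        "_expiry": time.time() + ttl,
        "_delta": delta,
    }
    # Keep the key slightly past the logical expiry stored in _expiry
    await redis.setex(key, ttl + 60, orjson.dumps(cache_data))
    return value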
This spreads recomputation across time, preventing all servers from fetching the same data simultaneously.
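How early is "slightly before"? Assuming the commonly used logarithmic rule (recompute when now - delta * beta * ln(u) >= expiry, with u uniform in (0, 1]), the chance that a single request triggers a refresh when `gap` seconds remain is exp(-gap / (delta * beta)). The sketch below checks that closed form against a simulation. The function names are hypothetical and the numbers are illustrative inputs, not measurements.

```python
import math
import random


def should_recompute(now, expiry, delta, beta=1.0, rng=random.random):
    """Return True when this request should refresh the value early."""
    u = 1.0 - rng()  # in (0, 1], so log(u) is defined and <= 0
    return now - delta * beta * math.log(u) >= expiry


def early_recompute_probability(gap, delta, beta=1.0):
    """Closed-form chance of recomputing when `gap` seconds remain before expiry."""
    if gap <= 0:
        return 1.0  # already expired: always recompute
    if delta <= 0 or beta <= 0:
        return 0.0  # no early window at all
    return math.exp(-gap / (delta * beta))


def simulate(gap, delta, beta=1.0, trials=100_000, seed=0):
    """Estimate the same probability by sampling should_recompute()."""
    rng = random.Random(seed)
    hits = sum(
        should_recompute(0.0, gap, delta, beta, rng.random) for _ in range(trials)
    )
    return hits / trials


for gap in (5.0, 1.0, 0.25):
    exact = early_recompute_probability(gap, delta=0.5)
    print(f"gap={gap}s exact={exact:.4f} simulated={simulate(gap, delta=0.5):.4f}")
```

With a recompute cost of 0.5 s and beta = 1, about 13.5% of requests arriving 1 second before expiry refresh the value (e^-2), while requests arriving 5 seconds early almost never do. Raising beta widens the early window; lowering it narrows the window.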
Redis pipeline for batch cache operations
Fetching multiple cache keys individually is slow. Use pipelines:
from typing import Optional


async def get_products_batch(product_ids: list[int]) -> list[Optional[dict]]:
    keys = [f"product:{pid}" for pid in product_ids]
    # Round-trip 1: read every key in a single pipeline
    async with redis.pipeline() as pipe:
        for key in keys:
            pipe.get(key)
        results = await pipe.execute()

    products = []
    missing_ids = []
    for pid, cached in zip(product_ids, results):
        if cached is not None:
            products.append(orjson.loads(cached))
        else:
            missing_ids.append(pid)
            products.append(None)

    if missing_ids:
        db_products = await db.get_products(missing_ids)
        # Round-trip 2: write every fetched product back in a single pipeline
        async with redis.pipeline() as pipe:
            for product in db_products:
                pipe.setex(f"product:{product['id']}", 600, orjson.dumps(product))
            await pipe.execute()
        # Fill in the gaps; IDs the database does not know stay None
        db_map = {p["id"]: p for p in db_products}
        products = [
            p if p is not None else db_map.get(pid)
            for p, pid in zip(products, product_ids)
        ]
    return products
This makes at most 2 Redis round-trips and 1 database query regardless of how many products are requested (a single round-trip and no query when every key is already cached).
HTTP conditional requests
Implement ETag and If-None-Match for bandwidth-efficient caching:
import hashlib

from fastapi import FastAPI, Request, Response

app = FastAPI()


@app.get("/products/{product_id}")
async def get_product(product_id: int, request: Request, response: Response):
    # cached_get_product is the application's cached lookup (not shown)
    product = await cached_get_product(product_id)
    # Generate ETag from content
    content_bytes = orjson.dumps(product)
    etag = f'"{hashlib.md5(content_bytes).hexdigest()}"'
    # Check if client has current version
    if_none_match = request.headers.get("if-none-match")
    if if_none_match == etag:
        # Not Modified: zero payload, but repeat the validator and cache headers
        return Response(
            status_code=304,
            headers={"ETag": etag, "Cache-Control": "public, max-age=60"},
        )
    response.headers["ETag"] = etag
    response.headers["Cache-Control"] = "public, max-age=60"
    return product
A 304 response saves bandwidth and client-side parsing time. The client reuses its local copy.
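The handler above compares the header against a single ETag string. In practice If-None-Match may carry a comma-separated list of tags, the wildcard `*`, or weak validators prefixed with `W/`, and it is evaluated with weak comparison. A minimal sketch of a more tolerant check follows; `etag_matches` is a hypothetical helper, and the naive comma split assumes the tags themselves contain no commas.

```python
from typing import Optional


def _strip_weak(tag: str) -> str:
    """Drop surrounding whitespace and a leading W/ so weak and strong tags compare equal."""
    tag = tag.strip()
    return tag[2:] if tag.startswith("W/") else tag


def etag_matches(if_none_match: Optional[str], current_etag: str) -> bool:
    """Return True when the client's If-None-Match header covers current_etag."""
    if not if_none_match:
        return False
    if if_none_match.strip() == "*":
        return True  # wildcard: matches any current representation
    current = _strip_weak(current_etag)
    candidates = (_strip_weak(part) for part in if_none_match.split(","))
    return any(candidate == current for candidate in candidates)


print(etag_matches('"abc", W/"def"', '"def"'))  # True
print(etag_matches('"abc"', '"def"'))           # False
```

In the handler, `if etag_matches(if_none_match, etag):` would take the place of the equality test.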
Stale-while-revalidate pattern
Serve stale data immediately while refreshing in the background:
# Strong references to in-flight refreshes; the event loop itself only keeps
# weak references to tasks, so an unreferenced task can be garbage-collected.
_background_tasks: set[asyncio.Task] = set()


async def get_with_stale_refresh(key: str, factory, ttl: int = 300, stale_ttl: int = 600):
    raw = await redis.get(key)
    if raw:
        data = orjson.loads(raw)
        age = time.time() - data["_stored_at"]
        if age < ttl:
            return data["value"]  # Fresh
        if age < stale_ttl:
            # Return stale data, refresh in background
            task = asyncio.create_task(refresh_cache(key, factory, ttl, stale_ttl))
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
            return data["value"]  # Stale but acceptable
    # Cache miss: fetch synchronously
    return await refresh_cache(key, factory, ttl, stale_ttl)


async def refresh_cache(key: str, factory, ttl: int, stale_ttl: int):
    value = await factory()
    cache_data = {"value": value, "_stored_at": time.time()}
    await redis.setex(key, stale_ttl + 60, orjson.dumps(cache_data))
    return value
HTTP supports this natively: Cache-Control: max-age=60, stale-while-revalidate=300 tells the browser to serve stale content for up to 300 extra seconds while fetching fresh data in the background.
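Back in the application-level version, every request that finds a stale entry starts its own background refresh, so a hot stale key can trigger many identical fetches. One possible refinement is to allow a single in-flight refresh per key. The sketch below is an assumption-laden illustration, not part of the pattern as described above: `schedule_refresh`, `_refreshes`, and `_forget` are hypothetical names, and the registry is local to one process.

```python
import asyncio

# key -> in-flight refresh task (per process)
_refreshes: dict[str, asyncio.Task] = {}


def _forget(key: str, task: asyncio.Task) -> None:
    # Remove the finished task, unless a newer one has already replaced it
    if _refreshes.get(key) is task:
        del _refreshes[key]


def schedule_refresh(key: str, refresh_factory) -> asyncio.Task:
    """Start a background refresh for `key` unless one is already in flight."""
    task = _refreshes.get(key)
    if task is None or task.done():
        task = asyncio.create_task(refresh_factory())
        _refreshes[key] = task  # strong reference keeps the task alive
        task.add_done_callback(lambda t, k=key: _forget(k, t))
    return task


async def demo() -> int:
    calls = 0

    async def refresh():
        # Hypothetical stand-in for refresh_cache(key, factory, ttl, stale_ttl)
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.01)  # simulate a slow origin fetch
        return "fresh"

    # 50 requests see the same stale entry at about the same time
    tasks = [schedule_refresh("product:42", refresh) for _ in range(50)]
    await tasks[0]
    return calls


refresh_calls = asyncio.run(demo())
print(refresh_calls)  # 1
```

A refresh that raises leaves the stale value in place until the next request schedules another attempt; real code would also log the failure.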
Cache invalidation across services
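In microservice architectures, one service updates data and other services cache it. Event-driven invalidation keeps those caches close to consistent. Redis pub/sub is fire-and-forget, so a subscriber that is disconnected when an event is published never sees it; keep TTLs on cached entries as a backstop: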
# Publishing service (writes data)
async def update_product(product_id: int, data: dict):
    await db.update_product(product_id, data)
    await redis.publish("cache_invalidation", orjson.dumps({
        "entity": "product",
        "id": product_id,
        "action": "updated",
    }))


# Consuming service (caches data)
async def listen_for_invalidations():
    pubsub = redis.pubsub()
    await pubsub.subscribe("cache_invalidation")
    async for message in pubsub.listen():
        if message["type"] == "message":
            event = orjson.loads(message["data"])
            key = f"{event['entity']}:{event['id']}"
            await redis.delete(key)
            local_cache.pop(key, None)
Monitoring cache effectiveness
A cache with a 10% hit rate is wasted infrastructure. Track hits and misses per layer and per entity:
from prometheus_client import Counter

cache_hits = Counter("cache_hits_total", "Cache hits", ["layer", "entity"])
cache_misses = Counter("cache_misses_total", "Cache misses", ["layer", "entity"])


async def get_product_instrumented(product_id: int):
    # Check in-process cache
    local = local_cache.get(f"product:{product_id}")
    if local is not None:
        cache_hits.labels(layer="local", entity="product").inc()
        return local
    cache_misses.labels(layer="local", entity="product").inc()
    # Check Redis
    cached = await redis.get(f"product:{product_id}")
    if cached is not None:
        cache_hits.labels(layer="redis", entity="product").inc()
        return orjson.loads(cached)
    cache_misses.labels(layer="redis", entity="product").inc()
    # Database fallback
    return await db.get_product(product_id)
Target hit rates: in-process cache >90% for hot data, Redis >80% for entity caches. Below these thresholds, reconsider your TTL, cache key design, or whether the data is actually cacheable.
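The hit rate for a layer is hits / (hits + misses). As a small sketch of checking counts against the targets above, the following uses made-up sample numbers, and `hit_rate` and `layers_below_target` are hypothetical helpers. In a Prometheus setup the same ratio is usually computed in a query over the two counters rather than in application code.

```python
def hit_rate(hits: int, misses: int) -> float:
    """Fraction of lookups served from the cache; 0.0 when there was no traffic."""
    total = hits + misses
    return hits / total if total else 0.0


# Thresholds taken from the text above; treat them as starting points to tune.
TARGETS = {"local": 0.90, "redis": 0.80}


def layers_below_target(counts: dict) -> list:
    """counts maps layer -> (hits, misses); returns the layers under their target."""
    return [
        layer
        for layer, (hits, misses) in counts.items()
        if hit_rate(hits, misses) < TARGETS[layer]
    ]


# Illustrative numbers only, not measurements
sample = {"local": (9_400, 600), "redis": (450, 150)}
print(layers_below_target(sample))  # ['redis']
```

Here the in-process layer sits at 94% and passes, while Redis at 75% falls under its 80% target and would be the layer to investigate.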
Tradeoffs
Every caching layer adds complexity: more infrastructure to operate, more failure modes to handle, and more stale data risk. The payoff is lower latency (an in-process hit returns in microseconds, where a database round-trip takes milliseconds) and database load that can drop by an order of magnitude or more at high hit rates. Start with the simplest layer that solves your bottleneck, measure its effectiveness, and add layers only when data justifies the complexity.
The one thing to remember: Production caching requires stampede prevention (locks or probabilistic early expiration), event-driven invalidation for consistency, per-layer hit rate monitoring, and the discipline to measure before adding complexity.