Python aiocache Caching — Deep Dive
The Cache Stampede Problem
The most dangerous caching failure mode occurs when a popular cached value expires and hundreds of concurrent requests simultaneously try to regenerate it:
- Cache key “popular_products” expires at t=0
- 200 requests arrive at t=0.001
- All 200 find the cache empty
- All 200 hit the database simultaneously
- Database buckles under load
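The sequence above can be reproduced in a few lines. This is a minimal sketch using only asyncio: the dict stands in for a real cache, and fetch_popular_products is a hypothetical slow query rather than anything provided by aiocache.

```python
import asyncio

cache = {}      # stand-in for a real cache backend
db_calls = 0    # how many times the "database" was queried

async def fetch_popular_products():
    """Hypothetical slow query; counts how often it runs."""
    global db_calls
    db_calls += 1
    await asyncio.sleep(0.05)  # simulate query latency
    return ["widget", "gadget"]

async def naive_get(key):
    # Check-then-regenerate with no coordination between callers
    if key in cache:
        return cache[key]
    value = await fetch_popular_products()
    cache[key] = value
    return value

async def simulate(n_requests=200):
    # All requests arrive while the key is missing
    await asyncio.gather(*(naive_get("popular_products") for _ in range(n_requests)))
    return db_calls

stampede_calls = asyncio.run(simulate())
print(stampede_calls)
```

Every request sees the empty cache before the first query finishes, so each one runs its own query.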
Solving with Locking
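aiocache ships some stampede protection of its own (the cached_stampede decorator and the RedLock helper in aiocache.lock), but a hand-rolled per-key lock shows the mechanism and works with any backend. Note that the asyncio.Lock used here only coordinates coroutines inside a single process: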
import asyncio
from aiocache import Cache

class StampedeProtectedCache:
    def __init__(self, cache: Cache):
        self.cache = cache
        self._locks = {}  # one asyncio.Lock per key (never pruned in this sketch)

    async def get_or_set(self, key, func, ttl=300):
        # Try cache first (a cached None is treated as a miss)
        value = await self.cache.get(key)
        if value is not None:
            return value

        # Acquire per-key lock
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()

        async with self._locks[key]:
            # Double-check after acquiring lock
            value = await self.cache.get(key)
            if value is not None:
                return value

            # Only one coroutine regenerates
            value = await func()
            await self.cache.set(key, value, ttl=ttl)
            return value
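A variation on the lock is to let waiters await the same in-flight call instead of queueing and re-reading the cache. The sketch below swaps in that technique (often called single-flight or request coalescing); it is not part of aiocache, and SingleFlight, do and load are illustrative names.

```python
import asyncio

class SingleFlight:
    """Let concurrent callers for the same key share one in-flight call."""

    def __init__(self):
        self._inflight = {}  # key -> asyncio.Task currently computing the value

    async def do(self, key, func):
        task = self._inflight.get(key)
        if task is None:
            # First caller starts the work; later callers find the task and wait on it
            task = asyncio.ensure_future(func())
            self._inflight[key] = task
            task.add_done_callback(lambda _t: self._inflight.pop(key, None))
        # shield() keeps one cancelled waiter from cancelling the shared task
        return await asyncio.shield(task)

async def demo(n_requests=200):
    calls = 0

    async def load():  # hypothetical expensive loader
        nonlocal calls
        calls += 1
        await asyncio.sleep(0.05)
        return {"id": 1}

    flight = SingleFlight()
    results = await asyncio.gather(
        *(flight.do("popular_products", load) for _ in range(n_requests))
    )
    return calls, results

calls, results = asyncio.run(demo())
print(calls, len(results))
```

In practice the loader passed to do would also write its result to the cache, so later requests never reach this path at all.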
Probabilistic Early Expiration
A more sophisticated approach is to regenerate values at random shortly before they expire, so that refreshes are spread out instead of bunching at the expiry instant:
import random
import time

async def get_with_early_refresh(cache, key, func, ttl=300, beta=1.0):
    """Probabilistic early regeneration (a simplified linear rule, not full XFetch)."""
    entry = await cache.get(key)  # Stored as (value, stored_at), or None on a miss
    if entry is not None:
        value, stored_at = entry
        age = time.time() - stored_at
        remaining = ttl - age
        # Keep the cached value with probability (remaining / ttl) ** beta, so the
        # chance of regenerating rises as expiry approaches (beta > 1 refreshes sooner)
        if remaining > 0 and random.random() < (remaining / ttl) ** beta:
            return value  # Use cached value

    # Miss, expired, or selected for early refresh: regenerate
    value = await func()
    await cache.set(key, (value, time.time()), ttl=ttl)
    return value
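For comparison, the published XFetch rule (from the paper "Optimal Probabilistic Cache Stampede Prevention") also weighs how long the last recomputation took: refresh when now - delta * beta * ln(u) >= expiry, with u drawn uniformly from (0, 1]. The sketch below assumes entries are stored as (value, delta, expiry); should_refresh_early and xfetch_get are illustrative names, and cache is any object with async get and set.

```python
import math
import random
import time

def should_refresh_early(now, expiry, delta, beta=1.0, u=None):
    """XFetch decision rule: True when the caller should recompute now.

    now, expiry: timestamps in seconds; delta: duration of the last recompute;
    beta > 1 refreshes earlier, beta < 1 later; u: uniform sample in (0, 1].
    """
    if u is None:
        u = 1.0 - random.random()  # random() is in [0, 1), so this is in (0, 1]
    # -log(u) is exponentially distributed: the lookahead is usually around
    # delta * beta and only occasionally much larger
    return now - delta * beta * math.log(u) >= expiry

async def xfetch_get(cache, key, func, ttl=300, beta=1.0):
    entry = await cache.get(key)  # assumed layout: (value, delta, expiry)
    if entry is not None:
        value, delta, expiry = entry
        if not should_refresh_early(time.time(), expiry, delta, beta):
            return value
    start = time.time()
    value = await func()
    delta = time.time() - start  # remember how expensive the recompute was
    await cache.set(key, (value, delta, time.time() + ttl), ttl=ttl)
    return value
```

Because the lookahead scales with delta, expensive values start refreshing earlier than cheap ones, which is the property the linear rule lacks.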
Multi-Layer Caching
Production systems often use multiple cache layers:
class MultiLayerCache:
    """L1 (memory) + L2 (Redis) caching."""

    def __init__(self):
        self.l1 = Cache(Cache.MEMORY)
        self.l2 = Cache(Cache.REDIS, endpoint="localhost", port=6379)

    async def get(self, key):
        # Try L1 first (fastest)
        value = await self.l1.get(key)
        if value is not None:
            return value

        # Try L2
        value = await self.l2.get(key)
        if value is not None:
            # Promote to L1
            await self.l1.set(key, value, ttl=60)
            return value

        return None

    async def set(self, key, value, ttl=300):
        # Write to both layers
        await self.l1.set(key, value, ttl=min(ttl, 60))  # Shorter L1 TTL
        await self.l2.set(key, value, ttl=ttl)

    async def delete(self, key):
        await self.l1.delete(key)
        await self.l2.delete(key)
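L1 (memory) serves high-frequency reads with sub-millisecond latency. L2 (Redis) provides shared caching across application instances and handles L1 misses. Because every instance keeps its own L1, a delete or update on one instance does not clear the copies held by the others; the 60-second L1 TTL bounds how long such stale entries can live.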
Custom Plugins
aiocache supports a plugin system for cross-cutting concerns:
import logging

from aiocache import Cache
from aiocache.plugins import BasePlugin

logger = logging.getLogger(__name__)

class MetricsPlugin(BasePlugin):
    """Track cache hit/miss rates."""

    def __init__(self):
        self.hits = 0
        self.misses = 0

    async def post_get(self, client, key, ret=None, **kwargs):
        # ret is the value get() returned; None means a miss
        if ret is not None:
            self.hits += 1
        else:
            self.misses += 1

    @property
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0

class TimingPlugin(BasePlugin):
    """Log slow cache operations."""

    async def post_get(self, client, key, took=0, **kwargs):
        # aiocache passes the elapsed time in seconds to post hooks as `took`.
        # Values stored in pre_get's kwargs would not carry over to post_get.
        if took > 0.1:  # > 100ms
            logger.warning(f"Slow cache get: {key} took {took:.3f}s")

# Apply plugins (keep a reference so the counters can be read later)
metrics = MetricsPlugin()
cache = Cache(
    Cache.REDIS,
    plugins=[metrics, TimingPlugin()]
)
Serialization Security
The Pickle Risk
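When using PickleSerializer with Redis or Memcached, any process that can write to the cache can execute arbitrary code inside your application, because unpickling calls whatever callable the payload names:
# An attacker who can write to Redis can inject:
import pickle
import os

class Exploit:
    def __reduce__(self):
        # pickle.loads() will call os.system with this argument
        return (os.system, ("rm -rf /",))

payload = pickle.dumps(Exploit())  # harmless to create, dangerous to load
# When your app deserializes this cached value... boom.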
Mitigations:
- Use JsonSerializer for any cache shared with untrusted sources
- If you must use Pickle, sign cached values with HMAC
- Restrict Redis/Memcached network access
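The HMAC mitigation can be sketched with the standard library alone. This is an illustration under stated assumptions, not aiocache API: signed_dumps, signed_loads and SECRET_KEY are hypothetical names, and the key would come from configuration rather than source code.

```python
import hashlib
import hmac
import pickle

SECRET_KEY = b"replace-with-a-real-secret"  # assumption: loaded from config in practice
DIGEST_SIZE = hashlib.sha256().digest_size  # 32 bytes

def signed_dumps(value, key=SECRET_KEY):
    """Pickle value and prepend an HMAC-SHA256 tag computed over the payload."""
    payload = pickle.dumps(value)
    tag = hmac.new(key, payload, hashlib.sha256).digest()
    return tag + payload

def signed_loads(blob, key=SECRET_KEY):
    """Verify the tag before unpickling; raise ValueError if it does not match."""
    tag, payload = blob[:DIGEST_SIZE], blob[DIGEST_SIZE:]
    expected = hmac.new(key, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(tag, expected):  # constant-time comparison
        raise ValueError("cache entry failed signature check")
    return pickle.loads(payload)

blob = signed_dumps({"id": 7})
print(signed_loads(blob))
```

A writer without the key cannot produce a valid tag, so an injected payload is rejected before pickle.loads ever sees it. These two functions could back the dumps and loads methods of a custom serializer.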
Custom Serializers
from aiocache import Cache
from aiocache.serializers import BaseSerializer
import msgpack

class MsgPackSerializer(BaseSerializer):
    """Typically faster and more compact than JSON, and still safe to load."""

    # dumps() returns bytes, so the backend must not decode responses as text
    DEFAULT_ENCODING = None

    def dumps(self, value):
        return msgpack.packb(value, use_bin_type=True)

    def loads(self, value):
        if value is None:
            return None
        return msgpack.unpackb(value, raw=False)

cache = Cache(Cache.REDIS, serializer=MsgPackSerializer())
Cache Warming Strategies
On Startup
async def warm_cache(cache, db):
    """Pre-populate cache with frequently accessed data."""
    popular_items = await db.fetch("SELECT * FROM products ORDER BY views DESC LIMIT 100")
    for item in popular_items:
        await cache.set(
            f"product:{item['id']}",
            dict(item),
            ttl=3600
        )
    logger.info(f"Warmed cache with {len(popular_items)} products")
Background Refresh
async def background_refresh(cache, key, func, ttl=300, refresh_interval=240):
    """Continuously refresh a cache key before it expires."""
    while True:
        try:
            value = await func()
            await cache.set(key, value, ttl=ttl)
        except Exception as e:
            # Keep the loop alive; the old value stays cached until its TTL runs out
            logger.error(f"Cache refresh failed for {key}: {e}")
        await asyncio.sleep(refresh_interval)
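A loop like this runs until it is cancelled, so the application has to own its lifetime. The sketch below shows one way to start and stop such a task; stop_task and refresher are illustrative names, and refresher stands in for a call such as background_refresh(cache, key, func).

```python
import asyncio
import contextlib

async def stop_task(task):
    """Cancel a background task and wait until it has actually finished."""
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task

async def demo():
    refreshed = []

    async def refresher():  # stand-in for the refresh loop
        while True:
            refreshed.append("tick")
            await asyncio.sleep(0.01)

    # Keep a reference to the task so it is not garbage-collected mid-flight
    task = asyncio.create_task(refresher())
    await asyncio.sleep(0.05)  # the application does its work here
    await stop_task(task)      # e.g. from a shutdown hook
    return task.cancelled(), len(refreshed)

cancelled, ticks = asyncio.run(demo())
```

Awaiting the task after cancel() matters: it gives the loop a chance to unwind before the event loop or the cache connection is closed.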
Integration with FastAPI
from fastapi import FastAPI
from aiocache import Cache, cached

app = FastAPI()
cache = Cache(Cache.REDIS, endpoint="localhost")

@app.on_event("shutdown")
async def shutdown():
    await cache.close()

def cache_key_from_request(func, request, *args, **kwargs):
    # Alternative key builder for handlers that take a Request argument
    return f"{func.__name__}:{request.url.path}:{request.query_params}"

def product_key(func, *args, **kwargs):
    # FastAPI passes path parameters as keyword arguments, so args may be empty
    product_id = kwargs["product_id"] if "product_id" in kwargs else args[0]
    return f"product:{product_id}"

@app.get("/products/{product_id}")
@cached(ttl=300, cache=Cache.REDIS, endpoint="localhost", key_builder=product_key)
async def get_product(product_id: int):
    return await db.fetch_product(product_id)  # db: your data-access layer

# Manual cache control
@app.put("/products/{product_id}")
async def update_product(product_id: int, data: dict):
    await db.update_product(product_id, data)
    # Same key format as product_key, so the next GET repopulates the entry
    await cache.delete(f"product:{product_id}")
    return {"status": "updated"}
Monitoring Cache Health
Key metrics to track:
class CacheMonitor:
    def __init__(self, cache):
        self.cache = cache
        # Counters are incremented by whatever code wraps the cache calls
        self.stats = {
            "hits": 0, "misses": 0,
            "sets": 0, "deletes": 0,
            "errors": 0
        }

    async def health_check(self):
        """Return cache health metrics."""
        try:
            await self.cache.set("_health", "ok", ttl=10)
            value = await self.cache.get("_health")
            healthy = value == "ok"
        except Exception:
            healthy = False

        lookups = self.stats["hits"] + self.stats["misses"]
        operations = sum(self.stats.values())
        return {
            "healthy": healthy,
            # Hit rate is measured over lookups only
            "hit_rate": self.stats["hits"] / lookups if lookups > 0 else 0,
            "total_operations": operations,
            # Error rate is measured over all operations, not just lookups
            "error_rate": self.stats["errors"] / operations if operations > 0 else 0,
        }
Alert thresholds:
- Hit rate below 80% → cache might be too small or TTLs too short
- Error rate above 1% → connection issues with Redis/Memcached
- Latency p99 above 10ms (Redis) → network or Redis overload
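These thresholds can be checked mechanically. A minimal sketch, assuming rates are fractions between 0 and 1 and latencies are collected in milliseconds; evaluate_alerts is an illustrative name and the numbers are the ones listed above.

```python
import statistics

def evaluate_alerts(hit_rate, error_rate, latencies_ms):
    """Return a list of alert messages for the thresholds listed above."""
    alerts = []
    if hit_rate < 0.80:
        alerts.append("hit rate below 80%")
    if error_rate > 0.01:
        alerts.append("error rate above 1%")
    if len(latencies_ms) >= 2:  # quantiles() needs at least two samples
        # quantiles(n=100) returns 99 cut points; the last is the 99th percentile
        p99 = statistics.quantiles(latencies_ms, n=100)[-1]
        if p99 > 10:
            alerts.append("p99 latency above 10ms")
    return alerts

healthy = evaluate_alerts(0.95, 0.0, [1.0] * 100)
degraded = evaluate_alerts(0.5, 0.02, [1.0] * 90 + [50.0] * 10)
```

Here healthy is an empty list, while degraded carries all three messages.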
Common Anti-Patterns
- Caching everything — only cache data that’s expensive to compute and read more often than it changes
- No TTL — data cached forever becomes stale forever. Always set a TTL, even a long one (24h)
- Cache key collisions — ensure keys include enough context (user ID, locale, version) to prevent cross-contamination
- Ignoring cache failures — treat the cache as optional. If Redis is down, fall through to the database
- Coupling business logic to cache — the application should work (slowly) without the cache
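The last two points can be sketched as a read path that fails open. This assumes a cache object with async get and set, as aiocache backends provide; get_with_fallback, BrokenCache and load_product are hypothetical names used only for the demonstration.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def get_with_fallback(cache, key, loader, ttl=300):
    """Read through the cache, but never let a cache failure fail the request."""
    try:
        value = await cache.get(key)
        if value is not None:
            return value
    except Exception as exc:  # e.g. connection errors from the backend
        logger.warning("cache get failed for %s: %s", key, exc)

    value = await loader()  # source of truth, e.g. the database
    try:
        await cache.set(key, value, ttl=ttl)
    except Exception as exc:
        logger.warning("cache set failed for %s: %s", key, exc)
    return value

class BrokenCache:
    """Stand-in for an unreachable backend (for the demo only)."""
    async def get(self, key):
        raise ConnectionError("cache unavailable")
    async def set(self, key, value, ttl=None):
        raise ConnectionError("cache unavailable")

async def load_product():  # hypothetical database call
    return {"id": 1, "name": "widget"}

result = asyncio.run(get_with_fallback(BrokenCache(), "product:1", load_product))
```

With the cache down, the request is slower but still succeeds, and the failure shows up in logs and error counters instead of in user-facing errors.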
One thing to remember: Production caching with aiocache requires handling cache stampedes (use locking or probabilistic early refresh), choosing the right serializer (JSON for safety, msgpack for performance), monitoring hit rates and latency, and treating the cache as an optimization layer that can fail gracefully — never as a primary data store.