Python aiocache Caching — Deep Dive

The Cache Stampede Problem

The most dangerous caching failure mode occurs when a popular cached value expires and hundreds of concurrent requests simultaneously try to regenerate it:

  1. Cache key “popular_products” expires at t=0
  2. 200 requests arrive at t=0.001
  3. All 200 find the cache empty
  4. All 200 hit the database simultaneously
  5. Database buckles under load
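
The sequence above can be reproduced in a few lines. The following is a minimal sketch, not aiocache code: a plain dict stands in for the cache, and `load_popular_products` is a hypothetical slow query invented for the illustration.

```python
import asyncio

db_calls = 0
cache = {}  # plain dict standing in for a real cache backend (assumption)

async def load_popular_products():
    """Hypothetical slow database query."""
    global db_calls
    db_calls += 1
    await asyncio.sleep(0.05)  # simulate query latency
    return ["widget", "gadget"]

async def handle_request():
    # Naive read-through: check the cache, regenerate on a miss.
    if "popular_products" in cache:
        return cache["popular_products"]
    value = await load_popular_products()
    cache["popular_products"] = value
    return value

async def simulate(n_requests=200):
    """Fire n_requests at an expired key; return how many hit the database."""
    global db_calls
    db_calls = 0
    cache.clear()  # the key has just expired
    await asyncio.gather(*(handle_request() for _ in range(n_requests)))
    return db_calls

if __name__ == "__main__":
    print(asyncio.run(simulate()))  # prints 200
```

Every request sees the empty cache before the first query finishes, so each one issues its own query.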

Solving with Locking

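aiocache ships a cached_stampede decorator for this, built on its RedLock helper. The underlying pattern is per-key locking, which you can also implement by hand when you only need to coordinate coroutines inside one process: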

import asyncio
from aiocache import Cache

class StampedeProtectedCache:
    def __init__(self, cache: Cache):
        self.cache = cache
        self._locks = {}
    
    async def get_or_set(self, key, func, ttl=300):
        # Try cache first
        value = await self.cache.get(key)
        if value is not None:
            return value
        
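        # Acquire per-key lock. This coordinates coroutines in this process
        # only, and the dict keeps one Lock for every key ever requested.
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()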
        
        async with self._locks[key]:
            # Double-check after acquiring lock
            value = await self.cache.get(key)
            if value is not None:
                return value
            
            # Only one coroutine regenerates
            value = await func()
            await self.cache.set(key, value, ttl=ttl)
            return value
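
To check that per-key locking really collapses a stampede, the pattern can be run against a stand-in cache. This is a sketch under assumptions: `DictCache` is a hypothetical dict-backed substitute for an aiocache client that ignores TTLs, and `StampedeProtected` is a condensed copy of the class above so the block runs on its own.

```python
import asyncio

class DictCache:
    """Hypothetical dict-backed stand-in with async get/set (TTL ignored)."""
    def __init__(self):
        self._data = {}

    async def get(self, key):
        return self._data.get(key)

    async def set(self, key, value, ttl=None):
        self._data[key] = value

class StampedeProtected:
    """Condensed copy of the per-key locking pattern."""
    def __init__(self, cache):
        self.cache = cache
        self._locks = {}

    async def get_or_set(self, key, func, ttl=300):
        value = await self.cache.get(key)
        if value is not None:
            return value
        async with self._locks.setdefault(key, asyncio.Lock()):
            value = await self.cache.get(key)  # re-check under the lock
            if value is not None:
                return value
            value = await func()
            await self.cache.set(key, value, ttl=ttl)
            return value

async def simulate(n_requests=200):
    """Return (database calls, results) for n concurrent requests."""
    db_calls = 0

    async def load_popular_products():
        nonlocal db_calls
        db_calls += 1
        await asyncio.sleep(0.05)  # simulate query latency
        return ["widget", "gadget"]

    protected = StampedeProtected(DictCache())
    results = await asyncio.gather(
        *(protected.get_or_set("popular_products", load_popular_products)
          for _ in range(n_requests))
    )
    return db_calls, results
```

With the lock in place, only the first coroutine runs the loader. The rest wait, then find the value on the re-check.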

Probabilistic Early Expiration

A more sophisticated approach is to regenerate values at random shortly before they expire, so refreshes are spread out instead of piling up at the expiry instant:

import random
import time

async def get_with_early_refresh(cache, key, func, ttl=300, beta=1.0):
    """Probabilistic early regeneration (a simplified take on XFetch)."""
    entry = await cache.get(key)  # Stored as (value, stored_at); None on a miss
    
    if entry is not None:
        value, stored_at = entry
        age = time.time() - stored_at
        remaining = ttl - age
        # The chance of keeping the cached value shrinks as expiry approaches;
        # a larger beta refreshes more eagerly.
        if remaining > 0 and random.random() < (remaining / ttl) ** beta:
            return value  # Use cached value
    
    # Regenerate
    value = await func()
    await cache.set(key, (value, time.time()), ttl=ttl)
    return value
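
XFetch as usually stated also weighs how long regeneration takes: a value is refreshed early when `now - delta * beta * ln(rand) >= expiry`, where `delta` is the measured regeneration time. Below is a minimal sketch of that rule. The names `should_refresh` and `xfetch_get` and the `(value, delta, expiry)` storage layout are assumptions made for the illustration, and `cache` is any object with async `get` and `set`.

```python
import asyncio
import math
import random
import time

def should_refresh(now, expiry, delta, beta=1.0, rand=None):
    """XFetch test: True when the entry should be regenerated.

    delta is how long the last regeneration took, in seconds.
    """
    if rand is None:
        rand = 1.0 - random.random()  # in (0, 1], avoids log(0)
    # log(rand) <= 0, so the subtraction pushes "now" forward by a random
    # amount that scales with regeneration cost and beta.
    return now - delta * beta * math.log(rand) >= expiry

async def xfetch_get(cache, key, func, ttl=300, beta=1.0):
    entry = await cache.get(key)  # assumed layout: (value, delta, expiry)
    if entry is not None:
        value, delta, expiry = entry
        if not should_refresh(time.time(), expiry, delta, beta):
            return value

    # Miss or early refresh: regenerate and record how long it took.
    start = time.time()
    value = await func()
    delta = time.time() - start
    await cache.set(key, (value, delta, time.time() + ttl), ttl=ttl)
    return value
```

Because the random offset scales with `delta`, cheap values are refreshed only in the last moments before expiry, while expensive ones start refreshing earlier.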

Multi-Layer Caching

Production systems often use multiple cache layers:

class MultiLayerCache:
    """L1 (memory) + L2 (Redis) caching."""
    
    def __init__(self):
        self.l1 = Cache(Cache.MEMORY)
        self.l2 = Cache(Cache.REDIS, endpoint="localhost", port=6379)
    
    async def get(self, key):
        # Try L1 first (fastest)
        value = await self.l1.get(key)
        if value is not None:
            return value
        
        # Try L2
        value = await self.l2.get(key)
        if value is not None:
            # Promote to L1
            await self.l1.set(key, value, ttl=60)
            return value
        
        return None
    
    async def set(self, key, value, ttl=300):
        # Write to both layers
        await self.l1.set(key, value, ttl=min(ttl, 60))  # Shorter L1 TTL
        await self.l2.set(key, value, ttl=ttl)
    
    async def delete(self, key):
        await self.l1.delete(key)
        await self.l2.delete(key)

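L1 (memory) serves high-frequency reads with sub-millisecond latency. L2 (Redis) provides shared caching across application instances and handles L1 misses. Because L1 lives inside each process, delete() only clears the local copy: other instances can keep serving their own L1 entry until its 60-second TTL runs out, which is why the L1 TTL is kept short.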

Custom Plugins

aiocache supports a plugin system for cross-cutting concerns:

import logging

from aiocache import Cache
from aiocache.plugins import BasePlugin

logger = logging.getLogger(__name__)

class MetricsPlugin(BasePlugin):
    """Track cache hit/miss rates."""
    
    def __init__(self):
        self.hits = 0
        self.misses = 0
    
    async def post_get(self, client, key, ret=None, **kwargs):
        if ret is not None:
            self.hits += 1
        else:
            self.misses += 1
    
    @property
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0

class TimingPlugin(BasePlugin):
    """Log slow cache operations."""
    
    async def post_get(self, client, key, took=0, ret=None, **kwargs):
        # aiocache times the call itself and passes the duration (seconds)
        # to post hooks as `took`. Values written to kwargs in pre_get are
        # local to that call and never reach post_get.
        if took > 0.1:  # > 100ms
            logger.warning(f"Slow cache get: {key} took {took:.3f}s")

# Apply plugins (keep a reference so hit_rate can be read later)
metrics = MetricsPlugin()
cache = Cache(
    Cache.REDIS,
    plugins=[metrics, TimingPlugin()]
)

Serialization Security

The Pickle Risk

When using PickleSerializer with Redis or Memcached, any process that can write to the cache can execute arbitrary code inside your application, because unpickling calls whatever callable the payload names:

# An attacker who can write to Redis can inject:
import pickle
import os

class Exploit:
    def __reduce__(self):
        return (os.system, ("rm -rf /",))

# When your app deserializes this cached value... boom.

Mitigations:

  • Use JsonSerializer for any cache shared with untrusted sources
  • If you must use Pickle, sign cached values with HMAC
  • Restrict Redis/Memcached network access
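
The HMAC mitigation can be sketched with the standard library alone. This is a minimal illustration, not an aiocache API: `signed_dumps` and `signed_loads` are hypothetical helper names, and the secret is assumed to be distributed to the application out of band and never stored in the cache.

```python
import hashlib
import hmac
import pickle

DIGEST_SIZE = hashlib.sha256().digest_size  # 32 bytes

def signed_dumps(value, secret: bytes) -> bytes:
    """Pickle the value and prepend an HMAC-SHA256 tag."""
    payload = pickle.dumps(value)
    tag = hmac.new(secret, payload, hashlib.sha256).digest()
    return tag + payload

def signed_loads(blob: bytes, secret: bytes):
    """Verify the tag first; unpickle only if it matches."""
    tag, payload = blob[:DIGEST_SIZE], blob[DIGEST_SIZE:]
    expected = hmac.new(secret, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(tag, expected):
        raise ValueError("cache entry failed signature check")
    return pickle.loads(payload)
```

The tag is checked before `pickle.loads` ever runs, so a writer without the secret cannot get a payload deserialized. The same two functions could back the `dumps` and `loads` methods of a custom serializer.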

Custom Serializers

from aiocache import Cache
from aiocache.serializers import BaseSerializer
import msgpack

class MsgPackSerializer(BaseSerializer):
    """Faster than JSON, more compact, still safe."""
    
    # msgpack output is binary, so ask the backend for raw bytes rather than
    # a utf-8 decoded str (the same setting PickleSerializer uses).
    DEFAULT_ENCODING = None
    
    def dumps(self, value):
        return msgpack.packb(value, use_bin_type=True)
    
    def loads(self, value):
        if value is None:
            return None
        return msgpack.unpackb(value, raw=False)

cache = Cache(Cache.REDIS, serializer=MsgPackSerializer())

Cache Warming Strategies

On Startup

import logging

logger = logging.getLogger(__name__)

async def warm_cache(cache, db):
    """Pre-populate cache with frequently accessed data."""
    popular_items = await db.fetch("SELECT * FROM products ORDER BY views DESC LIMIT 100")
    
    # One multi_set call instead of 100 sequential round trips
    pairs = [(f"product:{item['id']}", dict(item)) for item in popular_items]
    if pairs:
        await cache.multi_set(pairs, ttl=3600)
    logger.info(f"Warmed cache with {len(popular_items)} products")

Background Refresh

async def background_refresh(cache, key, func, ttl=300, refresh_interval=240):
    """Continuously refresh a cache key before it expires."""
    while True:
        try:
            value = await func()
            await cache.set(key, value, ttl=ttl)
        except Exception as e:
            logger.error(f"Cache refresh failed for {key}: {e}")
        await asyncio.sleep(refresh_interval)
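
A loop like this never returns, so it has to run as a task that is cancelled on shutdown. The sketch below shows one way to manage that lifecycle. `refresh_forever` is a compact stand-in for the loop above that writes to a plain dict, and `run_with_refresher` is a hypothetical helper name.

```python
import asyncio
import contextlib

async def refresh_forever(store, key, func, interval):
    """Compact stand-in for the refresh loop; store is a plain dict."""
    while True:
        store[key] = await func()
        await asyncio.sleep(interval)

async def run_with_refresher(store, key, func, interval, work):
    """Run work() while a background task keeps store[key] fresh."""
    task = asyncio.create_task(refresh_forever(store, key, func, interval))
    try:
        return await work()
    finally:
        # Stop the refresher and wait until it has actually finished.
        task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await task
```

In a web application the same create, cancel, and await steps would typically sit in the startup and shutdown hooks rather than around a single `work()` call.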

Integration with FastAPI

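from aiocache import Cache, cached
from fastapi import FastAPI

app = FastAPI()
cache = Cache(Cache.REDIS, endpoint="localhost")

@app.on_event("shutdown")
async def shutdown():
    await cache.close()

# Generic alternative for endpoints that take a Request parameter
def cache_key_from_request(func, request, *args, **kwargs):
    return f"{func.__name__}:{request.url.path}:{request.query_params}"

def product_key(func, *args, **kwargs):
    # FastAPI calls endpoints with keyword arguments, so look there first.
    # (kwargs.get("product_id", args[0]) would evaluate args[0] eagerly and
    # raise IndexError when args is empty.)
    product_id = kwargs["product_id"] if "product_id" in kwargs else args[0]
    return f"product:{product_id}"

@app.get("/products/{product_id}")
# Same endpoint as the module-level cache, so delete() below hits the same Redis
@cached(ttl=300, cache=Cache.REDIS, endpoint="localhost", key_builder=product_key)
async def get_product(product_id: int):
    return await db.fetch_product(product_id)  # db: your data-access layer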

# Manual cache control
@app.put("/products/{product_id}")
async def update_product(product_id: int, data: dict):
    await db.update_product(product_id, data)
    await cache.delete(f"product:{product_id}")
    return {"status": "updated"}

Monitoring Cache Health

Key metrics to track:

class CacheMonitor:
    def __init__(self, cache):
        self.cache = cache
        self.stats = {
            "hits": 0, "misses": 0,
            "sets": 0, "deletes": 0,
            "errors": 0
        }
    
    async def health_check(self):
        """Return cache health metrics."""
        try:
            await self.cache.set("_health", "ok", ttl=10)
            value = await self.cache.get("_health")
            healthy = value == "ok"
        except Exception:
            healthy = False
        
        total = self.stats["hits"] + self.stats["misses"]
        operations = sum(self.stats.values())
        return {
            "healthy": healthy,
            "hit_rate": self.stats["hits"] / total if total > 0 else 0,
            "total_operations": operations,
            # Errors are measured against all operations, not just reads
            "error_rate": self.stats["errors"] / operations if operations > 0 else 0,
        }

Alert thresholds:

  • Hit rate below 80% → cache might be too small or TTLs too short
  • Error rate above 1% → connection issues with Redis/Memcached
  • Latency p99 above 10ms (Redis) → network or Redis overload
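
These thresholds can be turned into a small check. The sketch below is illustrative only: `p99` and `check_thresholds` are hypothetical helper names, the percentile uses the nearest-rank method, and latencies are assumed to be collected elsewhere as a list of milliseconds.

```python
def p99(latencies_ms):
    """Nearest-rank 99th percentile of a non-empty list of latencies."""
    ordered = sorted(latencies_ms)
    rank = (99 * len(ordered) + 99) // 100  # ceil(0.99 * n), 1-based
    return ordered[rank - 1]

def check_thresholds(hit_rate, error_rate, latencies_ms):
    """Return the list of alerts triggered by the thresholds above."""
    alerts = []
    if hit_rate < 0.80:
        alerts.append("low hit rate")
    if error_rate > 0.01:
        alerts.append("high error rate")
    if latencies_ms and p99(latencies_ms) > 10:
        alerts.append("high p99 latency")
    return alerts
```

An empty list means all three metrics are within bounds.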

Common Anti-Patterns

  1. Caching everything — only cache data that’s expensive to compute and read more often than it changes
  2. No TTL — data cached forever becomes stale forever. Always set a TTL, even a long one (24h)
  3. Cache key collisions — ensure keys include enough context (user ID, locale, version) to prevent cross-contamination
  4. Ignoring cache failures — treat the cache as optional. If Redis is down, fall through to the database
  5. Coupling business logic to cache — the application should work (slowly) without the cache
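
Points 4 and 5 come down to a read path that fails open. Here is a minimal sketch, assuming `cache` is any object with async `get` and `set` and `loader` is the coroutine that reads from the primary store. `get_or_load` is a hypothetical helper name.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def get_or_load(cache, key, loader, ttl=300):
    """Read through the cache, falling back to the loader if it fails."""
    try:
        value = await cache.get(key)
        if value is not None:
            return value
    except Exception:
        # Cache outage: log it and carry on as if it were a miss.
        logger.warning("cache read failed for %s", key, exc_info=True)

    value = await loader()

    try:
        await cache.set(key, value, ttl=ttl)
    except Exception:
        # A failed write must not fail the request either.
        logger.warning("cache write failed for %s", key, exc_info=True)
    return value
```

When the cache is down, every request pays the loader's cost, so this works best combined with alerting on the error rate.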

One thing to remember: production caching with aiocache means handling cache stampedes (locking or probabilistic early refresh), choosing the right serializer (JSON for safety, msgpack for performance), and monitoring hit rates and latency. Treat the cache as an optimization layer that can fail gracefully, never as a primary data store.
