Python Cache-Aside Pattern — Deep Dive

Pattern mechanics

Cache-aside places the application in full control of the cache lifecycle. Unlike write-through or write-behind, the cache is not aware of the database at all — it’s a passive key-value store. The application orchestrates all interactions.

Read:  App → Cache? → hit → return
                    → miss → DB → store in cache → return

Write: App → DB → invalidate cache

This separation makes cache-aside the easiest pattern to retrofit onto existing applications.

Basic implementation

import json
import logging
from typing import Any, Callable, Optional, TypeVar

import redis

logger = logging.getLogger(__name__)
T = TypeVar("T")


class CacheAside:
    """Cache-aside (lazy-loading) wrapper around Redis.

    The cache is treated as best-effort: Redis errors and cached payloads
    that cannot be decoded are logged and handled as misses, so the caller
    still receives the loader's result.
    """

    def __init__(self, redis_client: redis.Redis, default_ttl: int = 300):
        """Wrap ``redis_client``.

        Args:
            redis_client: Client used for all cache reads, writes and deletes.
            default_ttl: Expiry passed to ``SETEX`` when a call supplies no TTL.
        """
        self.redis = redis_client
        self.default_ttl = default_ttl
        # Counters backing ``hit_rate``.
        self._hits = 0
        self._misses = 0

    def get_or_load(
        self,
        key: str,
        loader: Callable[[], Any],
        ttl: Optional[int] = None,
        serialize: Callable[[Any], str] = json.dumps,
        deserialize: Callable[[str], Any] = json.loads,
    ) -> Any:
        """Return the cached value for ``key``; on a miss, load and cache it.

        Args:
            key: Cache key to read and, on a miss, populate.
            loader: Zero-argument callable that fetches the value from the
                source of truth.
            ttl: Expiry for the cached entry. A falsy value (``None`` or
                ``0``) falls back to ``default_ttl``.
            serialize: Converts the loaded value into the stored payload.
            deserialize: Converts a stored payload back into a value.

        Returns:
            The cached value on a hit, otherwise whatever ``loader`` returns.
            A ``None`` result from ``loader`` is returned but never cached.
        """
        try:
            cached = self.redis.get(key)
        except redis.RedisError:
            logger.warning("Cache read error for %s", key)
            cached = None

        if cached is not None:
            try:
                value = deserialize(cached)
            except (ValueError, TypeError):
                # A corrupt or foreign payload must not break the read path:
                # treat it as a miss so a successful load can overwrite it.
                logger.warning("Cache decode error for %s", key)
            else:
                # Count the hit only once the payload has actually decoded.
                self._hits += 1
                return value

        # Cache miss — load from source
        self._misses += 1
        value = loader()
        if value is not None:
            try:
                self.redis.setex(key, ttl or self.default_ttl, serialize(value))
            except redis.RedisError:
                logger.warning("Cache write error for %s", key)

        return value

    def invalidate(self, key: str) -> None:
        """Remove ``key`` from the cache after a write; Redis errors are logged, not raised."""
        try:
            self.redis.delete(key)
        except redis.RedisError:
            logger.warning("Cache invalidation failed for %s", key)

    @property
    def hit_rate(self) -> float:
        """Fraction of ``get_or_load`` calls served from cache (0.0 before any call)."""
        total = self._hits + self._misses
        return self._hits / total if total > 0 else 0.0

Decorator pattern for cleaner code

Wrapping cache-aside logic in a decorator keeps business logic clean:

import functools
import hashlib


def cache_aside(
    cache: "CacheAside",
    prefix: str,
    ttl: int = 300,
):
    """Decorator that applies cache-aside to a function.

    The cache key is ``"<prefix>:<hash>"``, where the hash is the first 16 hex
    characters of the SHA-256 of the call's positional arguments and sorted
    keyword arguments as formatted by an f-string. Arguments therefore need a
    stable string form, and the same value passed positionally and by keyword
    produces two different keys.

    Because the key is an opaque hash, the wrapper exposes two helpers so
    writers can target the exact entry a call would use:

    * ``wrapper.cache_key(*args, **kwargs)`` returns the key for those arguments.
    * ``wrapper.invalidate(*args, **kwargs)`` calls ``cache.invalidate`` on it.

    Args:
        cache: Object providing ``get_or_load(key, loader, ttl=...)`` and
            ``invalidate(key)``.
        prefix: Namespace included in both the hashed input and the final key.
        ttl: TTL forwarded to ``cache.get_or_load`` for every call.
    """

    def make_key(args, kwargs) -> str:
        # Build a deterministic cache key from function arguments
        raw = f"{prefix}:{args}:{sorted(kwargs.items())}"
        return f"{prefix}:{hashlib.sha256(raw.encode()).hexdigest()[:16]}"

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            key = make_key(args, kwargs)
            return cache.get_or_load(key, lambda: fn(*args, **kwargs), ttl=ttl)

        def cache_key(*args, **kwargs) -> str:
            """Return the cache key used for a call with these arguments."""
            return make_key(args, kwargs)

        def invalidate(*args, **kwargs) -> None:
            """Drop the cached entry for a call with these arguments."""
            cache.invalidate(make_key(args, kwargs))

        wrapper.cache_key = cache_key
        wrapper.invalidate = invalidate
        return wrapper
    return decorator


# Usage
@cache_aside(cache=my_cache, prefix="product", ttl=600)
def get_product(product_id: str) -> Optional[dict]:
    """Fetch a product from the database — cached transparently.

    Returns the row converted to a ``dict``, or ``None`` when the query
    result is falsy (no matching product).
    """
    row = db.execute("SELECT * FROM products WHERE id = %s", (product_id,))
    if not row:
        return None
    return dict(row)

Thundering herd protection with distributed locks

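When a popular key expires, many concurrent requests miss at the same moment and all query the database for the same value. A Redis-based lock ensures that only one of them reloads the value and repopulates the cache, while the others wait for the result: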

import time
import uuid


class CacheAsideWithLock(CacheAside):
    """Cache-aside with distributed lock to prevent thundering herd.

    On a miss, one caller takes a short-lived Redis lock (``lock:<key>``) and
    runs the loader; other callers poll the cache for the result. Redis
    failures while locking, writing or unlocking are logged and never replace
    the loader's result.
    """

    def __init__(self, redis_client: redis.Redis, default_ttl: int = 300,
                 lock_ttl: int = 5):
        """Set up the cache.

        Args:
            redis_client: Client used for cache and lock operations.
            default_ttl: Expiry used for cached entries when a call gives none.
            lock_ttl: Expiry set on the lock key, so that a holder which never
                releases it cannot block others forever.
        """
        super().__init__(redis_client, default_ttl)
        self.lock_ttl = lock_ttl

    def get_or_load(self, key: str, loader: Callable[[], Any],
                    ttl: Optional[int] = None,
                    serialize: Callable[[Any], str] = json.dumps,
                    deserialize: Callable[[str], Any] = json.loads,
                    **kwargs) -> Any:
        """Return the cached value, letting only the lock holder run ``loader``.

        Args:
            key: Cache key to read and populate.
            loader: Zero-argument callable that fetches the value from source.
            ttl: Expiry for the cached entry; a falsy value uses ``default_ttl``.
            serialize: Converts the loaded value into the stored payload.
            deserialize: Converts a stored payload back into a value.
            **kwargs: Accepted for call compatibility and ignored.

        Returns:
            The cached value on a hit, otherwise the loader's result
            (``None`` results are returned but not cached).
        """
        # Try cache first
        cached = self._try_get(key)
        if cached is not None:
            self._hits += 1
            return deserialize(cached)

        self._misses += 1
        lock_key = f"lock:{key}"
        lock_id = str(uuid.uuid4())

        # Try to acquire lock
        try:
            acquired = self.redis.set(lock_key, lock_id, nx=True, ex=self.lock_ttl)
        except redis.RedisError:
            # Without Redis there is nothing to coordinate on or wait for:
            # degrade to a plain load.
            logger.warning("Lock acquisition failed for %s", key)
            return loader()

        if not acquired:
            # Another request is loading — wait and retry cache
            return self._wait_for_cache(key, loader, ttl,
                                        serialize=serialize,
                                        deserialize=deserialize)

        try:
            value = loader()
            if value is not None:
                self._store(key, value, ttl, serialize)
            return value
        finally:
            # Release lock only if we still own it
            self._release_lock(lock_key, lock_id)

    def _try_get(self, key: str) -> Optional[str]:
        """Read ``key`` from Redis, returning ``None`` on a miss or Redis error."""
        try:
            return self.redis.get(key)
        except redis.RedisError:
            return None

    def _store(self, key: str, value: Any, ttl: Optional[int],
               serialize: Callable[[Any], str]) -> None:
        """Best-effort cache write; a Redis error is logged, not raised."""
        try:
            self.redis.setex(key, ttl or self.default_ttl, serialize(value))
        except redis.RedisError:
            logger.warning("Cache write error for %s", key)

    def _release_lock(self, lock_key: str, lock_id: str) -> None:
        """Atomic check-and-delete using Lua script.

        The lock is deleted only when it still holds ``lock_id``. A Redis
        error is logged rather than raised because this runs in a ``finally``
        block, where raising would replace the loader's result or exception;
        the lock key carries an expiry, so it does not persist indefinitely.
        """
        script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        end
        return 0
        """
        try:
            self.redis.eval(script, 1, lock_key, lock_id)
        except redis.RedisError:
            logger.warning("Lock release failed for %s", lock_key)

    def _wait_for_cache(self, key: str, loader: Callable, ttl: Optional[int],
                        retries: int = 10, delay: float = 0.1,
                        serialize: Callable[[Any], str] = json.dumps,
                        deserialize: Callable[[str], Any] = json.loads) -> Any:
        """Wait for another request to populate the cache.

        Polls the cache up to ``retries`` times, sleeping ``delay`` seconds
        before each attempt. If the value never appears, loads it directly
        and caches it so later callers do not repeat the load.
        """
        for _ in range(retries):
            time.sleep(delay)
            cached = self._try_get(key)
            if cached is not None:
                return deserialize(cached)
        # Fallback: load directly if the lock holder was too slow
        value = loader()
        if value is not None:
            self._store(key, value, ttl, serialize)
        return value

Stale-while-revalidate

An alternative to locking: serve slightly stale data while refreshing in the background. Store a soft expiry timestamp inside the cached value, and give the Redis key itself a longer (hard) TTL so the stale value is still available while it is being refreshed:

import threading


def get_with_stale_revalidate(
    cache: redis.Redis, key: str, loader: Callable,
    soft_ttl: int = 300, hard_ttl: int = 600,
) -> Optional[dict]:
    """Serve stale data while refreshing in the background.

    Cached entries are JSON envelopes of the form
    ``{"data": ..., "expires_at": <unix time>}``. An entry whose
    ``expires_at`` is still in the future is returned as-is. An older entry
    is returned immediately while a daemon thread reloads it. A missing
    entry, or one that is not a valid envelope, is loaded synchronously.

    Args:
        cache: Redis client holding the entries.
        key: Cache key to read and refresh.
        loader: Zero-argument callable that fetches the value from source.
        soft_ttl: Seconds after which an entry counts as stale.
        hard_ttl: Expiry given to the Redis key itself.

    Returns:
        The cached or freshly loaded value; ``None`` when the loader
        returns ``None``.
    """
    raw = cache.get(key)
    if raw is not None:
        try:
            entry = json.loads(raw)
            data = entry["data"]
            is_fresh = time.time() < entry["expires_at"]
        except (ValueError, TypeError, KeyError):
            # The key holds something other than our envelope (corrupt data or
            # a value written by different code): treat it as a miss and let
            # the synchronous refresh below overwrite it.
            logger.warning("Discarding malformed cache entry for %s", key)
        else:
            if not is_fresh:
                # Stale but available — serve and refresh async.
                # NOTE(review): every stale read starts its own refresh
                # thread; nothing here deduplicates concurrent refreshes.
                threading.Thread(
                    target=_refresh, args=(cache, key, loader, soft_ttl, hard_ttl),
                    daemon=True,
                ).start()
            return data

    # True miss — synchronous load
    return _refresh(cache, key, loader, soft_ttl, hard_ttl)


def _refresh(cache, key, loader, soft_ttl, hard_ttl):
    """Load a fresh value and, unless it is ``None``, cache it in an envelope.

    The envelope records the value under ``"data"`` and the soft expiry
    (now + ``soft_ttl``) under ``"expires_at"``; the Redis key itself is
    written with ``hard_ttl``. Returns whatever ``loader`` produced.
    """
    fresh = loader()
    if fresh is None:
        # Nothing to store; leave whatever is cached untouched.
        return fresh

    envelope = {"data": fresh, "expires_at": time.time() + soft_ttl}
    payload = json.dumps(envelope)
    cache.setex(key, hard_ttl, payload)
    return fresh

Invalidation strategies

Delete on write (simplest)

def update_product(product_id: str, data: dict) -> None:
    """Write the product to the database, then drop its cache entry.

    NOTE(review): the SQL text is a placeholder and ``data`` is not bound to
    the statement as written. The delete targets the literal key
    ``product:<product_id>``; entries stored under a different key scheme
    (for example a hashed key) are not removed by this call.
    """
    stale_key = f"product:{product_id}"
    # Database first, cache second: the next read reloads the new row.
    db.execute("UPDATE products SET ... WHERE id = %s", (product_id,))
    cache.delete(stale_key)

Tag-based invalidation

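When a change to one entity affects multiple cache keys (e.g., updating a category invalidates all products in that category), maintain a Redis set per tag (`tag:<name>`) that holds the cache keys carrying that tag, and delete every member when the tag is invalidated: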

def tag_invalidate(cache: "redis.Redis", tag: str) -> int:
    """Invalidate all keys associated with a tag.

    Reads the members of the set ``tag:<tag>`` and deletes them together with
    the tag set itself in a single ``DEL`` call, so the set is never left
    pointing at keys that have already been removed.

    Args:
        cache: Redis client holding the tag sets and cached entries.
        tag: Tag name; its members are stored in the set ``tag:<tag>``.

    Returns:
        The number of keys that were listed under the tag (0 when the tag
        set is empty or absent, in which case nothing is deleted).
    """
    tag_key = f"tag:{tag}"
    members = cache.smembers(tag_key)
    if members:
        cache.delete(*members, tag_key)
    return len(members)

Performance measurement

Track these metrics to know if your cache-aside implementation is working:

  • Hit rate — target above 90% for most read-heavy workloads.
  • Miss latency — how long a cache miss takes (database query + cache write).
  • Eviction rate — if Redis is evicting entries due to memory pressure, your cache may be undersized.
  • Invalidation frequency — high invalidation rates may signal a mismatch between TTL and write frequency.

Testing

import fakeredis
import pytest


def test_cache_miss_loads_from_source():
    """A miss invokes the loader once; the following read is served from cache."""
    invocations = []

    def load_widget():
        invocations.append(1)
        return {"id": "p1", "name": "Widget"}

    aside = CacheAside(fakeredis.FakeRedis(), default_ttl=60)

    # First read: nothing cached yet, so the loader runs.
    first = aside.get_or_load("product:p1", load_widget)
    assert first["name"] == "Widget"
    assert len(invocations) == 1

    # Second read: same key, value comes back without another load.
    second = aside.get_or_load("product:p1", load_widget)
    assert second["name"] == "Widget"
    assert len(invocations) == 1


def test_invalidation_causes_reload():
    """After ``invalidate``, the next read calls the loader again."""
    state = {"calls": 0}

    def versioned_loader():
        state["calls"] += 1
        return {"version": state["calls"]}

    aside = CacheAside(fakeredis.FakeRedis(), default_ttl=60)

    aside.get_or_load("key", versioned_loader)
    assert state["calls"] == 1

    # Dropping the entry forces a second load that yields the next version.
    aside.invalidate("key")
    reloaded = aside.get_or_load("key", versioned_loader)
    assert state["calls"] == 2
    assert reloaded["version"] == 2

The one thing to remember: cache-aside is the most pragmatic caching pattern — it requires the least infrastructure change, gracefully degrades when the cache fails, and gives your application explicit control over every caching decision.

Tags: python, caching, design-patterns

See Also

  • Python Distributed Caching — Understand distributed caching through a shared class notebook analogy that makes multi-server Python caching obvious.
  • Python Write-Behind Cache — Discover how a write-behind cache works like a waiter who takes your order fast and sends it to the kitchen later.
  • Python Write-Through Cache — See why a write-through cache is like a librarian who updates the catalog the moment a new book arrives.
  • CI/CD — Why big apps can ship updates every day without turning your phone into a glitchy mess: CI/CD is the behind-the-scenes quality gate and delivery truck.
  • Containerization — Why does software that works on your computer break on everyone else's? Containers fix that — and they're why Netflix can deploy 100 updates a day without the site going down.