Python Distributed Caching — Deep Dive

Architecture patterns

Single-tier: distributed cache only

App Servers → Redis (single/cluster) → Database

All cache lookups go to Redis. This is the simplest design to reason about, but every cache read pays a network round trip, typically 0.5–2 ms.

Two-tier: local + distributed

App Servers → L1 (in-process dict) → L2 (Redis) → Database

Hot keys are cached in-process (L1) with short TTLs (5–30 seconds). On L1 miss, check Redis (L2). On L2 miss, query the database and populate both tiers. This eliminates network calls for the most-accessed data.

The challenge: L1 caches across servers can diverge. Use short L1 TTLs or Redis pub/sub to broadcast invalidation.
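The pub/sub option can be sketched as follows. This is a minimal illustration, not a production design: the channel name `cache:invalidate` and the `L1Invalidator` class are hypothetical, and `client` is assumed to be a redis-py client exposing `publish()` and `pubsub()`.

```python
import threading
from typing import Any

INVALIDATION_CHANNEL = "cache:invalidate"  # hypothetical channel name


class L1Invalidator:
    """Drops keys from a local L1 dict when an invalidation message arrives."""

    def __init__(self, client: Any, l1: dict, lock: threading.Lock):
        self.client = client  # assumed: redis-py client (publish / pubsub)
        self.l1 = l1
        self.lock = lock

    def publish(self, key: str) -> None:
        """Ask every subscribed server (including this one) to drop `key`."""
        self.client.publish(INVALIDATION_CHANNEL, key)

    def handle_message(self, message: dict) -> None:
        """Apply a single pub/sub message to the local L1 dict."""
        if message.get("type") != "message":
            return  # ignore subscribe/unsubscribe confirmations
        key = message["data"]
        if isinstance(key, bytes):
            key = key.decode("utf-8")
        with self.lock:
            self.l1.pop(key, None)

    def listen_forever(self) -> None:
        """Blocking loop over incoming messages; run it in a daemon thread."""
        pubsub = self.client.pubsub()
        pubsub.subscribe(INVALIDATION_CHANNEL)
        for message in pubsub.listen():
            self.handle_message(message)

    def start(self) -> threading.Thread:
        thread = threading.Thread(target=self.listen_forever, daemon=True)
        thread.start()
        return thread
```

Redis pub/sub is fire-and-forget: a server that is disconnected when a message is published never sees it. Keeping a short L1 TTL as a backstop is therefore still advisable.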

Connection pooling

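Without pooling, every Redis call opens a new TCP connection, and high-concurrency applications can exhaust file descriptors. redis-py gives each client a default pool, but configuring one explicitly lets you cap the connection count and set timeouts: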

import redis


def create_redis_pool(
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    max_connections: int = 50,
    socket_timeout: float = 1.0,
    socket_connect_timeout: float = 1.0,
    retry_on_timeout: bool = True,
) -> redis.Redis:
    """Create a Redis client with connection pooling and timeouts."""
    pool = redis.ConnectionPool(
        host=host,
        port=port,
        db=db,
        max_connections=max_connections,
        socket_timeout=socket_timeout,
        socket_connect_timeout=socket_connect_timeout,
        retry_on_timeout=retry_on_timeout,
        decode_responses=True,
    )
    return redis.Redis(connection_pool=pool)

Key settings:

  • max_connections — match to your expected concurrency. Too low causes ConnectionError; too high wastes memory on the Redis server.
  • socket_timeout — prevents hanging reads. Set to 1–2 seconds for cache operations.
  • retry_on_timeout — automatically retries once on timeout, useful for transient network blips.
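When sizing max_connections, it helps to remember that a pool lives inside one process, so the worst-case number of connections reaching Redis is multiplied by every worker on every server. The arithmetic can be sketched as below; the helper names and the 80% headroom factor are illustrative assumptions, and 10,000 is the default value of the Redis `maxclients` setting.

```python
def total_redis_connections(
    servers: int, workers_per_server: int, max_connections: int
) -> int:
    """Worst-case connections if every pool in every worker fills up."""
    return servers * workers_per_server * max_connections


def fits_server_limit(
    total: int, maxclients: int = 10_000, headroom: float = 0.8
) -> bool:
    """True if the worst case stays under a fraction of Redis maxclients.

    headroom is an illustrative safety margin, not a Redis setting.
    """
    return total <= maxclients * headroom


# 10 servers x 4 worker processes x a pool of 50 connections each
worst_case = total_redis_connections(10, 4, 50)
within_budget = fits_server_limit(worst_case)
```

Pools open connections lazily, so the worst case is only reached under sustained concurrency, but it is the number that matters when Redis starts rejecting clients.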

Two-tier cache implementation

import json
import threading
import time
from typing import Any, Callable, Optional

import redis


class TwoTierCache:
    """L1 (in-process) + L2 (Redis) distributed cache."""

    def __init__(self, redis_client: redis.Redis, l1_ttl: float = 10.0,
                 l2_ttl: int = 300):
        self.redis = redis_client
        self.l1_ttl = l1_ttl
        self.l2_ttl = l2_ttl
        self._l1: dict[str, tuple[float, Any]] = {}
        self._l1_lock = threading.Lock()

    def get(self, key: str, loader: Callable[[], Any]) -> Any:
        # L1 check (in-process, no network)
        with self._l1_lock:
            if key in self._l1:
                expiry, value = self._l1[key]
                if time.monotonic() < expiry:
                    return value
                del self._l1[key]

        # L2 check (Redis, ~1ms network)
        try:
            raw = self.redis.get(key)
            if raw is not None:
                value = json.loads(raw)
                self._set_l1(key, value)
                return value
        except redis.RedisError:
            pass

        # Full miss — load from source
        value = loader()
        if value is not None:
            self._set_l1(key, value)
            try:
                self.redis.setex(key, self.l2_ttl, json.dumps(value))
            except redis.RedisError:
                pass
        return value

    def invalidate(self, key: str) -> None:
        with self._l1_lock:
            self._l1.pop(key, None)
        try:
            self.redis.delete(key)
        except redis.RedisError:
            pass

    def _set_l1(self, key: str, value: Any) -> None:
        with self._l1_lock:
            self._l1[key] = (time.monotonic() + self.l1_ttl, value)
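One limitation of the class above is that `_l1` is a plain dict: expired entries are removed only when the same key is read again, so memory can grow without bound. A bounded variant of the L1 tier might look like the sketch below. `BoundedTTLCache` and its `max_entries` parameter are hypothetical names, and least-recently-used eviction is one reasonable policy among several.

```python
import threading
import time
from collections import OrderedDict
from typing import Any, Optional


class BoundedTTLCache:
    """In-process cache with a per-entry TTL and LRU eviction."""

    def __init__(self, max_entries: int = 1024, ttl: float = 10.0):
        self.max_entries = max_entries
        self.ttl = ttl
        self._data: "OrderedDict[str, tuple[float, Any]]" = OrderedDict()
        self._lock = threading.Lock()

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            entry = self._data.get(key)
            if entry is None:
                return None
            expiry, value = entry
            if time.monotonic() >= expiry:
                del self._data[key]  # expired: drop and report a miss
                return None
            self._data.move_to_end(key)  # mark as most recently used
            return value

    def set(self, key: str, value: Any) -> None:
        with self._lock:
            self._data[key] = (time.monotonic() + self.ttl, value)
            self._data.move_to_end(key)
            # Evict least recently used entries once over capacity
            while len(self._data) > self.max_entries:
                self._data.popitem(last=False)

    def pop(self, key: str) -> None:
        with self._lock:
            self._data.pop(key, None)

    def __len__(self) -> int:
        with self._lock:
            return len(self._data)
```

Like the two-tier class, this sketch cannot distinguish a cached `None` from a miss, so it suits loaders that never return `None` as a valid value.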

Redis Cluster integration

For large-scale deployments, Redis Cluster distributes keys across shards:

from redis.cluster import ClusterNode, RedisCluster


def create_cluster_client(
    startup_nodes: list[dict],
) -> RedisCluster:
    """Connect to a Redis Cluster with automatic shard routing."""
    return RedisCluster(
        startup_nodes=[
            ClusterNode(n["host"], n["port"])
            for n in startup_nodes
        ],
        decode_responses=True,
        # Do not refuse to start when some hash slots are uncovered
        require_full_coverage=False,
        retry_on_timeout=True,
    )


# Usage
cluster = create_cluster_client([
    {"host": "redis-1.internal", "port": 6379},
    {"host": "redis-2.internal", "port": 6379},
    {"host": "redis-3.internal", "port": 6379},
])

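Redis Cluster uses hash slots (16,384 total). Keys are assigned to slots via CRC16(key) % 16384. Use hash tags such as {user:123}:profile to ensure related keys land on the same shard: when a key contains a non-empty {...} section, only the text inside the first pair of braces is hashed.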
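The slot calculation can be reproduced locally to check which keys share a shard. The sketch below assumes the CRC16 variant Redis Cluster uses (XMODEM, polynomial 0x1021, initial value 0), which the standard library exposes as `binascii.crc_hqx`; the helper names `hash_tag` and `key_slot` are illustrative.

```python
import binascii

HASH_SLOTS = 16384


def hash_tag(key: bytes) -> bytes:
    """Return the part of the key that Redis Cluster actually hashes."""
    start = key.find(b"{")
    if start == -1:
        return key
    end = key.find(b"}", start + 1)
    # No closing brace, or an empty "{}": the whole key is hashed
    if end == -1 or end == start + 1:
        return key
    return key[start + 1:end]


def key_slot(key: str) -> int:
    """Compute the cluster hash slot for a key."""
    data = hash_tag(key.encode("utf-8"))
    return binascii.crc_hqx(data, 0) % HASH_SLOTS


# Keys sharing a hash tag map to the same slot
same_shard = key_slot("{user:123}:profile") == key_slot("{user:123}:settings")
```

On a live cluster, the `CLUSTER KEYSLOT` command returns the same value and is the authoritative check.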

Serialization performance

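JSON is readable but slow for large payloads. For high-throughput caches, consider msgpack or pickle. Use pickle only when every writer to the cache is trusted, because loading a tampered pickle can execute arbitrary code. A msgpack-based cache: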

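from typing import Any, Optional

import msgpack
import redis


class MsgpackCache:
    """Distributed cache with msgpack serialization for speed.

    The client must be created with decode_responses=False (the redis-py
    default), because msgpack output is binary rather than UTF-8 text.
    """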

    def __init__(self, redis_client: redis.Redis, ttl: int = 300):
        self.redis = redis_client
        self.ttl = ttl

    def set(self, key: str, value: Any) -> None:
        packed = msgpack.packb(value, use_bin_type=True)
        self.redis.setex(key, self.ttl, packed)

    def get(self, key: str) -> Optional[Any]:
        raw = self.redis.get(key)
        if raw is None:
            return None
        return msgpack.unpackb(raw, raw=False)

Benchmark comparison for a 5KB payload (typical API response):

Serializer   Encode   Decode   Size
JSON         45 µs    38 µs    5,200 bytes
msgpack      12 µs    8 µs     3,800 bytes
pickle       15 µs    10 µs    4,100 bytes

In this benchmark, msgpack encodes about 3.75x faster than JSON, decodes about 4.75x faster, and produces a payload about 27% smaller.
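Figures like these depend heavily on payload shape, library versions, and hardware, so it is worth measuring with your own data. A minimal harness using `timeit` might look like this. The `sample_payload` is an invented stand-in for an API response, and msgpack is included only if it is installed.

```python
import json
import pickle
import timeit
from typing import Any, Callable


def benchmark(
    name: str,
    encode: Callable[[Any], bytes],
    decode: Callable[[bytes], Any],
    payload: Any,
    number: int = 2000,
) -> dict:
    """Time one codec and return per-call averages in microseconds."""
    blob = encode(payload)
    enc = timeit.timeit(lambda: encode(payload), number=number) / number
    dec = timeit.timeit(lambda: decode(blob), number=number) / number
    return {
        "name": name,
        "encode_us": enc * 1e6,
        "decode_us": dec * 1e6,
        "size": len(blob),
    }


def run_benchmarks(payload: Any, number: int = 2000) -> list[dict]:
    codecs = [
        ("json", lambda v: json.dumps(v).encode("utf-8"), json.loads),
        ("pickle",
         lambda v: pickle.dumps(v, protocol=pickle.HIGHEST_PROTOCOL),
         pickle.loads),
    ]
    try:
        import msgpack  # optional third-party dependency
        codecs.append((
            "msgpack",
            lambda v: msgpack.packb(v, use_bin_type=True),
            lambda b: msgpack.unpackb(b, raw=False),
        ))
    except ImportError:
        pass
    return [benchmark(n, e, d, payload, number) for n, e, d in codecs]


# Invented payload standing in for a typical API response
sample_payload = {
    "items": [
        {"id": i, "name": f"item-{i}", "price": i * 1.5, "tags": ["a", "b"]}
        for i in range(60)
    ]
}

if __name__ == "__main__":
    for row in run_benchmarks(sample_payload):
        print(f"{row['name']:<8} enc={row['encode_us']:.1f}us "
              f"dec={row['decode_us']:.1f}us size={row['size']}")
```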

Cache stampede protection with probabilistic early refresh

Instead of locking, let each request decide at random whether to refresh early, with a probability that rises as the entry approaches expiry:

import math
import random


def should_refresh_early(
    remaining_ttl: float,
    compute_time: float,
    beta: float = 1.0,
) -> bool:
    """XFetch algorithm: probabilistic early recomputation."""
    if remaining_ttl <= 0:
        return True
    # Higher beta = more aggressive early refresh.
    # 1.0 - random.random() lies in (0, 1], which avoids log(0).
    threshold = compute_time * beta * math.log(1.0 - random.random())
    return remaining_ttl + threshold <= 0

This approach, from the paper “Optimal Probabilistic Cache Stampede Prevention”, makes it likely that a single request refreshes the entry shortly before it expires, without any distributed locks. Because the decision is probabilistic, occasional duplicate refreshes remain possible.
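To show where the early-refresh check sits in a read path, here is a minimal sketch that stores the measured compute time and the expiry alongside each value. It uses a plain dict as the store for clarity; `xfetch_get` and the injectable `now` and `rng` parameters are illustrative names added to make the behavior easy to test, not part of any library.

```python
import math
import random
import time
from typing import Any, Callable


def xfetch_get(
    store: dict,
    key: str,
    loader: Callable[[], Any],
    ttl: float,
    beta: float = 1.0,
    now: Callable[[], float] = time.monotonic,
    rng: Callable[[], float] = random.random,
) -> Any:
    """Read through `store`, refreshing early with rising probability."""
    entry = store.get(key)
    if entry is not None:
        value, compute_time, expiry = entry
        # 1.0 - rng() lies in (0, 1], so log() never receives zero.
        # The jitter is non-negative and scales with the recompute cost.
        jitter = -compute_time * beta * math.log(1.0 - rng())
        if now() + jitter < expiry:
            return value  # still fresh, and not selected for early refresh

    # Miss, expired, or selected for early refresh: recompute and time it
    start = now()
    value = loader()
    compute_time = now() - start
    store[key] = (value, compute_time, now() + ttl)
    return value
```

With Redis as the store, the same three fields could be serialized together in one value, with the Redis TTL set somewhat longer than the logical expiry so the entry is still readable when the early refresh fires.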

Monitoring and observability

Essential metrics for distributed cache health:

from typing import Optional

import redis
from prometheus_client import Counter, Histogram, Gauge

cache_ops = Counter(
    "cache_operations_total", "Cache operations",
    ["operation", "result"],  # operation: get/set/delete, result: hit/miss/error
)
cache_latency = Histogram(
    "cache_operation_seconds", "Cache operation latency",
    ["operation"],
    buckets=[0.0005, 0.001, 0.005, 0.01, 0.05, 0.1],
)
cache_size = Gauge("cache_entries_total", "Estimated cache entries")


def monitored_get(cache: redis.Redis, key: str) -> Optional[str]:
    """Cache get with Prometheus instrumentation."""
    with cache_latency.labels(operation="get").time():
        try:
            result = cache.get(key)
        except redis.RedisError:
            # Count failures so the "error" result label is populated
            cache_ops.labels(operation="get", result="error").inc()
            raise
        outcome = "hit" if result is not None else "miss"
        cache_ops.labels(operation="get", result=outcome).inc()
        return result

Dashboard alerts to configure:

  • Hit rate below 80% — cache may be undersized or TTLs too short.
  • P99 latency above 10ms — network issues or Redis overload.
  • Memory usage above 80% — risk of eviction affecting hit rates.
  • Connection pool exhaustion — increase max_connections or reduce concurrency.
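The first and third thresholds can also be checked against the server's own counters. The sketch below works on plain dicts shaped like the output of the Redis `INFO stats` and `INFO memory` sections, using the real fields `keyspace_hits`, `keyspace_misses`, `used_memory`, and `maxmemory`. The function names and alert strings are illustrative.

```python
from typing import Mapping, Optional


def hit_rate(stats: Mapping[str, int]) -> Optional[float]:
    """Hit ratio from INFO stats counters, or None if there is no traffic."""
    hits = int(stats.get("keyspace_hits", 0))
    misses = int(stats.get("keyspace_misses", 0))
    total = hits + misses
    return hits / total if total else None


def memory_usage_ratio(memory: Mapping[str, int]) -> Optional[float]:
    """used_memory / maxmemory, or None when no maxmemory limit is set."""
    used = int(memory.get("used_memory", 0))
    limit = int(memory.get("maxmemory", 0))  # 0 means "no limit" in Redis
    return used / limit if limit else None


def check_alerts(
    stats: Mapping[str, int],
    memory: Mapping[str, int],
    min_hit_rate: float = 0.80,
    max_memory_ratio: float = 0.80,
) -> list[str]:
    """Return a human-readable message for each breached threshold."""
    alerts = []
    rate = hit_rate(stats)
    if rate is not None and rate < min_hit_rate:
        alerts.append(f"hit rate {rate:.0%} below {min_hit_rate:.0%}")
    usage = memory_usage_ratio(memory)
    if usage is not None and usage > max_memory_ratio:
        alerts.append(f"memory usage {usage:.0%} above {max_memory_ratio:.0%}")
    return alerts
```

With redis-py, the inputs would come from `client.info("stats")` and `client.info("memory")`. The hit and miss counters are cumulative since the server started, so a windowed hit rate needs the difference between two samples.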

Async support with redis.asyncio (formerly aioredis)

For async Python applications (FastAPI, aiohttp):

import json
from typing import Any

import redis.asyncio as aioredis


async def create_async_pool(url: str = "redis://localhost") -> aioredis.Redis:
    return aioredis.from_url(
        url,
        max_connections=50,
        decode_responses=True,
        socket_timeout=1.0,
    )


async def cached_get(
    cache: aioredis.Redis, key: str, loader, ttl: int = 300,
) -> Any:
    cached = await cache.get(key)
    if cached is not None:
        return json.loads(cached)

    value = await loader()
    if value is not None:
        await cache.setex(key, ttl, json.dumps(value))
    return value
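Note that `cached_get` lets every coroutine that misses at the same moment call `loader()` independently. Within a single process, one way to collapse those duplicate loads is to share an in-flight task per key. The `SingleFlight` class below is a hypothetical helper illustrating the idea; it does not coordinate across servers.

```python
import asyncio
from typing import Any, Awaitable, Callable


class SingleFlight:
    """Ensures at most one concurrent load per key within this process."""

    def __init__(self) -> None:
        self._inflight: dict[str, asyncio.Future] = {}

    async def run(self, key: str, loader: Callable[[], Awaitable[Any]]) -> Any:
        existing = self._inflight.get(key)
        if existing is not None:
            # Another coroutine is already loading this key: wait for it
            return await existing

        task = asyncio.ensure_future(loader())
        self._inflight[key] = task
        try:
            return await task
        finally:
            # Clear the slot so later misses trigger a fresh load
            self._inflight.pop(key, None)
```

A call site would wrap the loader, for example `await flights.run(key, loader)` in place of `await loader()`. In this simple form, cancelling the first caller also cancels the shared load; wrapping the awaited task in `asyncio.shield` is one way to avoid that.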

The one thing to remember: distributed caching transforms your Python application from independent servers with fragmented caches into a unified system with shared state — but it demands careful attention to connection management, serialization, and monitoring to deliver on its promise.

