Python Write-Through Cache — Deep Dive

Architecture overview

A write-through cache sits as an intermediary between the application and the backing store. Every mutation passes through the cache layer, which orchestrates the dual write. The key design decision is what happens when one write succeeds and the other fails.

Application → Cache Layer → [Backing Store write] → [Cache write] → Response

If the backing store write fails, the cache remains unchanged and the error propagates. If the backing store succeeds but the cache write fails, you need a recovery strategy — typically invalidating the cache entry so the next read repopulates from the database.

Implementation in Python

Here’s a write-through cache layer using Redis and SQLAlchemy:

import json
import logging
from typing import Any, Callable, Optional

import redis
from sqlalchemy.orm import Session

logger = logging.getLogger(__name__)


class WriteThroughCache:
    """Write-through cache that keeps Redis in sync with the database."""

    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
        self.redis = redis_client
        self.default_ttl = default_ttl

    def _cache_key(self, namespace: str, entity_id: str) -> str:
        return f"wtc:{namespace}:{entity_id}"

    def write(
        self,
        db: Session,
        namespace: str,
        entity_id: str,
        db_writer: Callable[[Session], Any],
        serialize: Callable[[Any], dict],
        ttl: Optional[int] = None,
    ) -> Any:
        """Write to database first, then update cache."""
        # Step 1: Write to the backing store
        result = db_writer(db)
        db.commit()

        # Step 2: Update cache (best-effort after DB success)
        key = self._cache_key(namespace, entity_id)
        try:
            payload = json.dumps(serialize(result))
            self.redis.setex(key, ttl or self.default_ttl, payload)
        except redis.RedisError:
            # DB write succeeded — invalidate cache so next read is fresh
            logger.warning("Cache write failed for %s, invalidating", key)
            self._safe_delete(key)

        return result

    def read(
        self,
        db: Session,
        namespace: str,
        entity_id: str,
        db_reader: Callable[[Session], Any],
        serialize: Callable[[Any], dict],
        ttl: Optional[int] = None,
    ) -> Optional[dict]:
        """Read from cache; on miss, populate from database."""
        key = self._cache_key(namespace, entity_id)

        # Try cache first
        try:
            cached = self.redis.get(key)
            if cached is not None:
                return json.loads(cached)
        except redis.RedisError:
            logger.warning("Cache read failed for %s, falling back to DB", key)

        # Cache miss — read from DB and populate
        result = db_reader(db)
        if result is None:
            return None

        data = serialize(result)
        try:
            self.redis.setex(
                key, ttl or self.default_ttl, json.dumps(data)
            )
        except redis.RedisError:
            pass  # Read still succeeds without cache

        return data

    def invalidate(self, namespace: str, entity_id: str) -> None:
        """Explicitly remove an entry from cache."""
        self._safe_delete(self._cache_key(namespace, entity_id))

    def _safe_delete(self, key: str) -> None:
        try:
            self.redis.delete(key)
        except redis.RedisError:
            logger.error("Failed to delete cache key %s", key)

Using the cache with a Django-style model

from dataclasses import dataclass


@dataclass
class Product:
    id: str
    name: str
    price_cents: int
    stock: int


def update_product_stock(cache: WriteThroughCache, db: Session,
                         product_id: str, new_stock: int) -> dict:
    """Update stock with write-through guarantee."""

    def db_writer(session: Session) -> Product:
        product = session.query(Product).get(product_id)
        product.stock = new_stock
        return product

    def serialize(p: Product) -> dict:
        return {"id": p.id, "name": p.name,
                "price_cents": p.price_cents, "stock": p.stock}

    result = cache.write(
        db=db,
        namespace="products",
        entity_id=product_id,
        db_writer=db_writer,
        serialize=serialize,
        ttl=1800,
    )
    return serialize(result)

Failure modes and mitigation

Database write fails

The operation raises an exception. The cache is never updated. This is the safest failure mode — no inconsistency.

Cache write fails after database success

The data exists in the database but not the cache. The fallback strategy in the code above invalidates the stale key, forcing the next read to reload from the database.

Redis is completely down

All writes still succeed (database is the source of truth). Reads fall back to the database directly. The application is slower but correct. Add monitoring on cache hit rates to detect this.

Network partition between cache and database

If the application can reach the database but not Redis, the write-through pattern degrades gracefully to “database only.” No data loss occurs.

Performance characteristics

Write-through adds latency proportional to the cache RTT (typically 0.5–2ms for local Redis). For a system handling 10,000 reads per second and 100 writes per second, the math works out:

  • Without cache: 10,100 database queries/second
  • With write-through: 100 database writes + 100 Redis writes + 10,000 Redis reads = 100 database queries/second

That’s a 99% reduction in database load for reads.

Testing the cache layer

import fakeredis
import pytest


@pytest.fixture
def cache():
    fake_redis = fakeredis.FakeRedis()
    return WriteThroughCache(redis_client=fake_redis, default_ttl=60)


def test_write_populates_cache(cache):
    """After a write, the cache should hold the new data."""
    # Simulate a DB write that returns a product dict
    result = cache.write(
        db=mock_session,
        namespace="products",
        entity_id="p-1",
        db_writer=lambda s: Product("p-1", "Widget", 999, 50),
        serialize=lambda p: {"id": p.id, "name": p.name,
                             "price_cents": p.price_cents, "stock": p.stock},
    )
    # Subsequent read should come from cache, not DB
    cached = cache.read(
        db=mock_session,
        namespace="products",
        entity_id="p-1",
        db_reader=lambda s: None,  # DB should not be called
        serialize=lambda p: {},
    )
    assert cached["stock"] == 50

When write-through isn’t enough

If your writes are bursty (Black Friday inventory updates), consider combining write-through with a short write buffer or circuit breaker around the cache layer. Libraries like tenacity can add retry logic for transient Redis failures without blocking the main write path.

For write-heavy systems, explore write-behind caching which decouples the database write from the application response.

The one thing to remember: write-through caching is the simplest pattern that guarantees cache-database consistency — build it as your default, then optimize toward write-behind or cache-aside only when measurements justify the added complexity.

pythoncachingdata-patterns

See Also

  • Python Cache Aside Pattern Learn the cache-aside pattern through a fridge analogy that makes Python caching strategies click instantly.
  • Python Distributed Caching Understand distributed caching through a shared class notebook analogy that makes multi-server Python caching obvious.
  • Python Write Behind Cache Discover how a write-behind cache works like a waiter who takes your order fast and sends it to the kitchen later.
  • Ci Cd Why big apps can ship updates every day without turning your phone into a glitchy mess — CI/CD is the behind-the-scenes quality gate and delivery truck.
  • Containerization Why does software that works on your computer break on everyone else's? Containers fix that — and they're why Netflix can deploy 100 updates a day without the site going down.