Flask Caching Strategies — Deep Dive

Caching patterns taxonomy

Cache-aside (lazy loading)

The application checks the cache first. On miss, it loads from the source, stores in cache, and returns:

def get_product(product_id):
    cache_key = f'product:{product_id}'
    product = cache.get(cache_key)
    if product is None:
        product = Product.query.get(product_id)
        if product:
            cache.set(cache_key, product.to_dict(), timeout=300)
    return product

Pros: Only caches data that’s actually requested. Cache misses are self-healing. Cons: First request after expiration is slow. Possible thundering herd.

Write-through

Writes update both the cache and the database simultaneously:

def update_product(product_id, data):
    product = Product.query.get(product_id)
    product.name = data['name']
    product.price = data['price']
    db.session.commit()
    
    # Update cache immediately
    cache.set(f'product:{product_id}', product.to_dict(), timeout=300)

Pros: Cache is always fresh after writes. No stale data window. Cons: Write latency increases (must update two stores). Wastes cache space on data that may never be read.

Write-behind (write-back)

Writes go to the cache first, then asynchronously to the database:

def update_product_fast(product_id, data):
    cache_key = f'product:{product_id}'
    cache.set(cache_key, data, timeout=300)
    # Queue async database write
    celery_app.send_task('persist_product', args=[product_id, data])

Pros: Extremely fast writes. Batches database operations. Cons: Data loss if cache crashes before persistence. Complex error handling.

Cache stampede prevention

When a popular cache entry expires, hundreds of concurrent requests all miss the cache simultaneously and flood the database. This is a cache stampede (thundering herd).

Locking approach

import threading

_locks = {}

def get_with_lock(key, loader, timeout=300):
    value = cache.get(key)
    if value is not None:
        return value
    
    lock_key = f'lock:{key}'
    lock = _locks.setdefault(lock_key, threading.Lock())
    
    if lock.acquire(blocking=False):
        try:
            # Double-check after acquiring lock
            value = cache.get(key)
            if value is None:
                value = loader()
                cache.set(key, value, timeout=timeout)
            return value
        finally:
            lock.release()
    else:
        # Another thread is loading — wait and retry
        lock.acquire()
        lock.release()
        return cache.get(key)  # Should be populated by now

For distributed systems, use Redis distributed locks:

import redis

def get_with_distributed_lock(key, loader, timeout=300):
    value = cache.get(key)
    if value is not None:
        return value
    
    r = redis.from_url(app.config['CACHE_REDIS_URL'])
    lock = r.lock(f'lock:{key}', timeout=10)
    
    if lock.acquire(blocking=True, blocking_timeout=5):
        try:
            value = cache.get(key)
            if value is None:
                value = loader()
                cache.set(key, value, timeout=timeout)
            return value
        finally:
            lock.release()
    else:
        # Lock acquisition timed out — load directly
        return loader()

Probabilistic early expiration

Refresh cache entries before they actually expire:

import random
import time

def get_with_early_refresh(key, loader, timeout=300, beta=1.0):
    data = cache.get(key)
    if data and isinstance(data, dict) and '_cache_meta' in data:
        meta = data['_cache_meta']
        expiry = meta['set_at'] + timeout
        remaining = expiry - time.time()
        
        # Probabilistically refresh before expiration
        if remaining > 0:
            threshold = remaining - beta * random.random()
            if threshold > 0:
                return data['value']
        
        # Refresh needed
        value = loader()
        cache.set(key, {
            'value': value,
            '_cache_meta': {'set_at': time.time()}
        }, timeout=timeout)
        return value
    
    # Cache miss
    value = loader()
    cache.set(key, {
        'value': value,
        '_cache_meta': {'set_at': time.time()}
    }, timeout=timeout)
    return value

As the entry approaches expiration, the probability of a refresh increases. This spreads cache rebuilds over time instead of concentrating them at the expiration point.

HTTP caching with ETags

Server-side caching reduces database load. HTTP caching reduces server load by letting browsers and CDNs serve cached responses:

import hashlib

@app.route('/api/products/<int:product_id>')
def get_product(product_id):
    product = Product.query.get_or_404(product_id)
    data = product.to_dict()
    
    # Generate ETag from content
    etag = hashlib.md5(str(data).encode()).hexdigest()
    
    # Check if client has current version
    if request.if_none_match and etag in request.if_none_match:
        return Response(status=304)  # Not Modified
    
    response = jsonify(data)
    response.headers['ETag'] = etag
    response.headers['Cache-Control'] = 'private, max-age=60'
    return response

The client sends If-None-Match: <etag> on subsequent requests. If the data hasn’t changed, the server returns 304 with no body, saving bandwidth and serialization time.

Cache-Control headers

@app.after_request
def add_cache_headers(response):
    if request.endpoint == 'static':
        response.headers['Cache-Control'] = 'public, max-age=31536000'  # 1 year
    elif request.endpoint and request.endpoint.startswith('api_'):
        response.headers['Cache-Control'] = 'private, no-cache'
    return response
  • public — CDNs and proxies can cache this
  • private — Only the browser can cache (user-specific data)
  • no-cache — Cache but revalidate on every request (ETag check)
  • no-store — Never cache (sensitive data like banking)
  • max-age=N — Cache for N seconds without checking

Multi-layer caching architecture

Production systems stack multiple cache layers:

Client → CDN (edge cache) → Reverse proxy (Nginx) → Application (Redis) → Database

Layer 1: Browser cache (Cache-Control headers)
  - Eliminates request entirely
  - Per-user, controlled by response headers

Layer 2: CDN (CloudFlare, CloudFront)
  - Geographic distribution
  - Handles static assets and public API responses
  - Configured via Cache-Control: public

Layer 3: Reverse proxy (Nginx, Varnish)
  - Caches full HTTP responses
  - Sits in front of application servers
  - Reduces load on Python processes

Layer 4: Application cache (Flask-Caching + Redis)
  - Caches database query results and computed values
  - Finest-grained control
  - Can invalidate on writes

Layer 5: Database query cache
  - MySQL/PostgreSQL internal caching
  - Automatic, limited control

Each layer catches requests that penetrate the layer above it. A well-tuned stack serves 99%+ of traffic from cache.

Distributed cache invalidation

With multiple application servers sharing a Redis cache, invalidation must be coordinated:

Event-based invalidation

import redis

r = redis.from_url(app.config['CACHE_REDIS_URL'])
pubsub = r.pubsub()

def publish_invalidation(key_pattern):
    r.publish('cache_invalidation', key_pattern)

# In a background thread per worker:
def listen_for_invalidations():
    pubsub.subscribe('cache_invalidation')
    for message in pubsub.listen():
        if message['type'] == 'message':
            pattern = message['data'].decode()
            # Clear local in-memory caches matching pattern
            local_cache.clear_pattern(pattern)

Redis pub/sub notifies all application servers to clear their local caches when data changes. This supplements Redis cache (which is already shared) by invalidating any per-process in-memory caches.

Version-based invalidation

Instead of deleting cache entries, change the cache key:

def get_product_cache_version():
    version = cache.get('product_cache_version')
    if version is None:
        version = 1
        cache.set('product_cache_version', version, timeout=0)
    return version

def get_products():
    version = get_product_cache_version()
    key = f'products:v{version}'
    data = cache.get(key)
    if data is None:
        data = [p.to_dict() for p in Product.query.all()]
        cache.set(key, data, timeout=300)
    return data

def invalidate_products():
    cache.inc('product_cache_version')

Incrementing the version makes old cache keys orphans that expire naturally. No explicit deletion needed, and no risk of race conditions during invalidation.

Caching with request context

Cache keys must account for all variables that affect the response:

def make_cache_key():
    """Generate cache key from request context."""
    return f'{request.path}:{request.args}:{current_user.id}:{g.get("locale", "en")}'

@app.route('/api/dashboard')
@cache.cached(timeout=60, key_prefix=make_cache_key)
def dashboard():
    return jsonify(build_dashboard(current_user))

Forgetting to include the user ID means User A sees User B’s dashboard. Forgetting the locale means English users see Spanish content. Every variable that changes the response must be in the cache key.

Monitoring cache effectiveness

Track hit rates to know if caching is working:

class MonitoredCache:
    def __init__(self, cache):
        self._cache = cache
        self.hits = 0
        self.misses = 0
    
    def get(self, key):
        value = self._cache.get(key)
        if value is not None:
            self.hits += 1
            metrics.increment('cache.hit')
        else:
            self.misses += 1
            metrics.increment('cache.miss')
        return value
    
    @property
    def hit_rate(self):
        total = self.hits + self.misses
        return self.hits / total if total > 0 else 0

Target hit rates:

  • > 95% — Excellent, caching is well-tuned
  • 80-95% — Good, look for frequently-missed keys
  • < 80% — Investigate: timeouts too short? Too many unique keys? Wrong data being cached?

One thing to remember: Caching is a spectrum from simple @cache.cached() decorators to multi-layer architectures with distributed invalidation. Start with the simplest approach that solves your performance problem. Add complexity only when measurements show the simple approach isn’t enough. A cache hit rate below 80% usually means you’re caching the wrong things, not that you need more caching.

pythonflaskcachingperformance

See Also