Flask Caching Strategies — Deep Dive
Caching patterns taxonomy
Cache-aside (lazy loading)
The application checks the cache first. On miss, it loads from the source, stores in cache, and returns:
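    def get_product(product_id):
        cache_key = f'product:{product_id}'
        product = cache.get(cache_key)
        if product is None:
            # Cache miss: load from the database and cache the dict form
            row = Product.query.get(product_id)
            if row is None:
                return None
            product = row.to_dict()
            cache.set(cache_key, product, timeout=300)
        # Always a dict (or None), whether it came from the cache or the database
        return product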
Pros: Only caches data that’s actually requested. Cache misses are self-healing.
Cons: First request after expiration is slow. Possible thundering herd when many requests miss the same key at once.
Write-through
Every write updates both stores in the same operation, the database first and then the cache:
    def update_product(product_id, data):
        # Abort with a 404 instead of failing on a missing row
        product = Product.query.get_or_404(product_id)
        product.name = data['name']
        product.price = data['price']
        db.session.commit()
        # Update the cache only after the commit succeeds
        cache.set(f'product:{product_id}', product.to_dict(), timeout=300)
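Pros: Cache is fresh immediately after a successful write, so readers rarely see stale data.
Cons: Write latency increases (must update two stores). A failed cache update after the commit leaves the old entry in place until it expires. Wastes cache space on data that may never be read.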
Write-behind (write-back)
Writes go to the cache first, then asynchronously to the database:
    def update_product_fast(product_id, data):
        cache_key = f'product:{product_id}'
        cache.set(cache_key, data, timeout=300)
        # Queue async database write
        celery_app.send_task('persist_product', args=[product_id, data])
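Pros: Extremely fast writes. Database operations can be batched.
Cons: Data loss if the cache or task queue fails before the write is persisted. Reads that miss the cache can see stale database rows until the queued task runs. Complex error handling.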
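The batching benefit depends on a buffer that sits between the cache write and the database write. The following is a minimal in-process sketch of that idea, assuming plain dictionaries stand in for the cache and the database. WriteBehindBuffer and its method names are hypothetical and are not part of Flask-Caching or Celery.

```python
import threading


class WriteBehindBuffer:
    """Hypothetical write-behind buffer; dicts stand in for cache and database."""

    def __init__(self):
        self.cache = {}      # fast store, updated on every write
        self.database = {}   # slow store, updated only on flush
        self._pending = {}   # latest unpersisted value per key
        self._lock = threading.Lock()
        self.flush_count = 0

    def write(self, key, value):
        # The caller returns as soon as the cache and the buffer are updated.
        with self._lock:
            self.cache[key] = value
            self._pending[key] = value

    def flush(self):
        # Persist all pending writes in one batch; repeated writes to the
        # same key collapse into a single database write.
        with self._lock:
            batch, self._pending = self._pending, {}
        if batch:
            self.database.update(batch)
            self.flush_count += 1
        return len(batch)


buffer = WriteBehindBuffer()
buffer.write('product:1', {'price': 10})
buffer.write('product:1', {'price': 12})   # replaces the pending value
buffer.write('product:2', {'price': 5})
persisted = buffer.flush()
```

Anything still waiting in the pending buffer is lost if the process dies before flush() runs, which is the data-loss risk listed in the cons.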
Cache stampede prevention
When a popular cache entry expires, hundreds of concurrent requests all miss the cache simultaneously and flood the database. This is a cache stampede (thundering herd).
Locking approach
    import threading

    _locks = {}  # one lock per cache key; only coordinates threads in this process

    def get_with_lock(key, loader, timeout=300):
        value = cache.get(key)
        if value is not None:
            return value
        # setdefault makes all threads share the same lock object for a key
        lock = _locks.setdefault(f'lock:{key}', threading.Lock())
        with lock:
            # Double-check after acquiring the lock: a thread that held it
            # before us may have already populated the cache
            value = cache.get(key)
            if value is None:
                value = loader()
                cache.set(key, value, timeout=timeout)
            return value
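To see the effect of per-key locking, here is a small self-contained check. It is a sketch under stated assumptions: a dictionary stands in for the cache backend, and slow_loader and load_calls are hypothetical names used only to count trips to the data source.

```python
import threading
import time

_cache = {}          # stand-in for the real cache backend
_locks = {}
load_calls = []      # one entry per simulated database query


def slow_loader():
    load_calls.append(1)
    time.sleep(0.05)  # simulate an expensive query
    return 'expensive value'


def get_with_lock(key, loader):
    value = _cache.get(key)
    if value is not None:
        return value
    lock = _locks.setdefault(key, threading.Lock())
    with lock:
        # Re-check: another thread may have filled the cache while we waited.
        value = _cache.get(key)
        if value is None:
            value = loader()
            _cache[key] = value
        return value


results = []
threads = [
    threading.Thread(
        target=lambda: results.append(get_with_lock('report', slow_loader))
    )
    for _ in range(20)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

All twenty threads receive the value, but only the first one to take the lock runs the loader. This protects a single process only; separate worker processes each have their own lock table.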
For distributed systems, use Redis distributed locks:
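    import redis

    # Create the client once and reuse it instead of connecting on every call
    r = redis.from_url(app.config['CACHE_REDIS_URL'])

    def get_with_distributed_lock(key, loader, timeout=300):
        value = cache.get(key)
        if value is not None:
            return value
        # timeout=10 releases the lock automatically if the holder crashes
        lock = r.lock(f'lock:{key}', timeout=10)
        if not lock.acquire(blocking=True, blocking_timeout=5):
            # Lock acquisition timed out: load directly rather than fail
            return loader()
        try:
            value = cache.get(key)
            if value is None:
                value = loader()
                cache.set(key, value, timeout=timeout)
            return value
        finally:
            try:
                lock.release()
            except redis.exceptions.LockError:
                # The lock expired while loader() ran; nothing left to release
                pass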
Probabilistic early expiration
Refresh cache entries before they actually expire:
    import random
    import time

    def get_with_early_refresh(key, loader, timeout=300, beta=1.0):
        data = cache.get(key)
        if isinstance(data, dict) and '_cache_meta' in data:
            expiry = data['_cache_meta']['set_at'] + timeout
            remaining = expiry - time.time()
            # Serve the cached value unless a random margin of up to beta
            # seconds reaches past the expiry time
            if remaining - beta * random.random() > 0:
                return data['value']
        # Cache miss or early refresh: reload and store with a fresh timestamp
        value = loader()
        cache.set(key, {
            'value': value,
            '_cache_meta': {'set_at': time.time()}
        }, timeout=timeout)
        return value
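As the entry approaches expiration, the probability of a refresh increases. With the formula above, a refresh can only trigger during the final beta seconds before expiry, with the probability rising linearly from 0 to 1 over that window, so raise beta to start refreshing earlier. This spreads cache rebuilds over time instead of concentrating them at the expiration point.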
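An alternative to the uniform random margin used above is the variant often called XFetch, which scales the margin by how long the value takes to recompute and draws it from an exponential distribution. The sketch below shows only the refresh decision. The function name, the now and rng parameters (added so the decision can be tested deterministically), and the idea of storing delta alongside the cached value are assumptions for illustration.

```python
import math
import random
import time


def should_refresh_early(expiry, delta, beta=1.0, now=None, rng=random.random):
    """Return True if this request should recompute the value before expiry.

    expiry: absolute expiration time (seconds since the epoch)
    delta:  how long the last recomputation took, in seconds
    beta:   values above 1.0 favor earlier refreshes, below 1.0 later ones
    """
    now = time.time() if now is None else now
    # rng() is in [0, 1); use 1 - rng() so the log argument is never 0.
    # -log(u) is an exponentially distributed head start with mean 1.
    head_start = -delta * beta * math.log(1.0 - rng())
    return now + head_start >= expiry
```

Because the head start is proportional to delta, entries that are expensive to rebuild start refreshing earlier than cheap ones, and an already expired entry always refreshes.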
HTTP caching with ETags
Server-side caching reduces database load. HTTP caching reduces server load by letting browsers and CDNs serve cached responses:
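    import hashlib
    import json

    from flask import Response, jsonify, request

    @app.route('/api/products/<int:product_id>')
    def get_product(product_id):
        product = Product.query.get_or_404(product_id)
        data = product.to_dict()
        # Generate the ETag from a deterministic serialization of the content
        # (MD5 is used as a fingerprint here, not for security)
        payload = json.dumps(data, sort_keys=True, default=str)
        etag = hashlib.md5(payload.encode()).hexdigest()
        # Check whether the client already has the current version
        if etag in request.if_none_match:
            response = Response(status=304)  # Not Modified
        else:
            response = jsonify(data)
        # set_etag() adds the quotes that the ETag header syntax requires
        response.set_etag(etag)
        response.headers['Cache-Control'] = 'private, max-age=60'
        return response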
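The client sends If-None-Match: <etag> on subsequent requests. If the data has not changed, the server returns 304 with no body, saving bandwidth and the cost of building the full response. The server still queries the database and computes the hash, so ETags reduce transfer cost rather than database load.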
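The comparison behind a 304 response can be sketched without a web framework. The helper names make_etag and is_not_modified are hypothetical, and the header parsing is simplified: it splits on commas, which is safe for hex-digest tags but not for arbitrary quoted strings. SHA-256 is used here only as a content fingerprint.

```python
import hashlib
import json


def make_etag(data):
    """Hash a JSON-serializable payload into a quoted strong ETag."""
    body = json.dumps(data, sort_keys=True, separators=(',', ':'))
    return '"' + hashlib.sha256(body.encode('utf-8')).hexdigest() + '"'


def is_not_modified(if_none_match, etag):
    """Return True when the If-None-Match header value matches etag.

    If-None-Match uses weak comparison, so a W/ prefix is ignored.
    """
    if not if_none_match:
        return False

    def strip_weak(tag):
        return tag[2:] if tag.startswith('W/') else tag

    candidates = [strip_weak(part.strip()) for part in if_none_match.split(',')]
    return '*' in candidates or strip_weak(etag) in candidates


etag = make_etag({'id': 1, 'name': 'Widget'})
```

Sorting the keys before hashing means two dictionaries with the same content always produce the same tag, regardless of insertion order.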
Cache-Control headers
    @app.after_request
    def add_cache_headers(response):
        if request.endpoint == 'static':
            response.headers['Cache-Control'] = 'public, max-age=31536000'  # 1 year
        elif request.endpoint and request.endpoint.startswith('api_'):
            response.headers['Cache-Control'] = 'private, no-cache'
        return response
- public — CDNs and proxies can cache this
- private — Only the browser can cache (user-specific data)
- no-cache — Cache but revalidate on every request (ETag check)
- no-store — Never cache (sensitive data like banking)
- max-age=N — Cache for N seconds without checking
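The directives combine into a single comma-separated header value. As an illustration, here is a small hypothetical helper (cache_control is not a Flask or Werkzeug function) that assembles the value and treats no-store as exclusive, which is a design choice of this sketch.

```python
def cache_control(*, visibility=None, no_cache=False, no_store=False, max_age=None):
    """Assemble a Cache-Control header value from individual directives."""
    if visibility not in (None, 'public', 'private'):
        raise ValueError("visibility must be 'public', 'private', or None")
    if no_store:
        # This helper treats no-store as exclusive: nothing may be kept anywhere.
        return 'no-store'
    parts = []
    if visibility:
        parts.append(visibility)
    if no_cache:
        parts.append('no-cache')
    if max_age is not None:
        parts.append(f'max-age={int(max_age)}')
    return ', '.join(parts)


static_policy = cache_control(visibility='public', max_age=31536000)
api_policy = cache_control(visibility='private', no_cache=True)
```

The two resulting strings match the values set in the after_request hook above.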
Multi-layer caching architecture
Production systems stack multiple cache layers:
Client → CDN (edge cache) → Reverse proxy (Nginx) → Application (Redis) → Database
Layer 1: Browser cache (Cache-Control headers)
- Eliminates request entirely
- Per-user, controlled by response headers
Layer 2: CDN (Cloudflare, CloudFront)
- Geographic distribution
- Handles static assets and public API responses
- Configured via Cache-Control: public
Layer 3: Reverse proxy (Nginx, Varnish)
- Caches full HTTP responses
- Sits in front of application servers
- Reduces load on Python processes
Layer 4: Application cache (Flask-Caching + Redis)
- Caches database query results and computed values
- Finest-grained control
- Can invalidate on writes
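Layer 5: Database internal caches
- MySQL and PostgreSQL keep hot pages in memory (InnoDB buffer pool, PostgreSQL shared buffers); MySQL removed its query result cache in 8.0
- Automatic, limited control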
Each layer catches requests that miss the layer above it. A well-tuned stack can serve the large majority of read traffic from cache; the exact share depends on how cacheable the workload is.
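The way a request falls through layers can be sketched with two of them. This is a minimal illustration, assuming a dictionary stands in for the shared cache. TwoTierCache, local_ttl, and the injectable clock are hypothetical names chosen for the example.

```python
import time


class TwoTierCache:
    """Hypothetical two-layer lookup: a small local cache in front of a shared one."""

    def __init__(self, shared, local_ttl=5.0, clock=time.monotonic):
        self.shared = shared          # e.g. a Redis-backed cache; a dict here
        self.local = {}               # key -> (expires_at, value)
        self.local_ttl = local_ttl
        self.clock = clock

    def get(self, key, loader):
        entry = self.local.get(key)
        if entry is not None and entry[0] > self.clock():
            return entry[1], 'local'
        if key in self.shared:
            value, source = self.shared[key], 'shared'
        else:
            value, source = loader(), 'origin'
            self.shared[key] = value
        # Promote into the local layer so the next lookup stops earlier.
        self.local[key] = (self.clock() + self.local_ttl, value)
        return value, source


now = [0.0]
tiers = TwoTierCache({}, local_ttl=5.0, clock=lambda: now[0])
first = tiers.get('k', lambda: 42)    # nothing cached yet: goes to the origin
second = tiers.get('k', lambda: 42)   # served by the local layer
now[0] = 10.0                         # local entry has expired
third = tiers.get('k', lambda: 42)    # falls through to the shared layer
```

The short local timeout bounds how long a process can serve a value that has since changed in the shared layer.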
Distributed cache invalidation
With multiple application servers sharing a Redis cache, invalidation must be coordinated:
Event-based invalidation
    import redis

    r = redis.from_url(app.config['CACHE_REDIS_URL'])

    def publish_invalidation(key_pattern):
        r.publish('cache_invalidation', key_pattern)

    # Run in a background thread in each worker process:
    def listen_for_invalidations():
        # Give each listener its own PubSub object rather than sharing one
        pubsub = r.pubsub()
        pubsub.subscribe('cache_invalidation')
        for message in pubsub.listen():
            if message['type'] == 'message':
                pattern = message['data'].decode()
                # Clear local in-memory caches matching the pattern
                # (local_cache is the application's own per-process cache)
                local_cache.clear_pattern(pattern)
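Redis pub/sub notifies all application servers to clear their local caches when data changes. This supplements the Redis cache (which is already shared) by invalidating any per-process in-memory caches. Pub/sub delivery is fire-and-forget, so a worker that is disconnected when a message is published misses it; keep local timeouts short as a backstop.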
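The listener above calls local_cache.clear_pattern, which the article does not define. One possible shape for such a per-process cache is sketched below, assuming glob-style patterns such as product:* and using only the standard library. The LocalCache class is hypothetical.

```python
import fnmatch
import threading


class LocalCache:
    """Hypothetical per-process cache with glob-style invalidation."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def set(self, key, value):
        with self._lock:
            self._data[key] = value

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)

    def clear_pattern(self, pattern):
        """Delete every key matching a glob such as 'product:*'."""
        with self._lock:
            doomed = [k for k in self._data if fnmatch.fnmatchcase(k, pattern)]
            for k in doomed:
                del self._data[k]
        return len(doomed)


local_cache = LocalCache()
local_cache.set('product:1', 'a')
local_cache.set('product:2', 'b')
local_cache.set('user:1', 'c')
removed = local_cache.clear_pattern('product:*')
```

The lock matters because the listener thread clears entries while request threads read and write them.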
Version-based invalidation
Instead of deleting cache entries, change the cache key:
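    def get_product_cache_version():
        version = cache.get('product_cache_version')
        if version is None:
            version = 1
            # timeout=0 means "never expire" in Flask-Caching
            cache.set('product_cache_version', version, timeout=0)
        return version

    def get_products():
        version = get_product_cache_version()
        key = f'products:v{version}'
        data = cache.get(key)
        if data is None:
            data = [p.to_dict() for p in Product.query.all()]
            cache.set(key, data, timeout=300)
        return data

    def invalidate_products():
        # Make sure the counter exists so the increment moves past the
        # version that readers are currently using
        get_product_cache_version()
        # Assumption: inc() is reached through the underlying backend object
        # (cache.cache) and the backend stores the counter as an integer
        cache.cache.inc('product_cache_version')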
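Incrementing the version makes old cache keys orphans that expire naturally. No explicit deletion is needed, and readers never observe a half-deleted set of keys. The trade-offs: orphaned entries occupy memory until their timeout passes, every read costs an extra lookup for the version key, and the increment itself must be atomic to stay safe under concurrent invalidations.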
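The life cycle of a versioned key can be traced with a dictionary standing in for the shared cache. This is a sketch for illustration only: store and db_reads are hypothetical names, and the increment here is not atomic the way a real backend's increment should be.

```python
store = {}          # stand-in for the shared cache
db_reads = []       # records each simulated database query


def current_version():
    return store.setdefault('product_cache_version', 1)


def get_products():
    key = f'products:v{current_version()}'
    if key not in store:
        db_reads.append(key)
        store[key] = ['widget', 'gadget']
    return store[key]


def invalidate_products():
    store['product_cache_version'] = current_version() + 1


get_products()            # miss: fills products:v1
get_products()            # hit
invalidate_products()     # readers now look for products:v2
get_products()            # miss: fills products:v2
```

After the invalidation, products:v1 is still in the store but no reader asks for it again; in a real cache it would disappear when its timeout passes.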
Caching with request context
Cache keys must account for all variables that affect the response:
    def make_cache_key():
        """Generate cache key from request context."""
        # Sort the query parameters so ?a=1&b=2 and ?b=2&a=1 share one entry
        args = sorted(request.args.items(multi=True))
        return f'{request.path}:{args}:{current_user.id}:{g.get("locale", "en")}'

    @app.route('/api/dashboard')
    @cache.cached(timeout=60, key_prefix=make_cache_key)
    def dashboard():
        return jsonify(build_dashboard(current_user))
Forgetting to include the user ID means User A sees User B’s dashboard. Forgetting the locale means English users see Spanish content. Every variable that changes the response must be in the cache key.
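The same rule can be expressed as a framework-independent function that is easy to unit test. This is a sketch under stated assumptions: build_cache_key is a hypothetical helper, and args is expected as a list of (name, value) pairs, such as the output of request.args.items(multi=True) in Flask.

```python
import hashlib
from urllib.parse import urlencode


def build_cache_key(path, args, user_id, locale='en'):
    """Build a deterministic key from everything that varies the response."""
    # Sort so that parameter order in the URL does not create duplicate entries.
    query = urlencode(sorted(args))
    raw = f'{path}?{query}|user={user_id}|locale={locale}'
    # Hash so the key has a bounded length and no awkward characters.
    return 'view:' + hashlib.sha256(raw.encode('utf-8')).hexdigest()


key_a = build_cache_key('/api/dashboard', [('page', '2'), ('sort', 'name')], 7)
key_b = build_cache_key('/api/dashboard', [('sort', 'name'), ('page', '2')], 7)
key_other_user = build_cache_key('/api/dashboard', [('page', '2'), ('sort', 'name')], 8)
```

Reordered parameters map to one entry, while a different user or locale always maps to a different one.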
Monitoring cache effectiveness
Track hit rates to know if caching is working:
    class MonitoredCache:
        def __init__(self, cache):
            self._cache = cache
            self.hits = 0
            self.misses = 0

        def get(self, key):
            value = self._cache.get(key)
            # Note: a cached value of None is indistinguishable from a miss here
            if value is not None:
                self.hits += 1
                metrics.increment('cache.hit')
            else:
                self.misses += 1
                metrics.increment('cache.miss')
            return value

        @property
        def hit_rate(self):
            total = self.hits + self.misses
            return self.hits / total if total > 0 else 0.0
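Two details of the wrapper above are worth tightening in multi-threaded workers: counter updates can interleave, and a legitimately cached None is counted as a miss. The following sketch addresses both, assuming a plain mapping as the store. HitRateTracker and the _MISSING sentinel are hypothetical names.

```python
import threading

_MISSING = object()   # sentinel that no cached value can equal


class HitRateTracker:
    """Hypothetical thread-safe hit/miss counter around a mapping."""

    def __init__(self, store):
        self._store = store       # any mapping; a dict in this sketch
        self._lock = threading.Lock()
        self.hits = 0
        self.misses = 0

    def get(self, key):
        value = self._store.get(key, _MISSING)
        with self._lock:
            if value is _MISSING:
                self.misses += 1
            else:
                self.hits += 1
        return None if value is _MISSING else value

    @property
    def hit_rate(self):
        with self._lock:
            total = self.hits + self.misses
            return self.hits / total if total else 0.0


tracker = HitRateTracker({'a': 1, 'b': None})
for key in ('a', 'b', 'c', 'a'):
    tracker.get(key)
```

Here the key b counts as a hit even though its value is None, so three of the four lookups are hits.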
Target hit rates:
- > 95% — Excellent, caching is well-tuned
- 80-95% — Good, look for frequently-missed keys
- < 80% — Investigate: timeouts too short? Too many unique keys? Wrong data being cached?
One thing to remember: Caching is a spectrum from simple @cache.cached() decorators to multi-layer architectures with distributed invalidation. Start with the simplest approach that solves your performance problem. Add complexity only when measurements show the simple approach isn’t enough. A cache hit rate below 80% usually means you’re caching the wrong things, not that you need more caching.