Python Fallback Strategies — Deep Dive
A Composable Fallback Chain
A well-designed fallback system lets you stack strategies declaratively:
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Coroutine, Optional
# Module-level logger; the fallback helpers report provider failures and
# fallback activations through it.
logger = logging.getLogger(__name__)
@dataclass
class FallbackResult:
    """Outcome of a fallback run: the value plus where it came from."""

    value: Any  # Whatever the successful provider returned
    source: str  # Which provider returned the result
    latency_ms: float  # Elapsed time in milliseconds, measured by whoever builds the result
    is_fallback: bool  # True when a provider other than the first one produced the value
@dataclass
class FallbackChain:
    """Tries providers in order until one succeeds.

    Providers are ``(name, async callable)`` pairs. Every provider is called
    with the same arguments that were passed to :meth:`execute`.
    """

    name: str
    providers: list[tuple[str, Callable[..., Coroutine]]] = field(
        default_factory=list
    )

    def add(self, name: str, func: Callable[..., Coroutine]) -> "FallbackChain":
        """Append a provider and return the chain so calls can be chained."""
        self.providers.append((name, func))
        return self

    async def execute(self, *args: Any, **kwargs: Any) -> FallbackResult:
        """Return the result of the first provider that does not raise.

        Args:
            *args: Positional arguments forwarded to every provider.
            **kwargs: Keyword arguments forwarded to every provider.

        Returns:
            A FallbackResult whose ``latency_ms`` covers only the successful
            provider's call and whose ``is_fallback`` is True when that
            provider was not the first one in the chain.

        Raises:
            AllFallbacksExhaustedError: If the chain has no providers, or if
                every provider raised. In the latter case the individual
                failures are available on ``.errors`` and the last one is
                chained as ``__cause__``.
        """
        if not self.providers:
            # Nothing was tried, so "all providers failed" would be misleading.
            raise AllFallbacksExhaustedError(
                f"No providers configured for '{self.name}'"
            )

        errors: list[tuple[str, Exception]] = []
        for i, (provider_name, func) in enumerate(self.providers):
            start = time.monotonic()
            # Keep the try body to the provider call itself so that a bug in
            # the bookkeeping below is never mistaken for a provider failure.
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                errors.append((provider_name, exc))
                logger.warning(
                    "Provider %s failed in chain %s: %s",
                    provider_name, self.name, exc,
                )
                continue

            latency = (time.monotonic() - start) * 1000
            is_fallback = i > 0
            if is_fallback:
                logger.warning(
                    "Fallback activated: %s → %s (chain: %s)",
                    self.providers[0][0], provider_name, self.name,
                )
            return FallbackResult(
                value=result,
                source=provider_name,
                latency_ms=latency,
                is_fallback=is_fallback,
            )

        # All providers exhausted. repr() keeps the exception type visible and
        # avoids blank entries for exceptions raised without a message.
        error_summary = "; ".join(
            f"{name}: {exc!r}" for name, exc in errors
        )
        raise AllFallbacksExhaustedError(
            f"All providers failed for '{self.name}': {error_summary}",
            errors=errors,
        ) from errors[-1][1]


class AllFallbacksExhaustedError(Exception):
    """Raised when no provider in a chain produced a result.

    Attributes:
        errors: ``(provider_name, exception)`` pairs in the order the
            providers were tried; empty when the raiser supplied none.
    """

    def __init__(
        self,
        *args: Any,
        errors: Optional[list[tuple[str, Exception]]] = None,
    ) -> None:
        super().__init__(*args)
        # Copy so later mutation of the caller's list does not change the error.
        self.errors: list[tuple[str, Exception]] = list(errors) if errors else []
Practical Usage: Product Recommendations
from urllib.parse import quote

import httpx
import redis.asyncio as redis
from sqlalchemy.ext.asyncio import AsyncSession
# Shared async Redis client used by cached_recommendations.
# NOTE(review): created without arguments, so it presumably relies on the
# library's default connection settings — confirm before production use.
redis_client = redis.Redis()
async def live_recommendations(user_id: str) -> list[dict]:
    """Call ML recommendation service.

    Args:
        user_id: Identifier of the user to fetch recommendations for. It is
            percent-encoded before being placed in the request path.

    Returns:
        The ``"items"`` entry of the service's JSON response.

    Raises:
        Whatever the HTTP client raises for timeouts (3 seconds), transport
        errors, or error status codes (via ``raise_for_status``), and
        ``KeyError`` if the response has no ``"items"`` key.
    """
    # user_id becomes a path segment; encode everything (including "/") so a
    # crafted value cannot address a different endpoint on the service.
    encoded_id = quote(user_id, safe="")
    async with httpx.AsyncClient(timeout=3.0) as client:
        resp = await client.get(
            f"http://rec-service/recommend/{encoded_id}"
        )
        resp.raise_for_status()
        return resp.json()["items"]
async def cached_recommendations(user_id: str) -> list[dict]:
    """Return the recommendations stored in Redis for this user.

    Reads the ``recs:<user_id>`` key and decodes it as JSON. A missing key
    raises ``ValueError`` so a fallback chain moves on to its next provider.
    """
    import json

    cache_key = f"recs:{user_id}"
    payload = await redis_client.get(cache_key)
    if payload is not None:
        return json.loads(payload)
    raise ValueError("No cached recommendations")
async def popular_in_category(user_id: str) -> list[dict]:
    """Return up to ten popular catalog items (no personalization).

    ``user_id`` is accepted only so the signature matches the other
    providers in the chain; the request does not depend on it.
    """
    async with httpx.AsyncClient(timeout=2.0) as http:
        response = await http.get("http://catalog/popular")
        response.raise_for_status()
        payload = response.json()
    top_ten = payload["items"][:10]
    return top_ten
async def static_bestsellers(user_id: str) -> list[dict]:
    """Return the hard-coded bestseller list (updated daily).

    ``user_id`` is ignored; every caller gets the same items. A new list is
    built on each call.
    """
    rows = (
        ("BST001", "Universal Widget", 0.99),
        ("BST002", "Premium Gadget", 0.95),
    )
    return [
        {"id": item_id, "name": title, "score": score}
        for item_id, title, score in rows
    ]
# Build the chain: providers are tried top to bottom until one succeeds.
recommendations_chain = FallbackChain(
    name="recommendations",
    providers=[
        ("ml_service", live_recommendations),
        ("redis_cache", cached_recommendations),
        ("popular_items", popular_in_category),
        ("static_bestsellers", static_bestsellers),
    ],
)
Fallback with Timeout Budgets
Combine fallbacks with a total time budget so degraded responses still arrive quickly:
async def execute_with_budget(
    chain: FallbackChain,
    total_timeout: float,
    *args: Any,
    **kwargs: Any,
) -> FallbackResult:
    """Run fallback chain with a total time budget.

    Each provider gets an equal share of whatever budget is left when its
    turn comes, so time an early provider did not use is passed on to the
    later ones.

    Args:
        chain: Chain whose ``providers`` are tried in order.
        total_timeout: Overall budget in seconds for the whole run.
        *args: Positional arguments forwarded to every provider.
        **kwargs: Keyword arguments forwarded to every provider.

    Returns:
        A FallbackResult whose ``latency_ms`` is the time elapsed since this
        function started, including time spent on providers that failed.

    Raises:
        AllFallbacksExhaustedError: If no provider returned within the
            budget. The message lists each recorded failure and the last
            one is chained as ``__cause__``.
    """
    start = time.monotonic()
    deadline = start + total_timeout
    provider_count = len(chain.providers)
    failures: list[tuple[str, Exception]] = []
    attempted = 0

    for i, (name, func) in enumerate(chain.providers):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        attempted += 1
        # Equal share of the remaining budget for each provider not yet tried.
        slice_timeout = remaining / (provider_count - i)
        try:
            result = await asyncio.wait_for(
                func(*args, **kwargs),
                timeout=slice_timeout,
            )
        except Exception as exc:  # includes asyncio.TimeoutError
            failures.append((name, exc))
            # A timeout has an empty str(); log the type name instead.
            logger.warning(
                "Provider %s failed or timed out: %s",
                name, str(exc) or type(exc).__name__,
            )
            continue
        return FallbackResult(
            value=result,
            source=name,
            latency_ms=(time.monotonic() - start) * 1000,
            is_fallback=i > 0,
        )

    message = f"All providers failed within {total_timeout}s budget"
    if failures:
        details = "; ".join(f"{name}: {exc!r}" for name, exc in failures)
        message = f"{message}: {details}"
    skipped = provider_count - attempted
    if skipped:
        # The budget ran out before these providers got a turn.
        message = f"{message} ({skipped} provider(s) not attempted)"
    cause = failures[-1][1] if failures else None
    raise AllFallbacksExhaustedError(message) from cause
FastAPI Integration with Response Headers
Let callers know when they’re getting fallback data:
from fastapi import FastAPI, Response
app = FastAPI()


@app.get("/api/recommendations/{user_id}")
async def get_recommendations(user_id: str, response: Response):
    """Serve recommendations and flag responses that came from a fallback.

    When a provider other than the first one answered, the response carries
    ``X-Fallback-Source`` and ``X-Data-Freshness`` headers.
    """
    outcome = await recommendations_chain.execute(user_id)
    served_by = outcome.source

    if outcome.is_fallback:
        response.headers["X-Fallback-Source"] = served_by
        response.headers["X-Data-Freshness"] = "degraded"

    body = {
        "items": outcome.value,
        "source": served_by,
        "latency_ms": round(outcome.latency_ms, 2),
    }
    return body
Frontend code can check X-Fallback-Source to show a subtle “Showing cached results” banner.
Django Decorator for Automatic Fallbacks
For Django async views, a decorator can wrap a view so that a fallback is served when the view raises:
import functools
def with_fallback(fallback_func, *, exceptions=(Exception,)):
    """Decorator that catches exceptions and calls a fallback.

    Args:
        fallback_func: Async callable invoked with the same arguments as the
            wrapped view when the view raises a matching exception.
        exceptions: Exception class or tuple of classes that trigger the
            fallback. The default catches every ``Exception``. Pass a
            narrower tuple when some exceptions must reach the framework
            unchanged.
            NOTE(review): in a web framework this likely includes the
            exceptions used to signal "not found" or "permission denied" —
            confirm which ones your views rely on.

    Returns:
        A decorator for async views. The wrapper returns the view's result,
        or the fallback's result if the view raised a matching exception.
        Exceptions raised by the fallback itself propagate.
    """
    def decorator(view_func):
        @functools.wraps(view_func)
        async def wrapper(*args, **kwargs):
            try:
                return await view_func(*args, **kwargs)
            except exceptions as exc:
                logger.warning(
                    "View %s failed, using fallback: %s",
                    view_func.__name__, exc,
                )
                return await fallback_func(*args, **kwargs)
        return wrapper
    return decorator
# Usage in Django async views
@with_fallback(serve_cached_dashboard)
async def dashboard_view(request):
    """Render the dashboard from live analytics.

    If this view raises, the decorator calls serve_cached_dashboard with the
    same arguments instead.
    """
    context = {"data": await fetch_live_analytics()}
    return render(request, "dashboard.html", context)
Monitoring Fallback Health
Track fallback activation rates to catch degradation before users complain:
from prometheus_client import Counter, Histogram
# Incremented once per request that was answered by a non-primary provider.
fallback_activations = Counter(
    name="fallback_activations_total",
    documentation="Number of times a fallback was used",
    labelnames=["chain_name", "provider_name"],
)

# Observed in seconds for responses that came from a fallback provider.
fallback_latency = Histogram(
    name="fallback_latency_seconds",
    documentation="Latency of fallback responses",
    labelnames=["chain_name", "provider_name"],
)
# Counts runs in which every provider failed, so an "all fallbacks exhausted"
# alert has a metric to fire on.
fallback_exhausted = Counter(
    "fallback_exhausted_total",
    "Number of times every provider in a chain failed",
    ["chain_name"],
)


class MonitoredFallbackChain(FallbackChain):
    """FallbackChain that records Prometheus metrics for degraded runs."""

    async def execute(self, *args, **kwargs):
        """Run the chain and record fallback and exhaustion metrics.

        Returns:
            The FallbackResult produced by ``FallbackChain.execute``.

        Raises:
            AllFallbacksExhaustedError: Re-raised unchanged after the
                exhaustion counter has been incremented.
        """
        try:
            result = await super().execute(*args, **kwargs)
        except AllFallbacksExhaustedError:
            fallback_exhausted.labels(chain_name=self.name).inc()
            raise

        if result.is_fallback:
            fallback_activations.labels(
                chain_name=self.name,
                provider_name=result.source,
            ).inc()
            # The histogram is in seconds; the result carries milliseconds.
            fallback_latency.labels(
                chain_name=self.name,
                provider_name=result.source,
            ).observe(result.latency_ms / 1000)
        return result
Set alerts on:
- Fallback activation rate > 5% over 5 minutes → investigate primary service
- All-fallbacks-exhausted errors > 0 → critical, user-facing failures
- Fallback latency increase → the fallback service itself is struggling
Testing Fallback Chains
Test that each fallback level works independently and that the chain degrades correctly:
import pytest
@pytest.mark.asyncio
async def test_primary_succeeds():
    """The first provider's value is returned and not flagged as a fallback."""
    chain = FallbackChain("test")
    chain.add("primary", mock_success("primary_data"))
    chain.add("fallback", mock_success("fallback_data"))

    outcome = await chain.execute()

    assert outcome.value == "primary_data"
    assert outcome.is_fallback is False
@pytest.mark.asyncio
async def test_falls_to_second_provider():
    """A failing primary hands over to the next provider in the chain."""
    chain = FallbackChain("test")
    chain.add("primary", mock_failure(ConnectionError))
    chain.add("fallback", mock_success("fallback_data"))

    outcome = await chain.execute()

    assert outcome.value == "fallback_data"
    assert outcome.source == "fallback"
    assert outcome.is_fallback is True
@pytest.mark.asyncio
async def test_all_fail_raises():
    """When every provider fails the chain raises AllFallbacksExhaustedError."""
    chain = FallbackChain("test")
    chain.add("a", mock_failure(ConnectionError))
    chain.add("b", mock_failure(TimeoutError))

    with pytest.raises(AllFallbacksExhaustedError):
        await chain.execute()
def mock_success(value):
    """Build an async provider that ignores its arguments and returns *value*."""
    async def _succeed(*_args, **_kwargs):
        return value

    return _succeed
def mock_failure(exc_type):
    """Build an async provider that always raises *exc_type*."""
    async def _fail(*_args, **_kwargs):
        error = exc_type("simulated failure")
        raise error

    return _fail
Pitfalls
- Fallback storms. When the primary fails, all traffic hits the fallback simultaneously. If the fallback can’t handle full traffic, it fails too — cascading the problem. Solution: the fallback should be cheap and scalable (cache, static data).
- Stale fallback data. If cached data is months old, it might be worse than an error. Tag cached entries with timestamps and set maximum staleness thresholds.
- Invisible degradation. Without monitoring, your app could run on fallbacks for weeks without anyone noticing. Always emit metrics and alerts when fallbacks activate.
- Testing only the happy path. If you never test your fallbacks, they’ll be broken when you need them most. Include fallback scenarios in your integration test suite and run chaos engineering experiments.
One thing to remember: A fallback chain is only as strong as its weakest tested link. Build each level to be independently reliable, monitor activation rates obsessively, and treat sustained fallback usage as an incident — not a feature.
See Also
- Python Aggregate Pattern — Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts — Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern — Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern — How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture — Why your Python app should look like an onion — and how that saves you from painful rewrites.