Python Read Replica Patterns — Deep Dive
SQLAlchemy read replica routing
SQLAlchemy supports multiple database binds, making it straightforward to route reads and writes to different servers:
from contextlib import contextmanager
from typing import Any

from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker
# Primary engine: the pool that PrimarySession draws its connections from.
primary_engine = create_engine(
    "postgresql://primary-host/mydb", pool_size=20, max_overflow=10
)

# One engine per read replica, for hosts replica-0 through replica-2.
replica_engines = [
    create_engine(
        f"postgresql://replica-{replica_no}/mydb",
        pool_size=30,
        max_overflow=15,
    )
    for replica_no in range(3)
]

# Session factory bound to the primary engine.
PrimarySession = sessionmaker(bind=primary_engine)
class ReplicaRouter:
    """Round-robin router across read replicas.

    Each call to ``get_engine`` hands out the next engine in list order and
    wraps around after the last one. ``get_session`` builds a session on the
    engine chosen that way.
    """

    def __init__(self, engines: list):
        """Remember the engines to rotate through.

        Args:
            engines: Replica engines, in the order they should be handed out.
        """
        self._engines = engines
        self._index = 0
        # One session factory per engine, created on first use and reused
        # afterwards instead of building a new sessionmaker for every session.
        self._factories = {}

    def get_engine(self):
        """Return the next engine in round-robin order.

        Raises:
            RuntimeError: If the router has no engines to choose from.
        """
        if not self._engines:
            # Without this guard the modulo below fails with an opaque
            # ZeroDivisionError.
            raise RuntimeError("ReplicaRouter has no replica engines configured")
        engine = self._engines[self._index % len(self._engines)]
        self._index += 1
        return engine

    def get_session(self) -> "Session":
        """Return a new session bound to the next replica engine.

        Raises:
            RuntimeError: If the router has no engines to choose from.
        """
        engine = self.get_engine()
        factory = self._factories.get(engine)
        if factory is None:
            factory = self._factories[engine] = sessionmaker(bind=engine)
        return factory()
# Module-level router over replica_engines, shared by the session helpers.
replica_router = ReplicaRouter(replica_engines)
@contextmanager
def read_session():
    """Yield a session on the replica that ``replica_router`` picks next.

    The session is closed when the ``with`` block exits, whether or not the
    body raised. Nothing is committed here.
    """
    replica_session = replica_router.get_session()
    try:
        yield replica_session
    finally:
        replica_session.close()
@contextmanager
def write_session():
    """Yield a primary-bound session and commit it when the body succeeds.

    If the body (or the commit itself) raises an ``Exception``, the session
    is rolled back and the exception propagates. The session is closed in
    every case.
    """
    db = PrimarySession()
    try:
        try:
            yield db
            db.commit()
        except Exception:
            db.rollback()
            raise
    finally:
        db.close()
Read-your-own-writes pattern
After a write, route that user’s reads to the primary for a short window to guarantee they see their own changes:
import time
import threading
from typing import Optional
# Maps user_id -> time.monotonic() reading taken at that user's latest
# recorded write. This is an in-memory dict, so each process has its own copy.
_write_timestamps: dict[str, float] = {}
# Held around every read and write of _write_timestamps.
_lock = threading.Lock()
STICKY_WINDOW = 5.0 # seconds after write to use primary for reads
def record_write(user_id: str) -> None:
    """Stamp ``user_id`` with the current monotonic time as their latest write."""
    with _lock:
        stamp = time.monotonic()
        _write_timestamps[user_id] = stamp
def should_read_primary(user_id: str) -> bool:
    """Return True while ``user_id`` is inside the sticky window after a write.

    A user with no recorded write gets False. A user whose window has passed
    also gets False, and their entry is removed.
    """
    with _lock:
        stamp = _write_timestamps.get(user_id)
        if stamp is not None:
            elapsed = time.monotonic() - stamp
            if elapsed < STICKY_WINDOW:
                return True
            # The window has passed; drop the stale entry.
            _write_timestamps.pop(user_id)
    return False
@contextmanager
def smart_read_session(user_id: Optional[str] = None):
    """Yield a read session, using the primary for users who wrote recently.

    With a truthy ``user_id`` for which ``should_read_primary`` returns True,
    the session comes from ``PrimarySession``; otherwise it comes from
    ``replica_router``. The session is closed when the ``with`` block exits.
    """
    pin_to_primary = bool(user_id) and should_read_primary(user_id)
    session = PrimarySession() if pin_to_primary else replica_router.get_session()
    try:
        yield session
    finally:
        session.close()
# Usage in a FastAPI endpoint
async def get_user_profile(user_id: str, current_user_id: str):
    """Fetch one row from ``users`` by id, honoring read-your-own-writes.

    Args:
        user_id: Id of the profile to load.
        current_user_id: Id of the requesting user; passed to
            ``smart_read_session`` to choose between primary and replica.

    Returns:
        The first result row, or None when no row matches.
    """
    # Function-scope import keeps this snippet self-contained.
    from sqlalchemy import text

    # NOTE: the session calls here are synchronous, so this coroutine blocks
    # the event loop while the query runs.
    with smart_read_session(user_id=current_user_id) as session:
        # Raw SQL must be wrapped in text(); SQLAlchemy 2.0 rejects a plain
        # string passed to Session.execute().
        return session.execute(
            text("SELECT * FROM users WHERE id = :id"), {"id": user_id}
        ).fetchone()
Lag-aware routing
Monitor replication lag and route reads away from lagging replicas:
from dataclasses import dataclass
@dataclass
class ReplicaHealth:
    """Health record for a single read replica."""

    # Engine used to reach this replica. Typed as Any; the original `any`
    # annotation referred to the builtin function, not a type.
    engine: Any
    # Identifier used to look this record up.
    name: str
    # Most recently reported replication lag, in seconds.
    lag_seconds: float = 0.0
    # Whether this replica is currently eligible to serve reads.
    healthy: bool = True
class LagAwareRouter:
    """Round-robin over replicas whose last reported lag is below ``max_lag``."""

    def __init__(self, replicas: list[ReplicaHealth], max_lag: float = 2.0):
        """Store the replica records and the lag threshold in seconds."""
        self.replicas = replicas
        self.max_lag = max_lag
        self._index = 0

    def update_lag(self, name: str, lag_seconds: float) -> None:
        """Record a new lag reading for the first replica named ``name``.

        The replica's ``healthy`` flag becomes True only when the reading is
        strictly below ``max_lag``. An unknown name is ignored.
        """
        match = next((rec for rec in self.replicas if rec.name == name), None)
        if match is None:
            return
        match.lag_seconds = lag_seconds
        match.healthy = lag_seconds < self.max_lag

    def get_engine(self):
        """Return the next healthy replica's engine, or ``primary_engine`` if none is healthy."""
        candidates = [rec for rec in self.replicas if rec.healthy]
        if candidates:
            chosen = candidates[self._index % len(candidates)]
            self._index += 1
            return chosen.engine
        # No replica is within the lag threshold, so reads go to the primary.
        return primary_engine
def monitor_replication_lag(router: LagAwareRouter, check_interval: float = 1.0):
    """Start a daemon thread that keeps ``router``'s lag readings current.

    The thread loops forever: it queries every replica in ``router.replicas``
    for its replay lag, passes the value to ``router.update_lag``, then
    sleeps ``check_interval`` seconds. A replica whose check raises is
    reported with infinite lag.

    Args:
        router: Router whose replica records are refreshed.
        check_interval: Seconds to sleep between sweeps over the replicas.
    """
    import threading

    from sqlalchemy import text

    # PostgreSQL-specific lag query. Wrapped in text() because SQLAlchemy 2.0
    # rejects plain strings passed to execute().
    # NOTE(review): this value presumably also grows while the primary is idle,
    # since nothing new is being replayed; confirm before alerting on it.
    lag_query = text("""
SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))
AS lag_seconds
""")

    def _measure(engine) -> float:
        """Run the lag query once on ``engine`` and return the lag in seconds."""
        session = sessionmaker(bind=engine)()
        try:
            result = session.execute(lag_query).scalar()
        finally:
            # Close even when the query raises, so the connection is released.
            session.close()
        # A NULL result is treated as zero lag.
        return 0.0 if result is None else float(result)

    def _check():
        while True:
            for replica in router.replicas:
                try:
                    lag = _measure(replica.engine)
                except Exception:
                    # Best effort: an unreachable replica is reported as
                    # infinitely lagged rather than stopping the monitor.
                    lag = float("inf")
                router.update_lag(replica.name, lag)
            time.sleep(check_interval)

    thread = threading.Thread(target=_check, daemon=True)
    thread.start()
Django database router
Django has built-in support for multiple databases. A custom router directs reads to replicas:
import random
import threading
# Per-thread storage; ReadReplicaRouter.db_for_read reads a `user_id`
# attribute from it.
# NOTE(review): nothing visible here sets `user_id`; presumably request
# middleware assigns it per request. Confirm.
_recent_writers = threading.local()
class ReadReplicaRouter:
    """Django database router: writes and migrations on "default", reads on replicas."""

    replica_dbs = ["replica_1", "replica_2", "replica_3"]

    def db_for_read(self, model, **hints):
        """Pick the alias for a read.

        Returns "default" when the current thread carries a ``user_id`` for
        which ``should_read_primary`` is true; otherwise a random replica.
        """
        writer_id = getattr(_recent_writers, "user_id", None)
        pin_to_primary = bool(writer_id) and should_read_primary(writer_id)
        return "default" if pin_to_primary else random.choice(self.replica_dbs)

    def db_for_write(self, model, **hints):
        """Every write goes to the "default" (primary) alias."""
        return "default"

    def allow_relation(self, obj1, obj2, **hints):
        """Relations are allowed between any two objects."""
        return True

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        """Allow migrations only on the "default" (primary) alias."""
        is_primary = db == "default"
        return is_primary
Django settings:
# Alias -> connection settings. "default" is the primary; the replica_N
# aliases are the names ReadReplicaRouter chooses between for reads.
DATABASES = {
    alias: {"ENGINE": "django.db.backends.postgresql", "HOST": host}
    for alias, host in (
        ("default", "primary"),
        ("replica_1", "replica-1"),
        ("replica_2", "replica-2"),
        ("replica_3", "replica-3"),
    )
}

# Dotted path of the router class Django should consult.
DATABASE_ROUTERS = ["myapp.routers.ReadReplicaRouter"]
Automatic failover
When the primary fails, a replica can be promoted. Your application needs to handle the transition:
class FailoverManager:
    """Monitors primary health and promotes a replica on failure."""

    def __init__(self, primary_engine, replica_engines: list, check_interval: float = 2.0):
        """Set up the manager.

        Args:
            primary_engine: Engine for the initial primary.
            replica_engines: Promotion candidates, tried in list order. The
                list is copied, so promotions do not mutate the caller's list.
            check_interval: Seconds between health checks in ``run_monitor``.
        """
        self.primary = primary_engine
        self.replicas = list(replica_engines)
        self.check_interval = check_interval
        self._active_primary = primary_engine
        self._failed = False

    @property
    def current_primary(self):
        """Engine currently acting as primary (changes after a promotion)."""
        return self._active_primary

    def health_check(self) -> bool:
        """Check if the active primary is responsive.

        Returns:
            True if a connection can be opened and ``SELECT 1`` executes;
            False if either step raises.
        """
        # Imported outside the try block so a missing dependency surfaces as
        # an ImportError instead of being reported as an unhealthy primary.
        from sqlalchemy import text

        try:
            with self._active_primary.connect() as conn:
                # A raw string here raises under SQLAlchemy 2.0; the broad
                # except below would then report every check as failed and
                # trigger a spurious promotion.
                conn.execute(text("SELECT 1"))
            return True
        except Exception:
            return False

    def promote_replica(self) -> bool:
        """Make the first remaining replica the active primary.

        Returns:
            True if a replica was promoted, False if none are left.
        """
        if not self.replicas:
            return False
        # In practice, you'd check pg_last_xact_replay_timestamp()
        # to find the most caught-up replica
        new_primary = self.replicas.pop(0)
        self._active_primary = new_primary
        self._failed = True
        # Application should reconnect all write sessions
        return True

    def run_monitor(self):
        """Background monitoring loop; never returns.

        Runs ``health_check`` every ``check_interval`` seconds and calls
        ``promote_replica`` after three consecutive failures.
        """
        consecutive_failures = 0
        while True:
            if self.health_check():
                consecutive_failures = 0
            else:
                consecutive_failures += 1
                if consecutive_failures >= 3:
                    self.promote_replica()
                    # Restart the count so the new primary gets its own
                    # three-strike window.
                    consecutive_failures = 0
            time.sleep(self.check_interval)
Connection pooling with replicas
Use PgBouncer or application-level pooling to manage connections across all databases:
# Production-grade pool configuration
from sqlalchemy.pool import QueuePool
def create_pooled_engine(url: str, role: str = "replica") -> Engine:
    """Create engine with role-appropriate pool settings.

    Args:
        url: Database URL passed to ``create_engine``.
        role: ``"primary"`` selects the primary pool sizes; any other value
            gets the replica sizes.

    Returns:
        An engine using ``QueuePool`` with pre-ping enabled and connections
        recycled after 1800 seconds.
    """
    if role == "primary":
        # Primary: pool of 30 with up to 20 overflow connections.
        pool_size, max_overflow = 30, 20
    else:
        # Replicas: larger pool of 50 with up to 30 overflow connections.
        pool_size, max_overflow = 50, 30
    return create_engine(
        url,
        poolclass=QueuePool,
        pool_size=pool_size,
        max_overflow=max_overflow,
        pool_recycle=1800,
        pool_pre_ping=True,  # Detect stale connections
    )
Monitoring
Key metrics to track:
from prometheus_client import Gauge, Histogram, Counter
# Current replication lag, labelled by replica.
replication_lag = Gauge(
    "db_replication_lag_seconds",
    documentation="Replication lag per replica",
    labelnames=["replica"],
)

# Count of read-routing decisions; target is primary, replica_1, replica_2, etc.
read_routing = Counter(
    "db_read_route_total",
    documentation="Read query routing decisions",
    labelnames=["target"],
)

# Count of failover events (no labels).
failover_events = Counter(
    "db_failover_total",
    documentation="Database failover events",
)
Alert on:
- Replication lag > 5 seconds — reads may return significantly stale data.
- All replicas unhealthy — reads fall back to primary, increasing its load.
- Failover event — requires manual verification that the promoted replica is healthy.
The one thing to remember: read replica routing in Python requires three decisions — which queries can tolerate lag, how to handle read-your-own-writes, and what happens when a replica falls behind — nail these three and your database scales effortlessly for reads.
See Also
- CI/CD: Why big apps can ship updates every day without turning your phone into a glitchy mess — CI/CD is the behind-the-scenes quality gate and delivery truck.
- Containerization: Why does software that works on your computer break on everyone else's? Containers fix that — and they're why Netflix can deploy 100 updates a day without the site going down.
- Python 3.10 New Features: Python 3.10 gave programmers a shape-sorting machine, friendlier error messages, and cleaner ways to say 'this or that' in type hints.
- Python 3.11 New Features: Python 3.11 made everything faster, error messages smarter, and let you catch several mistakes at once instead of stopping at the first one.
- Python 3.12 New Features: Python 3.12 made type hints shorter, f-strings more powerful, and started preparing Python's engine for a world without the GIL.