Python Read Replica Patterns — Deep Dive

SQLAlchemy read replica routing

SQLAlchemy supports multiple database binds, making it straightforward to route reads and writes to different servers:

from contextlib import contextmanager
from sqlalchemy import create_engine, event
from sqlalchemy.orm import Session, sessionmaker

# Create engines
primary_engine = create_engine(
    "postgresql://primary-host/mydb",
    pool_size=20,
    max_overflow=10,
)

replica_engines = [
    create_engine(f"postgresql://replica-{i}/mydb", pool_size=30, max_overflow=15)
    for i in range(3)
]

PrimarySession = sessionmaker(bind=primary_engine)


class ReplicaRouter:
    """Round-robin router across read replicas."""

    def __init__(self, engines: list):
        self._engines = engines
        self._index = 0

    def get_engine(self):
        engine = self._engines[self._index % len(self._engines)]
        self._index += 1
        return engine

    def get_session(self) -> Session:
        engine = self.get_engine()
        factory = sessionmaker(bind=engine)
        return factory()


replica_router = ReplicaRouter(replica_engines)


@contextmanager
def read_session():
    """Get a session bound to a read replica."""
    session = replica_router.get_session()
    try:
        yield session
    finally:
        session.close()


@contextmanager
def write_session():
    """Get a session bound to the primary."""
    session = PrimarySession()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

Read-your-own-writes pattern

After a write, route that user’s reads to the primary for a short window to guarantee they see their own changes:

import time
import threading
from typing import Optional

_write_timestamps: dict[str, float] = {}
_lock = threading.Lock()

STICKY_WINDOW = 5.0  # seconds after write to use primary for reads


def record_write(user_id: str) -> None:
    """Mark that a user just performed a write."""
    with _lock:
        _write_timestamps[user_id] = time.monotonic()


def should_read_primary(user_id: str) -> bool:
    """Check if this user should read from primary (recent write)."""
    with _lock:
        last_write = _write_timestamps.get(user_id)
        if last_write is None:
            return False
        if time.monotonic() - last_write < STICKY_WINDOW:
            return True
        # Clean up expired entry
        del _write_timestamps[user_id]
        return False


@contextmanager
def smart_read_session(user_id: Optional[str] = None):
    """Route to primary if user recently wrote, otherwise to replica."""
    if user_id and should_read_primary(user_id):
        session = PrimarySession()
    else:
        session = replica_router.get_session()
    try:
        yield session
    finally:
        session.close()


# Usage in a FastAPI endpoint
async def get_user_profile(user_id: str, current_user_id: str):
    with smart_read_session(user_id=current_user_id) as session:
        return session.execute(
            "SELECT * FROM users WHERE id = :id", {"id": user_id}
        ).fetchone()

Lag-aware routing

Monitor replication lag and route reads away from lagging replicas:

from dataclasses import dataclass


@dataclass
class ReplicaHealth:
    engine: any
    name: str
    lag_seconds: float = 0.0
    healthy: bool = True


class LagAwareRouter:
    """Routes reads only to replicas within acceptable lag thresholds."""

    def __init__(self, replicas: list[ReplicaHealth], max_lag: float = 2.0):
        self.replicas = replicas
        self.max_lag = max_lag
        self._index = 0

    def update_lag(self, name: str, lag_seconds: float) -> None:
        """Update lag measurement for a replica (call from monitoring loop)."""
        for r in self.replicas:
            if r.name == name:
                r.lag_seconds = lag_seconds
                r.healthy = lag_seconds < self.max_lag
                break

    def get_engine(self):
        """Get the next healthy replica engine."""
        healthy = [r for r in self.replicas if r.healthy]
        if not healthy:
            # All replicas lagging — fall back to primary
            return primary_engine

        replica = healthy[self._index % len(healthy)]
        self._index += 1
        return replica.engine


def monitor_replication_lag(router: LagAwareRouter, check_interval: float = 1.0):
    """Background thread that checks replication lag on each replica."""
    import threading

    def _check():
        while True:
            for replica in router.replicas:
                try:
                    factory = sessionmaker(bind=replica.engine)
                    session = factory()
                    # PostgreSQL-specific lag query
                    result = session.execute("""
                        SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))
                        AS lag_seconds
                    """).scalar()
                    lag = float(result) if result else 0.0
                    router.update_lag(replica.name, lag)
                    session.close()
                except Exception:
                    router.update_lag(replica.name, float("inf"))
            time.sleep(check_interval)

    thread = threading.Thread(target=_check, daemon=True)
    thread.start()

Django database router

Django has built-in support for multiple databases. A custom router directs reads to replicas:

import random
import threading

_recent_writers = threading.local()


class ReadReplicaRouter:
    """Django database router for primary/replica topology."""

    replica_dbs = ["replica_1", "replica_2", "replica_3"]

    def db_for_read(self, model, **hints):
        # Check read-your-own-writes
        user_id = getattr(_recent_writers, "user_id", None)
        if user_id and should_read_primary(user_id):
            return "default"  # Primary
        return random.choice(self.replica_dbs)

    def db_for_write(self, model, **hints):
        return "default"  # Always write to primary

    def allow_relation(self, obj1, obj2, **hints):
        return True

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        return db == "default"  # Only migrate on primary

Django settings:

DATABASES = {
    "default": {"ENGINE": "django.db.backends.postgresql", "HOST": "primary"},
    "replica_1": {"ENGINE": "django.db.backends.postgresql", "HOST": "replica-1"},
    "replica_2": {"ENGINE": "django.db.backends.postgresql", "HOST": "replica-2"},
    "replica_3": {"ENGINE": "django.db.backends.postgresql", "HOST": "replica-3"},
}
DATABASE_ROUTERS = ["myapp.routers.ReadReplicaRouter"]

Automatic failover

When the primary fails, a replica can be promoted. Your application needs to handle the transition:

class FailoverManager:
    """Monitors primary health and promotes a replica on failure."""

    def __init__(self, primary_engine, replica_engines: list, check_interval: float = 2.0):
        self.primary = primary_engine
        self.replicas = list(replica_engines)
        self.check_interval = check_interval
        self._active_primary = primary_engine
        self._failed = False

    @property
    def current_primary(self):
        return self._active_primary

    def health_check(self) -> bool:
        """Check if the primary is responsive."""
        try:
            with self._active_primary.connect() as conn:
                conn.execute("SELECT 1")
            return True
        except Exception:
            return False

    def promote_replica(self) -> bool:
        """Promote the most up-to-date replica to primary."""
        if not self.replicas:
            return False

        # In practice, you'd check pg_last_xact_replay_timestamp()
        # to find the most caught-up replica
        new_primary = self.replicas.pop(0)
        self._active_primary = new_primary
        self._failed = True

        # Application should reconnect all write sessions
        return True

    def run_monitor(self):
        """Background monitoring loop."""
        consecutive_failures = 0
        while True:
            if self.health_check():
                consecutive_failures = 0
            else:
                consecutive_failures += 1
                if consecutive_failures >= 3:
                    self.promote_replica()
                    consecutive_failures = 0
            time.sleep(self.check_interval)

Connection pooling with replicas

Use PgBouncer or application-level pooling to manage connections across all databases:

# Production-grade pool configuration
from sqlalchemy.pool import QueuePool

def create_pooled_engine(url: str, role: str = "replica") -> Engine:
    """Create engine with role-appropriate pool settings."""
    if role == "primary":
        return create_engine(
            url,
            poolclass=QueuePool,
            pool_size=30,      # More connections for writes
            max_overflow=20,
            pool_recycle=1800,
            pool_pre_ping=True,  # Detect stale connections
        )
    return create_engine(
        url,
        poolclass=QueuePool,
        pool_size=50,      # More connections for reads
        max_overflow=30,
        pool_recycle=1800,
        pool_pre_ping=True,
    )

Monitoring

Key metrics to track:

from prometheus_client import Gauge, Histogram, Counter

replication_lag = Gauge(
    "db_replication_lag_seconds", "Replication lag per replica",
    ["replica"],
)
read_routing = Counter(
    "db_read_route_total", "Read query routing decisions",
    ["target"],  # primary, replica_1, replica_2, etc.
)
failover_events = Counter(
    "db_failover_total", "Database failover events",
)

Alert on:

  • Replication lag > 5 seconds — reads may return significantly stale data.
  • All replicas unhealthy — reads fall back to primary, increasing its load.
  • Failover event — requires manual verification that the promoted replica is healthy.

The one thing to remember: read replica routing in Python requires three decisions — which queries can tolerate lag, how to handle read-your-own-writes, and what happens when a replica falls behind — nail these three and your database scales effortlessly for reads.

pythondatabasesscaling

See Also

  • Ci Cd Why big apps can ship updates every day without turning your phone into a glitchy mess — CI/CD is the behind-the-scenes quality gate and delivery truck.
  • Containerization Why does software that works on your computer break on everyone else's? Containers fix that — and they're why Netflix can deploy 100 updates a day without the site going down.
  • Python 310 New Features Python 3.10 gave programmers a shape-sorting machine, friendlier error messages, and cleaner ways to say 'this or that' in type hints.
  • Python 311 New Features Python 3.11 made everything faster, error messages smarter, and let you catch several mistakes at once instead of stopping at the first one.
  • Python 312 New Features Python 3.12 made type hints shorter, f-strings more powerful, and started preparing Python's engine for a world without the GIL.