Python Sharding Strategies — Deep Dive

Shard router implementation

The shard router maps a shard key to a specific database connection. This sits between your application logic and the database layer:

import hashlib
from dataclasses import dataclass, field
from typing import Any, Optional

from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker


@dataclass
class ShardConfig:
    """Connection settings for a single shard.

    ``weight`` defaults to 1. ``ShardRouter`` in this module never reads it,
    so it currently has no effect on routing.
    """

    name: str  # logical shard name; ShardRouter returns it as the routing result
    url: str  # database URL handed to create_engine
    weight: int = field(default=1)  # intended for weighted distribution


class ShardRouter:
    """Routes queries to the correct database shard.

    Supported strategies:

    * ``"hash"``  -- MD5 of ``str(shard_key)``; the first 32 bits, taken
      modulo the number of shards, index into ``shards``.
    * ``"range"`` -- ``int(shard_key)`` in ``[0, 2**32)`` is split into
      ``len(shards)`` equal contiguous ranges. Keys at or above ``2**32``
      go to the last shard. Negative keys are rejected.

    ``ShardConfig.weight`` is not consulted by either strategy.
    """

    _STRATEGIES = ("hash", "range")

    def __init__(
        self,
        shards: list[ShardConfig],
        strategy: str = "hash",
        engine_options: Optional[dict[str, Any]] = None,
    ):
        """Create one engine and session factory per shard.

        Args:
            shards: Shard definitions. List order fixes each shard's index,
                so reordering the list remaps keys.
            strategy: ``"hash"`` or ``"range"``.
            engine_options: Extra ``create_engine`` keyword arguments. They
                override the default pool settings for every shard.

        Raises:
            ValueError: If ``shards`` is empty or ``strategy`` is unknown.
        """
        if not shards:
            # Both strategies divide by len(shards). Fail fast here instead of
            # raising ZeroDivisionError on the first routed query.
            raise ValueError("ShardRouter requires at least one shard")
        if strategy not in self._STRATEGIES:
            raise ValueError(f"Unknown strategy: {strategy}")

        self.shards = shards
        self.strategy = strategy
        self.engines: dict[str, Engine] = {}
        self.session_factories: dict[str, sessionmaker] = {}

        options: dict[str, Any] = {
            "pool_size": 20,
            "max_overflow": 10,
            "pool_recycle": 3600,
        }
        options.update(engine_options or {})

        for shard in shards:
            engine = create_engine(shard.url, **options)
            self.engines[shard.name] = engine
            self.session_factories[shard.name] = sessionmaker(bind=engine)

    def get_shard(self, shard_key: Any) -> str:
        """Return the name of the shard that owns ``shard_key``.

        Raises:
            ValueError: For an unknown strategy, a non-integer key under
                ``"range"``, or a negative range key.
        """
        if self.strategy == "hash":
            return self._hash_shard(str(shard_key))
        elif self.strategy == "range":
            return self._range_shard(int(shard_key))
        raise ValueError(f"Unknown strategy: {self.strategy}")

    def _hash_shard(self, key: str) -> str:
        # MD5 is used here for its even spread, not for security. The flag
        # keeps it available on FIPS-restricted OpenSSL builds (Python 3.9+).
        digest = hashlib.md5(key.encode(), usedforsecurity=False).hexdigest()
        idx = int(digest[:8], 16) % len(self.shards)
        return self.shards[idx].name

    def _range_shard(self, key: int) -> str:
        if key < 0:
            # A negative index would silently wrap to a shard at the end of
            # the list instead of signalling an out-of-range key.
            raise ValueError(f"Range shard key must be non-negative, got {key}")
        # Evenly divide the 32-bit key space; overflow keys clamp to the last shard.
        range_size = 2**32 // len(self.shards)
        idx = min(key // range_size, len(self.shards) - 1)
        return self.shards[idx].name

    def session(self, shard_key: Any) -> Session:
        """Get a new database session on the shard that owns ``shard_key``."""
        shard_name = self.get_shard(shard_key)
        return self.session_factories[shard_name]()

    def all_sessions(self) -> list[tuple[str, Session]]:
        """Get one new ``(shard_name, session)`` pair per shard for fan-out queries.

        The caller is responsible for closing every returned session.
        """
        return [
            (name, factory()) for name, factory in self.session_factories.items()
        ]

Usage in application code

# Setup: four PostgreSQL shards, routed by hashing the shard key.
shards = [
    ShardConfig(name="shard-0", url="postgresql://host0/app"),
    ShardConfig(name="shard-1", url="postgresql://host1/app"),
    ShardConfig(name="shard-2", url="postgresql://host2/app"),
    ShardConfig(name="shard-3", url="postgresql://host3/app"),
]
router = ShardRouter(shards=shards, strategy="hash")


# Single-shard operation
def get_user(user_id: str) -> Optional[dict]:
    """Fetch one ``users`` row from the shard that owns ``user_id``.

    Returns:
        The row as a column-name -> value dict, or None if no such user
        exists on that shard.
    """
    # The context manager closes the session even when the query raises.
    with router.session(shard_key=user_id) as session:
        # text() is required: SQLAlchemy 2.0 rejects plain SQL strings.
        row = session.execute(
            text("SELECT * FROM users WHERE id = :id"), {"id": user_id}
        ).fetchone()
        return dict(row._mapping) if row else None


# Cross-shard aggregation
def count_all_users() -> int:
    """Return the number of ``users`` rows summed across every shard."""
    count_users = text("SELECT COUNT(*) FROM users")
    sessions = router.all_sessions()
    total = 0
    try:
        for _shard_name, session in sessions:
            total += session.execute(count_users).scalar() or 0
    finally:
        # all_sessions() opens every session up front. Close all of them,
        # including the ones not yet queried when a shard fails mid-loop.
        for _shard_name, session in sessions:
            session.close()
    return total

Tenant-based sharding for SaaS

Multi-tenant applications often shard by tenant ID, keeping all of a tenant’s data on one shard:

from functools import lru_cache


class TenantShardRouter:
    """Directory-based sharding using a tenant mapping table.

    Each tenant's shard name is stored in a ``tenant_directory`` table
    (``tenant_id``, ``shard_name``) on a separate directory database.
    Unknown tenants are assigned to the configured shard that currently
    has the fewest tenants.

    Lookups are memoized per instance. ``get_shard_for_tenant`` is created
    in ``__init__`` as an ``lru_cache``-wrapped callable. Call
    ``router.get_shard_for_tenant.cache_clear()`` after moving tenants
    between shards, or cached entries will keep pointing at the old shard.
    """

    def __init__(
        self,
        directory_url: str,
        shard_configs: dict[str, str],
        cache_size: int = 10_000,
    ):
        """Create engines for the directory database and every shard.

        Args:
            directory_url: Database URL of the tenant directory.
            shard_configs: Mapping of shard name to database URL. Dict order
                breaks ties when choosing a shard for a new tenant.
            cache_size: Maximum number of tenant lookups to memoize.
        """
        self.directory_engine = create_engine(directory_url)
        self.shard_engines: dict[str, Engine] = {
            name: create_engine(url) for name, url in shard_configs.items()
        }
        # Build the cache per instance. Decorating the method itself with
        # lru_cache would share one cache across all routers and, because
        # ``self`` is part of the cache key, keep every router (and its
        # engines) alive for the life of the process.
        self.get_shard_for_tenant = lru_cache(maxsize=cache_size)(
            self._lookup_shard_for_tenant
        )

    def _lookup_shard_for_tenant(self, tenant_id: str) -> str:
        """Look up the tenant's shard in the directory, assigning one if absent."""
        with self.directory_engine.connect() as conn:
            result = conn.execute(
                text("SELECT shard_name FROM tenant_directory WHERE tenant_id = :tid"),
                {"tid": tenant_id},
            ).fetchone()
        # The lookup connection is released before the assignment opens another.
        if result is None:
            return self._assign_new_tenant(tenant_id)
        return result[0]

    def _least_loaded_shard(self, counts: dict[str, int]) -> str:
        """Return the configured shard with the fewest tenants.

        Shards missing from ``counts`` have zero tenants. Ties go to the
        earliest shard in configuration order.

        Raises:
            ValueError: If no shards are configured.
        """
        if not self.shard_engines:
            raise ValueError("TenantShardRouter has no shards configured")
        return min(self.shard_engines, key=lambda name: counts.get(name, 0))

    def _assign_new_tenant(self, tenant_id: str) -> str:
        """Assign a new tenant to the least-loaded configured shard and record it."""
        with self.directory_engine.connect() as conn:
            # GROUP BY only returns shards that already own tenants. Empty
            # shards are accounted for by _least_loaded_shard's zero default.
            rows = conn.execute(
                text(
                    "SELECT shard_name, COUNT(*) AS cnt "
                    "FROM tenant_directory GROUP BY shard_name"
                )
            ).fetchall()
            shard = self._least_loaded_shard({name: cnt for name, cnt in rows})

            # NOTE(review): two processes assigning the same new tenant
            # concurrently can both reach this INSERT; presumably tenant_id
            # is unique in tenant_directory -- confirm and handle the conflict.
            conn.execute(
                text(
                    "INSERT INTO tenant_directory (tenant_id, shard_name) "
                    "VALUES (:tid, :sn)"
                ),
                {"tid": tenant_id, "sn": shard},
            )
            conn.commit()
        return shard

Shard-aware Django middleware

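For Django projects, a database router can transparently direct queries to the correct shard. A small request middleware calls `set_shard()` with the request's shard key before the view runs. Afterwards it resets the key with `set_shard(None)`, so a reused worker thread never carries one request's key into the next: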

import threading

# Per-thread holder for the current shard key. ShardDatabaseRouter reads it
# with getattr(_shard_context, "key", None).
# NOTE(review): threading.local isolates threads, not asyncio tasks; under
# async views, concurrent requests may share a thread -- confirm deployment.
_shard_context = threading.local()


def set_shard(shard_key: Optional[str]) -> None:
    """Set, or clear with ``None``, the shard key for the current thread.

    The key persists on the thread until it is overwritten. If worker
    threads are reused across requests, call ``set_shard(None)`` when a
    request finishes. Otherwise the next request on that thread inherits
    the stale key and is routed to the previous request's shard.
    """
    _shard_context.key = shard_key


class ShardDatabaseRouter:
    """Django database router that uses thread-local shard context.

    Models opt in to sharding with a truthy ``SHARDED`` class attribute.
    Their reads and writes go to the shard chosen by the key stored via
    ``set_shard()``. Every other model uses the ``"default"`` alias.
    """

    # Keys must be the contiguous integers 0..N-1: the hash is reduced
    # modulo len(shard_map) and the result is used directly as a key.
    shard_map = {
        0: "shard_0",
        1: "shard_1",
        2: "shard_2",
        3: "shard_3",
    }

    def _get_shard_db(self) -> Optional[str]:
        """Return the shard alias for the current thread's key, or None if unset."""
        key = getattr(_shard_context, "key", None)
        if key is None:
            return None
        # Hash str(key), as ShardRouter.get_shard does, so int keys work and
        # 42 and "42" land on the same shard. MD5 is not used for security.
        digest = hashlib.md5(str(key).encode(), usedforsecurity=False).hexdigest()
        return self.shard_map[int(digest[:8], 16) % len(self.shard_map)]

    def _route(self, model) -> Optional[str]:
        """Pick the database alias for ``model``; shared by reads and writes."""
        if getattr(model, "SHARDED", False):
            # None means "no opinion": a sharded model queried with no shard
            # key set falls through to later routers or the "default" DB.
            return self._get_shard_db()
        return "default"

    def db_for_read(self, model, **hints):
        return self._route(model)

    def db_for_write(self, model, **hints):
        return self._route(model)

Rebalancing shards

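When shards become uneven, data must migrate between them. The per-key process below copies, verifies, and only then deletes. It does not update routing or block concurrent writes. Pause writes to the key (or dual-write) and switch routing before the source copy is removed: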

from enum import Enum


class MigrationState(Enum):
    """States a single-key shard migration can report.

    ``rebalance_key`` only ever returns ``DONE``, or ``VERIFYING`` when the
    target copy did not match the source and the migration should be retried.
    """

    PENDING = "pending"
    COPYING = "copying"
    VERIFYING = "verifying"
    SWITCHING = "switching"
    DONE = "done"


def rebalance_key(
    router: ShardRouter,
    key: str,
    source_shard: str,
    target_shard: str,
    delete_source: bool = True,
) -> MigrationState:
    """Migrate a single ``users`` row from one shard to another.

    The process is:

    1. Upsert the row on the target, writing only ``id``, ``name`` and
       ``email``.
    2. Read it back and compare the full row against the source.
    3. Only then, optionally, delete it from the source.

    Routing is NOT updated here. With hash or range routing the router keeps
    sending ``key`` to ``source_shard``, so deleting the source row makes it
    unreachable until routing changes. Pass ``delete_source=False`` to keep
    the source copy until then. Writes made between the copy and the delete
    are not captured.

    Args:
        router: Router whose ``session_factories`` provide both shards.
        key: ``users.id`` of the row to move.
        source_shard: Name of the shard currently holding the row.
        target_shard: Name of the shard to move the row to.
        delete_source: Delete the source row after a successful verify.

    Returns:
        ``MigrationState.DONE`` in any of these cases:

        * the row was moved;
        * the row was absent from the source;
        * both shard names are equal.

        ``MigrationState.VERIFYING`` if the target copy did not match. The
        migration should then be retried; the source row is left untouched.

    Raises:
        KeyError: If a shard name is not in ``router.session_factories``.
    """
    if source_shard == target_shard:
        # Nothing to move. Continuing would upsert the row onto itself and
        # then delete it, destroying the only copy.
        return MigrationState.DONE

    select_user = text("SELECT * FROM users WHERE id = :id")
    # Resolve both factories before opening any session, so an unknown shard
    # name raises KeyError without leaving a session unclosed.
    source_factory = router.session_factories[source_shard]
    target_factory = router.session_factories[target_shard]
    source = source_factory()
    target = target_factory()

    try:
        # Phase 1: Copy data to target (ON CONFLICT upsert syntax).
        row = source.execute(select_user, {"id": key}).fetchone()
        if row is None:
            return MigrationState.DONE

        data = dict(row._mapping)
        target.execute(
            text(
                "INSERT INTO users (id, name, email) VALUES (:id, :name, :email) "
                "ON CONFLICT (id) DO UPDATE SET name=:name, email=:email"
            ),
            data,
        )
        target.commit()

        # Phase 2: Verify consistency. The full row is compared, so columns
        # beyond id/name/email that were not copied can cause a mismatch
        # instead of being silently dropped when the source row is deleted.
        target_row = target.execute(select_user, {"id": key}).fetchone()
        if target_row is None or dict(target_row._mapping) != data:
            return MigrationState.VERIFYING  # Retry needed; source untouched

        # Phase 3: Update routing/directory -- not performed here (see docstring).
        # Phase 4: Delete from source (optional, can keep as backup).
        if delete_source:
            source.execute(text("DELETE FROM users WHERE id = :id"), {"id": key})
            source.commit()

        return MigrationState.DONE
    finally:
        source.close()
        target.close()

Monitoring shard health

Track per-shard metrics to detect imbalances early:

from prometheus_client import Gauge, Histogram

# Row counts per (shard, table): the basis for the row-count skew alert.
shard_rows = Gauge(
    name="shard_row_count",
    documentation="Number of rows per shard",
    labelnames=["shard", "table"],
)

# NOTE(review): Prometheus convention is a monotonically increasing Counter
# with rate() applied at query time. As a Gauge, this must be set by the
# caller to an already-computed per-second rate.
shard_qps = Gauge(
    name="shard_queries_per_second",
    documentation="Query rate per shard",
    labelnames=["shard"],
)

# Latency distribution per (shard, operation), used for P99 comparisons.
shard_latency = Histogram(
    name="shard_query_seconds",
    documentation="Query latency per shard",
    labelnames=["shard", "operation"],
)

Alert thresholds:

  • Row count skew > 30% between the largest and smallest shard.
  • QPS skew > 50% indicates a hot-spot shard key.
  • P99 latency divergence between shards suggests resource contention on one shard.

When not to shard

Before sharding, exhaust simpler options:

  1. Index optimization — proper indexes often eliminate the need for sharding.
  2. Read replicas — offload read traffic without splitting data.
  3. Connection pooling — tools like PgBouncer multiplex thousands of client connections onto a small number of database connections.
  4. Caching — Redis in front of the database can absorb much of the read load.
  5. Vertical scaling — a larger database server is simpler than distributed data.

Sharding should be a last resort, not a first choice. The operational cost is significant: schema migrations across shards, cross-shard query complexity, backup coordination, and monitoring per shard.

The one thing to remember: sharding multiplies your database capacity horizontally, but demands careful shard key selection, routing logic, and rebalancing strategies — only add this complexity when simpler scaling options are genuinely exhausted.

Tags: python, databases, distributed-systems

See Also

  • Python Consistent Hashing — Understand consistent hashing with a pizza delivery analogy that shows how Python distributes data across servers gracefully.
  • CI/CD — Why big apps can ship updates every day without turning your phone into a glitchy mess: CI/CD is the behind-the-scenes quality gate and delivery truck.
  • Containerization — Why does software that works on your computer break on everyone else's? Containers fix that, and they're why Netflix can deploy 100 updates a day without the site going down.
  • Python 3.10 New Features — Python 3.10 gave programmers a shape-sorting machine, friendlier error messages, and cleaner ways to say 'this or that' in type hints.
  • Python 3.11 New Features — Python 3.11 made everything faster, error messages smarter, and let you catch several mistakes at once instead of stopping at the first one.