Python Multitenancy Patterns — Deep Dive

Shared Schema: Row-Level Tenant Isolation

The most common model. Every table has a tenant_id and every query must filter by it. The danger: one forgotten filter exposes another tenant’s data.

SQLAlchemy Automatic Tenant Filtering

Use SQLAlchemy events to automatically inject tenant filters:

from sqlalchemy import event, Column, String, create_engine
from sqlalchemy.orm import Session, DeclarativeBase, Query
from contextvars import ContextVar

# Thread-safe tenant context
current_tenant: ContextVar[str] = ContextVar("current_tenant")

class Base(DeclarativeBase):
    pass

class TenantMixin:
    """Add to every model that needs tenant isolation."""
    tenant_id = Column(String(64), nullable=False, index=True)

class Project(TenantMixin, Base):
    __tablename__ = "projects"
    id = Column(String(36), primary_key=True)
    name = Column(String(255))

class Task(TenantMixin, Base):
    __tablename__ = "tasks"
    id = Column(String(36), primary_key=True)
    title = Column(String(255))
    project_id = Column(String(36))

# Automatically add tenant_id filter to all queries
@event.listens_for(Query, "before_compile", retval=True)
def add_tenant_filter(query):
    try:
        tenant_id = current_tenant.get()
    except LookupError:
        raise RuntimeError("No tenant context set — potential data leak!")

    for column_desc in query.column_descriptions:
        entity = column_desc.get("entity")
        if entity and hasattr(entity, "tenant_id"):
            query = query.filter(entity.tenant_id == tenant_id)
    return query

# Automatically set tenant_id on new objects
@event.listens_for(Session, "before_flush")
def set_tenant_on_insert(session, flush_context, instances):
    try:
        tenant_id = current_tenant.get()
    except LookupError:
        return

    for obj in session.new:
        if hasattr(obj, "tenant_id") and not obj.tenant_id:
            obj.tenant_id = tenant_id

The ContextVar stores the tenant ID for the current request. The before_compile event adds the filter to every query automatically — you can’t forget it.

FastAPI Middleware for Tenant Context

from fastapi import FastAPI, Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware

class TenantMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Resolve tenant from subdomain
        host = request.headers.get("host", "")
        subdomain = host.split(".")[0] if "." in host else None

        if not subdomain or subdomain in ("www", "api", "app"):
            raise HTTPException(400, "Tenant not identified")

        tenant = await resolve_tenant(subdomain)
        if not tenant:
            raise HTTPException(404, "Unknown tenant")

        # Set context for this request
        token = current_tenant.set(tenant.id)
        request.state.tenant = tenant
        try:
            response = await call_next(request)
            return response
        finally:
            current_tenant.reset(token)

async def resolve_tenant(subdomain: str):
    """Look up tenant by subdomain, with caching."""
    cache_key = f"tenant:{subdomain}"
    cached = await redis.get(cache_key)
    if cached:
        return Tenant.from_json(cached)

    tenant = await db.fetch_one(
        "SELECT * FROM tenants WHERE subdomain = $1 AND active = true",
        subdomain,
    )
    if tenant:
        await redis.set(cache_key, tenant.to_json(), ex=300)
    return tenant

Schema-Per-Tenant with Django

Django’s database routing makes schema-per-tenant straightforward with django-tenants:

# settings.py
DATABASES = {
    "default": {
        "ENGINE": "django_tenants.postgresql_backend",
        "NAME": "myapp",
        "HOST": "localhost",
    }
}

DATABASE_ROUTERS = ("django_tenants.routers.TenantSyncRouter",)

TENANT_MODEL = "tenants.Tenant"
TENANT_DOMAIN_MODEL = "tenants.Domain"

SHARED_APPS = [
    "django_tenants",
    "tenants",        # Tenant management (public schema)
    "django.contrib.auth",
]

TENANT_APPS = [
    "projects",       # Each tenant gets their own tables
    "tasks",
    "billing",
]

# models.py
from django_tenants.models import TenantMixin, DomainMixin

class Tenant(TenantMixin):
    name = models.CharField(max_length=100)
    plan = models.CharField(max_length=20, default="free")
    created_at = models.DateTimeField(auto_now_add=True)

    auto_create_schema = True

class Domain(DomainMixin):
    pass

When a request arrives at acme.yourapp.com, the middleware sets the PostgreSQL search_path to the acme schema. All queries automatically target that schema’s tables.

Migration Across Schemas

# Migrate shared apps (public schema)
python manage.py migrate_schemas --shared

# Migrate tenant apps (all tenant schemas)
python manage.py migrate_schemas --tenant

# Migrate a specific tenant
python manage.py migrate_schemas --tenant --schema=acme

For hundreds of tenants, run migrations in parallel:

from concurrent.futures import ThreadPoolExecutor
from django.db import connections

def migrate_tenant(schema_name: str):
    connection = connections["default"]
    connection.set_schema(schema_name)
    call_command("migrate", verbosity=0)
    connection.set_schema_to_public()

schemas = Tenant.objects.values_list("schema_name", flat=True)
with ThreadPoolExecutor(max_workers=4) as pool:
    pool.map(migrate_tenant, schemas)

Database-Per-Tenant Routing

For maximum isolation, each tenant connects to their own database:

from contextlib import asynccontextmanager
import asyncpg

class TenantDatabaseRouter:
    def __init__(self):
        self._pools: dict[str, asyncpg.Pool] = {}
        self._pool_lock = asyncio.Lock()

    async def get_pool(self, tenant_id: str) -> asyncpg.Pool:
        if tenant_id not in self._pools:
            async with self._pool_lock:
                if tenant_id not in self._pools:
                    config = await self._get_db_config(tenant_id)
                    self._pools[tenant_id] = await asyncpg.create_pool(
                        dsn=config.dsn,
                        min_size=2,
                        max_size=10,
                    )
        return self._pools[tenant_id]

    async def _get_db_config(self, tenant_id: str) -> DatabaseConfig:
        """Fetch database connection details for a tenant."""
        # From a central registry (could be a config service, 
        # Consul, or a management database)
        return await registry.get_database(tenant_id)

    @asynccontextmanager
    async def connection(self, tenant_id: str):
        pool = await self.get_pool(tenant_id)
        async with pool.acquire() as conn:
            yield conn

    async def shutdown(self):
        for pool in self._pools.values():
            await pool.close()

# Usage:
router = TenantDatabaseRouter()

async def get_projects(tenant_id: str):
    async with router.connection(tenant_id) as conn:
        return await conn.fetch("SELECT * FROM projects")

Connection Pool Management

With database-per-tenant, connection pools can explode. If you have 500 tenants with min_size=2, that’s 1,000 idle connections. Strategies:

class LazyPoolRouter:
    """Pools are created on demand and evicted when idle."""
    
    def __init__(self, max_pools: int = 50, idle_timeout: float = 300):
        self._pools: dict[str, tuple[asyncpg.Pool, float]] = {}
        self._max_pools = max_pools
        self._idle_timeout = idle_timeout

    async def get_pool(self, tenant_id: str) -> asyncpg.Pool:
        if tenant_id in self._pools:
            pool, _ = self._pools[tenant_id]
            self._pools[tenant_id] = (pool, time.monotonic())
            return pool

        # Evict idle pools if at capacity
        if len(self._pools) >= self._max_pools:
            await self._evict_idle()

        pool = await self._create_pool(tenant_id)
        self._pools[tenant_id] = (pool, time.monotonic())
        return pool

    async def _evict_idle(self):
        now = time.monotonic()
        to_evict = [
            tid for tid, (_, last_used) in self._pools.items()
            if now - last_used > self._idle_timeout
        ]
        for tid in to_evict:
            pool, _ = self._pools.pop(tid)
            await pool.close()

Per-Tenant Rate Limiting

import time
from collections import defaultdict

class TenantRateLimiter:
    def __init__(self):
        self._buckets: dict[str, list[float]] = defaultdict(list)
        self._limits: dict[str, int] = {}  # tenant_id -> requests/minute

    def set_limit(self, tenant_id: str, rpm: int):
        self._limits[tenant_id] = rpm

    def check(self, tenant_id: str) -> tuple[bool, int]:
        """Returns (allowed, remaining)."""
        limit = self._limits.get(tenant_id, 100)  # Default 100 rpm
        now = time.monotonic()
        window_start = now - 60

        # Clean old entries
        bucket = self._buckets[tenant_id]
        self._buckets[tenant_id] = [
            t for t in bucket if t > window_start
        ]

        if len(self._buckets[tenant_id]) >= limit:
            return (False, 0)

        self._buckets[tenant_id].append(now)
        remaining = limit - len(self._buckets[tenant_id])
        return (True, remaining)

For production use, implement this with Redis sliding windows for distributed rate limiting across multiple app instances.

Cache Isolation

Shared caches need tenant-scoped keys to prevent data leakage:

class TenantCache:
    def __init__(self, redis_client, prefix: str = "cache"):
        self.redis = redis_client
        self.prefix = prefix

    def _key(self, tenant_id: str, key: str) -> str:
        return f"{self.prefix}:{tenant_id}:{key}"

    async def get(self, tenant_id: str, key: str):
        return await self.redis.get(self._key(tenant_id, key))

    async def set(self, tenant_id: str, key: str, value, ttl: int = 300):
        await self.redis.set(
            self._key(tenant_id, key), value, ex=ttl
        )

    async def flush_tenant(self, tenant_id: str):
        """Delete all cached data for a tenant (e.g., on tenant deletion)."""
        pattern = f"{self.prefix}:{tenant_id}:*"
        cursor = 0
        while True:
            cursor, keys = await self.redis.scan(
                cursor, match=pattern, count=100
            )
            if keys:
                await self.redis.delete(*keys)
            if cursor == 0:
                break

Testing Multitenant Code

The most critical test: verify that tenant isolation actually works.

import pytest

@pytest.fixture
async def tenant_a():
    return await create_test_tenant("tenant-a")

@pytest.fixture
async def tenant_b():
    return await create_test_tenant("tenant-b")

async def test_tenant_isolation(tenant_a, tenant_b, db_session):
    """Tenant A's data must be invisible to Tenant B."""
    # Create data as Tenant A
    current_tenant.set(tenant_a.id)
    project = Project(name="Secret Project")
    db_session.add(project)
    await db_session.commit()

    # Query as Tenant B
    current_tenant.set(tenant_b.id)
    projects = await db_session.execute(
        select(Project).filter(Project.name == "Secret Project")
    )
    assert projects.scalars().all() == [], "Tenant B can see Tenant A's data!"

async def test_cross_tenant_count(tenant_a, tenant_b, db_session):
    """Aggregations must be tenant-scoped."""
    current_tenant.set(tenant_a.id)
    for i in range(5):
        db_session.add(Task(title=f"Task {i}"))
    await db_session.commit()

    current_tenant.set(tenant_b.id)
    count = await db_session.scalar(select(func.count(Task.id)))
    assert count == 0, "Tenant B sees Tenant A's task count!"

One thing to remember: Multitenancy in Python requires tenant context propagation (via ContextVar or middleware), automatic query filtering (never trust manual WHERE clauses), scoped caching and rate limiting, and rigorous cross-tenant isolation tests. The complexity scales with your isolation model — shared schema is the most code-intensive because every layer needs tenant awareness.

pythonarchitecturesaas

See Also