Python Multitenancy Patterns — Deep Dive
Shared Schema: Row-Level Tenant Isolation
The most common model. Every table has a tenant_id and every query must filter by it. The danger: one forgotten filter exposes another tenant’s data.
SQLAlchemy Automatic Tenant Filtering
Use SQLAlchemy events to automatically inject tenant filters:
from sqlalchemy import event, Column, String, create_engine
from sqlalchemy.orm import Session, DeclarativeBase, Query
from contextvars import ContextVar
# Thread-safe tenant context
current_tenant: ContextVar[str] = ContextVar("current_tenant")
class Base(DeclarativeBase):
pass
class TenantMixin:
"""Add to every model that needs tenant isolation."""
tenant_id = Column(String(64), nullable=False, index=True)
class Project(TenantMixin, Base):
__tablename__ = "projects"
id = Column(String(36), primary_key=True)
name = Column(String(255))
class Task(TenantMixin, Base):
__tablename__ = "tasks"
id = Column(String(36), primary_key=True)
title = Column(String(255))
project_id = Column(String(36))
# Automatically add tenant_id filter to all queries
@event.listens_for(Query, "before_compile", retval=True)
def add_tenant_filter(query):
try:
tenant_id = current_tenant.get()
except LookupError:
raise RuntimeError("No tenant context set — potential data leak!")
for column_desc in query.column_descriptions:
entity = column_desc.get("entity")
if entity and hasattr(entity, "tenant_id"):
query = query.filter(entity.tenant_id == tenant_id)
return query
# Automatically set tenant_id on new objects
@event.listens_for(Session, "before_flush")
def set_tenant_on_insert(session, flush_context, instances):
try:
tenant_id = current_tenant.get()
except LookupError:
return
for obj in session.new:
if hasattr(obj, "tenant_id") and not obj.tenant_id:
obj.tenant_id = tenant_id
The ContextVar stores the tenant ID for the current request. The before_compile event adds the filter to every query automatically — you can’t forget it.
FastAPI Middleware for Tenant Context
from fastapi import FastAPI, Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
class TenantMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Resolve tenant from subdomain
host = request.headers.get("host", "")
subdomain = host.split(".")[0] if "." in host else None
if not subdomain or subdomain in ("www", "api", "app"):
raise HTTPException(400, "Tenant not identified")
tenant = await resolve_tenant(subdomain)
if not tenant:
raise HTTPException(404, "Unknown tenant")
# Set context for this request
token = current_tenant.set(tenant.id)
request.state.tenant = tenant
try:
response = await call_next(request)
return response
finally:
current_tenant.reset(token)
async def resolve_tenant(subdomain: str):
"""Look up tenant by subdomain, with caching."""
cache_key = f"tenant:{subdomain}"
cached = await redis.get(cache_key)
if cached:
return Tenant.from_json(cached)
tenant = await db.fetch_one(
"SELECT * FROM tenants WHERE subdomain = $1 AND active = true",
subdomain,
)
if tenant:
await redis.set(cache_key, tenant.to_json(), ex=300)
return tenant
Schema-Per-Tenant with Django
Django’s database routing makes schema-per-tenant straightforward with django-tenants:
# settings.py
DATABASES = {
"default": {
"ENGINE": "django_tenants.postgresql_backend",
"NAME": "myapp",
"HOST": "localhost",
}
}
DATABASE_ROUTERS = ("django_tenants.routers.TenantSyncRouter",)
TENANT_MODEL = "tenants.Tenant"
TENANT_DOMAIN_MODEL = "tenants.Domain"
SHARED_APPS = [
"django_tenants",
"tenants", # Tenant management (public schema)
"django.contrib.auth",
]
TENANT_APPS = [
"projects", # Each tenant gets their own tables
"tasks",
"billing",
]
# models.py
from django_tenants.models import TenantMixin, DomainMixin
class Tenant(TenantMixin):
name = models.CharField(max_length=100)
plan = models.CharField(max_length=20, default="free")
created_at = models.DateTimeField(auto_now_add=True)
auto_create_schema = True
class Domain(DomainMixin):
pass
When a request arrives at acme.yourapp.com, the middleware sets the PostgreSQL search_path to the acme schema. All queries automatically target that schema’s tables.
Migration Across Schemas
# Migrate shared apps (public schema)
python manage.py migrate_schemas --shared
# Migrate tenant apps (all tenant schemas)
python manage.py migrate_schemas --tenant
# Migrate a specific tenant
python manage.py migrate_schemas --tenant --schema=acme
For hundreds of tenants, run migrations in parallel:
from concurrent.futures import ThreadPoolExecutor
from django.db import connections
def migrate_tenant(schema_name: str):
connection = connections["default"]
connection.set_schema(schema_name)
call_command("migrate", verbosity=0)
connection.set_schema_to_public()
schemas = Tenant.objects.values_list("schema_name", flat=True)
with ThreadPoolExecutor(max_workers=4) as pool:
pool.map(migrate_tenant, schemas)
Database-Per-Tenant Routing
For maximum isolation, each tenant connects to their own database:
from contextlib import asynccontextmanager
import asyncpg
class TenantDatabaseRouter:
def __init__(self):
self._pools: dict[str, asyncpg.Pool] = {}
self._pool_lock = asyncio.Lock()
async def get_pool(self, tenant_id: str) -> asyncpg.Pool:
if tenant_id not in self._pools:
async with self._pool_lock:
if tenant_id not in self._pools:
config = await self._get_db_config(tenant_id)
self._pools[tenant_id] = await asyncpg.create_pool(
dsn=config.dsn,
min_size=2,
max_size=10,
)
return self._pools[tenant_id]
async def _get_db_config(self, tenant_id: str) -> DatabaseConfig:
"""Fetch database connection details for a tenant."""
# From a central registry (could be a config service,
# Consul, or a management database)
return await registry.get_database(tenant_id)
@asynccontextmanager
async def connection(self, tenant_id: str):
pool = await self.get_pool(tenant_id)
async with pool.acquire() as conn:
yield conn
async def shutdown(self):
for pool in self._pools.values():
await pool.close()
# Usage:
router = TenantDatabaseRouter()
async def get_projects(tenant_id: str):
async with router.connection(tenant_id) as conn:
return await conn.fetch("SELECT * FROM projects")
Connection Pool Management
With database-per-tenant, connection pools can explode. If you have 500 tenants with min_size=2, that’s 1,000 idle connections. Strategies:
class LazyPoolRouter:
"""Pools are created on demand and evicted when idle."""
def __init__(self, max_pools: int = 50, idle_timeout: float = 300):
self._pools: dict[str, tuple[asyncpg.Pool, float]] = {}
self._max_pools = max_pools
self._idle_timeout = idle_timeout
async def get_pool(self, tenant_id: str) -> asyncpg.Pool:
if tenant_id in self._pools:
pool, _ = self._pools[tenant_id]
self._pools[tenant_id] = (pool, time.monotonic())
return pool
# Evict idle pools if at capacity
if len(self._pools) >= self._max_pools:
await self._evict_idle()
pool = await self._create_pool(tenant_id)
self._pools[tenant_id] = (pool, time.monotonic())
return pool
async def _evict_idle(self):
now = time.monotonic()
to_evict = [
tid for tid, (_, last_used) in self._pools.items()
if now - last_used > self._idle_timeout
]
for tid in to_evict:
pool, _ = self._pools.pop(tid)
await pool.close()
Per-Tenant Rate Limiting
import time
from collections import defaultdict
class TenantRateLimiter:
def __init__(self):
self._buckets: dict[str, list[float]] = defaultdict(list)
self._limits: dict[str, int] = {} # tenant_id -> requests/minute
def set_limit(self, tenant_id: str, rpm: int):
self._limits[tenant_id] = rpm
def check(self, tenant_id: str) -> tuple[bool, int]:
"""Returns (allowed, remaining)."""
limit = self._limits.get(tenant_id, 100) # Default 100 rpm
now = time.monotonic()
window_start = now - 60
# Clean old entries
bucket = self._buckets[tenant_id]
self._buckets[tenant_id] = [
t for t in bucket if t > window_start
]
if len(self._buckets[tenant_id]) >= limit:
return (False, 0)
self._buckets[tenant_id].append(now)
remaining = limit - len(self._buckets[tenant_id])
return (True, remaining)
For production use, implement this with Redis sliding windows for distributed rate limiting across multiple app instances.
Cache Isolation
Shared caches need tenant-scoped keys to prevent data leakage:
class TenantCache:
def __init__(self, redis_client, prefix: str = "cache"):
self.redis = redis_client
self.prefix = prefix
def _key(self, tenant_id: str, key: str) -> str:
return f"{self.prefix}:{tenant_id}:{key}"
async def get(self, tenant_id: str, key: str):
return await self.redis.get(self._key(tenant_id, key))
async def set(self, tenant_id: str, key: str, value, ttl: int = 300):
await self.redis.set(
self._key(tenant_id, key), value, ex=ttl
)
async def flush_tenant(self, tenant_id: str):
"""Delete all cached data for a tenant (e.g., on tenant deletion)."""
pattern = f"{self.prefix}:{tenant_id}:*"
cursor = 0
while True:
cursor, keys = await self.redis.scan(
cursor, match=pattern, count=100
)
if keys:
await self.redis.delete(*keys)
if cursor == 0:
break
Testing Multitenant Code
The most critical test: verify that tenant isolation actually works.
import pytest
@pytest.fixture
async def tenant_a():
return await create_test_tenant("tenant-a")
@pytest.fixture
async def tenant_b():
return await create_test_tenant("tenant-b")
async def test_tenant_isolation(tenant_a, tenant_b, db_session):
"""Tenant A's data must be invisible to Tenant B."""
# Create data as Tenant A
current_tenant.set(tenant_a.id)
project = Project(name="Secret Project")
db_session.add(project)
await db_session.commit()
# Query as Tenant B
current_tenant.set(tenant_b.id)
projects = await db_session.execute(
select(Project).filter(Project.name == "Secret Project")
)
assert projects.scalars().all() == [], "Tenant B can see Tenant A's data!"
async def test_cross_tenant_count(tenant_a, tenant_b, db_session):
"""Aggregations must be tenant-scoped."""
current_tenant.set(tenant_a.id)
for i in range(5):
db_session.add(Task(title=f"Task {i}"))
await db_session.commit()
current_tenant.set(tenant_b.id)
count = await db_session.scalar(select(func.count(Task.id)))
assert count == 0, "Tenant B sees Tenant A's task count!"
One thing to remember: Multitenancy in Python requires tenant context propagation (via ContextVar or middleware), automatic query filtering (never trust manual WHERE clauses), scoped caching and rate limiting, and rigorous cross-tenant isolation tests. The complexity scales with your isolation model — shared schema is the most code-intensive because every layer needs tenant awareness.
See Also
- Python Aggregate Pattern Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture Why your Python app should look like an onion — and how that saves you from painful rewrites.