Python Data Isolation Strategies — Deep Dive
PostgreSQL Row-Level Security (RLS)
RLS makes the database enforce tenant boundaries, independent of application code:
-- Enable RLS on the table: from here on, row access is governed by the
-- policies defined below.
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
-- Force RLS even for table owners, who are otherwise exempt from policies
ALTER TABLE projects FORCE ROW LEVEL SECURITY;
-- Policy: users can only see rows matching their tenant.
-- The tenant is read from the custom session setting app.tenant_id, which the
-- application must set on the connection before querying.
-- No FOR clause is given, so this policy covers every command type; per the
-- PostgreSQL CREATE POLICY docs, a policy with only USING applies the same
-- expression to new rows as well.
CREATE POLICY tenant_isolation ON projects
USING (tenant_id = current_setting('app.tenant_id')::text);
-- Policy for INSERT: tenant_id of the new row must match app.tenant_id
CREATE POLICY tenant_insert ON projects
FOR INSERT
WITH CHECK (tenant_id = current_setting('app.tenant_id')::text);
Setting the Tenant Context per Connection
import asyncpg
from contextvars import ContextVar
# Context-local tenant identifier. It has no default, so .get() raises
# LookupError until a caller has set it; TenantAwarePool.acquire() reads it.
current_tenant: ContextVar[str] = ContextVar("current_tenant")
class TenantAwarePool:
    """Wrap an asyncpg pool so every connection carries the tenant context.

    ``acquire`` stores the current tenant in the ``app.tenant_id`` session
    setting that the RLS policies read; ``release`` clears it again before
    the connection goes back to the pool.
    """

    def __init__(self, pool: asyncpg.Pool):
        self._pool = pool

    async def acquire(self):
        """Return a pooled connection scoped to the current tenant.

        Raises:
            LookupError: if ``current_tenant`` has not been set.
        """
        # Resolve the tenant *before* taking a connection so that a missing
        # context cannot strand a connection outside the pool.
        tenant_id = current_tenant.get()
        conn = await self._pool.acquire()
        try:
            # PostgreSQL's SET statement does not accept bind parameters, so
            # "SET app.tenant_id = $1" is rejected by the server. set_config()
            # is an ordinary function call and can be parameterized safely.
            # The third argument (false) makes the value session-wide.
            await conn.execute(
                "SELECT set_config('app.tenant_id', $1, false)", tenant_id
            )
        except BaseException:
            # Includes cancellation: never hand out or leak a connection
            # whose tenant context could not be established.
            await self._pool.release(conn)
            raise
        return conn

    async def release(self, conn):
        """Clear the tenant setting and return ``conn`` to the pool."""
        try:
            # Reset to prevent leaking context to the next user
            await conn.execute("RESET app.tenant_id")
        finally:
            # Always give the connection back, even if the RESET failed
            # (for example on a broken connection).
            await self._pool.release(conn)
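Now even if application code runs SELECT * FROM projects without a WHERE clause, PostgreSQL returns only the current tenant’s rows. A buggy query can’t leak data — provided the application connects as a role that is neither a superuser nor has the BYPASSRLS attribute, because those roles skip row-level security policies entirely.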
Testing RLS Policies
import pytest
@pytest.mark.asyncio
async def test_rls_prevents_cross_tenant_read(db_pool):
    """A row written under tenant-a must be invisible to a tenant-b session."""
    insert_sql = "INSERT INTO projects (id, tenant_id, name) VALUES ($1, $2, $3)"
    lookup_sql = "SELECT * FROM projects WHERE name = 'Secret Project'"

    # Writer session, scoped to Tenant A.
    async with db_pool.acquire() as writer:
        await writer.execute("SET app.tenant_id = 'tenant-a'")
        await writer.execute(insert_sql, "proj-1", "tenant-a", "Secret Project")

    # Reader session, scoped to Tenant B: the row above must be filtered out.
    async with db_pool.acquire() as reader:
        await reader.execute("SET app.tenant_id = 'tenant-b'")
        visible = await reader.fetch(lookup_sql)
        assert len(visible) == 0, "RLS failed: Tenant B can see Tenant A's data"
@pytest.mark.asyncio
async def test_rls_prevents_cross_tenant_update(db_pool):
    """Verify RLS blocks updates to other tenants' data.

    The test creates its own tenant-a row instead of relying on a row left
    behind by another test; otherwise "UPDATE 0" would also be reported when
    the row simply does not exist, and the test would pass vacuously.
    Everything runs inside one transaction that is rolled back, so no data
    is left in the table.
    """
    project_id = "proj-update-1"
    async with db_pool.acquire() as conn:
        tx = conn.transaction()
        await tx.start()
        try:
            # Arrange: create the victim row as Tenant A.
            await conn.execute("SET LOCAL app.tenant_id = 'tenant-a'")
            await conn.execute(
                "INSERT INTO projects (id, tenant_id, name) VALUES ($1, $2, $3)",
                project_id, "tenant-a", "Secret Project",
            )

            # Act: attempt the update as Tenant B.
            await conn.execute("SET LOCAL app.tenant_id = 'tenant-b'")
            result = await conn.execute(
                "UPDATE projects SET name = 'Hacked' WHERE id = $1",
                project_id,
            )
            # Should affect 0 rows because RLS filters tenant-a's rows
            assert result == "UPDATE 0"

            # Assert: seen as Tenant A again, the row is untouched.
            await conn.execute("SET LOCAL app.tenant_id = 'tenant-a'")
            name = await conn.fetchval(
                "SELECT name FROM projects WHERE id = $1", project_id
            )
            assert name == "Secret Project", "RLS failed: Tenant B modified Tenant A's row"
        finally:
            await tx.rollback()
SQLAlchemy Multi-Layer Isolation
Combine application-level filtering with database RLS for defense in depth:
from sqlalchemy import event, Column, String, text
from sqlalchemy.orm import Session, DeclarativeBase
from contextvars import ContextVar
# Context-local tenant identifier with no default: .get() raises LookupError
# until it has been set. Read by apply_tenant_filter and set_rls_context.
current_tenant: ContextVar[str] = ContextVar("current_tenant")
class Base(DeclarativeBase):
    """Declarative base class for the ORM models defined in this module."""
class TenantModel(Base):
    """Abstract parent for tenant-scoped models.

    Contributes a mandatory, indexed ``tenant_id`` column to every subclass.
    """

    __abstract__ = True

    tenant_id = Column(String(length=64), index=True, nullable=False)
# Layer 1: Application-level automatic filtering
@event.listens_for(Session, "do_orm_execute")
def apply_tenant_filter(execute_state):
    """Restrict every ORM SELECT to the current tenant.

    For each mapped class in the statement that exposes a ``tenant_id``
    attribute, a ``tenant_id = <current tenant>`` criterion is attached.
    Non-SELECT statements are left untouched.

    Raises:
        RuntimeError: if a SELECT is executed while no tenant is set.
    """
    # Local import so this block does not depend on the file's import section.
    from sqlalchemy.orm import with_loader_criteria

    if not execute_state.is_select:
        return

    try:
        tenant_id = current_tenant.get()
    except LookupError:
        # "from None": the LookupError carries no extra information, so keep
        # it out of the traceback.
        raise RuntimeError(
            "Query executed without tenant context! "
            "This is a potential data isolation violation."
        ) from None

    # Attach the criterion per entity via with_loader_criteria() rather than
    # stmt.filter(Model.tenant_id == ...). A plain filter always targets the
    # base table, so a query against aliased(Model) would be left unfiltered
    # and gain an accidental cartesian product. include_aliases=True makes
    # the criterion follow aliased entities as well.
    stmt = execute_state.statement
    seen = set()
    for mapper in execute_state.all_mappers:
        model = mapper.class_
        if model in seen or not hasattr(model, "tenant_id"):
            continue
        seen.add(model)
        stmt = stmt.options(
            with_loader_criteria(
                model,
                model.tenant_id == tenant_id,
                include_aliases=True,
            )
        )
    execute_state.statement = stmt
# Layer 2: Set database-level context for RLS
@event.listens_for(Session, "after_begin")
def set_rls_context(session, transaction, connection):
    """Publish the current tenant to PostgreSQL at the start of a transaction.

    Sets the ``app.tenant_id`` setting for the duration of the transaction so
    that row-level security policies can read it. When no tenant is set the
    function does nothing (deliberate best-effort, as in the original).
    """
    try:
        tenant_id = current_tenant.get()
    except LookupError:
        # No tenant context: leave the setting untouched. Only the lookup is
        # guarded, so a LookupError raised while executing SQL is not hidden.
        return

    # SET / SET LOCAL cannot take bind parameters on drivers that use
    # server-side binding. set_config(name, value, is_local=true) is the
    # parameterizable equivalent of SET LOCAL.
    connection.execute(
        text("SELECT set_config('app.tenant_id', :tid, true)"),
        {"tid": tenant_id},
    )
This gives two independent isolation layers:
- SQLAlchemy adds `WHERE tenant_id = ?` to every ORM SELECT (application layer)
- PostgreSQL RLS independently filters rows (database layer)
Even if one layer fails, the other catches it.
Encrypted Tenant Data
For maximum protection, encrypt sensitive columns with tenant-specific keys:
from cryptography.fernet import Fernet
import base64
import hashlib
class TenantEncryption:
    """Encrypt and decrypt strings with a key derived per tenant.

    Keys are derived from a single master key with PBKDF2-HMAC-SHA256, using
    the tenant id as the salt, and are used as Fernet keys.
    """

    def __init__(self, master_key: bytes):
        self._master_key = master_key
        # tenant_id -> derived Fernet key. Derivation costs 100,000 PBKDF2
        # iterations, which is far too expensive to repeat for every column
        # value; the master key already lives in memory, so caching the
        # derived keys does not widen exposure.
        self._key_cache: dict[str, bytes] = {}

    def _derive_key(self, tenant_id: str) -> bytes:
        """Derive a unique encryption key per tenant.

        Returns the url-safe base64 encoding of the 32-byte derived key,
        which is the format Fernet expects. Results are cached per instance.
        """
        key = self._key_cache.get(tenant_id)
        if key is None:
            derived = hashlib.pbkdf2_hmac(
                "sha256",
                self._master_key,
                tenant_id.encode(),
                iterations=100_000,
            )
            key = self._key_cache[tenant_id] = base64.urlsafe_b64encode(derived)
        return key

    def encrypt(self, tenant_id: str, plaintext: str) -> str:
        """Encrypt ``plaintext`` for ``tenant_id`` and return the token as text."""
        f = Fernet(self._derive_key(tenant_id))
        return f.encrypt(plaintext.encode()).decode()

    def decrypt(self, tenant_id: str, ciphertext: str) -> str:
        """Decrypt a token produced by ``encrypt`` for the same ``tenant_id``."""
        f = Fernet(self._derive_key(tenant_id))
        return f.decrypt(ciphertext.encode()).decode()
# Usage with SQLAlchemy custom type
from sqlalchemy import TypeDecorator, String
class EncryptedString(TypeDecorator):
    """String column type that stores values encrypted for the current tenant.

    Values are encrypted when bound as parameters and decrypted when read
    from a result row; ``None`` passes through unchanged in both directions.
    The tenant is taken from ``current_tenant`` at the time of the call.
    """

    impl = String
    cache_ok = True

    def __init__(self, encryption: TenantEncryption, *args, **kwargs):
        self._encryption = encryption
        super().__init__(*args, **kwargs)

    def process_bind_param(self, value, dialect):
        """Encrypt an outgoing value with the current tenant's key."""
        if value is not None:
            return self._encryption.encrypt(current_tenant.get(), value)
        return None

    def process_result_value(self, value, dialect):
        """Decrypt an incoming value with the current tenant's key."""
        if value is not None:
            return self._encryption.decrypt(current_tenant.get(), value)
        return None
With tenant-specific keys, even if a database dump is stolen, decrypting Tenant A’s data requires their specific derived key.
File Storage Isolation
import boto3
from pathlib import PurePosixPath
class TenantStorage:
    """Store each tenant's files under its own ``tenants/<tenant_id>/`` prefix.

    NOTE(review): the methods are ``async`` but call the S3 client
    synchronously, as the original did; with a blocking boto3 client these
    calls will block the event loop.
    """

    def __init__(self, s3_client, bucket: str):
        self.s3 = s3_client
        self.bucket = bucket

    def _tenant_prefix(self, tenant_id: str) -> str:
        """Return the key prefix that contains all of a tenant's objects.

        Raises:
            ValueError: if ``tenant_id`` is empty, contains ``/`` or is a dot
                segment, since such an id would yield a prefix outside (or
                inside another tenant's) namespace.
        """
        if not tenant_id or "/" in tenant_id or tenant_id in (".", ".."):
            raise ValueError(f"Invalid tenant_id for storage prefix: {tenant_id!r}")
        return f"tenants/{tenant_id}/"

    def _object_key(self, tenant_id: str, filename: str) -> str:
        """Build the object key for ``filename``, rejecting ``..`` segments.

        Raises:
            PermissionError: if ``filename`` contains a ``..`` path component.
        """
        # PurePosixPath has no resolve() (only concrete Path objects do), so
        # the components are inspected directly instead.
        if ".." in PurePosixPath(filename).parts:
            raise PermissionError("Path traversal attempt detected")
        return f"{self._tenant_prefix(tenant_id)}{filename}"

    async def upload(self, tenant_id: str, filename: str, data: bytes):
        """Store ``data`` under the tenant's prefix and return the object key."""
        key = self._object_key(tenant_id, filename)
        self.s3.put_object(Bucket=self.bucket, Key=key, Body=data)
        return key

    async def list_files(self, tenant_id: str) -> list[str]:
        """Return the tenant's file names, relative to the tenant prefix."""
        prefix = self._tenant_prefix(tenant_id)
        request = {"Bucket": self.bucket, "Prefix": prefix}
        names: list[str] = []
        # list_objects_v2 returns results one page at a time; keep following
        # the continuation token until the listing is no longer truncated.
        while True:
            response = self.s3.list_objects_v2(**request)
            names.extend(
                obj["Key"].removeprefix(prefix)
                for obj in response.get("Contents", [])
            )
            if not response.get("IsTruncated"):
                return names
            request["ContinuationToken"] = response["NextContinuationToken"]

    async def download(self, tenant_id: str, filename: str) -> bytes:
        """Return the contents of one of the tenant's files.

        Raises:
            PermissionError: if ``filename`` contains a ``..`` path component.
        """
        # Security: verify the key is within the tenant prefix
        key = self._object_key(tenant_id, filename)
        response = self.s3.get_object(Bucket=self.bucket, Key=key)
        return response["Body"].read()
For stronger isolation, use separate S3 buckets per tenant with IAM policies that limit each tenant’s credentials to their bucket.
Background Job Isolation
Background jobs often run outside the request context. They need explicit tenant context:
from celery import Celery, signals
# Celery application; TenantTask derives from its Task class and
# generate_report is registered on it.
app = Celery("myapp")
class TenantTask(app.Task):
    """Base task that requires and sets tenant context.

    When a task is queued, the caller's tenant id is copied into the message
    headers; when the task runs, that id is installed as ``current_tenant``
    for the duration of the task body.
    """

    def apply_async(self, args=None, kwargs=None, **options):
        """Queue the task with the current tenant id in its headers.

        Raises:
            RuntimeError: if no tenant is set in the calling context.
        """
        try:
            tenant_id = current_tenant.get()
        except LookupError:
            raise RuntimeError("Cannot queue task without tenant context") from None

        # Inject current tenant into task headers. Work on a copy so that a
        # headers dict supplied (and possibly reused) by the caller is not
        # mutated, and tolerate headers=None.
        headers = dict(options.get("headers") or {})
        headers["tenant_id"] = tenant_id
        options["headers"] = headers
        return super().apply_async(args, kwargs, **options)

    def __call__(self, *args, **kwargs):
        """Run the task body with ``current_tenant`` set from the headers.

        Raises:
            RuntimeError: if the task message carries no tenant id.
        """
        # Set tenant context from headers before task executes
        tenant_id = self.request.get("tenant_id")
        if not tenant_id:
            # NOTE(review): depending on the Celery version, custom headers
            # may be exposed under request.headers rather than at the top
            # level of the request - confirm against the deployed version.
            tenant_id = (self.request.get("headers") or {}).get("tenant_id")
        if not tenant_id:
            raise RuntimeError("Task missing tenant_id header")

        token = current_tenant.set(tenant_id)
        try:
            return self.run(*args, **kwargs)
        finally:
            # Restore the previous value so the worker does not carry this
            # tenant into whatever it executes next.
            current_tenant.reset(token)
@app.task(base=TenantTask)
def generate_report(report_type: str):
    """Build a report of ``report_type`` from the current tenant's data."""
    # TenantTask has already installed the tenant context from the headers.
    tenant_rows = fetch_tenant_data(current_tenant.get())
    return build_report(tenant_rows, report_type)
Isolation Audit Framework
Periodically verify that isolation holds. This catches regressions:
class IsolationAuditor:
    """Run catalog and data checks for tenant-isolation regressions."""

    def __init__(self, db_pool):
        self.db = db_pool

    @staticmethod
    def _quote_ident(name: str) -> str:
        """Return ``name`` as a double-quoted SQL identifier."""
        return '"' + name.replace('"', '""') + '"'

    async def audit_table(self, table_name: str) -> list[str]:
        """Check one table for common isolation violations.

        ``table_name`` is the exact catalog name of the table (as stored in
        ``pg_class`` / ``information_schema``), not a schema-qualified name.

        Returns:
            Human-readable violation messages; empty when all checks pass.
        """
        violations = []

        # Check 1: Missing tenant_id
        # The identifier is quoted because it cannot be sent as a bind
        # parameter: quoting keeps mixed-case and reserved-word names working
        # and stops a crafted name from injecting SQL.
        quoted = self._quote_ident(table_name)
        result = await self.db.fetchval(f"""
            SELECT COUNT(*) FROM {quoted}
            WHERE tenant_id IS NULL OR tenant_id = ''
        """)
        if result > 0:
            violations.append(
                f"{table_name}: {result} rows with missing tenant_id"
            )

        # Check 2: RLS policy exists
        rls_enabled = await self.db.fetchval("""
            SELECT relrowsecurity FROM pg_class
            WHERE relname = $1
        """, table_name)
        if not rls_enabled:
            violations.append(
                f"{table_name}: Row-Level Security is not enabled"
            )

        # Check 3: tenant_id is indexed
        has_index = await self.db.fetchval("""
            SELECT EXISTS(
                SELECT 1 FROM pg_indexes
                WHERE tablename = $1
                AND indexdef LIKE '%tenant_id%'
            )
        """, table_name)
        if not has_index:
            violations.append(
                f"{table_name}: No index on tenant_id (performance risk)"
            )

        return violations

    async def full_audit(self) -> dict[str, list[str]]:
        """Audit all tenant-scoped tables.

        A table counts as tenant-scoped when it lives in the ``public``
        schema and has a ``tenant_id`` column.

        Returns:
            Mapping of table name to its violations; tables without
            violations are omitted.
        """
        tables = await self.db.fetch("""
            SELECT table_name FROM information_schema.columns
            WHERE column_name = 'tenant_id'
            AND table_schema = 'public'
        """)
        results = {}
        for row in tables:
            table = row["table_name"]
            violations = await self.audit_table(table)
            if violations:
                results[table] = violations
        return results
Run this as a CI check and a weekly production audit.
Data Deletion and Right to Erasure
GDPR’s “right to erasure” requires complete tenant data removal:
class TenantDataManager:
    """Remove a tenant's data from every storage layer."""

    @staticmethod
    def _delete_count_sql(table: str) -> str:
        """Return SQL that deletes a tenant's rows from ``table`` and counts them.

        Aggregates are not allowed in a RETURNING clause, so
        ``DELETE ... RETURNING COUNT(*)`` is rejected by PostgreSQL. The
        delete is wrapped in a data-modifying CTE and counted from there.
        """
        return (
            f"WITH deleted AS (DELETE FROM {table} WHERE tenant_id = $1 RETURNING 1) "
            "SELECT COUNT(*) FROM deleted"
        )

    async def delete_tenant_data(self, tenant_id: str) -> dict:
        """Delete all data for a tenant across all storage layers.

        Returns:
            A report keyed by storage layer: deleted row counts per database
            table, the result of the file deletion, and status strings for
            the cache, search index and audit log.
        """
        report = {}

        # Database tables
        # NOTE(review): table names are interpolated as given, so
        # _get_tenant_tables() is assumed to return trusted identifiers -
        # confirm. If audit_logs is among them, its rows are deleted here
        # before the anonymization step below can touch them.
        tables = await self._get_tenant_tables()
        for table in tables:
            count = await self.db.fetchval(
                self._delete_count_sql(table),
                tenant_id,
            )
            report[f"db:{table}"] = count

        # File storage
        files_deleted = await self.storage.delete_tenant_prefix(tenant_id)
        report["files"] = files_deleted

        # Cache
        await self.cache.flush_tenant(tenant_id)
        report["cache"] = "flushed"

        # Search index
        await self.search.delete_tenant_documents(tenant_id)
        report["search"] = "cleared"

        # Audit log (keep for compliance, but anonymize)
        await self.db.execute(
            "UPDATE audit_logs SET user_data = '{}' WHERE tenant_id = $1",
            tenant_id,
        )
        report["audit_logs"] = "anonymized"

        return report
One thing to remember: Production data isolation requires multiple layers — application filtering, database-enforced RLS, encrypted storage, scoped caches, and regular audits. Any single mechanism can fail; layering them makes accidental cross-tenant data exposure nearly impossible. Always test isolation explicitly with cross-tenant access attempts.
See Also
- Python Aggregate Pattern — Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts — Why the same word means different things in different parts of your code, and why that is perfectly fine.
- Python Bulkhead Pattern — Why smart Python apps put walls between their parts, like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern — How a circuit breaker saves your app from crashing, explained with a home electrical fuse analogy.
- Python Clean Architecture — Why your Python app should look like an onion, and how that saves you from painful rewrites.