Python Data Isolation Strategies — Deep Dive

PostgreSQL Row-Level Security (RLS)

RLS makes the database enforce tenant boundaries, independent of application code:

-- Enable RLS on the table
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;

-- Force RLS even for table owners
ALTER TABLE projects FORCE ROW LEVEL SECURITY;

-- Policy: users can only see rows matching their tenant.
-- No FOR clause is given, so the policy applies to every command type.
-- Both policies read the tenant from the custom setting app.tenant_id, which
-- the application must set on the connection before querying.
CREATE POLICY tenant_isolation ON projects
    USING (tenant_id = current_setting('app.tenant_id')::text);

-- Policy for INSERT: tenant_id must match.
-- WITH CHECK is evaluated against the row being written.
CREATE POLICY tenant_insert ON projects
    FOR INSERT
    WITH CHECK (tenant_id = current_setting('app.tenant_id')::text);

Setting the Tenant Context per Connection

import asyncpg
from contextvars import ContextVar

# Tenant id bound to the current execution context. No default is supplied,
# so current_tenant.get() raises LookupError until some caller has .set() it.
current_tenant: ContextVar[str] = ContextVar("current_tenant")

class TenantAwarePool:
    """Wrap an asyncpg pool so acquired connections carry the tenant id.

    ``acquire()`` is a plain coroutine that returns a connection (it is not an
    async context manager); every connection obtained from it must be handed
    back through ``release()``.
    """

    def __init__(self, pool: asyncpg.Pool):
        self._pool = pool

    async def acquire(self):
        """Return a pooled connection with ``app.tenant_id`` set.

        Raises:
            LookupError: if no tenant is bound to ``current_tenant``. The
                lookup happens before a connection is taken, so nothing is
                checked out of the pool in that case.
        """
        tenant_id = current_tenant.get()
        conn = await self._pool.acquire()
        try:
            # PostgreSQL's SET statement does not accept bind parameters, so
            # "SET app.tenant_id = $1" is rejected by the server.
            # set_config() is the function form and takes the value as an
            # ordinary argument; is_local=false gives session scope like SET.
            await conn.execute(
                "SELECT set_config('app.tenant_id', $1, false)", tenant_id
            )
        except BaseException:
            # Includes task cancellation: never strand a checked-out
            # connection that the caller has no handle to release.
            await self._pool.release(conn)
            raise
        return conn

    async def release(self, conn):
        """Clear the tenant setting on ``conn`` and return it to the pool."""
        try:
            # Reset to prevent leaking context to the next user
            await conn.execute("RESET app.tenant_id")
        finally:
            # Hand the connection back even when the RESET itself fails
            # (e.g. the connection broke mid-request).
            await self._pool.release(conn)

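Now even if application code runs SELECT * FROM projects without a WHERE clause, PostgreSQL returns only the current tenant’s rows, so a buggy query can’t leak data. This holds only when the application connects as a role that is neither a superuser nor has the BYPASSRLS attribute, because those roles skip row security policies entirely.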

Testing RLS Policies

import pytest

@pytest.mark.asyncio
async def test_rls_prevents_cross_tenant_read(db_pool):
    """A row written under tenant-a must be invisible to a tenant-b session."""
    insert_sql = "INSERT INTO projects (id, tenant_id, name) VALUES ($1, $2, $3)"
    lookup_sql = "SELECT * FROM projects WHERE name = 'Secret Project'"

    # Writer session: create the row while acting as Tenant A.
    async with db_pool.acquire() as writer:
        await writer.execute("SET app.tenant_id = 'tenant-a'")
        await writer.execute(insert_sql, "proj-1", "tenant-a", "Secret Project")

    # Reader session: look for the same row while acting as Tenant B.
    async with db_pool.acquire() as reader:
        await reader.execute("SET app.tenant_id = 'tenant-b'")
        visible_rows = await reader.fetch(lookup_sql)

    assert len(visible_rows) == 0, "RLS failed: Tenant B can see Tenant A's data"

@pytest.mark.asyncio
async def test_rls_prevents_cross_tenant_update(db_pool):
    """Verify RLS blocks updates to other tenants' data.

    The test seeds its own tenant-a row instead of relying on a row left
    behind by another test: against a missing row the UPDATE reports
    "UPDATE 0" regardless of RLS, so the assertion would pass vacuously.
    """
    import uuid  # local import: only this test needs a unique row id

    # Unique id so reruns and other tests cannot collide with this row.
    project_id = f"proj-{uuid.uuid4().hex[:12]}"

    async with db_pool.acquire() as conn:
        # Seed the target row as Tenant A
        await conn.execute("SET app.tenant_id = 'tenant-a'")
        await conn.execute(
            "INSERT INTO projects (id, tenant_id, name) VALUES ($1, $2, $3)",
            project_id, "tenant-a", "Secret Project",
        )

    async with db_pool.acquire() as conn:
        # Attempt the update as Tenant B
        await conn.execute("SET app.tenant_id = 'tenant-b'")
        result = await conn.execute(
            "UPDATE projects SET name = 'Hacked' WHERE id = $1", project_id
        )
        # Should affect 0 rows because RLS filters tenant-a's rows
        assert result == "UPDATE 0"

    async with db_pool.acquire() as conn:
        # Confirm from Tenant A's side that the row exists and is untouched
        await conn.execute("SET app.tenant_id = 'tenant-a'")
        name = await conn.fetchval(
            "SELECT name FROM projects WHERE id = $1", project_id
        )
        assert name == "Secret Project", "RLS failed: Tenant B modified Tenant A's row"

SQLAlchemy Multi-Layer Isolation

Combine application-level filtering with database RLS for defense in depth:

from sqlalchemy import event, Column, String, text
from sqlalchemy.orm import Session, DeclarativeBase
from contextvars import ContextVar

# Tenant id bound to the current execution context. No default is supplied,
# so current_tenant.get() raises LookupError until some caller has .set() it.
current_tenant: ContextVar[str] = ContextVar("current_tenant")

class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this snippet derive from."""
    pass

class TenantModel(Base):
    """Abstract base for all tenant-scoped models.

    ``__abstract__ = True`` means this class is not mapped to a table itself;
    subclasses inherit the ``tenant_id`` column.
    """
    __abstract__ = True
    # Owning tenant: required (NOT NULL), indexed, at most 64 characters.
    tenant_id = Column(String(64), nullable=False, index=True)

# Layer 1: Application-level automatic filtering
@event.listens_for(Session, "do_orm_execute")
def apply_tenant_filter(execute_state):
    """Restrict ORM SELECT statements to the current tenant.

    Statements that are not SELECTs pass through untouched. For a SELECT, a
    ``tenant_id == <current tenant>`` criterion is added for every mapped
    class in the statement that exposes a ``tenant_id`` attribute.

    Raises:
        RuntimeError: if a SELECT runs while no tenant is bound to
            ``current_tenant``.
    """
    if execute_state.is_select:
        try:
            active_tenant = current_tenant.get()
        except LookupError:
            raise RuntimeError(
                "Query executed without tenant context! "
                "This is a potential data isolation violation."
            )

        # Only classes that carry a tenant_id attribute get the criterion.
        scoped_classes = [
            mapper.class_
            for mapper in execute_state.all_mappers
            if hasattr(mapper.class_, "tenant_id")
        ]

        filtered = execute_state.statement
        for model in scoped_classes:
            filtered = filtered.filter(model.tenant_id == active_tenant)
        execute_state.statement = filtered

# Layer 2: Set database-level context for RLS
@event.listens_for(Session, "after_begin")
def set_rls_context(session, transaction, connection):
    """Publish the current tenant to PostgreSQL when a transaction begins.

    Writes ``app.tenant_id`` with transaction scope so the value disappears
    at commit/rollback and cannot follow a pooled connection to another
    request. When no tenant is bound to ``current_tenant`` nothing is written
    (same best-effort behavior as before).
    """
    try:
        tenant_id = current_tenant.get()
    except LookupError:
        # No tenant bound in this context: deliberately leave the setting
        # untouched rather than fail the transaction start.
        return

    # SET / SET LOCAL do not accept server-side bind parameters, so
    # "SET LOCAL app.tenant_id = :tid" only works with drivers that
    # interpolate parameters client-side. set_config() takes the value as a
    # normal argument; is_local=true is the SET LOCAL equivalent.
    connection.execute(
        text("SELECT set_config('app.tenant_id', :tid, true)"),
        {"tid": tenant_id},
    )

This gives two independent isolation layers:

  1. SQLAlchemy adds WHERE tenant_id = ? to every ORM SELECT (application layer); UPDATE and DELETE statements are not rewritten by the listener and rely on RLS alone
  2. PostgreSQL RLS independently filters rows (database layer)

Even if one layer fails, the other catches it.

Encrypted Tenant Data

For maximum protection, encrypt sensitive columns with tenant-specific keys:

from cryptography.fernet import Fernet
import base64
import hashlib

class TenantEncryption:
    """Encrypt and decrypt strings with a Fernet key derived per tenant.

    Each tenant's key is derived from the master key with PBKDF2-HMAC-SHA256,
    using the tenant id as the salt. Derived keys are cached on the instance
    so the 100,000-iteration derivation runs once per tenant rather than once
    per encrypted or decrypted value.
    """

    def __init__(self, master_key: bytes):
        self._master_key = master_key
        # tenant_id -> urlsafe-base64 Fernet key. Holds key material in
        # memory for the lifetime of this object, as the master key already is.
        self._key_cache: dict[str, bytes] = {}

    def _derive_key(self, tenant_id: str) -> bytes:
        """Derive a unique encryption key per tenant.

        Returns:
            The urlsafe-base64 encoding of a 32-byte derived key.

        Raises:
            ValueError: if ``tenant_id`` is empty, which would otherwise
                yield one shared key for every caller lacking a tenant id.
        """
        if not tenant_id:
            raise ValueError("tenant_id must be a non-empty string")

        key = self._key_cache.get(tenant_id)
        if key is None:
            derived = hashlib.pbkdf2_hmac(
                "sha256",
                self._master_key,
                tenant_id.encode(),
                iterations=100_000,
            )
            key = base64.urlsafe_b64encode(derived)
            self._key_cache[tenant_id] = key
        return key

    def encrypt(self, tenant_id: str, plaintext: str) -> str:
        """Encrypt ``plaintext`` under the tenant's key; return the token as text."""
        f = Fernet(self._derive_key(tenant_id))
        return f.encrypt(plaintext.encode()).decode()

    def decrypt(self, tenant_id: str, ciphertext: str) -> str:
        """Decrypt a token produced by ``encrypt`` for the same tenant."""
        f = Fernet(self._derive_key(tenant_id))
        return f.decrypt(ciphertext.encode()).decode()

# Usage with SQLAlchemy custom type
from sqlalchemy import TypeDecorator, String

class EncryptedString(TypeDecorator):
    """String column type that encrypts on write and decrypts on read.

    The tenant is taken from ``current_tenant`` at bind/result time, so both
    directions need a tenant bound to the current context. ``None`` values
    pass through unchanged.
    """

    impl = String
    cache_ok = True

    def __init__(self, encryption: TenantEncryption, *args, **kwargs):
        # Remaining arguments are forwarded to the underlying type.
        self._encryption = encryption
        super().__init__(*args, **kwargs)

    def process_bind_param(self, value, dialect):
        """Encrypt an outgoing value with the current tenant's key."""
        if value is not None:
            tenant = current_tenant.get()
            return self._encryption.encrypt(tenant, value)
        return None

    def process_result_value(self, value, dialect):
        """Decrypt a stored value with the current tenant's key."""
        if value is not None:
            tenant = current_tenant.get()
            return self._encryption.decrypt(tenant, value)
        return None

With tenant-specific keys, even if a database dump is stolen, decrypting Tenant A’s data requires their specific derived key.

File Storage Isolation

import boto3
from pathlib import PurePosixPath

class TenantStorage:
    """Store each tenant's files under its own key prefix in one bucket.

    NOTE(review): the client calls below are made synchronously inside
    ``async def`` methods; if ``s3_client`` is a regular boto3 client they
    block the event loop for the duration of each request.
    """

    def __init__(self, s3_client, bucket: str):
        self.s3 = s3_client
        self.bucket = bucket

    def _tenant_prefix(self, tenant_id: str) -> str:
        """Return the key prefix that holds all of a tenant's objects."""
        return f"tenants/{tenant_id}/"

    def _object_key(self, tenant_id: str, filename: str) -> str:
        """Build the object key for ``filename`` and reject ``..`` components.

        Object keys are opaque strings, so the key is used exactly as built;
        the check only refuses keys containing a ``..`` path component, which
        would point outside the tenant prefix if anything ever interpreted
        the key as a path.

        Raises:
            PermissionError: if the key contains a ``..`` component.
        """
        key = f"{self._tenant_prefix(tenant_id)}{filename}"
        # PurePosixPath is lexical only (it has no resolve()), so inspect the
        # components directly instead of trying to resolve the path.
        if ".." in PurePosixPath(key).parts:
            raise PermissionError("Path traversal attempt detected")
        return key

    async def upload(self, tenant_id: str, filename: str, data: bytes):
        """Store ``data`` under the tenant's prefix and return the object key."""
        key = self._object_key(tenant_id, filename)
        self.s3.put_object(Bucket=self.bucket, Key=key, Body=data)
        return key

    async def list_files(self, tenant_id: str) -> list[str]:
        """Return the tenant's object names with the tenant prefix removed."""
        prefix = self._tenant_prefix(tenant_id)
        request = {"Bucket": self.bucket, "Prefix": prefix}
        names: list[str] = []
        # Listings are paginated: keep requesting pages until the response is
        # no longer marked as truncated, so large tenants are listed in full.
        while True:
            response = self.s3.list_objects_v2(**request)
            names.extend(
                obj["Key"].removeprefix(prefix)
                for obj in response.get("Contents", [])
            )
            if not response.get("IsTruncated"):
                return names
            request["ContinuationToken"] = response["NextContinuationToken"]

    async def download(self, tenant_id: str, filename: str) -> bytes:
        """Return the contents of one of the tenant's objects."""
        # Security: verify the key is within the tenant prefix
        key = self._object_key(tenant_id, filename)
        response = self.s3.get_object(Bucket=self.bucket, Key=key)
        return response["Body"].read()

For stronger isolation, use separate S3 buckets per tenant with IAM policies that limit each tenant’s credentials to their bucket.

Background Job Isolation

Background jobs often run outside the request context. They need explicit tenant context:

from celery import Celery, signals

# Celery application instance; the task base class and tasks below attach to it.
app = Celery("myapp")

class TenantTask(app.Task):
    """Base task that requires and sets tenant context.

    The tenant id travels with the message in a ``tenant_id`` header: it is
    attached when the task is queued and restored into ``current_tenant``
    around the task body when the task runs.
    """

    def apply_async(self, args=None, kwargs=None, **options):
        """Queue the task with the current tenant id attached as a header.

        Raises:
            RuntimeError: if no tenant is bound to ``current_tenant``.
        """
        try:
            tenant_id = current_tenant.get()
        except LookupError:
            raise RuntimeError("Cannot queue task without tenant context") from None

        # Inject current tenant into task headers. Work on a copy so a headers
        # dict owned by the caller is not mutated, and tolerate headers=None.
        headers = dict(options.get("headers") or {})
        headers["tenant_id"] = tenant_id
        options["headers"] = headers
        return super().apply_async(args, kwargs, **options)

    def __call__(self, *args, **kwargs):
        """Run the task body with ``current_tenant`` set from the request.

        Raises:
            RuntimeError: if the request carries no tenant id.
        """
        # Set tenant context from headers before task executes.
        # NOTE(review): assumes the custom header is readable via
        # self.request.get("tenant_id") — confirm for the Celery version in use.
        tenant_id = self.request.get("tenant_id")
        if not tenant_id:
            raise RuntimeError("Task missing tenant_id header")

        token = current_tenant.set(tenant_id)
        try:
            return self.run(*args, **kwargs)
        finally:
            # Always restore the previous context value, even when the task
            # body raises.
            current_tenant.reset(token)

@app.task(base=TenantTask)
def generate_report(report_type: str):
    """Build a report of ``report_type`` from the current tenant's data.

    The tenant is read from ``current_tenant``; this task declares
    ``TenantTask`` as its base class.
    """
    tenant_data = fetch_tenant_data(current_tenant.get())
    return build_report(tenant_data, report_type)

Isolation Audit Framework

Periodically verify that isolation holds. This catches regressions:

class IsolationAuditor:
    """Run isolation checks against tenant-scoped tables.

    ``db_pool`` must provide awaitable ``fetchval(query, *args)`` and
    ``fetch(query, *args)`` methods.
    """

    def __init__(self, db_pool):
        self.db = db_pool

    @staticmethod
    def _quote_ident(name: str) -> str:
        """Quote ``name`` as an SQL identifier, doubling embedded quotes."""
        return '"' + name.replace('"', '""') + '"'

    async def audit_table(
        self, table_name: str, schema: "str | None" = None
    ) -> list[str]:
        """Check for common isolation violations.

        Args:
            table_name: Exact table name as stored in the catalog.
            schema: Schema holding the table. ``None`` (the default) leaves
                the row-count query unqualified and does not filter the
                catalog lookups by schema.

        Returns:
            One human-readable message per violation found; empty when clean.
        """
        violations = []

        # Identifiers cannot be bound as parameters, so the name is quoted
        # before interpolation. Quoting also makes this query match the same
        # exact-case name that the catalog lookups below compare against.
        target = self._quote_ident(table_name)
        if schema is not None:
            target = f"{self._quote_ident(schema)}.{target}"

        # Check 1: Missing tenant_id
        result = await self.db.fetchval(f"""
            SELECT COUNT(*) FROM {target}
            WHERE tenant_id IS NULL OR tenant_id = ''
        """)
        if result > 0:
            violations.append(
                f"{table_name}: {result} rows with missing tenant_id"
            )

        # Check 2: RLS policy exists
        rls_enabled = await self.db.fetchval("""
            SELECT c.relrowsecurity
            FROM pg_class c
            JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relname = $1
            AND ($2::text IS NULL OR n.nspname = $2::text)
        """, table_name, schema)
        if not rls_enabled:
            violations.append(
                f"{table_name}: Row-Level Security is not enabled"
            )

        # Check 3: tenant_id is indexed
        has_index = await self.db.fetchval("""
            SELECT EXISTS(
                SELECT 1 FROM pg_indexes
                WHERE tablename = $1
                AND ($2::text IS NULL OR schemaname = $2::text)
                AND indexdef LIKE '%tenant_id%'
            )
        """, table_name, schema)
        if not has_index:
            violations.append(
                f"{table_name}: No index on tenant_id (performance risk)"
            )

        return violations

    async def full_audit(self, schema: str = "public") -> dict[str, list[str]]:
        """Audit all tenant-scoped tables.

        Args:
            schema: Schema whose tables are audited.

        Returns:
            Mapping of table name to its violations; clean tables are omitted.
        """
        # Restrict to base tables: a view exposing tenant_id would otherwise
        # always be reported as lacking Row-Level Security.
        tables = await self.db.fetch("""
            SELECT c.table_name
            FROM information_schema.columns c
            JOIN information_schema.tables t
              ON t.table_schema = c.table_schema
             AND t.table_name = c.table_name
            WHERE c.column_name = 'tenant_id'
            AND c.table_schema = $1
            AND t.table_type = 'BASE TABLE'
        """, schema)

        results = {}
        for row in tables:
            table = row["table_name"]
            violations = await self.audit_table(table, schema)
            if violations:
                results[table] = violations
        return results

Run this as a CI check and a weekly production audit.

Data Deletion and Right to Erasure

GDPR’s “right to erasure” requires complete tenant data removal:

class TenantDataManager:
    """Remove a tenant's data from every storage layer.

    Uses ``self.db``, ``self.storage``, ``self.cache``, ``self.search`` and
    ``self._get_tenant_tables()``, which are provided outside this snippet.
    """

    async def delete_tenant_data(self, tenant_id: str) -> dict:
        """Delete all data for a tenant across all storage layers.

        Returns:
            A report keyed by layer: ``db:<table>`` holds the number of rows
            deleted, ``files`` holds whatever the storage layer returned, and
            the remaining keys hold status strings.

        NOTE(review): the steps run one after another with no surrounding
        transaction, so a failure part-way leaves earlier steps applied.
        """
        report = {}

        # Database tables
        tables = await self._get_tenant_tables()
        for table in tables:
            if table == "audit_logs":
                # Kept for compliance and anonymized below; deleting the rows
                # here would leave that step with nothing to retain.
                continue
            # Aggregates are not allowed in a RETURNING list, so count the
            # deleted rows through a data-modifying CTE instead.
            # NOTE: the table name is interpolated as-is and must therefore
            # come from a trusted source, never from user input.
            count = await self.db.fetchval(
                f"WITH deleted AS ("
                f"DELETE FROM {table} WHERE tenant_id = $1 RETURNING 1"
                f") SELECT COUNT(*) FROM deleted",
                tenant_id,
            )
            report[f"db:{table}"] = count

        # File storage
        files_deleted = await self.storage.delete_tenant_prefix(tenant_id)
        report["files"] = files_deleted

        # Cache
        await self.cache.flush_tenant(tenant_id)
        report["cache"] = "flushed"

        # Search index
        await self.search.delete_tenant_documents(tenant_id)
        report["search"] = "cleared"

        # Audit log (keep for compliance, but anonymize)
        await self.db.execute(
            "UPDATE audit_logs SET user_data = '{}' WHERE tenant_id = $1",
            tenant_id,
        )
        report["audit_logs"] = "anonymized"

        return report

One thing to remember: Production data isolation requires multiple layers — application filtering, database-enforced RLS, encrypted storage, scoped caches, and regular audits. Any single mechanism can fail; layering them makes accidental cross-tenant data exposure nearly impossible. Always test isolation explicitly with cross-tenant access attempts.

Tags: python, security, architecture

See Also