Consent Management in Python — Deep Dive

The consent data model must be append-only and capture enough context to satisfy auditors:

from datetime import datetime
from enum import Enum
from uuid import uuid4
from sqlalchemy import (
    Column, String, Boolean, DateTime, Text, 
    ForeignKey, Index, JSON
)
from sqlalchemy.orm import DeclarativeBase, relationship

class ConsentPurpose(str, Enum):
    ANALYTICS = "analytics"
    MARKETING_EMAIL = "marketing_email"
    MARKETING_PUSH = "marketing_push"
    THIRD_PARTY_SHARING = "third_party_sharing"
    PERSONALIZATION = "personalization"
    RESEARCH = "research"

class ConsentSource(str, Enum):
    SIGNUP_FORM = "signup_form"
    COOKIE_BANNER = "cookie_banner"
    SETTINGS_PAGE = "settings_page"
    EMAIL_PREFERENCE_CENTER = "email_preference_center"
    API = "api"
    BULK_RECONSENT = "bulk_reconsent"

class Base(DeclarativeBase):
    pass

class PrivacyPolicyVersion(Base):
    """Track versions of the privacy policy users consented under."""
    __tablename__ = "privacy_policy_versions"
    
    id = Column(String(36), primary_key=True, default=lambda: str(uuid4()))
    version = Column(String(20), unique=True, nullable=False)
    published_at = Column(DateTime, nullable=False)
    content_hash = Column(String(64), nullable=False)  # SHA-256 of policy text
    summary_of_changes = Column(Text)
    requires_reconsent = Column(Boolean, default=False)

class ConsentRecord(Base):
    """Immutable record of a consent action. Never UPDATE, only INSERT."""
    __tablename__ = "consent_records"
    
    id = Column(String(36), primary_key=True, default=lambda: str(uuid4()))
    user_id = Column(String(36), nullable=False, index=True)
    purpose = Column(String(50), nullable=False)
    granted = Column(Boolean, nullable=False)
    recorded_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    
    # Context for audit
    policy_version_id = Column(String(36), ForeignKey("privacy_policy_versions.id"))
    source = Column(String(50), nullable=False)  # which UI/API collected this
    ip_address = Column(String(45))
    user_agent = Column(Text)
    
    # For withdrawal, reference the original grant
    supersedes_id = Column(String(36), ForeignKey("consent_records.id"), nullable=True)
    
    __table_args__ = (
        Index("idx_consent_user_purpose", "user_id", "purpose", "recorded_at"),
    )
from dataclasses import dataclass
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

@dataclass
class ConsentState:
    purpose: str
    granted: bool
    granted_at: datetime | None
    policy_version: str | None

class ConsentService:
    def __init__(self, session: AsyncSession, event_publisher):
        self.session = session
        self.publisher = event_publisher
    
    async def get_user_consents(self, user_id: str) -> dict[str, ConsentState]:
        """Get current consent state for all purposes."""
        # Subquery: latest record per purpose
        latest_subq = (
            select(
                ConsentRecord.purpose,
                func.max(ConsentRecord.recorded_at).label("latest_at"),
            )
            .where(ConsentRecord.user_id == user_id)
            .group_by(ConsentRecord.purpose)
            .subquery()
        )
        
        results = await self.session.execute(
            select(ConsentRecord)
            .join(latest_subq, 
                  (ConsentRecord.purpose == latest_subq.c.purpose)
                  & (ConsentRecord.recorded_at == latest_subq.c.latest_at))
            .where(ConsentRecord.user_id == user_id)
        )
        
        states = {}
        for record in results.scalars():
            policy = await self.session.get(
                PrivacyPolicyVersion, record.policy_version_id
            )
            states[record.purpose] = ConsentState(
                purpose=record.purpose,
                granted=record.granted,
                granted_at=record.recorded_at if record.granted else None,
                policy_version=policy.version if policy else None,
            )
        
        # Include purposes with no consent recorded (default: not granted)
        for purpose in ConsentPurpose:
            if purpose.value not in states:
                states[purpose.value] = ConsentState(
                    purpose=purpose.value,
                    granted=False,
                    granted_at=None,
                    policy_version=None,
                )
        
        return states
    
    async def grant_consent(
        self, user_id: str, purpose: str, 
        source: str, policy_version_id: str,
        ip_address: str = "", user_agent: str = "",
    ) -> ConsentRecord:
        record = ConsentRecord(
            user_id=user_id,
            purpose=purpose,
            granted=True,
            source=source,
            policy_version_id=policy_version_id,
            ip_address=ip_address,
            user_agent=user_agent,
        )
        self.session.add(record)
        await self.session.flush()
        
        await self.publisher.publish("consent.granted", {
            "user_id": user_id,
            "purpose": purpose,
            "record_id": record.id,
            "timestamp": record.recorded_at.isoformat(),
        })
        
        return record
    
    async def withdraw_consent(
        self, user_id: str, purpose: str,
        source: str, ip_address: str = "", user_agent: str = "",
    ) -> ConsentRecord:
        """Withdraw consent — creates a new record, publishes event."""
        # Find the current grant to reference
        current = await self._get_latest_record(user_id, purpose)
        
        record = ConsentRecord(
            user_id=user_id,
            purpose=purpose,
            granted=False,
            source=source,
            policy_version_id=current.policy_version_id if current else None,
            ip_address=ip_address,
            user_agent=user_agent,
            supersedes_id=current.id if current else None,
        )
        self.session.add(record)
        await self.session.flush()
        
        # Publish withdrawal event for downstream systems
        await self.publisher.publish("consent.withdrawn", {
            "user_id": user_id,
            "purpose": purpose,
            "record_id": record.id,
            "timestamp": record.recorded_at.isoformat(),
        })
        
        return record
    
    async def check_consent(self, user_id: str, purpose: str) -> bool:
        """Fast consent check for use in processing pipelines."""
        record = await self._get_latest_record(user_id, purpose)
        return record.granted if record else False
    
    async def _get_latest_record(
        self, user_id: str, purpose: str
    ) -> ConsentRecord | None:
        result = await self.session.execute(
            select(ConsentRecord)
            .where(
                ConsentRecord.user_id == user_id,
                ConsentRecord.purpose == purpose,
            )
            .order_by(ConsentRecord.recorded_at.desc())
            .limit(1)
        )
        return result.scalar_one_or_none()

Enforce consent checks at the function level:

from functools import wraps
from typing import Callable

class ConsentRequired:
    """Decorator that blocks processing without active consent."""
    
    def __init__(self, purpose: str, consent_service: ConsentService):
        self.purpose = purpose
        self.service = consent_service
    
    def __call__(self, func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Extract user_id from first arg or kwargs
            user_id = kwargs.get("user_id") or (args[0] if args else None)
            if not user_id:
                raise ValueError("user_id required for consent-gated operations")
            
            has_consent = await self.service.check_consent(user_id, self.purpose)
            if not has_consent:
                raise ConsentNotGranted(
                    f"User {user_id} has not consented to '{self.purpose}'"
                )
            
            return await func(*args, **kwargs)
        return wrapper

class ConsentNotGranted(Exception):
    pass

# Usage
@ConsentRequired("marketing_email", consent_service)
async def send_marketing_email(user_id: str, campaign_id: str):
    """Only executes if user has active marketing_email consent."""
    ...

@ConsentRequired("analytics", consent_service)
async def track_user_behavior(user_id: str, event: dict):
    """Only tracks if user has active analytics consent."""
    ...

When the privacy policy changes and requires re-consent, flag affected users:

class ReconsentManager:
    def __init__(self, session: AsyncSession, consent_service: ConsentService):
        self.session = session
        self.consent = consent_service
    
    async def publish_new_policy(
        self, version: str, content_hash: str,
        summary: str, requires_reconsent: bool,
        affected_purposes: list[str] | None = None,
    ) -> PrivacyPolicyVersion:
        policy = PrivacyPolicyVersion(
            version=version,
            published_at=datetime.utcnow(),
            content_hash=content_hash,
            summary_of_changes=summary,
            requires_reconsent=requires_reconsent,
        )
        self.session.add(policy)
        await self.session.flush()
        
        if requires_reconsent:
            await self._flag_users_for_reconsent(
                policy.id, affected_purposes or list(ConsentPurpose)
            )
        
        return policy
    
    async def _flag_users_for_reconsent(
        self, policy_id: str, purposes: list
    ) -> int:
        """Mark users whose consent was under an older policy."""
        # Find users with active consent for affected purposes
        result = await self.session.execute(
            text(
                "INSERT INTO reconsent_queue (user_id, purpose, old_policy_id, new_policy_id) "
                "SELECT DISTINCT cr.user_id, cr.purpose, cr.policy_version_id, :new_policy "
                "FROM consent_records cr "
                "INNER JOIN ("
                "    SELECT user_id, purpose, MAX(recorded_at) AS latest "
                "    FROM consent_records "
                "    WHERE purpose = ANY(:purposes) "
                "    GROUP BY user_id, purpose"
                ") latest ON cr.user_id = latest.user_id "
                "    AND cr.purpose = latest.purpose "
                "    AND cr.recorded_at = latest.latest "
                "WHERE cr.granted = true "
                "AND cr.policy_version_id != :new_policy"
            ),
            {
                "new_policy": policy_id,
                "purposes": [p.value if isinstance(p, ConsentPurpose) else p for p in purposes],
            },
        )
        return result.rowcount
    
    async def get_reconsent_needed(self, user_id: str) -> list[dict]:
        """Check if a user needs to re-consent to any purposes."""
        result = await self.session.execute(
            text(
                "SELECT rq.purpose, ppv.version, ppv.summary_of_changes "
                "FROM reconsent_queue rq "
                "JOIN privacy_policy_versions ppv ON rq.new_policy_id = ppv.id "
                "WHERE rq.user_id = :user_id AND rq.completed_at IS NULL"
            ),
            {"user_id": user_id},
        )
        return [dict(row._mapping) for row in result.fetchall()]
from fastapi import APIRouter, Depends, Request
from pydantic import BaseModel

router = APIRouter(prefix="/consent", tags=["consent"])

class ConsentUpdateRequest(BaseModel):
    purposes: dict[str, bool]  # {"analytics": true, "marketing_email": false}

class ConsentResponse(BaseModel):
    purposes: dict[str, dict]

@router.get("/", response_model=ConsentResponse)
async def get_consents(
    request: Request,
    consent_service: ConsentService = Depends(get_consent_service),
):
    user_id = request.state.user_id
    states = await consent_service.get_user_consents(user_id)
    return ConsentResponse(
        purposes={
            k: {
                "granted": v.granted,
                "granted_at": v.granted_at.isoformat() if v.granted_at else None,
                "policy_version": v.policy_version,
            }
            for k, v in states.items()
        }
    )

@router.put("/")
async def update_consents(
    body: ConsentUpdateRequest,
    request: Request,
    consent_service: ConsentService = Depends(get_consent_service),
):
    user_id = request.state.user_id
    policy = await get_current_policy_version()
    
    results = {}
    for purpose, granted in body.purposes.items():
        if granted:
            record = await consent_service.grant_consent(
                user_id=user_id,
                purpose=purpose,
                source="settings_page",
                policy_version_id=policy.id,
                ip_address=request.client.host,
                user_agent=request.headers.get("user-agent", ""),
            )
        else:
            record = await consent_service.withdraw_consent(
                user_id=user_id,
                purpose=purpose,
                source="settings_page",
                ip_address=request.client.host,
                user_agent=request.headers.get("user-agent", ""),
            )
        results[purpose] = {"granted": granted, "record_id": record.id}
    
    await consent_service.session.commit()
    return {"updated": results}

Tradeoffs

Granularity vs. user fatigue: More purposes give users finer control but create decision fatigue. Too many checkboxes and users either accept all or abandon signup. Group related purposes logically (e.g., “marketing communications” covers email and push).

Consent check latency: Checking consent on every API call adds latency. Cache active consent states in Redis with a short TTL (30-60 seconds). The tradeoff is a brief delay between withdrawal and enforcement — acceptable under “without undue delay.”

Consent as a service vs. embedded: A dedicated consent microservice is clean architecturally but adds a network hop to every consent check. Embedding consent logic in each service avoids the hop but scatters consent logic. Most teams use a shared library with a centralized database.

Re-consent fatigue: Frequent policy changes that trigger re-consent flows annoy users and reduce consent rates. Batch non-critical changes and only trigger re-consent for material changes that affect user rights.

The one thing to remember: Production consent management requires an append-only data model tracking per-purpose grants and withdrawals, event-driven propagation to downstream services, automated re-consent workflows for policy changes, and consent-gate decorators that enforce checks at the processing level.

pythonprivacyconsentgdpr

See Also

  • Python Compliance Audit Trails Why your Python app needs a tamper-proof diary that records every important action — like a security camera for your data
  • Python Data Anonymization How Python can disguise personal information so well that nobody — not even the original collector — can figure out who it belongs to
  • Python Data Retention Policies Why your Python app needs an expiration date for data — just like the one on milk cartons — and what happens when data goes stale
  • Python Differential Privacy How adding a pinch of random noise to data lets companies learn from millions of people without knowing anything about any single person
  • Python Gdpr Compliance Why Europe's privacy law is like a restaurant that must tell you every ingredient — and how Python apps follow the recipe