GDPR Compliance in Python — Deep Dive
Consent storage architecture
A production consent system needs more than a boolean column. You need an immutable audit trail that records every consent event with enough context to prove compliance during an audit.
from datetime import datetime
from enum import Enum
from sqlalchemy import Column, String, DateTime, Text, ForeignKey, Boolean
from sqlalchemy.orm import DeclarativeBase, relationship
class ConsentPurpose(str, Enum):
    """Processing purposes for which a consent decision can be recorded.

    The ``str`` mixin makes every member compare equal to its plain string
    value, e.g. ``ConsentPurpose.ANALYTICS == "analytics"``.
    """

    MARKETING_EMAIL = "marketing_email"
    ANALYTICS = "analytics"
    THIRD_PARTY_SHARING = "third_party_sharing"
    PERSONALIZATION = "personalization"
class Base(DeclarativeBase):
    """Declarative base class that the ORM models below inherit from."""
class ConsentRecord(Base):
    """A single consent event for one user and one purpose.

    Rows are append-only: never update an existing record. A withdrawal is
    stored as a new row with ``granted=False``.
    """

    __tablename__ = "consent_records"

    # Identity and ownership.
    id = Column(String(36), primary_key=True)
    user_id = Column(String(36), ForeignKey("users.id"), nullable=False)

    # The decision itself and when it was captured.
    purpose = Column(String(50), nullable=False)
    granted = Column(Boolean, nullable=False)
    collected_at = Column(DateTime, nullable=False, default=datetime.utcnow)

    # Context recorded alongside the decision.
    ip_address = Column(String(45))  # 45 characters covers the IPv6 max length
    user_agent = Column(Text)
    privacy_policy_version = Column(String(20), nullable=False)
    consent_mechanism = Column(String(100))  # e.g., "signup_form_v3", "cookie_banner_v2"
The key design decision: consent records are append-only. You never update an existing consent record. When a user withdraws consent, you insert a new record with granted=False. This gives you a complete timeline that auditors can verify.
To check current consent status, query the most recent record per purpose:
from sqlalchemy import select, func
def get_active_consents(session, user_id: str) -> dict[str, bool]:
    """Return the most recent ``granted`` flag for each purpose of *user_id*.

    Args:
        session: Session used to execute the query.
        user_id: ID of the user whose consent records are read.

    Returns:
        A mapping of purpose to the ``granted`` value of that purpose's
        newest record. Purposes without any record are absent.
    """
    # Per purpose, the timestamp of this user's newest consent record.
    newest = (
        select(
            ConsentRecord.purpose,
            func.max(ConsentRecord.collected_at).label("latest"),
        )
        .where(ConsentRecord.user_id == user_id)
        .group_by(ConsentRecord.purpose)
        .subquery()
    )

    # Keep only the rows whose timestamp is the newest one for their purpose.
    is_newest = (ConsentRecord.purpose == newest.c.purpose) & (
        ConsentRecord.collected_at == newest.c.latest
    )
    stmt = (
        select(ConsentRecord.purpose, ConsentRecord.granted)
        .join(newest, is_newest)
        .where(ConsentRecord.user_id == user_id)
    )

    latest_by_purpose = {}
    for record in session.execute(stmt).all():
        latest_by_purpose[record.purpose] = record.granted
    return latest_by_purpose
Data Subject Access Requests (DSARs)
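Article 15 requests require you to compile everything you know about a user without undue delay and at the latest within one month of receiving the request (Article 12(3) allows an extension of up to two further months for complex or numerous requests). In a microservice architecture, this means querying multiple services and aggregating results.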
import json
import asyncio
from dataclasses import dataclass, field
from typing import Protocol
class DataSource(Protocol):
    """Structural interface for anything that can export one user's data."""

    async def export_user_data(self, user_id: str) -> dict:
        """Return the data held about *user_id* as a dict."""
@dataclass
class DSARPipeline:
    """Collect one user's data from every registered source into one export."""

    sources: list[DataSource] = field(default_factory=list)

    def register(self, source: DataSource) -> None:
        """Add *source* to the sources queried by :meth:`execute`."""
        self.sources.append(source)

    async def execute(self, user_id: str) -> dict:
        """Collect data from all registered sources concurrently.

        Args:
            user_id: ID of the user whose data is exported.

        Returns:
            A dict with ``"user_id"``, ``"exported_at"`` and one entry per
            source, keyed by the source's class name. When that key is
            already taken (two sources of the same class, or a class named
            like a reserved key) a numeric suffix is added (``Name_2``,
            ``Name_3``, ...) so that no source's data is overwritten.
            A source that failed contributes ``{"error": "<message>"}``
            instead of its data.
        """
        tasks = [source.export_user_data(user_id) for source in self.sources]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        export = {"user_id": user_id, "exported_at": datetime.utcnow().isoformat()}
        for source, result in zip(self.sources, results):
            key = self._unique_key(export, type(source).__name__)
            # BaseException rather than Exception: gather() can also hand back
            # asyncio.CancelledError, which is not an Exception subclass and
            # would otherwise be stored in the export as if it were data.
            if isinstance(result, BaseException):
                export[key] = {"error": str(result)}
            else:
                export[key] = result
        return export

    @staticmethod
    def _unique_key(export: dict, base: str) -> str:
        """Return *base*, or *base* with a numeric suffix if it is taken."""
        if base not in export:
            return base
        suffix = 2
        while f"{base}_{suffix}" in export:
            suffix += 1
        return f"{base}_{suffix}"
# Register data sources across your application
class OrderDataSource:
    """DSAR data source that exports a user's orders."""

    def __init__(self, db_session):
        self.db = db_session

    async def export_user_data(self, user_id: str) -> dict:
        """Return the orders of *user_id* under the ``"orders"`` key.

        Each order is exported as a dict with its id, ISO-formatted creation
        date, total (as a string) and the names of its items.
        """
        query = select(Order).where(Order.user_id == user_id)
        rows = await self.db.execute(query)

        exported = []
        for order in rows.scalars():
            exported.append(
                {
                    "order_id": order.id,
                    "date": order.created_at.isoformat(),
                    "total": str(order.total),
                    "items": [line.name for line in order.items],
                }
            )
        return {"orders": exported}
In production, DSARs often reveal data you forgot existed — analytics events, customer support tickets stored in a third-party system, cached recommendation profiles. Building the DSAR pipeline is an excellent exercise in data mapping.
Cascading erasure service
The right to erasure is the most technically challenging GDPR requirement. Personal data spreads across primary databases, search indices, object storage, log files, analytics platforms, message queues, and backups.
from typing import Protocol
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
class ErasureTarget(Protocol):
    """Each system holding personal data implements this."""

    # The return annotation is quoted because ErasureResult is defined later
    # in the module. On interpreters that evaluate annotations when the
    # method is defined, the unquoted name would raise NameError.
    async def erase_user(self, user_id: str) -> "ErasureResult":
        """Erase the data held for *user_id* and report the outcome."""
        ...
@dataclass
class ErasureResult:
target: str
success: bool
records_deleted: int = 0
error: str | None = None
class ErasureOrchestrator:
    """Run a user's erasure across every registered target, one at a time."""

    def __init__(self):
        self._targets: list[ErasureTarget] = []
        self._pre_checks: list = []

    def register(self, target: ErasureTarget) -> None:
        """Append *target*; targets are executed in registration order."""
        self._targets.append(target)

    async def execute(self, user_id: str) -> list[ErasureResult]:
        """Execute erasure across all registered targets.

        Order matters: delete from dependent systems first,
        then primary database, then search indices, then caches.

        A target that raises does not stop the run: the exception is logged
        and recorded as a failed ``ErasureResult`` and the remaining targets
        are still processed.

        Args:
            user_id: ID of the user to erase.

        Returns:
            One ``ErasureResult`` per registered target, in execution order.
        """
        results = []
        for target in self._targets:
            target_name = type(target).__name__
            try:
                result = await target.erase_user(user_id)
                # Read the result's fields before recording it, so that a
                # malformed return value is reported once as a failure
                # instead of being stored and breaking the summary below.
                if result.success:
                    logger.info(
                        "Erasure completed",
                        extra={
                            "target": target_name,
                            "user_id": user_id,
                            "records_deleted": result.records_deleted,
                        },
                    )
                else:
                    # The target reported failure without raising; do not
                    # log it as completed.
                    logger.error(
                        "Erasure failed",
                        extra={
                            "target": target_name,
                            "user_id": user_id,
                            "error": result.error,
                        },
                    )
                results.append(result)
            except Exception as e:
                results.append(ErasureResult(
                    target=target_name,
                    success=False,
                    error=str(e),
                ))
                logger.error(
                    "Erasure failed",
                    extra={"target": target_name, "user_id": user_id},
                    exc_info=True,
                )

        # Log overall result for audit trail
        failed = [r for r in results if not r.success]
        if failed:
            logger.warning(
                f"Erasure incomplete: {len(failed)} targets failed for {user_id}"
            )
        return results
Key considerations for erasure:
Backup retention: You can’t practically erase from backups. The accepted approach is documenting a retention period for backups and ensuring erased records aren’t restored. Some teams maintain a “tombstone” table of erased user IDs that the restore process checks against.
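Anonymization as alternative: Where you have a legal basis to keep aggregate data (e.g., financial records required by tax law), anonymize rather than delete. Replace personal identifiers with irreversible hashes or remove them entirely, keeping only statistical data. Note that hashing alone is usually only pseudonymization: a plain or unsalted hash of an email address or user ID can often be linked back to the person and is then still personal data, so prefer removing identifiers wherever you can.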
Foreign key cascades: Design your schema so user deletion cascades cleanly. If you have ON DELETE CASCADE constraints, the database handles related records. Otherwise, your orchestrator must walk the dependency graph.
Data retention automation
GDPR requires that data isn’t kept longer than necessary. Automated retention policies prevent data accumulation:
from datetime import datetime, timedelta
from sqlalchemy import delete
RETENTION_POLICIES = {
    "session_logs": timedelta(days=90),
    "analytics_events": timedelta(days=365),
    "support_tickets": timedelta(days=730),  # 2 years
    "consent_records": timedelta(days=2555),  # 7 years (audit requirement)
}

# Tables whose age column is not named "created_at".
# consent_records keeps its timestamp in "collected_at".
_RETENTION_TIMESTAMP_COLUMNS = {
    "consent_records": "collected_at",
}


async def enforce_retention(session, dry_run: bool = False, *, batch_size: int = 10_000):
    """Purge data older than its retention period.

    Args:
        session: Async session used to run the queries and commits.
        dry_run: When true, only log how many rows would be deleted.
        batch_size: Maximum number of rows deleted per committed batch.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    now = datetime.utcnow()
    for table_name, max_age in RETENTION_POLICIES.items():
        cutoff = now - max_age
        # NOTE(review): `metadata` is not defined in this section; presumably
        # it is the MetaData the tables are registered on - confirm.
        table = metadata.tables[table_name]
        age_column = table.c[_RETENTION_TIMESTAMP_COLUMNS.get(table_name, "created_at")]
        expired = age_column < cutoff

        # Count first
        count_query = select(func.count()).where(expired)
        count = (await session.execute(count_query)).scalar()
        if dry_run:
            logger.info(f"[DRY RUN] Would delete {count} rows from {table_name}")
            continue
        if not count:
            continue

        deleted_total = await _delete_expired(session, table, expired, batch_size)
        logger.info(f"Retention: deleted {deleted_total} from {table_name}")


async def _delete_expired(session, table, expired, batch_size):
    """Delete rows of *table* matching *expired* in batches; return the total."""
    pk_columns = list(table.primary_key.columns)
    if len(pk_columns) != 1:
        # No single-column key to batch on: fall back to one statement.
        result = await session.execute(
            delete(table)
            .where(expired)
            .execution_options(synchronize_session=False)
        )
        await session.commit()
        return max(result.rowcount, 0)

    pk = pk_columns[0]
    deleted_total = 0
    while True:
        # LIMIT is not standard in DELETE, so pick one batch of keys first
        # and delete exactly those rows.
        batch_ids = (
            await session.execute(select(pk).where(expired).limit(batch_size))
        ).scalars().all()
        if not batch_ids:
            # Nothing left to delete. Stopping on an empty batch, rather than
            # on a running total, guarantees the loop terminates.
            break
        result = await session.execute(
            delete(table)
            .where(pk.in_(batch_ids))
            .execution_options(synchronize_session=False)
        )
        # Commit per batch to avoid long locks.
        await session.commit()
        removed = result.rowcount
        # Some drivers report -1 when the row count is unavailable.
        deleted_total += removed if removed >= 0 else len(batch_ids)
    return deleted_total
Breach detection and notification
The 72-hour notification window demands automation. Manual processes rarely meet this deadline.
from enum import Enum
from dataclasses import dataclass
class BreachSeverity(Enum):
    """Severity levels assigned to a breach report."""

    LOW = "low"  # No personal data likely accessed
    MEDIUM = "medium"  # Limited personal data exposed
    HIGH = "high"  # Sensitive data or large-scale exposure
    CRITICAL = "critical"  # Financial, health, or identity data
@dataclass
class BreachReport:
    """Facts recorded about a detected breach."""

    detected_at: datetime  # the 72-hour notification deadline is counted from here
    description: str
    severity: BreachSeverity
    affected_users: int  # number of affected users
    data_categories: list[str]
    containment_actions: list[str]
async def handle_breach(report: BreachReport, notifier):
    """Automated breach response workflow.

    Steps, in order:
      1. Log the breach at CRITICAL level with its severity, affected-user
         count and data categories.
      2. Alert the DPO.
      3. For HIGH and CRITICAL breaches, create an authority-notification
         task due 72 hours after ``report.detected_at``.
      4. For CRITICAL breaches, queue notifications to the affected users.

    Args:
        report: The breach being handled.
        notifier: Object providing the awaitable methods ``alert_dpo``,
            ``create_authority_notification_task`` and
            ``queue_user_notifications``.
    """
    # Step 1: audit log entry with the full context.
    audit_context = {
        "severity": report.severity.value,
        "affected_users": report.affected_users,
        "data_categories": report.data_categories,
    }
    logger.critical("DATA BREACH DETECTED", extra=audit_context)

    # Step 2: the DPO is told immediately, whatever the severity.
    await notifier.alert_dpo(report)

    # Step 3: HIGH and CRITICAL start the 72-hour clock.
    clock_severities = (BreachSeverity.HIGH, BreachSeverity.CRITICAL)
    if report.severity in clock_severities:
        await notifier.create_authority_notification_task(
            report=report,
            deadline=report.detected_at + timedelta(hours=72),
        )

    # Step 4: only CRITICAL breaches notify the affected users.
    if report.severity == BreachSeverity.CRITICAL:
        await notifier.queue_user_notifications(report)
Privacy by design with Pydantic
Use Pydantic models to enforce data minimization at the API boundary:
from pydantic import BaseModel, EmailStr, Field
class UserCreateRequest(BaseModel):
    """Sign-up payload limited to the fields we actually need."""

    email: EmailStr
    display_name: str = Field(max_length=100)

    # Deliberately absent: phone, address, date_of_birth.
    # Add them only when a feature specifically requires them.
class UserPublicResponse(BaseModel):
    """Public view of a user: no internal IDs or sensitive fields."""

    display_name: str
    member_since: str

    # Deliberately absent: email, user_id, IP address.

    class Config:
        # Reject unknown fields so that nothing extra leaks through.
        # NOTE(review): Pydantic v2 prefers ``model_config`` over an inner
        # ``Config`` class - confirm against the installed version.
        extra = "forbid"
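This approach makes data minimization an explicit, schema-level concern: Pydantic validates every request and response against the declared fields at the API boundary, and type checkers can see exactly which fields each model carries. Note that the validation itself happens at runtime, not at compile time.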
Tradeoffs and real-world complications
Performance vs. compliance: Cascading deletes across multiple databases and services can be slow. Some teams process erasure requests asynchronously with a guaranteed SLA (e.g., “deleted within 30 days”), which GDPR allows.
Analytics vs. minimization: Product teams want detailed analytics; GDPR wants minimal data. The compromise is aggregation — collect events with anonymous session IDs, not user IDs, and aggregate before storage.
Global consistency: If you serve users in multiple jurisdictions (GDPR, CCPA, LGPD), build your privacy infrastructure for the strictest standard. Feature-flagging by jurisdiction creates technical debt that leads to compliance gaps.
The one thing to remember: GDPR compliance in Python is an architecture problem — consent records need append-only audit trails, DSAR exports must span every data store, and erasure requires orchestrated deletion across databases, caches, indices, and third-party services.
See Also
- Python Compliance Audit Trails: Why your Python app needs a tamper-proof diary that records every important action — like a security camera for your data
- Python Consent Management: How Python apps ask permission like a polite guest — and remember exactly what you said yes and no to
- Python Data Anonymization: How Python can disguise personal information so well that nobody — not even the original collector — can figure out who it belongs to
- Python Data Retention Policies: Why your Python app needs an expiration date for data — just like the one on milk cartons — and what happens when data goes stale
- Python Differential Privacy: How adding a pinch of random noise to data lets companies learn from millions of people without knowing anything about any single person