Compliance Audit Trails in Python — Deep Dive
Audit event model with cryptographic integrity
Each audit record includes a hash of the previous record, creating a tamper-evident chain similar to a blockchain but simpler:
import hashlib
import json
from datetime import datetime
from dataclasses import dataclass, field, asdict
from typing import Any
from uuid import uuid4
@dataclass
class AuditEvent:
    """A single audit record that can be linked into a hash chain.

    ``previous_hash`` carries the ``event_hash`` of the preceding record and
    ``event_hash`` carries this record's own digest (see ``compute_hash``).
    Both default to empty strings until something fills them in.
    """

    event_id: str = field(default_factory=lambda: str(uuid4()))
    # ISO-8601 string built from the current UTC time, with a "Z" suffix.
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
    actor_id: str = ""
    actor_type: str = "user"  # user | service | system
    action: str = ""
    resource_type: str = ""
    resource_id: str = ""
    outcome: str = "success"  # success | failure | denied
    details: dict[str, Any] = field(default_factory=dict)
    previous_hash: str = ""
    event_hash: str = ""
    # Context fields
    ip_address: str = ""
    user_agent: str = ""
    request_id: str = ""
    session_id: str = ""

    def compute_hash(self) -> str:
        """Compute the SHA-256 hash of all fields except ``event_hash`` itself.

        The payload is serialized as canonical JSON (sorted keys, no
        whitespace) so that equal field values always give the same digest.

        The context fields (``ip_address``, ``user_agent``, ``request_id``,
        ``session_id``) are part of the payload; leaving them out would let
        them be edited after the fact without invalidating the chain.
        Digests therefore differ from ones computed over a narrower payload.

        Returns:
            The hex-encoded SHA-256 digest.

        Raises:
            TypeError: if ``details`` holds values ``json.dumps`` cannot
                serialize.
        """
        # asdict() covers every declared field, so a field added later is
        # protected by the hash without having to remember to list it here.
        payload = asdict(self)
        del payload["event_hash"]
        canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256(canonical.encode()).hexdigest()
class AuditChain:
    """Manages the cryptographic chain of audit events.

    Each appended event records the hash of the event appended before it,
    so the stored sequence can later be checked for tampering.
    """

    def __init__(self, store: "AuditStore"):
        import asyncio  # function-scope import, as used elsewhere in this file

        self.store = store
        self._last_hash: str = ""
        # append() awaits the store between reading and updating _last_hash.
        # Without this lock, concurrent appends would all link to the same
        # predecessor and fork the chain.
        self._append_lock = asyncio.Lock()

    async def initialize(self):
        """Load the hash of the most recent event from the store, if any."""
        last_event = await self.store.get_latest()
        if last_event:
            self._last_hash = last_event.event_hash

    async def append(self, event: "AuditEvent") -> "AuditEvent":
        """Link ``event`` to the current chain head, hash it, and store it.

        Appends are serialized: the chain head only advances after the store
        write has completed, and no other append can read the head meanwhile.
        If the write raises, the head is left unchanged.

        Returns:
            The same event, with ``previous_hash`` and ``event_hash`` set.
        """
        async with self._append_lock:
            event.previous_hash = self._last_hash
            event.event_hash = event.compute_hash()
            await self.store.write(event)
            self._last_hash = event.event_hash
        return event

    async def verify_integrity(self, limit: int = 10000) -> dict:
        """Verify the chain hasn't been tampered with.

        Recomputes the hash of each event returned by
        ``store.get_recent(limit)`` and checks that every event's
        ``previous_hash`` equals the ``event_hash`` of the event before it
        in that list. Checking stops at the first mismatch.

        NOTE(review): this assumes ``get_recent`` returns events oldest
        first; confirm against the store implementation.

        Returns:
            A dict with ``verified`` (events checked before any break),
            ``total`` (events fetched), ``integrity`` (``"valid"`` or
            ``"BROKEN"``) and ``broken_at`` (the offending ``event_id`` or
            ``None``).
        """
        events = await self.store.get_recent(limit)
        broken_at = None
        verified = 0
        for i, event in enumerate(events):
            # The stored hash must match a fresh hash of the stored fields.
            if event.compute_hash() != event.event_hash:
                broken_at = event.event_id
                break
            # The first event's predecessor lies outside the fetched window,
            # so only later events have their link checked.
            if i > 0 and event.previous_hash != events[i - 1].event_hash:
                broken_at = event.event_id
                break
            verified += 1
        return {
            "verified": verified,
            "total": len(events),
            "integrity": "valid" if broken_at is None else "BROKEN",
            "broken_at": broken_at,
        }
SQLAlchemy integration with automatic audit capture
Hook into SQLAlchemy’s event system to automatically capture all data modifications:
from sqlalchemy import event, inspect
from sqlalchemy.orm import Session
class SQLAlchemyAuditor:
    """Automatically capture ORM changes as audit events.

    Changes are buffered while a session flushes and written to the audit
    chain only after the transaction commits; a rollback discards them.

    NOTE(review): the pending buffer lives on the auditor, not on the
    session, so all sessions created by the attached factory share it.
    Confirm this is acceptable if sessions can overlap.
    """

    # Attribute names whose values are never copied into audit details.
    SENSITIVE_KEYS = frozenset({"password", "password_hash", "secret", "token", "ssn"})

    def __init__(self, audit_chain: AuditChain):
        self.chain = audit_chain
        self._pending_events: list[AuditEvent] = []
        # Strong references to in-flight write tasks (see _after_commit).
        self._background_tasks: set = set()

    def attach(self, session_factory):
        """Register the flush/commit/rollback listeners on ``session_factory``."""
        event.listen(session_factory, "after_flush", self._after_flush)
        event.listen(session_factory, "after_commit", self._after_commit)
        event.listen(session_factory, "after_rollback", self._after_rollback)

    def _after_flush(self, session: Session, flush_context):
        """Capture changes during flush (before commit).

        Builds one pending event per object in ``session.new`` ("create"),
        per changed object in ``session.dirty`` ("update", with old/new
        values per changed attribute) and per object in ``session.deleted``
        ("delete").
        """
        for obj in session.new:
            self._pending_events.append(self._create_event(
                action="create",
                obj=obj,
                changes=self._serialize_obj(obj),
            ))
        for obj in session.dirty:
            changes = {}
            for attr in inspect(obj).attrs:
                hist = attr.history
                if hist.has_changes():
                    changes[attr.key] = {
                        "old": hist.deleted[0] if hist.deleted else None,
                        "new": hist.added[0] if hist.added else None,
                    }
            # Objects in session.dirty with no attribute changes are skipped.
            if changes:
                self._pending_events.append(self._create_event(
                    action="update",
                    obj=obj,
                    changes=changes,
                ))
        for obj in session.deleted:
            self._pending_events.append(self._create_event(
                action="delete",
                obj=obj,
                changes=self._serialize_obj(obj),
            ))

    def _after_commit(self, session: Session):
        """Write pending audit events after successful commit.

        The buffered events are appended to the chain one after another, in
        the order they were captured. Inside a running event loop the work
        is scheduled as a single background task; when no loop is running
        (a synchronous caller) the batch is written before returning.
        """
        import asyncio

        if not self._pending_events:
            return
        # Detach the batch first so the buffer is empty whatever happens next.
        batch = list(self._pending_events)
        self._pending_events.clear()
        chain = self.chain

        async def write_batch():
            for evt in batch:
                await chain.append(evt)

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread: create_task would raise here,
            # so drive the coroutine to completion directly.
            asyncio.run(write_batch())
            return
        task = loop.create_task(write_batch())
        # Hold a reference until the task finishes so it cannot be
        # garbage-collected while still pending.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def _after_rollback(self, session: Session):
        """Discard pending events on rollback."""
        self._pending_events.clear()

    def _create_event(self, action: str, obj, changes: dict) -> AuditEvent:
        """Build an ``AuditEvent`` for ``obj`` using the Flask request context.

        The actor falls back to ``"system"`` when ``g`` has no
        ``current_user_id``; ``changes`` is sanitized before being stored
        under ``details["changes"]``.
        """
        from flask import g, request  # or your framework's request context
        return AuditEvent(
            actor_id=getattr(g, "current_user_id", "system"),
            action=action,
            resource_type=type(obj).__tablename__,
            resource_id=str(getattr(obj, "id", "")),
            details={"changes": self._sanitize(changes)},
            ip_address=getattr(request, "remote_addr", ""),
            request_id=getattr(g, "request_id", ""),
        )

    def _serialize_obj(self, obj) -> dict:
        """Return ``obj``'s mapped column values, skipping ``_``-prefixed keys."""
        insp = inspect(obj)
        return {
            c.key: getattr(obj, c.key)
            for c in insp.mapper.column_attrs
            if not c.key.startswith("_")
        }

    def _sanitize(self, data: dict) -> dict:
        """Remove sensitive values from audit details.

        Keys matching ``SENSITIVE_KEYS`` (case-insensitively) are replaced
        with ``"[REDACTED]"``; nested dicts are sanitized recursively; all
        other non-``None`` values are converted with ``str``.
        """
        sanitized = {}
        for key, value in data.items():
            if key.lower() in self.SENSITIVE_KEYS:
                sanitized[key] = "[REDACTED]"
            elif isinstance(value, dict):
                sanitized[key] = self._sanitize(value)
            else:
                sanitized[key] = str(value) if value is not None else None
        return sanitized
Immutable storage backends
PostgreSQL with restricted permissions
-- Create audit table.
-- The parent must be declared PARTITION BY for the monthly
-- "PARTITION OF" tables below to be accepted.
CREATE TABLE audit_events (
    event_id UUID NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    actor_id TEXT NOT NULL,
    actor_type TEXT NOT NULL,
    action TEXT NOT NULL,
    resource_type TEXT NOT NULL,
    resource_id TEXT NOT NULL,
    outcome TEXT NOT NULL,
    details JSONB,
    previous_hash TEXT NOT NULL,
    event_hash TEXT NOT NULL,
    ip_address INET,
    user_agent TEXT,
    request_id TEXT,
    session_id TEXT,
    -- A primary key on a partitioned table must include the partition key.
    PRIMARY KEY (event_id, timestamp)
) PARTITION BY RANGE (timestamp);
-- Partition by month for performance
CREATE TABLE audit_events_2026_03 PARTITION OF audit_events
    FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');
-- Create indexes for common query patterns
CREATE INDEX idx_audit_actor ON audit_events (actor_id, timestamp);
CREATE INDEX idx_audit_resource ON audit_events (resource_type, resource_id, timestamp);
CREATE INDEX idx_audit_action ON audit_events (action, timestamp);
-- Restrict the application user: INSERT and SELECT only
GRANT INSERT, SELECT ON audit_events TO app_audit_writer;
-- No UPDATE, DELETE, or TRUNCATE
Write to S3 with Object Lock
import boto3
import json
from datetime import datetime
class S3AuditStore:
    """Write audit events to S3 with Object Lock (WORM compliance)."""

    def __init__(self, bucket: str, prefix: str = "audit/", retention_years: int = 7):
        """Create the store.

        Args:
            bucket: Target bucket name.
            prefix: Key prefix placed in front of the date-based path.
            retention_years: How many years each object stays locked.
        """
        self.s3 = boto3.client("s3")
        self.bucket = bucket
        self.prefix = prefix
        self.retention_years = retention_years

    @staticmethod
    def _retain_until(start: datetime, years: int) -> datetime:
        """Return ``start`` moved ``years`` calendar years forward.

        Feb 29 has no counterpart in a non-leap target year; in that case
        Mar 1 is used so the retention period is never cut short.
        """
        try:
            return start.replace(year=start.year + years)
        except ValueError:
            return start.replace(year=start.year + years, month=3, day=1)

    async def write_batch(self, events: list["AuditEvent"]) -> None:
        """Write a batch of events as a single immutable object.

        The object is newline-delimited JSON, stored under
        ``<prefix>YYYY/MM/DD/HHMMSS-<first 8 chars of first event id>.jsonl``
        and locked in COMPLIANCE mode. An empty batch writes nothing.
        """
        import asyncio

        if not events:
            # Nothing to store, and events[0] below would raise IndexError.
            return
        now = datetime.utcnow()
        key = (
            f"{self.prefix}"
            f"{now.strftime('%Y/%m/%d')}/"
            f"{now.strftime('%H%M%S')}-{events[0].event_id[:8]}.jsonl"
        )
        body = "\n".join(
            json.dumps(asdict(e), default=str) for e in events
        )
        # put_object is a blocking call; run it in a worker thread so the
        # event loop stays free while the upload is in flight.
        await asyncio.to_thread(
            self.s3.put_object,
            Bucket=self.bucket,
            Key=key,
            Body=body.encode(),
            ContentType="application/x-ndjson",
            ObjectLockMode="COMPLIANCE",
            ObjectLockRetainUntilDate=self._retain_until(now, self.retention_years),
        )
FastAPI middleware for automatic request auditing
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import time
class AuditMiddleware(BaseHTTPMiddleware):
    """Record an audit event for every state-changing HTTP request.

    Only methods in ``AUDITED_METHODS`` are audited, and paths in
    ``SKIP_PATHS`` are ignored. The event is appended to the chain after
    the downstream handler has produced a response (or raised).
    """

    AUDITED_METHODS = {"POST", "PUT", "PATCH", "DELETE"}
    SKIP_PATHS = {"/health", "/metrics", "/favicon.ico"}
    # Status codes recorded as "denied" rather than a generic "failure".
    DENIED_STATUS_CODES = {401, 403}

    def __init__(self, app: FastAPI, audit_chain: AuditChain):
        super().__init__(app)
        self.chain = audit_chain

    async def dispatch(self, request: Request, call_next):
        """Run the request and append an audit event describing its result.

        If the downstream handler raises, a "failure" event carrying the
        exception type name is appended and the exception is re-raised.
        """
        if (request.method not in self.AUDITED_METHODS
                or request.url.path in self.SKIP_PATHS):
            return await call_next(request)

        start = time.monotonic()
        try:
            response = await call_next(request)
        except Exception as exc:
            duration_ms = (time.monotonic() - start) * 1000
            # No response exists, so there is no status code to record.
            await self.chain.append(self._build_event(
                request, None, duration_ms, error=type(exc).__name__,
            ))
            raise
        duration_ms = (time.monotonic() - start) * 1000
        await self.chain.append(
            self._build_event(request, response.status_code, duration_ms)
        )
        return response

    @classmethod
    def _outcome_for(cls, status_code) -> str:
        """Map a status code (or ``None`` for "no response") to an outcome."""
        if status_code is None:
            return "failure"
        if status_code < 400:
            return "success"
        if status_code in cls.DENIED_STATUS_CODES:
            return "denied"
        return "failure"

    def _build_event(self, request: Request, status_code, duration_ms: float,
                     error=None) -> AuditEvent:
        """Assemble the audit event for ``request`` and its result."""
        path = request.url.path
        details = {
            "status_code": status_code,
            "duration_ms": round(duration_ms, 1),
            "query_params": dict(request.query_params),
        }
        if error is not None:
            details["error"] = error
        return AuditEvent(
            # Extract user from auth context
            actor_id=getattr(request.state, "user_id", "anonymous"),
            action=f"{request.method} {path}",
            # First path segment, e.g. "/orders/42" -> "orders".
            resource_type=path.split("/")[1] if "/" in path else "",
            resource_id=request.path_params.get("id", ""),
            outcome=self._outcome_for(status_code),
            details=details,
            ip_address=request.client.host if request.client else "",
            user_agent=request.headers.get("user-agent", ""),
            request_id=request.headers.get("x-request-id", ""),
        )
Compliance reporting queries
class AuditReporter:
    """Generate compliance reports from audit trail data.

    Every report is a parameterized query against ``audit_events`` whose
    rows are returned as plain dicts.
    """

    _ACCESS_SQL = (
        "SELECT actor_id, action, timestamp, ip_address, outcome "
        "FROM audit_events "
        "WHERE resource_type = :rtype AND resource_id = :rid "
        "AND timestamp BETWEEN :start AND :end "
        "ORDER BY timestamp"
    )
    _PRIVILEGED_SQL = (
        "SELECT * FROM audit_events "
        "WHERE actor_type IN ('admin', 'system') "
        "AND timestamp BETWEEN :start AND :end "
        "ORDER BY timestamp"
    )
    _FAILED_SQL = (
        "SELECT actor_id, COUNT(*) as failure_count, "
        "MIN(timestamp) as first_failure, MAX(timestamp) as last_failure "
        "FROM audit_events "
        "WHERE outcome IN ('failure', 'denied') "
        "AND timestamp BETWEEN :start AND :end "
        "GROUP BY actor_id "
        "HAVING COUNT(*) >= :min_failures "
        "ORDER BY failure_count DESC"
    )

    def __init__(self, session: AsyncSession):
        self.session = session

    async def _fetch_dicts(self, sql: str, params: dict) -> list[dict]:
        """Execute ``sql`` with bound ``params`` and return each row as a dict."""
        result = await self.session.execute(text(sql), params)
        rows = result.fetchall()
        return [dict(row._mapping) for row in rows]

    async def access_report(
        self, resource_type: str, resource_id: str,
        start_date: datetime, end_date: datetime,
    ) -> list[dict]:
        """Who accessed this resource and when? (HIPAA, GDPR)

        Returns the events for one resource between ``start_date`` and
        ``end_date``, ordered by timestamp.
        """
        params = {
            "rtype": resource_type,
            "rid": resource_id,
            "start": start_date,
            "end": end_date,
        }
        return await self._fetch_dicts(self._ACCESS_SQL, params)

    async def privileged_action_report(
        self, start_date: datetime, end_date: datetime,
    ) -> list[dict]:
        """All admin/system actions in the period. (SOC 2, PCI DSS)

        Returns full event rows whose ``actor_type`` is ``admin`` or
        ``system``, ordered by timestamp.
        """
        params = {"start": start_date, "end": end_date}
        return await self._fetch_dicts(self._PRIVILEGED_SQL, params)

    async def failed_access_report(
        self, start_date: datetime, end_date: datetime,
        min_failures: int = 5,
    ) -> list[dict]:
        """Actors with repeated failures. (PCI DSS, SOC 2)

        Groups ``failure``/``denied`` events by actor and returns those with
        at least ``min_failures`` in the period, highest count first.
        """
        params = {
            "start": start_date,
            "end": end_date,
            "min_failures": min_failures,
        }
        return await self._fetch_dicts(self._FAILED_SQL, params)
Tradeoffs
Performance impact: Writing an audit event for every request adds latency. Async writes to a queue (Redis, Kafka) decouple audit logging from request processing. The tradeoff is a small window where events could be lost if the queue crashes before persistence.
Storage growth: Audit trails grow indefinitely. Monthly partitioning in PostgreSQL, combined with archival to S3 after the hot query period, keeps the primary database manageable. Plan for approximately 1-5 KB per event, which accumulates to hundreds of gigabytes per year for high-traffic applications.
Sensitive data in audit logs: Audit trails must record what changed, but recording the actual values creates a secondary copy of sensitive data. The standard approach is to hash or redact sensitive values in the audit record while keeping enough context to be useful. Recording “password was changed” is sufficient; recording the actual password is a security risk.
Chain verification cost: Verifying a cryptographic chain of millions of events is slow. In practice, run periodic verification (nightly) on recent batches rather than the full chain. Store verification checkpoints to avoid re-verifying the entire history.
The one thing to remember: Production compliance audit trails require cryptographic integrity verification, append-only storage with restricted permissions, automatic capture via ORM hooks or middleware, and pre-built query patterns for the specific compliance frameworks your organization must satisfy.
See Also
- Python Consent Management How Python apps ask permission like a polite guest — and remember exactly what you said yes and no to
- Python Data Anonymization How Python can disguise personal information so well that nobody — not even the original collector — can figure out who it belongs to
- Python Data Retention Policies Why your Python app needs an expiration date for data — just like the one on milk cartons — and what happens when data goes stale
- Python Differential Privacy How adding a pinch of random noise to data lets companies learn from millions of people without knowing anything about any single person
- Python GDPR Compliance Why Europe's privacy law is like a restaurant that must tell you every ingredient — and how Python apps follow the recipe