Regulatory Compliance Automation with Python — Deep Dive
Regulatory change monitoring
The US Federal Register provides a public API that returns new rules, proposed rules, and notices in structured JSON. The following monitor tracks the changes relevant to your organization:
import httpx
from dataclasses import dataclass
from datetime import date, timedelta
@dataclass
class RegulatoryChange:
    """A single Federal Register document plus its computed relevance.

    Attributes:
        document_number: Identifier taken from the API's ``document_number``.
        title: Document title.
        abstract: Document abstract text.
        agencies: Names of the agencies attached to the document.
        publication_date: Publication date as delivered by the API.
        document_type: Kind of document (rule, proposed_rule, notice).
        url: Link to the document (filled from the API's ``html_url``).
        relevance_score: Keyword relevance; 0.0 until a score is assigned.
    """

    document_number: str
    title: str
    abstract: str
    agencies: list[str]
    publication_date: str
    document_type: str
    url: str
    relevance_score: float = 0.0
class FederalRegisterMonitor:
    """Fetch recent Federal Register documents and rank them by keyword hits.

    The monitor owns an ``httpx.Client``. Call :meth:`close` when done, or use
    the instance as a context manager so the connection pool is released.
    """

    BASE_URL = "https://www.federalregister.gov/api/v1"

    def __init__(self, keywords: list[str], agencies: list[str]):
        """Store the search criteria and open the HTTP client.

        Args:
            keywords: Terms used to score each document's title and abstract.
            agencies: Values sent as the ``conditions[agencies][]`` filter.
        """
        self.keywords = keywords
        self.agencies = agencies
        self.client = httpx.Client(timeout=30)

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def fetch_recent_documents(
        self,
        days_back: int = 7,
        *,
        min_relevance: float = 0.3,
        max_pages: int = 10,
    ) -> list[RegulatoryChange]:
        """Fetch recent Federal Register documents matching criteria.

        Args:
            days_back: How many days before today the publication-date filter
                starts.
            min_relevance: Documents are kept only when their score is
                strictly greater than this value.
            max_pages: Upper bound on result pages requested (at least one
                page is always requested).

        Returns:
            Matching documents, highest relevance first.

        Raises:
            httpx.HTTPStatusError: If any page request returns an error status.
        """
        start_date = date.today() - timedelta(days=days_back)
        params = {
            "conditions[publication_date][gte]": start_date.isoformat(),
            "conditions[agencies][]": self.agencies,
            "per_page": 100,
            "order": "newest",
            "fields[]": [
                "document_number", "title", "abstract",
                "agencies", "publication_date", "type", "html_url",
            ],
        }
        url = f"{self.BASE_URL}/documents.json"
        changes = []
        for _ in range(max(1, max_pages)):
            response = self.client.get(url, params=params)
            response.raise_for_status()
            data = response.json()
            for doc in data.get("results", []):
                change = RegulatoryChange(
                    document_number=doc["document_number"],
                    title=doc["title"],
                    # A JSON null abstract must not become the text "None".
                    abstract=doc.get("abstract") or "",
                    agencies=self._agency_names(doc),
                    publication_date=doc["publication_date"],
                    document_type=doc["type"],
                    url=doc["html_url"],
                )
                change.relevance_score = self._score_relevance(change)
                if change.relevance_score > min_relevance:
                    changes.append(change)
            # NOTE(review): assumes further pages are advertised through
            # "next_page_url" and that this URL already carries the query --
            # confirm against the Federal Register API documentation.
            next_url = data.get("next_page_url")
            if not next_url:
                break
            url, params = next_url, None
        return sorted(changes, key=lambda c: c.relevance_score, reverse=True)

    @staticmethod
    def _agency_names(doc: dict) -> list[str]:
        """Return agency display names, tolerating entries without ``name``.

        Falls back to ``raw_name`` when ``name`` is missing or empty and skips
        entries that carry neither.
        """
        names = []
        for agency in doc.get("agencies") or []:
            name = agency.get("name") or agency.get("raw_name")
            if name:
                names.append(name)
        return names

    def _score_relevance(self, change: RegulatoryChange) -> float:
        """Score how relevant a regulatory change is to our organization.

        The score is the fraction of configured keywords that occur
        (case-insensitively, as substrings) in the title or abstract, capped
        at 1.0. With no keywords configured the score is 0.0.
        """
        # Treat missing text as empty so a None never contributes "none".
        text = f"{change.title or ''} {change.abstract or ''}".lower()
        matches = sum(1 for kw in self.keywords if kw.lower() in text)
        return min(matches / max(len(self.keywords), 1), 1.0)
For non-US jurisdictions, similar approaches work with EUR-Lex (EU), legislation.gov.uk (UK), and country-specific regulatory databases. Many lack public APIs, requiring web scraping with Scrapy or Playwright.
Obligation extraction with NLP
Regulatory text contains obligations hidden in dense legal language. Extracting them requires understanding modal verbs (“shall,” “must,” “may not”) and their legal significance:
import spacy
from dataclasses import dataclass
from enum import Enum
class ObligationType(Enum):
    """Categories of obligation that a regulatory sentence can express."""

    # Typical wording: shall, must, is required to
    MANDATORY = "mandatory"
    # Typical wording: shall not, must not, may not
    PROHIBITED = "prohibited"
    # Typical wording: may, is permitted to
    PERMISSIVE = "permissive"
    # Typical wording: if/when ... then shall
    CONDITIONAL = "conditional"
@dataclass
class Obligation:
    """One obligation extracted from a sentence of regulatory text.

    Attributes:
        text: The sentence the obligation was found in.
        obligation_type: Category of the obligation.
        subject: Who must comply.
        action: What they must do.
        condition: Under what circumstances, or None.
        deadline: By when, or None.
        source_section: Regulatory reference the sentence came from.
    """

    text: str
    obligation_type: ObligationType
    subject: str
    action: str
    condition: str | None
    deadline: str | None
    source_section: str
# Lower-case phrases that signal each obligation type. Several PROHIBITED
# phrases contain a MANDATORY or PERMISSIVE phrase ("shall not" contains
# "shall", "may not" contains "may"), so a consumer has to test PROHIBITED
# before the other two. ObligationType.CONDITIONAL has no entry here.
OBLIGATION_MARKERS = {
    ObligationType.MANDATORY: [
        "shall",
        "must",
        "is required to",
        "will ensure",
        "is obligated to",
        "has a duty to",
    ],
    ObligationType.PROHIBITED: [
        "shall not",
        "must not",
        "may not",
        "is prohibited from",
        "will not",
        "is not permitted to",
    ],
    ObligationType.PERMISSIVE: [
        "may",
        "is permitted to",
        "is authorized to",
        "has the right to",
    ],
}
# Shared spaCy pipeline, populated on first use so the transformer model is
# loaded once per process instead of once per call.
_SPACY_PIPELINE = None


def _get_pipeline():
    """Return the ``en_core_web_trf`` pipeline, loading it only once."""
    global _SPACY_PIPELINE
    if _SPACY_PIPELINE is None:
        _SPACY_PIPELINE = spacy.load("en_core_web_trf")
    return _SPACY_PIPELINE


def _compile_markers(markers):
    """Build one regex matching any marker phrase as whole words, or None."""
    import re

    phrases = [
        # Allow any run of whitespace (including line breaks) between words.
        r"\s+".join(re.escape(word) for word in marker.lower().split())
        for marker in markers
        if marker.split()
    ]
    if not phrases:
        return None
    return re.compile(r"\b(?:" + "|".join(phrases) + r")\b")


def _classify(sentence_lower, compiled):
    """Return the first obligation type whose pattern matches, else None."""
    for ob_type, pattern in compiled:
        if pattern.search(sentence_lower):
            return ob_type
    return None


def _find_subject(sent):
    """Return the text of the sentence's grammatical subject."""
    # Passive constructions ("Records shall be retained") label the subject
    # "nsubjpass", so both labels are accepted.
    candidates = [t for t in sent if t.dep_ in ("nsubj", "nsubjpass")]
    if not candidates:
        return "unspecified"
    # Prefer the subject attached to the root of the sentence, so the subject
    # of a leading "If ..." clause is not mistaken for the obligated party.
    root_index = sent.root.i
    chosen = next(
        (t for t in candidates if t.head.i == root_index), candidates[0]
    )
    return " ".join(t.text for t in chosen.subtree).strip()


def _find_deadline(sentence):
    """Return a "within N <unit>" deadline such as "30 days", or None."""
    import re

    match = re.search(
        r"within\s+(\d+\s+(?:(?:business|calendar|working)\s+)?"
        r"(?:days?|hours?|weeks?|months?|years?))\b",
        sentence,
        re.IGNORECASE,
    )
    if match is None:
        return None
    # Collapse internal whitespace so a line-wrapped match reads "30 days".
    return " ".join(match.group(1).split())


def extract_obligations(text: str, section_id: str) -> list[Obligation]:
    """Extract regulatory obligations from legal text.

    Each sentence is classified by the marker phrases in
    ``OBLIGATION_MARKERS``; prohibitions are tested first because they
    contain the mandatory/permissive markers. Markers match whole words only,
    so "may" does not fire on "mayor". Sentences without a marker are skipped.

    Args:
        text: Regulatory text to analyse.
        section_id: Reference stored as ``source_section`` on every result.

    Returns:
        One ``Obligation`` per sentence that contains a marker, in document
        order. ``action`` holds the full sentence text. Empty or
        whitespace-only input yields an empty list without loading the model.
    """
    if not text or not text.strip():
        return []

    check_order = (
        ObligationType.PROHIBITED,
        ObligationType.MANDATORY,
        ObligationType.PERMISSIVE,
    )
    compiled = []
    for candidate_type in check_order:
        pattern = _compile_markers(OBLIGATION_MARKERS.get(candidate_type, []))
        if pattern is not None:
            compiled.append((candidate_type, pattern))

    doc = _get_pipeline()(text)
    obligations = []
    for sent in doc.sents:
        sent_text = sent.text.strip()
        if not sent_text:
            continue
        sent_lower = sent_text.lower()

        ob_type = _classify(sent_lower, compiled)
        if ob_type is None:
            continue

        # A leading conditional clause up to the first comma is the condition.
        condition = None
        if sent_lower.startswith(("if ", "when ", "where ", "in the event")):
            parts = sent_text.split(",", 1)
            if len(parts) == 2:
                condition = parts[0].strip()

        obligations.append(Obligation(
            text=sent_text,
            obligation_type=ob_type,
            subject=_find_subject(sent),
            action=sent_text,
            condition=condition,
            deadline=_find_deadline(sent_text),
            source_section=section_id,
        ))
    return obligations
Obligation-to-control mapping
Once obligations are extracted, they need to be mapped to internal controls. The mapper below uses semantic similarity to match regulatory requirements with existing company procedures:
from sentence_transformers import SentenceTransformer, util
from dataclasses import dataclass
import numpy as np
@dataclass
class InternalControl:
    """An internal control that obligations can be mapped onto.

    Attributes:
        control_id: Identifier of the control.
        description: Free-text description of the control.
        owner: Owner of the control.
        evidence_type: Kind of evidence (log, config, attestation, report).
    """

    control_id: str
    description: str
    owner: str
    evidence_type: str
@dataclass
class ControlMapping:
    """Pairing of an obligation with an internal control.

    Attributes:
        obligation: The obligation being mapped.
        control: The control it was paired with.
        similarity: Similarity score between the two.
        gap: True if no adequate control exists.
    """

    obligation: Obligation
    control: InternalControl
    similarity: float
    gap: bool
class ObligationMapper:
    """Match obligations to internal controls by embedding similarity."""

    def __init__(self, controls: list[InternalControl]):
        """Load the embedding model and pre-compute control embeddings.

        Args:
            controls: Controls to match against. May be empty, in which case
                no embeddings are computed and mapping any obligation raises.
        """
        self.model = SentenceTransformer("all-MiniLM-L6-v2")
        self.controls = controls
        # Encoded once here so every map_obligations() call can reuse them.
        self.control_embeddings = (
            self.model.encode(
                [c.description for c in controls],
                convert_to_tensor=True,
            )
            if controls
            else None
        )

    def map_obligations(
        self, obligations: list[Obligation], threshold: float = 0.45
    ) -> list[ControlMapping]:
        """Map each obligation to the best matching internal control.

        Args:
            obligations: Obligations to map; their ``text`` is embedded.
            threshold: A best similarity below this value marks the mapping
                as a gap.

        Returns:
            One mapping per obligation, in input order. Empty input yields an
            empty list.

        Raises:
            ValueError: If obligations are given but the mapper was built
                without any controls, so there is nothing to match against.
        """
        if not obligations:
            return []
        if self.control_embeddings is None:
            raise ValueError(
                "cannot map obligations: no internal controls were provided"
            )

        # Encode all obligations in one batch instead of one call each.
        ob_embeddings = self.model.encode(
            [ob.text for ob in obligations], convert_to_tensor=True
        )
        # Row i holds obligation i's similarity to every control.
        similarity_matrix = util.cos_sim(ob_embeddings, self.control_embeddings)

        mappings = []
        for ob, similarities in zip(obligations, similarity_matrix):
            best_idx = int(similarities.argmax())
            best_score = float(similarities[best_idx])
            mappings.append(ControlMapping(
                obligation=ob,
                control=self.controls[best_idx],
                similarity=best_score,
                gap=best_score < threshold,
            ))
        return mappings
Automated evidence collection
Compliance evidence needs to be collected automatically, time-stamped, and stored immutably:
import hashlib
import json
from datetime import datetime, timezone
from dataclasses import dataclass
from pathlib import Path
@dataclass
class EvidenceRecord:
    """One collected piece of compliance evidence.

    Attributes:
        control_id: Control the evidence belongs to.
        evidence_type: Kind of evidence collected.
        collected_at: Collection timestamp, stored as a string.
        data: The evidence payload.
        hash: Hash string associated with the record.
        collector: Who or what collected the evidence.
    """

    control_id: str
    evidence_type: str
    collected_at: str
    data: dict
    hash: str
    collector: str
class EvidenceCollector:
    """Append hashed evidence records to per-control JSONL files."""

    def __init__(self, storage_path: Path):
        """Remember the storage directory, creating it if necessary."""
        self.storage = storage_path
        storage_path.mkdir(parents=True, exist_ok=True)

    def collect_and_store(
        self,
        control_id: str,
        evidence_type: str,
        data: dict,
        collector: str = "automated",
    ) -> EvidenceRecord:
        """Hash one piece of evidence and append it to its JSONL file.

        The SHA-256 digest covers the control id, the UTC collection
        timestamp and the data, serialised as JSON with sorted keys. Each
        record is hashed on its own; the digest does not include any earlier
        record.

        Args:
            control_id: Control the evidence belongs to.
            evidence_type: Kind of evidence; part of the output file name.
            data: JSON-serialisable evidence payload.
            collector: Label for who or what collected the evidence.

        Returns:
            The record that was appended to
            ``<storage>/<control_id>_<evidence_type>.jsonl``.
        """
        collected_at = datetime.now(timezone.utc).isoformat()
        hashed_fields = {
            "control_id": control_id,
            "timestamp": collected_at,
            "data": data,
        }
        canonical = json.dumps(hashed_fields, sort_keys=True)
        digest = hashlib.sha256(canonical.encode()).hexdigest()

        record = EvidenceRecord(
            control_id=control_id,
            evidence_type=evidence_type,
            collected_at=collected_at,
            data=data,
            hash=digest,
            collector=collector,
        )

        # One JSON object per line; the file is only ever opened for append.
        line = json.dumps({
            "control_id": record.control_id,
            "type": record.evidence_type,
            "collected_at": record.collected_at,
            "data": record.data,
            "hash": record.hash,
            "collector": record.collector,
        })
        target = self.storage / f"{control_id}_{evidence_type}.jsonl"
        with target.open("a") as handle:
            handle.write(line + "\n")
        return record
# Example: collect encryption evidence from AWS
def collect_s3_encryption_evidence(
    collector: EvidenceCollector,
) -> list[EvidenceRecord]:
    """Check S3 bucket encryption and store evidence.

    One evidence record is stored per bucket under control ``CTRL-ENC-001``.
    A bucket is compliant when at least one default-encryption rule uses an
    accepted algorithm. When the encryption lookup fails, the bucket is
    recorded as non-compliant and the AWS error code is stored under
    ``"error"``, so a reviewer can tell "no encryption configured" apart from
    "could not check" (for example, access denied).

    Args:
        collector: Destination for the evidence records.

    Returns:
        The stored records, one per bucket, in listing order.
    """
    import boto3

    # NOTE(review): "aws:kms:dsse" (dual-layer KMS) is accepted alongside the
    # two original values -- confirm it satisfies the control's requirements.
    accepted_algorithms = ("aws:kms", "aws:kms:dsse", "AES256")

    s3 = boto3.client("s3")
    records = []
    for bucket in s3.list_buckets().get("Buckets", []):
        name = bucket["Name"]
        rules = []
        error_code = None
        try:
            encryption = s3.get_bucket_encryption(Bucket=name)
        except s3.exceptions.ClientError as exc:
            error_code = exc.response.get("Error", {}).get("Code", "Unknown")
        else:
            rules = (
                encryption
                .get("ServerSideEncryptionConfiguration", {})
                .get("Rules", [])
            )

        # A rule may lack the default-encryption section; treat it as
        # providing no algorithm rather than failing the whole scan.
        compliant = any(
            r.get("ApplyServerSideEncryptionByDefault", {}).get("SSEAlgorithm")
            in accepted_algorithms
            for r in rules
        )

        data = {
            "bucket": name,
            "compliant": compliant,
            "encryption_rules": str(rules),
        }
        if error_code is not None:
            data["error"] = error_code

        records.append(collector.collect_and_store(
            control_id="CTRL-ENC-001",
            evidence_type="s3_encryption",
            data=data,
        ))
    return records
Compliance report generation
Regulators expect specific formats. Python generates these using templates:
from jinja2 import Environment, FileSystemLoader
from datetime import datetime, timezone
from dataclasses import dataclass
@dataclass
class ComplianceReport:
    """Input data for one rendered compliance report.

    Attributes:
        report_id: Identifier of the report.
        regulation: Regulation the report covers.
        period: Reporting period.
        total_controls: Number of controls assessed.
        compliant_controls: Number of those controls found compliant.
        gaps: Gap entries, one dict each.
        evidence_summary: Evidence summary entries, one dict each.
    """

    report_id: str
    regulation: str
    period: str
    total_controls: int
    compliant_controls: int
    gaps: list[dict]
    evidence_summary: list[dict]
def _risk_level(compliance_rate: float) -> str:
    """Translate a compliance percentage into HIGH / MEDIUM / LOW risk."""
    if compliance_rate < 80:
        return "HIGH"
    if compliance_rate < 95:
        return "MEDIUM"
    return "LOW"


def generate_compliance_report(
    report: ComplianceReport,
    template_dir: str = "templates/compliance",
) -> str:
    """Generate an audit-ready compliance report.

    Renders ``compliance_report.html`` from ``template_dir`` with the report,
    the compliance rate rounded to one decimal, a UTC generation timestamp
    and a risk level. The risk level is derived from the unrounded rate:
    below 80 is HIGH, below 95 is MEDIUM, otherwise LOW. A report with no
    controls has a rate of 0.0 and is therefore HIGH.

    Autoescaping is enabled because report data (gap and evidence entries)
    is interpolated into HTML; a template that intentionally embeds trusted
    markup must mark it with the ``safe`` filter.

    Args:
        report: Data to render.
        template_dir: Directory containing ``compliance_report.html``.

    Returns:
        The rendered HTML document.
    """
    env = Environment(loader=FileSystemLoader(template_dir), autoescape=True)
    template = env.get_template("compliance_report.html")

    if report.total_controls > 0:
        compliance_rate = (
            report.compliant_controls / report.total_controls * 100
        )
    else:
        # Float, so the rendered rate has the same form as non-zero rates.
        compliance_rate = 0.0

    return template.render(
        report=report,
        compliance_rate=round(compliance_rate, 1),
        generated_at=datetime.now(timezone.utc).isoformat(),
        risk_level=_risk_level(compliance_rate),
    )
Scheduling and orchestration
Compliance checks need to run on reliable schedules. Different requirements have different frequencies — some daily, some quarterly:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
def setup_compliance_scheduler():
    """Create a background scheduler, register the compliance jobs, start it.

    Three cron-triggered jobs are registered: the infrastructure check every
    day (hour 6, minute 0), the regulatory change scan every Monday (hour 8),
    and the full compliance report on day 1 of months 1, 4, 7 and 10
    (hour 9).

    Returns:
        The started ``BackgroundScheduler``.
    """
    scheduler = BackgroundScheduler()

    # (callable, trigger, job id, display name)
    job_table = (
        # Daily: check infrastructure compliance
        (
            run_infrastructure_checks,
            CronTrigger(hour=6, minute=0),
            "daily_infra_check",
            "Daily Infrastructure Compliance",
        ),
        # Weekly: scan for regulatory changes
        (
            scan_regulatory_changes,
            CronTrigger(day_of_week="mon", hour=8),
            "weekly_reg_scan",
            "Weekly Regulatory Change Scan",
        ),
        # Quarterly: generate full compliance report
        (
            generate_quarterly_report,
            CronTrigger(month="1,4,7,10", day=1, hour=9),
            "quarterly_report",
            "Quarterly Compliance Report",
        ),
    )
    for job_func, trigger, job_id, label in job_table:
        scheduler.add_job(job_func, trigger, id=job_id, name=label)

    scheduler.start()
    return scheduler
Tradeoffs and real-world considerations
False positives vs. false negatives — Compliance automation must err on the side of false positives. Missing a real violation is far worse than flagging a non-issue. Set sensitivity high and let human reviewers dismiss false alarms.
Regulation interpretation — The same regulatory text can be interpreted differently by different lawyers. Automation encodes one interpretation. Document your interpretation logic and get legal sign-off before deploying.
Multi-jurisdiction conflicts — GDPR’s right to erasure can conflict with financial regulations requiring record retention. Automation must flag these conflicts rather than silently applying one rule over another.
Audit trail integrity — Auditors must trust that evidence wasn’t tampered with. Use append-only storage, cryptographic hashing, and ideally write evidence to an immutable store like AWS QLDB or a blockchain-based ledger for critical records.
The one thing to remember: Production regulatory compliance automation chains together change monitoring, obligation extraction, control mapping, evidence collection, and report generation — each component automated but with human oversight at interpretation and judgment points.
See Also
- Python Playwright Automation — Use a concrete everyday metaphor to understand reliable browser automation with Playwright for Python before touching code.
- Python Selenium Automation — Use a concrete everyday metaphor to understand browser automation and UI regression checks with Selenium before touching code.
- CI/CD — Why big apps can ship updates every day without turning your phone into a glitchy mess: CI/CD is the behind-the-scenes quality gate and delivery truck.
- Containerization — Why does software that works on your computer break on everyone else's? Containers fix that, and they're why Netflix can deploy 100 updates a day without the site going down.
- Python 3.10 New Features — Python 3.10 gave programmers a shape-sorting machine, friendlier error messages, and cleaner ways to say 'this or that' in type hints.