Regulatory Compliance Automation with Python — Deep Dive

Regulatory change monitoring

The US Federal Register provides a public API that returns new rules, proposed rules, and notices as structured JSON. The monitor below queries it for recent documents from selected agencies and keeps those that match your organization's keywords:

import httpx
from dataclasses import dataclass
from datetime import date, timedelta


@dataclass
class RegulatoryChange:
    document_number: str
    title: str
    abstract: str
    agencies: list[str]
    publication_date: str
    document_type: str  # API values such as "Rule", "Proposed Rule", "Notice"
    url: str
    relevance_score: float = 0.0


class FederalRegisterMonitor:
    BASE_URL = "https://www.federalregister.gov/api/v1"

    def __init__(self, keywords: list[str], agencies: list[str]):
        self.keywords = keywords
        self.agencies = agencies
        self.client = httpx.Client(timeout=30)

    def fetch_recent_documents(
        self, days_back: int = 7
    ) -> list[RegulatoryChange]:
        """Fetch recent Federal Register documents matching criteria."""
        start_date = date.today() - timedelta(days=days_back)
        params = {
            "conditions[publication_date][gte]": start_date.isoformat(),
            "conditions[agencies][]": self.agencies,
            "per_page": 100,
            "order": "newest",
            "fields[]": [
                "document_number", "title", "abstract",
                "agencies", "publication_date", "type", "html_url",
            ],
        }

        response = self.client.get(
            f"{self.BASE_URL}/documents.json", params=params
        )
        response.raise_for_status()
        data = response.json()

        changes = []
        for doc in data.get("results", []):
            change = RegulatoryChange(
                document_number=doc["document_number"],
                title=doc["title"],
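                # "abstract" can be present but null, so .get() with a
                # default is not enough to guarantee a string.
                abstract=doc.get("abstract") or "",
                agencies=[
                    a.get("name") or a.get("raw_name", "")
                    for a in doc.get("agencies", [])
                ],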
                publication_date=doc["publication_date"],
                document_type=doc["type"],
                url=doc["html_url"],
            )
            change.relevance_score = self._score_relevance(change)
            if change.relevance_score > 0.3:
                changes.append(change)

        return sorted(changes, key=lambda c: c.relevance_score, reverse=True)

    def _score_relevance(self, change: RegulatoryChange) -> float:
        """Score how relevant a regulatory change is to our organization."""
        text = f"{change.title} {change.abstract}".lower()
        matches = sum(1 for kw in self.keywords if kw.lower() in text)
        return min(matches / max(len(self.keywords), 1), 1.0)
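
The monitor above reads only the first page of up to 100 results. A minimal sketch of following pagination links is shown below, assuming each response carries a `next_page_url` field when more results exist. The helper name `iter_results`, the `fetch_json` callable, and the `max_pages` cap are hypothetical and not part of the original class.

```python
from typing import Callable, Iterator


def iter_results(
    first_url: str,
    fetch_json: Callable[[str], dict],
    max_pages: int = 10,
) -> Iterator[dict]:
    """Yield every result dict, following next_page_url links.

    fetch_json is any callable that maps a URL to the decoded JSON body.
    max_pages is a safety cap so a bad link cannot loop forever.
    """
    url = first_url
    for _ in range(max_pages):
        page = fetch_json(url)
        yield from page.get("results", [])
        url = page.get("next_page_url")
        if not url:  # last page: the field is absent or null
            break


# Offline demonstration with canned pages instead of real HTTP calls.
fake_pages = {
    "page-1": {"results": [{"document_number": "A"}], "next_page_url": "page-2"},
    "page-2": {"results": [{"document_number": "B"}]},
}
numbers = [
    doc["document_number"]
    for doc in iter_results("page-1", fake_pages.__getitem__)
]
print(numbers)  # ['A', 'B']
```

In the monitor, `fetch_json` could be a small function that calls `self.client.get(url)`, checks the status, and returns `response.json()`. Passing the fetcher in as a parameter keeps the paging logic testable without network access.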

For non-US jurisdictions, similar approaches work with EUR-Lex (EU), legislation.gov.uk (UK), and country-specific regulatory databases. Where a source offers no public API, as is common for national databases, you need web scraping with Scrapy or Playwright.

Obligation extraction with NLP

Regulatory text contains obligations hidden in dense legal language. Extracting them requires understanding modal verbs (“shall,” “must,” “may not”) and their legal significance:

import spacy
from dataclasses import dataclass
from enum import Enum


class ObligationType(Enum):
    MANDATORY = "mandatory"      # shall, must, is required to
    PROHIBITED = "prohibited"    # shall not, must not, may not
    PERMISSIVE = "permissive"    # may, is permitted to
    CONDITIONAL = "conditional"  # if/when ... then shall


@dataclass
class Obligation:
    text: str
    obligation_type: ObligationType
    subject: str          # who must comply
    action: str           # what they must do
    condition: str | None # under what circumstances
    deadline: str | None  # by when
    source_section: str   # regulatory reference


OBLIGATION_MARKERS = {
    ObligationType.MANDATORY: [
        "shall", "must", "is required to", "will ensure",
        "is obligated to", "has a duty to",
    ],
    ObligationType.PROHIBITED: [
        "shall not", "must not", "may not", "is prohibited from",
        "will not", "is not permitted to",
    ],
    ObligationType.PERMISSIVE: [
        "may", "is permitted to", "is authorized to", "has the right to",
    ],
}


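_NLP = None  # loaded lazily so the transformer model is read only once


def _get_nlp():
    """Return a shared spaCy pipeline instead of reloading it per call."""
    global _NLP
    if _NLP is None:
        _NLP = spacy.load("en_core_web_trf")
    return _NLP


def extract_obligations(text: str, section_id: str) -> list[Obligation]:
    """Extract regulatory obligations from legal text."""
    doc = _get_nlp()(text)
    obligations = []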

    for sent in doc.sents:
        sent_text = sent.text.strip()
        sent_lower = sent_text.lower()
        # Space-joined lowercase tokens, padded so markers match whole words
        # only ("may" must not fire on "mayor", nor "shall" on "marshall").
        padded = " " + " ".join(
            t.lower_ for t in sent if not t.is_space
        ) + " "

        # Determine obligation type. Prohibitions are checked first because
        # "shall not" also contains the mandatory marker "shall".
        ob_type = None
        for candidate in (
            ObligationType.PROHIBITED,
            ObligationType.MANDATORY,
            ObligationType.PERMISSIVE,
        ):
            markers = OBLIGATION_MARKERS[candidate]
            if any(f" {marker} " in padded for marker in markers):
                ob_type = candidate
                break

        if not ob_type:
            continue

        # Extract subject (who must comply)
        subject = "unspecified"
        for token in sent:
            # Include passive subjects ("Records shall be retained ...").
            if token.dep_ in ("nsubj", "nsubjpass"):
                subject = " ".join(
                    t.text for t in token.subtree
                ).strip()
                break

        # Detect conditional structure
        condition = None
        if sent_lower.startswith(("if ", "when ", "where ", "in the event")):
            parts = sent_text.split(",", 1)
            if len(parts) == 2:
                condition = parts[0].strip()

        # Extract deadline patterns
        deadline = None
        import re
        deadline_match = re.search(
            r"within (\d+ (?:days?|business days?|hours?|months?))",
            sent_text, re.IGNORECASE,
        )
        if deadline_match:
            deadline = deadline_match.group(1)

        obligations.append(Obligation(
            text=sent_text,
            obligation_type=ob_type,
            subject=subject,
            action=sent_text,
            condition=condition,
            deadline=deadline,
            source_section=section_id,
        ))

    return obligations
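
The ordering rule in the extractor (prohibitions before mandates before permissions) can be checked in isolation without loading a spaCy model. The sketch below is a simplified stand-in that uses regular expressions with word boundaries. The pattern list covers only a few of the markers above, and the name `classify_modal` is hypothetical.

```python
import re
from typing import Optional

# Checked in order: prohibitions first, because "shall not" contains "shall".
MODAL_PATTERNS = [
    ("prohibited", re.compile(r"\b(?:shall|must|may|will)\s+not\b", re.IGNORECASE)),
    ("mandatory", re.compile(r"\b(?:shall|must)\b", re.IGNORECASE)),
    ("permissive", re.compile(r"\bmay\b", re.IGNORECASE)),
]


def classify_modal(sentence: str) -> Optional[str]:
    """Return the first matching obligation label, or None if no modal fits."""
    for label, pattern in MODAL_PATTERNS:
        if pattern.search(sentence):
            return label
    return None


samples = [
    "The covered entity shall notify the agency within 30 days.",
    "A bank may not disclose account data.",
    "An applicant may request a hearing.",
    "The mayor approved the marshalling yard.",
]
for s in samples:
    print(classify_modal(s), "<-", s)
```

The word boundaries keep "mayor" and "marshalling" from matching. One known limitation of this sketch: a case-insensitive "may" also matches the month name in phrases like "by May 1", so dates need separate handling before classification.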

Obligation-to-control mapping

Once obligations are extracted, they need to map to internal controls. This uses semantic similarity to match regulatory requirements with existing company procedures:

from sentence_transformers import SentenceTransformer, util
from dataclasses import dataclass

# Obligation is the dataclass defined in the extraction section above.


@dataclass
class InternalControl:
    control_id: str
    description: str
    owner: str
    evidence_type: str  # log, config, attestation, report


@dataclass
class ControlMapping:
    obligation: Obligation
    control: InternalControl
    similarity: float
    gap: bool  # True if no adequate control exists


class ObligationMapper:
    def __init__(self, controls: list[InternalControl]):
        self.model = SentenceTransformer("all-MiniLM-L6-v2")
        self.controls = controls
        self.control_embeddings = self.model.encode(
            [c.description for c in controls],
            convert_to_tensor=True,
        )

    def map_obligations(
        self, obligations: list[Obligation], threshold: float = 0.45
    ) -> list[ControlMapping]:
        """Map each obligation to the best matching internal control."""
        mappings = []
        for ob in obligations:
            ob_embedding = self.model.encode(
                ob.text, convert_to_tensor=True
            )
            similarities = util.cos_sim(
                ob_embedding, self.control_embeddings
            )[0]
            best_idx = int(similarities.argmax())
            best_score = float(similarities[best_idx])

            mappings.append(ControlMapping(
                obligation=ob,
                control=self.controls[best_idx],
                similarity=best_score,
                gap=best_score < threshold,
            ))

        return mappings
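
To make the threshold logic concrete without downloading a model, the sketch below applies the same best-match-then-threshold rule to hand-written three-dimensional vectors. The vectors and control IDs are toy values standing in for real sentence embeddings, and `best_match` is a hypothetical helper, not part of `ObligationMapper`.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two equal-length vectors (0.0 if either is zero)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def best_match(query, candidates: dict[str, list[float]], threshold=0.45):
    """Return (control_id, score, gap) for the closest candidate vector."""
    control_id, score = max(
        ((cid, cosine(query, vec)) for cid, vec in candidates.items()),
        key=lambda pair: pair[1],
    )
    return control_id, score, score < threshold


# Toy "embeddings": one axis per topic (encryption, logging, other).
controls = {
    "CTRL-ENC-001": [1.0, 0.0, 0.0],
    "CTRL-LOG-002": [0.0, 1.0, 0.0],
}
print(best_match([0.9, 0.1, 0.0], controls))  # close to the encryption control
print(best_match([0.0, 0.0, 1.0], controls))  # nothing similar: flagged as a gap
```

Note that a gap still reports the nearest control, just as `map_obligations` does. Anything that consumes these results should check the `gap` flag before treating the control as coverage.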

Automated evidence collection

Compliance evidence needs to be collected automatically, time-stamped, and stored immutably:

import hashlib
import json
from datetime import datetime, timezone
from dataclasses import dataclass
from pathlib import Path


@dataclass
class EvidenceRecord:
    control_id: str
    evidence_type: str
    collected_at: str
    data: dict
    hash: str
    collector: str


class EvidenceCollector:
    def __init__(self, storage_path: Path):
        self.storage = storage_path
        self.storage.mkdir(parents=True, exist_ok=True)

    def collect_and_store(
        self,
        control_id: str,
        evidence_type: str,
        data: dict,
        collector: str = "automated",
    ) -> EvidenceRecord:
        """Store evidence with a UTC timestamp and a SHA-256 content hash."""
        timestamp = datetime.now(timezone.utc).isoformat()
        payload = json.dumps(
            {"control_id": control_id, "timestamp": timestamp, "data": data},
            sort_keys=True,
        )
        evidence_hash = hashlib.sha256(payload.encode()).hexdigest()

        record = EvidenceRecord(
            control_id=control_id,
            evidence_type=evidence_type,
            collected_at=timestamp,
            data=data,
            hash=evidence_hash,
            collector=collector,
        )

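        # Append to a JSONL file. Append mode alone does not stop later
        # edits; the file system or object store must enforce immutability.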
        file_path = self.storage / f"{control_id}_{evidence_type}.jsonl"
        with open(file_path, "a") as f:
            f.write(json.dumps({
                "control_id": record.control_id,
                "type": record.evidence_type,
                "collected_at": record.collected_at,
                "data": record.data,
                "hash": record.hash,
                "collector": record.collector,
            }) + "\n")

        return record


# Example: collect encryption evidence from AWS
def collect_s3_encryption_evidence(
    collector: EvidenceCollector,
) -> list[EvidenceRecord]:
    """Check S3 bucket encryption and store evidence."""
    import boto3
    s3 = boto3.client("s3")
    records = []

    for bucket in s3.list_buckets()["Buckets"]:
        name = bucket["Name"]
        try:
            encryption = s3.get_bucket_encryption(Bucket=name)
            rules = encryption["ServerSideEncryptionConfiguration"]["Rules"]
            compliant = any(
                r["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]
                in ("aws:kms", "AES256")
                for r in rules
            )
        except s3.exceptions.ClientError:
            compliant = False
            rules = []

        record = collector.collect_and_store(
            control_id="CTRL-ENC-001",
            evidence_type="s3_encryption",
            data={
                "bucket": name,
                "compliant": compliant,
                "encryption_rules": str(rules),
            },
        )
        records.append(record)

    return records
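
The collector above hashes each record on its own, so deleting or reordering lines in the JSONL file would go unnoticed. One common way to make a log tamper-evident is a hash chain, in which every record's hash also covers the hash of the record before it. The following is a minimal in-memory sketch of that idea. The function names and the all-zero `GENESIS` sentinel are hypothetical choices, not part of `EvidenceCollector`.

```python
import hashlib
import json

GENESIS = "0" * 64  # hypothetical sentinel used as the first record's prev_hash


def chain_hash(prev_hash: str, body: dict) -> str:
    """Hash the record body together with the previous record's hash."""
    payload = json.dumps({"prev_hash": prev_hash, "body": body}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def append_record(chain: list[dict], body: dict) -> dict:
    """Append a record whose hash depends on everything before it."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS
    record = {
        "body": body,
        "prev_hash": prev_hash,
        "hash": chain_hash(prev_hash, body),
    }
    chain.append(record)
    return record


def verify_chain(chain: list[dict]) -> bool:
    """Recompute every hash in order; any edit, removal, or reorder fails."""
    prev_hash = GENESIS
    for record in chain:
        if record["prev_hash"] != prev_hash:
            return False
        if record["hash"] != chain_hash(prev_hash, record["body"]):
            return False
        prev_hash = record["hash"]
    return True


chain: list[dict] = []
append_record(chain, {"bucket": "logs", "compliant": False})
append_record(chain, {"bucket": "backups", "compliant": True})
print(verify_chain(chain))  # True

chain[0]["body"]["compliant"] = True  # simulate tampering with old evidence
print(verify_chain(chain))  # False
```

A chain only detects tampering by someone who cannot rewrite the whole file. To close that gap, the latest hash is usually published somewhere the log's writer cannot alter, such as a separate account or a signed report.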

Compliance report generation

Regulators and auditors often expect reports in a specific format. Jinja2 templates keep that layout separate from the code that computes the figures:

from jinja2 import Environment, FileSystemLoader
from datetime import datetime, timezone
from dataclasses import dataclass


@dataclass
class ComplianceReport:
    report_id: str
    regulation: str
    period: str
    total_controls: int
    compliant_controls: int
    gaps: list[dict]
    evidence_summary: list[dict]


def generate_compliance_report(
    report: ComplianceReport,
    template_dir: str = "templates/compliance",
) -> str:
    """Generate an audit-ready compliance report."""
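    # autoescape=True because the template is HTML and gap descriptions
    # may contain characters such as "<" or "&".
    env = Environment(loader=FileSystemLoader(template_dir), autoescape=True)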
    template = env.get_template("compliance_report.html")

    compliance_rate = (
        report.compliant_controls / report.total_controls * 100
        if report.total_controls > 0
        else 0
    )

    return template.render(
        report=report,
        compliance_rate=round(compliance_rate, 1),
        generated_at=datetime.now(timezone.utc).isoformat(),
        risk_level="HIGH" if compliance_rate < 80 else (
            "MEDIUM" if compliance_rate < 95 else "LOW"
        ),
    )
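
The rate and risk-band arithmetic is easier to test when it lives outside the template call. The sketch below pulls it into two small functions, reusing the 80 and 95 thresholds from the code above. The function names are hypothetical, and banding the rounded rate (so the displayed number and the band always agree) is a design choice of this sketch rather than the original behavior.

```python
def compliance_rate(compliant: int, total: int) -> float:
    """Percentage of compliant controls, rounded to one decimal place."""
    return round(compliant / total * 100, 1) if total > 0 else 0.0


def risk_level(rate: float) -> str:
    """Map a compliance percentage to a risk band."""
    if rate < 80:
        return "HIGH"
    if rate < 95:
        return "MEDIUM"
    return "LOW"


for compliant, total in [(19, 20), (17, 20), (15, 20), (0, 0)]:
    rate = compliance_rate(compliant, total)
    print(f"{compliant}/{total}: {rate}% -> {risk_level(rate)}")
```

An empty control set yields a rate of 0 and therefore "HIGH". That is a conservative default, but a report generator may prefer to treat "no controls defined" as a distinct state.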

Scheduling and orchestration

Compliance checks need to run on reliable schedules. Different requirements have different frequencies — some daily, some quarterly:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger


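def setup_compliance_scheduler():
    # The three job functions scheduled below are assumed to be defined
    # elsewhere in the application; they are not shown here.
    scheduler = BackgroundScheduler()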

    # Daily: check infrastructure compliance
    scheduler.add_job(
        run_infrastructure_checks,
        CronTrigger(hour=6, minute=0),
        id="daily_infra_check",
        name="Daily Infrastructure Compliance",
    )

    # Weekly: scan for regulatory changes
    scheduler.add_job(
        scan_regulatory_changes,
        CronTrigger(day_of_week="mon", hour=8),
        id="weekly_reg_scan",
        name="Weekly Regulatory Change Scan",
    )

    # Quarterly: generate full compliance report
    scheduler.add_job(
        generate_quarterly_report,
        CronTrigger(month="1,4,7,10", day=1, hour=9),
        id="quarterly_report",
        name="Quarterly Compliance Report",
    )

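    # start() returns immediately; jobs run in a background thread, so the
    # calling process has to stay alive for them to fire.
    scheduler.start()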
    return scheduler
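
When reviewing a schedule, it helps to confirm which dates an expression such as `month="1,4,7,10", day=1, hour=9` actually selects. The standard-library sketch below computes the next quarterly run for a given moment. It is an independent illustration for checks or "next report due" displays, not how APScheduler evaluates triggers internally, and `next_quarterly_run` is a hypothetical name.

```python
from datetime import datetime

QUARTER_MONTHS = (1, 4, 7, 10)


def next_quarterly_run(now: datetime, hour: int = 9) -> datetime:
    """First moment strictly after `now` on day 1 of a quarter month at hour:00."""
    for year in (now.year, now.year + 1):
        for month in QUARTER_MONTHS:
            candidate = datetime(year, month, 1, hour)
            if candidate > now:
                return candidate
    # Next year's January always qualifies, so the loop returns before this.
    raise AssertionError("unreachable")


print(next_quarterly_run(datetime(2024, 2, 15)))      # 2024-04-01 09:00:00
print(next_quarterly_run(datetime(2024, 10, 1, 9)))   # 2025-01-01 09:00:00
```

The sketch uses naive datetimes for brevity. A production schedule should pin an explicit time zone so that a report date does not shift with the server's locale.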

Tradeoffs and real-world considerations

False positives vs. false negatives: Compliance automation should err on the side of false positives, because missing a real violation costs far more than flagging a non-issue. Set sensitivity high and let human reviewers dismiss false alarms.

Regulation interpretation: Different lawyers can read the same regulatory text differently, and automation encodes exactly one reading. Document your interpretation logic and get legal sign-off before deploying.

Multi-jurisdiction conflicts: GDPR’s right to erasure can conflict with financial regulations that require record retention. Automation must flag these conflicts rather than silently applying one rule over another.

Audit trail integrity: Auditors must trust that evidence wasn’t tampered with. Use append-only storage and cryptographic hashing, and for critical records write to a write-once store such as Amazon S3 with Object Lock or a blockchain-based ledger. Amazon QLDB was long the usual suggestion here, but AWS has announced its end of support, so avoid it for new systems.
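
The multi-jurisdiction point can be sketched as a simple consistency check: model each rule as a minimum retention period, a maximum retention period, or both, and report any pair for the same record type where one rule's minimum exceeds another's maximum. The rule names and day counts below are illustrative placeholders, not statements of what any regulation requires.

```python
from dataclasses import dataclass
from itertools import combinations
from typing import Optional


@dataclass(frozen=True)
class RetentionRule:
    source: str                      # regulation or policy the rule comes from
    record_type: str
    min_days: Optional[int] = None   # must keep at least this long
    max_days: Optional[int] = None   # must delete within this long


def find_conflicts(
    rules: list[RetentionRule],
) -> list[tuple[RetentionRule, RetentionRule]]:
    """Return (keep_rule, delete_rule) pairs that cannot both be satisfied."""
    conflicts = []
    for a, b in combinations(rules, 2):
        if a.record_type != b.record_type:
            continue
        for keep, delete in ((a, b), (b, a)):
            if (
                keep.min_days is not None
                and delete.max_days is not None
                and keep.min_days > delete.max_days
            ):
                conflicts.append((keep, delete))
    return conflicts


# Hypothetical rules for illustration only.
rules = [
    RetentionRule("financial recordkeeping rule", "transaction_record", min_days=1825),
    RetentionRule("erasure policy", "transaction_record", max_days=30),
    RetentionRule("marketing policy", "newsletter_signup", max_days=365),
]
for keep, delete in find_conflicts(rules):
    print(f"CONFLICT on {keep.record_type}: {keep.source} vs {delete.source}")
```

The check only surfaces the conflict. Deciding which rule prevails, for example through a legal-obligation exemption, remains a decision for counsel, in line with the human-oversight principle above.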

The one thing to remember: Production regulatory compliance automation chains together change monitoring, obligation extraction, control mapping, evidence collection, and report generation — each component automated but with human oversight at interpretation and judgment points.
