Privacy Impact Assessment with Python — Deep Dive

PII detection with Microsoft Presidio

Presidio provides a production-ready PII detection engine. It combines NLP models with regex patterns and context-aware rules:

from presidio_analyzer import AnalyzerEngine, RecognizerResult
from presidio_analyzer.nlp_engine import NlpEngineProvider
from dataclasses import dataclass


@dataclass
class PIIFinding:
    """One PII entity detected in a piece of scanned text."""

    # Entity label reported by the analyzer (e.g. "EMAIL_ADDRESS").
    entity_type: str
    # The matched substring, i.e. scanned_text[start:end].
    text: str
    # Score reported by the analyzer for this match.
    score: float
    # Offsets of the match within the scanned text (end is exclusive,
    # since the text is extracted with a slice).
    start: int
    end: int
    # Caller-supplied origin label; the database scan uses "<table>.<column>".
    source: str


class PIIScanner:
    """Detect PII in free text and in sampled database rows using Presidio.

    The analyzer is built once per scanner, backed by the spaCy
    ``en_core_web_lg`` model, with English as the only supported language.
    """

    # Presidio entity types requested on every scan.
    DEFAULT_ENTITIES = (
        "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER",
        "CREDIT_CARD", "US_SSN", "IP_ADDRESS",
        "LOCATION", "DATE_TIME", "IBAN_CODE",
        "MEDICAL_LICENSE", "US_PASSPORT",
    )
    # Results scoring below this value are not reported.
    SCORE_THRESHOLD = 0.5

    def __init__(self):
        provider = NlpEngineProvider(nlp_configuration={
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}],
        })
        self.analyzer = AnalyzerEngine(
            nlp_engine=provider.create_engine(),
            supported_languages=["en"],
        )

    def scan_text(self, text: str, source: str = "") -> list[PIIFinding]:
        """Scan free text for PII entities.

        Args:
            text: The text to analyze.
            source: Label copied onto every finding to record its origin.

        Returns:
            One ``PIIFinding`` per analyzer result, in the order the
            analyzer returned them.
        """
        results = self.analyzer.analyze(
            text=text,
            language="en",
            entities=list(self.DEFAULT_ENTITIES),
            score_threshold=self.SCORE_THRESHOLD,
        )
        return [
            PIIFinding(
                entity_type=r.entity_type,
                text=text[r.start:r.end],
                score=r.score,
                start=r.start,
                end=r.end,
                source=source,
            )
            for r in results
        ]

    def scan_database_sample(
        self, connection_string: str, table: str, sample_size: int = 1000
    ) -> dict[str, list[PIIFinding]]:
        """Sample rows from a database table and scan each column for PII.

        Args:
            connection_string: SQLAlchemy database URL.
            table: Table to sample; a plain identifier, optionally
                dot-qualified (``schema.table``).
            sample_size: Maximum number of rows to fetch.

        Returns:
            A mapping of column name to the findings from that column.
            Columns with no findings are omitted.

        Raises:
            ValueError: If ``table`` is not a plain (optionally
                dot-qualified) identifier.
        """
        import re

        import sqlalchemy as sa

        # A table name cannot be sent as a bound parameter, so it is
        # interpolated into the statement. Restrict it to plain identifiers
        # so a crafted name cannot inject SQL.
        if not re.fullmatch(
            r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*", table
        ):
            raise ValueError(f"Unsafe table name: {table!r}")

        engine = sa.create_engine(connection_string)
        try:
            with engine.connect() as conn:
                result = conn.execute(
                    sa.text(f"SELECT * FROM {table} LIMIT :n"),
                    {"n": sample_size},
                )
                columns = list(result.keys())
                rows = result.fetchall()
        finally:
            # The engine is created per call; release its connection pool.
            engine.dispose()

        findings_by_column: dict[str, list[PIIFinding]] = {}
        # Index by position so repeated column names (e.g. from a view)
        # are each read from their own column.
        for col_idx, col in enumerate(columns):
            source = f"{table}.{col}"
            for row in rows:
                raw = row[col_idx]
                if raw is None:
                    continue
                # Only NULLs and empty strings are skipped; falsy values
                # such as 0 are still scanned.
                value = str(raw)
                if not value:
                    continue
                hits = self.scan_text(value, source=source)
                if hits:
                    findings_by_column.setdefault(col, []).extend(hits)

        return findings_by_column

Data flow mapping

Tracing how personal data moves between systems creates the foundation for risk assessment:

from dataclasses import dataclass, field
from enum import Enum
import json


class DataFlowType(Enum):
    """Kind of operation a data flow represents; values are lowercase strings."""

    COLLECTION = "collection"
    STORAGE = "storage"
    PROCESSING = "processing"
    TRANSFER = "transfer"
    DELETION = "deletion"


class CrossBorderTransfer(Enum):
    """Cross-border classification of a data flow.

    Only ``EU_TO_THIRD_COUNTRY`` is selected by
    ``DataFlowMap.find_cross_border_flows``.
    """

    NONE = "none"
    EU_TO_EU = "eu_to_eu"
    # NOTE(review): presumably a destination covered by an adequacy
    # decision -- confirm against the intended usage.
    EU_TO_ADEQUATE = "eu_to_adequate"
    EU_TO_THIRD_COUNTRY = "eu_to_third_country"


@dataclass
class DataFlowNode:
    """A system that holds or handles personal data in a data flow map."""

    # Identifier used as the node id in the generated Mermaid diagram.
    system_id: str
    # Display name used in the Mermaid diagram and the PIA report.
    name: str
    description: str
    data_categories: list[str]  # e.g., ["name", "email", "health_data"]
    legal_basis: str  # consent, contract, legitimate_interest, etc.
    # An empty value is treated as "no retention policy defined".
    retention_period: str
    encryption_at_rest: bool
    encryption_in_transit: bool
    access_controls: list[str]
    location: str  # country/region


@dataclass
class DataFlow:
    """A directed movement of personal data from one system to another."""

    source: DataFlowNode
    destination: DataFlowNode
    flow_type: DataFlowType
    # Categories carried by this flow; the Mermaid label is built from the
    # first two.
    data_categories: list[str]
    cross_border: CrossBorderTransfer
    purpose: str
    # When True, the risk assessor records an automated decision-making risk.
    automated_decision_making: bool = False


@dataclass
class DataFlowMap:
    """The systems and data flows of one project, with helper queries."""

    project_name: str
    nodes: list[DataFlowNode] = field(default_factory=list)
    flows: list[DataFlow] = field(default_factory=list)

    def find_cross_border_flows(self) -> list[DataFlow]:
        """Return the flows classified as EU-to-third-country transfers."""
        return [
            f for f in self.flows
            if f.cross_border == CrossBorderTransfer.EU_TO_THIRD_COUNTRY
        ]

    def find_unencrypted_storage(self) -> list[DataFlowNode]:
        """Return nodes that lack encryption at rest and hold health,
        financial, or biometric data."""
        return [
            n for n in self.nodes
            if not n.encryption_at_rest
            and any(cat in n.data_categories for cat in [
                "health_data", "financial_data", "biometric_data",
            ])
        ]

    @staticmethod
    def _mermaid_text(text: str) -> str:
        """Quote text for Mermaid so brackets, pipes, etc. stay literal."""
        # A literal double quote would end the quoted string, so swap it
        # for Mermaid's entity code.
        return '"' + str(text).replace('"', "#quot;") + '"'

    def to_mermaid(self) -> str:
        """Generate a Mermaid flowchart for visualization.

        Each flow becomes one edge, labelled with a marker (🔴 for
        EU-to-third-country transfers, ✅ otherwise), the flow type, and
        the first two data categories.

        Returns:
            The ``graph LR`` source, one edge per line.
        """
        quote = self._mermaid_text
        lines = ["graph LR"]
        for flow in self.flows:
            src = flow.source.system_id
            dst = flow.destination.system_id
            is_third_country = (
                flow.cross_border == CrossBorderTransfer.EU_TO_THIRD_COUNTRY
            )
            marker = "🔴" if is_third_country else "✅"
            # Keep the label short: at most two categories per edge.
            label = (
                f"{marker} {flow.flow_type.value}: "
                f"{', '.join(flow.data_categories[:2])}"
            )
            lines.append(
                f"    {src}[{quote(flow.source.name)}] "
                f"-->|{quote(label)}| "
                f"{dst}[{quote(flow.destination.name)}]"
            )
        return "\n".join(lines)

GDPR risk scoring

GDPR requires assessing both the likelihood and severity of harm. The European Data Protection Board provides guidance on risk factors:

from dataclasses import dataclass
from enum import IntEnum


class Likelihood(IntEnum):
    """Ordinal likelihood rating, 1 (lowest) to 4 (highest).

    An ``IntEnum`` so it can be multiplied with ``Severity`` to produce a
    risk score.
    """

    NEGLIGIBLE = 1
    LIMITED = 2
    SIGNIFICANT = 3
    MAXIMUM = 4


class Severity(IntEnum):
    """Ordinal severity rating, 1 (lowest) to 4 (highest).

    An ``IntEnum`` so it can be multiplied with ``Likelihood`` to produce a
    risk score.
    """

    NEGLIGIBLE = 1
    LIMITED = 2
    SIGNIFICANT = 3
    MAXIMUM = 4


@dataclass
class RiskFactor:
    """A single identified privacy risk with its likelihood and severity."""

    name: str
    description: str
    likelihood: Likelihood
    severity: Severity

    @property
    def risk_score(self) -> int:
        """Product of likelihood and severity."""
        product = self.likelihood * self.severity
        return product

    @property
    def risk_level(self) -> str:
        """Band for the score: low (<=2), medium (<=6), high (<=12), else critical."""
        current = self.risk_score
        # Upper bounds in ascending order; the first one that fits wins.
        for ceiling, band in ((2, "low"), (6, "medium"), (12, "high")):
            if current <= ceiling:
                return band
        return "critical"


class GDPRRiskAssessor:
    """Derive privacy risk factors from a data flow map.

    The checks are heuristics inspired by GDPR Article 35; each one adds
    at most one ``RiskFactor`` with a fixed likelihood and severity.
    """

    # Category labels treated as special-category (sensitive) data.
    SENSITIVE_CATEGORIES = {
        "health_data", "biometric_data", "genetic_data",
        "racial_ethnic_origin", "political_opinions",
        "religious_beliefs", "trade_union_membership",
        "sexual_orientation", "criminal_records",
    }

    def assess_data_flow_map(
        self, flow_map: DataFlowMap
    ) -> list[RiskFactor]:
        """Run every check against ``flow_map``.

        Args:
            flow_map: The project's systems and data flows.

        Returns:
            The risk factors found, in check order: special-category
            data, cross-border transfers, unencrypted sensitive storage,
            automated decision-making, missing retention policy.
        """
        risks = []

        # Check for sensitive data categories
        all_categories = set()
        for node in flow_map.nodes:
            all_categories.update(node.data_categories)

        sensitive_found = all_categories & self.SENSITIVE_CATEGORIES
        if sensitive_found:
            # Sort so the description is identical between runs; set
            # iteration order depends on string hash randomization.
            listed = ", ".join(sorted(sensitive_found))
            risks.append(RiskFactor(
                name="Special category data processing",
                description=f"Processing sensitive data: {listed}",
                likelihood=Likelihood.SIGNIFICANT,
                severity=Severity.MAXIMUM,
            ))

        # Check for cross-border transfers
        cross_border = flow_map.find_cross_border_flows()
        if cross_border:
            risks.append(RiskFactor(
                name="Cross-border data transfer",
                description=f"{len(cross_border)} flows to third countries without adequacy decisions",
                likelihood=Likelihood.SIGNIFICANT,
                severity=Severity.SIGNIFICANT,
            ))

        # Check for unencrypted sensitive storage
        unencrypted = flow_map.find_unencrypted_storage()
        if unencrypted:
            risks.append(RiskFactor(
                name="Unencrypted sensitive data storage",
                description=f"{len(unencrypted)} systems store sensitive data without encryption at rest",
                likelihood=Likelihood.SIGNIFICANT,
                severity=Severity.MAXIMUM,
            ))

        # Check for automated decision-making
        if any(f.automated_decision_making for f in flow_map.flows):
            risks.append(RiskFactor(
                name="Automated decision-making",
                description="Processing involves automated decisions with legal or significant effects",
                likelihood=Likelihood.LIMITED,
                severity=Severity.SIGNIFICANT,
            ))

        # Check for missing retention policies (empty value = undefined)
        no_retention = [
            n for n in flow_map.nodes if not n.retention_period
        ]
        if no_retention:
            risks.append(RiskFactor(
                name="Missing retention policy",
                description=f"{len(no_retention)} systems have no defined data retention period",
                likelihood=Likelihood.SIGNIFICANT,
                severity=Severity.LIMITED,
            ))

        return risks

    def requires_dpia(self, risks: list[RiskFactor]) -> bool:
        """Return True when at least two risks are rated high or critical.

        NOTE(review): this is a screening heuristic, not the legal test of
        GDPR Article 35 -- confirm the threshold with the DPO before
        relying on a False result.
        """
        high_count = sum(
            1 for r in risks if r.risk_level in ("high", "critical")
        )
        return high_count >= 2

Generating the PIA report

The final output is a structured document that satisfies regulatory requirements:

from jinja2 import Environment, BaseLoader
from datetime import datetime, timezone


# Jinja2 template for the Markdown PIA report. The data-flow diagram is
# wrapped in a ```mermaid fence so Markdown renderers draw it as a diagram
# instead of showing the Mermaid source as plain text.
PIA_TEMPLATE = """
# Privacy Impact Assessment Report

**Project:** {{ project_name }}
**Assessor:** {{ assessor }}
**Date:** {{ assessment_date }}
**Status:** {{ "DPIA Required" if dpia_required else "Standard PIA" }}

## 1. Processing Description

{{ processing_description }}

## 2. Data Inventory

| System | Data Categories | Legal Basis | Encryption | Retention |
|--------|----------------|-------------|------------|-----------|
{% for node in nodes -%}
| {{ node.name }} | {{ node.data_categories | join(', ') }} | {{ node.legal_basis }} | {{ '✅' if node.encryption_at_rest else '❌' }} | {{ node.retention_period or 'Not defined' }} |
{% endfor %}

## 3. Data Flows

```mermaid
{{ mermaid_diagram }}
```

### Cross-Border Transfers
{% if cross_border_flows -%}
{% for flow in cross_border_flows -%}
- **{{ flow.source.name }}** → **{{ flow.destination.name }}** ({{ flow.destination.location }}): {{ flow.data_categories | join(', ') }}
{% endfor %}
{%- else -%}
No cross-border transfers to third countries identified.
{%- endif %}

## 4. Risk Assessment

| Risk | Likelihood | Severity | Score | Level |
|------|-----------|----------|-------|-------|
{% for risk in risks -%}
| {{ risk.name }} | {{ risk.likelihood.name }} | {{ risk.severity.name }} | {{ risk.risk_score }} | {{ risk.risk_level | upper }} |
{% endfor %}

## 5. Recommended Mitigations
{% for risk in risks if risk.risk_level in ('high', 'critical') -%}
### {{ risk.name }}
- **Risk:** {{ risk.description }}
- **Recommended action:** {{ mitigations.get(risk.name, 'Consult DPO for mitigation strategy') }}
{% endfor %}

## 6. DPO Consultation

{% if dpia_required -%}
⚠️ This processing triggers mandatory DPIA under GDPR Article 35. DPO consultation required before proceeding.
{%- else -%}
Standard PIA completed. No mandatory DPIA triggers identified.
{%- endif %}
"""


def generate_pia_report(
    flow_map: DataFlowMap,
    risks: list[RiskFactor],
    dpia_required: bool,
    assessor: str,
    processing_description: str,
    mitigations: dict[str, str] | None = None,
) -> str:
    """Render the PIA report as Markdown.

    Args:
        flow_map: Source of the project name, system inventory, diagram,
            and cross-border flows.
        risks: Risk factors to tabulate.
        dpia_required: Selects the report status and the DPO section text.
        assessor: Name shown in the report header.
        processing_description: Free text for section 1.
        mitigations: Optional mapping of risk name to recommended action;
            an empty mapping is used when omitted.

    Returns:
        The rendered report, dated with the current UTC day.
    """
    compiled = Environment(loader=BaseLoader()).from_string(PIA_TEMPLATE)

    today_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    context = {
        "project_name": flow_map.project_name,
        "assessor": assessor,
        "assessment_date": today_utc,
        "dpia_required": dpia_required,
        "processing_description": processing_description,
        "nodes": flow_map.nodes,
        "mermaid_diagram": flow_map.to_mermaid(),
        "cross_border_flows": flow_map.find_cross_border_flows(),
        "risks": risks,
        "mitigations": mitigations or {},
    }
    return compiled.render(**context)

Integration with CI/CD

Privacy assessments should run automatically when infrastructure or data schemas change:

# Example Python check, intended to be invoked from a CI job such as .github/workflows/privacy-check.yml
def check_schema_changes_for_pii(
    old_schema: dict, new_schema: dict, scanner: "PIIScanner"
) -> list[str]:
    """Check if schema changes introduce new PII columns.

    A column is flagged when it exists only in ``new_schema`` and its
    lowercased name contains one of the PII indicator substrings.

    Args:
        old_schema: Previous schema; its ``"columns"`` entry maps column
            name to column type.
        new_schema: Proposed schema, same shape as ``old_schema``.
        scanner: Currently unused; kept so existing callers keep working.

    Returns:
        One warning per flagged column, sorted by column name so the
        output is stable between runs.
    """
    # Heuristic: substrings of a column name that suggest PII.
    pii_indicators = (
        "name", "email", "phone", "ssn", "address",
        "dob", "birth", "passport", "license",
    )

    # Treat a missing or null "columns" entry as "no columns".
    old_columns = old_schema.get("columns") or {}
    new_columns = new_schema.get("columns") or {}

    warnings = []
    for col in sorted(set(new_columns) - set(old_columns)):
        lowered = col.lower()
        if any(indicator in lowered for indicator in pii_indicators):
            col_type = new_columns[col]
            warnings.append(
                f"New column '{col}' ({col_type}) may contain PII. "
                f"Update the data flow map and PIA."
            )

    return warnings

Tradeoffs

Automation depth vs. accuracy — Automated PII detection catches common patterns but misses context. A column named “favorite_color” isn’t PII, but combined with other fields it could become identifying. Human review of automated findings is essential.

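Scanning frequency vs. performance — Full database PII scans are expensive on production systems. Sampling (scan 1,000 rows instead of 10 million) keeps the load low, but it can miss rare PII patterns. Note that a plain `LIMIT` query returns whichever rows the database yields first rather than a random sample, so use randomized or stratified sampling if the results need to be statistically representative.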

Template rigidity vs. flexibility — Standardized PIA templates ensure consistency but may not capture organization-specific risks. Build templates that cover regulatory minimums, then allow custom risk factors.

The one thing to remember: A production PIA pipeline combines Presidio-based PII scanning, data flow mapping with cross-border tracking, GDPR-aligned risk scoring, and automated report generation — creating continuous privacy assessment that catches risks as systems evolve rather than once before launch.

Tags: python, privacy, gdpr, data-protection

See Also

  • CI/CD — Why big apps can ship updates every day without turning your phone into a glitchy mess: CI/CD is the behind-the-scenes quality gate and delivery truck.
  • Containerization — Why does software that works on your computer break on everyone else's? Containers fix that — and they're why Netflix can deploy 100 updates a day without the site going down.
  • Python 3.10 New Features — Python 3.10 gave programmers a shape-sorting machine, friendlier error messages, and cleaner ways to say 'this or that' in type hints.
  • Python 3.11 New Features — Python 3.11 made everything faster, error messages smarter, and let you catch several mistakes at once instead of stopping at the first one.
  • Python 3.12 New Features — Python 3.12 made type hints shorter, f-strings more powerful, and started preparing Python's engine for a world without the GIL.