Data Masking Techniques in Python — Deep Dive

Deterministic masking with Faker and HMAC-based seeding

The key to consistent masking across tables is deterministic fake generation — the same input always produces the same fake output.

import hashlib
import hmac
from faker import Faker

class DeterministicMasker:
    """Replace real values with repeatable, realistic-looking fakes.

    A seed is derived from each input with HMAC-SHA256 under ``secret_key``
    and used to seed this instance's Faker generator, so the same input
    (within the same domain) always yields the same fake value.
    """

    def __init__(self, secret_key: str, locale: str = "en_US"):
        """Store the HMAC key and create a Faker generator for *locale*."""
        self.key = secret_key.encode()
        self.faker = Faker(locale)

    def _seed_for(self, value: str, domain: str) -> int:
        """Derive a deterministic 32-bit seed from *value* and *domain*.

        The domain is part of the HMAC message, so the same raw string
        masked as a name and as an email gets two unrelated seeds.
        """
        mac = hmac.new(self.key, f"{domain}:{value}".encode(), hashlib.sha256)
        return int(mac.hexdigest(), 16) % (2**32)

    def _generate(self, value: str, domain: str, provider: str) -> str:
        """Seed this instance's generator for *value*, then call *provider*.

        *provider* is the name of a Faker method such as ``"name"``.
        """
        # seed_instance() reseeds only this Faker object. The class-level
        # Faker.seed() would reset the random source shared by every Faker
        # instance in the process, disturbing unrelated generators.
        self.faker.seed_instance(self._seed_for(value, domain))
        return getattr(self.faker, provider)()

    def mask_name(self, real_name: str) -> str:
        """Return the fake full name that *real_name* always maps to."""
        return self._generate(real_name, "name", "name")

    def mask_email(self, real_email: str) -> str:
        """Return the fake email address that *real_email* always maps to."""
        return self._generate(real_email, "email", "email")

    def mask_phone(self, real_phone: str) -> str:
        """Return the fake phone number that *real_phone* always maps to."""
        return self._generate(real_phone, "phone", "phone_number")

    def mask_address(self, real_address: str) -> str:
        """Return the fake address that *real_address* always maps to."""
        return self._generate(real_address, "address", "address")

# Usage: the mapping is stable, so repeated calls agree with each other
masker = DeterministicMasker(secret_key="production-masking-key-2026")
for _ in range(2):
    # Both passes print the same fake name (whichever one the seed selects).
    print(masker.mask_name("John Smith"))

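The HMAC key must be stored securely and rotated periodically. Anyone with the key and the algorithm can replicate the mapping — which is necessary for consistency but a risk if leaked. Note that rotating the key changes every mapping, so all related tables must be re-masked together with the new key to stay consistent with each other.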

Format-preserving encryption for structured fields

Use format-preserving masking when masked values must match the exact format of the originals — same length, same character classes, and still passing validation checks:

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import struct

class FormatPreservingMasker:
    """Deterministic masking that keeps the shape of digit-based identifiers.

    Each digit is shifted (mod 10) by an offset taken from an HMAC-SHA256
    digest keyed with ``key``. The result is repeatable for a given key and
    input, but it is a one-way substitution, not reversible encryption.
    """

    _DIGITS = "0123456789"

    def __init__(self, key: bytes):
        """Store the secret *key* used for every HMAC computation."""
        self.key = key

    def mask_credit_card(self, card_number: str) -> str:
        """Mask a card number, keeping layout, BIN, last four and Luhn validity.

        The first six and last four digits are kept. The digits in between
        are replaced, and the last replaced digit is chosen so that the whole
        number passes the Luhn check. Separators stay where they were.

        Raises:
            ValueError: if fewer than 11 digits are present, because there
                would be no middle digit left to replace.
        """
        digits = "".join(ch for ch in card_number if ch in self._DIGITS)
        if len(digits) < 11:
            # Do not echo the input: it is sensitive data.
            raise ValueError("card number must contain at least 11 digits")

        prefix = digits[:6]   # Keep BIN/IIN for realistic routing
        suffix = digits[-4:]  # Keep last 4 (common business requirement)
        encrypted_middle = self._encrypt_digits(digits[6:-4], f"cc:{digits}")

        # The last four digits (including the check digit) are fixed, so the
        # final middle digit is used to restore Luhn validity. Both the plain
        # and the doubled Luhn contribution are one-to-one on 0-9, so exactly
        # one candidate always works.
        head = prefix + encrypted_middle[:-1]
        for candidate in self._DIGITS:
            masked = head + candidate + suffix
            if self._luhn_check_digit(masked[:-1]) == int(masked[-1]):
                return self._format_card(masked, card_number)
        raise RuntimeError("no Luhn-valid digit found")  # unreachable

    def mask_ssn(self, ssn: str) -> str:
        """Mask an SSN and return it in XXX-XX-XXXX form.

        Raises:
            ValueError: if the input, dashes removed, is not exactly 9 digits.
        """
        digits = ssn.replace("-", "")
        if len(digits) != 9 or any(ch not in self._DIGITS for ch in digits):
            raise ValueError("SSN must contain exactly 9 digits")
        encrypted = self._encrypt_digits(digits, f"ssn:{digits}")
        return f"{encrypted[:3]}-{encrypted[3:5]}-{encrypted[5:9]}"

    def _encrypt_digits(self, digits: str, context: str) -> str:
        """Map a digit string to a same-length digit string.

        Offsets come from HMAC-SHA256 of *context* under the instance key.
        The digest bytes are reused cyclically for inputs longer than the
        digest. A byte reduced mod 10 is slightly biased and an offset of 0
        leaves that digit unchanged.
        """
        import hashlib
        import hmac

        mac = hmac.new(self.key, context.encode(), hashlib.sha256).digest()
        return "".join(
            str((int(d) + mac[i % len(mac)] % 10) % 10)
            for i, d in enumerate(digits)
        )

    def _luhn_check_digit(self, partial: str) -> int:
        """Return the Luhn check digit to append to *partial*.

        *partial* is the number without its check digit, so its rightmost
        digit sits next to the check digit and is the first one doubled.
        """
        total = 0
        for position, ch in enumerate(reversed(partial)):
            value = int(ch)
            if position % 2 == 0:
                # Doubled position: add the digits of the product.
                value = sum(divmod(value * 2, 10))
            total += value
        return (10 - total % 10) % 10

    def _format_card(self, digits: str, original: str) -> str:
        """Lay *digits* out using the separators found in *original*."""
        replacement = iter(digits)
        return "".join(
            next(replacement) if ch in self._DIGITS else ch for ch in original
        )

fpe = FormatPreservingMasker(key=b"sixteen-byte-key")

# The card keeps its NNNN-NNNN-NNNN-NNNN layout.
masked_card = fpe.mask_credit_card("4532-8721-9045-6389")
print(masked_card)

# The SSN comes back in XXX-XX-XXXX layout.
masked_ssn = fpe.mask_ssn("123-45-6789")
print(masked_ssn)

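The class above is a keyed, one-way digit substitution rather than true (reversible) format-preserving encryption. For production use, consider the FF1 or FF3-1 algorithms from NIST SP 800-38G, available in libraries like pyffx or mysto.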

Automated PII detection with regex and NLP

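Before masking, you need to find sensitive data. The detector below combines regex pattern matching on values with column-name heuristics, which covers most structured cases; named entity recognition can be layered on top for free-text columns: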

import re
from dataclasses import dataclass
from typing import Optional

@dataclass
class PIIDetection:
    """One finding produced by a PII scan of a single column."""

    # Name of the column the finding refers to.
    column: str
    # Category label for the detected data, e.g. "email" or "person_name".
    pii_type: str
    # Score for the finding; higher means more certain.
    confidence: float
    # An example value that triggered the finding, if one was captured.
    # NOTE(review): this is presumably raw, unmasked data — avoid logging it.
    sample_match: Optional[str] = None

class PIIDetector:
    """Scan DataFrame columns for personally identifiable information.

    Two heuristics are applied per column, in order: the column name is
    checked against known indicator words, then (for text columns only) the
    values are tested against the regular expressions in ``PATTERNS``.
    """

    PATTERNS = {
        "email": re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"),
        "phone_us": re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b"),
        "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
        "credit_card": re.compile(r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b"),
        "ip_address": re.compile(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b"),
        "date_of_birth": re.compile(r"\b(0[1-9]|1[0-2])/(0[1-9]|[12]\d|3[01])/\d{4}\b"),
    }

    NAME_INDICATORS = {"name", "first", "last", "surname", "fullname", "customer_name",
                       "patient", "employee", "contact", "person"}
    ADDRESS_INDICATORS = {"address", "street", "addr", "city", "zip", "postal"}

    def scan_dataframe(self, df, sample_size: int = 1000):
        """Scan all columns and return a list of ``PIIDetection`` findings.

        Args:
            df: the pandas DataFrame to inspect.
            sample_size: number of leading rows whose values are examined.

        Returns:
            At most one detection per column. Name-based hits get a fixed
            confidence of 0.8; pattern hits use the share of non-null sampled
            values that matched, and are reported only above 30 %.
        """
        # Imported here so the class can be defined without pandas in scope.
        import pandas as pd

        detections = []
        sample = df.head(sample_size)

        for col in df.columns:
            # Column labels are not always strings (e.g. integer labels).
            col_lower = str(col).lower().replace("_", " ").replace("-", " ")
            col_tokens = set(col_lower.split())

            if col_tokens & self.NAME_INDICATORS:
                detections.append(PIIDetection(col, "person_name", 0.8))
                continue

            if col_tokens & self.ADDRESS_INDICATORS:
                detections.append(PIIDetection(col, "address", 0.8))
                continue

            # Value patterns only make sense for text. Text may be stored as
            # plain object dtype or as pandas' dedicated string dtype.
            series = sample[col]
            is_text = series.dtype == object or isinstance(series.dtype, pd.StringDtype)
            if not is_text:
                continue

            values = series.dropna().astype(str)
            if values.empty:
                continue  # nothing to match against

            for pii_type, pattern in self.PATTERNS.items():
                matches = values.apply(
                    lambda v: pattern.search(v) is not None
                ).astype(bool)
                match_rate = matches.mean()
                if match_rate > 0.3:
                    # A rate above zero guarantees at least one matching row.
                    sample_match = values[matches].iloc[0]
                    detections.append(PIIDetection(col, pii_type, match_rate, sample_match))
                    break  # report only the first pattern that fits

        return detections

Full database masking pipeline

Combining detection, configuration, and execution into an automated pipeline:

import pandas as pd
from sqlalchemy import create_engine, inspect, text
from typing import Dict, List

class DatabaseMaskingPipeline:
    """End-to-end database masking with referential integrity.

    Reads every table from a source database, masks the columns named in a
    masking plan, and appends the rows to same-named tables in a target
    database. The masker is deterministic, so equal source values receive
    equal masked values in every table.
    """

    def __init__(self, source_uri: str, target_uri: str, masking_key: str):
        """Create engines for both databases plus the masker and detector."""
        self.source = create_engine(source_uri)
        self.target = create_engine(target_uri)
        self.masker = DeterministicMasker(masking_key)
        self.detector = PIIDetector()

    def _quoted(self, table: str) -> str:
        """Return *table* quoted as required by the source database dialect."""
        # Names with upper-case letters, spaces or reserved words fail or
        # resolve to the wrong table when interpolated into SQL unquoted.
        return self.source.dialect.identifier_preparer.quote(table)

    def discover_schema(self) -> Dict[str, List[str]]:
        """Return a mapping of each source table name to its column names."""
        inspector = inspect(self.source)
        return {
            table: [col["name"] for col in inspector.get_columns(table)]
            for table in inspector.get_table_names()
        }

    def create_masking_plan(self) -> Dict[str, Dict[str, str]]:
        """Auto-detect PII and generate masking rules.

        Returns:
            ``{table: {column: masking_function_name}}`` for every table in
            which the detector found something. Detection runs on the first
            1000 rows of each table.
        """
        plan = {}

        for table in self.discover_schema():
            df_sample = pd.read_sql(
                f"SELECT * FROM {self._quoted(table)} LIMIT 1000", self.source
            )
            detections = self.detector.scan_dataframe(df_sample)

            if detections:
                plan[table] = {
                    det.column: self._select_masking_function(det.pii_type)
                    for det in detections
                }

        return plan

    def _select_masking_function(self, pii_type: str) -> str:
        """Map a PII type to a masking function name (``"redact"`` if unknown)."""
        mapping = {
            "person_name": "mask_name",
            "email": "mask_email",
            "phone_us": "mask_phone",
            "ssn": "mask_ssn",
            "address": "mask_address",
            "credit_card": "mask_credit_card",
            "ip_address": "mask_ip",
            "date_of_birth": "mask_date",
        }
        return mapping.get(pii_type, "redact")

    def execute(self, plan: Dict[str, Dict[str, str]], batch_size: int = 10000):
        """Execute masking plan, writing masked data to target database.

        Every source table is copied in chunks of *batch_size* rows; tables
        absent from *plan* are copied unchanged. Rows are appended to the
        target, so running this twice against the same target duplicates them.
        """
        for table in self.discover_schema():
            print(f"Processing {table}...")

            # Resolve masking functions once per table, not once per chunk.
            # Names the masker does not implement fall back to redaction.
            column_maskers = {
                column: getattr(self.masker, fn_name, self._redact)
                for column, fn_name in plan.get(table, {}).items()
            }

            query = f"SELECT * FROM {self._quoted(table)}"
            for chunk in pd.read_sql(query, self.source, chunksize=batch_size):
                for column, mask_fn in column_maskers.items():
                    # fn=mask_fn binds the current function to the lambda.
                    chunk[column] = chunk[column].apply(
                        lambda v, fn=mask_fn: fn(str(v)) if pd.notna(v) else v
                    )

                chunk.to_sql(table, self.target, if_exists="append", index=False)

            print(f"  ✓ {table} masked and written")

    def _redact(self, value: str) -> str:
        """Return a fixed placeholder regardless of *value*."""
        return "***REDACTED***"

# Usage: read from the production replica, write masked rows to the dev copy
pipeline_settings = {
    "source_uri": "postgresql://prod-readonly:pass@prod-host/app",
    "target_uri": "postgresql://dev:pass@dev-host/app_masked",
    "masking_key": "rotate-this-key-quarterly",
}
pipeline = DatabaseMaskingPipeline(**pipeline_settings)

# Detect PII first, show the resulting plan, then apply it.
plan = pipeline.create_masking_plan()
print("Auto-detected masking plan:", plan)
pipeline.execute(plan)

Masking JSON and unstructured data

Production data isn’t only in databases. Log files, JSON APIs, and document stores need masking too:

import json
import re
from typing import Any

class JSONMasker:
    """Recursively mask PII in nested JSON structures.

    Fields are chosen by key name. Email, name, phone and address fields are
    replaced with deterministic fakes from the supplied masker; credentials
    and identifiers with no fake generator (SSN, date of birth, card number)
    are replaced with a fixed placeholder. Only string values are masked;
    numbers, booleans and nulls pass through unchanged.
    """

    SENSITIVE_KEYS = {"name", "email", "phone", "ssn", "address", "dob",
                      "credit_card", "password", "token", "secret"}

    _REDACTED = "***REDACTED***"
    # Checked in order; the first fragment found in the normalized key wins.
    _FAKE_BY_FRAGMENT = (
        ("email", "mask_email"),
        ("name", "mask_name"),
        ("phone", "mask_phone"),
        ("address", "mask_address"),
    )
    # Fragments distinctive enough to match anywhere inside a key.
    _REDACT_FRAGMENTS = ("password", "secret", "token", "creditcard")
    # Short names that must match a whole word ("dob" must not hit "adobe").
    _REDACT_WORDS = {"ssn", "dob"}

    def __init__(self, masker: "DeterministicMasker"):
        """Keep the masker used to generate replacement values."""
        self.masker = masker

    def mask_document(self, doc: Any) -> Any:
        """Return a masked copy of *doc*, traversing dicts and lists.

        Scalars reached directly (not as a dict value) are returned as-is
        because there is no key to judge them by.
        """
        if isinstance(doc, dict):
            return {k: self._mask_field(k, v) for k, v in doc.items()}
        elif isinstance(doc, list):
            return [self.mask_document(item) for item in doc]
        return doc

    def _mask_field(self, key: str, value: Any) -> Any:
        """Mask one dict entry according to its key, or recurse into it."""
        if isinstance(value, (dict, list)):
            return self.mask_document(value)

        if not isinstance(value, str):
            return value

        # "shipping_address" and "shipping-address" both become
        # "shippingaddress" so one fragment test covers either spelling.
        key_lower = key.lower().replace("_", "").replace("-", "")

        for fragment, method_name in self._FAKE_BY_FRAGMENT:
            if fragment in key_lower:
                return getattr(self.masker, method_name)(value)

        key_words = set(re.split(r"[_\-\s]+", key.lower()))
        if (any(s in key_lower for s in self._REDACT_FRAGMENTS)
                or key_words & self._REDACT_WORDS):
            return self._REDACTED

        # Check value patterns even if key isn't indicative
        if re.match(r"[^@]+@[^@]+\.[^@]+", value):
            return self.masker.mask_email(value)

        return value

# Mask a JSON API response, reusing the deterministic masker created earlier
raw = {
    "user": {
        "name": "Alice Johnson",
        "email": "alice@example.com",
        "orders": [
            {"id": 1, "shipping_address": "123 Main St"},
        ],
    },
}
json_masker = JSONMasker(masker)
masked = json_masker.mask_document(raw)

Validation and testing masked output

After masking, verify the output meets both privacy and utility requirements:

def validate_masking(original_df, masked_df, plan):
    """Verify masking quality for one table.

    Args:
        original_df: DataFrame holding the unmasked rows.
        masked_df: DataFrame holding the same table after masking.
        plan: mapping of column name to masking strategy name.

    Returns:
        ``{"passed": [...], "failed": [...]}`` with one message per check.
        Three checks run per column: no original value survives masking,
        credit-card columns still look like card numbers, and the mean of a
        numeric column has not moved by more than 10 %.
    """
    results = {"passed": [], "failed": []}
    card_format = re.compile(r"\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}")
    numeric_kinds = ("i", "u", "f")  # signed int, unsigned int, float

    for column, strategy in plan.items():
        # Check no original values leaked
        original_values = set(original_df[column].dropna().astype(str))
        masked_values = set(masked_df[column].dropna().astype(str))
        leaked = original_values & masked_values

        if leaked:
            results["failed"].append(f"{column}: {len(leaked)} values leaked through masking")
        else:
            results["passed"].append(f"{column}: no value leakage")

        # Check format preservation. fullmatch rejects values that merely
        # start with a card-shaped prefix.
        if strategy == "mask_credit_card":
            broken = sum(
                1 for v in masked_df[column].dropna()
                if not card_format.fullmatch(str(v))
            )
            if broken:
                results["failed"].append(f"{column}: {broken} values broke CC format")

        # Check statistical preservation (for numeric variance masking).
        # Skipped unless both columns are numeric, since a mean is undefined
        # for a column that was replaced with text.
        orig_kind = getattr(original_df[column].dtype, "kind", "O")
        masked_kind = getattr(masked_df[column].dtype, "kind", "O")
        if orig_kind in numeric_kinds and masked_kind in numeric_kinds:
            orig_mean = original_df[column].mean()
            masked_mean = masked_df[column].mean()
            # Divide by the magnitude so a negative mean still gives a
            # positive drift. A zero mean has no relative drift and is skipped.
            drift = abs(orig_mean - masked_mean) / abs(orig_mean) if orig_mean else 0
            if drift > 0.1:
                results["failed"].append(f"{column}: mean drifted {drift:.1%}")

    return results

The one thing to remember: Production data masking in Python requires three layers — automated PII detection to find sensitive fields, deterministic masking functions (HMAC-seeded Faker or format-preserving encryption) for consistent replacement across related tables, and validation to confirm both privacy protection and data utility after masking.

Tags: python, privacy, data-masking, data-protection

See Also