Tokenization of Sensitive Data in Python — Deep Dive

Building a token vault from scratch

A minimal vault needs: encrypted storage, collision-free token generation, access-controlled detokenization, and audit logging.

import hashlib
import hmac
import os
import secrets
import warnings
from datetime import datetime, timedelta
from typing import Optional

from cryptography.fernet import Fernet
from sqlalchemy import create_engine, Column, String, DateTime, Boolean, Text
from sqlalchemy.orm import declarative_base, Session

# Shared declarative base: ORM models that subclass it register their tables
# on Base.metadata, which TokenVault.__init__ passes to create_all().
Base = declarative_base()

class TokenMapping(Base):
    """ORM row linking an opaque token to the encrypted value it stands for."""

    __tablename__ = "token_mappings"
    # The token string itself is the primary key.
    token = Column(String(64), primary_key=True)
    # Ciphertext of the sensitive value; the plaintext is never stored.
    encrypted_value = Column(Text, nullable=False)
    # Category of the protected value, e.g. "credit_card", "ssn".
    data_type = Column(String(32), nullable=False)  # "credit_card", "ssn", etc.
    # Filled in with datetime.utcnow() when the row is inserted.
    created_at = Column(DateTime, default=datetime.utcnow)
    # Optional expiry; NULL means the token never expires.
    expires_at = Column(DateTime, nullable=True)
    # Set to False when a token is rotated out; inactive tokens are rejected.
    is_active = Column(Boolean, default=True)

class AuditLog(Base):
    """ORM row recording one vault operation (who did what to which token)."""

    __tablename__ = "token_audit_log"
    # Random 36-character hex identifier (18 random bytes).
    id = Column(String(36), primary_key=True, default=lambda: secrets.token_hex(18))
    # Token the action was attempted on; deliberately not a foreign key so
    # entries survive deletion of the mapping.
    token = Column(String(64), nullable=False)
    # Action label, e.g. "tokenize", "detokenize", "rotate_old".
    # Sized at 32 because the vault writes labels longer than 16 characters
    # ("tokenize_existing", "detokenize_failed", "detokenize_expired"), which
    # length-enforcing databases such as PostgreSQL would otherwise reject.
    # NOTE: an already-created table needs a migration to widen the column.
    action = Column(String(32), nullable=False)
    # Identity of the caller that requested the action.
    requester = Column(String(128), nullable=False)
    # Filled in with datetime.utcnow() when the row is inserted.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Optional client address; 45 characters is enough for a textual IPv6 address.
    ip_address = Column(String(45), nullable=True)

class TokenVault:
    """Token vault with Fernet encryption at rest and audit logging.

    Each public method opens its own SQLAlchemy ``Session`` and writes an
    ``AuditLog`` row describing the operation in the same transaction as the
    change it makes.
    """

    def __init__(self, database_url: str, encryption_key: bytes):
        """Connect to the database, create missing tables and build the cipher.

        Args:
            database_url: SQLAlchemy connection URL.
            encryption_key: Fernet key used to encrypt and decrypt stored values.
        """
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        self.cipher = Fernet(encryption_key)

    def tokenize(self, sensitive_value: str, data_type: str,
                 requester: str, ttl_days: Optional[int] = None) -> str:
        """Replace sensitive data with a token.

        Args:
            sensitive_value: Plaintext to protect; stored only in encrypted form.
            data_type: Category label; also selects the token prefix.
            requester: Caller identity recorded in the audit log.
            ttl_days: Days until the token expires. ``None`` or ``0`` means
                the token never expires.

        Returns:
            The existing token when the value is already known, otherwise a
            newly generated one.
        """
        with Session(self.engine) as session:
            # Check if this value is already tokenized (idempotent)
            existing = self._find_existing_token(session, sensitive_value, data_type)
            if existing:
                self._audit(session, existing, "tokenize_existing", requester)
                # Without this commit the audit row is discarded when the
                # session closes.
                session.commit()
                return existing

            # Generate collision-free token
            token = self._generate_token(session, data_type)

            # Encrypt the sensitive value before storing
            encrypted = self.cipher.encrypt(sensitive_value.encode()).decode()

            expires_at = None
            if ttl_days:
                expires_at = datetime.utcnow() + timedelta(days=ttl_days)

            mapping = TokenMapping(
                token=token,
                encrypted_value=encrypted,
                data_type=data_type,
                expires_at=expires_at
            )
            session.add(mapping)
            self._audit(session, token, "tokenize", requester)
            session.commit()

            return token

    def detokenize(self, token: str, requester: str) -> str:
        """Return the original value for ``token``.

        Every attempt, successful or not, is written to the audit log.

        Raises:
            ValueError: If the token is unknown, inactive or expired.
        """
        with Session(self.engine) as session:
            mapping = session.get(TokenMapping, token)

            if not mapping or not mapping.is_active:
                self._audit(session, token, "detokenize_failed", requester)
                session.commit()
                raise ValueError(f"Token not found or inactive: {token[:8]}...")

            if mapping.expires_at and mapping.expires_at < datetime.utcnow():
                self._audit(session, token, "detokenize_expired", requester)
                session.commit()
                raise ValueError(f"Token expired: {token[:8]}...")

            self._audit(session, token, "detokenize", requester)
            session.commit()

            return self.cipher.decrypt(mapping.encrypted_value.encode()).decode()

    def rotate_token(self, old_token: str, requester: str) -> str:
        """Replace a token with a new one; the old token becomes inactive.

        The new mapping reuses the stored ciphertext, data type and expiry.

        Raises:
            ValueError: If ``old_token`` is unknown or already inactive.
        """
        with Session(self.engine) as session:
            mapping = session.get(TokenMapping, old_token)
            if not mapping or not mapping.is_active:
                # Record the failed attempt, as detokenize() does.
                self._audit(session, old_token, "rotate_failed", requester)
                session.commit()
                raise ValueError("Cannot rotate inactive or missing token")

            # Create new token pointing to same encrypted value
            new_token = self._generate_token(session, mapping.data_type)
            new_mapping = TokenMapping(
                token=new_token,
                encrypted_value=mapping.encrypted_value,
                data_type=mapping.data_type,
                expires_at=mapping.expires_at
            )

            mapping.is_active = False
            session.add(new_mapping)
            self._audit(session, old_token, "rotate_old", requester)
            self._audit(session, new_token, "rotate_new", requester)
            session.commit()

            return new_token

    def delete(self, token: str, requester: str):
        """GDPR erasure — permanently destroy the mapping.

        Unknown tokens are ignored, so the call is safe to repeat. The audit
        entry for the deletion is kept.
        """
        with Session(self.engine) as session:
            mapping = session.get(TokenMapping, token)
            if mapping:
                session.delete(mapping)
                self._audit(session, token, "delete", requester)
                session.commit()

    def _generate_token(self, session: Session, data_type: str) -> str:
        """Generate an unused token with a type prefix.

        Raises:
            RuntimeError: If ten consecutive candidates already exist.
        """
        prefix = {"credit_card": "cc", "ssn": "ssn", "pii": "pii"}.get(data_type, "tok")
        for _ in range(10):  # Retry on collision
            token = f"{prefix}_{secrets.token_urlsafe(32)}"
            if not session.get(TokenMapping, token):
                return token
        raise RuntimeError("Token generation failed — too many collisions")

    def _find_existing_token(self, session, value: str, data_type: str) -> Optional[str]:
        """Return the token already issued for ``value``, or ``None``.

        Simplified placeholder: it always returns ``None``, so every call to
        tokenize() issues a fresh token. A production implementation would
        store a keyed HMAC-SHA256 fingerprint of the value in an indexed
        column and look it up here, using a secret key from configuration
        rather than a constant in the source.
        """
        return None  # Simplified; production uses fingerprint index

    def _audit(self, session, token, action, requester):
        """Queue an audit row on ``session``; the caller is responsible for committing."""
        session.add(AuditLog(token=token, action=action, requester=requester))

Format-preserving token generation

When tokens must pass format validation in legacy systems:

import re

class FormatPreservingTokenGenerator:
    """Generate random tokens that match the format of the original data."""

    # Non-routable BIN to distinguish tokens from real cards.
    _CARD_TOKEN_PREFIX = "0000"

    def generate_credit_card_token(self, original: str, preserve_last4: bool = True) -> str:
        """Return a 16-digit token that passes Luhn validation.

        Args:
            original: Card number; spaces and hyphens are ignored.
            preserve_last4: When true, the token ends with the same four
                digits as ``original``. When false, ``original`` is not used.

        Raises:
            ValueError: If ``preserve_last4`` is true and ``original`` does
                not end with four ASCII digits.
        """
        if not preserve_last4:
            payload = self._CARD_TOKEN_PREFIX + self._random_digits(11)
            return payload + str(self._luhn_check_digit(payload))

        digits = original.replace("-", "").replace(" ", "")
        suffix = digits[-4:]
        if len(suffix) != 4 or not all(ch in "0123456789" for ch in suffix):
            raise ValueError("original must end with at least four digits")

        head = self._CARD_TOKEN_PREFIX + self._random_digits(7)
        # The last four digits are fixed, so the usual trailing check digit
        # is not available. The 12th digit balances the checksum instead: it
        # sits five places from the right, an undoubled Luhn position, so its
        # value is added to the total unchanged.
        balance = (10 - self._luhn_sum(head + "0" + suffix) % 10) % 10
        return f"{head}{balance}{suffix}"

    def generate_ssn_token(self) -> str:
        """Return a token shaped like an SSN (``9XX-XX-XXXX``).

        SSNs starting with 9xx are reserved/invalid, which keeps tokens apart
        from issued SSNs.
        NOTE(review): ITINs also begin with 9 — confirm downstream systems
        cannot mistake these tokens for ITINs.
        """
        area = f"9{self._random_digits(2)}"
        group = self._random_digits(2)
        serial = self._random_digits(4)
        return f"{area}-{group}-{serial}"

    def generate_phone_token(self, country: str = "US") -> str:
        """Return a phone-formatted token using reserved/test ranges.

        For ``"US"`` the result is ``NXX-555-01XX``: a random area code
        followed by a line number from the 555-0100..555-0199 block that is
        reserved for fiction. That block is small, so these tokens are not
        suitable as unique identifiers. Any other country yields ten random
        digits.
        """
        if country == "US":
            return f"{self._random_area_code()}-555-01{self._random_digits(2)}"
        return self._random_digits(10)

    def _random_area_code(self) -> str:
        """Return three digits shaped like a US area code (2-9 first, not N11)."""
        while True:
            code = f"{secrets.randbelow(8) + 2}{self._random_digits(2)}"
            if not code.endswith("11"):  # N11 codes are service numbers
                return code

    def _random_digits(self, n: int) -> str:
        """Return ``n`` cryptographically random decimal digits."""
        return "".join(str(secrets.randbelow(10)) for _ in range(n))

    def _luhn_sum(self, number: str) -> int:
        """Return the Luhn total of a complete number (valid when divisible by 10)."""
        from_right = [int(ch) for ch in reversed(number)]
        undoubled = sum(from_right[0::2])
        # Every second digit from the right is doubled; divmod folds a
        # two-digit product back into the sum of its digits.
        doubled = sum(sum(divmod(d * 2, 10)) for d in from_right[1::2])
        return undoubled + doubled

    def _luhn_check_digit(self, partial: str) -> int:
        """Return the digit that, appended to ``partial``, makes it Luhn-valid."""
        # Appending a placeholder zero puts each payload digit in the position
        # it will occupy in the finished number.
        return (10 - self._luhn_sum(partial + "0") % 10) % 10

REST API for the token vault

Wrapping the vault in a FastAPI service with API-key authentication (rate limiting is not shown here; add it at the gateway or as middleware):

from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
import time

app = FastAPI(title="Token Vault API")

# Connection settings come from the environment so credentials stay out of
# source control; the URL below is only a local-development fallback.
_database_url = os.environ.get(
    "VAULT_DATABASE_URL", "postgresql://vault:secret@localhost/tokens"
)

# The Fernet key must stay the same across restarts, otherwise values stored
# by an earlier process can no longer be decrypted.
_key_from_env = os.environ.get("VAULT_ENCRYPTION_KEY")
if _key_from_env is not None:
    _encryption_key = _key_from_env.encode()
else:
    warnings.warn(
        "VAULT_ENCRYPTION_KEY is not set; using an ephemeral key. Tokens "
        "created by this process cannot be detokenized after a restart.",
        RuntimeWarning,
    )
    _encryption_key = Fernet.generate_key()

vault = TokenVault(database_url=_database_url, encryption_key=_encryption_key)

# Simple API key auth (production: use mTLS + OAuth2)
# NOTE(review): these are sample keys; real keys belong in a secret store.
VALID_API_KEYS = {"payment-service": "key-abc123", "billing": "key-def456"}

def verify_api_key(x_api_key: str = Header(...)):
    """Return the service name that owns the presented API key.

    Raises:
        HTTPException: 401 when the key matches no configured service.
    """
    # hmac.compare_digest raises TypeError for str arguments that contain
    # non-ASCII characters. Comparing bytes means a malformed header is
    # rejected with 401 instead of surfacing as a 500.
    presented = x_api_key.encode("utf-8")
    for service, key in VALID_API_KEYS.items():
        if hmac.compare_digest(key.encode("utf-8"), presented):
            return service
    raise HTTPException(status_code=401, detail="Invalid API key")

class TokenizeRequest(BaseModel):
    """Request body for POST /tokenize."""

    # Plaintext to protect.
    value: str
    # Category label such as "credit_card" or "ssn".
    data_type: str
    # Days until expiry. Declared Optional so the annotation matches the None
    # default and an explicit null is accepted.
    ttl_days: Optional[int] = None

class TokenResponse(BaseModel):
    """Response body carrying a single token; used as the /tokenize response model."""

    # The token issued for the submitted value.
    token: str

@app.post("/tokenize", response_model=TokenResponse)
def tokenize(req: TokenizeRequest, requester: str = Depends(verify_api_key)):
    """Tokenize the submitted value on behalf of the authenticated service."""
    issued = vault.tokenize(
        sensitive_value=req.value,
        data_type=req.data_type,
        requester=requester,
        ttl_days=req.ttl_days,
    )
    return TokenResponse(token=issued)

@app.post("/detokenize")
def detokenize(token: str, requester: str = Depends(verify_api_key)):
    """Return the original value for a token, or 404 if the vault rejects it."""
    try:
        plaintext = vault.detokenize(token, requester)
    except ValueError as e:
        # The vault raises ValueError for missing, inactive and expired tokens.
        raise HTTPException(status_code=404, detail=str(e))
    else:
        return {"value": plaintext}

@app.post("/rotate")
def rotate(token: str, requester: str = Depends(verify_api_key)):
    """Rotate a token and return both the old and the new token.

    Raises:
        HTTPException: 404 when the token is missing or inactive.
    """
    try:
        new_token = vault.rotate_token(token, requester)
    except ValueError as e:
        # Same mapping as /detokenize; uncaught, this would become a 500.
        raise HTTPException(status_code=404, detail=str(e))
    return {"old_token": token, "new_token": new_token}

@app.delete("/tokens/{token}")
def delete_token(token: str, requester: str = Depends(verify_api_key)):
    """Erase a token mapping; always responds with a "deleted" status."""
    vault.delete(token=token, requester=requester)
    response = {"status": "deleted"}
    return response

Integration with Stripe tokenization

For payment tokenization, Stripe handles the vault so you don’t build one:

import stripe

# Placeholder value only.
# NOTE(review): load the real secret key from configuration or a secret store
# rather than committing it to source.
stripe.api_key = "sk_live_..."

def store_card_as_token(card_number: str, exp_month: int,
                        exp_year: int, cvc: str) -> str:
    """Exchange raw card details for a Stripe token id.

    This is normally done client-side with Stripe.js; calling it here means
    the raw card data passes through this process.
    """
    card_details = {
        "number": card_number,
        "exp_month": exp_month,
        "exp_year": exp_year,
        "cvc": cvc,
    }
    # tok_xxxxx — store this instead of the card number
    return stripe.Token.create(card=card_details).id

def charge_with_token(token_id: str, amount_cents: int, currency: str = "usd"):
    """Create a Stripe charge against a token and return the charge id.

    Only the token is passed along, so no card number is handled here.
    """
    charge_params = {
        "amount": amount_cents,
        "currency": currency,
        "source": token_id,
        "description": "Subscription payment",
    }
    return stripe.Charge.create(**charge_params).id

def create_reusable_customer_token(token_id: str, email: str) -> str:
    """Create a Stripe customer from a single-use token and return its id.

    The token is attached as the customer's payment source so it can be
    reused for subscriptions.
    """
    # cus_xxxxx — use for recurring charges
    return stripe.Customer.create(email=email, source=token_id).id

In the Stripe model, sensitive card data flows directly from the customer’s browser to Stripe (via Stripe.js or Elements). Your server never sees the raw card number — only tokens.

Monitoring and anomaly detection

Detecting misuse of the token vault:

from collections import defaultdict
from datetime import datetime, timedelta

class TokenVaultMonitor:
    """Detect unusual detokenization patterns per requester over a sliding window."""

    def __init__(self, alert_threshold: int = 100, window_minutes: int = 5):
        """Configure the monitor.

        Args:
            alert_threshold: Alert once a requester exceeds this many
                requests inside the window.
            window_minutes: Length of the sliding window in minutes.
        """
        self.threshold = alert_threshold
        self.window = timedelta(minutes=window_minutes)
        # requester -> timestamps of its requests that are still inside the window
        self.requests = defaultdict(list)

    def record_detokenization(self, requester: str, token: str):
        """Record one detokenization and report whether it triggered an alert.

        Args:
            requester: Identity of the caller being tracked.
            token: Token that was detokenized; accepted for interface
                compatibility but not used by the check.

        Returns:
            True when the requester's count inside the window exceeds the
            threshold (an alert is sent), otherwise False.
        """
        now = datetime.utcnow()
        self.requests[requester].append(now)

        # Clean old entries
        cutoff = now - self.window
        self.requests[requester] = [
            t for t in self.requests[requester] if t > cutoff
        ]

        # Check for bulk detokenization (potential breach)
        count = len(self.requests[requester])
        if count > self.threshold:
            # total_seconds() covers the whole duration; timedelta.seconds
            # drops whole days and would misreport windows of a day or more.
            window_minutes = int(self.window.total_seconds() // 60)
            self._alert(
                f"ALERT: {requester} made {count} "
                f"detokenization requests in {window_minutes} minutes"
            )
            return True
        return False

    def _alert(self, message: str):
        """Emit an alert; printing stands in for a real notification channel."""
        # Send to PagerDuty, Slack, etc.
        print(f"🚨 {message}")

The one thing to remember: A production tokenization system in Python combines a vault (SQLAlchemy + Fernet encryption at rest) with format-preserving token generation, audit logging for compliance, and anomaly detection on detokenization patterns — or delegates to specialized services like Stripe when the sensitive data is payment cards.

Tags: python, security, tokenization, data-protection

See Also