Tokenization of Sensitive Data in Python — Deep Dive
Building a token vault from scratch
A minimal vault needs: encrypted storage, collision-free token generation, access-controlled detokenization, and audit logging.
import hashlib
import hmac
import os
import secrets
import warnings
from datetime import datetime, timedelta
from typing import Optional

from cryptography.fernet import Fernet
from sqlalchemy import create_engine, Column, String, DateTime, Boolean, Text
from sqlalchemy.orm import declarative_base, Session
Base = declarative_base()


class TokenMapping(Base):
    """One row per issued token, holding the encrypted value it stands for."""

    __tablename__ = "token_mappings"

    # The token string itself is the primary key.
    token = Column(String(64), primary_key=True)
    # Ciphertext only; the vault encrypts before storing.
    encrypted_value = Column(Text, nullable=False)
    # Category label such as "credit_card", "ssn", etc.
    data_type = Column(String(32), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Optional expiry; left NULL when no expiry was requested.
    expires_at = Column(DateTime, nullable=True)
    # New rows default to active.
    is_active = Column(Boolean, default=True)
class AuditLog(Base):
    """One row per vault operation, recording who did what to which token."""

    __tablename__ = "token_audit_log"

    # 18 random bytes rendered as hex -> exactly 36 characters.
    id = Column(String(36), primary_key=True, default=lambda: secrets.token_hex(18))
    token = Column(String(64), nullable=False)
    # Action label, e.g. "tokenize", "detokenize", "rotate_old".
    # Widened from 16 to 32 characters: the vault also records
    # "tokenize_existing" (17), "detokenize_failed" (17) and
    # "detokenize_expired" (18), which do not fit in 16 characters and are
    # rejected by databases that enforce VARCHAR length.
    # NOTE: create_all() does not alter existing tables; an already-created
    # table needs a migration to pick up the new length.
    action = Column(String(32), nullable=False)
    requester = Column(String(128), nullable=False)
    timestamp = Column(DateTime, default=datetime.utcnow)
    # 45 characters is enough for a textual IPv6 address.
    ip_address = Column(String(45), nullable=True)
class TokenVault:
    """Token vault with Fernet encryption at rest and audit logging.

    Sensitive values are encrypted before they are written to
    ``token_mappings``; operations add rows to ``token_audit_log``.
    """

    def __init__(self, database_url: str, encryption_key: bytes):
        """Create the engine, make sure the tables exist and set up the cipher.

        Args:
            database_url: SQLAlchemy connection URL.
            encryption_key: Fernet key used to encrypt and decrypt stored values.
        """
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        self.cipher = Fernet(encryption_key)

    def tokenize(self, sensitive_value: str, data_type: str,
                 requester: str, ttl_days: Optional[int] = None) -> str:
        """Replace sensitive data with a token.

        Args:
            sensitive_value: Plain-text value to protect.
            data_type: Category label; also selects the token prefix.
            requester: Identity recorded in the audit log.
            ttl_days: Days until the token expires. ``None`` (or 0) means the
                token does not expire.

        Returns:
            The token that now stands in for ``sensitive_value``.
        """
        with Session(self.engine) as session:
            # Check if this value is already tokenized (idempotent)
            existing = self._find_existing_token(session, sensitive_value, data_type)
            if existing:
                self._audit(session, existing, "tokenize_existing", requester)
                # Commit before returning: leaving the ``with`` block closes
                # the session, which would discard the pending audit row.
                session.commit()
                return existing

            # Generate collision-free token
            token = self._generate_token(session, data_type)

            # Encrypt the sensitive value before storing
            encrypted = self.cipher.encrypt(sensitive_value.encode()).decode()

            expires_at = None
            if ttl_days:
                expires_at = datetime.utcnow() + timedelta(days=ttl_days)

            mapping = TokenMapping(
                token=token,
                encrypted_value=encrypted,
                data_type=data_type,
                expires_at=expires_at,
            )
            session.add(mapping)
            self._audit(session, token, "tokenize", requester)
            session.commit()
            return token

    def detokenize(self, token: str, requester: str) -> str:
        """Retrieve the original value for a token.

        Every outcome (success, missing/inactive, expired) is audited and
        committed before the method returns or raises.

        Raises:
            ValueError: If the token is missing, inactive or expired.
        """
        with Session(self.engine) as session:
            mapping = session.get(TokenMapping, token)
            if not mapping or not mapping.is_active:
                self._audit(session, token, "detokenize_failed", requester)
                session.commit()
                # Only the first 8 characters are echoed back in the message.
                raise ValueError(f"Token not found or inactive: {token[:8]}...")
            if mapping.expires_at and mapping.expires_at < datetime.utcnow():
                self._audit(session, token, "detokenize_expired", requester)
                session.commit()
                raise ValueError(f"Token expired: {token[:8]}...")
            self._audit(session, token, "detokenize", requester)
            session.commit()
            return self.cipher.decrypt(mapping.encrypted_value.encode()).decode()

    def rotate_token(self, old_token: str, requester: str) -> str:
        """Replace a token with a new one. Old token becomes inactive.

        The new mapping reuses the stored ciphertext, data type and expiry of
        the old one.

        Raises:
            ValueError: If ``old_token`` is missing or already inactive.
        """
        with Session(self.engine) as session:
            mapping = session.get(TokenMapping, old_token)
            if not mapping or not mapping.is_active:
                raise ValueError("Cannot rotate inactive or missing token")

            # Create new token pointing to same encrypted value
            new_token = self._generate_token(session, mapping.data_type)
            new_mapping = TokenMapping(
                token=new_token,
                encrypted_value=mapping.encrypted_value,
                data_type=mapping.data_type,
                expires_at=mapping.expires_at,
            )
            mapping.is_active = False
            session.add(new_mapping)
            self._audit(session, old_token, "rotate_old", requester)
            self._audit(session, new_token, "rotate_new", requester)
            session.commit()
            return new_token

    def delete(self, token: str, requester: str):
        """GDPR erasure — permanently destroy the mapping.

        Does nothing (and writes no audit row) when the token is unknown.
        """
        with Session(self.engine) as session:
            mapping = session.get(TokenMapping, token)
            if mapping:
                session.delete(mapping)
                self._audit(session, token, "delete", requester)
                session.commit()

    def _generate_token(self, session: Session, data_type: str) -> str:
        """Generate a unique token with type prefix.

        Raises:
            RuntimeError: If ten consecutive candidates already exist.
        """
        prefix = {"credit_card": "cc", "ssn": "ssn", "pii": "pii"}.get(data_type, "tok")
        for _ in range(10):  # Retry on collision
            token = f"{prefix}_{secrets.token_urlsafe(32)}"
            if not session.get(TokenMapping, token):
                return token
        raise RuntimeError("Token generation failed — too many collisions")

    def _find_existing_token(self, session, value: str, data_type: str) -> Optional[str]:
        """Check if value already tokenized (for idempotency).

        Simplified placeholder: it always returns ``None``, so ``tokenize``
        currently issues a fresh token on every call.
        """
        # Use HMAC fingerprint to find without decrypting everything.
        # NOTE(review): the fingerprint is computed but not used yet, and
        # b"search-key" is a hard-coded placeholder — a real lookup needs a
        # secret key from configuration.
        fingerprint = hmac.new(b"search-key", value.encode(), hashlib.sha256).hexdigest()
        # In production, store fingerprint column for O(1) lookup
        # Simplified: scan active tokens of same type
        return None  # Simplified; production uses fingerprint index

    def _audit(self, session, token, action, requester):
        """Queue an audit row on ``session``; the caller is responsible for committing."""
        session.add(AuditLog(token=token, action=action, requester=requester))
Format-preserving token generation
When tokens must pass format validation in legacy systems:
import re
class FormatPreservingTokenGenerator:
    """Generate tokens that match the format of the original data."""

    def generate_credit_card_token(self, original: str, preserve_last4: bool = True) -> str:
        """Return a 16-digit token that passes Luhn validation.

        Args:
            original: Card number; dashes and spaces are ignored. Only its
                last four digits are used, and only when ``preserve_last4``
                is true.
            preserve_last4: Keep the original's last four digits as the last
                four digits of the token.

        Returns:
            A 16-digit string starting with "0000".

        Raises:
            ValueError: If ``preserve_last4`` is true and ``original`` does
                not end in four ASCII digits.
        """
        prefix = "0000"  # Non-routable BIN to distinguish tokens from real cards

        if not preserve_last4:
            # Nothing to keep: random payload plus its computed check digit.
            payload = prefix + self._random_digits(11)
            return payload + str(self._luhn_check_digit(payload))

        digits = original.replace("-", "").replace(" ", "")
        last4 = digits[-4:]
        if len(last4) != 4 or any(ch not in "0123456789" for ch in last4):
            raise ValueError("original must end in four digits to preserve the last 4")

        # The final digit is fixed by the preserved suffix, so it cannot serve
        # as a freely chosen check digit. Instead, one filler digit in front
        # of the suffix is selected so the whole number validates.
        head = prefix + self._random_digits(7)
        for filler in "0123456789":
            candidate = head + filler + last4
            if self._luhn_check_digit(candidate[:15]) == int(candidate[15]):
                return candidate
        # Exactly one filler value satisfies the checksum, so this is not reached.
        raise RuntimeError("No Luhn-valid filler digit found")

    def generate_ssn_token(self) -> str:
        """Return a token shaped like ``9NN-NN-NNNN``."""
        # SSNs starting with 9xx are reserved/invalid — safe for tokens
        # NOTE(review): ITINs also use the 9xx range — confirm downstream
        # systems will not mistake these tokens for ITINs.
        area = f"9{self._random_digits(2)}"
        group = self._random_digits(2)
        serial = self._random_digits(4)
        return f"{area}-{group}-{serial}"

    def generate_phone_token(self, country: str = "US") -> str:
        """Return a phone-formatted token using reserved/test ranges.

        For "US" the result is ``NPA-555-01NN``; any other country yields ten
        random digits without separators.
        """
        if country == "US":
            # 555-0100 through 555-0199 is the range reserved for fiction; it
            # is an exchange + line number, so it follows a 3-digit area code.
            while True:
                # Area codes start with 2-9; N11 codes are service numbers.
                area = str(2 + secrets.randbelow(8)) + self._random_digits(2)
                if not area.endswith("11"):
                    break
            return f"{area}-555-01{self._random_digits(2)}"
        return self._random_digits(10)

    def _random_digits(self, n: int) -> str:
        """Return ``n`` random decimal digits drawn with ``secrets``."""
        return "".join(str(secrets.randbelow(10)) for _ in range(n))

    def _luhn_check_digit(self, partial: str) -> int:
        """Return the Luhn check digit for the payload ``partial``.

        ``partial`` is the number *without* its check digit, so its rightmost
        digit sits next to the check digit and is the first one to be doubled.
        """
        digits = [int(d) for d in partial]
        doubled = digits[-1::-2]
        untouched = digits[-2::-2]
        # sum(divmod(x, 10)) adds the two decimal digits of a doubled value.
        total = sum(untouched) + sum(sum(divmod(d * 2, 10)) for d in doubled)
        return (10 - total % 10) % 10
REST API for the token vault
Wrapping the vault in a FastAPI service with authentication and rate limiting:
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
import time
app = FastAPI(title="Token Vault API")

# The Fernet key must survive restarts: a key generated at import time is
# different on every start, so values encrypted by an earlier process could
# never be decrypted again. Read the key from the environment and fall back to
# an ephemeral one (with a warning) only so local experiments still start.
_configured_key = os.environ.get("TOKEN_VAULT_ENCRYPTION_KEY")
if _configured_key:
    _encryption_key = _configured_key.encode()
else:
    warnings.warn(
        "TOKEN_VAULT_ENCRYPTION_KEY is not set; using an ephemeral Fernet key. "
        "Stored values will be unreadable after a restart.",
        RuntimeWarning,
    )
    _encryption_key = Fernet.generate_key()

vault = TokenVault(
    # The literal URL (with its embedded password) remains only as a
    # local-development default; override it via the environment.
    database_url=os.environ.get(
        "TOKEN_VAULT_DATABASE_URL",
        "postgresql://vault:secret@localhost/tokens",
    ),
    encryption_key=_encryption_key,
)

# Simple API key auth (production: use mTLS + OAuth2)
# Maps service name -> API key; the service name becomes the audit requester.
VALID_API_KEYS = {"payment-service": "key-abc123", "billing": "key-def456"}
def verify_api_key(x_api_key: str = Header(...)):
    """Resolve the ``X-API-Key`` header to the calling service's name.

    Returns:
        The key of the matching entry in ``VALID_API_KEYS``.

    Raises:
        HTTPException: 401 when the header matches no configured key.
    """
    # compare_digest() only accepts ASCII-only str; comparing bytes avoids a
    # TypeError (and thus a 500) when the header carries non-ASCII characters.
    supplied = x_api_key.encode("utf-8")
    matched_service = None
    # Every key is compared, with no early exit on the first match.
    for service, key in VALID_API_KEYS.items():
        if hmac.compare_digest(key.encode("utf-8"), supplied) and matched_service is None:
            matched_service = service
    if matched_service is None:
        raise HTTPException(status_code=401, detail="Invalid API key")
    return matched_service
class TokenizeRequest(BaseModel):
    # Request body for POST /tokenize.
    # Plain-text value to protect.
    value: str
    # Category label such as "credit_card" or "ssn".
    data_type: str
    # Optional expiry in days. Declared Optional so an explicit JSON null is
    # accepted; a bare `int` annotation with a None default rejects null
    # under Pydantic v2.
    ttl_days: Optional[int] = None
class TokenResponse(BaseModel):
    # Response body of POST /tokenize: the token issued for the submitted value.
    token: str
@app.post("/tokenize", response_model=TokenResponse)
def tokenize(req: TokenizeRequest, requester: str = Depends(verify_api_key)):
    # The authenticated service name is passed on as the audit requester.
    issued = vault.tokenize(
        sensitive_value=req.value,
        data_type=req.data_type,
        requester=requester,
        ttl_days=req.ttl_days,
    )
    return TokenResponse(token=issued)
@app.post("/detokenize")
def detokenize(token: str, requester: str = Depends(verify_api_key)):
    # NOTE(review): `token` is a plain `str` parameter, which FastAPI reads
    # from the query string — confirm that is intended for a POST endpoint.
    try:
        original_value = vault.detokenize(token, requester)
    except ValueError as e:
        # Missing, inactive and expired tokens all surface as 404.
        raise HTTPException(status_code=404, detail=str(e))
    return {"value": original_value}
@app.post("/rotate")
def rotate(token: str, requester: str = Depends(verify_api_key)):
    # rotate_token() raises ValueError for a missing or inactive token.
    # Translate it to 404, as the detokenize endpoint does, instead of
    # letting it escape as a 500.
    try:
        new_token = vault.rotate_token(token, requester)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    return {"old_token": token, "new_token": new_token}
@app.delete("/tokens/{token}")
def delete_token(token: str, requester: str = Depends(verify_api_key)):
    # vault.delete() ignores unknown tokens, so the same body is returned
    # whether or not a mapping existed.
    vault.delete(token=token, requester=requester)
    response = {"status": "deleted"}
    return response
Integration with Stripe tokenization
For payment tokenization, Stripe handles the vault so you don’t build one:
import stripe
# NOTE(review): placeholder value. A real secret key should be loaded from
# configuration or a secret store rather than written into source.
stripe.api_key = "sk_live_..."
def store_card_as_token(card_number: str, exp_month: int,
                        exp_year: int, cvc: str) -> str:
    """Exchange raw card details for a Stripe token id.

    Normally this step happens client-side with Stripe.js; calling it here
    means the card number passes through this process.

    Returns:
        The token id (tok_xxxxx) to store instead of the card number.
    """
    card_details = {
        "number": card_number,
        "exp_month": exp_month,
        "exp_year": exp_year,
        "cvc": cvc,
    }
    created = stripe.Token.create(card=card_details)
    return created.id
def charge_with_token(token_id: str, amount_cents: int, currency: str = "usd"):
    """Create a Stripe charge against a token and return the charge id.

    Only the token id is sent as the payment source; no card number is
    handled here.
    """
    charge_params = dict(
        amount=amount_cents,
        currency=currency,
        source=token_id,
        description="Subscription payment",
    )
    return stripe.Charge.create(**charge_params).id
def create_reusable_customer_token(token_id: str, email: str) -> str:
    """Create a Stripe customer from a single-use token.

    The token is passed as the customer's ``source`` so the card is attached
    as a reusable payment method.

    Returns:
        The customer id (cus_xxxxx) to use for recurring charges.
    """
    customer_params = {"email": email, "source": token_id}
    new_customer = stripe.Customer.create(**customer_params)
    return new_customer.id
In the Stripe model, sensitive card data flows directly from the customer’s browser to Stripe (via Stripe.js or Elements). Your server never sees the raw card number — only tokens.
Monitoring and anomaly detection
Detecting misuse of the token vault:
from collections import defaultdict
from datetime import datetime, timedelta
class TokenVaultMonitor:
    """Detect unusual detokenization patterns.

    Keeps, per requester, the timestamps of recent detokenization calls and
    raises an alert when more than ``alert_threshold`` fall inside the
    sliding window.
    """

    def __init__(self, alert_threshold: int = 100, window_minutes: int = 5):
        """
        Args:
            alert_threshold: Number of requests within the window that may
                not be exceeded without alerting.
            window_minutes: Length of the sliding window in minutes.
        """
        self.threshold = alert_threshold
        self.window = timedelta(minutes=window_minutes)
        # requester -> timestamps of requests still inside the window
        self.requests = defaultdict(list)

    def record_detokenization(self, requester: str, token: str):
        """Record one detokenization and report whether it triggered an alert.

        Args:
            requester: Identity whose request rate is tracked.
            token: Accepted for interface symmetry; not used for detection.

        Returns:
            True if the requester is now over the threshold, else False.
        """
        now = datetime.utcnow()
        self.requests[requester].append(now)

        # Clean old entries
        cutoff = now - self.window
        self.requests[requester] = [
            t for t in self.requests[requester] if t > cutoff
        ]

        # Check for bulk detokenization (potential breach)
        recent_count = len(self.requests[requester])
        if recent_count > self.threshold:
            # total_seconds() covers the whole window; .seconds drops whole
            # days, so a window of a day or more would be reported wrongly.
            window_minutes = int(self.window.total_seconds() // 60)
            self._alert(
                f"ALERT: {requester} made {recent_count} "
                f"detokenization requests in {window_minutes} minutes"
            )
            return True
        return False

    def _alert(self, message: str):
        """Emit an alert; this implementation only prints it."""
        # Send to PagerDuty, Slack, etc.
        print(f"🚨 {message}")
The one thing to remember: A production tokenization system in Python combines a vault (SQLAlchemy + Fernet encryption at rest) with format-preserving token generation, audit logging for compliance, and anomaly detection on detokenization patterns — or delegates to specialized services like Stripe when the sensitive data is payment cards.
See Also
- Python Certificate Management — How websites prove they are who they say they are — like a digital passport checked every time you visit
- Python Data Masking Techniques — How companies hide real names, emails, and credit card numbers while keeping data useful for testing and analytics
- Python Homomorphic Encryption — How you can do math on locked data without ever unlocking it — like solving a puzzle inside a sealed box
- Python Key Management Practices — Why the key to your encryption is more important than the encryption itself — and how to keep it safe
- Python Secure Multiparty Computation — How a group of friends can figure out who earns the most without anyone revealing their actual salary