Data Sanitization in Python — Deep Dive

The sanitization pipeline

Production applications process untrusted data through multiple stages. Each stage addresses a different class of risk:

Raw Input → Decode → Normalize → Validate → Context-Sanitize → Store/Render

Decode: Ensure the input is valid UTF-8. Reject or replace invalid byte sequences. Python 3 strings are Unicode by default, but data from network sockets or file uploads arrives as raw bytes and may contain invalid sequences.

Normalize: Apply Unicode NFC normalization. Without normalization, visually identical strings (like “café” composed with é vs e + combining accent) compare as different, which breaks deduplication and can bypass validation rules.

Validate: Check data types, lengths, formats, and business rules. Reject data that doesn’t meet expectations.

Context-Sanitize: Escape or transform data for its specific output context (HTML, SQL, URL, JSON, shell).
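
The four stages can be sketched end to end with the standard library alone. This is a minimal illustration, not a production pipeline: the function name `process_display_name`, the 50-character limit, the control-character rule, and the choice of HTML as the output context are all assumptions made for the example.

```python
import html
import unicodedata


def process_display_name(raw: bytes) -> str:
    """Hypothetical helper: run one field through all four stages."""
    # Decode: strict mode raises UnicodeDecodeError on invalid UTF-8
    try:
        text = raw.decode("utf-8", errors="strict")
    except UnicodeDecodeError as exc:
        raise ValueError("Input is not valid UTF-8") from exc

    # Normalize: compose characters so equivalent strings compare equal
    text = unicodedata.normalize("NFC", text).strip()

    # Validate: length limit and no control characters (assumed rules)
    if not 1 <= len(text) <= 50:
        raise ValueError("Display name must be 1-50 characters")
    if any(unicodedata.category(ch) == "Cc" for ch in text):
        raise ValueError("Display name contains control characters")

    # Context-sanitize: escape for an HTML text node at render time
    return html.escape(text)


print(process_display_name("Zoe\u0308 <b>".encode("utf-8")))  # Zoë &lt;b&gt;
```

In a real application the escaping step would usually live in the rendering layer, so the stored value stays unescaped and can be reused in other contexts.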

Unicode normalization attacks

Unicode has multiple representations for the same visual character. Attackers exploit this:

import unicodedata

# These look identical but are different bytes
s1 = "café"                              # é as single codepoint U+00E9
s2 = "cafe\u0301"                        # e + combining acute accent U+0301
print(s1 == s2)                          # False
print(unicodedata.normalize("NFC", s1) ==
      unicodedata.normalize("NFC", s2))  # True

# Homoglyph attack: Cyrillic 'а' (U+0430) looks like Latin 'a' (U+0061)
admin_fake = "\u0430dmin"  # Cyrillic а
admin_real = "admin"       # Latin a
print(admin_fake == admin_real)  # False, but they look identical

Defense: normalize to NFC early, and for security-sensitive comparisons (usernames, slugs), restrict to ASCII or use a confusable-character detection library.

def normalize_username(raw: str) -> str:
    normalized = unicodedata.normalize("NFC", raw.strip().lower())
    # Reject non-ASCII characters in usernames
    if not normalized.isascii():
        raise ValueError("Username must contain only ASCII characters")
    if not normalized.isalnum():
        raise ValueError("Username must be alphanumeric")
    return normalized
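
Where ASCII-only usernames are too restrictive, a rough mixed-script check can flag homoglyph candidates. The sketch below is a heuristic, not a substitute for a real confusables library: it assumes the first word of each character's Unicode name identifies its script, and the helper names `scripts_in` and `is_mixed_script` are hypothetical.

```python
import unicodedata


def scripts_in(text: str) -> set[str]:
    """Approximate the scripts used by the letters in text."""
    scripts = set()
    for ch in text:
        if ch.isalpha():
            # e.g. "CYRILLIC SMALL LETTER A" -> "CYRILLIC"
            name = unicodedata.name(ch, "UNKNOWN")
            scripts.add(name.split(" ")[0])
    return scripts


def is_mixed_script(text: str) -> bool:
    return len(scripts_in(text)) > 1


print(is_mixed_script("\u0430dmin"))  # True
print(is_mixed_script("admin"))       # False
```

A string written entirely in one non-Latin script passes this check, so it complements an allowlist rather than replacing one.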

HTML sanitization with nh3

The nh3 library (Rust-based, fast) replaces the deprecated bleach for HTML sanitization:

import nh3

# Allow only safe formatting tags
def sanitize_user_html(raw_html: str) -> str:
    return nh3.clean(
        raw_html,
        tags={"p", "br", "b", "i", "em", "strong", "a", "ul", "ol", "li",
              "blockquote", "code", "pre"},
        attributes={
            "a": {"href", "title"},
        },
        url_schemes={"http", "https", "mailto"},
        link_rel="noopener noreferrer nofollow",
        strip_comments=True,
    )

# Example
dirty = '<p>Hello <script>alert("xss")</script> <a href="javascript:void(0)">click</a></p>'
clean = sanitize_user_html(dirty)
# Result: '<p>Hello  <a rel="noopener noreferrer nofollow">click</a></p>'

Key decisions when configuring HTML sanitization:

  • Allowlist, not denylist. Only permit tags you explicitly want. New HTML elements and attributes appear regularly; a denylist will miss future attack vectors.
  • Sanitize URL schemes in links. Allowing javascript: URLs defeats the purpose of tag sanitization.
  • Add rel="nofollow noopener" to links. Prevents SEO manipulation and window.opener attacks.
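
To see why a denylist falls short, consider a naive filter that only removes script elements. The regular expression and the helper name `naive_denylist` below are illustrative assumptions, not code from any library; the point is that an event-handler attribute passes straight through it.

```python
import html
import re

SCRIPT_RE = re.compile(r"<script\b.*?>.*?</script>", re.IGNORECASE | re.DOTALL)


def naive_denylist(raw_html: str) -> str:
    """Illustrative only: strips <script> elements and nothing else."""
    return SCRIPT_RE.sub("", raw_html)


payload = '<img src="x" onerror="alert(1)">'

# The denylist has no rule for onerror, so the payload survives intact
print(naive_denylist(payload) == payload)  # True

# When no markup is wanted at all, escaping renders everything inert
print(html.escape(payload))
```

An allowlist avoids this failure mode because anything not named in it, including attributes the author never thought of, is dropped by default.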

Pydantic validators for structured sanitization

Pydantic handles type validation. Add sanitization with custom validators:

from pydantic import BaseModel, field_validator, Field
import re
import nh3
import unicodedata

class UserComment(BaseModel):
    author: str = Field(min_length=1, max_length=100)
    body: str = Field(min_length=1, max_length=10000)
    email: str

    @field_validator("author")
    @classmethod
    def sanitize_author(cls, v: str) -> str:
        v = unicodedata.normalize("NFC", v.strip())
        v = re.sub(r"[<>&\"']", "", v)  # Strip HTML-significant chars
        v = " ".join(v.split())  # Collapse whitespace
        if not v:
            raise ValueError("Author name cannot be empty after sanitization")
        return v

    @field_validator("body")
    @classmethod
    def sanitize_body(cls, v: str) -> str:
        v = unicodedata.normalize("NFC", v.strip())
        return nh3.clean(v, tags={"p", "b", "i", "em", "strong", "br"})

    @field_validator("email")
    @classmethod
    def normalize_email(cls, v: str) -> str:
        v = v.strip().lower()
        if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", v):
            raise ValueError("Invalid email format")
        return v

This model validates structure and sanitizes content in one pass. The API endpoint receives a clean, safe object.

File upload sanitization

Uploaded files are particularly dangerous because they can carry malware, path traversal attacks, and deceptive content types:

import hashlib
import re
import magic  # python-magic (libmagic bindings)
from pathlib import Path

ALLOWED_MIME_TYPES = {
    "image/jpeg", "image/png", "image/gif", "image/webp",
    "application/pdf",
    "text/plain", "text/csv",
}

MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB

def sanitize_upload(filename: str, content: bytes) -> tuple[str, bytes]:
    # 1. Check file size
    if len(content) > MAX_FILE_SIZE:
        raise ValueError(f"File too large: {len(content)} bytes")

    # 2. Detect actual content type (don't trust the Content-Type header)
    detected_mime = magic.from_buffer(content, mime=True)
    if detected_mime not in ALLOWED_MIME_TYPES:
        raise ValueError(f"File type not allowed: {detected_mime}")

    # 3. Sanitize filename
    safe_name = sanitize_filename(filename)

    # 4. Verify extension matches detected type
    #    (types absent from the map below skip this check)
    ext = Path(safe_name).suffix.lower()
    expected_exts = {
        "image/jpeg": {".jpg", ".jpeg"},
        "image/png": {".png"},
        "image/gif": {".gif"},
        "application/pdf": {".pdf"},
    }
    if detected_mime in expected_exts and ext not in expected_exts[detected_mime]:
        raise ValueError(f"Extension {ext} doesn't match content type {detected_mime}")

    return safe_name, content


def sanitize_filename(name: str) -> str:
    """Create a safe filename from user input."""
    # Strip directory components
    name = Path(name).name

    # Remove null bytes
    name = name.replace("\x00", "")

    # Keep only safe characters
    stem = Path(name).stem
    suffix = Path(name).suffix

    stem = re.sub(r"[^\w\-]", "_", stem)[:100]
    suffix = suffix.lower()[:10]

    if not stem:
        stem = hashlib.md5(name.encode()).hexdigest()[:8]

    return f"{stem}{suffix}"
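
Once the name is sanitized, the storage path can be double-checked before writing. The sketch below assumes Python 3.9+ for `Path.is_relative_to`; the helper `resolve_upload_path` and the `uploads` directory are hypothetical names chosen for the example.

```python
from pathlib import Path


def resolve_upload_path(upload_dir: Path, safe_name: str) -> Path:
    """Join a sanitized name to upload_dir and confirm it stays inside."""
    base = upload_dir.resolve()
    target = (base / safe_name).resolve()
    # resolve() collapses ".." segments, so an escape shows up here
    if not target.is_relative_to(base):
        raise ValueError(f"Path escapes upload directory: {safe_name!r}")
    return target


uploads = Path("uploads")
print(resolve_upload_path(uploads, "report.pdf").name)  # report.pdf

try:
    resolve_upload_path(uploads, "../secrets.txt")
except ValueError as exc:
    print("rejected:", exc)
```

This check is redundant when `sanitize_filename` works as intended, which is the point: a second, independent guard catches mistakes in the first.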

SQL context: parameterized queries

Never interpolate user data into SQL strings. Always use parameterized queries:

# DANGEROUS — SQL injection possible
cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")

# SAFE — parameterized query
cursor.execute("SELECT * FROM users WHERE name = %s", (user_input,))

# SQLAlchemy ORM — safe by default
user = session.query(User).filter(User.name == user_input).first()

# SQLAlchemy Core — also safe
stmt = select(users).where(users.c.name == user_input)

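Parameterized queries keep the SQL structure and the data separate. Depending on the driver, the values are either sent to the database apart from the statement or escaped by the driver itself; in both cases user input is never parsed as SQL. Placeholders stand in for values only, so table and column names taken from user input still need an allowlist.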
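
The difference is easy to observe with the standard library's `sqlite3` module, which uses `?` placeholders rather than `%s`. The table layout and sample rows below are invented for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, role TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("alice", "admin"), ("bob", "user")],
)

user_input = "nobody' OR '1'='1"

# Interpolated: the quote in the input rewrites the WHERE clause
unsafe_sql = f"SELECT name FROM users WHERE name = '{user_input}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Parameterized: the whole input is compared as one literal value
safe_rows = conn.execute(
    "SELECT name FROM users WHERE name = ?", (user_input,)
).fetchall()

print(len(unsafe_rows))  # 2
print(len(safe_rows))    # 0
conn.close()
```

The interpolated query returns every row because the injected `OR '1'='1'` is always true; the parameterized query finds no user with that literal name.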

Shell context sanitization

The safest approach is to never pass user input to shell commands:

import subprocess
import shlex

user_filename = "report.pdf; rm -rf /"

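# DANGEROUS: shell injection. Do not run this line; the shell would
# execute "rm -rf /" as a second command.
subprocess.run(f"wc -l {user_filename}", shell=True)

# SAFE: argument list, no shell interpretation. The "--" marks the end
# of options so a name starting with "-" is not read as a flag.
subprocess.run(["wc", "-l", "--", user_filename], shell=False)

# If shell=True is absolutely required, escape properly
subprocess.run(f"wc -l -- {shlex.quote(user_filename)}", shell=True)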

Using shell=False with a list of arguments is the primary defense. The user input becomes a single argument to wc, not a shell command to interpret.
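
A small experiment shows the argument arriving intact. It assumes nothing beyond the standard library: a child Python process stands in for `wc` so the example runs on any platform.

```python
import shlex
import subprocess
import sys

user_filename = "report.pdf; rm -rf /"

# The child just echoes its first argument back
child_code = "import sys; print(sys.argv[1])"
result = subprocess.run(
    [sys.executable, "-c", child_code, user_filename],
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout.strip())  # report.pdf; rm -rf /

# shlex.quote wraps the value so a POSIX shell treats it as one word
print(shlex.quote(user_filename))  # 'report.pdf; rm -rf /'
```

The semicolon reaches the child as ordinary text because no shell ever parses it. Note that `shlex.quote` targets POSIX shells only and is not suitable for `cmd.exe`.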

Building a sanitization middleware

For APIs that process JSON request bodies, a middleware can apply baseline sanitization before handlers run:

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import json
import unicodedata

class SanitizationMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        if request.method in ("POST", "PUT", "PATCH"):
            content_type = request.headers.get("content-type", "")
            if "application/json" in content_type:
                body = await request.body()
                try:
                    data = json.loads(body)
                    sanitized = self._sanitize_recursive(data)
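                    # Replace the cached body with sanitized data. _body is a
                    # private Starlette attribute; whether downstream handlers
                    # see the change depends on the Starlette version.
                    request._body = json.dumps(sanitized).encode()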
                except json.JSONDecodeError:
                    pass  # Let the framework handle invalid JSON
        return await call_next(request)

    def _sanitize_recursive(self, obj):
        if isinstance(obj, str):
            obj = unicodedata.normalize("NFC", obj)
            obj = obj.replace("\x00", "")  # Strip null bytes
            return obj.strip()
        elif isinstance(obj, dict):
            return {k: self._sanitize_recursive(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._sanitize_recursive(item) for item in obj]
        return obj

This middleware normalizes Unicode, strips null bytes, and trims surrounding whitespace in every string value of a JSON payload. Context-specific sanitization (HTML escaping, etc.) still happens at the point of use.
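
The recursive step can be exercised without a running server. The standalone function below mirrors `_sanitize_recursive` from the class above; the function name `sanitize_recursive` and the sample payload are invented for illustration.

```python
import unicodedata


def sanitize_recursive(obj):
    """Standalone copy of the middleware's recursive cleaner."""
    if isinstance(obj, str):
        obj = unicodedata.normalize("NFC", obj)
        return obj.replace("\x00", "").strip()
    if isinstance(obj, dict):
        return {k: sanitize_recursive(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [sanitize_recursive(item) for item in obj]
    return obj  # numbers, booleans, and None pass through unchanged


payload = {
    "name": "  cafe\u0301\x00 ",
    "tags": ["a\x00b", 42],
    "nested": {"ok": True},
}
print(sanitize_recursive(payload))
# {'name': 'café', 'tags': ['ab', 42], 'nested': {'ok': True}}
```

Dictionary keys are left untouched, as in the original method; only values are cleaned.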

Testing sanitization

import unicodedata
import pytest

class TestSanitization:
    def test_strips_script_tags(self):
        result = sanitize_user_html('<p>Hello <script>alert(1)</script></p>')
        assert "<script>" not in result
        assert "Hello" in result

    def test_preserves_safe_html(self):
        result = sanitize_user_html("<p><strong>Bold</strong> text</p>")
        assert "<strong>Bold</strong>" in result

    def test_sanitizes_javascript_urls(self):
        result = sanitize_user_html('<a href="javascript:alert(1)">click</a>')
        assert "javascript:" not in result

    def test_unicode_normalization(self):
        composed = unicodedata.normalize("NFC", "café")
        decomposed = unicodedata.normalize("NFC", "cafe\u0301")
        assert composed == decomposed

    def test_filename_path_traversal(self):
        # Path(...).name keeps only the final path component
        assert sanitize_filename("../../../etc/passwd") == "passwd"
        assert "/" not in sanitize_filename("dir/file.txt")

    def test_null_byte_removal(self):
        assert sanitize_filename("file\x00.txt.exe") == "file_txt.exe"

The one thing to remember: Data sanitization in Python must be context-aware (HTML, SQL, shell, URL each need different treatment), applied at the point of output rather than only at input, and layered with validation, Unicode normalization, and framework-level auto-escaping for defense in depth.

