Python Secure Coding Practices — Deep Dive

Defensive Input Validation

Multi-Layer Validation with Pydantic

from pydantic import BaseModel, Field, field_validator, EmailStr
from typing import Annotated
import re

class CreateUserRequest(BaseModel):
    """Request payload for creating a user, validated at the API boundary.

    Fields:
        email: checked by pydantic's ``EmailStr``.
        username: 3-30 characters; only letters, digits, ``_`` and ``-``;
            returned lowercased.
        age: integer from 13 to 150 inclusive.
        bio: optional text of at most 500 characters; control characters
            are removed.
    """

    email: EmailStr
    username: Annotated[str, Field(min_length=3, max_length=30)]
    age: Annotated[int, Field(ge=13, le=150)]
    bio: Annotated[str, Field(max_length=500)] = ""

    @field_validator("username")
    @classmethod
    def username_alphanumeric(cls, v: str) -> str:
        """Reject characters outside ``[a-zA-Z0-9_-]`` and lowercase the result.

        Raises:
            ValueError: if any other character is present.
        """
        # fullmatch instead of match(r"^...$"): "$" also matches just before
        # a trailing newline, so "admin\n" would otherwise be accepted.
        if not re.fullmatch(r"[a-zA-Z0-9_-]+", v):
            raise ValueError(
                "Username may contain only letters, digits, '_' and '-'"
            )
        return v.lower()

    @field_validator("bio")
    @classmethod
    def sanitize_bio(cls, v: str) -> str:
        """Return the bio with null bytes and control characters removed."""
        # Tab, LF and CR are kept; DEL (\x7f) is a control character too.
        return re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", v)

Pydantic validates at the API boundary, but critical business rules should be enforced at the domain layer too. Defense in depth means a validation bypass at one layer doesn’t compromise the system.

Path Traversal Prevention

from pathlib import Path

UPLOAD_DIR = Path("/var/app/uploads").resolve()


def safe_file_path(user_filename: str) -> Path:
    """Map a user-supplied filename to a path directly inside UPLOAD_DIR.

    Directory components are discarded, every character other than word
    characters, ``-`` and ``.`` is replaced with ``_``, and the result is
    resolved (following symlinks) and checked for containment.

    Args:
        user_filename: Untrusted file name, possibly containing directories.

    Returns:
        The resolved path of the file inside UPLOAD_DIR.

    Raises:
        ValueError: if no file name remains after stripping directories, or
            if the resolved path is not located inside UPLOAD_DIR.
    """
    # Remove any path components
    clean_name = Path(user_filename).name

    # Additional sanitization
    clean_name = re.sub(r"[^\w\-.]", "_", clean_name)

    # "", "." and "/" leave no name; joining that would yield the upload
    # directory itself instead of a file inside it.
    if not clean_name:
        raise ValueError("Invalid file name")

    full_path = (UPLOAD_DIR / clean_name).resolve()

    # Compare whole path components, not a raw string prefix: a startswith()
    # check would also accept a sibling such as /var/app/uploads_evil.
    if UPLOAD_DIR not in full_path.parents:
        raise ValueError("Path traversal attempt detected")

    return full_path

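# Attacker input: "../../etc/passwd" → resolves to /var/app/uploads/passwd
# Attacker input: "../../../etc/shadow" → resolves to /var/app/uploads/shadow
#   (directory components are stripped first, so only the final name is used)
# Attacker input: ".." → raises ValueError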

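The .resolve() call resolves symlinks and .. components, and the containment check keeps the final path inside the allowed directory. Compare whole path components (for example with Path.parents or Path.is_relative_to) rather than raw string prefixes: a plain startswith() test would also accept a sibling directory such as /var/app/uploads_evil.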

Integer Overflow and Boundary Attacks

# Largest integer that an IEEE-754 double (and therefore JavaScript/JSON
# consumers) can represent exactly.
_MAX_SAFE_INTEGER = 2**53 - 1


def transfer_funds(
    amount: int,
    from_account: str,
    to_account: str,
    *,
    max_amount: int = 100_000_000,
):
    """Validate the amount of a transfer, expressed in integer cents.

    Only validation is performed here; the function returns ``None`` when
    every check passes.

    Args:
        amount: Transfer amount in cents.
        from_account: Source account identifier (not inspected here).
        to_account: Destination account identifier (not inspected here).
        max_amount: Per-transaction cap in cents; defaults to $1M.

    Raises:
        TypeError: if ``amount`` is not an ``int`` (``bool`` is rejected).
        ValueError: if ``amount`` is not positive, exceeds ``max_amount``,
            or exceeds the JavaScript safe-integer range.
    """
    # bool is a subclass of int, so True would otherwise pass as 1 cent.
    if isinstance(amount, bool) or not isinstance(amount, int):
        raise TypeError("Amount must be integer (cents)")
    if amount <= 0:
        raise ValueError("Amount must be positive")
    if amount > max_amount:
        raise ValueError("Amount exceeds transaction limit")

    # Python integers don't overflow, but downstream systems might
    # (databases, JSON serializers, C extensions). This only triggers when
    # a caller raises max_amount beyond the safe range.
    if amount > _MAX_SAFE_INTEGER:
        raise ValueError("Amount exceeds safe integer range")

Python’s arbitrary-precision integers don’t overflow, but serialization boundaries do. JSON (JavaScript), PostgreSQL INTEGER (4 bytes), and C extensions all have fixed limits.

Secrets Management Architecture

Hierarchical Secret Loading

import os
from pathlib import Path
from typing import Optional

class SecretManager:
    """Load secrets with fallback chain: vault → env → file → error.

    Sources are consulted lazily in that order and the first non-empty
    value wins. An empty value from any source is treated as "not set", so
    a blank secret never masks a later source and is never returned.
    Resolved values are cached per instance.
    """

    def __init__(self):
        self._cache: dict[str, str] = {}

    def get(self, key: str) -> str:
        """Return the secret named ``key``.

        Raises:
            RuntimeError: if no source provides a non-empty value.
        """
        try:
            return self._cache[key]
        except KeyError:
            pass

        for source in (self._from_vault, self._from_env, self._from_file):
            value = source(key)
            if value:  # None and "" both mean "not provided here"
                self._cache[key] = value
                return value

        raise RuntimeError(f"Secret '{key}' not found in any source")

    def _from_vault(self, key: str) -> Optional[str]:
        """HashiCorp Vault or cloud secret manager."""
        # Implementation depends on provider
        return None

    def _from_env(self, key: str) -> Optional[str]:
        """Return the environment variable ``key``, or None if unset."""
        return os.environ.get(key)

    def _from_file(self, key: str) -> Optional[str]:
        """Docker/K8s secret files: /run/secrets/<key>."""
        secret_file = Path(f"/run/secrets/{key}")
        # Read directly instead of exists()-then-read: avoids a race and
        # treats a directory at that path as "no secret" rather than crashing.
        try:
            return secret_file.read_text().strip()
        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
            return None

# Shared instance. NOTE: this name is the same as the stdlib `secrets`
# module, so that module cannot be imported under its own name here.
secrets = SecretManager()
# Raises RuntimeError when this statement runs if no source provides the key.
db_password = secrets.get("DATABASE_PASSWORD")

Secret Scanning in Pre-Commit

# .pre-commit-config.yaml
# Runs detect-secrets as a pre-commit hook.
repos:
  - repo: https://github.com/Yelp/detect-secrets
    # Pinned to a tag so every checkout runs the same hook version
    rev: v1.4.0
    hooks:
      - id: detect-secrets
        # Findings are compared against the committed baseline file
        args: ['--baseline', '.secrets.baseline']

detect-secrets uses entropy analysis and regex patterns to catch API keys, passwords, and tokens before they enter version control. The baseline file tracks known false positives.

Safe Deserialization

import json
import yaml

# DANGEROUS: yaml.load without a safe Loader (Loader=None was the default in
# old PyYAML) can execute arbitrary Python code.
# SAFE: the call below uses safe_load instead.
data = yaml.safe_load(untrusted_yaml)  # ALWAYS use safe_load

# DANGEROUS (anti-example, do not copy): eval/exec on user input runs
# whatever code the input contains.
result = eval(user_expression)  # Never do this

# SAFE alternative for math expressions
import ast
# Allowlisted operators, applied directly so no eval() call is needed.
_BINARY_OPS = {
    ast.Add: lambda a, b: a + b,
    ast.Sub: lambda a, b: a - b,
    ast.Mult: lambda a, b: a * b,
    ast.Div: lambda a, b: a / b,
}
_UNARY_OPS = {
    ast.UAdd: lambda a: +a,
    ast.USub: lambda a: -a,
}


def _is_allowed_node(node) -> bool:
    """Return True if ``node`` may appear in a numeric expression."""
    if isinstance(node, ast.Constant):
        # Exact int/float only: bool is an int subclass, and str/bytes
        # constants would allow e.g. "a" * 10**9 to exhaust memory.
        return type(node.value) in (int, float)
    if isinstance(node, ast.BinOp):
        return type(node.op) in _BINARY_OPS
    if isinstance(node, ast.UnaryOp):
        return type(node.op) in _UNARY_OPS
    # Operator marker nodes are also yielded by ast.walk.
    return isinstance(node, (ast.Expression, *_BINARY_OPS, *_UNARY_OPS))


def _eval_numeric_node(node):
    """Evaluate an already-validated Constant/UnaryOp/BinOp node."""
    if isinstance(node, ast.Constant):
        return node.value
    if isinstance(node, ast.UnaryOp):
        return _UNARY_OPS[type(node.op)](_eval_numeric_node(node.operand))
    left = _eval_numeric_node(node.left)
    right = _eval_numeric_node(node.right)
    return _BINARY_OPS[type(node.op)](left, right)


def safe_eval_number(expr: str) -> float:
    """Evaluate simple math expressions safely.

    Supports int/float literals, unary ``+``/``-`` and binary ``+``, ``-``,
    ``*``, ``/``. The whole tree is validated before anything is computed,
    and the result is computed by walking the tree rather than by ``eval``.
    Integer-only expressions return an ``int``.

    Raises:
        ValueError: if the expression contains anything else (names, calls,
            strings, booleans, other operators).
        SyntaxError: if ``expr`` is not a valid Python expression.
        ZeroDivisionError: on division by zero.
    """
    tree = ast.parse(expr, mode="eval")
    if not all(_is_allowed_node(node) for node in ast.walk(tree)):
        raise ValueError(f"Unsafe expression: {expr}")
    return _eval_numeric_node(tree.body)

# Operator precedence comes from the parser: 3 * 4 binds before the addition.
safe_eval_number("2 + 3 * 4")  # 14
# Calls and attribute access are not in the node allowlist, so this is rejected.
safe_eval_number("__import__('os').system('ls')")  # ValueError

Pickle Replacement Strategies

# Instead of pickle, use format-specific serializers:
import json      # Human-readable, no code execution
import msgpack   # Binary, fast, no code execution
import cbor2     # Binary, standards-based, no code execution

# For complex Python objects, serialize to dicts first:
from dataclasses import dataclass, asdict

@dataclass
class Config:
    """Settings record made only of JSON-native field types."""

    host: str
    port: int
    debug: bool

config = Config("localhost", 8080, False)
# Serialize through a plain dict so the payload is data only.
serialized = json.dumps(asdict(config))
# NOTE(review): the decoded dict is passed straight to the constructor;
# unexpected keys raise TypeError and field types are not checked here.
restored = Config(**json.loads(serialized))

Secure HTTP Client Configuration

import httpx

def create_secure_client() -> httpx.Client:
    """Build an httpx.Client with explicit timeouts and conservative defaults.

    Every phase of a request has its own timeout, redirects are not
    followed, and TLS certificate verification stays enabled.
    """
    phase_timeouts = httpx.Timeout(
        connect=5.0,  # Connection timeout
        read=30.0,    # Read timeout
        write=10.0,   # Write timeout
        pool=5.0,     # Pool timeout
    )
    client_options = {
        "timeout": phase_timeouts,
        "follow_redirects": False,  # Don't follow blindly
        "max_redirects": 3,         # If enabled, limit chain
        "verify": True,             # TLS verification (default)
    }
    return httpx.Client(**client_options)

# Always set timeouts — an infinite timeout is a denial-of-service vector
# Never disable TLS verification in production

Rate Limiting and Resource Controls

import time
from collections import defaultdict
from functools import wraps

class RateLimiter:
    """Sliding-window rate limiter keyed by an arbitrary string.

    Each key may make at most ``max_requests`` calls within any trailing
    ``window_seconds`` interval. A list of request timestamps is kept per
    key (this is a sliding-window log, not a token bucket). Keys with no
    requests inside the window are evicted so that memory does not grow
    with every key ever seen.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests: dict[str, list[float]] = defaultdict(list)
        # Monotonic clock: wall-clock adjustments must not stretch or
        # reset a window.
        self._last_sweep = time.monotonic()

    def is_allowed(self, key: str) -> bool:
        """Record a request for ``key`` and return whether it is permitted.

        A rejected request is not recorded.
        """
        now = time.monotonic()
        window_start = now - self.window

        # At most once per window, evict keys that have gone quiet.
        if now - self._last_sweep >= self.window:
            self._drop_idle_keys(window_start)
            self._last_sweep = now

        # .get() avoids creating an empty entry for a never-seen key.
        recent = [t for t in self.requests.get(key, ()) if t > window_start]

        allowed = len(recent) < self.max_requests
        if allowed:
            recent.append(now)

        if recent:
            self.requests[key] = recent
        else:
            self.requests.pop(key, None)
        return allowed

    def _drop_idle_keys(self, window_start: float) -> None:
        """Delete keys whose newest timestamp is outside the window."""
        # Timestamps are appended in order, so the last one is the newest.
        idle = [
            k for k, stamps in self.requests.items()
            if not stamps or stamps[-1] <= window_start
        ]
        for k in idle:
            del self.requests[k]

# Usage in FastAPI
limiter = RateLimiter(max_requests=100, window_seconds=60)


@app.middleware("http")
async def rate_limit_middleware(request, call_next):
    """Reject requests with HTTP 429 once a client exceeds the rate limit.

    Requests are keyed by the peer address. NOTE(review): behind a reverse
    proxy that address is presumably the proxy's, so all clients would
    share one bucket — confirm how the app is deployed.
    """
    # request.client is None when no peer address is available; such
    # requests share one bucket instead of raising AttributeError.
    client_ip = request.client.host if request.client else "unknown"
    if not limiter.is_allowed(client_ip):
        return JSONResponse(
            status_code=429,
            content={"detail": "Too many requests"},
            # Taken from the limiter so it follows the configured window
            headers={"Retry-After": str(limiter.window)},
        )
    return await call_next(request)

Static Analysis Pipeline

# Comprehensive security linting
pip install bandit ruff mypy

# Bandit: Python-specific security scanner
bandit -r src/ -ll -ii  # Medium+ severity, medium+ confidence

# Ruff: Fast linter with security rules
ruff check src/ --select S  # S = security rules (bandit port)

# Mypy: static type checking catches a whole class of bugs before the code runs
mypy src/ --strict

Bandit Configuration

# pyproject.toml
[tool.bandit]
exclude_dirs = ["tests", "migrations"]
# B101 (assert_used) is deliberately not listed in a global `skips`: that
# would silence assert warnings in application code too, where asserts are
# stripped under `python -O`. The per-file setting below exempts tests only.

[tool.bandit.assert_used]
skips = ["*/test_*.py", "*_test.py"]

Bandit catches: hardcoded passwords (B105), SQL injection (B608), shell injection (B602), insecure temp files (B108), pickle usage (B301), and dozens more.

Secure Logging

import logging
import re

class SanitizingFormatter(logging.Formatter):
    """Redact sensitive data from log messages.

    Redaction runs on the fully formatted record and covers
    ``password``/``token`` key-value pairs (any letter case, ``:`` or ``=``
    separator), US SSN-shaped numbers and ``Bearer`` credentials.
    """

    # A credential value: a complete quoted string (which may contain
    # spaces), or a bare / unterminated-quote token up to the next delimiter.
    _VALUE = r"(?:\"[^\"]*\"|'[^']*'|['\"]?[^'\";\s]+)"

    PATTERNS = [
        (re.compile(r"password['\"]?\s*[:=]\s*" + _VALUE, re.IGNORECASE),
         "password=***REDACTED***"),
        (re.compile(r"token['\"]?\s*[:=]\s*" + _VALUE, re.IGNORECASE),
         "token=***REDACTED***"),
        (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
         "***SSN***"),
        # The HTTP auth scheme name is case-insensitive.
        (re.compile(r"Bearer\s+\S+", re.IGNORECASE),
         "Bearer ***REDACTED***"),
    ]

    def format(self, record: logging.LogRecord) -> str:
        """Format ``record`` normally, then apply every redaction pattern."""
        message = super().format(record)
        for pattern, replacement in self.PATTERNS:
            message = pattern.sub(replacement, message)
        return message

# Attach the redacting formatter to a stream handler on the "app" logger.
handler = logging.StreamHandler()
handler.setFormatter(SanitizingFormatter(
    "%(asctime)s %(levelname)s %(name)s %(message)s"
))
logger = logging.getLogger("app")
# Redaction lives in this handler's formatter only; any other handler that
# receives the same records needs the formatter as well.
logger.addHandler(handler)

Security Testing Automation

# test_security.py — Security-specific test suite

import pytest
import re

class TestSecurityHeaders:
    """Check that the root endpoint sends the expected security headers."""

    def test_hsts_header(self, client):
        """The response carries a Strict-Transport-Security header."""
        response_headers = client.get("/").headers
        assert "strict-transport-security" in response_headers

    def test_content_type_options(self, client):
        """MIME sniffing is disabled via X-Content-Type-Options."""
        sniffing_policy = client.get("/").headers.get("x-content-type-options")
        assert sniffing_policy == "nosniff"

    def test_frame_options(self, client):
        """Framing is either denied or limited to the same origin."""
        framing_policy = client.get("/").headers.get("x-frame-options")
        assert framing_policy in ("DENY", "SAMEORIGIN")

class TestInputValidation:
    """Send classic injection payloads to the search endpoint."""

    @pytest.mark.parametrize("payload", [
        "'; DROP TABLE users; --",
        "<script>alert(1)</script>",
        "{{7*7}}",  # Template injection
        "${7*7}",   # Expression injection
        "../../../etc/passwd",
    ])
    def test_malicious_input_rejected(self, auth_client, payload):
        """The payload is either handled as plain text or rejected cleanly."""
        r = auth_client.post("/api/search/", json={"query": payload})
        # 200 = treated as literal text, 400/422 = rejected by validation.
        # Any other status, notably 500, means the payload reached code that
        # could not handle it. One assertion covers this; a separate
        # `!= 500` check after it could never fail.
        assert r.status_code in (200, 400, 422), (
            f"unexpected status {r.status_code} for payload {payload!r}"
        )

Dependency Lockfile Integrity

# Verify that requirements.txt hasn't been tampered with
import hashlib
from pathlib import Path

def verify_lockfile_integrity(
    lockfile: Path,
    expected_hash: str
) -> bool:
    """Check lockfile hash against a known-good value.

    Args:
        lockfile: Path of the file to hash.
        expected_hash: Hex-encoded SHA-256 digest; letter case and
            surrounding whitespace are ignored.

    Returns:
        True if the SHA-256 of the file's bytes equals ``expected_hash``.
    """
    content = lockfile.read_bytes()
    actual = hashlib.sha256(content).hexdigest()
    # hexdigest() is lowercase with no whitespace; normalise the expected
    # value so an uppercase digest or one read from a file with a trailing
    # newline is not reported as a mismatch.
    return actual == expected_hash.strip().lower()

# Store the hash in a separate, signed config
# CI checks this before installing dependencies

The one thing to remember: secure coding is the compound interest of software engineering — each small habit contributes almost nothing alone, but together they create systems that resist real-world attacks across thousands of threat vectors.

Tags: python, security, best-practices

See Also