Python Certificate Pinning — Deep Dive

The Trust Model Problem

The web PKI model relies on approximately 130 root CAs, each capable of issuing certificates for any domain. This creates a combinatorial trust surface. Historical breaches illustrate the risk:

  • DigiNotar (2011): Compromised CA issued fraudulent *.google.com certificates, used to intercept Iranian users’ Gmail traffic. All major browsers revoked DigiNotar’s root, collapsing the CA.
  • Symantec (2017): Issued over 30,000 certificates with improper validation. Chrome and Firefox distrusted their entire root hierarchy.
  • Kazakhstan (2020): Government ordered ISPs to install a state-operated root CA, enabling mass HTTPS interception.

Certificate pinning defends against all three scenarios by restricting which certificates your application accepts, regardless of what the OS trust store contains.

SPKI Pin Extraction

The industry-standard pin format hashes the Subject Public Key Info (SPKI) — the DER-encoded public key and algorithm identifier. This is what Chrome used for HPKP and what OWASP recommends for mobile apps.

import ssl
import hashlib
import base64
import socket

def get_spki_pin(hostname: str, port: int = 443) -> str:
    """Extract the SPKI SHA-256 pin from a live server."""
    ctx = ssl.create_default_context()
    with socket.create_connection((hostname, port)) as sock:
        with ctx.wrap_socket(sock, server_hostname=hostname) as ssock:
            der_cert = ssock.getpeercert(binary_form=True)
    
    # Extract public key from DER certificate
    # Using OpenSSL via subprocess for SPKI extraction
    import subprocess
    result = subprocess.run(
        ["openssl", "x509", "-inform", "DER", "-pubkey", "-noout"],
        input=der_cert, capture_output=True
    )
    pem_pubkey = result.stdout
    
    # Convert PEM public key to DER for hashing
    result2 = subprocess.run(
        ["openssl", "pkey", "-pubin", "-outform", "DER"],
        input=pem_pubkey, capture_output=True
    )
    der_pubkey = result2.stdout
    
    pin = base64.b64encode(
        hashlib.sha256(der_pubkey).digest()
    ).decode()
    return f"sha256/{pin}"

# pin = get_spki_pin("api.stripe.com")
# "sha256/4a6b0..."

Custom SSLContext with Pinning

Approach 1: Restricted CA Bundle

import ssl
import certifi
from pathlib import Path

def create_pinned_context(ca_file: str | Path) -> ssl.SSLContext:
    """Create an SSL context that trusts only a specific CA."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.check_hostname = True
    
    # Load ONLY our pinned CA — not system defaults
    ctx.load_verify_locations(cafile=str(ca_file))
    
    # Disable loading system CA bundle
    # (SSLContext doesn't load system CAs by default; 
    #  only load_default_certs() does)
    
    return ctx

# Usage with urllib3/requests
import urllib3

ctx = create_pinned_context("/path/to/pinned-ca.pem")
pool = urllib3.HTTPSConnectionPool(
    "api.example.com", port=443, ssl_context=ctx
)
response = pool.request("GET", "/health")

Approach 2: Post-Handshake Pin Verification

import ssl
import hashlib
import base64
import socket
from typing import Set

class PinVerifier:
    """Verify certificate pins after TLS handshake."""
    
    def __init__(self, pins: Set[str]):
        # pins format: {"sha256/base64encoded...", ...}
        self.pins = pins
    
    def verify_connection(self, ssock: ssl.SSLSocket) -> bool:
        der_cert = ssock.getpeercert(binary_form=True)
        cert_hash = base64.b64encode(
            hashlib.sha256(der_cert).digest()
        ).decode()
        pin = f"sha256/{cert_hash}"
        return pin in self.pins
    
    def connect_and_verify(self, hostname: str, 
                            port: int = 443) -> ssl.SSLSocket:
        ctx = ssl.create_default_context()
        sock = socket.create_connection((hostname, port))
        ssock = ctx.wrap_socket(sock, server_hostname=hostname)
        
        if not self.verify_connection(ssock):
            ssock.close()
            raise ssl.SSLCertVerificationError(
                f"Certificate pin mismatch for {hostname}. "
                f"Expected one of: {self.pins}"
            )
        return ssock

# Usage
verifier = PinVerifier({
    "sha256/ko8tivDrIRfxB6CE01Q+fY0+1Gk7xQg8JOA7U/q0okM=",  # Current
    "sha256/Vr8H0mjGvGJPaLGPgKzfBX6dBjRbL2nD91R3fYXhTEw=",  # Backup
})

ssock = verifier.connect_and_verify("api.example.com")

Approach 3: Requests with Custom Transport Adapter

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
import hashlib
import base64
import ssl

class PinningAdapter(HTTPAdapter):
    """Requests adapter that verifies certificate pins."""
    
    def __init__(self, pins: set, **kwargs):
        self.pins = pins
        super().__init__(**kwargs)
    
    def send(self, request, **kwargs):
        response = super().send(request, **kwargs)
        
        # Access the underlying connection's certificate
        conn = response.raw._connection
        if conn and hasattr(conn, 'sock') and conn.sock:
            der_cert = conn.sock.getpeercert(binary_form=True)
            cert_hash = base64.b64encode(
                hashlib.sha256(der_cert).digest()
            ).decode()
            pin = f"sha256/{cert_hash}"
            
            if pin not in self.pins:
                response.close()
                raise ssl.SSLCertVerificationError(
                    f"Pin verification failed. Got: {pin}"
                )
        
        return response

# Usage
session = requests.Session()
session.mount("https://api.example.com", PinningAdapter(pins={
    "sha256/ko8tivDrIRfxB6CE01Q+fY0+1Gk7xQg8JOA7U/q0okM=",
}))

response = session.get("https://api.example.com/data")

Automated Pin Rotation

import json
import hashlib
import base64
import ssl
import socket
from datetime import datetime, timezone
from pathlib import Path

class PinManager:
    """Manage certificate pins with rotation support."""
    
    def __init__(self, config_path: Path):
        self.config_path = config_path
        self.config = json.loads(config_path.read_text())
    
    def check_and_rotate(self, hostname: str) -> dict:
        """Check current server cert and update pins if needed."""
        current_pin = self._get_live_pin(hostname)
        host_config = self.config.get(hostname, {
            "pins": [], "last_checked": None
        })
        
        known_pins = {p["hash"] for p in host_config["pins"]}
        
        if current_pin not in known_pins:
            # New pin detected — add it, alert operators
            host_config["pins"].append({
                "hash": current_pin,
                "first_seen": datetime.now(timezone.utc).isoformat(),
                "status": "pending_review",
            })
            result = {"action": "new_pin_detected", "pin": current_pin}
        else:
            result = {"action": "no_change"}
        
        # Expire pins not seen in 90 days
        host_config["pins"] = [
            p for p in host_config["pins"]
            if p["status"] != "expired"
        ]
        
        host_config["last_checked"] = (
            datetime.now(timezone.utc).isoformat()
        )
        self.config[hostname] = host_config
        self.config_path.write_text(
            json.dumps(self.config, indent=2)
        )
        
        return result
    
    def _get_live_pin(self, hostname: str) -> str:
        ctx = ssl.create_default_context()
        with socket.create_connection((hostname, 443)) as sock:
            with ctx.wrap_socket(sock, 
                                  server_hostname=hostname) as ssock:
                der = ssock.getpeercert(binary_form=True)
        return "sha256/" + base64.b64encode(
            hashlib.sha256(der).digest()
        ).decode()

httpx with Certificate Pinning

import httpx
import ssl

def create_pinned_httpx_client(
    ca_file: str, 
    hostname: str
) -> httpx.Client:
    """Create an httpx client pinned to a specific CA."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    ctx.load_verify_locations(cafile=ca_file)
    ctx.check_hostname = True
    ctx.verify_mode = ssl.CERT_REQUIRED
    
    return httpx.Client(
        verify=ctx,
        base_url=f"https://{hostname}",
    )

# Usage
client = create_pinned_httpx_client(
    "/etc/ssl/pinned/stripe-ca.pem",
    "api.stripe.com"
)
response = client.get("/v1/charges")

Attack Scenarios Pinning Defeats

1. Corporate MITM Proxy

Many organizations install a corporate root CA on employee machines to inspect HTTPS traffic. A pinned application ignores this CA and refuses the proxy’s re-signed certificate.

2. DNS Hijacking + Fraudulent Certificate

An attacker poisons DNS to redirect traffic to their server and presents a certificate from a compromised or colluding CA. Pinning rejects the certificate because it doesn’t match the expected pin.

3. BGP Hijacking

Nation-state attackers reroute IP traffic at the network level. Even with a valid certificate from a coerced CA, pinning detects the mismatch.

Operational Hardening

Monitor certificate expiration:

import ssl
import socket
from datetime import datetime

def days_until_expiry(hostname: str) -> int:
    ctx = ssl.create_default_context()
    with socket.create_connection((hostname, 443)) as sock:
        with ctx.wrap_socket(sock, 
                              server_hostname=hostname) as ssock:
            cert = ssock.getpeercert()
    expires = datetime.strptime(
        cert["notAfter"], "%b %d %H:%M:%S %Y %Z"
    )
    return (expires - datetime.utcnow()).days

# Alert if < 30 days
days = days_until_expiry("api.example.com")
if days < 30:
    send_alert(f"Certificate expires in {days} days!")

Multiple backup pins: Always maintain at least two pins — the current certificate and a pre-generated backup key pair stored offline. If the current key is compromised, you can immediately deploy the backup.

Pin at the intermediate level for third-party services: You don’t control Stripe’s or GitHub’s certificate rotation schedule. Pin their intermediate CA instead of the leaf certificate. This survives routine renewals while still protecting against CA compromise from other roots.

Test pin verification in CI: Include a test that connects to your pinned services and verifies the current pin is still in your pin set. This gives advance warning before rotation breaks production.

The one thing to remember: certificate pinning transforms TLS from “trust any recognized CA” to “trust only this specific identity” — a decisive upgrade for high-security connections, provided you plan for the operational reality of certificate rotation.

pythonsecuritynetworking

See Also