Python Certificate Pinning — Deep Dive
The Trust Model Problem
The web PKI model relies on approximately 130 root CAs, each capable of issuing certificates for any domain. This creates a combinatorial trust surface. Historical breaches illustrate the risk:
- DigiNotar (2011): Compromised CA issued fraudulent `*.google.com` certificates, used to intercept Iranian users’ Gmail traffic. All major browsers revoked DigiNotar’s root, collapsing the CA.
- Symantec (2017): Issued over 30,000 certificates with improper validation. Chrome and Firefox distrusted their entire root hierarchy.
- Kazakhstan (2020): Government ordered ISPs to install a state-operated root CA, enabling mass HTTPS interception.
Certificate pinning defends against all three scenarios by restricting which certificates your application accepts, regardless of what the OS trust store contains.
SPKI Pin Extraction
The industry-standard pin format hashes the Subject Public Key Info (SPKI) — the DER-encoded public key and algorithm identifier. This is what Chrome used for HPKP and what OWASP recommends for mobile apps.
import ssl
import hashlib
import base64
import socket
def get_spki_pin(hostname: str, port: int = 443) -> str:
    """Extract the SPKI SHA-256 pin from a live server.

    Connects to ``hostname:port`` with a default (verifying) TLS context,
    takes the peer certificate in DER form and shells out to the ``openssl``
    command-line tool to obtain the DER-encoded public key, which is hashed.

    Args:
        hostname: Server name used for the TCP connection and as the TLS
            ``server_hostname``.
        port: TCP port to connect to.

    Returns:
        The pin formatted as ``"sha256/<base64 of SHA-256(public key DER)>"``.

    Raises:
        OSError: If the connection or TLS handshake fails, or if the
            ``openssl`` executable cannot be started.
        subprocess.CalledProcessError: If either ``openssl`` invocation exits
            with a non-zero status.
    """
    # Local import: the snippet's import header does not bring in subprocess.
    import subprocess

    ctx = ssl.create_default_context()
    with socket.create_connection((hostname, port)) as sock:
        with ctx.wrap_socket(sock, server_hostname=hostname) as ssock:
            der_cert = ssock.getpeercert(binary_form=True)

    # The network connection is closed at this point; the rest is local work.
    # Step 1: extract the public key (PEM) from the DER certificate.
    # check=True matters: without it a failing openssl leaves stdout empty
    # and the function would return the hash of b"" as if it were a real pin.
    pem_pubkey = subprocess.run(
        ["openssl", "x509", "-inform", "DER", "-pubkey", "-noout"],
        input=der_cert, capture_output=True, check=True,
    ).stdout

    # Step 2: convert the PEM public key to DER, the form that gets hashed.
    der_pubkey = subprocess.run(
        ["openssl", "pkey", "-pubin", "-outform", "DER"],
        input=pem_pubkey, capture_output=True, check=True,
    ).stdout

    pin = base64.b64encode(hashlib.sha256(der_pubkey).digest()).decode()
    return f"sha256/{pin}"
# pin = get_spki_pin("api.stripe.com")
# "sha256/4a6b0..."
Custom SSLContext with Pinning
Approach 1: Restricted CA Bundle
import ssl
import certifi
from pathlib import Path
def create_pinned_context(ca_file: str | Path) -> ssl.SSLContext:
"""Create an SSL context that trusts only a specific CA."""
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.check_hostname = True
# Load ONLY our pinned CA — not system defaults
ctx.load_verify_locations(cafile=str(ca_file))
# Disable loading system CA bundle
# (SSLContext doesn't load system CAs by default;
# only load_default_certs() does)
return ctx
# Usage with urllib3/requests
import urllib3
# Build the restricted-trust context once and hand it to the connection pool
# so connections made through the pool use it.
ctx = create_pinned_context("/path/to/pinned-ca.pem")
pool = urllib3.HTTPSConnectionPool(
    "api.example.com", port=443, ssl_context=ctx
)
# Issue GET /health through the pool.
response = pool.request("GET", "/health")
Approach 2: Post-Handshake Pin Verification
import ssl
import hashlib
import base64
import socket
from typing import Set
class PinVerifier:
    """Check a server certificate against a set of pins once TLS is up.

    The value compared against the pins is the base64-encoded SHA-256 of the
    complete DER certificate returned by ``getpeercert(binary_form=True)``,
    prefixed with ``"sha256/"``.
    """

    def __init__(self, pins: Set[str]):
        # Each entry looks like "sha256/<base64 digest>".
        self.pins = pins

    def verify_connection(self, ssock: ssl.SSLSocket) -> bool:
        """Return True when the peer certificate's digest is one of the pins."""
        peer_der = ssock.getpeercert(binary_form=True)
        digest = hashlib.sha256(peer_der).digest()
        candidate = "sha256/" + base64.b64encode(digest).decode()
        return candidate in self.pins

    def connect_and_verify(self, hostname: str,
                           port: int = 443) -> ssl.SSLSocket:
        """Open a TLS connection and hand it back only if its pin matches.

        Args:
            hostname: Host to connect to; also used as ``server_hostname``.
            port: TCP port to connect to.

        Returns:
            The connected ``ssl.SSLSocket``; the caller owns it.

        Raises:
            ssl.SSLCertVerificationError: If the certificate digest is not in
                the pin set. The socket is closed before raising.
        """
        tls_context = ssl.create_default_context()
        raw = socket.create_connection((hostname, port))
        tls = tls_context.wrap_socket(raw, server_hostname=hostname)

        # Happy path first: a matching pin returns the live socket.
        if self.verify_connection(tls):
            return tls

        tls.close()
        raise ssl.SSLCertVerificationError(
            f"Certificate pin mismatch for {hostname}. "
            f"Expected one of: {self.pins}"
        )
# Usage
# Two pins are configured, labelled current and backup.
verifier = PinVerifier({
    "sha256/ko8tivDrIRfxB6CE01Q+fY0+1Gk7xQg8JOA7U/q0okM=",  # Current
    "sha256/Vr8H0mjGvGJPaLGPgKzfBX6dBjRbL2nD91R3fYXhTEw=",  # Backup
})
# Connects on the default port (443) and keeps the verified TLS socket.
ssock = verifier.connect_and_verify("api.example.com")
Approach 3: Requests with Custom Transport Adapter
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
import hashlib
import base64
import ssl
class PinningAdapter(HTTPAdapter):
    """Requests adapter that verifies certificate pins.

    After each request the adapter hashes the peer certificate (SHA-256 of
    the full DER certificate, base64-encoded, prefixed with ``"sha256/"``)
    and compares it with the configured pins.

    NOTE(review): the check runs after ``super().send()`` returns, so the
    request has already been transmitted when a mismatch is detected. The
    check stops the response from being used; it does not stop the request
    from being sent.
    """

    def __init__(self, pins: set, **kwargs):
        """Store the accepted pins and pass other options to ``HTTPAdapter``.

        Args:
            pins: Accepted pin strings, e.g. ``{"sha256/<base64>"}``.
            **kwargs: Forwarded unchanged to ``HTTPAdapter.__init__``.
        """
        self.pins = pins
        super().__init__(**kwargs)

    def send(self, request, **kwargs):
        """Send ``request`` and return the response only if its pin matches.

        Raises:
            ssl.SSLCertVerificationError: If the peer certificate is not in
                the pin set, or if it cannot be read from the connection.
        """
        response = super().send(request, **kwargs)

        der_cert = self._peer_certificate(response)
        if der_cert is None:
            # Fail closed. Silently returning the response here would let any
            # situation where the socket is no longer reachable bypass the
            # pin check entirely.
            response.close()
            raise ssl.SSLCertVerificationError(
                "Pin verification failed. Peer certificate unavailable"
            )

        cert_hash = base64.b64encode(
            hashlib.sha256(der_cert).digest()
        ).decode()
        pin = f"sha256/{cert_hash}"
        if pin not in self.pins:
            response.close()
            raise ssl.SSLCertVerificationError(
                f"Pin verification failed. Got: {pin}"
            )
        return response

    @staticmethod
    def _peer_certificate(response):
        """Return the peer's DER certificate for ``response``, or None."""
        # _connection is a private attribute of the raw response object, so
        # every step is looked up defensively.
        conn = getattr(response.raw, "_connection", None)
        sock = getattr(conn, "sock", None)
        getpeercert = getattr(sock, "getpeercert", None)
        if getpeercert is None:
            return None
        return getpeercert(binary_form=True) or None
# Usage
session = requests.Session()
# Mount the adapter on this one URL prefix, with a single accepted pin.
session.mount("https://api.example.com", PinningAdapter(pins={
    "sha256/ko8tivDrIRfxB6CE01Q+fY0+1Gk7xQg8JOA7U/q0okM=",
}))
response = session.get("https://api.example.com/data")
Automated Pin Rotation
import json
import hashlib
import base64
import ssl
import socket
from datetime import datetime, timezone
from pathlib import Path
class PinManager:
    """Manage certificate pins with rotation support.

    The JSON config maps each hostname to
    ``{"pins": [...], "last_checked": <ISO timestamp or None>}``. Each pin
    entry is a dict with ``hash``, ``first_seen``, ``last_seen`` and
    ``status`` keys, as written by ``check_and_rotate``.

    Pins are the SHA-256 of the full DER certificate (see ``_get_live_pin``),
    base64-encoded and prefixed with ``"sha256/"``.
    """

    # A pin whose last_seen is older than this many days is expired.
    PIN_MAX_AGE_DAYS = 90

    def __init__(self, config_path: Path):
        """Load the pin configuration from ``config_path`` (a JSON file)."""
        self.config_path = config_path
        self.config = json.loads(config_path.read_text())

    def check_and_rotate(self, hostname: str) -> dict:
        """Check current server cert and update pins if needed.

        Fetches the live pin, records it (new pins are added with status
        ``"pending_review"``), refreshes ``last_seen`` on the matching entry,
        expires pins not seen for ``PIN_MAX_AGE_DAYS`` days, drops expired
        entries and writes the config back to disk.

        Args:
            hostname: Host whose certificate is checked.

        Returns:
            ``{"action": "new_pin_detected", "pin": <pin>}`` when the live
            pin was unknown, otherwise ``{"action": "no_change"}``.
        """
        current_pin = self._get_live_pin(hostname)
        now = datetime.now(timezone.utc)
        stamp = now.isoformat()

        host_config = self.config.get(hostname, {
            "pins": [], "last_checked": None
        })
        known_pins = {p["hash"] for p in host_config["pins"]}

        if current_pin not in known_pins:
            # New pin detected — add it, alert operators
            host_config["pins"].append({
                "hash": current_pin,
                "first_seen": stamp,
                "last_seen": stamp,
                "status": "pending_review",
            })
            result = {"action": "new_pin_detected", "pin": current_pin}
        else:
            # Record the sighting so the age-based expiry below has data.
            for entry in host_config["pins"]:
                if entry["hash"] == current_pin:
                    entry["last_seen"] = stamp
            result = {"action": "no_change"}

        # Expire pins not seen in 90 days. The pin being served right now is
        # never expired by age.
        for entry in host_config["pins"]:
            if entry["hash"] != current_pin and self._is_stale(entry, now):
                entry["status"] = "expired"

        # .get(): entries without a status are treated as not expired.
        host_config["pins"] = [
            p for p in host_config["pins"]
            if p.get("status") != "expired"
        ]

        host_config["last_checked"] = stamp
        self.config[hostname] = host_config
        self.config_path.write_text(
            json.dumps(self.config, indent=2)
        )
        return result

    def _is_stale(self, pin: dict, now: datetime) -> bool:
        """Return True if ``pin`` was last seen over PIN_MAX_AGE_DAYS ago."""
        raw = pin.get("last_seen")
        if not raw:
            # Never observed on the live server (for example a pre-provisioned
            # backup pin): age-based expiry does not apply.
            return False
        try:
            seen = datetime.fromisoformat(raw)
        except (TypeError, ValueError):
            # Unparseable timestamp: keep the pin rather than guess its age.
            return False
        if seen.tzinfo is None:
            # Timestamps written by this class are UTC; assume the same here.
            seen = seen.replace(tzinfo=timezone.utc)
        return (now - seen).total_seconds() > self.PIN_MAX_AGE_DAYS * 86400

    def _get_live_pin(self, hostname: str) -> str:
        """Connect to ``hostname`` on port 443 and return its current pin.

        The pin is ``"sha256/"`` followed by the base64 SHA-256 of the peer's
        full DER certificate.
        """
        ctx = ssl.create_default_context()
        with socket.create_connection((hostname, 443)) as sock:
            with ctx.wrap_socket(sock,
                                 server_hostname=hostname) as ssock:
                der = ssock.getpeercert(binary_form=True)
        return "sha256/" + base64.b64encode(
            hashlib.sha256(der).digest()
        ).decode()
httpx with Certificate Pinning
import httpx
import ssl
def create_pinned_httpx_client(ca_file: str, hostname: str) -> httpx.Client:
    """Return an httpx client that trusts only the CA(s) in ``ca_file``.

    Args:
        ca_file: Path to the CA certificate file to load as trust anchors.
        hostname: Host used to build the client's ``https://`` base URL.

    Returns:
        An ``httpx.Client`` whose TLS verification uses the pinned context.
    """
    pinned_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    # Trust anchors come from ca_file alone; no other CA source is loaded.
    pinned_ctx.load_verify_locations(cafile=ca_file)

    pinned_ctx.check_hostname = True
    pinned_ctx.verify_mode = ssl.CERT_REQUIRED
    pinned_ctx.minimum_version = ssl.TLSVersion.TLSv1_2

    origin = f"https://{hostname}"
    return httpx.Client(verify=pinned_ctx, base_url=origin)
# Usage
# The client's base URL is https://api.stripe.com, so request paths below
# are relative to that host.
client = create_pinned_httpx_client(
    "/etc/ssl/pinned/stripe-ca.pem",
    "api.stripe.com"
)
response = client.get("/v1/charges")
Attack Scenarios Pinning Defeats
1. Corporate MITM Proxy
Many organizations install a corporate root CA on employee machines to inspect HTTPS traffic. A pinned application ignores this CA and refuses the proxy’s re-signed certificate.
2. DNS Hijacking + Fraudulent Certificate
An attacker poisons DNS to redirect traffic to their server and presents a certificate from a compromised or colluding CA. Pinning rejects the certificate because it doesn’t match the expected pin.
3. BGP Hijacking
Nation-state attackers reroute IP traffic at the network level. Even with a valid certificate from a coerced CA, pinning detects the mismatch.
Operational Hardening
Monitor certificate expiration:
import ssl
import socket
from datetime import datetime
def days_until_expiry(hostname: str, port: int = 443) -> int:
    """Return the number of whole days until the server certificate expires.

    Args:
        hostname: Host to connect to; also used as the TLS ``server_hostname``.
        port: TCP port to connect to.

    Returns:
        Days remaining until the certificate's ``notAfter`` time, rounded
        down. The value is negative once the certificate has expired.

    Raises:
        OSError: If the connection or TLS handshake fails.
    """
    # Local import: the snippet's import header does not bring in time.
    import time

    ctx = ssl.create_default_context()
    with socket.create_connection((hostname, port)) as sock:
        with ctx.wrap_socket(sock,
                             server_hostname=hostname) as ssock:
            cert = ssock.getpeercert()

    # ssl.cert_time_to_seconds parses the certificate time format using the
    # C locale and returns epoch seconds. strptime's %b is locale-dependent,
    # and datetime.utcnow() is deprecated.
    expires_epoch = ssl.cert_time_to_seconds(cert["notAfter"])
    seconds_left = expires_epoch - time.time()

    # Floor division matches timedelta.days for negative values as well.
    return int(seconds_left // 86400)
# Alert if < 30 days
days = days_until_expiry("api.example.com")
if days < 30:
    # NOTE(review): send_alert is not defined in this snippet — presumably
    # supplied by the surrounding application; confirm before running.
    send_alert(f"Certificate expires in {days} days!")
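Multiple backup pins: Always maintain at least two pins — the current certificate and a pre-generated backup key pair stored offline. If the current key is compromised, you can immediately deploy the backup. Pinning a backup key in advance only works with SPKI pins: a hash of the full certificate cannot be computed until that certificate has actually been issued, whereas the SPKI hash depends only on the key pair.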
Pin at the intermediate level for third-party services: You don’t control Stripe’s or GitHub’s certificate rotation schedule. Pin their intermediate CA instead of the leaf certificate. This survives routine renewals while still protecting against CA compromise from other roots.
Test pin verification in CI: Include a test that connects to your pinned services and verifies the current pin is still in your pin set. This gives advance warning before rotation breaks production.
The one thing to remember: certificate pinning transforms TLS from “trust any recognized CA” to “trust only this specific identity” — a decisive upgrade for high-security connections, provided you plan for the operational reality of certificate rotation.
See Also
- Python Cryptography Library — Understand Python Cryptography Library with a vivid mental model so secure Python choices feel obvious, not scary.
- Python Dependency Vulnerability Scanning — Why the libraries your Python project uses might be secretly broken — and how to find out before hackers do.
- Python Hashlib Hashing — How Python turns any data into a unique fingerprint — and why that fingerprint can never be reversed.
- Python HMAC Authentication — How Python proves a message wasn't tampered with — using a secret handshake only you and the receiver know.
- Python OWASP Top Ten — The ten most common ways hackers break into web apps — and how Python developers can stop every single one.