Certificate Management in Python — Deep Dive
Generating self-signed certificates with the cryptography library
For development and testing, create certificates programmatically:
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from datetime import datetime, timedelta, timezone
import ipaddress
def generate_self_signed_cert(
common_name: str,
san_domains: list[str] = None,
san_ips: list[str] = None,
days_valid: int = 365
) -> tuple[bytes, bytes]:
"""Generate a self-signed certificate and private key."""
# Generate EC private key (faster and smaller than RSA)
private_key = ec.generate_private_key(ec.SECP256R1())
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Internal"),
x509.NameAttribute(NameOID.COMMON_NAME, common_name),
])
# Build Subject Alternative Names
san_list = []
for domain in (san_domains or []):
san_list.append(x509.DNSName(domain))
for ip in (san_ips or []):
san_list.append(x509.IPAddress(ipaddress.ip_address(ip)))
if not san_list:
san_list.append(x509.DNSName(common_name))
now = datetime.now(timezone.utc)
cert = (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + timedelta(days=days_valid))
.add_extension(x509.SubjectAlternativeName(san_list), critical=False)
.add_extension(
x509.BasicConstraints(ca=False, path_length=None), critical=True
)
.sign(private_key, hashes.SHA256())
)
cert_pem = cert.public_bytes(serialization.Encoding.PEM)
key_pem = private_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption()
)
return cert_pem, key_pem
cert_pem, key_pem = generate_self_signed_cert(
"myservice.internal",
san_domains=["myservice.internal", "localhost"],
san_ips=["127.0.0.1"]
)
Building an internal Certificate Authority
For production internal PKI, create a root CA and issue signed certificates:
class InternalCA:
"""Minimal internal Certificate Authority for service-to-service TLS."""
def __init__(self):
self.ca_key = ec.generate_private_key(ec.SECP256R1())
now = datetime.now(timezone.utc)
self.ca_cert = (
x509.CertificateBuilder()
.subject_name(x509.Name([
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MyOrg Internal CA"),
x509.NameAttribute(NameOID.COMMON_NAME, "MyOrg Root CA"),
]))
.issuer_name(x509.Name([
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MyOrg Internal CA"),
x509.NameAttribute(NameOID.COMMON_NAME, "MyOrg Root CA"),
]))
.public_key(self.ca_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + timedelta(days=3650)) # 10-year CA
.add_extension(
x509.BasicConstraints(ca=True, path_length=1), critical=True
)
.add_extension(
x509.KeyUsage(
digital_signature=True, key_cert_sign=True, crl_sign=True,
content_commitment=False, key_encipherment=False,
data_encipherment=False, key_agreement=False,
encipher_only=False, decipher_only=False
), critical=True
)
.sign(self.ca_key, hashes.SHA256())
)
def issue_certificate(self, csr_pem: bytes, days_valid: int = 90) -> bytes:
"""Sign a CSR and return the certificate."""
csr = x509.load_pem_x509_csr(csr_pem)
# Verify CSR signature
if not csr.is_signature_valid:
raise ValueError("Invalid CSR signature")
now = datetime.now(timezone.utc)
cert = (
x509.CertificateBuilder()
.subject_name(csr.subject)
.issuer_name(self.ca_cert.subject)
.public_key(csr.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(now)
.not_valid_after(now + timedelta(days=days_valid))
.add_extension(
x509.BasicConstraints(ca=False, path_length=None), critical=True
)
)
# Copy SANs from CSR if present
try:
san_ext = csr.extensions.get_extension_for_class(
x509.SubjectAlternativeName
)
cert = cert.add_extension(san_ext.value, critical=False)
except x509.ExtensionNotFound:
pass
signed_cert = cert.sign(self.ca_key, hashes.SHA256())
return signed_cert.public_bytes(serialization.Encoding.PEM)
def get_ca_cert_pem(self) -> bytes:
"""Return CA certificate for trust store distribution."""
return self.ca_cert.public_bytes(serialization.Encoding.PEM)
def create_csr(common_name: str, san_domains: list[str]) -> tuple[bytes, bytes]:
"""Generate a private key and CSR."""
key = ec.generate_private_key(ec.SECP256R1())
san_list = [x509.DNSName(d) for d in san_domains]
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, common_name),
]))
.add_extension(x509.SubjectAlternativeName(san_list), critical=False)
.sign(key, hashes.SHA256())
)
csr_pem = csr.public_bytes(serialization.Encoding.PEM)
key_pem = key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption()
)
return csr_pem, key_pem
# Usage
ca = InternalCA()
csr_pem, service_key = create_csr("api.internal", ["api.internal", "api"])
cert_pem = ca.issue_certificate(csr_pem, days_valid=90)
ca_cert_pem = ca.get_ca_cert_pem()
Implementing mutual TLS in Python
With certificates from the internal CA, set up mutual authentication between services:
import ssl
import http.server
import urllib.request
def create_mtls_server_context(
cert_pem: str, key_pem: str, ca_cert_pem: str
) -> ssl.SSLContext:
"""Create SSL context for a server requiring client certificates."""
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(cert_pem, key_pem)
ctx.load_verify_locations(ca_cert_pem)
ctx.verify_mode = ssl.CERT_REQUIRED # Require client certificate
ctx.check_hostname = False # We verify via CA trust
ctx.minimum_version = ssl.TLSVersion.TLSv1_3 # Enforce TLS 1.3
return ctx
def create_mtls_client_context(
cert_pem: str, key_pem: str, ca_cert_pem: str
) -> ssl.SSLContext:
"""Create SSL context for a client presenting its certificate."""
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.load_cert_chain(cert_pem, key_pem)
ctx.load_verify_locations(ca_cert_pem)
ctx.check_hostname = True
ctx.minimum_version = ssl.TLSVersion.TLSv1_3
return ctx
# Server example
server_ctx = create_mtls_server_context(
"server-cert.pem", "server-key.pem", "ca-cert.pem"
)
# Client example with requests library
import requests
def mtls_request(url: str, client_cert: str, client_key: str, ca_cert: str):
"""Make an mTLS-authenticated request."""
return requests.get(
url,
cert=(client_cert, client_key), # Client certificate + key
verify=ca_cert # CA certificate to verify server
)
ACME certificate automation with certbot integration
Automate Let’s Encrypt certificate issuance from Python:
import subprocess
import json
from pathlib import Path
class ACMECertificateManager:
"""Manage Let's Encrypt certificates via certbot."""
def __init__(self, cert_dir: str = "/etc/letsencrypt"):
self.cert_dir = Path(cert_dir)
def obtain_certificate(self, domains: list[str], email: str,
method: str = "standalone") -> dict:
"""Obtain a new certificate via ACME challenge."""
cmd = [
"certbot", "certonly",
"--non-interactive",
"--agree-tos",
f"--email={email}",
f"--preferred-challenges={'dns' if method == 'dns' else 'http'}",
f"--{method}",
]
for domain in domains:
cmd.extend(["-d", domain])
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"Certbot failed: {result.stderr}")
primary_domain = domains[0]
return self._get_cert_paths(primary_domain)
def renew_all(self) -> list[str]:
"""Renew all certificates that are near expiration."""
result = subprocess.run(
["certbot", "renew", "--non-interactive"],
capture_output=True, text=True
)
renewed = []
for line in result.stdout.splitlines():
if "success" in line.lower():
renewed.append(line.strip())
return renewed
def get_certificate_info(self, domain: str) -> dict:
"""Read certificate details for a domain."""
cert_path = self.cert_dir / "live" / domain / "fullchain.pem"
if not cert_path.exists():
raise FileNotFoundError(f"No certificate found for {domain}")
cert_data = cert_path.read_bytes()
cert = x509.load_pem_x509_certificate(cert_data)
return {
"domain": domain,
"issuer": cert.issuer.rfc4514_string(),
"not_before": cert.not_valid_before_utc.isoformat(),
"not_after": cert.not_valid_after_utc.isoformat(),
"serial": str(cert.serial_number),
"days_remaining": (cert.not_valid_after_utc - datetime.now(timezone.utc)).days,
"sans": [
name.value for name in
cert.extensions.get_extension_for_class(
x509.SubjectAlternativeName
).value
]
}
def _get_cert_paths(self, domain: str) -> dict:
live_dir = self.cert_dir / "live" / domain
return {
"cert": str(live_dir / "cert.pem"),
"chain": str(live_dir / "chain.pem"),
"fullchain": str(live_dir / "fullchain.pem"),
"privkey": str(live_dir / "privkey.pem"),
}
Certificate expiration monitoring
Proactively detect certificates approaching expiration across your infrastructure:
import ssl
import socket
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
@dataclass
class CertificateStatus:
hostname: str
port: int
days_remaining: int
expiry_date: str
issuer: str
subject: str
error: Optional[str] = None
class CertificateMonitor:
"""Monitor TLS certificate expiration across services."""
def __init__(self, warning_days: int = 30, critical_days: int = 7):
self.warning_days = warning_days
self.critical_days = critical_days
def check_host(self, hostname: str, port: int = 443,
timeout: int = 10) -> CertificateStatus:
"""Check a single host's certificate."""
try:
ctx = ssl.create_default_context()
with socket.create_connection((hostname, port), timeout=timeout) as sock:
with ctx.wrap_socket(sock, server_hostname=hostname) as ssock:
cert = ssock.getpeercert()
expiry_str = cert["notAfter"]
expiry = datetime.strptime(expiry_str, "%b %d %H:%M:%S %Y %Z").replace(
tzinfo=timezone.utc
)
days_remaining = (expiry - datetime.now(timezone.utc)).days
issuer = dict(x[0] for x in cert["issuer"])
subject = dict(x[0] for x in cert["subject"])
return CertificateStatus(
hostname=hostname,
port=port,
days_remaining=days_remaining,
expiry_date=expiry.isoformat(),
issuer=issuer.get("organizationName", "Unknown"),
subject=subject.get("commonName", hostname),
)
except Exception as e:
return CertificateStatus(
hostname=hostname, port=port, days_remaining=-1,
expiry_date="", issuer="", subject="",
error=str(e)
)
def check_multiple(self, hosts: list[tuple[str, int]]) -> list[CertificateStatus]:
"""Check multiple hosts concurrently."""
with ThreadPoolExecutor(max_workers=20) as pool:
futures = [pool.submit(self.check_host, h, p) for h, p in hosts]
return [f.result() for f in futures]
def generate_report(self, results: list[CertificateStatus]) -> str:
"""Generate a human-readable expiration report."""
lines = ["Certificate Expiration Report", "=" * 50]
critical = [r for r in results if 0 <= r.days_remaining <= self.critical_days]
warning = [r for r in results
if self.critical_days < r.days_remaining <= self.warning_days]
errors = [r for r in results if r.error]
if critical:
lines.append(f"\n🔴 CRITICAL ({len(critical)} certificates):")
for r in critical:
lines.append(f" {r.hostname}:{r.port} — {r.days_remaining} days remaining")
if warning:
lines.append(f"\n🟡 WARNING ({len(warning)} certificates):")
for r in warning:
lines.append(f" {r.hostname}:{r.port} — {r.days_remaining} days remaining")
if errors:
lines.append(f"\n❌ ERRORS ({len(errors)} hosts):")
for r in errors:
lines.append(f" {r.hostname}:{r.port} — {r.error}")
ok_count = len(results) - len(critical) - len(warning) - len(errors)
lines.append(f"\n✅ {ok_count} certificates OK")
return "\n".join(lines)
# Usage
monitor = CertificateMonitor(warning_days=30, critical_days=7)
hosts = [
("google.com", 443),
("github.com", 443),
("api.internal.example.com", 8443),
]
results = monitor.check_multiple(hosts)
print(monitor.generate_report(results))
Certificate pinning for high-security clients
When you need to trust a specific certificate rather than any certificate signed by any CA:
import hashlib
import ssl
def get_certificate_pin(cert_der: bytes) -> str:
"""Compute SPKI pin (base64 SHA-256 of the Subject Public Key Info)."""
from cryptography.x509 import load_der_x509_certificate
cert = load_der_x509_certificate(cert_der)
spki = cert.public_key().public_bytes(
serialization.Encoding.DER,
serialization.PublicFormat.SubjectPublicKeyInfo
)
digest = hashlib.sha256(spki).digest()
import base64
return base64.b64encode(digest).decode()
def pinned_request(hostname: str, expected_pin: str, port: int = 443):
"""Make a connection that verifies the server's certificate pin."""
ctx = ssl.create_default_context()
with socket.create_connection((hostname, port)) as sock:
with ctx.wrap_socket(sock, server_hostname=hostname) as ssock:
cert_der = ssock.getpeercert(binary_form=True)
actual_pin = get_certificate_pin(cert_der)
if actual_pin != expected_pin:
raise ssl.SSLError(
f"Certificate pin mismatch for {hostname}! "
f"Expected: {expected_pin}, Got: {actual_pin}"
)
return ssock # Connection is verified and pinned
The one thing to remember: Certificate management in Python spans the full lifecycle — generating keys and CSRs with cryptography, building internal CAs for service-to-service mTLS, automating public certificate renewal via ACME/certbot, and monitoring expiration across your infrastructure to prevent outages.
See Also
- Python Data Masking Techniques How companies hide real names, emails, and credit card numbers while keeping data useful for testing and analytics
- Python Homomorphic Encryption How you can do math on locked data without ever unlocking it — like solving a puzzle inside a sealed box
- Python Key Management Practices Why the key to your encryption is more important than the encryption itself — and how to keep it safe
- Python Secure Multiparty Computation How a group of friends can figure out who earns the most without anyone revealing their actual salary
- Python Tokenization Sensitive Data How companies replace your real credit card number with a random stand-in that's useless to hackers but works perfectly for the business