Certificate Management in Python — Deep Dive

Generating self-signed certificates with the cryptography library

For development and testing, create certificates programmatically:

from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from datetime import datetime, timedelta, timezone
import ipaddress

def generate_self_signed_cert(
    common_name: str,
    san_domains: list[str] = None,
    san_ips: list[str] = None,
    days_valid: int = 365
) -> tuple[bytes, bytes]:
    """Create a fresh EC key pair and a self-signed certificate for it.

    Args:
        common_name: Value for the certificate's Common Name; also used as
            the only DNS SAN when neither ``san_domains`` nor ``san_ips``
            yields an entry.
        san_domains: DNS names to list in the Subject Alternative Name.
        san_ips: IP address strings to list in the Subject Alternative Name.
        days_valid: Validity period in days, counted from the current UTC time.

    Returns:
        ``(cert_pem, key_pem)``: the PEM certificate and the unencrypted
        PKCS#8 PEM private key.
    """
    # P-256 key: the certificate is signed with the key it certifies.
    signing_key = ec.generate_private_key(ec.SECP256R1())

    # Self-signed, so one Name object serves as both subject and issuer.
    own_name = x509.Name([
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Internal"),
        x509.NameAttribute(NameOID.COMMON_NAME, common_name),
    ])

    # DNS entries first, then IP entries, falling back to the common name.
    alt_names = [x509.DNSName(name) for name in (san_domains or [])]
    alt_names.extend(
        x509.IPAddress(ipaddress.ip_address(addr)) for addr in (san_ips or [])
    )
    if len(alt_names) == 0:
        alt_names = [x509.DNSName(common_name)]

    issued_at = datetime.now(timezone.utc)

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(own_name)
    builder = builder.issuer_name(own_name)
    builder = builder.public_key(signing_key.public_key())
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.not_valid_before(issued_at)
    builder = builder.not_valid_after(issued_at + timedelta(days=days_valid))
    builder = builder.add_extension(
        x509.SubjectAlternativeName(alt_names), critical=False
    )
    # Leaf certificate: must not be usable to sign other certificates.
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True
    )
    certificate = builder.sign(signing_key, hashes.SHA256())

    return (
        certificate.public_bytes(serialization.Encoding.PEM),
        signing_key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.PKCS8,
            serialization.NoEncryption(),
        ),
    )

# Example: a development certificate whose SAN covers two DNS names and the
# loopback IP. Both returned values are PEM-encoded bytes.
cert_pem, key_pem = generate_self_signed_cert(
    "myservice.internal",
    san_domains=["myservice.internal", "localhost"],
    san_ips=["127.0.0.1"]
)

Building an internal Certificate Authority

For production internal PKI, create a root CA and issue signed certificates:

class InternalCA:
    """Minimal internal Certificate Authority for service-to-service TLS.

    Instantiating the class generates a new P-256 CA key and a self-signed
    root certificate valid for ten years. Both live only in memory
    (``ca_key`` / ``ca_cert``); this class does not persist them, so every
    instance is a distinct CA.
    """

    def __init__(self):
        self.ca_key = ec.generate_private_key(ec.SECP256R1())
        ca_public_key = self.ca_key.public_key()

        # Root certificate is self-signed: subject and issuer are identical.
        ca_name = x509.Name([
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MyOrg Internal CA"),
            x509.NameAttribute(NameOID.COMMON_NAME, "MyOrg Root CA"),
        ])

        now = datetime.now(timezone.utc)
        self.ca_cert = (
            x509.CertificateBuilder()
            .subject_name(ca_name)
            .issuer_name(ca_name)
            .public_key(ca_public_key)
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + timedelta(days=3650))  # 10-year CA
            .add_extension(
                x509.BasicConstraints(ca=True, path_length=1), critical=True
            )
            .add_extension(
                x509.KeyUsage(
                    digital_signature=True, key_cert_sign=True, crl_sign=True,
                    content_commitment=False, key_encipherment=False,
                    data_encipherment=False, key_agreement=False,
                    encipher_only=False, decipher_only=False
                ), critical=True
            )
            # Lets issued certificates point back at this key via their
            # Authority Key Identifier during chain building.
            .add_extension(
                x509.SubjectKeyIdentifier.from_public_key(ca_public_key),
                critical=False,
            )
            .sign(self.ca_key, hashes.SHA256())
        )

    def issue_certificate(self, csr_pem: bytes, days_valid: int = 90) -> bytes:
        """Sign a CSR and return the issued certificate as PEM bytes.

        Args:
            csr_pem: PEM-encoded certificate signing request.
            days_valid: Requested validity in days. The expiry is capped at
                the CA certificate's own expiry so a leaf never outlives
                its issuer.

        Returns:
            The PEM-encoded end-entity certificate.

        Raises:
            ValueError: If ``days_valid`` is not positive or the CSR's
                self-signature does not verify.

        NOTE: the subject name and any Subject Alternative Names are copied
        from the CSR as-is; no policy check is applied to the requested
        identities. Callers must only submit CSRs they trust.
        """
        if days_valid <= 0:
            raise ValueError("days_valid must be positive")

        csr = x509.load_pem_x509_csr(csr_pem)

        # Verify CSR signature (proof of possession of the private key)
        if not csr.is_signature_valid:
            raise ValueError("Invalid CSR signature")

        now = datetime.now(timezone.utc)
        not_after = min(
            now + timedelta(days=days_valid),
            self.ca_cert.not_valid_after_utc,
        )
        subject_public_key = csr.public_key()

        cert = (
            x509.CertificateBuilder()
            .subject_name(csr.subject)
            .issuer_name(self.ca_cert.subject)
            .public_key(subject_public_key)
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(not_after)
            .add_extension(
                x509.BasicConstraints(ca=False, path_length=None), critical=True
            )
            .add_extension(
                x509.SubjectKeyIdentifier.from_public_key(subject_public_key),
                critical=False,
            )
            # Identifies the issuing CA key so verifiers can pick the right
            # issuer certificate.
            .add_extension(
                x509.AuthorityKeyIdentifier.from_issuer_public_key(
                    self.ca_key.public_key()
                ),
                critical=False,
            )
        )

        # Copy SANs from CSR if present
        try:
            san_ext = csr.extensions.get_extension_for_class(
                x509.SubjectAlternativeName
            )
        except x509.ExtensionNotFound:
            pass
        else:
            cert = cert.add_extension(san_ext.value, critical=False)

        signed_cert = cert.sign(self.ca_key, hashes.SHA256())
        return signed_cert.public_bytes(serialization.Encoding.PEM)

    def get_ca_cert_pem(self) -> bytes:
        """Return the CA certificate as PEM bytes for trust store distribution."""
        return self.ca_cert.public_bytes(serialization.Encoding.PEM)


def create_csr(common_name: str, san_domains: list[str]) -> tuple[bytes, bytes]:
    """Generate a P-256 private key and a CSR signed with it.

    Args:
        common_name: Common Name placed in the CSR subject.
        san_domains: DNS names for the Subject Alternative Name extension.
            When empty (or ``None``), the common name is used as the only
            entry so the request never carries an empty SAN extension,
            matching the fallback in ``generate_self_signed_cert``.

    Returns:
        ``(csr_pem, key_pem)``: the PEM-encoded CSR and the unencrypted
        PKCS#8 PEM private key.
    """
    key = ec.generate_private_key(ec.SECP256R1())

    dns_names = san_domains or [common_name]
    san_list = [x509.DNSName(d) for d in dns_names]

    csr = (
        x509.CertificateSigningRequestBuilder()
        .subject_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, common_name),
        ]))
        .add_extension(x509.SubjectAlternativeName(san_list), critical=False)
        .sign(key, hashes.SHA256())
    )

    csr_pem = csr.public_bytes(serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.PKCS8,
        serialization.NoEncryption()
    )
    return csr_pem, key_pem

# Usage: create an in-memory CA, build a key + CSR for the service, have the
# CA sign the CSR, then export the CA certificate for clients' trust stores.
ca = InternalCA()
csr_pem, service_key = create_csr("api.internal", ["api.internal", "api"])
cert_pem = ca.issue_certificate(csr_pem, days_valid=90)
ca_cert_pem = ca.get_ca_cert_pem()

Implementing mutual TLS in Python

With certificates from the internal CA, set up mutual authentication between services:

import ssl
import http.server
import urllib.request

def create_mtls_server_context(
    cert_pem: str, key_pem: str, ca_cert_pem: str
) -> ssl.SSLContext:
    """Build a server-side TLS 1.3 context that demands a client certificate.

    Despite the ``*_pem`` names, all three arguments are passed to
    ``load_cert_chain`` / ``load_verify_locations`` as file paths.

    Args:
        cert_pem: Path to the server certificate (PEM).
        key_pem: Path to the server private key (PEM).
        ca_cert_pem: Path to the CA certificate used to verify clients.

    Returns:
        The configured ``ssl.SSLContext``.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

    # Policy: TLS 1.3 only, client certificate mandatory. Hostname checking
    # stays off because clients are authenticated through CA trust alone.
    context.minimum_version = ssl.TLSVersion.TLSv1_3
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = False

    # Identity presented by the server, and the CA that client certs must
    # chain to.
    context.load_cert_chain(certfile=cert_pem, keyfile=key_pem)
    context.load_verify_locations(cafile=ca_cert_pem)
    return context

def create_mtls_client_context(
    cert_pem: str, key_pem: str, ca_cert_pem: str
) -> ssl.SSLContext:
    """Build a client-side TLS 1.3 context that presents a client certificate.

    Despite the ``*_pem`` names, all three arguments are passed to
    ``load_cert_chain`` / ``load_verify_locations`` as file paths.

    Args:
        cert_pem: Path to the client certificate (PEM).
        key_pem: Path to the client private key (PEM).
        ca_cert_pem: Path to the CA certificate used to verify the server.

    Returns:
        The configured ``ssl.SSLContext``.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    # Policy: TLS 1.3 only, and the server's hostname must match its cert.
    context.minimum_version = ssl.TLSVersion.TLSv1_3
    context.check_hostname = True

    # Identity presented to the server, and the CA the server must chain to.
    context.load_cert_chain(certfile=cert_pem, keyfile=key_pem)
    context.load_verify_locations(cafile=ca_cert_pem)
    return context

# Server example: the three arguments are filesystem paths to PEM files
# (they are handed to load_cert_chain / load_verify_locations).
server_ctx = create_mtls_server_context(
    "server-cert.pem", "server-key.pem", "ca-cert.pem"
)

# Client example with requests library
import requests

def mtls_request(url: str, client_cert: str, client_key: str, ca_cert: str,
                 timeout: float = 10.0):
    """Make an mTLS-authenticated GET request.

    Args:
        url: Target URL.
        client_cert: Path to the client certificate file.
        client_key: Path to the client private key file.
        ca_cert: Path to the CA certificate used to verify the server.
        timeout: Seconds to wait for the server before giving up. Without
            an explicit timeout a stalled peer would block the caller
            indefinitely.

    Returns:
        The response object returned by ``requests.get``.
    """
    return requests.get(
        url,
        cert=(client_cert, client_key),  # Client certificate + key
        verify=ca_cert,                  # CA certificate to verify server
        timeout=timeout,
    )

ACME certificate automation with certbot integration

Automate Let’s Encrypt certificate issuance from Python:

import subprocess
import json
from pathlib import Path

class ACMECertificateManager:
    """Manage Let's Encrypt certificates by shelling out to certbot.

    ``cert_dir`` is only used to locate files under ``<cert_dir>/live``;
    it is not passed to certbot, so it must match the directory certbot
    itself writes to.
    """

    def __init__(self, cert_dir: str = "/etc/letsencrypt"):
        self.cert_dir = Path(cert_dir)

    def obtain_certificate(self, domains: list[str], email: str,
                           method: str = "standalone") -> dict:
        """Obtain a new certificate via ACME challenge.

        Args:
            domains: Domain names to request; the first one determines the
                directory returned in the result.
            email: Account e-mail passed to certbot.
            method: Certbot plugin name, passed verbatim as ``--<method>``.
                Any method starting with ``dns`` selects the DNS challenge;
                everything else selects HTTP.
                NOTE(review): a bare ``"dns"`` produces ``--dns`` — confirm
                certbot accepts that flag in your installation.

        Returns:
            Mapping with ``cert``, ``chain``, ``fullchain`` and ``privkey``
            file paths for the first domain.

        Raises:
            ValueError: If ``domains`` is empty.
            RuntimeError: If certbot exits with a non-zero status.
        """
        # Fail before invoking certbot rather than with an IndexError after.
        if not domains:
            raise ValueError("At least one domain is required")

        challenge = "dns" if method.startswith("dns") else "http"
        cmd = [
            "certbot", "certonly",
            "--non-interactive",
            "--agree-tos",
            f"--email={email}",
            f"--preferred-challenges={challenge}",
            f"--{method}",
        ]
        for domain in domains:
            cmd.extend(["-d", domain])

        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"Certbot failed: {result.stderr}")

        return self._get_cert_paths(domains[0])

    def renew_all(self) -> list[str]:
        """Run ``certbot renew`` and return its output lines mentioning success.

        Raises:
            RuntimeError: If certbot exits with a non-zero status, so a
                failed run is not mistaken for "nothing to renew".
        """
        result = subprocess.run(
            ["certbot", "renew", "--non-interactive"],
            capture_output=True, text=True
        )
        if result.returncode != 0:
            raise RuntimeError(f"Certbot renew failed: {result.stderr}")

        return [
            line.strip()
            for line in result.stdout.splitlines()
            if "success" in line.lower()
        ]

    def get_certificate_info(self, domain: str) -> dict:
        """Read certificate details for a domain from ``fullchain.pem``.

        Returns:
            Mapping with ``domain``, ``issuer``, ``not_before``,
            ``not_after``, ``serial``, ``days_remaining`` and ``sans``
            (empty when the certificate has no SAN extension).

        Raises:
            FileNotFoundError: If no ``fullchain.pem`` exists for ``domain``.
        """
        cert_path = self.cert_dir / "live" / domain / "fullchain.pem"
        if not cert_path.exists():
            raise FileNotFoundError(f"No certificate found for {domain}")

        cert = x509.load_pem_x509_certificate(cert_path.read_bytes())

        try:
            san_ext = cert.extensions.get_extension_for_class(
                x509.SubjectAlternativeName
            )
        except x509.ExtensionNotFound:
            sans = []
        else:
            sans = [name.value for name in san_ext.value]

        not_after = cert.not_valid_after_utc
        return {
            "domain": domain,
            "issuer": cert.issuer.rfc4514_string(),
            "not_before": cert.not_valid_before_utc.isoformat(),
            "not_after": not_after.isoformat(),
            "serial": str(cert.serial_number),
            "days_remaining": (not_after - datetime.now(timezone.utc)).days,
            "sans": sans,
        }

    def _get_cert_paths(self, domain: str) -> dict:
        """Return the file paths under ``<cert_dir>/live/<domain>`` as strings."""
        live_dir = self.cert_dir / "live" / domain
        return {
            "cert": str(live_dir / "cert.pem"),
            "chain": str(live_dir / "chain.pem"),
            "fullchain": str(live_dir / "fullchain.pem"),
            "privkey": str(live_dir / "privkey.pem"),
        }

Certificate expiration monitoring

Proactively detect certificates approaching expiration across your infrastructure:

import ssl
import socket
from datetime import datetime, timezone
from dataclasses import dataclass
from typing import Optional
from concurrent.futures import ThreadPoolExecutor

@dataclass
class CertificateStatus:
    """Outcome of probing one host's TLS certificate."""

    hostname: str
    port: int
    days_remaining: int          # whole days until expiry; -1 when the check failed
    expiry_date: str             # ISO-8601 UTC timestamp; "" when the check failed
    issuer: str                  # issuer organizationName, or "Unknown" if absent
    subject: str                 # subject commonName, falling back to the hostname
    error: Optional[str] = None  # exception text when the check failed


class CertificateMonitor:
    """Monitor TLS certificate expiration across services."""

    def __init__(self, warning_days: int = 30, critical_days: int = 7):
        self.warning_days = warning_days
        self.critical_days = critical_days

    def check_host(self, hostname: str, port: int = 443,
                   timeout: int = 10) -> CertificateStatus:
        """Check a single host's certificate.

        Connects with a default (verifying) SSL context and reads the peer
        certificate. Any ``Exception`` raised along the way — connection
        failure, verification failure, unexpected certificate contents — is
        captured in the returned status's ``error`` field instead of being
        raised.
        """
        try:
            ctx = ssl.create_default_context()
            with socket.create_connection((hostname, port), timeout=timeout) as sock:
                with ctx.wrap_socket(sock, server_hostname=hostname) as ssock:
                    cert = ssock.getpeercert()

            # ssl.cert_time_to_seconds parses the notAfter format without
            # depending on the process locale (strptime's %b does).
            expiry = datetime.fromtimestamp(
                ssl.cert_time_to_seconds(cert["notAfter"]), tz=timezone.utc
            )
            days_remaining = (expiry - datetime.now(timezone.utc)).days

            # Each RDN is a tuple of (key, value) pairs; take the first pair.
            issuer = dict(x[0] for x in cert["issuer"])
            subject = dict(x[0] for x in cert["subject"])

            return CertificateStatus(
                hostname=hostname,
                port=port,
                days_remaining=days_remaining,
                expiry_date=expiry.isoformat(),
                issuer=issuer.get("organizationName", "Unknown"),
                subject=subject.get("commonName", hostname),
            )
        except Exception as e:
            return CertificateStatus(
                hostname=hostname, port=port, days_remaining=-1,
                expiry_date="", issuer="", subject="",
                error=str(e)
            )

    def check_multiple(self, hosts: list[tuple[str, int]]) -> list[CertificateStatus]:
        """Check multiple ``(hostname, port)`` pairs concurrently.

        Results are returned in the same order as ``hosts``.
        """
        with ThreadPoolExecutor(max_workers=20) as pool:
            futures = [pool.submit(self.check_host, h, p) for h, p in hosts]
            return [f.result() for f in futures]

    def generate_report(self, results: list[CertificateStatus]) -> str:
        """Generate a human-readable expiration report.

        Every result lands in exactly one bucket: failed checks are errors;
        successful checks are critical (at or below ``critical_days``,
        including already-expired), warning (at or below ``warning_days``),
        or OK.
        """
        lines = ["Certificate Expiration Report", "=" * 50]

        errors = [r for r in results if r.error]
        checked = [r for r in results if not r.error]
        critical = [r for r in checked if r.days_remaining <= self.critical_days]
        warning = [r for r in checked
                   if self.critical_days < r.days_remaining <= self.warning_days]

        if critical:
            lines.append(f"\n🔴 CRITICAL ({len(critical)} certificates):")
            for r in critical:
                lines.append(f"  {r.hostname}:{r.port} — {r.days_remaining} days remaining")

        if warning:
            lines.append(f"\n🟡 WARNING ({len(warning)} certificates):")
            for r in warning:
                lines.append(f"  {r.hostname}:{r.port} — {r.days_remaining} days remaining")

        if errors:
            lines.append(f"\n❌ ERRORS ({len(errors)} hosts):")
            for r in errors:
                lines.append(f"  {r.hostname}:{r.port} — {r.error}")

        ok_count = len(results) - len(critical) - len(warning) - len(errors)
        lines.append(f"\n{ok_count} certificates OK")
        return "\n".join(lines)

# Usage: probe each (hostname, port) pair concurrently and print the report.
# Hosts that cannot be reached or verified show up under ERRORS.
monitor = CertificateMonitor(warning_days=30, critical_days=7)
hosts = [
    ("google.com", 443),
    ("github.com", 443),
    ("api.internal.example.com", 8443),
]
results = monitor.check_multiple(hosts)
print(monitor.generate_report(results))

Certificate pinning for high-security clients

When you need to trust one specific certificate (or public key) rather than any certificate signed by a trusted CA:

import hashlib
import ssl

def get_certificate_pin(cert_der: bytes) -> str:
    """Return the SPKI pin of a DER-encoded certificate.

    The pin is the base64 text of the SHA-256 digest taken over the
    certificate's DER-encoded Subject Public Key Info.
    """
    import base64
    from cryptography.x509 import load_der_x509_certificate

    public_key = load_der_x509_certificate(cert_der).public_key()
    spki_der = public_key.public_bytes(
        serialization.Encoding.DER,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return base64.b64encode(hashlib.sha256(spki_der).digest()).decode()

def pinned_request(hostname: str, expected_pin: str, port: int = 443):
    """Open a TLS connection and verify the server's certificate pin.

    The connection is made with a default (verifying) SSL context; the
    peer certificate's SPKI pin is then compared with ``expected_pin``.

    Args:
        hostname: Server to connect to; also used for SNI and hostname checks.
        expected_pin: Expected pin as produced by ``get_certificate_pin``.
        port: TCP port, 443 by default.

    Returns:
        The open, verified ``ssl.SSLSocket``. The caller owns it and must
        close it (for example with ``with pinned_request(...) as s:``).

    Raises:
        ssl.SSLError: If the presented certificate's pin does not match.
    """
    ctx = ssl.create_default_context()

    # The sockets are deliberately not managed by ``with`` here: leaving a
    # ``with`` block would close the connection before the caller could
    # use it. They are closed explicitly on every failure path instead.
    sock = socket.create_connection((hostname, port))
    try:
        ssock = ctx.wrap_socket(sock, server_hostname=hostname)
    except BaseException:
        sock.close()
        raise

    try:
        cert_der = ssock.getpeercert(binary_form=True)
        actual_pin = get_certificate_pin(cert_der)

        if actual_pin != expected_pin:
            raise ssl.SSLError(
                f"Certificate pin mismatch for {hostname}! "
                f"Expected: {expected_pin}, Got: {actual_pin}"
            )
    except BaseException:
        ssock.close()
        raise

    return ssock  # Connection is verified, pinned, and still open

The one thing to remember: Certificate management in Python spans the full lifecycle — generating keys and CSRs with cryptography, building internal CAs for service-to-service mTLS, automating public certificate renewal via ACME/certbot, and monitoring expiration across your infrastructure to prevent outages.

python security certificates tls

See Also