Key Management Practices in Python — Deep Dive

Secure key generation with the cryptography library

The cryptography library provides proper key generation for all common algorithms:

from cryptography.hazmat.primitives.asymmetric import rsa, ec, ed25519
from cryptography.fernet import Fernet
import os
import secrets

# Symmetric key generation
aes_key = os.urandom(32)             # 256-bit AES key from OS CSPRNG
fernet_key = Fernet.generate_key()    # Fernet-formatted 256-bit key
hmac_key = secrets.token_bytes(32)    # 256-bit HMAC key

# RSA key pair
rsa_private = rsa.generate_private_key(
    public_exponent=65537,
    key_size=4096   # 2048 minimum, 4096 recommended
)
rsa_public = rsa_private.public_key()

# Elliptic curve key pair (P-256)
ec_private = ec.generate_private_key(ec.SECP256R1())
ec_public = ec_private.public_key()

# Ed25519 key pair (modern, fast)
ed_private = ed25519.Ed25519PrivateKey.generate()
ed_public = ed_private.public_key()

Never use random.Random() for key material — it’s a Mersenne Twister PRNG that’s predictable given 624 outputs.

Key derivation from passwords

When encryption keys must derive from user passwords, use a proper key derivation function (KDF) that’s intentionally slow:

from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
from cryptography.hazmat.primitives import hashes
import base64

def derive_key_pbkdf2(password: str, salt: bytes = None,
                       iterations: int = 600_000) -> tuple[bytes, bytes]:
    """Derive a 256-bit key from a password using PBKDF2."""
    if salt is None:
        salt = os.urandom(16)

    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=iterations  # OWASP recommends 600,000 for SHA-256
    )
    key = kdf.derive(password.encode())
    return key, salt

def derive_key_scrypt(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
    """Derive key using scrypt (memory-hard, resistant to GPU attacks)."""
    if salt is None:
        salt = os.urandom(16)

    kdf = Scrypt(
        salt=salt,
        length=32,
        n=2**17,     # CPU/memory cost (131072)
        r=8,         # Block size
        p=1          # Parallelization
    )
    key = kdf.derive(password.encode())
    return key, salt

# For Argon2 (recommended for new applications), use argon2-cffi:
# pip install argon2-cffi
from argon2.low_level import hash_secret_raw, Type

def derive_key_argon2(password: str, salt: bytes = None) -> tuple[bytes, bytes]:
    """Derive key using Argon2id (winner of the Password Hashing Competition)."""
    if salt is None:
        salt = os.urandom(16)

    key = hash_secret_raw(
        secret=password.encode(),
        salt=salt,
        time_cost=3,        # Iterations
        memory_cost=65536,   # 64 MB
        parallelism=4,
        hash_len=32,
        type=Type.ID         # Argon2id — hybrid of Argon2i and Argon2d
    )
    return key, salt

Implementing envelope encryption

Envelope encryption wraps a data encryption key (DEK) with a key encryption key (KEK):

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import json
import base64
import os

class EnvelopeEncryption:
    """Encrypt data with a random DEK, then wrap the DEK with a KEK."""

    def __init__(self, kek: bytes):
        """Initialize with a 256-bit Key Encryption Key."""
        self.kek_cipher = AESGCM(kek)

    def encrypt(self, plaintext: bytes) -> dict:
        """Encrypt data using envelope encryption."""
        # Generate random DEK
        dek = AESGCM.generate_key(bit_length=256)
        dek_cipher = AESGCM(dek)

        # Encrypt data with DEK
        data_nonce = os.urandom(12)
        ciphertext = dek_cipher.encrypt(data_nonce, plaintext, None)

        # Wrap DEK with KEK
        dek_nonce = os.urandom(12)
        wrapped_dek = self.kek_cipher.encrypt(dek_nonce, dek, None)

        return {
            "ciphertext": base64.b64encode(ciphertext).decode(),
            "data_nonce": base64.b64encode(data_nonce).decode(),
            "wrapped_dek": base64.b64encode(wrapped_dek).decode(),
            "dek_nonce": base64.b64encode(dek_nonce).decode(),
            "key_version": 1
        }

    def decrypt(self, envelope: dict) -> bytes:
        """Decrypt envelope-encrypted data."""
        # Unwrap DEK
        wrapped_dek = base64.b64decode(envelope["wrapped_dek"])
        dek_nonce = base64.b64decode(envelope["dek_nonce"])
        dek = self.kek_cipher.decrypt(dek_nonce, wrapped_dek, None)

        # Decrypt data
        ciphertext = base64.b64decode(envelope["ciphertext"])
        data_nonce = base64.b64decode(envelope["data_nonce"])
        dek_cipher = AESGCM(dek)
        return dek_cipher.decrypt(data_nonce, ciphertext, None)

# Usage
kek = AESGCM.generate_key(bit_length=256)
envelope = EnvelopeEncryption(kek)
encrypted = envelope.encrypt(b"sensitive patient record")
decrypted = envelope.decrypt(encrypted)
assert decrypted == b"sensitive patient record"

AWS KMS integration for master key management

Using AWS KMS as the top of the key hierarchy — the master key never leaves AWS hardware:

import boto3
import base64

class AWSKMSKeyManager:
    """Use AWS KMS for envelope encryption with cloud-managed master keys."""

    def __init__(self, kms_key_id: str, region: str = "us-east-1"):
        self.kms = boto3.client("kms", region_name=region)
        self.key_id = kms_key_id

    def generate_data_key(self) -> dict:
        """Generate a DEK encrypted under the KMS master key."""
        response = self.kms.generate_data_key(
            KeyId=self.key_id,
            KeySpec="AES_256"
        )
        return {
            "plaintext_key": response["Plaintext"],       # Use immediately, then discard
            "encrypted_key": response["CiphertextBlob"]   # Store alongside encrypted data
        }

    def encrypt_with_data_key(self, plaintext: bytes) -> dict:
        """Full envelope encryption using KMS-generated DEK."""
        # Get DEK from KMS
        data_key = self.generate_data_key()

        # Encrypt data with plaintext DEK
        dek_cipher = AESGCM(data_key["plaintext_key"])
        nonce = os.urandom(12)
        ciphertext = dek_cipher.encrypt(nonce, plaintext, None)

        # Return encrypted data + encrypted DEK (plaintext DEK is discarded)
        return {
            "ciphertext": base64.b64encode(ciphertext).decode(),
            "nonce": base64.b64encode(nonce).decode(),
            "encrypted_dek": base64.b64encode(data_key["encrypted_key"]).decode()
        }

    def decrypt_with_data_key(self, envelope: dict) -> bytes:
        """Decrypt using KMS to unwrap the DEK."""
        # Ask KMS to decrypt the DEK
        encrypted_dek = base64.b64decode(envelope["encrypted_dek"])
        response = self.kms.decrypt(
            CiphertextBlob=encrypted_dek,
            KeyId=self.key_id
        )
        plaintext_dek = response["Plaintext"]

        # Decrypt data with the unwrapped DEK
        ciphertext = base64.b64decode(envelope["ciphertext"])
        nonce = base64.b64decode(envelope["nonce"])
        dek_cipher = AESGCM(plaintext_dek)
        return dek_cipher.decrypt(nonce, ciphertext, None)

    def rotate_master_key(self):
        """Enable automatic annual rotation of the KMS master key."""
        self.kms.enable_key_rotation(KeyId=self.key_id)
        print(f"Automatic rotation enabled for {self.key_id}")

Key rotation with versioning

Implementing a key rotation system that handles multiple active key versions:

from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class KeyVersion:
    version: int
    key: bytes
    created_at: datetime
    retired_at: Optional[datetime] = None
    destroyed_at: Optional[datetime] = None

class KeyRotationManager:
    """Manage multiple key versions with rotation and retirement."""

    def __init__(self):
        self.versions: dict[int, KeyVersion] = {}
        self.current_version: int = 0

    def create_initial_key(self) -> int:
        """Generate the first key version."""
        key = AESGCM.generate_key(bit_length=256)
        self.current_version = 1
        self.versions[1] = KeyVersion(
            version=1, key=key, created_at=datetime.utcnow()
        )
        return 1

    def rotate(self) -> int:
        """Create a new key version and retire the old one."""
        new_version = self.current_version + 1
        new_key = AESGCM.generate_key(bit_length=256)

        # Retire current key (still available for decryption)
        self.versions[self.current_version].retired_at = datetime.utcnow()

        # Install new key
        self.versions[new_version] = KeyVersion(
            version=new_version, key=new_key, created_at=datetime.utcnow()
        )
        self.current_version = new_version
        print(f"Rotated to version {new_version}")
        return new_version

    def get_encryption_key(self) -> tuple[bytes, int]:
        """Get current key for encryption. Returns (key, version)."""
        kv = self.versions[self.current_version]
        return kv.key, kv.version

    def get_decryption_key(self, version: int) -> bytes:
        """Get specific version for decryption."""
        kv = self.versions.get(version)
        if not kv:
            raise KeyError(f"Key version {version} not found")
        if kv.destroyed_at:
            raise KeyError(f"Key version {version} has been destroyed")
        return kv.key

    def destroy_old_versions(self, max_age_days: int = 365):
        """Destroy key versions older than threshold."""
        cutoff = datetime.utcnow() - timedelta(days=max_age_days)
        for version, kv in self.versions.items():
            if (version != self.current_version
                    and kv.retired_at
                    and kv.retired_at < cutoff
                    and not kv.destroyed_at):
                # Overwrite key material
                kv.key = b'\x00' * len(kv.key)
                kv.destroyed_at = datetime.utcnow()
                print(f"Destroyed key version {version}")

    def encrypt(self, plaintext: bytes) -> dict:
        """Encrypt with current key version."""
        key, version = self.get_encryption_key()
        cipher = AESGCM(key)
        nonce = os.urandom(12)
        ciphertext = cipher.encrypt(nonce, plaintext, None)
        return {
            "ciphertext": base64.b64encode(ciphertext).decode(),
            "nonce": base64.b64encode(nonce).decode(),
            "key_version": version
        }

    def decrypt(self, envelope: dict) -> bytes:
        """Decrypt using the version specified in the envelope."""
        key = self.get_decryption_key(envelope["key_version"])
        cipher = AESGCM(key)
        ciphertext = base64.b64decode(envelope["ciphertext"])
        nonce = base64.b64decode(envelope["nonce"])
        return cipher.decrypt(nonce, ciphertext, None)

Secure key serialization and export

When keys must be serialized (for backup or transport), use encrypted formats:

from cryptography.hazmat.primitives import serialization

def export_private_key_encrypted(private_key, password: str) -> bytes:
    """Export private key encrypted with a password."""
    return private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.BestAvailableEncryption(password.encode())
    )

def import_private_key_encrypted(pem_data: bytes, password: str):
    """Import password-encrypted private key."""
    return serialization.load_pem_private_key(
        pem_data,
        password=password.encode()
    )

# For symmetric keys, use a KEK or KMS rather than password-based encryption
def export_symmetric_key(dek: bytes, kek: bytes) -> dict:
    """Export symmetric key wrapped with a KEK."""
    cipher = AESGCM(kek)
    nonce = os.urandom(12)
    wrapped = cipher.encrypt(nonce, dek, None)
    return {
        "wrapped_key": base64.b64encode(wrapped).decode(),
        "nonce": base64.b64encode(nonce).decode()
    }

Memory safety — clearing key material

Keys in memory can be read through memory dumps, swap files, or heap inspection. While Python’s garbage collector makes deterministic clearing difficult, minimize exposure:

import ctypes

def secure_clear(buffer: bytearray):
    """Overwrite buffer contents in memory."""
    ctypes.memset(ctypes.addressof((ctypes.c_char * len(buffer)).from_buffer(buffer)), 0, len(buffer))

# Use bytearrays (mutable) instead of bytes (immutable) for key material
key = bytearray(os.urandom(32))
try:
    cipher = AESGCM(bytes(key))
    # ... use cipher ...
finally:
    secure_clear(key)  # Overwrite key material

# Note: Python bytes objects are immutable and can't be reliably cleared.
# For high-security applications, use cffi/ctypes to manage key memory in C,
# or use a KMS where plaintext keys never exist in your process.

The one thing to remember: Production key management in Python combines proper generation (os.urandom/secrets), envelope encryption (DEKs wrapped by KEKs), cloud KMS integration for master keys that never leave hardware, and versioned rotation that allows decryption with old keys while encrypting only with the current one.

pythonsecuritykey-managementcryptography

See Also