API Key Management in Python — Deep Dive

API key anatomy

A well-designed API key has structure:

sk_live_EXAMPLE_KEY_REPLACE_ME_1234567890ab
│  │     └── random payload (32 bytes, base64url)
│  └── environment (live/test)
└── key type (sk=secret, pk=publishable)

The prefix is stored in plaintext for identification. The full key is shown to the user exactly once at creation, then only the hash is stored.

import secrets
import hashlib
from datetime import datetime, timezone

class APIKeyGenerator:
    def __init__(self, prefix: str = "sk", environment: str = "live"):
        self.prefix = prefix
        self.environment = environment

    def generate(self) -> tuple[str, str, str]:
        """Return (full_key, key_hash, key_prefix_display)."""
        random_bytes = secrets.token_urlsafe(32)
        full_key = f"{self.prefix}_{self.environment}_{random_bytes}"

        key_hash = hashlib.sha256(full_key.encode()).hexdigest()
        display_prefix = full_key[:12] + "..."

        return full_key, key_hash, display_prefix

Database model

from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Index
from sqlalchemy.orm import relationship, DeclarativeBase
from datetime import datetime, timezone

class Base(DeclarativeBase):
    pass

class APIKey(Base):
    __tablename__ = "api_keys"

    id = Column(Integer, primary_key=True)
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    name = Column(String(100))  # user-friendly label
    key_hash = Column(String(64), unique=True, nullable=False, index=True)
    key_prefix = Column(String(20), nullable=False)  # "sk_live_2fG8..."
    scopes = Column(String(500))  # comma-separated: "read,write"
    is_active = Column(Boolean, default=True)
    expires_at = Column(DateTime, nullable=True)
    last_used_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    created_ip = Column(String(45))  # IPv4 or IPv6
    allowed_ips = Column(String(500), nullable=True)  # CIDR ranges

    owner = relationship("User", back_populates="api_keys")

    __table_args__ = (
        Index("ix_api_keys_owner_active", "owner_id", "is_active"),
    )

Key validation middleware

import hashlib
import time
from fastapi import FastAPI, Request, HTTPException, Depends
from datetime import datetime, timezone
from ipaddress import ip_address, ip_network

app = FastAPI()

async def validate_api_key(request: Request) -> APIKey:
    """FastAPI dependency that validates API keys."""
    # Extract key from header
    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        raw_key = auth_header[7:]
    else:
        raw_key = request.headers.get("X-API-Key", "")

    if not raw_key:
        raise HTTPException(status_code=401, detail="API key required")

    # Hash and look up
    key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
    api_key = await get_api_key_by_hash(key_hash)

    if not api_key:
        raise HTTPException(status_code=401, detail="Invalid API key")

    if not api_key.is_active:
        raise HTTPException(status_code=403, detail="API key revoked")

    if api_key.expires_at and api_key.expires_at < datetime.now(timezone.utc):
        raise HTTPException(status_code=403, detail="API key expired")

    # IP allowlist check
    if api_key.allowed_ips:
        client_ip = ip_address(request.client.host)
        allowed = [ip_network(cidr.strip()) for cidr in api_key.allowed_ips.split(",")]
        if not any(client_ip in network for network in allowed):
            raise HTTPException(status_code=403, detail="IP not allowed")

    # Update last used timestamp (async, non-blocking)
    await update_last_used(api_key.id)

    return api_key

@app.get("/api/data")
async def get_data(api_key: APIKey = Depends(validate_api_key)):
    if "read" not in api_key.scopes.split(","):
        raise HTTPException(status_code=403, detail="Insufficient scope")
    return {"data": "sensitive info"}

Scoped permissions

Not all keys should have full access. Implement scoped keys:

from enum import Flag, auto

class APIScope(Flag):
    READ = auto()
    WRITE = auto()
    DELETE = auto()
    ADMIN = auto()

    # Convenience combinations
    READ_WRITE = READ | WRITE
    FULL = READ | WRITE | DELETE | ADMIN

class ScopedKeyManager:
    def create_key(self, owner_id: int, scopes: APIScope,
                   name: str = "", expires_days: int = None) -> dict:
        generator = APIKeyGenerator()
        full_key, key_hash, display = generator.generate()

        expiry = None
        if expires_days:
            expiry = datetime.now(timezone.utc) + timedelta(days=expires_days)

        # Store in database
        api_key = APIKey(
            owner_id=owner_id,
            name=name,
            key_hash=key_hash,
            key_prefix=display,
            scopes=",".join(s.name.lower() for s in APIScope if s in scopes),
            expires_at=expiry,
        )
        db.add(api_key)
        db.commit()

        return {
            "key": full_key,  # shown only once
            "prefix": display,
            "scopes": api_key.scopes,
            "expires_at": str(expiry) if expiry else None,
        }

    def check_scope(self, api_key: APIKey, required: APIScope) -> bool:
        key_scopes = APIScope(0)
        for scope_name in api_key.scopes.split(","):
            key_scopes |= APIScope[scope_name.strip().upper()]
        return required in key_scopes

Rate limiting per key

Different keys may have different rate limits based on the customer’s plan:

import redis
import time

redis_client = redis.Redis()

class KeyRateLimiter:
    def __init__(self):
        self.default_limits = {
            "free": (100, 3600),      # 100 requests per hour
            "pro": (1000, 3600),      # 1,000 per hour
            "enterprise": (10000, 3600),  # 10,000 per hour
        }

    def check(self, key_id: int, plan: str = "free") -> tuple[bool, dict]:
        """Return (allowed, rate_limit_info)."""
        max_requests, window = self.default_limits.get(plan, (100, 3600))
        redis_key = f"ratelimit:{key_id}:{int(time.time()) // window}"

        current = redis_client.incr(redis_key)
        if current == 1:
            redis_client.expire(redis_key, window)

        remaining = max(0, max_requests - current)
        allowed = current <= max_requests

        return allowed, {
            "X-RateLimit-Limit": str(max_requests),
            "X-RateLimit-Remaining": str(remaining),
            "X-RateLimit-Reset": str((int(time.time()) // window + 1) * window),
        }

Key rotation workflow

from datetime import timedelta

class KeyRotationManager:
    def __init__(self, overlap_hours: int = 48):
        self.overlap = timedelta(hours=overlap_hours)

    def rotate_key(self, old_key_id: int, owner_id: int) -> dict:
        """Create a new key and schedule old key expiration."""
        old_key = get_api_key(old_key_id)
        if not old_key or old_key.owner_id != owner_id:
            raise ValueError("Key not found or unauthorized")

        # Create new key with same scopes
        generator = APIKeyGenerator()
        full_key, key_hash, display = generator.generate()

        new_key = APIKey(
            owner_id=owner_id,
            name=f"{old_key.name} (rotated)",
            key_hash=key_hash,
            key_prefix=display,
            scopes=old_key.scopes,
            allowed_ips=old_key.allowed_ips,
        )
        db.add(new_key)

        # Old key gets a grace period
        old_key.expires_at = datetime.now(timezone.utc) + self.overlap
        old_key.name = f"{old_key.name} (rotating out)"

        db.commit()

        return {
            "new_key": full_key,
            "new_prefix": display,
            "old_key_expires": str(old_key.expires_at),
        }

    def revoke_immediately(self, key_id: int, owner_id: int, reason: str = ""):
        """Instant revocation for compromised keys."""
        api_key = get_api_key(key_id)
        if not api_key or api_key.owner_id != owner_id:
            raise ValueError("Key not found or unauthorized")

        api_key.is_active = False

        # Audit log
        log_event("key_revoked", {
            "key_id": key_id,
            "key_prefix": api_key.key_prefix,
            "reason": reason,
            "revoked_at": datetime.now(timezone.utc).isoformat(),
        })

        db.commit()

Consuming third-party API keys securely

For keys your application uses (Stripe, AWS, etc.):

# Using HashiCorp Vault
import hvac

class VaultSecretManager:
    def __init__(self, vault_url: str, token: str):
        self.client = hvac.Client(url=vault_url, token=token)

    def get_api_key(self, path: str) -> str:
        """Fetch a secret from Vault's KV store."""
        secret = self.client.secrets.kv.v2.read_secret_version(path=path)
        return secret["data"]["data"]["api_key"]

# Usage
vault = VaultSecretManager("https://vault.internal:8200", os.environ["VAULT_TOKEN"])
stripe_key = vault.get_api_key("services/stripe")

# Using AWS Secrets Manager
import boto3
import json

def get_secret(secret_name: str) -> str:
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_name)
    secret = json.loads(response["SecretString"])
    return secret["api_key"]

Audit logging

Every key operation should be logged:

from datetime import datetime, timezone
import json

class APIKeyAuditLog:
    EVENTS = [
        "key_created", "key_rotated", "key_revoked",
        "key_used", "key_rate_limited", "key_ip_blocked",
    ]

    def log(self, event: str, details: dict):
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event": event,
            **details,
        }
        # Write to your logging infrastructure
        # Never log the full key — only the prefix
        if "key" in entry:
            entry["key"] = entry["key"][:12] + "..."

        audit_logger.info(json.dumps(entry))

audit = APIKeyAuditLog()

# Log on every authentication
audit.log("key_used", {
    "key_prefix": api_key.key_prefix,
    "owner_id": api_key.owner_id,
    "endpoint": request.url.path,
    "ip": request.client.host,
    "status": "success",
})

Preventing key leakage

Automated scanning catches accidental exposure:

import re

# Pre-commit hook pattern matching
API_KEY_PATTERNS = [
    r'sk_live_[a-zA-Z0-9_-]{20,}',    # Stripe-style live keys
    r'sk_test_[a-zA-Z0-9_-]{20,}',    # Stripe-style test keys
    r'AKIA[0-9A-Z]{16}',               # AWS access key IDs
    r'ghp_[a-zA-Z0-9]{36,}',           # GitHub personal access tokens
    r'xoxb-[0-9]+-[a-zA-Z0-9]+',       # Slack bot tokens
]

def scan_for_keys(content: str) -> list[str]:
    """Scan text content for potential API key leaks."""
    findings = []
    for pattern in API_KEY_PATTERNS:
        matches = re.findall(pattern, content)
        findings.extend(matches)
    return findings

# Use in CI/CD pipeline or pre-commit hook
# Also consider: trufflehog, gitleaks, detect-secrets

The one thing to remember: A complete API key system in Python means generating cryptographically random keys with identifiable prefixes, storing only hashes in the database, enforcing scopes and IP restrictions, supporting graceful rotation with overlap periods, and logging every operation — all while preventing the most common failure: accidentally exposing keys in source code.

pythonsecurityauthenticationapi

See Also