API Key Management in Python — Deep Dive
API key anatomy
A well-designed API key has structure:
sk_live_EXAMPLE_KEY_REPLACE_ME_1234567890ab
│ │ └── random payload (32 bytes, base64url)
│ └── environment (live/test)
└── key type (sk=secret, pk=publishable)
The prefix is stored in plaintext for identification. The full key is shown to the user exactly once at creation, then only the hash is stored.
import secrets
import hashlib
from datetime import datetime, timezone
class APIKeyGenerator:
def __init__(self, prefix: str = "sk", environment: str = "live"):
self.prefix = prefix
self.environment = environment
def generate(self) -> tuple[str, str, str]:
"""Return (full_key, key_hash, key_prefix_display)."""
random_bytes = secrets.token_urlsafe(32)
full_key = f"{self.prefix}_{self.environment}_{random_bytes}"
key_hash = hashlib.sha256(full_key.encode()).hexdigest()
display_prefix = full_key[:12] + "..."
return full_key, key_hash, display_prefix
Database model
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Index
from sqlalchemy.orm import relationship, DeclarativeBase
from datetime import datetime, timezone
class Base(DeclarativeBase):
pass
class APIKey(Base):
__tablename__ = "api_keys"
id = Column(Integer, primary_key=True)
owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)
name = Column(String(100)) # user-friendly label
key_hash = Column(String(64), unique=True, nullable=False, index=True)
key_prefix = Column(String(20), nullable=False) # "sk_live_2fG8..."
scopes = Column(String(500)) # comma-separated: "read,write"
is_active = Column(Boolean, default=True)
expires_at = Column(DateTime, nullable=True)
last_used_at = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
created_ip = Column(String(45)) # IPv4 or IPv6
allowed_ips = Column(String(500), nullable=True) # CIDR ranges
owner = relationship("User", back_populates="api_keys")
__table_args__ = (
Index("ix_api_keys_owner_active", "owner_id", "is_active"),
)
Key validation middleware
import hashlib
import time
from fastapi import FastAPI, Request, HTTPException, Depends
from datetime import datetime, timezone
from ipaddress import ip_address, ip_network
app = FastAPI()
async def validate_api_key(request: Request) -> APIKey:
"""FastAPI dependency that validates API keys."""
# Extract key from header
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
raw_key = auth_header[7:]
else:
raw_key = request.headers.get("X-API-Key", "")
if not raw_key:
raise HTTPException(status_code=401, detail="API key required")
# Hash and look up
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
api_key = await get_api_key_by_hash(key_hash)
if not api_key:
raise HTTPException(status_code=401, detail="Invalid API key")
if not api_key.is_active:
raise HTTPException(status_code=403, detail="API key revoked")
if api_key.expires_at and api_key.expires_at < datetime.now(timezone.utc):
raise HTTPException(status_code=403, detail="API key expired")
# IP allowlist check
if api_key.allowed_ips:
client_ip = ip_address(request.client.host)
allowed = [ip_network(cidr.strip()) for cidr in api_key.allowed_ips.split(",")]
if not any(client_ip in network for network in allowed):
raise HTTPException(status_code=403, detail="IP not allowed")
# Update last used timestamp (async, non-blocking)
await update_last_used(api_key.id)
return api_key
@app.get("/api/data")
async def get_data(api_key: APIKey = Depends(validate_api_key)):
if "read" not in api_key.scopes.split(","):
raise HTTPException(status_code=403, detail="Insufficient scope")
return {"data": "sensitive info"}
Scoped permissions
Not all keys should have full access. Implement scoped keys:
from enum import Flag, auto
class APIScope(Flag):
READ = auto()
WRITE = auto()
DELETE = auto()
ADMIN = auto()
# Convenience combinations
READ_WRITE = READ | WRITE
FULL = READ | WRITE | DELETE | ADMIN
class ScopedKeyManager:
def create_key(self, owner_id: int, scopes: APIScope,
name: str = "", expires_days: int = None) -> dict:
generator = APIKeyGenerator()
full_key, key_hash, display = generator.generate()
expiry = None
if expires_days:
expiry = datetime.now(timezone.utc) + timedelta(days=expires_days)
# Store in database
api_key = APIKey(
owner_id=owner_id,
name=name,
key_hash=key_hash,
key_prefix=display,
scopes=",".join(s.name.lower() for s in APIScope if s in scopes),
expires_at=expiry,
)
db.add(api_key)
db.commit()
return {
"key": full_key, # shown only once
"prefix": display,
"scopes": api_key.scopes,
"expires_at": str(expiry) if expiry else None,
}
def check_scope(self, api_key: APIKey, required: APIScope) -> bool:
key_scopes = APIScope(0)
for scope_name in api_key.scopes.split(","):
key_scopes |= APIScope[scope_name.strip().upper()]
return required in key_scopes
Rate limiting per key
Different keys may have different rate limits based on the customer’s plan:
import redis
import time
redis_client = redis.Redis()
class KeyRateLimiter:
def __init__(self):
self.default_limits = {
"free": (100, 3600), # 100 requests per hour
"pro": (1000, 3600), # 1,000 per hour
"enterprise": (10000, 3600), # 10,000 per hour
}
def check(self, key_id: int, plan: str = "free") -> tuple[bool, dict]:
"""Return (allowed, rate_limit_info)."""
max_requests, window = self.default_limits.get(plan, (100, 3600))
redis_key = f"ratelimit:{key_id}:{int(time.time()) // window}"
current = redis_client.incr(redis_key)
if current == 1:
redis_client.expire(redis_key, window)
remaining = max(0, max_requests - current)
allowed = current <= max_requests
return allowed, {
"X-RateLimit-Limit": str(max_requests),
"X-RateLimit-Remaining": str(remaining),
"X-RateLimit-Reset": str((int(time.time()) // window + 1) * window),
}
Key rotation workflow
from datetime import timedelta
class KeyRotationManager:
def __init__(self, overlap_hours: int = 48):
self.overlap = timedelta(hours=overlap_hours)
def rotate_key(self, old_key_id: int, owner_id: int) -> dict:
"""Create a new key and schedule old key expiration."""
old_key = get_api_key(old_key_id)
if not old_key or old_key.owner_id != owner_id:
raise ValueError("Key not found or unauthorized")
# Create new key with same scopes
generator = APIKeyGenerator()
full_key, key_hash, display = generator.generate()
new_key = APIKey(
owner_id=owner_id,
name=f"{old_key.name} (rotated)",
key_hash=key_hash,
key_prefix=display,
scopes=old_key.scopes,
allowed_ips=old_key.allowed_ips,
)
db.add(new_key)
# Old key gets a grace period
old_key.expires_at = datetime.now(timezone.utc) + self.overlap
old_key.name = f"{old_key.name} (rotating out)"
db.commit()
return {
"new_key": full_key,
"new_prefix": display,
"old_key_expires": str(old_key.expires_at),
}
def revoke_immediately(self, key_id: int, owner_id: int, reason: str = ""):
"""Instant revocation for compromised keys."""
api_key = get_api_key(key_id)
if not api_key or api_key.owner_id != owner_id:
raise ValueError("Key not found or unauthorized")
api_key.is_active = False
# Audit log
log_event("key_revoked", {
"key_id": key_id,
"key_prefix": api_key.key_prefix,
"reason": reason,
"revoked_at": datetime.now(timezone.utc).isoformat(),
})
db.commit()
Consuming third-party API keys securely
For keys your application uses (Stripe, AWS, etc.):
# Using HashiCorp Vault
import hvac
class VaultSecretManager:
def __init__(self, vault_url: str, token: str):
self.client = hvac.Client(url=vault_url, token=token)
def get_api_key(self, path: str) -> str:
"""Fetch a secret from Vault's KV store."""
secret = self.client.secrets.kv.v2.read_secret_version(path=path)
return secret["data"]["data"]["api_key"]
# Usage
vault = VaultSecretManager("https://vault.internal:8200", os.environ["VAULT_TOKEN"])
stripe_key = vault.get_api_key("services/stripe")
# Using AWS Secrets Manager
import boto3
import json
def get_secret(secret_name: str) -> str:
client = boto3.client("secretsmanager")
response = client.get_secret_value(SecretId=secret_name)
secret = json.loads(response["SecretString"])
return secret["api_key"]
Audit logging
Every key operation should be logged:
from datetime import datetime, timezone
import json
class APIKeyAuditLog:
EVENTS = [
"key_created", "key_rotated", "key_revoked",
"key_used", "key_rate_limited", "key_ip_blocked",
]
def log(self, event: str, details: dict):
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"event": event,
**details,
}
# Write to your logging infrastructure
# Never log the full key — only the prefix
if "key" in entry:
entry["key"] = entry["key"][:12] + "..."
audit_logger.info(json.dumps(entry))
audit = APIKeyAuditLog()
# Log on every authentication
audit.log("key_used", {
"key_prefix": api_key.key_prefix,
"owner_id": api_key.owner_id,
"endpoint": request.url.path,
"ip": request.client.host,
"status": "success",
})
Preventing key leakage
Automated scanning catches accidental exposure:
import re
# Pre-commit hook pattern matching
API_KEY_PATTERNS = [
r'sk_live_[a-zA-Z0-9_-]{20,}', # Stripe-style live keys
r'sk_test_[a-zA-Z0-9_-]{20,}', # Stripe-style test keys
r'AKIA[0-9A-Z]{16}', # AWS access key IDs
r'ghp_[a-zA-Z0-9]{36,}', # GitHub personal access tokens
r'xoxb-[0-9]+-[a-zA-Z0-9]+', # Slack bot tokens
]
def scan_for_keys(content: str) -> list[str]:
"""Scan text content for potential API key leaks."""
findings = []
for pattern in API_KEY_PATTERNS:
matches = re.findall(pattern, content)
findings.extend(matches)
return findings
# Use in CI/CD pipeline or pre-commit hook
# Also consider: trufflehog, gitleaks, detect-secrets
The one thing to remember: A complete API key system in Python means generating cryptographically random keys with identifiable prefixes, storing only hashes in the database, enforcing scopes and IP restrictions, supporting graceful rotation with overlap periods, and logging every operation — all while preventing the most common failure: accidentally exposing keys in source code.
See Also
- Python Attribute Based Access Control How apps make fine-grained permission decisions based on who you are, what you're accessing, and the circumstances — explained with an airport analogy
- Python Audit Logging Learn Audit Logging with a clear mental model so your Python code is easier to trust and maintain.
- Python Bandit Security Scanning Why Bandit Security Scanning helps Python teams catch painful mistakes early without slowing daily development.
- Python Clickjacking Prevention How invisible website layers trick you into clicking the wrong thing, and how Python apps stop it
- Python Content Security Policy How websites create a guest list for scripts and styles to block hackers from sneaking in malicious code