FastAPI Security & OAuth — Deep Dive
FastAPI’s security architecture
FastAPI’s security system is built on three layers:
- Security schemes: Classes like
OAuth2PasswordBearer,APIKeyHeader,HTTPBearerthat define how credentials are extracted from requests - Dependencies: Functions that use security schemes to validate credentials and return user objects
- OpenAPI integration: Security schemes automatically appear in the generated documentation, enabling “Authorize” buttons in Swagger UI
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import JWTError, jwt
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/auth/token",
scopes={
"read:items": "Read items",
"write:items": "Create and update items",
"admin": "Full administrative access",
}
)
async def get_current_user(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db),
):
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": f'Bearer scope="{security_scopes.scope_str}"'},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
token_scopes = payload.get("scopes", [])
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = db.query(User).filter(User.username == username).first()
if user is None:
raise credentials_exception
# Verify required scopes
for scope in security_scopes.scopes:
if scope not in token_scopes:
raise HTTPException(
status_code=403,
detail=f"Missing required scope: {scope}",
)
return user
The SecurityScopes parameter is injected by FastAPI when you use Security() instead of Depends(). It carries the scopes required by the current route, enabling a single dependency to handle multiple authorization levels.
JWT implementation details
Token structure
from datetime import datetime, timedelta, timezone
from jose import jwt
SECRET_KEY = "your-256-bit-secret" # In production: load from env/vault
ALGORITHM = "HS256"
def create_access_token(data: dict, expires_delta: timedelta = timedelta(minutes=30)):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
to_encode.update({
"exp": expire,
"iat": datetime.now(timezone.utc),
"jti": str(uuid.uuid4()), # Unique token ID for revocation
})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
The jti (JWT ID) claim enables token revocation — store revoked JTIs in a fast lookup (Redis) and check during validation.
Symmetric vs asymmetric signing
HS256 (symmetric): Same secret key signs and verifies. Simple. Both the issuer and validator must have the secret. Fine for monolithic apps.
RS256 (asymmetric): Private key signs, public key verifies. The API server only needs the public key. The auth server keeps the private key. Essential for microservices where you don’t want every service holding the signing key.
# RS256 token creation (auth service)
private_key = open("private.pem").read()
token = jwt.encode(payload, private_key, algorithm="RS256")
# RS256 token validation (any service)
public_key = open("public.pem").read()
payload = jwt.decode(token, public_key, algorithms=["RS256"])
In microservice architectures, the auth service publishes its public key at a JWKS (JSON Web Key Set) endpoint. Other services fetch and cache this key to validate tokens without shared secrets.
Token revocation strategies
JWTs are stateless by design — there’s no built-in way to invalidate them before expiry. Production systems need revocation:
Token blacklist (Redis):
import redis
revocation_store = redis.Redis()
async def revoke_token(jti: str, exp: datetime):
ttl = int((exp - datetime.now(timezone.utc)).total_seconds())
revocation_store.setex(f"revoked:{jti}", ttl, "1")
async def is_token_revoked(jti: str) -> bool:
return revocation_store.exists(f"revoked:{jti}")
# In get_current_user:
jti = payload.get("jti")
if await is_token_revoked(jti):
raise credentials_exception
The TTL ensures blacklist entries expire when the token would have expired anyway, keeping the store clean.
Token version (database):
# User model has a token_version field
class User(Base):
token_version: int = Column(Integer, default=0)
# Token includes the version
token_data = {"sub": user.username, "tv": user.token_version}
# Validation checks version matches
if payload.get("tv") != user.token_version:
raise credentials_exception
# To revoke all tokens: increment version
user.token_version += 1
db.commit()
Incrementing token_version invalidates all existing tokens for that user. This adds a database lookup per request but gives complete revocation control.
OAuth2 Authorization Code Flow with PKCE
For SPAs and mobile apps, the Authorization Code flow with PKCE (Proof Key for Code Exchange) is the recommended approach:
import hashlib
import base64
import secrets
# Client generates a code verifier and challenge
code_verifier = secrets.token_urlsafe(64)
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b"=").decode()
# Step 1: Redirect user to provider
# GET /authorize?response_type=code&client_id=...&code_challenge=...&code_challenge_method=S256
# Step 2: Provider redirects back with authorization code
# GET /callback?code=AUTH_CODE
# Step 3: Exchange code for token (server-side)
@app.get("/auth/callback")
async def oauth_callback(code: str, state: str):
# Verify state matches what we sent
# Exchange code + code_verifier for tokens
token_response = await httpx_client.post(
"https://provider.com/oauth/token",
data={
"grant_type": "authorization_code",
"code": code,
"code_verifier": code_verifier, # Retrieved from session
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
}
)
tokens = token_response.json()
# Create local user session
user = await get_or_create_user(tokens["id_token"])
local_token = create_access_token({"sub": user.username})
return {"access_token": local_token}
PKCE prevents authorization code interception attacks. The code verifier proves that the client requesting the token is the same one that initiated the flow.
Multi-provider authentication
Supporting Google, GitHub, and email/password simultaneously:
from authlib.integrations.starlette_client import OAuth
oauth = OAuth()
oauth.register(
name="google",
client_id=GOOGLE_CLIENT_ID,
client_secret=GOOGLE_CLIENT_SECRET,
server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
client_kwargs={"scope": "openid email profile"},
)
oauth.register(
name="github",
client_id=GITHUB_CLIENT_ID,
client_secret=GITHUB_CLIENT_SECRET,
authorize_url="https://github.com/login/oauth/authorize",
access_token_url="https://github.com/login/oauth/access_token",
client_kwargs={"scope": "user:email"},
)
@app.get("/auth/{provider}/login")
async def oauth_login(provider: str, request: Request):
client = oauth.create_client(provider)
redirect_uri = f"{BASE_URL}/auth/{provider}/callback"
return await client.authorize_redirect(request, redirect_uri)
@app.get("/auth/{provider}/callback")
async def oauth_callback(provider: str, request: Request):
client = oauth.create_client(provider)
token = await client.authorize_access_token(request)
if provider == "google":
user_info = token.get("userinfo")
elif provider == "github":
resp = await client.get("https://api.github.com/user", token=token)
user_info = resp.json()
# Normalize user info and create/update local user
user = await upsert_user(
email=user_info["email"],
provider=provider,
provider_id=str(user_info["id"]),
)
local_token = create_access_token({"sub": user.username})
# Redirect to frontend with token
return RedirectResponse(f"{FRONTEND_URL}/auth?token={local_token}")
Key architectural decision: always issue your own JWTs after provider authentication. Don’t use the provider’s access token for your API. This decouples your auth system from any single provider.
Rate limiting login endpoints
Login endpoints are brute-force targets. Implement rate limiting:
from fastapi import Request
from datetime import datetime
login_attempts: dict[str, list[datetime]] = {}
async def check_login_rate(request: Request):
ip = request.client.host
now = datetime.now(timezone.utc)
window = timedelta(minutes=15)
attempts = login_attempts.get(ip, [])
attempts = [t for t in attempts if now - t < window]
if len(attempts) >= 5:
raise HTTPException(
status_code=429,
detail="Too many login attempts. Try again in 15 minutes.",
headers={"Retry-After": "900"},
)
attempts.append(now)
login_attempts[ip] = attempts
@app.post("/auth/token", dependencies=[Depends(check_login_rate)])
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
...
For production, store attempt counts in Redis instead of in-memory dictionaries. In-memory stores don’t survive restarts and don’t work across multiple workers.
Security headers
Add security headers via middleware to protect against common attacks:
@app.middleware("http")
async def security_headers(request: Request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["Cache-Control"] = "no-store" # For auth endpoints
return response
The Cache-Control: no-store header on authentication endpoints prevents browsers and proxies from caching tokens or login responses.
Password hashing
Never store plaintext passwords. Use bcrypt via passlib:
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
Bcrypt is intentionally slow (~100ms per hash), which makes brute-force attacks impractical. The deprecated="auto" setting lets you migrate to newer hash algorithms transparently — old hashes still verify, but new passwords use the latest scheme.
The one thing to remember: Production OAuth2 in FastAPI requires asymmetric JWT signing for microservices, token revocation via Redis blacklists or versioning, PKCE for browser/mobile flows, rate-limited login endpoints, and your own JWT issuance even when using third-party providers — security is a layered system where each piece covers a different attack vector.
See Also
- Python Aiohttp Client Understand Aiohttp Client through a practical analogy so your Python decisions become faster and clearer.
- Python Api Client Design Why building your own API client in Python is like creating a TV remote that only has the buttons you actually need.
- Python Api Documentation Swagger Swagger turns your Python API into an interactive playground where anyone can click buttons to try it out — no coding required.
- Python Api Mocking Responses Why testing with fake API responses is like rehearsing a play with stand-ins before the real actors show up.
- Python Api Pagination Clients Why APIs send data in pages, and how Python handles it — like reading a book one chapter at a time instead of swallowing the whole thing.