OAuth2 Integration in Python — Deep Dive
Authorization Code flow implementation
Here’s a complete OAuth2 client using Authlib with FastAPI, integrating Google as the provider:
from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse
from authlib.integrations.starlette_client import OAuth
from starlette.middleware.sessions import SessionMiddleware
app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="session-secret-change-me")
oauth = OAuth()
oauth.register(
name="google",
client_id="YOUR_CLIENT_ID",
client_secret="YOUR_CLIENT_SECRET",
server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
client_kwargs={"scope": "openid email profile"},
)
@app.get("/login")
async def login(request: Request):
redirect_uri = request.url_for("callback")
return await oauth.google.authorize_redirect(request, redirect_uri)
@app.get("/callback")
async def callback(request: Request):
token = await oauth.google.authorize_access_token(request)
user_info = token.get("userinfo")
# Store user in database, create session
return {"email": user_info["email"], "name": user_info["name"]}
Authlib handles the state parameter, PKCE challenge, token exchange, and ID token verification automatically. The server_metadata_url points to the OpenID Connect discovery document, which tells Authlib where all the endpoints are.
The state parameter prevents CSRF
During the redirect to the authorization server, the client includes a random state parameter and stores it in the user’s session. When the authorization server redirects back, it includes the same state. The client verifies they match. Without this, an attacker could craft a URL that links a victim’s account to the attacker’s OAuth provider account.
Authlib generates and validates the state automatically. If you’re building manually with requests-oauthlib, it also handles state, but verify that your session backend persists it correctly.
PKCE implementation details
For public clients (SPAs, mobile apps, CLI tools), PKCE replaces the client secret:
import hashlib
import base64
import secrets
def generate_pkce_pair():
# code_verifier: 43-128 chars, unreserved URI characters
verifier = secrets.token_urlsafe(32) # 43 chars
# code_challenge: SHA256 hash of verifier, base64url encoded
digest = hashlib.sha256(verifier.encode("ascii")).digest()
challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
return verifier, challenge
verifier, challenge = generate_pkce_pair()
# Send challenge with authorization request
# Send verifier with token exchange request
The authorization server stores the challenge. During token exchange, it hashes the verifier and compares. Even if an attacker intercepts the authorization code, they can’t exchange it without the verifier (which never left the client).
Token management in production
Access tokens expire. Your app needs to handle this gracefully:
from datetime import datetime, timezone
from authlib.integrations.httpx_client import AsyncOAuth2Client
class TokenManager:
def __init__(self, client_id: str, client_secret: str, token_endpoint: str):
self.client = AsyncOAuth2Client(
client_id=client_id,
client_secret=client_secret,
token_endpoint=token_endpoint,
)
self._token = None
async def get_token(self) -> str:
if self._token and not self._is_expired():
return self._token["access_token"]
if self._token and self._token.get("refresh_token"):
self._token = await self.client.refresh_token(
self.client.token_endpoint,
refresh_token=self._token["refresh_token"],
)
else:
raise ValueError("No valid token; re-authentication required")
return self._token["access_token"]
def _is_expired(self) -> bool:
expires_at = self._token.get("expires_at", 0)
# Refresh 60 seconds before actual expiry
return datetime.now(timezone.utc).timestamp() > (expires_at - 60)
The 60-second buffer prevents race conditions where the token expires between the check and the API call.
Client Credentials for service-to-service auth
When no user is involved — a cron job calling an API, a microservice authenticating to another microservice — use the Client Credentials grant:
from authlib.integrations.httpx_client import AsyncOAuth2Client
async def get_service_token():
client = AsyncOAuth2Client(
client_id="service-client-id",
client_secret="service-client-secret",
token_endpoint="https://auth.example.com/oauth/token",
)
token = await client.fetch_token(
url="https://auth.example.com/oauth/token",
grant_type="client_credentials",
scope="read:metrics write:alerts",
)
return token["access_token"]
This flow is simple: the service authenticates with its own credentials and receives a token scoped to what it needs. No browser redirects, no user interaction.
Building an OAuth2 provider
If you need to be the authorization server (not just a client), Authlib provides server-side components:
from authlib.integrations.sqla_oauth2 import (
create_save_token_func,
create_query_client_func,
create_bearer_token_validator,
)
from authlib.oauth2.rfc6749 import grants
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
TOKEN_ENDPOINT_AUTH_METHODS = ["client_secret_basic", "client_secret_post"]
def save_authorization_code(self, code, request):
# Store code in database with client_id, user_id, scope, redirect_uri
auth_code = AuthCode(
code=code,
client_id=request.client.client_id,
user_id=request.user.id,
scope=request.scope,
redirect_uri=request.redirect_uri,
)
db.session.add(auth_code)
db.session.commit()
return auth_code
def query_authorization_code(self, code, client):
return AuthCode.query.filter_by(
code=code, client_id=client.client_id
).first()
def delete_authorization_code(self, authorization_code):
db.session.delete(authorization_code)
db.session.commit()
def authenticate_user(self, authorization_code):
return User.query.get(authorization_code.user_id)
Building a provider is significantly more complex than being a client. You need to handle client registration, scope management, token storage, and revocation. Consider using a dedicated service like Keycloak or Auth0 unless you have specific requirements.
Security hardening checklist
Redirect URI validation. Register exact redirect URIs with the authorization server. Never use wildcard or partial matching. An open redirect lets attackers steal authorization codes.
Token storage. Server-side apps store tokens in encrypted database fields or server-side sessions. Never store tokens in browser localStorage for confidential clients. Use HTTP-only, secure, SameSite cookies.
HTTPS everywhere. OAuth2 security fundamentally depends on transport encryption. Authorization codes, tokens, and client secrets all travel over the network. Without TLS, they’re plaintext.
Scope minimization. Request only the scopes your app actually needs. Review scopes periodically — requirements change, and you might be requesting access you no longer use.
Token rotation. When refreshing, request a new refresh token and invalidate the old one. This limits the window of compromise if a refresh token leaks.
Error handling patterns
OAuth2 defines standard error responses: invalid_request, unauthorized_client, access_denied, unsupported_response_type, invalid_scope, server_error, and temporarily_unavailable.
@app.get("/callback")
async def callback(request: Request):
error = request.query_params.get("error")
if error:
description = request.query_params.get("error_description", "Unknown error")
if error == "access_denied":
return RedirectResponse("/login?msg=permission-denied")
logger.error(f"OAuth error: {error} - {description}")
return RedirectResponse("/login?msg=auth-failed")
token = await oauth.google.authorize_access_token(request)
# ... proceed with user info
Always check for the error parameter before attempting token exchange. Users can deny permission, tokens can expire during slow redirects, and authorization servers can have transient failures.
Multi-provider patterns
Production apps often support multiple OAuth2 providers. Structure your code to handle this:
PROVIDERS = {
"google": {
"server_metadata_url": "https://accounts.google.com/.well-known/openid-configuration",
"client_kwargs": {"scope": "openid email profile"},
},
"github": {
"authorize_url": "https://github.com/login/oauth/authorize",
"access_token_url": "https://github.com/login/oauth/access_token",
"api_base_url": "https://api.github.com/",
"client_kwargs": {"scope": "read:user user:email"},
},
}
for name, config in PROVIDERS.items():
oauth.register(
name=name,
client_id=os.environ[f"{name.upper()}_CLIENT_ID"],
client_secret=os.environ[f"{name.upper()}_CLIENT_SECRET"],
**config,
)
Note that GitHub doesn’t support OpenID Connect discovery, so you specify endpoints manually. Each provider has its own quirks — Google returns user info in the ID token, GitHub requires a separate API call to /user.
The one thing to remember: OAuth2 integration in Python is straightforward with Authlib or requests-oauthlib, but production readiness requires PKCE for public clients, proper state validation, token rotation, redirect URI pinning, and graceful error handling across multiple providers.
See Also
- Python Api Key Management Why apps use special passwords called API keys, and how to keep them safe — explained with a library card analogy
- Python Attribute Based Access Control How apps make fine-grained permission decisions based on who you are, what you're accessing, and the circumstances — explained with an airport analogy
- Python Audit Logging Learn Audit Logging with a clear mental model so your Python code is easier to trust and maintain.
- Python Bandit Security Scanning Why Bandit Security Scanning helps Python teams catch painful mistakes early without slowing daily development.
- Python Clickjacking Prevention How invisible website layers trick you into clicking the wrong thing, and how Python apps stop it