OAuth2 Integration in Python — Deep Dive

Authorization Code flow implementation

Here’s a complete OAuth2 client using Authlib with FastAPI, integrating Google as the provider:

from fastapi import FastAPI, Request
from fastapi.responses import RedirectResponse
from authlib.integrations.starlette_client import OAuth
from starlette.middleware.sessions import SessionMiddleware

app = FastAPI()
app.add_middleware(SessionMiddleware, secret_key="session-secret-change-me")

oauth = OAuth()
oauth.register(
    name="google",
    client_id="YOUR_CLIENT_ID",
    client_secret="YOUR_CLIENT_SECRET",
    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
    client_kwargs={"scope": "openid email profile"},
)

@app.get("/login")
async def login(request: Request):
    redirect_uri = request.url_for("callback")
    return await oauth.google.authorize_redirect(request, redirect_uri)

@app.get("/callback")
async def callback(request: Request):
    token = await oauth.google.authorize_access_token(request)
    user_info = token.get("userinfo")
    # Store user in database, create session
    return {"email": user_info["email"], "name": user_info["name"]}

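Authlib handles the state parameter, token exchange, and ID token verification automatically. PKCE is not enabled by default: to have Authlib generate and send the challenge, add "code_challenge_method": "S256" to client_kwargs. The server_metadata_url points to the OpenID Connect discovery document, which tells Authlib where the authorization, token, and key endpoints are.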

The state parameter prevents CSRF

During the redirect to the authorization server, the client includes a random state parameter and stores it in the user’s session. When the authorization server redirects back, it includes the same state. The client verifies they match. Without this, an attacker could craft a URL that links a victim’s account to the attacker’s OAuth provider account.

Authlib generates and validates the state automatically. requests-oauthlib generates a state value when you call authorization_url(), but it is up to you to store it in the session and pass it back to OAuth2Session on the callback, so verify that your session backend persists it correctly.
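To make the mechanism concrete, here is a minimal sketch of the state round trip using only the standard library. The session is modelled as a plain dict, and the names issue_state, verify_state, and the "oauth_state" key are illustrative assumptions, not Authlib’s internals.

```python
import hmac
import secrets


def issue_state(session: dict) -> str:
    """Create a random state value and remember it in the user's session."""
    state = secrets.token_urlsafe(32)
    session["oauth_state"] = state  # hypothetical session key
    return state


def verify_state(session: dict, returned_state) -> bool:
    """Compare the state from the callback with the stored one, exactly once."""
    # pop() makes the stored state single-use, so a replayed callback fails
    expected = session.pop("oauth_state", None)
    if not expected or not returned_state:
        return False
    # Compare as bytes in constant time; the returned value is untrusted input
    return hmac.compare_digest(expected.encode(), str(returned_state).encode())
```

In a real app the session dict would be request.session, and a failed check should abort the login rather than fall through to the token exchange.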

PKCE implementation details

Public clients (SPAs, mobile apps, CLI tools) can’t keep a client secret confidential, so PKCE takes over the job of binding the authorization code to the client that requested it:

import hashlib
import base64
import secrets

def generate_pkce_pair():
    # code_verifier: 43-128 chars, unreserved URI characters
    verifier = secrets.token_urlsafe(32)  # 43 chars

    # code_challenge: SHA256 hash of verifier, base64url encoded
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")

    return verifier, challenge

verifier, challenge = generate_pkce_pair()
# Send challenge with authorization request
# Send verifier with token exchange request

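The authorization server stores the challenge alongside the authorization code. During token exchange, it hashes the verifier and compares the result to the stored challenge. Even if an attacker intercepts the authorization code, they can’t exchange it without the verifier, which is sent only to the token endpoint and never passes through the browser redirect.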
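The server-side half of that comparison can be sketched as follows. This is an illustration of the S256 check under the assumption that the server kept the challenge as a string; the function names are hypothetical and not taken from any particular authorization server.

```python
import base64
import hashlib
import hmac


def s256_challenge(verifier: str) -> str:
    """Derive the S256 code_challenge for a given code_verifier."""
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def verifier_matches(stored_challenge: str, presented_verifier: str) -> bool:
    """Return True if the verifier sent at token exchange matches the stored challenge."""
    # The verifier must be 43-128 characters long
    if not 43 <= len(presented_verifier) <= 128:
        return False
    try:
        computed = s256_challenge(presented_verifier)
    except UnicodeEncodeError:
        # Non-ASCII input cannot be a valid verifier
        return False
    return hmac.compare_digest(computed.encode(), stored_challenge.encode())
```

An attacker holding only the authorization code and the challenge would have to invert SHA-256 to produce a verifier that passes this check.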

Token management in production

Access tokens expire. Your app needs to handle this gracefully:

from datetime import datetime, timezone
from typing import Optional

from authlib.integrations.httpx_client import AsyncOAuth2Client

class TokenManager:
    def __init__(self, client_id: str, client_secret: str, token_endpoint: str,
                 token: Optional[dict] = None):
        # Keep the endpoint on the manager so refresh does not depend on client internals
        self._token_endpoint = token_endpoint
        self.client = AsyncOAuth2Client(
            client_id=client_id,
            client_secret=client_secret,
            token_endpoint=token_endpoint,
        )
        # Token dict from the initial authorization (access_token, refresh_token, expires_at)
        self._token = token

    async def get_token(self) -> str:
        if self._token and not self._is_expired():
            return self._token["access_token"]
        if self._token and self._token.get("refresh_token"):
            self._token = await self.client.refresh_token(
                self._token_endpoint,
                refresh_token=self._token["refresh_token"],
            )
        else:
            raise ValueError("No valid token; re-authentication required")
        return self._token["access_token"]

    def _is_expired(self) -> bool:
        # A missing expires_at defaults to 0, which counts as expired
        expires_at = self._token.get("expires_at", 0)
        # Refresh 60 seconds before actual expiry
        return datetime.now(timezone.utc).timestamp() > (expires_at - 60)

The 60-second buffer keeps a token from expiring between the check and the moment the API call reaches the server, and it absorbs small clock differences between your host and the authorization server.

Client Credentials for service-to-service auth

When no user is involved — a cron job calling an API, a microservice authenticating to another microservice — use the Client Credentials grant:

from authlib.integrations.httpx_client import AsyncOAuth2Client

TOKEN_URL = "https://auth.example.com/oauth/token"

async def get_service_token() -> str:
    # async with closes the underlying HTTP connection pool when done
    async with AsyncOAuth2Client(
        client_id="service-client-id",
        client_secret="service-client-secret",
    ) as client:
        token = await client.fetch_token(
            url=TOKEN_URL,
            grant_type="client_credentials",
            scope="read:metrics write:alerts",
        )
    return token["access_token"]

This flow is simple: the service authenticates with its own credentials and receives a token scoped to what it needs. No browser redirects, no user interaction.
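It can help to see what actually goes over the wire. The sketch below builds the headers and form body of a client_credentials request with the standard library only, using HTTP Basic client authentication. The function name is hypothetical, and it assumes plain ASCII credentials that need no extra escaping.

```python
import base64
from urllib.parse import urlencode


def build_client_credentials_request(client_id: str, client_secret: str, scope: str):
    """Return (headers, body) for a client_credentials token request."""
    # Assumption: credentials are simple ASCII strings with no reserved characters
    credentials = f"{client_id}:{client_secret}".encode("utf-8")
    headers = {
        "Authorization": "Basic " + base64.b64encode(credentials).decode("ascii"),
        "Content-Type": "application/x-www-form-urlencoded",
    }
    body = urlencode({"grant_type": "client_credentials", "scope": scope})
    return headers, body
```

The result would be sent as a POST to the token endpoint; a library such as Authlib assembles the equivalent request for you.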

Building an OAuth2 provider

If you need to be the authorization server (not just a client), Authlib provides server-side components:

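# AuthCode, User, and db are assumed to be your application's SQLAlchemy
# models and session (not shown here)
from authlib.integrations.sqla_oauth2 import (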
    create_save_token_func,
    create_query_client_func,
    create_bearer_token_validator,
)
from authlib.oauth2.rfc6749 import grants

class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
    TOKEN_ENDPOINT_AUTH_METHODS = ["client_secret_basic", "client_secret_post"]

    def save_authorization_code(self, code, request):
        # Store code in database with client_id, user_id, scope, redirect_uri
        auth_code = AuthCode(
            code=code,
            client_id=request.client.client_id,
            user_id=request.user.id,
            scope=request.scope,
            redirect_uri=request.redirect_uri,
        )
        db.session.add(auth_code)
        db.session.commit()
        return auth_code

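    def query_authorization_code(self, code, client):
        auth_code = AuthCode.query.filter_by(
            code=code, client_id=client.client_id
        ).first()
        # Reject expired codes; is_expired() is assumed to come from
        # Authlib's OAuth2AuthorizationCodeMixin on the AuthCode model
        if auth_code and not auth_code.is_expired():
            return auth_code
        return None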

    def delete_authorization_code(self, authorization_code):
        db.session.delete(authorization_code)
        db.session.commit()

    def authenticate_user(self, authorization_code):
        return User.query.get(authorization_code.user_id)

Building a provider is significantly more complex than being a client. You need to handle client registration, scope management, token storage, and revocation. Consider using a dedicated service like Keycloak or Auth0 unless you have specific requirements.
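To show what the save, query, and delete hooks are responsible for, here is a minimal in-memory sketch of an authorization code store with expiry and single use. It is a teaching aid with hypothetical names (IssuedCode, InMemoryCodeStore), not a substitute for a database-backed implementation.

```python
import secrets
import time
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class IssuedCode:
    client_id: str
    user_id: str
    redirect_uri: str
    scope: str
    expires_at: float


class InMemoryCodeStore:
    def __init__(self, ttl_seconds: float = 60.0):
        self._ttl = ttl_seconds
        self._codes: Dict[str, IssuedCode] = {}

    def issue(self, client_id: str, user_id: str, redirect_uri: str,
              scope: str, now: Optional[float] = None) -> str:
        now = time.time() if now is None else now
        code = secrets.token_urlsafe(32)
        self._codes[code] = IssuedCode(
            client_id, user_id, redirect_uri, scope, now + self._ttl
        )
        return code

    def redeem(self, code: str, client_id: str, redirect_uri: str,
               now: Optional[float] = None) -> Optional[IssuedCode]:
        now = time.time() if now is None else now
        # pop() removes the code on the first attempt, successful or not
        record = self._codes.pop(code, None)
        if record is None or now >= record.expires_at:
            return None
        # The code is only valid for the client and redirect URI it was issued to
        if record.client_id != client_id or record.redirect_uri != redirect_uri:
            return None
        return record
```

A production store would also record the PKCE challenge and the OpenID Connect nonce next to each code.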

Security hardening checklist

Redirect URI validation. Register exact redirect URIs with the authorization server. Never use wildcard or partial matching. An open redirect lets attackers steal authorization codes.

Token storage. Server-side apps store tokens in encrypted database fields or server-side sessions. For confidential clients, keep tokens out of the browser entirely (never in localStorage) and give the browser only a session cookie marked HTTP-only, Secure, and SameSite.

HTTPS everywhere. OAuth2 security fundamentally depends on transport encryption. Authorization codes, tokens, and client secrets all travel over the network. Without TLS, they’re plaintext.

Scope minimization. Request only the scopes your app actually needs. Review scopes periodically — requirements change, and you might be requesting access you no longer use.

Token rotation. When a refresh response includes a new refresh token, persist it and discard the old one; an authorization server that rotates tokens will reject the old one from then on. This limits the window of compromise if a refresh token leaks.
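Of the checklist items above, redirect URI validation is the one most often weakened by a convenient shortcut such as prefix matching. A minimal sketch of exact matching on the authorization server side follows; the registry contents and function name are hypothetical.

```python
# Hypothetical registry: each client maps to the exact URIs it registered
REGISTERED_REDIRECT_URIS = {
    "web-app": {"https://app.example.com/callback"},
}


def is_allowed_redirect(client_id: str, redirect_uri: str) -> bool:
    """Accept a redirect URI only if it is byte-for-byte one of the registered ones."""
    return redirect_uri in REGISTERED_REDIRECT_URIS.get(client_id, set())
```

A prefix check such as redirect_uri.startswith("https://app.example.com") would accept https://app.example.com.evil.test/callback, which is exactly the kind of lookalike host that exact matching rejects.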

Error handling patterns

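OAuth2 defines standard error codes that the authorization server can return on the redirect: invalid_request, unauthorized_client, access_denied, unsupported_response_type, invalid_scope, server_error, and temporarily_unavailable. The token endpoint has its own set, which adds invalid_client, invalid_grant, and unsupported_grant_type.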

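import logging

from authlib.integrations.starlette_client import OAuthError

logger = logging.getLogger(__name__)

@app.get("/callback")
async def callback(request: Request):
    error = request.query_params.get("error")
    if error:
        description = request.query_params.get("error_description", "Unknown error")
        if error == "access_denied":
            return RedirectResponse("/login?msg=permission-denied")
        logger.error("OAuth error: %s - %s", error, description)
        return RedirectResponse("/login?msg=auth-failed")

    try:
        token = await oauth.google.authorize_access_token(request)
    except OAuthError as exc:
        # Raised for state mismatches and failed token exchanges
        logger.error("Token exchange failed: %s", exc.error)
        return RedirectResponse("/login?msg=auth-failed")
    # ... proceed with user info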

Always check for the error parameter before attempting token exchange. Users can deny permission, authorization codes can expire during slow redirects, and authorization servers can have transient failures.
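The error codes fall into a few groups that call for different responses: a user decision, a transient server problem, or a misconfigured client. A small sketch of that triage follows; the category labels and the function name are illustrative choices, not part of any specification.

```python
# Codes that indicate a temporary problem on the authorization server
TRANSIENT_ERRORS = {"server_error", "temporarily_unavailable"}


def classify_callback(params: dict) -> str:
    """Map callback query parameters to a coarse handling category."""
    error = params.get("error")
    if error is None:
        # A successful redirect must carry an authorization code
        return "proceed" if "code" in params else "malformed"
    if error == "access_denied":
        return "user_denied"
    if error in TRANSIENT_ERRORS:
        return "retry_later"
    # invalid_request, unauthorized_client, invalid_scope, and similar codes
    # usually point at a client configuration problem
    return "misconfigured"
```

The "misconfigured" group is worth alerting on, since retrying will not fix it and users cannot resolve it themselves.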

Multi-provider patterns

Production apps often support multiple OAuth2 providers. Structure your code to handle this:

import os

PROVIDERS = {
    "google": {
        "server_metadata_url": "https://accounts.google.com/.well-known/openid-configuration",
        "client_kwargs": {"scope": "openid email profile"},
    },
    "github": {
        "authorize_url": "https://github.com/login/oauth/authorize",
        "access_token_url": "https://github.com/login/oauth/access_token",
        "api_base_url": "https://api.github.com/",
        "client_kwargs": {"scope": "read:user user:email"},
    },
}

for name, config in PROVIDERS.items():
    oauth.register(
        name=name,
        client_id=os.environ[f"{name.upper()}_CLIENT_ID"],
        client_secret=os.environ[f"{name.upper()}_CLIENT_SECRET"],
        **config,
    )

Note that GitHub doesn’t support OpenID Connect discovery, so you specify endpoints manually. Each provider has its own quirks — Google returns user info in the ID token, GitHub requires a separate API call to /user.
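Because the providers return differently shaped user data, it helps to normalize it at the boundary. The sketch below assumes Google’s ID token claims (sub, email, name) and the fields of GitHub’s /user response (id, login, name, email); the function name and the output keys are hypothetical.

```python
def normalize_profile(provider: str, raw: dict) -> dict:
    """Convert provider-specific user data into one common shape."""
    if provider == "google":
        # ID token claims: "sub" is the stable user identifier
        return {
            "provider": "google",
            "subject": str(raw["sub"]),
            "email": raw.get("email"),
            "name": raw.get("name"),
        }
    if provider == "github":
        # /user response: numeric "id" is stable; name and email may be null
        return {
            "provider": "github",
            "subject": str(raw["id"]),
            "email": raw.get("email"),
            "name": raw.get("name") or raw.get("login"),
        }
    raise ValueError(f"Unsupported provider: {provider}")
```

Keying local accounts on the (provider, subject) pair rather than on email avoids merging accounts when an address changes or is reused.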

The one thing to remember: OAuth2 integration in Python is straightforward with Authlib or requests-oauthlib, but production readiness requires PKCE for public clients, proper state validation, token rotation, redirect URI pinning, and graceful error handling across multiple providers.
