Request-Response Lifecycle — Deep Dive

WSGI: the synchronous protocol

WSGI (PEP 3333) defines the interface between Python web servers and frameworks. A WSGI application is a callable that takes two arguments:

def simple_wsgi_app(environ, start_response):
    """Minimal WSGI application."""
    # environ contains the request (method, path, headers, body)
    method = environ["REQUEST_METHOD"]
    path = environ["PATH_INFO"]
    query = environ.get("QUERY_STRING", "")

    # Process the request
    if path == "/health":
        status = "200 OK"
        body = b'{"status": "ok"}'
    else:
        status = "404 Not Found"
        body = b'{"error": "not found"}'

    # start_response sets status and headers
    headers = [
        ("Content-Type", "application/json"),
        ("Content-Length", str(len(body))),
    ]
    start_response(status, headers)

    # Return an iterable of byte strings
    return [body]

Gunicorn calls this function for every request. Each call runs in its own thread or process. The environ dict contains everything: REQUEST_METHOD, PATH_INFO, HTTP_* headers (prefixed), wsgi.input (request body as a file-like object), and server variables.

Flask and Django build their request objects by parsing environ. When you access request.json in Flask, it reads from environ["wsgi.input"] and JSON-decodes it.

ASGI: the async evolution

ASGI supports async/await, long-lived connections (WebSockets), and HTTP/2. An ASGI application is an async callable with three arguments:

async def simple_asgi_app(scope, receive, send):
    """Minimal ASGI application."""
    if scope["type"] == "http":
        # Read the request body
        body = b""
        while True:
            message = await receive()
            body += message.get("body", b"")
            if not message.get("more_body", False):
                break

        path = scope["path"]
        method = scope["method"]

        if path == "/health":
            response_body = b'{"status": "ok"}'
            status = 200
        else:
            response_body = b'{"error": "not found"}'
            status = 404

        # Send response headers
        await send({
            "type": "http.response.start",
            "status": status,
            "headers": [
                (b"content-type", b"application/json"),
                (b"content-length", str(len(response_body)).encode()),
            ],
        })

        # Send response body
        await send({
            "type": "http.response.body",
            "body": response_body,
        })

The scope dict replaces WSGI’s environ with structured data: scope["type"] is "http" or "websocket", scope["path"] is the URL path, scope["headers"] is a list of byte tuples.

The receive and send callables enable streaming: you can await receive() multiple times for chunked request bodies, and await send() multiple times for streaming responses.

Middleware internals

ASGI middleware wraps the application by intercepting scope, receive, and send:

import time
import logging

logger = logging.getLogger(__name__)

class TimingMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        start = time.perf_counter()
        status_code = None

        async def send_wrapper(message):
            nonlocal status_code
            if message["type"] == "http.response.start":
                status_code = message["status"]
                # Inject custom header
                headers = list(message.get("headers", []))
                elapsed = time.perf_counter() - start
                headers.append(
                    (b"x-response-time", f"{elapsed*1000:.1f}ms".encode())
                )
                message = {**message, "headers": headers}
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        finally:
            elapsed = time.perf_counter() - start
            path = scope.get("path", "unknown")
            method = scope.get("method", "?")
            logger.info(
                "request_completed",
                extra={
                    "method": method,
                    "path": path,
                    "status": status_code,
                    "duration_ms": round(elapsed * 1000, 1),
                },
            )

The send_wrapper pattern intercepts the response before it reaches the client. This is how middleware adds headers, modifies status codes, or captures response metadata.

Starlette middleware chain execution

Starlette (which FastAPI builds on) processes middleware in onion order:

from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware

app = FastAPI(
    middleware=[
        Middleware(TimingMiddleware),          # Outermost
        Middleware(CORSMiddleware, allow_origins=["*"]),
        Middleware(AuthenticationMiddleware),
        Middleware(GZipMiddleware),            # Innermost
    ]
)

Request flow: Timing → CORS → Auth → GZip → Route Handler Response flow: Route Handler → GZip → Auth → CORS → Timing

GZip is innermost so it compresses the raw response. Timing is outermost so it captures total time including all middleware processing.

FastAPI dependency injection lifecycle

FastAPI resolves dependencies in topological order before the handler runs:

from fastapi import Depends, Request

async def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

async def get_current_user(
    request: Request,
    db: Session = Depends(get_db),
):
    token = request.headers.get("authorization", "").removeprefix("Bearer ")
    user = await authenticate(token, db)
    if not user:
        raise HTTPException(status_code=401)
    return user

async def get_user_permissions(
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    return await fetch_permissions(user.id, db)

@app.get("/admin/dashboard")
async def admin_dashboard(
    permissions: list[str] = Depends(get_user_permissions),
):
    if "admin" not in permissions:
        raise HTTPException(status_code=403)
    return {"dashboard": "data"}

Resolution order:

  1. get_db() — creates database session
  2. get_current_user() — uses db to authenticate
  3. get_user_permissions() — uses user and db to fetch permissions
  4. admin_dashboard() — receives resolved permissions

FastAPI caches dependency results within a request: get_db is called once even though both get_current_user and get_user_permissions depend on it. The yield in get_db creates a context manager — db.close() runs after the response is sent.

Django request lifecycle in detail

Django’s lifecycle is more structured:

Client Request
  → WSGIHandler.__call__()
    → SecurityMiddleware.process_request()
    → SessionMiddleware.process_request()
    → CommonMiddleware.process_request()
    → AuthenticationMiddleware.process_request()
    → URL Resolution (urls.py)
    → View function/class
      → Form/serializer validation
      → Database queries
      → Template rendering / JSON serialization
    → CommonMiddleware.process_response()
    → SessionMiddleware.process_response()
    → SecurityMiddleware.process_response()
  → WSGI Response
Client Response

Django’s middleware has distinct hooks: process_request, process_view (after URL resolution but before the view), process_exception, process_template_response, and process_response. This gives finer control than ASGI’s single-wrapper pattern.

Streaming responses

For large responses, streaming avoids loading the entire body into memory:

from fastapi.responses import StreamingResponse
import asyncio

async def generate_large_csv():
    yield "id,name,email\n"
    for i in range(1_000_000):
        yield f"{i},user_{i},user_{i}@example.com\n"
        if i % 10000 == 0:
            await asyncio.sleep(0)  # Yield control to event loop

@app.get("/export/users")
async def export_users():
    return StreamingResponse(
        generate_large_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=users.csv"},
    )

At the ASGI level, streaming sends multiple http.response.body messages with more_body=True:

# What StreamingResponse does internally
await send({"type": "http.response.start", "status": 200, "headers": [...]})

async for chunk in generate_large_csv():
    await send({
        "type": "http.response.body",
        "body": chunk.encode(),
        "more_body": True,
    })

await send({"type": "http.response.body", "body": b"", "more_body": False})

Request lifecycle profiling

Measure where time is actually spent:

import time
from contextvars import ContextVar

request_timings: ContextVar[dict] = ContextVar("request_timings")

class ProfilingMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        timings = {"start": time.perf_counter()}
        token = request_timings.set(timings)

        async def profiled_send(message):
            if message["type"] == "http.response.start":
                timings["first_byte"] = time.perf_counter()
            elif message["type"] == "http.response.body" and not message.get("more_body"):
                timings["last_byte"] = time.perf_counter()

                # Log timing breakdown
                total = timings["last_byte"] - timings["start"]
                to_first_byte = timings.get("first_byte", timings["start"]) - timings["start"]
                db_time = timings.get("db_total", 0)

                headers = list(message.get("headers", []))
                headers.append(
                    (b"server-timing",
                     f'total;dur={total*1000:.1f}, '
                     f'ttfb;dur={to_first_byte*1000:.1f}, '
                     f'db;dur={db_time*1000:.1f}'.encode())
                )
            await send(message)

        try:
            await self.app(scope, receive, profiled_send)
        finally:
            request_timings.reset(token)

# In your database layer:
async def timed_query(query, db):
    timings = request_timings.get({})
    start = time.perf_counter()
    result = await db.execute(query)
    elapsed = time.perf_counter() - start
    timings["db_total"] = timings.get("db_total", 0) + elapsed
    return result

The Server-Timing header is readable in browser DevTools, showing exactly where server time was spent without external monitoring tools.

Connection lifecycle

Beyond individual requests, understanding connection management matters for performance:

HTTP/1.1 keep-alive: The TCP connection persists across multiple requests. Gunicorn’s keepalive setting (default 2 seconds) controls how long idle connections stay open. Too short: connection setup overhead. Too long: worker threads blocked by idle connections.

HTTP/2 multiplexing: A single connection handles hundreds of concurrent requests as streams. The Python server needs fewer worker processes/threads since connections are shared.

Connection limits: Gunicorn’s --workers and --threads limit total concurrent connections. For I/O-bound APIs: workers = 2 * CPU_cores + 1 with --worker-class gthread --threads 4. For async (Uvicorn): one worker per core, each handling thousands of concurrent requests via the event loop.

One thing to remember: The request-response lifecycle spans seven layers from DNS to your Python handler — profiling with Server-Timing headers and structured middleware gives you visibility into where each millisecond goes, turning “the API is slow” into actionable diagnostics.

pythonwebhttpwsgiasgimiddlewarefastapidjango

See Also

  • Python Aiohttp Client Understand Aiohttp Client through a practical analogy so your Python decisions become faster and clearer.
  • Python Api Client Design Why building your own API client in Python is like creating a TV remote that only has the buttons you actually need.
  • Python Api Documentation Swagger Swagger turns your Python API into an interactive playground where anyone can click buttons to try it out — no coding required.
  • Python Api Mocking Responses Why testing with fake API responses is like rehearsing a play with stand-ins before the real actors show up.
  • Python Api Pagination Clients Why APIs send data in pages, and how Python handles it — like reading a book one chapter at a time instead of swallowing the whole thing.