Request-Response Lifecycle — Deep Dive
WSGI: the synchronous protocol
WSGI (PEP 3333) defines the interface between Python web servers and frameworks. A WSGI application is a callable that takes two arguments:
def simple_wsgi_app(environ, start_response):
    """Minimal WSGI application."""
    # environ contains the request (method, path, headers, body)
    method = environ["REQUEST_METHOD"]
    path = environ["PATH_INFO"]
    query = environ.get("QUERY_STRING", "")

    # Process the request
    if path == "/health":
        status = "200 OK"
        body = b'{"status": "ok"}'
    else:
        status = "404 Not Found"
        body = b'{"error": "not found"}'

    # start_response sets status and headers
    headers = [
        ("Content-Type", "application/json"),
        ("Content-Length", str(len(body))),
    ]
    start_response(status, headers)

    # Return an iterable of byte strings
    return [body]
Gunicorn calls this function once per request, on one of its worker processes or threads (workers are reused across requests rather than created per call). The environ dict contains everything: REQUEST_METHOD, PATH_INFO, request headers as HTTP_*-prefixed keys, wsgi.input (the request body as a file-like object), and server variables.
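To make the calling convention concrete, the sketch below drives a WSGI callable by hand, roughly the way a server would for one request. It relies on the standard library's wsgiref.util.setup_testing_defaults to fill in a plausible environ; health_app and call_wsgi are illustrative names, not part of Gunicorn.

```python
from wsgiref.util import setup_testing_defaults


def health_app(environ, start_response):
    """Tiny WSGI app used only for this demonstration."""
    ok = environ["PATH_INFO"] == "/health"
    body = b'{"status": "ok"}' if ok else b'{"error": "not found"}'
    start_response(
        "200 OK" if ok else "404 Not Found",
        [("Content-Type", "application/json"),
         ("Content-Length", str(len(body)))],
    )
    return [body]


def call_wsgi(app, method, path):
    """Invoke a WSGI app once and return (status, headers, body)."""
    environ = {"REQUEST_METHOD": method, "PATH_INFO": path}
    setup_testing_defaults(environ)  # adds wsgi.input, SERVER_NAME, etc.
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = headers

    chunks = app(environ, start_response)
    try:
        body = b"".join(chunks)
    finally:
        # PEP 3333: the server calls close() on the iterable if it has one
        if hasattr(chunks, "close"):
            chunks.close()
    return captured["status"], captured["headers"], body


status, headers, body = call_wsgi(health_app, "GET", "/health")
print(status, body)
```

A real server does considerably more (socket handling, header parsing, error handling), but the contract with the application is just this one call.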
Flask and Django build their request objects by parsing environ. When you access request.json in Flask, it reads from environ["wsgi.input"] and JSON-decodes it.
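As a rough illustration of that parsing step, the sketch below reads a JSON body straight from environ. It is a simplified stand-in, not Flask's or Django's actual implementation, and read_json_body is a hypothetical helper name.

```python
import io
import json


def read_json_body(environ):
    """Read and decode a JSON request body from a WSGI environ.

    Simplified: real frameworks also check Content-Type, enforce size
    limits, and cache the parsed result on the request object.
    """
    try:
        length = int(environ.get("CONTENT_LENGTH") or 0)
    except ValueError:
        length = 0
    raw = environ["wsgi.input"].read(length) if length > 0 else b""
    return json.loads(raw) if raw else None


# Build a fake environ the way a server would for a POST with a JSON body
payload = json.dumps({"name": "ada"}).encode()
environ = {
    "REQUEST_METHOD": "POST",
    "CONTENT_LENGTH": str(len(payload)),
    "wsgi.input": io.BytesIO(payload),
}
parsed = read_json_body(environ)
print(parsed)
```

Reading exactly CONTENT_LENGTH bytes matters because wsgi.input is a stream: reading past the declared length can block on some servers.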
ASGI: the async evolution
ASGI supports async/await, long-lived connections (WebSockets), and HTTP/2. An ASGI application is an async callable with three arguments:
async def simple_asgi_app(scope, receive, send):
    """Minimal ASGI application."""
    if scope["type"] == "http":
        # Read the request body
        body = b""
        while True:
            message = await receive()
            body += message.get("body", b"")
            if not message.get("more_body", False):
                break

        path = scope["path"]
        method = scope["method"]

        if path == "/health":
            response_body = b'{"status": "ok"}'
            status = 200
        else:
            response_body = b'{"error": "not found"}'
            status = 404

        # Send response headers
        await send({
            "type": "http.response.start",
            "status": status,
            "headers": [
                (b"content-type", b"application/json"),
                (b"content-length", str(len(response_body)).encode()),
            ],
        })

        # Send response body
        await send({
            "type": "http.response.body",
            "body": response_body,
        })
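The scope dict replaces WSGI’s environ with structured data: scope["type"] is "http", "websocket", or "lifespan" (startup and shutdown events); scope["path"] is the URL path; scope["headers"] is a list of (name, value) pairs, both byte strings, with header names lowercased.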
The receive and send callables enable streaming: you can await receive() multiple times for chunked request bodies, and await send() multiple times for streaming responses.
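The sketch below shows the request side of that streaming contract by driving an ASGI app with stub receive and send callables and feeding the body in several chunks. echo_length_app and call_asgi are illustrative names; a real server such as Uvicorn builds these messages from the network instead.

```python
import asyncio


async def echo_length_app(scope, receive, send):
    """ASGI app that reports how many body bytes it received."""
    total = 0
    while True:
        message = await receive()
        total += len(message.get("body", b""))
        if not message.get("more_body", False):
            break
    body = str(total).encode()
    await send({"type": "http.response.start", "status": 200,
                "headers": [(b"content-length", str(len(body)).encode())]})
    await send({"type": "http.response.body", "body": body})


async def call_asgi(app, chunks):
    """Drive an ASGI app once, delivering the request body chunk by chunk."""
    incoming = [
        {"type": "http.request", "body": c, "more_body": i < len(chunks) - 1}
        for i, c in enumerate(chunks)
    ]
    sent = []

    async def receive():
        # After the body is exhausted, a server reports a disconnect
        return incoming.pop(0) if incoming else {"type": "http.disconnect"}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "POST", "path": "/upload", "headers": []}
    await app(scope, receive, send)
    return sent


sent = asyncio.run(call_asgi(echo_length_app, [b"abc", b"de", b"f"]))
print(sent[1]["body"])
```

The app never sees how the body was split on the wire; it only loops until a message arrives without more_body set.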
Middleware internals
ASGI middleware wraps the application by intercepting scope, receive, and send:
import time
import logging

logger = logging.getLogger(__name__)


class TimingMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        start = time.perf_counter()
        status_code = None

        async def send_wrapper(message):
            nonlocal status_code
            if message["type"] == "http.response.start":
                status_code = message["status"]
                # Inject custom header
                headers = list(message.get("headers", []))
                elapsed = time.perf_counter() - start
                headers.append(
                    (b"x-response-time", f"{elapsed*1000:.1f}ms".encode())
                )
                message = {**message, "headers": headers}
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        finally:
            elapsed = time.perf_counter() - start
            path = scope.get("path", "unknown")
            method = scope.get("method", "?")
            logger.info(
                "request_completed",
                extra={
                    "method": method,
                    "path": path,
                    "status": status_code,
                    "duration_ms": round(elapsed * 1000, 1),
                },
            )
The send_wrapper pattern intercepts the response before it reaches the client. This is how middleware adds headers, modifies status codes, or captures response metadata.
Starlette middleware chain execution
Starlette (which FastAPI builds on) processes middleware in onion order:
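from fastapi import FastAPI
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.gzip import GZipMiddleware

# TimingMiddleware is the class defined above. AuthenticationMiddleware is
# assumed to be imported or defined elsewhere; Starlette's version
# (starlette.middleware.authentication) also needs a backend= argument.
app = FastAPI(
    middleware=[
        Middleware(TimingMiddleware),  # Outermost
        Middleware(CORSMiddleware, allow_origins=["*"]),
        Middleware(AuthenticationMiddleware),
        Middleware(GZipMiddleware),  # Innermost
    ]
)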
Request flow: Timing → CORS → Auth → GZip → Route Handler
Response flow: Route Handler → GZip → Auth → CORS → Timing
GZip is innermost so it compresses the raw response. Timing is outermost so it captures total time including all middleware processing.
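The onion ordering can be reproduced without any framework. The sketch below is a simplified model, not Starlette's code: each layer is a plain wrapper that records when it is entered and exited, and trace_order is an illustrative helper name.

```python
import asyncio


def trace_order(names):
    """Wrap a handler in layers (names[0] outermost) and record call order."""
    events = []

    def make_layer(name, inner):
        async def layer(request):
            events.append(f"{name} in")       # request phase
            response = await inner(request)
            events.append(f"{name} out")      # response phase
            return response
        return layer

    async def handler(request):
        events.append("handler")
        return "response"

    # Wrap from the inside out so the first name ends up outermost
    app = handler
    for name in reversed(names):
        app = make_layer(name, app)

    asyncio.run(app("GET /"))
    return events


for event in trace_order(["Timing", "CORS", "Auth", "GZip"]):
    print(event)
```

The recorded events show the request passing inward through every layer and the response unwinding through them in reverse, which is why the outermost layer sees the total elapsed time.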
FastAPI dependency injection lifecycle
FastAPI resolves dependencies in topological order before the handler runs:
from fastapi import Depends, HTTPException, Request

# SessionLocal, Session, User, authenticate, and fetch_permissions are
# application-defined; app is the FastAPI instance.


async def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


async def get_current_user(
    request: Request,
    db: Session = Depends(get_db),
):
    token = request.headers.get("authorization", "").removeprefix("Bearer ")
    user = await authenticate(token, db)
    if not user:
        raise HTTPException(status_code=401)
    return user


async def get_user_permissions(
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    return await fetch_permissions(user.id, db)


@app.get("/admin/dashboard")
async def admin_dashboard(
    permissions: list[str] = Depends(get_user_permissions),
):
    if "admin" not in permissions:
        raise HTTPException(status_code=403)
    return {"dashboard": "data"}
Resolution order:
1. get_db(): creates database session
2. get_current_user(): uses db to authenticate
3. get_user_permissions(): uses user and db to fetch permissions
4. admin_dashboard(): receives resolved permissions
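FastAPI caches dependency results within a request: get_db is called once even though both get_current_user and get_user_permissions depend on it (pass use_cache=False to Depends to opt out). The yield in get_db makes FastAPI treat it like a context manager: the code before yield is setup, and db.close() in the finally block runs as teardown once the handler has finished. Whether teardown happens before or after the response is sent has changed between FastAPI releases, so check the documentation for the version you run.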
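Both behaviours, per-request caching and teardown after yield, can be modelled with the standard library alone. The sketch below is a simplified model under that assumption, not FastAPI's resolver; simulate_request and resolve are hypothetical names.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager


async def simulate_request():
    """Model one request: resolve get_db twice, run a handler, tear down."""
    log = []

    @asynccontextmanager
    async def get_db():
        log.append("open db")
        try:
            yield "db-session"
        finally:
            log.append("close db")

    cache = {}  # per-request cache, keyed by the dependency callable

    async with AsyncExitStack() as stack:

        async def resolve(dep):
            # Enter each dependency at most once per request
            if dep not in cache:
                cache[dep] = await stack.enter_async_context(dep())
            return cache[dep]

        db_for_user = await resolve(get_db)          # enters get_db
        db_for_permissions = await resolve(get_db)   # served from the cache
        assert db_for_user is db_for_permissions
        log.append("handler ran")

    # Leaving the stack has run the code after `yield`
    return log


print(asyncio.run(simulate_request()))
```

The exit stack is what guarantees teardown order: dependencies are closed in reverse order of entry, even when the handler raises.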
Django request lifecycle in detail
Django’s lifecycle is more structured:
Client Request
  → WSGIHandler.__call__()
    → SecurityMiddleware.process_request()
    → SessionMiddleware.process_request()
    → CommonMiddleware.process_request()
    → AuthenticationMiddleware.process_request()
      → URL Resolution (urls.py)
      → View function/class
        → Form/serializer validation
        → Database queries
        → Template rendering / JSON serialization
    → CommonMiddleware.process_response()
    → SessionMiddleware.process_response()
    → SecurityMiddleware.process_response()
  → WSGI Response
Client Response
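Django’s middleware exposes several hooks. process_request and process_response run before and after the view; they come from the older class-based style, which the built-in middleware still uses via MiddlewareMixin. process_view runs after URL resolution but before the view, while process_exception and process_template_response handle view exceptions and template responses. This gives finer control than ASGI’s single-wrapper pattern.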
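For comparison with the ASGI wrapper, here is a sketch of a new-style Django middleware that combines the callable form with the process_view hook. ViewTimingMiddleware and the X-Elapsed-Ms header are illustrative; the class needs no Django imports, so the last lines exercise it with stand-in request and response objects rather than a real project.

```python
import time
from types import SimpleNamespace


class ViewTimingMiddleware:
    """New-style Django middleware: a callable plus an optional hook."""

    def __init__(self, get_response):
        # get_response is the next middleware, or the view itself
        self.get_response = get_response

    def __call__(self, request):
        # Runs before the view and before any inner middleware
        request.started_at = time.perf_counter()
        response = self.get_response(request)
        # Runs after the view has produced a response
        elapsed_ms = (time.perf_counter() - request.started_at) * 1000
        response["X-Elapsed-Ms"] = f"{elapsed_ms:.1f}"
        return response

    def process_view(self, request, view_func, view_args, view_kwargs):
        # Called after URL resolution, just before the view runs.
        # Returning None tells Django to continue to the view.
        request.view_name = view_func.__name__
        return None


# Stand-ins: a namespace for the request, a dict for the response
# (Django's HttpResponse also supports header assignment by key).
fake_request = SimpleNamespace()
middleware = ViewTimingMiddleware(lambda request: {})
fake_response = middleware(fake_request)
print(fake_response)
```

In a project the class would be listed in the MIDDLEWARE setting, and Django would call process_view itself once the URL has been resolved.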
Streaming responses
For large responses, streaming avoids loading the entire body into memory:
from fastapi.responses import StreamingResponse
import asyncio


async def generate_large_csv():
    yield "id,name,email\n"
    for i in range(1_000_000):
        yield f"{i},user_{i},user_{i}@example.com\n"
        if i % 10000 == 0:
            await asyncio.sleep(0)  # Yield control to event loop


@app.get("/export/users")
async def export_users():
    return StreamingResponse(
        generate_large_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=users.csv"},
    )
At the ASGI level, streaming sends multiple http.response.body messages with more_body=True:
# What StreamingResponse does internally
await send({"type": "http.response.start", "status": 200, "headers": [...]})
async for chunk in generate_large_csv():
    await send({
        "type": "http.response.body",
        "body": chunk.encode(),
        "more_body": True,
    })
await send({"type": "http.response.body", "body": b"", "more_body": False})
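To observe that message sequence directly, the sketch below runs a small streaming ASGI app against a stub send callable and collects what it emits. It models the protocol only and does not use StreamingResponse; rows, streaming_app, and collect_body_messages are illustrative names.

```python
import asyncio


async def rows():
    """Small stand-in for a large row generator."""
    yield "id,name\n"
    for i in range(3):
        yield f"{i},user_{i}\n"


async def streaming_app(scope, receive, send):
    await send({"type": "http.response.start", "status": 200,
                "headers": [(b"content-type", b"text/csv")]})
    async for chunk in rows():
        await send({"type": "http.response.body",
                    "body": chunk.encode(), "more_body": True})
    # An empty final message closes the response
    await send({"type": "http.response.body", "body": b"", "more_body": False})


async def collect_body_messages(app):
    """Run the app once and return only its http.response.body messages."""
    messages = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        messages.append(message)

    await app({"type": "http", "method": "GET", "path": "/export"}, receive, send)
    return [m for m in messages if m["type"] == "http.response.body"]


bodies = asyncio.run(collect_body_messages(streaming_app))
print(len(bodies), [m["more_body"] for m in bodies])
```

Each chunk reaches the server as soon as it is produced, so memory use stays proportional to one chunk rather than the whole export.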
Request lifecycle profiling
Measure where time is actually spent:
import time
from contextvars import ContextVar

request_timings: ContextVar[dict] = ContextVar("request_timings")


class ProfilingMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        timings = {"start": time.perf_counter()}
        token = request_timings.set(timings)

        async def profiled_send(message):
            if message["type"] == "http.response.start":
                # Headers can only be added here: once http.response.start
                # has been sent, body messages cannot carry headers, so the
                # header reports what is known at first byte.
                timings["first_byte"] = time.perf_counter()
                to_first_byte = timings["first_byte"] - timings["start"]
                db_time = timings.get("db_total", 0)
                headers = list(message.get("headers", []))
                headers.append(
                    (b"server-timing",
                     f'ttfb;dur={to_first_byte*1000:.1f}, '
                     f'db;dur={db_time*1000:.1f}'.encode())
                )
                message = {**message, "headers": headers}
            elif message["type"] == "http.response.body" and not message.get("more_body"):
                # Total duration is only known now; keep it for logging
                timings["last_byte"] = time.perf_counter()
                timings["total"] = timings["last_byte"] - timings["start"]
            await send(message)

        try:
            await self.app(scope, receive, profiled_send)
        finally:
            request_timings.reset(token)


# In your database layer:
async def timed_query(query, db):
    timings = request_timings.get({})
    start = time.perf_counter()
    result = await db.execute(query)
    elapsed = time.perf_counter() - start
    timings["db_total"] = timings.get("db_total", 0) + elapsed
    return result
The Server-Timing header is readable in browser DevTools, showing exactly where server time was spent without external monitoring tools.
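The profiling approach depends on ContextVar keeping each request's timings separate even when requests interleave on one event loop. The sketch below checks that property with two concurrent tasks; fake_query and handle are illustrative stand-ins that add nominal durations instead of measuring real database calls.

```python
import asyncio
from contextvars import ContextVar

request_timings: ContextVar[dict] = ContextVar("request_timings")


async def fake_query(seconds):
    """Stand-in for a database call that records its nominal duration."""
    timings = request_timings.get()
    await asyncio.sleep(seconds)  # other requests run during this wait
    timings["db_total"] = timings.get("db_total", 0) + seconds


async def handle(name, query_durations):
    """Stand-in for one request: set timings, run queries, report the sum."""
    timings = {}
    token = request_timings.set(timings)
    try:
        for seconds in query_durations:
            await fake_query(seconds)
    finally:
        request_timings.reset(token)
    return name, round(timings["db_total"], 3)


async def main():
    # gather() runs each coroutine as its own task with its own context copy
    results = await asyncio.gather(
        handle("a", [0.01, 0.02]),
        handle("b", [0.05]),
    )
    return dict(results)


totals = asyncio.run(main())
print(totals)
```

A module-level dict would mix the two requests together here; the context variable is what lets the database layer find the right timings without having them passed through every call.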
Connection lifecycle
Beyond individual requests, understanding connection management matters for performance:
HTTP/1.1 keep-alive: The TCP connection persists across multiple requests. Gunicorn’s keepalive setting (default 2 seconds) controls how long idle connections stay open. Too short: connection setup overhead. Too long: worker threads blocked by idle connections.
HTTP/2 multiplexing: A single connection handles hundreds of concurrent requests as streams. The Python server needs fewer worker processes/threads since connections are shared.
Connection limits: Gunicorn’s --workers and --threads bound how many requests are processed at once (workers × threads with the gthread worker). For I/O-bound APIs, a common starting point is workers = 2 * CPU_cores + 1 with --worker-class gthread --threads 4. For async (Uvicorn): one worker per core, each handling thousands of concurrent requests via the event loop.
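Those settings can be written down as a Gunicorn configuration file, which is itself a Python module. The sketch below assumes the gthread numbers quoted above and should be treated as a starting point to tune under load; max_concurrent_requests is an illustrative variable for the arithmetic, not a Gunicorn setting.

```python
# gunicorn.conf.py (sketch; tune the numbers against your own load tests)
import multiprocessing

cpu_cores = multiprocessing.cpu_count()

workers = 2 * cpu_cores + 1   # heuristic from the text for I/O-bound APIs
worker_class = "gthread"      # threaded workers
threads = 4                   # threads per worker
keepalive = 2                 # seconds to hold an idle keep-alive connection

# Upper bound on requests being processed at the same moment.
# Illustrative only: Gunicorn ignores names it does not recognise.
max_concurrent_requests = workers * threads
```

On a 4-core machine this gives 9 workers and up to 36 requests in flight, which is the figure to compare against database connection pool sizes.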
One thing to remember: The request-response lifecycle spans seven layers from DNS to your Python handler — profiling with Server-Timing headers and structured middleware gives you visibility into where each millisecond goes, turning “the API is slow” into actionable diagnostics.