Starlette ASGI — Deep Dive
The ASGI protocol in depth
An ASGI application is a callable with the signature:
async def app(scope, receive, send):
    ...
scope is a dictionary describing the connection — its type (http, websocket, lifespan), path, headers, query string, and more. receive is an async callable that reads incoming messages. send is an async callable that sends outgoing messages.
For an HTTP request, the flow is:
- Server creates a scope with type: "http"
- App calls await receive() to get the request body (http.request message)
- App calls await send() at least twice: first http.response.start (status + headers), then one or more http.response.body messages
This message-passing design is what enables streaming, chunked responses, and WebSockets — all through the same interface.
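To make the flow above concrete, here is a minimal sketch of a raw ASGI app plus a tiny in-memory driver. The `call_app` helper is hypothetical and only stands in for a real server such as uvicorn; it uses nothing beyond the standard library.

```python
import asyncio


async def app(scope, receive, send):
    """Minimal raw ASGI app: echoes the request body back as plain text."""
    assert scope["type"] == "http"

    # Drain the request body; a server may deliver it in several chunks.
    body = b""
    while True:
        message = await receive()
        body += message.get("body", b"")
        if not message.get("more_body", False):
            break

    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": b"echo: " + body})


async def call_app(asgi_app, body=b""):
    """Hypothetical driver that plays the server's role for one request."""
    scope = {"type": "http", "method": "POST", "path": "/", "headers": []}
    sent = []

    async def receive():
        return {"type": "http.request", "body": body, "more_body": False}

    async def send(message):
        sent.append(message)

    await asgi_app(scope, receive, send)
    return sent


if __name__ == "__main__":
    for message in asyncio.run(call_app(app, b"hello")):
        print(message["type"])
```

Because the app is just a callable, it can be exercised like this without any framework or network socket.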
Starlette’s application architecture
Starlette wraps this raw protocol in a layered architecture:
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import JSONResponse

async def homepage(request):
    return JSONResponse({"status": "running"})

async def user_detail(request):
    user_id = request.path_params["user_id"]
    return JSONResponse({"user_id": user_id})

app = Starlette(routes=[
    Route("/", homepage),
    Route("/users/{user_id:int}", user_detail),
])
The Starlette class itself is an ASGI app. When a request arrives, it passes through the middleware stack, then the router matches a route, which calls your endpoint function with a Request object that wraps the raw scope/receive/send.
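The idea that a router is itself an ASGI app that picks an endpoint can be sketched without Starlette. The `ToyRouter` below is a hypothetical exact-match dispatcher written for illustration only; Starlette's real router also handles converters, methods, mounts, and much more.

```python
import asyncio
import json


def json_response(payload, status=200):
    """Build the two ASGI messages that make up a simple JSON response."""
    body = json.dumps(payload).encode()
    return [
        {"type": "http.response.start", "status": status,
         "headers": [(b"content-type", b"application/json")]},
        {"type": "http.response.body", "body": body},
    ]


class ToyRouter:
    """Hypothetical exact-match router, not Starlette's implementation."""

    def __init__(self, routes):
        self.routes = dict(routes)  # path -> async endpoint(scope)

    async def __call__(self, scope, receive, send):
        endpoint = self.routes.get(scope["path"])
        if endpoint is None:
            messages = json_response({"detail": "Not Found"}, status=404)
        else:
            messages = json_response(await endpoint(scope))
        for message in messages:
            await send(message)


async def homepage(scope):
    return {"status": "running"}


toy_app = ToyRouter([("/", homepage)])


async def get(asgi_app, path):
    """Drive the app in memory and return (status, decoded JSON body)."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await asgi_app({"type": "http", "method": "GET", "path": path}, receive, send)
    return sent[0]["status"], json.loads(sent[1]["body"])
```

Calling `asyncio.run(get(toy_app, "/"))` drives one request through the dispatcher; an unknown path falls through to the 404 branch.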
Middleware internals
Middleware in Starlette follows the ASGI middleware pattern — each middleware is an ASGI app that wraps the next app:
import time

from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        elapsed = time.perf_counter() - start
        response.headers["X-Process-Time"] = f"{elapsed:.4f}"
        return response

# Register the middleware when building the app.
app = Starlette(routes=[...], middleware=[Middleware(TimingMiddleware)])
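However, BaseHTTPMiddleware has known limitations. It routes every response through an extra task and in-memory stream, which adds overhead and has historically caused surprises with streaming responses and background tasks. The Starlette documentation also notes that contextvars changes made in the endpoint do not propagate back up through it.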
The recommended alternative for production middleware is the pure ASGI pattern:
import time

class PureTimingMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        start = time.perf_counter()

        async def send_with_timing(message):
            if message["type"] == "http.response.start":
                elapsed = time.perf_counter() - start
                # Append instead of round-tripping through dict(), which would
                # collapse repeated headers such as Set-Cookie.
                headers = list(message.get("headers", []))
                headers.append((b"x-process-time", f"{elapsed:.4f}".encode()))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_timing)
This preserves streaming: each message is forwarded to the server as soon as the app sends it, so the middleware buffers nothing.
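One way to check the streaming claim is to wrap a chunked app and record what reaches the server side. The sketch below uses an illustrative `AddHeaderMiddleware` and a hypothetical `collect` driver; both are stand-ins written for this example.

```python
import asyncio


class AddHeaderMiddleware:
    """Pure ASGI middleware that appends one response header (illustrative)."""

    def __init__(self, app, name, value):
        self.app = app
        self.header = (name, value)

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                headers.append(self.header)
                message = {**message, "headers": headers}
            # Body chunks pass straight through, one message at a time.
            await send(message)

        await self.app(scope, receive, send_wrapper)


async def streaming_app(scope, receive, send):
    """Sends three body chunks, flagging all but the last with more_body."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    for i in range(3):
        await send({
            "type": "http.response.body",
            "body": f"chunk {i}\n".encode(),
            "more_body": i < 2,
        })


async def collect(asgi_app):
    """Hypothetical driver: record every message the app sends."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await asgi_app({"type": "http", "method": "GET", "path": "/"}, receive, send)
    return sent
```

The driver sees one start message followed by three separate body messages, which is the shape a streaming client would receive.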
Routing deep dive
Starlette’s routing supports converters for path parameters:
- {param}: matches any string without slashes (the default)
- {param:int}: matches integers
- {param:float}: matches floats
- {param:path}: matches the remaining path, including slashes
- {param:uuid}: matches UUID format
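As a rough mental model, the converters above can be thought of as a regex fragment plus a conversion function per type. The sketch below is a simplified stand-in written for this article, not Starlette's own code; the exact patterns are assumptions chosen to match the descriptions in the list.

```python
import re
import uuid

# Simplified stand-ins for the converters listed above: (regex, convert function).
CONVERTERS = {
    "str": (r"[^/]+", str),
    "int": (r"[0-9]+", int),
    "float": (r"[0-9]+(?:\.[0-9]+)?", float),
    "path": (r".*", str),
    "uuid": (r"[0-9a-fA-F]{8}-(?:[0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}", uuid.UUID),
}

PARAM = re.compile(r"\{([a-zA-Z_][a-zA-Z0-9_]*)(?::([a-z]+))?\}")


def compile_path(template):
    """Turn '/users/{user_id:int}' into a regex plus per-parameter converters."""
    pattern, converters, pos = "^", {}, 0
    for match in PARAM.finditer(template):
        name, kind = match.group(1), match.group(2) or "str"
        regex, convert = CONVERTERS[kind]
        pattern += re.escape(template[pos:match.start()])
        pattern += f"(?P<{name}>{regex})"
        converters[name] = convert
        pos = match.end()
    pattern += re.escape(template[pos:]) + "$"
    return re.compile(pattern), converters


def match_path(template, path):
    """Return converted path params, or None when the path does not match."""
    regex, converters = compile_path(template)
    found = regex.match(path)
    if found is None:
        return None
    return {name: converters[name](value) for name, value in found.groupdict().items()}
```

For example, `match_path("/users/{user_id:int}", "/users/42")` yields an integer `user_id`, while a non-numeric segment does not match at all, which is why such requests end up as 404s rather than reaching the endpoint.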
Sub-applications use Mount:
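from starlette.applications import Starlette
from starlette.routing import Mount, Route
from starlette.staticfiles import StaticFiles

# homepage, list_users, and get_user are endpoint functions defined elsewhere.
api_routes = [
    Route("/users", list_users),
    Route("/users/{user_id:int}", get_user),
]

app = Starlette(routes=[
    Route("/", homepage),
    Mount("/api/v1", routes=api_routes),
    Mount("/static", app=StaticFiles(directory="static")),
])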
Each Mount creates an isolated routing subtree. Routes inside a mount are matched relative to its prefix, so a request for /api/v1/users is matched against the /users route and reaches list_users. The mount prefix is tracked in scope["root_path"].
WebSocket handling
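from starlette.applications import Starlette
from starlette.routing import WebSocketRoute
from starlette.websockets import WebSocket, WebSocketDisconnect

async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            result = process(data)  # process() is application code defined elsewhere
            await websocket.send_json({"result": result})
    except WebSocketDisconnect:
        # The client went away; the connection is already closed.
        pass
    except Exception:
        # Unexpected server-side failure: close with 1011 (internal error).
        await websocket.close(code=1011)

app = Starlette(routes=[
    WebSocketRoute("/ws", ws_endpoint),
])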
Key production considerations for WebSockets:
- Always handle disconnection gracefully (WebSocketDisconnect exception)
- Implement heartbeat/ping mechanisms for detecting dead connections
- Use websocket.state to store per-connection data
- Consider connection limits: each WebSocket holds an open connection and consumes memory
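For the dead-connection point above, one simple approach is an idle timeout around each receive. The helper below is a sketch under assumptions: `receive_or_timeout` and the `idle_seconds` policy are hypothetical names, and `FakeWebSocket` exists only so the example runs without a real connection.

```python
import asyncio


async def receive_or_timeout(websocket, idle_seconds):
    """Wait for the next JSON message; return None if the peer stays silent.

    `websocket` is any object with an async receive_json() method, such as
    Starlette's WebSocket. The idle limit is a hypothetical policy knob.
    """
    try:
        return await asyncio.wait_for(websocket.receive_json(), timeout=idle_seconds)
    except asyncio.TimeoutError:
        return None


class FakeWebSocket:
    """Stand-in used only to exercise the helper without a real connection."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def receive_json(self):
        if self._messages:
            return self._messages.pop(0)
        await asyncio.sleep(3600)  # simulate a dead or silent peer


async def demo():
    ws = FakeWebSocket([{"action": "ping"}])
    first = await receive_or_timeout(ws, idle_seconds=0.05)
    second = await receive_or_timeout(ws, idle_seconds=0.05)
    return first, second
```

In an endpoint, a `None` result could trigger an application-level ping or a close; which of the two is appropriate depends on the client protocol.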
Background tasks and lifespan
Background tasks run after the response is sent:
from starlette.background import BackgroundTask
from starlette.responses import JSONResponse

async def send_notification(user_id: int, message: str):
    # This runs after the response is delivered
    await notification_service.send(user_id, message)

async def create_order(request):
    order = await process_order(request)
    task = BackgroundTask(send_notification, order.user_id, "Order confirmed")
    return JSONResponse({"order_id": order.id}, background=task)
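The "after the response is sent" ordering can be seen at the ASGI level. The `ToyResponse` class below is a simplified model written for illustration, not Starlette's implementation: it sends both response messages and only then awaits the attached task.

```python
import asyncio


class ToyResponse:
    """Simplified model of a response that owns a background task."""

    def __init__(self, body, background=None):
        self.body = body
        self.background = background  # zero-argument async callable or None

    async def __call__(self, scope, receive, send):
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": self.body})
        # Runs only after the final body message was handed to the server.
        if self.background is not None:
            await self.background()


async def demo():
    events = []

    async def notify():
        events.append("background task ran")

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        events.append(message["type"])

    response = ToyResponse(b"ok", background=notify)
    await response({"type": "http"}, receive, send)
    return events
```

In this model the task still runs inside the same ASGI call as the request, so long-running jobs are usually better handed to a dedicated queue.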
The lifespan protocol manages startup and shutdown:
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    # Startup: initialize connection pools, load ML models, etc.
    pool = await create_db_pool()
    app.state.db = pool
    yield
    # Shutdown: clean up resources
    await pool.close()

app = Starlette(lifespan=lifespan, routes=[...])
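Underneath, lifespan is just another ASGI scope type with its own messages. The sketch below wires an async context manager to those messages by hand; `make_app` and `run_lifespan` are hypothetical helpers, and failure handling (the `lifespan.startup.failed` message) is left out for brevity.

```python
import asyncio
from contextlib import asynccontextmanager


def make_app(lifespan):
    """Wrap a lifespan context manager in a bare ASGI lifespan handler."""

    async def app(scope, receive, send):
        if scope["type"] != "lifespan":
            return
        message = await receive()
        assert message["type"] == "lifespan.startup"
        async with lifespan(app):
            await send({"type": "lifespan.startup.complete"})
            message = await receive()
            assert message["type"] == "lifespan.shutdown"
        await send({"type": "lifespan.shutdown.complete"})

    return app


async def run_lifespan(app):
    """Play the server's role: request startup, then shutdown."""
    inbox = asyncio.Queue()
    sent = []

    async def send(message):
        sent.append(message["type"])

    # A real server waits for startup to complete before asking for shutdown;
    # queueing both messages up front keeps the demo short.
    await inbox.put({"type": "lifespan.startup"})
    await inbox.put({"type": "lifespan.shutdown"})
    await app({"type": "lifespan"}, inbox.get, send)
    return sent


log = []


@asynccontextmanager
async def lifespan(app):
    log.append("startup")   # e.g. open a connection pool
    yield
    log.append("shutdown")  # e.g. close it again
```

Running `asyncio.run(run_lifespan(make_app(lifespan)))` executes the code before `yield` on startup and the code after it on shutdown, mirroring what a server does around the lifetime of the process.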
Testing patterns
Starlette’s TestClient (based on httpx) provides synchronous testing for async apps:
from starlette.testclient import TestClient

def test_homepage():
    client = TestClient(app)
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"status": "running"}

def test_websocket():
    client = TestClient(app)
    with client.websocket_connect("/ws") as ws:
        ws.send_json({"action": "ping"})
        data = ws.receive_json()
        assert data["result"] == "pong"
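For async test functions (here with the anyio pytest plugin, which provides the pytest.mark.anyio marker), use httpx.AsyncClient directly:
import httpx
import pytest

@pytest.mark.anyio
async def test_homepage_async():
    # Recent httpx releases reach the app through ASGITransport; the older
    # AsyncClient(app=app) shortcut has been removed.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/")
    assert response.status_code == 200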
Production deployment
Deploy Starlette with uvicorn (single process) or gunicorn + uvicorn workers (multi-process):
# Development
uvicorn myapp:app --reload
# Production: gunicorn with uvicorn workers
gunicorn myapp:app -k uvicorn.workers.UvicornWorker -w 4 --bind 0.0.0.0:8000
Worker count rule of thumb: gunicorn's documentation suggests 2 * CPU_cores + 1 as a starting point. For I/O-bound async apps, fewer workers (often about one per core) with higher concurrency often perform better, since each worker can multiplex many concurrent connections on its event loop.
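The rule of thumb can be written down as a gunicorn configuration file, which is plain Python. This is a sketch under assumptions: the one-worker-per-core default is a starting guess to be tuned with load tests, and the file name `gunicorn.conf.py` is a common convention for gunicorn config files.

```python
# gunicorn.conf.py (sketch)
import os

cores = os.cpu_count() or 1

# Classic starting point from the gunicorn docs, shown for comparison.
sync_style_workers = 2 * cores + 1

# Assumption for an I/O-bound async app: about one event loop per core,
# overridable through the WEB_CONCURRENCY environment variable.
workers = int(os.environ.get("WEB_CONCURRENCY", cores))
worker_class = "uvicorn.workers.UvicornWorker"
bind = "0.0.0.0:8000"
```

It would be used as `gunicorn -c gunicorn.conf.py myapp:app`.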
Performance tradeoffs
Starlette’s async nature shines when your endpoints spend time waiting: database queries, HTTP calls to other services, file I/O. For CPU-heavy work (image processing, complex calculations), async doesn’t help, because the event loop is blocked while the computation runs. For those cases, offload to a thread pool (asyncio.to_thread()), to a process pool when the work is pure Python and limited by the GIL, or to a task queue like Celery.
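Here is a small sketch of the thread-pool option. `expensive_hash` is a made-up stand-in for CPU-heavy work, and the heartbeat task exists only to show that the event loop keeps running while the work happens elsewhere. `asyncio.to_thread()` requires Python 3.9 or newer.

```python
import asyncio
import hashlib


def expensive_hash(data: bytes, rounds: int = 50_000) -> str:
    """Blocking, CPU-heavy stand-in for image processing or similar work."""
    digest = data
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()


async def handler(data: bytes) -> str:
    # Runs in the default thread pool so the event loop can serve other tasks.
    return await asyncio.to_thread(expensive_hash, data)


async def demo():
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.001)

    beat = asyncio.create_task(heartbeat())
    result = await handler(b"payload")
    beat.cancel()
    return result, ticks
```

The heartbeat keeps ticking while the hash is computed, which would not happen if `expensive_hash` were called directly inside the coroutine.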
Memory usage is typically much lower than in thread-per-request models. A sync framework serving 5,000 concurrent long-polling connections might need 5,000 threads (each reserving up to ~8 MB of stack by default on Linux, although only touched pages consume physical memory), while Starlette handles the same load on one thread, with each coroutine consuming kilobytes.
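The coroutine side of that comparison is easy to try. In this sketch `long_poll` is a hypothetical stand-in for a connection that mostly waits, and every wait overlaps on a single thread.

```python
import asyncio


async def long_poll(connection_id: int, wait_seconds: float) -> int:
    """Stand-in for a connection that mostly waits on I/O."""
    await asyncio.sleep(wait_seconds)
    return connection_id


async def serve_many(count: int = 5_000, wait_seconds: float = 0.1):
    # All waits overlap on one thread; no per-connection OS thread exists.
    tasks = [asyncio.create_task(long_poll(i, wait_seconds)) for i in range(count)]
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    results = asyncio.run(serve_many())
    print(len(results), "simulated connections completed")
```

Real connections also hold sockets and buffers, so this only illustrates the scheduling cost, not total memory per connection.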
The one thing to remember: Starlette’s power comes from understanding the ASGI protocol underneath — scope, receive, send — and knowing when to use pure ASGI middleware, proper lifespan management, and async-aware deployment configurations for production.