FastAPI WebSocket Patterns — Deep Dive

FastAPI’s WebSocket implementation

FastAPI delegates WebSocket handling to Starlette, which implements the ASGI WebSocket spec. Each WebSocket connection is an independent ASGI scope with its own receive and send callables. Unlike HTTP, where a scope covers a single request-response cycle, a WebSocket scope lives for the entire connection duration.

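from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # Client closed the connection; nothing to clean up in this example
        pass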

Under the hood, receive_text() awaits an ASGI message of type websocket.receive with a text key. send_text() sends a websocket.send message. The event loop handles concurrency — while one connection awaits a message, other connections can process theirs.
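To make the message flow concrete, here is a minimal sketch of the same echo behavior written directly against the ASGI interface, with no framework in between. The `drive` helper and the scripted event list are stand-ins invented for this illustration; a real server such as Uvicorn supplies `receive` and `send`.

```python
import asyncio

async def echo_app(scope, receive, send):
    """Bare ASGI app that echoes text frames."""
    assert scope["type"] == "websocket"
    while True:
        event = await receive()
        if event["type"] == "websocket.connect":
            # Roughly what websocket.accept() does on your behalf
            await send({"type": "websocket.accept"})
        elif event["type"] == "websocket.receive":
            # Text frames carry "text"; binary frames carry "bytes"
            text = event.get("text")
            if text is not None:
                await send({"type": "websocket.send", "text": f"Echo: {text}"})
        elif event["type"] == "websocket.disconnect":
            break

async def drive(app, incoming):
    """Feed a scripted list of events to the app and collect what it sends."""
    inbox = asyncio.Queue()
    for event in incoming:
        inbox.put_nowait(event)
    sent = []

    async def send(message):
        sent.append(message)

    await app({"type": "websocket", "path": "/ws/demo"}, inbox.get, send)
    return sent

sent = asyncio.run(drive(echo_app, [
    {"type": "websocket.connect"},
    {"type": "websocket.receive", "text": "hi"},
    {"type": "websocket.disconnect", "code": 1000},
]))
print(sent)
```

The single scope dictionary is passed once and the loop keeps running until the disconnect event arrives, which is the "scope lives for the entire connection" behavior described above.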

Production connection manager

A robust connection manager needs more than a list:

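import asyncio
from dataclasses import dataclass, field
from typing import Dict, Set
from fastapi import WebSocket
from starlette.websockets import WebSocketState

@dataclass
class ConnectionManager:
    _rooms: Dict[str, Set[WebSocket]] = field(default_factory=dict)
    _user_connections: Dict[str, WebSocket] = field(default_factory=dict)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    async def connect(self, websocket: WebSocket, room: str, user_id: str):
        # Accept only if the endpoint has not already done so; a second
        # accept() on the same socket raises RuntimeError
        if websocket.application_state == WebSocketState.CONNECTING:
            await websocket.accept()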
        async with self._lock:
            if room not in self._rooms:
                self._rooms[room] = set()
            self._rooms[room].add(websocket)
            self._user_connections[user_id] = websocket

    async def disconnect(self, websocket: WebSocket, room: str, user_id: str):
        async with self._lock:
            self._rooms.get(room, set()).discard(websocket)
            if room in self._rooms and not self._rooms[room]:
                del self._rooms[room]
            self._user_connections.pop(user_id, None)

    async def broadcast(self, room: str, message: str, exclude: WebSocket | None = None):
        connections = self._rooms.get(room, set()).copy()
        dead = []
        for conn in connections:
            if conn is exclude:
                continue
            try:
                await conn.send_text(message)
            except Exception:
                dead.append(conn)
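        # Clean up dead connections, their user mappings, and empty rooms
        if dead:
            async with self._lock:
                room_conns = self._rooms.get(room)
                if room_conns is not None:
                    room_conns.difference_update(dead)
                    if not room_conns:
                        del self._rooms[room]
                stale = [uid for uid, ws in self._user_connections.items() if ws in dead]
                for uid in stale:
                    del self._user_connections[uid]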

    async def send_to_user(self, user_id: str, message: str):
        conn = self._user_connections.get(user_id)
        if conn:
            try:
                await conn.send_text(message)
            except Exception:
                self._user_connections.pop(user_id, None)

Key design decisions:

  • An asyncio.Lock serializes updates to the room and user maps, so joins, leaves, and cleanup cannot interleave across await points
  • Dead connection cleanup during broadcast avoids accumulating zombie connections
  • User-to-connection mapping enables targeted messaging (e.g., direct messages, notifications)
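One trade-off in the broadcast method above is that it awaits each send in turn, so a single slow client delays every client after it. A possible variant, sketched here with a hypothetical `FakeSocket` stand-in rather than a real `fastapi.WebSocket`, sends to all connections concurrently with `asyncio.gather` and returns the ones that failed so the caller can clean them up.

```python
import asyncio

class FakeSocket:
    """Stand-in for fastapi.WebSocket; only send_text() is modeled."""
    def __init__(self, name, fail=False):
        self.name = name
        self.fail = fail
        self.received = []

    async def send_text(self, message):
        if self.fail:
            raise ConnectionError(f"{self.name} is gone")
        self.received.append(message)

async def broadcast_concurrently(connections, message):
    """Send to all connections at once; return those whose send failed."""
    targets = list(connections)
    results = await asyncio.gather(
        *(conn.send_text(message) for conn in targets),
        return_exceptions=True,  # collect failures instead of raising
    )
    return [conn for conn, result in zip(targets, results)
            if isinstance(result, Exception)]

async def demo():
    alice = FakeSocket("alice")
    bob = FakeSocket("bob")
    ghost = FakeSocket("ghost", fail=True)
    dead = await broadcast_concurrently({alice, bob, ghost}, "hello")
    return alice.received, bob.received, [conn.name for conn in dead]

alice_msgs, bob_msgs, dead_names = asyncio.run(demo())
print(alice_msgs, bob_msgs, dead_names)
```

Concurrent sends cost more memory per broadcast (one task per connection), so for very large rooms it may be worth sending in bounded batches instead.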

Scaling beyond a single process

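A single Uvicorn worker keeps all of its connections in that process's memory, so a broadcast can only reach clients attached to the same worker. Once you run several workers or servers, clients in the same room end up spread across processes that cannot see each other's connections. The standard solution is Redis pub/sub as a message relay between workers.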

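from typing import Dict, Set

from fastapi import WebSocket
# The standalone aioredis package was merged into redis-py (4.2+);
# this alias keeps the old name working
from redis import asyncio as aioredis

class RedisConnectionManager:
    def __init__(self):
        self.local_connections: Dict[str, Set[WebSocket]] = {}
        self.redis = None
        self.pubsub = None

    async def init(self):
        # from_url() returns a client immediately; it connects lazily
        self.redis = aioredis.from_url("redis://localhost")
        self.pubsub = self.redis.pubsub()

    async def subscribe(self, room: str):
        await self.pubsub.subscribe(f"ws:room:{room}")

    async def publish(self, room: str, message: str):
        # Publish to Redis: every subscribed worker receives it
        await self.redis.publish(f"ws:room:{room}", message)

    async def listen(self):
        """Background task: relay Redis messages to local connections"""
        async for message in self.pubsub.listen():
            if message["type"] == "message":
                channel = message["channel"].decode()
                # Strip the prefix instead of splitting on ":" so room
                # names that contain colons survive
                room = channel[len("ws:room:"):]
                data = message["data"].decode()
                # Send to local connections only; iterate over a copy because
                # the set can change while a send is awaited
                for conn in list(self.local_connections.get(room, ())):
                    try:
                        await conn.send_text(data)
                    except Exception:
                        # Failed sends are ignored here; real code should
                        # remove the dead connection
                        pass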

Architecture:

  1. Client sends message to their connected worker
  2. Worker publishes to Redis channel
  3. All workers (including the sender) receive the message via subscription
  4. Each worker sends to its local connections only

This scales horizontally. You can run 20 Uvicorn workers across 5 servers, and a message in Room A reaches every client in Room A regardless of which worker they’re connected to.
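The four-step flow can be simulated in a single process to check the reasoning. In this sketch `InMemoryBroker` is a hypothetical stand-in for Redis pub/sub and each `Worker` plays the role of one Uvicorn process; client outboxes are plain lists instead of real sockets.

```python
import asyncio

class InMemoryBroker:
    """Stand-in for Redis pub/sub: every subscriber gets every message."""
    def __init__(self):
        self.subscribers = []

    def subscribe(self):
        queue = asyncio.Queue()
        self.subscribers.append(queue)
        return queue

    async def publish(self, channel, message):
        for queue in self.subscribers:
            await queue.put((channel, message))

class Worker:
    """One server process holding only its own local connections."""
    def __init__(self, broker):
        self.broker = broker
        self.inbox = broker.subscribe()
        self.local = {}  # room -> list of per-client outboxes

    def add_client(self, room):
        outbox = []
        self.local.setdefault(room, []).append(outbox)
        return outbox

    async def on_client_message(self, room, text):
        # Steps 1-2: never deliver directly, always go through the broker
        await self.broker.publish(f"ws:room:{room}", text)

    async def relay_pending(self):
        # Steps 3-4: drain the subscription, deliver to local clients only
        while not self.inbox.empty():
            channel, text = self.inbox.get_nowait()
            room = channel[len("ws:room:"):]
            for outbox in self.local.get(room, []):
                outbox.append(text)

async def demo():
    broker = InMemoryBroker()
    w1, w2 = Worker(broker), Worker(broker)
    a = w1.add_client("lobby")   # same room, different workers
    b = w2.add_client("lobby")
    c = w2.add_client("other")   # different room, must stay empty
    await w1.on_client_message("lobby", "hi all")
    await w1.relay_pending()
    await w2.relay_pending()
    return a, b, c

a, b, c = asyncio.run(demo())
print(a, b, c)
```

Because the sending worker also delivers through its subscription rather than directly, every client sees messages via the same path, which keeps ordering consistent within a worker.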

Backpressure and slow consumers

A fast producer (e.g., a live data feed sending 100 messages/second) paired with a slow consumer (mobile client on 3G) creates backpressure. Messages queue up in the server’s send buffer, consuming memory.

Strategies:

Message dropping: Track a per-connection send queue. If it exceeds a threshold, drop older messages or disconnect the slow client. The simplest form of the disconnect option is a send timeout:

import asyncio
from fastapi import WebSocket

async def rate_limited_send(websocket: WebSocket, message: str, timeout: float = 1.0):
    try:
        await asyncio.wait_for(websocket.send_text(message), timeout=timeout)
    except asyncio.TimeoutError:
        # Client too slow: wait_for cancelled the pending send, so disconnect
        # (1008 = policy violation)
        await websocket.close(code=1008, reason="Slow consumer")
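For the "drop older messages" option, one possible shape is a bounded per-connection buffer that discards the oldest entry when full. The class name `DroppingSendQueue` and its `dropped` counter are illustrative choices for this sketch, not part of FastAPI or Starlette.

```python
import asyncio
from collections import deque

class DroppingSendQueue:
    """Per-connection outbound buffer that discards the oldest message when full."""
    def __init__(self, max_size=100):
        # A deque with maxlen silently drops from the left on overflow
        self._items = deque(maxlen=max_size)
        self._ready = asyncio.Event()
        self.dropped = 0

    def put(self, message):
        if len(self._items) == self._items.maxlen:
            self.dropped += 1  # the append below evicts the oldest message
        self._items.append(message)
        self._ready.set()

    async def get(self):
        # Wait until the producer has queued something
        while not self._items:
            self._ready.clear()
            await self._ready.wait()
        return self._items.popleft()

async def demo():
    queue = DroppingSendQueue(max_size=3)
    for i in range(5):
        queue.put(f"update {i}")
    delivered = [await queue.get() for _ in range(3)]
    return delivered, queue.dropped

delivered, dropped = asyncio.run(demo())
print(delivered, dropped)
```

In an endpoint, a dedicated sender task would loop on `get()` and call `send_text()`, while producers call the non-blocking `put()`. The `dropped` counter is a natural signal for deciding when to disconnect a client instead.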

Message coalescing: Instead of sending every update, accumulate changes and send batched snapshots at fixed intervals (e.g., every 100ms). This naturally limits the send rate.
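A minimal sketch of coalescing, assuming updates are keyed (for example by entity id) and only the latest value per key matters. `Coalescer`, `flush_loop`, and the `ticks` parameter (used only to make the loop stop in a demo) are names invented for this example.

```python
import asyncio

class Coalescer:
    """Keeps only the latest value per key between flushes."""
    def __init__(self):
        self._pending = {}

    def update(self, key, value):
        self._pending[key] = value  # overwrites any earlier unsent value

    def flush(self):
        snapshot, self._pending = self._pending, {}
        return snapshot

async def flush_loop(coalescer, send_json, interval=0.1, ticks=None):
    """Send one batched snapshot per interval; skip intervals with no changes."""
    elapsed_ticks = 0
    while ticks is None or elapsed_ticks < ticks:
        await asyncio.sleep(interval)
        snapshot = coalescer.flush()
        if snapshot:
            await send_json(snapshot)
        elapsed_ticks += 1

async def demo():
    coalescer = Coalescer()
    batches = []

    async def send_json(payload):
        batches.append(payload)

    # Three rapid updates to the same key collapse into one value
    for x in (1, 2, 3):
        coalescer.update("player:1", x)
    coalescer.update("player:2", 50)
    await flush_loop(coalescer, send_json, interval=0.01, ticks=2)
    return batches

batches = asyncio.run(demo())
print(batches)
```

The second tick finds nothing pending and sends nothing, so an idle feed costs no bandwidth.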

Priority queues: Critical messages (errors, disconnection warnings) skip the queue. Non-critical messages (position updates in a game) can be dropped.
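One way to sketch this is with `asyncio.PriorityQueue`, capping only the non-critical backlog. The priority constants, the `PrioritySendQueue` class, and the `max_normal` limit are assumptions made for illustration.

```python
import asyncio
import itertools

CRITICAL, NORMAL = 0, 1  # lower number is delivered first

class PrioritySendQueue:
    """Critical messages jump ahead; normal ones are dropped past a cap."""
    def __init__(self, max_normal=100):
        self._queue = asyncio.PriorityQueue()
        self._order = itertools.count()  # tie-breaker keeps FIFO within a priority
        self._normal_pending = 0
        self._max_normal = max_normal

    def put(self, message, priority=NORMAL):
        if priority == NORMAL:
            if self._normal_pending >= self._max_normal:
                return False  # backlog full: drop the non-critical message
            self._normal_pending += 1
        self._queue.put_nowait((priority, next(self._order), message))
        return True

    async def get(self):
        priority, _, message = await self._queue.get()
        if priority == NORMAL:
            self._normal_pending -= 1
        return message

async def demo():
    queue = PrioritySendQueue(max_normal=2)
    accepted = [
        queue.put("pos 1"),
        queue.put("pos 2"),
        queue.put("pos 3"),  # over the cap, dropped
        queue.put("server shutting down", priority=CRITICAL),
    ]
    drained = [await queue.get() for _ in range(3)]
    return accepted, drained

accepted, drained = asyncio.run(demo())
print(accepted, drained)
```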

Authentication patterns

Token validation during handshake

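from jwt import InvalidTokenError  # assumes PyJWT; use your JWT library's error type

@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    # Query strings tend to end up in access logs, so keep these tokens short-lived
    token = websocket.query_params.get("token")
    if not token:
        # Closing before accept() rejects the handshake; per the ASGI spec the
        # client sees an HTTP 403 rather than the close code and reason
        await websocket.close(code=1008, reason="Missing token")
        return

    try:
        user = verify_jwt(token)  # application-defined helper
    except InvalidTokenError:
        await websocket.close(code=1008, reason="Invalid token")
        return

    await websocket.accept()
    # Connection is now authenticated as `user`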

Mid-connection re-authentication

Long-lived connections outlast token expiry. Options:

  • Client sends a refresh message with a new token before expiry
  • Server tracks token expiry and sends a “re-authenticate” message
  • Use a session-based approach where the session is refreshed server-side
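The first two options share the same bookkeeping: the server remembers when the current token expires, prompts shortly before that, and extends the deadline when a valid refresh arrives. A rough sketch follows; `AuthState`, the 60-second lead time, and the `sub`/`exp` claim names are assumptions, and the claims passed to `apply_refresh` are assumed to have been verified already.

```python
from dataclasses import dataclass

@dataclass
class AuthState:
    user_id: str
    expires_at: float  # Unix timestamp taken from the token's exp claim

    def is_expired(self, now):
        return now >= self.expires_at

    def should_prompt(self, now, lead_time=60.0):
        """True when the server should ask the client to re-authenticate."""
        return not self.is_expired(now) and self.expires_at - now <= lead_time

    def apply_refresh(self, claims):
        """Extend the session from claims that were ALREADY verified."""
        if claims["sub"] != self.user_id:
            raise PermissionError("refresh token belongs to a different user")
        if claims["exp"] <= self.expires_at:
            raise ValueError("refresh token does not extend the session")
        self.expires_at = claims["exp"]

state = AuthState(user_id="u1", expires_at=1000.0)
prompt_early = state.should_prompt(now=900.0)   # 100 s left: too early
prompt_late = state.should_prompt(now=950.0)    # 50 s left: time to prompt
state.apply_refresh({"sub": "u1", "exp": 4600.0})
expired_after_refresh = state.is_expired(now=1500.0)
print(prompt_early, prompt_late, expired_after_refresh)
```

In the receive loop, the endpoint would check `is_expired()` before handling each message and close the socket (for example with code 1008) once the deadline passes without a refresh.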

Structured message protocol

Production WebSocket applications need a message protocol beyond raw text:

from typing import Literal

from pydantic import BaseModel, ValidationError

class WSMessage(BaseModel):
    type: Literal["chat", "join", "leave", "typing", "error"]
    room: str | None = None
    content: str | None = None
    user_id: str | None = None
    timestamp: float | None = None

@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()
    # Remember what this socket joined so cleanup works even if the client
    # disconnects before sending a valid message
    room = None
    user_id = None
    try:
        while True:
            raw = await websocket.receive_text()
            try:
                msg = WSMessage.model_validate_json(raw)
            except ValidationError as e:
                await websocket.send_text(
                    WSMessage(type="error", content=str(e)).model_dump_json()
                )
                continue

            if msg.type == "chat" and msg.room:
                await manager.broadcast(msg.room, msg.model_dump_json())
            elif msg.type == "join" and msg.room and msg.user_id:
                # The socket is already accepted at this point, so
                # manager.connect() must not call accept() a second time
                await manager.connect(websocket, msg.room, msg.user_id)
                room, user_id = msg.room, msg.user_id
    except WebSocketDisconnect:
        if room is not None:
            await manager.disconnect(websocket, room, user_id)

Using Pydantic for WebSocket messages gives you validation, serialization, and documentation consistency with your HTTP endpoints.
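As the number of message types grows, the if/elif chain in the endpoint can be replaced by a handler table keyed on the `type` field. The sketch below uses only the standard library (plain `json` instead of Pydantic) so it runs on its own; the `handles` decorator, `dispatch` function, and list-based `outbox` are hypothetical names for illustration.

```python
import asyncio
import json

HANDLERS = {}

def handles(message_type):
    """Register a coroutine as the handler for one message type."""
    def register(func):
        HANDLERS[message_type] = func
        return func
    return register

@handles("chat")
async def on_chat(payload, outbox):
    outbox.append({"type": "chat", "content": payload.get("content", "")})

@handles("typing")
async def on_typing(payload, outbox):
    outbox.append({"type": "typing", "user_id": payload.get("user_id")})

async def dispatch(raw, outbox):
    """Parse one raw frame and route it; reply with an error on bad input."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        outbox.append({"type": "error", "content": "invalid JSON"})
        return
    handler = HANDLERS.get(payload.get("type")) if isinstance(payload, dict) else None
    if handler is None:
        outbox.append({"type": "error", "content": "unknown message type"})
        return
    await handler(payload, outbox)

async def demo():
    outbox = []
    await dispatch('{"type": "chat", "content": "hello"}', outbox)
    await dispatch('{"type": "dance"}', outbox)
    await dispatch("not json", outbox)
    return outbox

replies = asyncio.run(demo())
print(replies)
```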

Health monitoring and metrics

Track WebSocket health in production:

from prometheus_client import Counter, Gauge

# Prometheus-style metrics
# Per-room labels create one time series per room; drop the label if the
# set of rooms is unbounded
ws_connections_active = Gauge("ws_connections_active", "Active connections", ["room"])
ws_messages_total = Counter("ws_messages_total", "Messages sent", ["direction", "room"])
ws_errors_total = Counter("ws_errors_total", "WebSocket errors", ["type"])

async def monitored_broadcast(room: str, message: str):
    # get_connections() is assumed to return the room's sockets; the
    # ConnectionManager shown earlier would need this accessor added
    connections = manager.get_connections(room)
    for conn in connections:
        try:
            await conn.send_text(message)
            ws_messages_total.labels(direction="outbound", room=room).inc()
        except Exception as e:
            ws_errors_total.labels(type=type(e).__name__).inc()

Key metrics to track: active connections (total and per room), message throughput, error rates, connection duration distribution, and memory usage per worker.
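Active connections and connection duration can both be captured by wrapping the lifetime of each endpoint call. This sketch uses plain Python containers as stand-ins for a gauge and a histogram so it runs without a metrics library; `track_connection` and the injectable `clock` parameter are illustrative names.

```python
import time
from collections import Counter
from contextlib import contextmanager

active_connections = Counter()   # room -> current connection count (gauge stand-in)
connection_durations = []        # seconds per closed connection (histogram stand-in)

@contextmanager
def track_connection(room, clock=time.monotonic):
    """Count the connection as active while the block runs, then record its duration."""
    started = clock()
    active_connections[room] += 1
    try:
        yield
    finally:
        # Runs on normal exit and on WebSocketDisconnect or any other error
        active_connections[room] -= 1
        connection_durations.append(clock() - started)

# Demo with a scripted clock so the duration is deterministic
fake_times = iter([10.0, 12.5])
with track_connection("lobby", clock=lambda: next(fake_times)):
    during = active_connections["lobby"]
after = active_connections["lobby"]
print(during, after, connection_durations)
```

Inside an endpoint, the `with track_connection(room_id):` block would wrap the receive loop, so the bookkeeping stays correct however the connection ends.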

Deployment considerations

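Load balancer sticky sessions: WebSocket connections are long-lived, and once the upgrade completes, the underlying TCP connection stays pinned to one backend. What the load balancer must do is pass the Upgrade handshake through and tolerate long idle periods (nginx, HAProxy, and AWS ALB all support this). Sticky sessions (session affinity) matter when the handshake spans several HTTP requests, as with long-polling fallback transports, or when a reconnecting client needs to land on the worker that still holds its in-memory state.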

Connection limits: Each WebSocket holds a file descriptor. On most Linux distributions a process starts with a soft limit of 1024 open files. For high-connection servers, raise it with ulimit -n (or LimitNOFILE in a systemd unit) to 65535 or higher. Each connection also uses ~10-50KB of memory depending on buffer sizes.
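The limit can also be inspected and raised from inside the process at startup. This is a minimal sketch using the standard `resource` module, which is available on Unix only; the helper name `raise_nofile_limit` and the 65535 target are illustrative, and the soft limit can never exceed the hard limit set by the operating system.

```python
import resource

def raise_nofile_limit(target=65535):
    """Raise the soft open-file limit toward target; return the resulting soft limit."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    # The soft limit may not exceed the hard limit unless the hard limit is unlimited
    wanted = target if hard == resource.RLIM_INFINITY else min(target, hard)
    if soft != resource.RLIM_INFINITY and wanted > soft:
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (wanted, hard))
        except (ValueError, OSError):
            pass  # the platform refused; keep the current limit
    return resource.getrlimit(resource.RLIMIT_NOFILE)[0]

before = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
after = raise_nofile_limit()
print(f"soft limit: {before} -> {after}")
```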

Nginx proxy configuration:

location /ws {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_read_timeout 86400;  # 24 hours
}

The proxy_read_timeout must be long enough for idle connections. The default 60 seconds will silently close idle WebSockets.

The one thing to remember: Production WebSocket architectures need Redis pub/sub (or equivalent) for multi-process scaling, structured message protocols with validation, backpressure handling for slow consumers, and careful deployment configuration for sticky sessions and connection limits — the in-memory connection manager is just the starting point.
