Litestar Framework — Deep Dive
Application architecture
A production Litestar application uses the layered architecture for clean separation:
from litestar import Litestar, Router
from litestar.config.cors import CORSConfig
from litestar.config.csrf import CSRFConfig
from litestar.middleware.rate_limit import RateLimitConfig
from controllers.users import UserController
from controllers.orders import OrderController
from middleware.auth import AuthMiddleware
from dependencies import provide_db
# API router with shared config
api_router = Router(
path="/api/v1",
route_handlers=[UserController, OrderController],
dependencies={"db": provide_db},
middleware=[AuthMiddleware],
)
app = Litestar(
route_handlers=[api_router],
cors_config=CORSConfig(
allow_origins=["https://myapp.com"],
allow_methods=["GET", "POST", "PUT", "DELETE"],
),
csrf_config=CSRFConfig(secret="csrf-secret-key"),
middleware=[
RateLimitConfig(rate_limit=("minute", 60)).middleware,
],
)
The layered model means AuthMiddleware only applies to /api/v1 routes, not health checks or static files. Rate limiting applies globally but could be overridden per-controller.
Dependency injection system
Litestar’s DI system is more structured than FastAPI’s:
from litestar.di import Provide
from litestar import Controller, get, post
from dataclasses import dataclass
@dataclass
class DatabaseSession:
pool: asyncpg.Pool
async def provide_db(state: State) -> AsyncGenerator[DatabaseSession, None]:
"""Scoped dependency — new instance per request."""
async with state.db_pool.acquire() as conn:
yield DatabaseSession(conn)
async def provide_cache(state: State) -> Redis:
"""Singleton-like — reuses the same Redis instance."""
return state.redis
class UserController(Controller):
path = "/users"
dependencies = {"db": Provide(provide_db)}
@get()
async def list_users(self, db: DatabaseSession) -> list[User]:
rows = await db.pool.fetch("SELECT * FROM users")
return [User(**row) for row in rows]
@get("/{user_id:int}")
async def get_user(
self,
user_id: int,
db: DatabaseSession,
cache: Redis, # injected from app-level dependency
) -> User:
cached = await cache.get(f"user:{user_id}")
if cached:
return User.parse_raw(cached)
row = await db.pool.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
if not row:
raise NotFoundException(detail=f"User {user_id} not found")
user = User(**row)
await cache.set(f"user:{user_id}", user.json(), ex=300)
return user
Key DI features:
- Generator dependencies (
yield) automatically handle cleanup (closing connections, releasing locks) - Layered resolution — controller dependencies override router dependencies, which override app dependencies
- Type-safe — dependencies are resolved by parameter name and type
- No
Depends()wrapper — the mapping is explicit in thedependenciesdict
DTO (Data Transfer Object) system
Litestar’s DTO system controls what data goes in and out without modifying your domain models:
from litestar.dto import DTOConfig
from litestar.contrib.sqlalchemy.dto import SQLAlchemyDTO
class UserModel(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
email: Mapped[str]
password_hash: Mapped[str]
created_at: Mapped[datetime]
internal_notes: Mapped[str | None]
# Read DTO — exclude sensitive fields
class UserReadDTO(SQLAlchemyDTO[UserModel]):
config = DTOConfig(
exclude={"password_hash", "internal_notes"},
)
# Write DTO — only allow specific fields on creation
class UserWriteDTO(SQLAlchemyDTO[UserModel]):
config = DTOConfig(
include={"name", "email"},
)
# Patch DTO — all fields optional for partial updates
class UserPatchDTO(SQLAlchemyDTO[UserModel]):
config = DTOConfig(
include={"name", "email"},
partial=True,
)
class UserController(Controller):
path = "/users"
@get(return_dto=UserReadDTO)
async def list_users(self, db: AsyncSession) -> list[UserModel]:
result = await db.execute(select(UserModel))
return result.scalars().all()
@post(dto=UserWriteDTO, return_dto=UserReadDTO)
async def create_user(self, data: UserModel, db: AsyncSession) -> UserModel:
db.add(data)
await db.commit()
return data
@patch("/{user_id:int}", dto=UserPatchDTO, return_dto=UserReadDTO)
async def update_user(
self, user_id: int, data: DTOData[UserModel], db: AsyncSession
) -> UserModel:
user = await db.get(UserModel, user_id)
if not user:
raise NotFoundException()
user = data.update_instance(user)
await db.commit()
return user
DTOs solve a real pain point: separating API shapes from database models without writing duplicate dataclasses.
Custom middleware
Litestar middleware can be pure ASGI or use the AbstractMiddleware base:
from litestar.middleware import AbstractMiddleware
from litestar.types import ASGIApp, Receive, Scope, Send
class RequestTracingMiddleware(AbstractMiddleware):
scopes = {ScopeType.HTTP} # only HTTP, not WebSocket
exclude = ["health", "metrics"] # skip these paths
async def __call__(
self, scope: Scope, receive: Receive, send: Send
) -> None:
request = Request(scope)
trace_id = request.headers.get(
"x-trace-id", str(uuid.uuid4())
)
scope["state"]["trace_id"] = trace_id
start = time.perf_counter()
async def send_with_trace(message):
if message["type"] == "http.response.start":
headers = MutableHeaders(scope=message)
headers.append("x-trace-id", trace_id)
await send(message)
try:
await self.app(scope, receive, send_with_trace)
finally:
elapsed = time.perf_counter() - start
logger.info(
"request_complete",
trace_id=trace_id,
method=request.method,
path=request.url.path,
duration=elapsed,
)
The exclude and scopes class attributes let you control which requests hit the middleware without conditional logic inside the handler.
Channels (WebSocket pub/sub)
Litestar’s channels plugin provides managed WebSocket pub/sub:
from litestar.channels import ChannelsPlugin
from litestar.channels.backends.memory import MemoryChannelsBackend
from litestar.channels.backends.redis import RedisChannelsBackend
channels = ChannelsPlugin(
backend=RedisChannelsBackend(url="redis://localhost"),
arbitrary_channels_allowed=True,
create_ws_route_handlers=True,
ws_handler_base_path="/ws",
)
app = Litestar(
plugins=[channels],
route_handlers=[...],
)
# Publishing from anywhere in your app
@post("/notifications")
async def send_notification(
data: Notification, channels: ChannelsPlugin
) -> None:
await channels.publish(
data.dict(),
channels=[f"user:{data.user_id}"]
)
Clients connect to /ws/{channel_name} and automatically receive published messages. The Redis backend enables multi-process/multi-server pub/sub.
Plugin system
Litestar’s plugin system allows deep framework customization:
from litestar.plugins import InitPluginProtocol
from litestar.config.app import AppConfig
class MetricsPlugin(InitPluginProtocol):
def on_app_init(self, app_config: AppConfig) -> AppConfig:
# Add middleware
app_config.middleware.append(MetricsMiddleware)
# Add routes
app_config.route_handlers.append(MetricsController)
# Add lifecycle hooks
original_on_startup = app_config.on_startup
async def on_startup_with_metrics(app: Litestar) -> None:
if original_on_startup:
for hook in original_on_startup:
await hook(app)
await initialize_metrics_collector()
app_config.on_startup = [on_startup_with_metrics]
return app_config
app = Litestar(plugins=[MetricsPlugin()])
Testing
Litestar includes a test client:
from litestar.testing import TestClient, AsyncTestClient
def test_list_users():
with TestClient(app=app) as client:
response = client.get("/api/v1/users")
assert response.status_code == 200
assert isinstance(response.json(), list)
async def test_create_user():
async with AsyncTestClient(app=app) as client:
response = await client.post(
"/api/v1/users",
json={"name": "Alice", "email": "alice@test.com"},
)
assert response.status_code == 201
For testing with dependency overrides:
from litestar.testing import create_test_client
def test_with_mock_db():
mock_db = MockDatabase()
with create_test_client(
route_handlers=[UserController],
dependencies={"db": Provide(lambda: mock_db)},
) as client:
response = client.get("/users")
assert response.status_code == 200
SQLAlchemy integration
Litestar provides first-class SQLAlchemy support:
from litestar.contrib.sqlalchemy.plugins import (
SQLAlchemyAsyncConfig,
SQLAlchemyPlugin,
)
sqlalchemy_config = SQLAlchemyAsyncConfig(
connection_string="postgresql+asyncpg://user:pass@localhost/db",
session_config=AsyncSessionConfig(expire_on_commit=False),
)
app = Litestar(
plugins=[SQLAlchemyPlugin(config=sqlalchemy_config)],
route_handlers=[...],
)
# AsyncSession is automatically available as a dependency
class UserController(Controller):
@get("/users")
async def list_users(self, db_session: AsyncSession) -> list[User]:
result = await db_session.execute(select(UserModel))
return result.scalars().all()
Production deployment
# uvicorn
uvicorn app:app --workers 4 --host 0.0.0.0 --port 8000
# granian (Rust-based ASGI server, excellent with Litestar)
granian --interface asgi app:app --workers 4 --host 0.0.0.0 --port 8000
Granian, a Rust-based ASGI server, pairs particularly well with Litestar — both projects are community-driven and optimize for similar goals.
The one thing to remember: Litestar’s layered DI, DTO system, channels plugin, and structured middleware create a framework where complex API requirements — authorization, serialization control, real-time communication — are handled through first-class, composable abstractions rather than bolted-on patterns.
See Also
- Python Aiohttp Client Understand Aiohttp Client through a practical analogy so your Python decisions become faster and clearer.
- Python Api Client Design Why building your own API client in Python is like creating a TV remote that only has the buttons you actually need.
- Python Api Documentation Swagger Swagger turns your Python API into an interactive playground where anyone can click buttons to try it out — no coding required.
- Python Api Mocking Responses Why testing with fake API responses is like rehearsing a play with stand-ins before the real actors show up.
- Python Api Pagination Clients Why APIs send data in pages, and how Python handles it — like reading a book one chapter at a time instead of swallowing the whole thing.