Python CQRS Pattern — Core Concepts
What CQRS Is
CQRS (Command Query Responsibility Segregation) separates the write model (commands) from the read model (queries) in your application. Instead of one model that handles both creating data and displaying it, you build two distinct paths.
This is an evolution of the simpler CQS (Command Query Separation) principle from Bertrand Meyer: a method should either change state (command) or return data (query), never both.
CQRS takes this to the architectural level — separate models, possibly separate databases, for reads and writes.
The Two Sides
Command Side (Write)
Commands represent intentions to change state. They’re validated, processed, and may produce events.
```python
from datetime import datetime, timezone

from pydantic import BaseModel


class PlaceOrderCommand(BaseModel):
    customer_id: str
    items: list[dict]
    shipping_address: str


class CommandHandler:
    def __init__(self, repository, event_publisher):
        self.repository = repository
        self.event_publisher = event_publisher

    async def handle_place_order(self, command: PlaceOrderCommand):
        # Business logic and validation live in the Order aggregate,
        # which is assumed to be defined elsewhere in the domain layer.
        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address,
        )

        # Persist to write store
        await self.repository.save(order)

        # Publish event for read side to consume
        await self.event_publisher.publish("order.placed", {
            "order_id": str(order.id),
            "customer_id": order.customer_id,
            "total": order.total,
            "item_count": len(order.items),
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "created_at": datetime.now(timezone.utc).isoformat(),
        })
        return order.id
```
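The handler above relies on an `Order` aggregate, a repository, and an event publisher that the snippet does not define. One way to try it out locally is with in-memory stand-ins. The sketch below is an assumption about what those collaborators could look like: `InMemoryOrderRepository`, `InMemoryEventPublisher`, the `demo` function, and the `price`/`quantity` keys on each item are all hypothetical.

```python
import asyncio
import uuid
from dataclasses import dataclass, field


@dataclass
class Order:
    """Hypothetical aggregate; real business rules would live here."""
    customer_id: str
    items: list[dict]
    shipping_address: str
    id: uuid.UUID = field(default_factory=uuid.uuid4)

    @classmethod
    def create(cls, customer_id: str, items: list[dict], shipping_address: str) -> "Order":
        if not items:
            raise ValueError("an order needs at least one item")
        return cls(customer_id, items, shipping_address)

    @property
    def total(self) -> float:
        # Assumes each item dict carries "price" and "quantity" keys.
        return sum(i["price"] * i["quantity"] for i in self.items)


class InMemoryOrderRepository:
    """Stand-in for the write store."""

    def __init__(self) -> None:
        self.orders: dict[uuid.UUID, Order] = {}

    async def save(self, order: Order) -> None:
        self.orders[order.id] = order


class InMemoryEventPublisher:
    """Stand-in for a message broker; it only records what was published."""

    def __init__(self) -> None:
        self.published: list[tuple[str, dict]] = []

    async def publish(self, topic: str, payload: dict) -> None:
        self.published.append((topic, payload))


async def demo() -> tuple[InMemoryOrderRepository, InMemoryEventPublisher]:
    repo, publisher = InMemoryOrderRepository(), InMemoryEventPublisher()
    order = Order.create(
        "c-1", [{"sku": "book", "price": 12.5, "quantity": 2}], "1 Main St"
    )
    await repo.save(order)
    await publisher.publish(
        "order.placed", {"order_id": str(order.id), "total": order.total}
    )
    return repo, publisher
```

Calling `asyncio.run(demo())` returns the repository and publisher so a test can inspect what was saved and published. The same stand-ins could be passed to a command handler in place of real infrastructure.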
Query Side (Read)
Queries return data from read-optimized stores. No business logic, no validation — just fast lookups.
```python
class OrderQueryService:
    def __init__(self, read_db):
        self.read_db = read_db

    async def get_order_summary(self, order_id: str) -> dict | None:
        # Read from denormalized, pre-built view; None if no match
        return await self.read_db.find_one(
            "order_summaries", {"order_id": order_id}
        )

    async def get_customer_orders(self, customer_id: str) -> list:
        # Newest first, capped at 50 rows
        return await self.read_db.find(
            "customer_order_list",
            {"customer_id": customer_id},
            sort=[("created_at", -1)],
            limit=50,
        )

    async def get_dashboard_stats(self) -> dict:
        return await self.read_db.find_one("dashboard_stats", {})
```
Read Model Projections
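The read side builds projections: pre-computed views optimized for specific queries. These are updated by consuming events from the write side, so a view can briefly lag behind the write store until its events have been processed (eventual consistency).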
```python
class OrderSummaryProjection:
    """Updates the order_summaries collection whenever order events occur."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def on_order_placed(self, event: dict):
        await self.read_db.insert("order_summaries", {
            "order_id": event["order_id"],
            "customer_id": event["customer_id"],
            "total": event["total"],
            "item_count": event["item_count"],
            "status": "placed",
            "created_at": event["created_at"],
        })

    async def on_order_shipped(self, event: dict):
        await self.read_db.update(
            "order_summaries",
            {"order_id": event["order_id"]},
            {"status": "shipped", "tracking": event["tracking_number"]},
        )
```
Each projection is purpose-built. The customer order list might store data differently from the admin dashboard. Join and aggregation work moves from query time to event-handling time, so each view can hold exactly what the UI needs.
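To make "purpose-built" concrete, here is a minimal in-memory sketch in which two projections consume the same `order.placed` events and each keeps only what its view needs. The class names, the dictionary-based storage, and the sample events are assumptions made for illustration.

```python
from collections import defaultdict


class CustomerOrderListProjection:
    """Keeps a newest-first list of order rows per customer."""

    def __init__(self) -> None:
        self.rows: dict[str, list[dict]] = defaultdict(list)

    def on_order_placed(self, event: dict) -> None:
        self.rows[event["customer_id"]].insert(
            0, {"order_id": event["order_id"], "total": event["total"]}
        )


class DashboardStatsProjection:
    """Keeps running totals for an admin dashboard."""

    def __init__(self) -> None:
        self.stats = {"order_count": 0, "revenue": 0.0}

    def on_order_placed(self, event: dict) -> None:
        self.stats["order_count"] += 1
        self.stats["revenue"] += event["total"]


events = [
    {"order_id": "o-1", "customer_id": "c-1", "total": 20.0},
    {"order_id": "o-2", "customer_id": "c-1", "total": 5.0},
]
customer_list = CustomerOrderListProjection()
dashboard = DashboardStatsProjection()
for event in events:
    # Every projection sees the same event and stores what its view needs.
    customer_list.on_order_placed(event)
    dashboard.on_order_placed(event)
```

After the loop, `customer_list.rows["c-1"]` holds the two orders newest first, while `dashboard.stats` holds only the aggregate counts. Neither view has to be computed at query time.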
When the Read and Write Stores Differ
| Concern | Write Store | Read Store |
|---|---|---|
| Priority | Consistency, integrity | Speed, denormalization |
| Example | PostgreSQL with normalized tables | MongoDB or Redis with pre-built views |
| Schema | Normalized, relational | Denormalized, query-optimized |
| Scaling | Scale for write throughput | Scale for read throughput |
You don’t always need separate databases. A simpler version uses the same PostgreSQL instance but different tables — normalized tables for writes, materialized views or denormalized tables for reads.
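The single-database variant can be sketched with the standard-library `sqlite3` module standing in for PostgreSQL. The table names, columns, and the `place_order` / `get_order_summary` helpers are assumptions for illustration. Here the summary row is refreshed in the same transaction as the write, so no event broker is involved and the read table never lags.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Write side: normalized tables.
    CREATE TABLE orders (id TEXT PRIMARY KEY, customer_id TEXT NOT NULL);
    CREATE TABLE order_items (
        order_id TEXT NOT NULL REFERENCES orders(id),
        sku TEXT NOT NULL,
        price REAL NOT NULL,
        quantity INTEGER NOT NULL
    );
    -- Read side: one denormalized row per order.
    CREATE TABLE order_summaries (
        order_id TEXT PRIMARY KEY,
        customer_id TEXT NOT NULL,
        total REAL NOT NULL,
        item_count INTEGER NOT NULL
    );
""")


def place_order(order_id: str, customer_id: str, items: list[tuple[str, float, int]]) -> None:
    """Command: write normalized rows, then build the summary row."""
    with conn:  # commits on success, rolls back on error
        conn.execute("INSERT INTO orders VALUES (?, ?)", (order_id, customer_id))
        conn.executemany(
            "INSERT INTO order_items VALUES (?, ?, ?, ?)",
            [(order_id, sku, price, qty) for sku, price, qty in items],
        )
        conn.execute(
            """INSERT INTO order_summaries
               SELECT o.id, o.customer_id, SUM(i.price * i.quantity), COUNT(*)
               FROM orders o JOIN order_items i ON i.order_id = o.id
               WHERE o.id = ?
               GROUP BY o.id, o.customer_id""",
            (order_id,),
        )


def get_order_summary(order_id: str):
    """Query: single-row lookup against the denormalized table, no joins."""
    return conn.execute(
        "SELECT order_id, customer_id, total, item_count "
        "FROM order_summaries WHERE order_id = ?",
        (order_id,),
    ).fetchone()


place_order("o-1", "c-1", [("book", 12.5, 2), ("pen", 1.0, 3)])
print(get_order_summary("o-1"))
```

The join runs once, at write time. Every later read is a primary-key lookup on `order_summaries`.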
Connecting Command and Query Sides
The typical flow:
```text
User action → API endpoint → Command handler
                               → Write to command store
                               → Publish event

Event broker → Projection handler
                 → Update read store

User query → API endpoint → Query handler
                              → Read from read store → Return response
```
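The flow above can be traced end to end in a single process. The sketch below is a toy, assuming an `asyncio.Queue` as the event broker and plain dictionaries as the two stores; the function names are hypothetical. It also shows the short window in which the read store has not yet caught up with the write.

```python
import asyncio

write_store: dict[str, dict] = {}  # stand-in for the command store
read_store: dict[str, dict] = {}   # stand-in for the read store


async def handle_place_order(broker: asyncio.Queue, order_id: str, total: float) -> None:
    """Command handler: write to the command store, then publish an event."""
    write_store[order_id] = {"total": total, "status": "placed"}
    await broker.put({"type": "order.placed", "order_id": order_id, "total": total})


async def run_projection(broker: asyncio.Queue) -> None:
    """Projection handler: consume events and update the read store."""
    while True:
        event = await broker.get()
        if event["type"] == "order.placed":
            read_store[event["order_id"]] = {
                "total": event["total"],
                "status": "placed",
            }
        broker.task_done()


def get_order_summary(order_id: str):
    """Query handler: read-only lookup against the read store."""
    return read_store.get(order_id)


async def main() -> tuple:
    broker: asyncio.Queue = asyncio.Queue()
    projector = asyncio.create_task(run_projection(broker))

    await handle_place_order(broker, "o-1", 25.0)
    before = get_order_summary("o-1")  # event not processed yet
    await broker.join()                # wait until the projection catches up
    after = get_order_summary("o-1")

    projector.cancel()
    return before, after
```

Running `asyncio.run(main())` returns the query result before and after the projection has processed the event. With a real broker the gap is longer and less predictable, which is why clients of a CQRS API often need to tolerate slightly stale reads.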
In FastAPI:
```python
from fastapi import Depends, FastAPI, HTTPException

app = FastAPI()

# get_command_handler and get_query_service are dependency providers
# assumed to be defined elsewhere in the application.


@app.post("/orders")
async def create_order(
    command: PlaceOrderCommand,
    handler: CommandHandler = Depends(get_command_handler),
):
    order_id = await handler.handle_place_order(command)
    return {"order_id": order_id, "status": "accepted"}


@app.get("/orders/{order_id}")
async def get_order(
    order_id: str,
    query_service: OrderQueryService = Depends(get_query_service),
):
    order = await query_service.get_order_summary(order_id)
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    return order
```
Common Misconception
“CQRS always means two separate databases.”
CQRS is a code-level pattern first. You can start with one database, separate read and write models in your Python code, and only introduce separate data stores when performance demands it. Many teams get most of the benefit just by separating command handlers from query handlers, with no extra infrastructure.
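A code-only separation over one shared store might look like the following sketch. `OrderCommands`, `OrderQueries`, and `OrderView` are hypothetical names, and the dictionary stands in for a single database.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class OrderView:
    """Read model: only the fields the UI shows."""
    order_id: str
    status: str


class OrderCommands:
    """Write path: validation and state changes."""

    def __init__(self, db: dict[str, dict]) -> None:
        self._db = db

    def place_order(self, order_id: str, customer_id: str) -> None:
        if order_id in self._db:
            raise ValueError(f"order {order_id} already exists")
        self._db[order_id] = {"customer_id": customer_id, "status": "placed"}

    def ship_order(self, order_id: str) -> None:
        order = self._db[order_id]
        if order["status"] != "placed":
            raise ValueError("only placed orders can be shipped")
        order["status"] = "shipped"


class OrderQueries:
    """Read path: no validation, never mutates the store."""

    def __init__(self, db: dict[str, dict]) -> None:
        self._db = db

    def get_order(self, order_id: str) -> Optional[OrderView]:
        row = self._db.get(order_id)
        return OrderView(order_id, row["status"]) if row else None


db: dict[str, dict] = {}  # one shared store for both paths
commands, queries = OrderCommands(db), OrderQueries(db)
commands.place_order("o-1", "c-1")
commands.ship_order("o-1")
print(queries.get_order("o-1"))
```

Because the two classes share nothing but the store, the read path can later be pointed at a different table or database without touching the command code.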
The one thing to remember: CQRS separates the write path (commands with business logic) from the read path (queries with pre-built views), letting you optimize each independently — start simple with code separation, add infrastructure complexity only when needed.