Python CQRS Pattern — Core Concepts

What CQRS Is

CQRS (Command Query Responsibility Segregation) separates the write model (commands) from the read model (queries) in your application. Instead of one model that handles both creating data and displaying it, you build two distinct paths.

This is an evolution of the simpler CQS (Command Query Separation) principle from Bertrand Meyer: a method should either change state (command) or return data (query), never both.
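
As a small illustration of CQS at the method level, here is a minimal sketch. The `ShoppingCart` class and its method names are hypothetical and not part of any library: one method only mutates, the other only reads.

```python
class ShoppingCart:
    """Toy class following CQS: each method either mutates or returns, not both."""

    def __init__(self) -> None:
        self._items: list[tuple[str, float]] = []

    def add_item(self, name: str, price: float) -> None:
        # Command: changes state, returns nothing.
        self._items.append((name, price))

    def total(self) -> float:
        # Query: returns data, leaves state untouched.
        return sum(price for _, price in self._items)


cart = ShoppingCart()
cart.add_item("book", 12.5)
cart.add_item("pen", 2.5)
print(cart.total())  # 15.0
```

Calling `total()` twice in a row gives the same answer, because asking the question never changes the state.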

CQRS takes this to the architectural level — separate models, possibly separate databases, for reads and writes.

The Two Sides

Command Side (Write)

Commands represent intentions to change state. They’re validated, processed, and may produce events.

from pydantic import BaseModel
from datetime import datetime, timezone

# Order is the write-side domain model; it is assumed to be defined elsewhere.

class PlaceOrderCommand(BaseModel):
    customer_id: str
    items: list[dict]
    shipping_address: str

class CommandHandler:
    def __init__(self, repository, event_publisher):
        self.repository = repository
        self.event_publisher = event_publisher

    async def handle_place_order(self, command: PlaceOrderCommand):
        # Business logic and validation
        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address,
        )

        # Persist to write store
        await self.repository.save(order)

        # Publish event for read side to consume
        await self.event_publisher.publish("order.placed", {
            "order_id": str(order.id),
            "customer_id": order.customer_id,
            "total": order.total,
            "item_count": len(order.items),
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
            "created_at": datetime.now(timezone.utc).isoformat(),
        })

        return order.id
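
The handler relies on an `Order` class that is not shown here. The following is a minimal sketch of what such a write-side model might look like, not the actual implementation. It assumes each item dict carries `price` and `quantity` keys, and the validation rules are purely illustrative.

```python
from dataclasses import dataclass, field
from decimal import Decimal
from uuid import UUID, uuid4


@dataclass
class Order:
    """Hypothetical write-side model; attribute names mirror the handler's usage."""

    customer_id: str
    items: list[dict]
    shipping_address: str
    id: UUID = field(default_factory=uuid4)

    @classmethod
    def create(cls, customer_id: str, items: list[dict], shipping_address: str) -> "Order":
        # Business rules live here, on the write side only.
        if not items:
            raise ValueError("an order needs at least one item")
        if any(item["quantity"] <= 0 for item in items):
            raise ValueError("item quantities must be positive")
        return cls(customer_id=customer_id, items=items, shipping_address=shipping_address)

    @property
    def total(self) -> Decimal:
        # Assumes each item dict has "price" and "quantity" keys.
        return sum(
            (Decimal(str(item["price"])) * item["quantity"] for item in self.items),
            Decimal("0"),
        )


order = Order.create("c-1", [{"sku": "A", "price": "9.99", "quantity": 2}], "1 Main St")
print(order.total)  # 19.98
```

If the event broker serializes payloads as JSON, a `Decimal` total would likely need converting (for example with `str(order.total)`) before publishing.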

Query Side (Read)

Queries return data from read-optimized stores. No business logic, no validation — just fast lookups.

class OrderQueryService:
    def __init__(self, read_db):
        self.read_db = read_db

    async def get_order_summary(self, order_id: str) -> dict:
        # Read from denormalized, pre-built view
        return await self.read_db.find_one(
            "order_summaries", {"order_id": order_id}
        )

    async def get_customer_orders(self, customer_id: str) -> list:
        return await self.read_db.find(
            "customer_order_list",
            {"customer_id": customer_id},
            sort=[("created_at", -1)],
            limit=50,
        )

    async def get_dashboard_stats(self) -> dict:
        return await self.read_db.find_one("dashboard_stats", {})
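
The `read_db` object is left unspecified; its `find_one` and `find` signatures can only be inferred from the calls in the query service. For unit tests, an in-memory stand-in along these lines might be enough. `InMemoryReadDb` is a hypothetical name, and the matching logic is simple equality on each criteria key.

```python
import asyncio
from typing import Optional


class InMemoryReadDb:
    """Hypothetical stand-in matching the find_one/find calls used by the query service."""

    def __init__(self) -> None:
        self.collections: dict[str, list[dict]] = {}

    @staticmethod
    def _matches(doc: dict, criteria: dict) -> bool:
        return all(doc.get(key) == value for key, value in criteria.items())

    async def find_one(self, collection: str, criteria: dict) -> Optional[dict]:
        for doc in self.collections.get(collection, []):
            if self._matches(doc, criteria):
                return doc
        return None

    async def find(self, collection, criteria, sort=None, limit=None) -> list[dict]:
        docs = [d for d in self.collections.get(collection, []) if self._matches(d, criteria)]
        # Apply sort keys from last to first so the first key has highest priority.
        for key, direction in reversed(sort or []):
            docs.sort(key=lambda d: d[key], reverse=(direction == -1))
        return docs[:limit]


db = InMemoryReadDb()
db.collections["customer_order_list"] = [
    {"customer_id": "c-1", "order_id": "o-1", "created_at": "2024-01-01"},
    {"customer_id": "c-1", "order_id": "o-2", "created_at": "2024-02-01"},
]
newest_first = asyncio.run(
    db.find("customer_order_list", {"customer_id": "c-1"}, sort=[("created_at", -1)], limit=50)
)
print([o["order_id"] for o in newest_first])  # ['o-2', 'o-1']
```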

Read Model Projections

The read side builds projections — pre-computed views optimized for specific queries. These are updated by consuming events from the write side.

class OrderSummaryProjection:
    """Updates the order_summaries collection whenever order events occur."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def on_order_placed(self, event: dict):
        await self.read_db.insert("order_summaries", {
            "order_id": event["order_id"],
            "customer_id": event["customer_id"],
            "total": event["total"],
            "item_count": event["item_count"],
            "status": "placed",
            "created_at": event["created_at"],
        })

    async def on_order_shipped(self, event: dict):
        await self.read_db.update(
            "order_summaries",
            {"order_id": event["order_id"]},
            {"status": "shipped", "tracking": event["tracking_number"]},
        )

Each projection is purpose-built. The customer order list might store data differently than the admin dashboard. This eliminates complex joins and lets each view be exactly what the UI needs.
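
To make "purpose-built" concrete, here is a sketch of a second projection that consumes the same `order.placed` event but stores it in a different shape: one list per customer, newest first. The class name, the plain dict used as a read store, and the duplicate check are assumptions for illustration. It also assumes events arrive in creation order.

```python
class CustomerOrderListProjection:
    """Hypothetical projection: one list of recent orders per customer."""

    def __init__(self, max_orders: int = 50) -> None:
        self.by_customer: dict[str, list[dict]] = {}
        self.max_orders = max_orders

    def on_order_placed(self, event: dict) -> None:
        orders = self.by_customer.setdefault(event["customer_id"], [])
        # Skip events already applied, so a redelivered event does not add a duplicate.
        if any(o["order_id"] == event["order_id"] for o in orders):
            return
        orders.insert(0, {
            "order_id": event["order_id"],
            "total": event["total"],
            "created_at": event["created_at"],
        })
        # Keep only the most recent entries.
        del orders[self.max_orders:]


projection = CustomerOrderListProjection(max_orders=2)
for n in (1, 2, 3):
    projection.on_order_placed({
        "order_id": f"o-{n}", "customer_id": "c-1", "total": 10 * n,
        "created_at": f"2024-01-0{n}",
    })
print([o["order_id"] for o in projection.by_customer["c-1"]])  # ['o-3', 'o-2']
```

The UI can then render a customer's order list from a single lookup, with no join against the write-side tables.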

When the Read and Write Stores Differ

Concern  | Write Store                       | Read Store
---------|-----------------------------------|--------------------------------------
Priority | Consistency, integrity            | Speed, denormalization
Example  | PostgreSQL with normalized tables | MongoDB or Redis with pre-built views
Schema   | Normalized, relational            | Denormalized, query-optimized
Scaling  | Scale for write throughput        | Scale for read throughput

You don’t always need separate databases. A simpler version uses the same PostgreSQL instance but different tables — normalized tables for writes, materialized views or denormalized tables for reads.
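
The single-database variant can be sketched with the standard-library `sqlite3` module standing in for PostgreSQL. The table and column names are assumptions, and SQLite has no materialized views, so this version maintains a denormalized summary table in the same transaction as the write.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Write model: normalized tables
    CREATE TABLE orders (id TEXT PRIMARY KEY, customer_id TEXT NOT NULL);
    CREATE TABLE order_items (
        order_id TEXT NOT NULL REFERENCES orders(id),
        sku TEXT NOT NULL,
        quantity INTEGER NOT NULL,
        unit_price_cents INTEGER NOT NULL
    );
    -- Read model: one denormalized row per order
    CREATE TABLE order_summaries (
        order_id TEXT PRIMARY KEY,
        customer_id TEXT NOT NULL,
        item_count INTEGER NOT NULL,
        total_cents INTEGER NOT NULL
    );
""")


def place_order(order_id: str, customer_id: str, items: list[tuple[str, int, int]]) -> None:
    """Write normalized rows, then build the summary row in the same transaction."""
    with conn:  # commits on success, rolls back on error
        conn.execute("INSERT INTO orders VALUES (?, ?)", (order_id, customer_id))
        conn.executemany(
            "INSERT INTO order_items VALUES (?, ?, ?, ?)",
            [(order_id, sku, qty, price) for sku, qty, price in items],
        )
        conn.execute(
            """
            INSERT INTO order_summaries
            SELECT o.id, o.customer_id, COUNT(*), SUM(i.quantity * i.unit_price_cents)
            FROM orders o JOIN order_items i ON i.order_id = o.id
            WHERE o.id = ?
            GROUP BY o.id, o.customer_id
            """,
            (order_id,),
        )


def get_order_summary(order_id: str):
    # Query side: single-row lookup, no joins.
    return conn.execute(
        "SELECT customer_id, item_count, total_cents FROM order_summaries WHERE order_id = ?",
        (order_id,),
    ).fetchone()


place_order("o-1", "c-1", [("A", 2, 500), ("B", 1, 250)])
print(get_order_summary("o-1"))  # ('c-1', 2, 1250)
```

Updating the summary inside the write transaction keeps reads immediately consistent, at the cost of slightly slower writes. An event-driven projection makes the opposite trade.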

Connecting Command and Query Sides

The typical flow:

User action → API endpoint → Command handler
    → Write to command store
    → Publish event

Event broker → Projection handler
    → Update read store

User query → API endpoint → Query handler
    → Read from read store → Return response
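
The flow above can be exercised end to end with an in-process event bus. This is a sketch only: `InMemoryEventBus`, `project_order_placed`, and the dict-based read store are hypothetical stand-ins for a real broker and read database. Only the `publish(event_type, payload)` call shape is taken from the command handler shown earlier.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]


class InMemoryEventBus:
    """Hypothetical in-process stand-in for a real event broker."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: dict) -> None:
        # Deliver to every subscriber in registration order.
        for handler in self._handlers[event_type]:
            await handler(payload)


read_store: dict[str, dict] = {}  # stands in for the read database


async def project_order_placed(event: dict) -> None:
    # Projection handler: build the read-side view from the event.
    read_store[event["order_id"]] = {**event, "status": "placed"}


async def main() -> dict:
    bus = InMemoryEventBus()
    bus.subscribe("order.placed", project_order_placed)
    # Command side: after saving to the write store, publish the event.
    await bus.publish("order.placed", {"order_id": "o-1", "customer_id": "c-1", "total": 20})
    # Query side: read the projected view.
    return read_store["o-1"]


print(asyncio.run(main()))
# {'order_id': 'o-1', 'customer_id': 'c-1', 'total': 20, 'status': 'placed'}
```

Here the projection runs before `publish` returns. With a real broker, delivery is asynchronous, so a query issued right after a command may briefly see stale data.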

In FastAPI:

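from fastapi import FastAPI, Depends, HTTPException

# get_command_handler and get_query_service are dependency providers,
# assumed to be defined elsewhere in the application.
app = FastAPI()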

@app.post("/orders")
async def create_order(
    command: PlaceOrderCommand,
    handler: CommandHandler = Depends(get_command_handler)
):
    order_id = await handler.handle_place_order(command)
    return {"order_id": order_id, "status": "accepted"}

@app.get("/orders/{order_id}")
async def get_order(
    order_id: str,
    query_service: OrderQueryService = Depends(get_query_service)
):
    order = await query_service.get_order_summary(order_id)
    if not order:
        raise HTTPException(404, "Order not found")
    return order

Common Misconception

“CQRS always means two separate databases.”

CQRS is a code-level pattern first. You can start with one database, separate read and write models in your Python code, and only introduce separate data stores when performance demands it. Many teams get most of the benefit just by separating command handlers from query handlers, with no extra infrastructure needed.
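
One way to picture code-level separation over a single store is to give each side a narrow interface. The sketch below uses `typing.Protocol`. All class names are hypothetical, and a dict stands in for the one shared database.

```python
from typing import Optional, Protocol


class OrderWriter(Protocol):
    def save(self, order_id: str, data: dict) -> None: ...


class OrderReader(Protocol):
    def get(self, order_id: str) -> Optional[dict]: ...


class SingleStoreOrders:
    """One store (a dict here) that satisfies both narrow interfaces."""

    def __init__(self) -> None:
        self._rows: dict[str, dict] = {}

    def save(self, order_id: str, data: dict) -> None:
        self._rows[order_id] = dict(data)

    def get(self, order_id: str) -> Optional[dict]:
        row = self._rows.get(order_id)
        return dict(row) if row is not None else None


class PlaceOrderHandler:
    def __init__(self, writer: OrderWriter) -> None:
        self.writer = writer  # typed as write-only

    def handle(self, order_id: str, customer_id: str) -> None:
        # Validation belongs on the command side.
        if not customer_id:
            raise ValueError("customer_id is required")
        self.writer.save(order_id, {"customer_id": customer_id, "status": "placed"})


class OrderQueries:
    def __init__(self, reader: OrderReader) -> None:
        self.reader = reader  # typed as read-only

    def get_order(self, order_id: str) -> Optional[dict]:
        return self.reader.get(order_id)


store = SingleStoreOrders()
PlaceOrderHandler(store).handle("o-1", "c-1")
print(OrderQueries(store).get_order("o-1"))  # {'customer_id': 'c-1', 'status': 'placed'}
```

Because the handlers depend only on `OrderWriter` or `OrderReader`, the read side could later be pointed at a separate store without changing the command code. The separation is checked by a type checker, not enforced at runtime.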

The one thing to remember: CQRS separates the write path (commands with business logic) from the read path (queries with pre-built views), letting you optimize each independently — start simple with code separation, add infrastructure complexity only when needed.
