Python CQRS Pattern — Deep Dive

CQRS with Event Sourcing

CQRS pairs naturally with event sourcing. The write side stores events; the read side projects those events into query-optimized views. This gives you a complete audit trail and the ability to rebuild read models from scratch.

The Full Architecture

Command → Validate → Store Events (append-only)
                          ↓
                    Event Bus (async)
                          │
              ┌───────────┼───────────┐
              ↓           ↓           ↓
         Projection A  Projection B  Projection C
         (order list)  (dashboard)   (search index)
              ↓           ↓           ↓
          Read DB A    Read DB B   Elasticsearch

Python Implementation

# Command bus
from typing import Any, Callable, Dict, Type
from dataclasses import dataclass

class CommandBus:
    def __init__(self):
        # Maps each command class to the coroutine that handles it
        self._handlers: Dict[Type, Callable] = {}

    def register(self, command_type: Type, handler: Callable):
        self._handlers[command_type] = handler

    async def dispatch(self, command) -> Any:
        handler = self._handlers.get(type(command))
        if not handler:
            raise ValueError(f"No handler for {type(command).__name__}")
        return await handler(command)

# Commands
@dataclass
class CreateProduct:
    name: str
    price: float
    category: str
    sku: str

@dataclass
class UpdatePrice:
    product_id: str
    new_price: float
    reason: str

# Command handler with event sourcing
class ProductCommandHandler:
    def __init__(self, event_store, event_publisher):
        self.event_store = event_store
        self.event_publisher = event_publisher

    async def handle_create_product(self, command: CreateProduct) -> str:
        product = Product.create(
            name=command.name,
            price=command.price,
            category=command.category,
            sku=command.sku,
        )
        events = product.uncommitted_events()

        await self.event_store.save(
            aggregate_id=product.id,
            events=events,
            expected_version=0,  # new aggregate: the stream must not exist yet
        )

        for event in events:
            await self.event_publisher.publish(event)

        return product.id

    async def handle_update_price(self, command: UpdatePrice):
        # Load aggregate from events
        events = await self.event_store.load(command.product_id)
        product = Product.from_events(events)

        # Apply domain logic
        product.update_price(command.new_price, command.reason)

        new_events = product.uncommitted_events()
        await self.event_store.save(
            aggregate_id=product.id,
            events=new_events,
            expected_version=len(events),  # optimistic concurrency: fail if the stream grew since load
        )

        for event in new_events:
            await self.event_publisher.publish(event)
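
The handlers above rely on a `Product` aggregate that the article does not show. The following is a minimal sketch of what such an aggregate might look like, assuming events are plain dataclasses and that state is rebuilt by applying events in order. The event classes (`ProductCreated`, `PriceChanged`), their fields, and the `_record`/`_apply` helpers are hypothetical names chosen for illustration.

```python
import uuid
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ProductCreated:  # hypothetical event shape
    product_id: str
    name: str
    price: float
    category: str
    sku: str


@dataclass(frozen=True)
class PriceChanged:  # hypothetical event shape
    product_id: str
    new_price: float
    reason: str


class Product:
    """Event-sourced aggregate: state is derived only from applied events."""

    def __init__(self):
        self.id = None
        self.price = 0.0
        self._uncommitted: List[object] = []

    @classmethod
    def create(cls, name, price, category, sku):
        product = cls()
        product._record(ProductCreated(str(uuid.uuid4()), name, price, category, sku))
        return product

    @classmethod
    def from_events(cls, events):
        # Rehydrate by replaying history; replayed events are not "uncommitted".
        product = cls()
        for event in events:
            product._apply(event)
        return product

    def update_price(self, new_price, reason):
        # Validate before recording, so an invalid command emits no events.
        if new_price <= 0:
            raise ValueError("Price must be positive")
        self._record(PriceChanged(self.id, new_price, reason))

    def uncommitted_events(self):
        return list(self._uncommitted)

    def _record(self, event):
        # New events change state and are queued for the event store.
        self._apply(event)
        self._uncommitted.append(event)

    def _apply(self, event):
        if isinstance(event, ProductCreated):
            self.id = event.product_id
            self.price = event.price
        elif isinstance(event, PriceChanged):
            self.price = event.new_price
```

Keeping validation in `update_price` and state changes in `_apply` means replaying old events never re-runs business rules, which is one common way to structure this.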

Projection Strategies

Synchronous Projections (Simple)

Update the read model in the same process, immediately after persisting events:

class SyncProjectionRunner:
    def __init__(self, projections: list):
        self.projections = projections

    async def project(self, events: list):
        for event in events:
            for projection in self.projections:
                handler = getattr(
                    projection,
                    f"on_{type(event).__name__}",
                    None
                )
                if handler:
                    await handler(event)

# Usage in command handler
async def handle_create_product(self, command: CreateProduct):
    # ... save events ...
    await self.projection_runner.project(events)  # Immediate consistency

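Tradeoff: The read model is updated before the command returns, so a read issued right after a write sees it, but command processing is slower because it must wait for every projection. If a projection fails after the events are saved, the two sides can still diverge until the projection is retried or rebuilt.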
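
To make the name-based dispatch concrete, here is a small runnable sketch that fans one event stream out to two projections. It assumes events are plain classes and that projections expose `on_<EventClassName>` methods, as the runner above does. The `OrderPlaced` event and both dict-backed projections are hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class OrderPlaced:  # hypothetical event
    order_id: str
    total: float


class OrderListProjection:
    def __init__(self):
        self.rows = {}

    async def on_OrderPlaced(self, event):
        self.rows[event.order_id] = {"total": event.total, "status": "placed"}


class RevenueProjection:
    def __init__(self):
        self.revenue = 0.0

    async def on_OrderPlaced(self, event):
        self.revenue += event.total


async def project(projections, events):
    # Same lookup as SyncProjectionRunner: the handler name comes from the event class.
    for event in events:
        for projection in projections:
            handler = getattr(projection, f"on_{type(event).__name__}", None)
            if handler:
                await handler(event)


async def demo():
    orders, revenue = OrderListProjection(), RevenueProjection()
    await project([orders, revenue], [OrderPlaced("o-1", 40.0), OrderPlaced("o-2", 60.0)])
    return orders.rows, revenue.revenue
```

Calling `asyncio.run(demo())` returns the populated order rows and the accumulated revenue; a projection without a matching `on_...` method is simply skipped for that event.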

Asynchronous Projections (Scalable)

Projections consume events from a broker and process them independently of the command path:

class AsyncProjectionWorker:
    def __init__(self, consumer, projections: list, checkpoint_store):
        self.consumer = consumer
        self.projections = projections
        self.checkpoint_store = checkpoint_store

    async def run(self):
        last_position = await self.checkpoint_store.get_position()

        async for event in self.consumer.stream(from_position=last_position):
            for projection in self.projections:
                handler = getattr(
                    projection,
                    f"on_{event.event_type}",
                    None
                )
                if handler:
                    await handler(event.data)

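            # Checkpoint periodically, not every event. After a restart, events since
            # the last checkpoint are delivered again, so handlers must be idempotent.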
            if event.position % 100 == 0:
                await self.checkpoint_store.save_position(event.position)

# Multiple projection workers can run independently
# Each tracks its own checkpoint

Tradeoff: Read model may lag behind writes (eventual consistency), but commands are fast and projections scale independently.
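
Periodic checkpointing means a restarted worker sees some events twice. The sketch below simulates that with an in-memory event list, assuming the checkpoint stores the last processed position and that the handler writes by key (an upsert) so a replay is harmless. `StoredEvent`, `run_worker`, and the `crash_at` parameter are hypothetical and exist only for this illustration.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StoredEvent:  # hypothetical envelope: position, type name, payload
    position: int
    event_type: str
    data: dict


class OrderStatusProjection:
    """Idempotent: handling the same event twice leaves the same row."""

    def __init__(self):
        self.rows = {}

    async def on_OrderPlaced(self, data):
        # Upsert keyed by order_id instead of a blind insert or counter increment.
        self.rows[data["order_id"]] = {"status": "placed", "total": data["total"]}


async def run_worker(events, projection, checkpoint, every=100, crash_at=None):
    """Process events after checkpoint['position']; optionally stop early to mimic a crash."""
    for event in events:
        if event.position <= checkpoint["position"]:
            continue
        if crash_at is not None and event.position == crash_at:
            return  # crash before this event is handled or checkpointed
        handler = getattr(projection, f"on_{event.event_type}", None)
        if handler:
            await handler(event.data)
        if event.position % every == 0:
            checkpoint["position"] = event.position


async def demo():
    events = [
        StoredEvent(i, "OrderPlaced", {"order_id": f"o-{i}", "total": 10.0})
        for i in range(1, 8)
    ]
    projection, checkpoint = OrderStatusProjection(), {"position": 0}
    # First run checkpoints at position 3, handles 4 and 5, then "crashes" at 6.
    await run_worker(events, projection, checkpoint, every=3, crash_at=6)
    saved = checkpoint["position"]
    # Restart: events 4 and 5 are handled a second time, then 6 and 7.
    await run_worker(events, projection, checkpoint, every=3)
    return saved, checkpoint["position"], len(projection.rows)
```

A handler that incremented a counter instead of upserting would double-count events 4 and 5 here, which is the failure mode idempotent handlers avoid.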

Rebuilding Projections

One of the greatest benefits of event sourcing + CQRS: you can rebuild any read model from scratch by replaying all events.

import logging

class ProjectionRebuilder:
    def __init__(self, event_store, projection, read_db):
        self.event_store = event_store
        self.projection = projection
        self.read_db = read_db

    async def rebuild(self, batch_size: int = 1000):
        # Clear existing read model
        await self.read_db.drop_collection(self.projection.collection_name)

        # Replay all events
        position = 0
        while True:
            events = await self.event_store.load_all(
                from_position=position,
                limit=batch_size,
            )
            if not events:
                break

            for event in events:
                handler = getattr(
                    self.projection,
                    f"on_{event.event_type}",
                    None
                )
                if handler:
                    await handler(event.data)

            position = events[-1].position + 1
            logging.info(f"Rebuilt up to position {position}")

This is invaluable when you need a new read model (e.g., a search index or a new dashboard) — you replay the event history into the new projection without touching the write side.
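
The paging loop is easy to get subtly wrong, so here is a self-contained sketch of the same replay against an in-memory log. It assumes the store exposes a global ordered log through `load_all(from_position, limit)` where `from_position` is inclusive, as the rebuilder above implies. `InMemoryEventLog`, `OrderCountProjection`, and its `reset` method are hypothetical.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StoredEvent:  # hypothetical envelope
    position: int
    event_type: str
    data: dict


class InMemoryEventLog:
    """Hypothetical stand-in for the event store's global, ordered log."""

    def __init__(self, events):
        self._events = sorted(events, key=lambda e: e.position)

    async def load_all(self, from_position, limit):
        matching = [e for e in self._events if e.position >= from_position]
        return matching[:limit]


class OrderCountProjection:
    def __init__(self):
        self.counts = {}

    def reset(self):
        self.counts = {}

    async def on_OrderPlaced(self, data):
        customer = data["customer_id"]
        self.counts[customer] = self.counts.get(customer, 0) + 1


async def rebuild(log, projection, batch_size=2):
    projection.reset()  # plays the role of dropping the read collection
    position, batches = 0, 0
    while True:
        events = await log.load_all(from_position=position, limit=batch_size)
        if not events:
            break
        for event in events:
            handler = getattr(projection, f"on_{event.event_type}", None)
            if handler:
                await handler(event.data)
        position = events[-1].position + 1  # resume just past the last event seen
        batches += 1
    return batches


async def demo():
    log = InMemoryEventLog([
        StoredEvent(0, "OrderPlaced", {"customer_id": "c-1"}),
        StoredEvent(1, "OrderPlaced", {"customer_id": "c-2"}),
        StoredEvent(2, "OrderShipped", {"order_id": "o-1"}),
        StoredEvent(3, "OrderPlaced", {"customer_id": "c-1"}),
        StoredEvent(4, "OrderPlaced", {"customer_id": "c-1"}),
    ])
    projection = OrderCountProjection()
    projection.counts = {"stale": 99}  # pretend the old read model is wrong
    batches = await rebuild(log, projection)
    return batches, projection.counts
```

Events the projection has no handler for (`OrderShipped` here) are skipped but still advance the position, so the loop always terminates once the log is exhausted.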

Handling Eventual Consistency

Read-Your-Own-Writes

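After a user creates an order, they expect to see it immediately, but with async projections the read model might lag. One approach is to return the aggregate's version from the write and have the client send it back on reads, so the read endpoint can detect a stale projection: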

@app.post("/orders")
async def create_order(command: PlaceOrderCommand, response: Response):
    order_id = await command_bus.dispatch(command)

    # Return a version token
    version = await event_store.get_latest_version(order_id)
    response.headers["X-Version"] = str(version)
    return {"order_id": order_id}

@app.get("/orders/{order_id}")
async def get_order(order_id: str, request: Request):
    expected_version = request.headers.get("X-Version")

    order = await query_service.get_order_summary(order_id)

    if expected_version:
        # A missing row also counts as stale: the projection may not have seen the order yet
        if order is None or order.get("version", 0) < int(expected_version):
            # Read model hasn't caught up yet
            # Option 1: Read directly from write store (fallback)
            return await write_store_query.get_order(order_id)
            # Option 2 (instead of Option 1): Return 202 Accepted with retry header
            # response.status_code = 202
            # response.headers["Retry-After"] = "1"

    return order
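
The staleness decision in the read endpoint can be isolated into a small pure function, which makes the edge cases easy to test. This is a sketch under the assumption that the read model stores a numeric `version` field and the client echoes the `X-Version` header as a string. The function name `read_model_is_stale` is hypothetical.

```python
from typing import Optional


def read_model_is_stale(order: Optional[dict], expected_version: Optional[str]) -> bool:
    """Return True when the projected order is missing or older than the client's token."""
    if not expected_version:
        return False  # client made no freshness demand
    try:
        wanted = int(expected_version)
    except ValueError:
        return False  # ignore malformed tokens rather than failing the read
    if order is None:
        return True  # projection has not seen the order at all yet
    return order.get("version", 0) < wanted
```

Treating a missing row as stale matters most right after creation, when the projection has typically not processed the first event yet.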

Causal Consistency

Ensure that a read reflects every write the client has already observed, by having the client echo back the last projection version it saw:

class CausalConsistencyMiddleware:
    async def __call__(self, request, call_next):
        # Client sends the last event version it saw
        causal_token = request.headers.get("X-Causal-Token")

        if causal_token:
            # Wait (with timeout) for read model to reach this version
            version = int(causal_token)
            await self.wait_for_projection(version, timeout=5.0)

        response = await call_next(request)

        # Attach current projection version for client to track
        current = await self.get_projection_version()
        response.headers["X-Causal-Token"] = str(current)
        return response
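
The middleware leaves `wait_for_projection` undefined. One possible shape is a polling loop bounded by `asyncio.wait_for`, sketched below as a standalone function. The signature (a `get_version` coroutine passed in, plus a `poll_interval`) is an assumption for illustration, not the article's API.

```python
import asyncio


async def wait_for_projection(get_version, target, timeout=5.0, poll_interval=0.05):
    """Poll until get_version() >= target; return False if the timeout elapses first."""

    async def _poll():
        while await get_version() < target:
            await asyncio.sleep(poll_interval)

    try:
        await asyncio.wait_for(_poll(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    state = {"version": 0}

    async def get_version():
        return state["version"]

    async def projector():
        # Simulates a projection worker catching up in the background.
        for _ in range(3):
            await asyncio.sleep(0.01)
            state["version"] += 1

    task = asyncio.create_task(projector())
    caught_up = await wait_for_projection(get_version, target=3, timeout=1.0, poll_interval=0.005)
    await task
    timed_out = await wait_for_projection(get_version, target=10, timeout=0.05, poll_interval=0.005)
    return caught_up, timed_out
```

Returning a boolean instead of raising lets the caller decide what a timeout means, for example serving the possibly stale read or responding with a retry hint.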

Multiple Read Models for Different Consumers

# Projection 1: Fast order lookup (MongoDB)
class OrderLookupProjection:
    collection_name = "order_lookup"

    def __init__(self, db):
        self.db = db  # injected document-store client (interface assumed)

    async def on_OrderPlaced(self, data):
        await self.db.insert("order_lookup", {
            "order_id": data["order_id"],
            "customer_id": data["customer_id"],
            "total": data["total"],
            "status": "placed",
        })

# Projection 2: Full-text search (Elasticsearch)
class OrderSearchProjection:
    collection_name = "order_search"

    def __init__(self, es):
        self.es = es  # injected async search client (interface assumed)

    async def on_OrderPlaced(self, data):
        await self.es.index(
            index="orders",
            id=data["order_id"],
            body={
                "customer_name": data["customer_name"],
                "items": data["item_descriptions"],
                "total": data["total"],
                "status": "placed",
            }
        )

# Projection 3: Analytics (ClickHouse/BigQuery)
class OrderAnalyticsProjection:
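    collection_name = "order_analytics"

    def __init__(self, analytics):
        self.analytics = analytics  # injected warehouse client (interface assumed)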

    async def on_OrderPlaced(self, data):
        await self.analytics.insert("orders_fact", {
            "order_id": data["order_id"],
            "customer_segment": data["customer_segment"],
            "total": data["total"],
            "item_count": data["item_count"],
            "region": data["region"],
            "placed_at": data["timestamp"],
        })

Each projection serves a different consumer: the web app uses the MongoDB lookup, the search page uses Elasticsearch, and the BI team uses the analytics projection. All built from the same event stream.

Testing CQRS Systems

Command Side Tests

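import pytest

# These async tests assume an asyncio-aware pytest plugin (e.g. pytest-asyncio in auto mode)
async def test_create_order_emits_event():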
    event_store = InMemoryEventStore()
    publisher = InMemoryPublisher()
    handler = OrderCommandHandler(event_store, publisher)

    order_id = await handler.handle_create_order(CreateOrder(
        customer_id="cust-1",
        items=[{"sku": "WIDGET-1", "qty": 2}],
    ))

    events = await event_store.load(order_id)
    assert len(events) == 1
    assert events[0].event_type == "OrderCreated"
    assert events[0].data["customer_id"] == "cust-1"

async def test_price_update_requires_positive_amount():
    event_store = InMemoryEventStore()
    # Pre-populate with product creation events
    await event_store.save("prod-1", [ProductCreated(price=10.0)], 0)

    handler = ProductCommandHandler(event_store, InMemoryPublisher())

    with pytest.raises(ValueError, match="Price must be positive"):
        await handler.handle_update_price(UpdatePrice(
            product_id="prod-1", new_price=-5.0, reason="test"
        ))
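
The tests above depend on `InMemoryEventStore` and `InMemoryPublisher`, which are not shown. A minimal sketch of such test doubles follows, assuming the stream version is simply the number of events stored for the aggregate (matching `expected_version=len(events)` in the handler). `ConcurrencyError` is a hypothetical exception name.

```python
import asyncio
from collections import defaultdict


class ConcurrencyError(Exception):
    """Raised when the stream has moved past the version the writer expected."""


class InMemoryEventStore:
    def __init__(self):
        self._streams = defaultdict(list)

    async def save(self, aggregate_id, events, expected_version):
        stream = self._streams[aggregate_id]
        # Optimistic concurrency: reject the append if someone else wrote first.
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"{aggregate_id}: expected version {expected_version}, found {len(stream)}"
            )
        stream.extend(events)

    async def load(self, aggregate_id):
        return list(self._streams.get(aggregate_id, []))


class InMemoryPublisher:
    def __init__(self):
        self.published = []

    async def publish(self, event):
        self.published.append(event)


async def demo():
    store = InMemoryEventStore()
    await store.save("prod-1", ["created"], expected_version=0)
    await store.save("prod-1", ["price-changed"], expected_version=1)
    try:
        # A writer that loaded the stream at version 1 is now out of date.
        await store.save("prod-1", ["price-changed-again"], expected_version=1)
        conflict = False
    except ConcurrencyError:
        conflict = True
    return await store.load("prod-1"), conflict
```

Because the double enforces the version check, command-side tests can also cover the conflict path, not only the happy path.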

Projection Tests

async def test_order_summary_projection():
    read_db = InMemoryReadDB()
    projection = OrderSummaryProjection(read_db)

    await projection.on_OrderPlaced({
        "order_id": "o-1",
        "customer_id": "c-1",
        "total": 99.99,
        "item_count": 3,
        "created_at": "2026-03-27T12:00:00Z",
    })

    summary = await read_db.find_one("order_summaries", {"order_id": "o-1"})
    assert summary["status"] == "placed"
    assert summary["total"] == 99.99

    await projection.on_OrderShipped({"order_id": "o-1", "tracking": "TRK123"})

    summary = await read_db.find_one("order_summaries", {"order_id": "o-1"})
    assert summary["status"] == "shipped"
    assert summary["tracking"] == "TRK123"
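
For reference, a projection and read-store double that would satisfy a test like the one above might look as follows. This is a sketch: `InMemoryReadDB`, its `upsert` method, and the fields written by `OrderSummaryProjection` are assumptions, since the article does not define them.

```python
import asyncio


class InMemoryReadDB:
    def __init__(self):
        self._collections = {}

    async def upsert(self, collection, key, doc):
        # Merge new fields into the existing row so later events refine it.
        rows = self._collections.setdefault(collection, {})
        rows[key] = {**rows.get(key, {}), **doc}

    async def find_one(self, collection, query):
        for doc in self._collections.get(collection, {}).values():
            if all(doc.get(k) == v for k, v in query.items()):
                return dict(doc)
        return None


class OrderSummaryProjection:
    def __init__(self, read_db):
        self.read_db = read_db

    async def on_OrderPlaced(self, data):
        await self.read_db.upsert("order_summaries", data["order_id"], {
            "order_id": data["order_id"],
            "customer_id": data["customer_id"],
            "total": data["total"],
            "status": "placed",
        })

    async def on_OrderShipped(self, data):
        await self.read_db.upsert("order_summaries", data["order_id"], {
            "status": "shipped",
            "tracking": data["tracking"],
        })


async def demo():
    db = InMemoryReadDB()
    projection = OrderSummaryProjection(db)
    await projection.on_OrderPlaced({"order_id": "o-1", "customer_id": "c-1", "total": 99.99})
    await projection.on_OrderShipped({"order_id": "o-1", "tracking": "TRK123"})
    return await db.find_one("order_summaries", {"order_id": "o-1"})
```

Projection tests written this way need no event store or broker at all, since handlers are called directly with event payloads.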

Performance Characteristics

Metric           | Traditional CRUD             | CQRS
-----------------|------------------------------|------------------------------------------------------
Write latency    | Low (single DB write)        | Low (append-only + async publish)
Read latency     | Medium (joins, aggregations) | Very low (pre-computed views)
Read scalability | Limited by write DB          | Independent (separate read store)
Storage          | Efficient (current state)    | Higher (events + multiple read models)
Complexity       | Low                          | Higher (two models, projections, eventual consistency)

When to Use CQRS

Good fit:

  • Read and write workloads have very different scaling needs
  • Complex domain logic on the write side
  • Multiple read-optimized views needed for different consumers
  • Event sourcing is already in use
  • High read-to-write ratio (100:1 or more)

Poor fit:

  • Simple CRUD with uniform read/write patterns
  • Small team that can’t maintain the operational complexity
  • Strong consistency is non-negotiable everywhere
  • Fewer than 5-10 entities in the domain

The one thing to remember: CQRS in Python becomes powerful when combined with event sourcing and async projections — the ability to build, rebuild, and optimize multiple read models from a single event stream is the pattern’s true payoff, but it demands disciplined handling of eventual consistency.
