Python CQRS Pattern — Deep Dive
CQRS with Event Sourcing
CQRS pairs naturally with event sourcing. The write side stores events; the read side projects those events into query-optimized views. This gives you a complete audit trail and the ability to rebuild read models from scratch.
The Full Architecture
Command → Validate → Store Events (append-only)
                                 ↓
                         Event Bus (async)
                                 ↓
                 ┌───────────────┼───────────────┐
                 ↓               ↓               ↓
           Projection A    Projection B    Projection C
           (order list)     (dashboard)   (search index)
                 ↓               ↓               ↓
             Read DB A       Read DB B     Elasticsearch
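To make the flow in the diagram concrete, here is a minimal in-memory sketch. It leaves out the event bus and any real database, and the event classes (`OrderPlaced`, `OrderCancelled`) and the `project_order_list` function are hypothetical names chosen for illustration. It only shows the core idea: the write side appends events, and a read model is a fold over that log.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class OrderPlaced:
    order_id: str
    total: float


@dataclass(frozen=True)
class OrderCancelled:
    order_id: str


def project_order_list(events: List[object]) -> Dict[str, dict]:
    """Fold the event log into a query-optimized view keyed by order id."""
    view: Dict[str, dict] = {}
    for event in events:
        if isinstance(event, OrderPlaced):
            view[event.order_id] = {"total": event.total, "status": "placed"}
        elif isinstance(event, OrderCancelled):
            view[event.order_id]["status"] = "cancelled"
    return view


# The write side only ever appends to the log.
event_log: List[object] = []
event_log.append(OrderPlaced(order_id="o-1", total=25.0))
event_log.append(OrderPlaced(order_id="o-2", total=40.0))
event_log.append(OrderCancelled(order_id="o-1"))

# The read model is derived data: it can be discarded and rebuilt at any time.
order_list = project_order_list(event_log)
```

Because the view is computed purely from the log, running the projection again over the same events yields the same result, which is what makes rebuilding read models possible.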
Python Implementation
# Command bus
from dataclasses import dataclass
from typing import Any, Callable, Dict, Type


class CommandBus:
    def __init__(self):
        # Maps each command class to the coroutine that handles it
        self._handlers: Dict[Type, Callable] = {}

    def register(self, command_type: Type, handler: Callable):
        self._handlers[command_type] = handler

    async def dispatch(self, command) -> Any:
        handler = self._handlers.get(type(command))
        if not handler:
            raise ValueError(f"No handler for {type(command).__name__}")
        return await handler(command)

# Commands
@dataclass
class CreateProduct:
    name: str
    price: float
    category: str
    sku: str


@dataclass
class UpdatePrice:
    product_id: str
    new_price: float
    reason: str

# Command handler with event sourcing
class ProductCommandHandler:
    def __init__(self, event_store, event_publisher):
        self.event_store = event_store
        self.event_publisher = event_publisher

    async def handle_create_product(self, command: CreateProduct) -> str:
        product = Product.create(
            name=command.name,
            price=command.price,
            category=command.category,
            sku=command.sku,
        )
        events = product.uncommitted_events()
        await self.event_store.save(
            aggregate_id=product.id,
            events=events,
            expected_version=0,
        )
        for event in events:
            await self.event_publisher.publish(event)
        return product.id

    async def handle_update_price(self, command: UpdatePrice):
        # Load aggregate from events
        events = await self.event_store.load(command.product_id)
        product = Product.from_events(events)
        # Apply domain logic
        product.update_price(command.new_price, command.reason)
        new_events = product.uncommitted_events()
        await self.event_store.save(
            aggregate_id=product.id,
            events=new_events,
            expected_version=len(events),
        )
        for event in new_events:
            await self.event_publisher.publish(event)
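
The handler above relies on a `Product` aggregate that the article does not show. The following is one possible sketch of it, not the original implementation: the event classes `ProductCreated` and `PriceUpdated`, their fields, and the `version` counter are assumptions made for illustration. The point is that the aggregate changes state only by applying events, so replaying stored events reconstructs it exactly.

```python
import uuid
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ProductCreated:
    product_id: str
    name: str
    price: float
    category: str
    sku: str


@dataclass(frozen=True)
class PriceUpdated:
    product_id: str
    new_price: float
    reason: str


class Product:
    """Hypothetical aggregate: state changes only by applying events."""

    def __init__(self) -> None:
        self.id = ""
        self.price = 0.0
        self.version = 0  # number of events applied so far
        self._uncommitted: List[object] = []

    @classmethod
    def create(cls, name: str, price: float, category: str, sku: str) -> "Product":
        if price <= 0:
            raise ValueError("Price must be positive")
        product = cls()
        product._record(ProductCreated(str(uuid.uuid4()), name, price, category, sku))
        return product

    @classmethod
    def from_events(cls, events: List[object]) -> "Product":
        product = cls()
        for event in events:
            product._apply(event)  # replayed history is already committed
        return product

    def update_price(self, new_price: float, reason: str) -> None:
        if new_price <= 0:
            raise ValueError("Price must be positive")
        self._record(PriceUpdated(self.id, new_price, reason))

    def uncommitted_events(self) -> List[object]:
        return list(self._uncommitted)

    def _record(self, event: object) -> None:
        # New events both mutate state and wait to be persisted.
        self._apply(event)
        self._uncommitted.append(event)

    def _apply(self, event: object) -> None:
        if isinstance(event, ProductCreated):
            self.id = event.product_id
            self.price = event.price
        elif isinstance(event, PriceUpdated):
            self.price = event.new_price
        self.version += 1
```

Validation lives in the command methods (`create`, `update_price`), while `_apply` never rejects anything: an event that is already in the store describes something that happened and must always be replayable.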
Projection Strategies
Synchronous Projections (Simple)
Update the read model in the same process, immediately after persisting events:
class SyncProjectionRunner:
    def __init__(self, projections: list):
        self.projections = projections

    async def project(self, events: list):
        for event in events:
            for projection in self.projections:
                handler = getattr(
                    projection,
                    f"on_{type(event).__name__}",
                    None
                )
                if handler:
                    await handler(event)


# Usage in command handler
async def handle_create_product(self, command: CreateProduct):
    # ... save events ...
    await self.projection_runner.project(events)  # Immediate consistency
Tradeoff: The read model is consistent with the write model as soon as the command returns, but every command must wait for all projections to finish, so command latency grows with the number of projections.
Asynchronous Projections (Scalable)
Projections consume events from a broker with independent processing:
class AsyncProjectionWorker:
    def __init__(self, consumer, projections: list, checkpoint_store):
        self.consumer = consumer
        self.projections = projections
        self.checkpoint_store = checkpoint_store

    async def run(self):
        last_position = await self.checkpoint_store.get_position()
        async for event in self.consumer.stream(from_position=last_position):
            for projection in self.projections:
                handler = getattr(
                    projection,
                    f"on_{event.event_type}",
                    None
                )
                if handler:
                    await handler(event.data)
            # Checkpoint periodically, not every event
            if event.position % 100 == 0:
                await self.checkpoint_store.save_position(event.position)

# Multiple projection workers can run independently
# Each tracks its own checkpoint
Tradeoff: Read model may lag behind writes (eventual consistency), but commands are fast and projections scale independently.
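
Because the worker above checkpoints only every 100 events, a restart can re-deliver events that were already applied. Projection handlers therefore need to be idempotent. The sketch below shows one way to do that with a high-water mark. The class name `IdempotentOrderTotals` and the extra `position` argument are assumptions for illustration, not part of the worker's interface as shown.

```python
from typing import Dict


class IdempotentOrderTotals:
    """Hypothetical projection that tolerates re-delivered events."""

    def __init__(self) -> None:
        self.totals: Dict[str, float] = {}  # customer_id -> lifetime spend
        self.last_position = -1             # highest event position applied

    def on_OrderPlaced(self, position: int, data: dict) -> bool:
        # Anything at or below the high-water mark was already applied.
        if position <= self.last_position:
            return False
        customer = data["customer_id"]
        self.totals[customer] = self.totals.get(customer, 0.0) + data["total"]
        self.last_position = position
        return True


projection = IdempotentOrderTotals()
stream = [
    (1, {"customer_id": "c-1", "total": 10.0}),
    (2, {"customer_id": "c-1", "total": 5.0}),
]

for position, data in stream:
    projection.on_OrderPlaced(position, data)

# Simulate a restart that replays from an older checkpoint.
replayed = [projection.on_OrderPlaced(pos, data) for pos, data in stream]
```

In a real read store, the high-water mark would need to be written in the same transaction as the view update; otherwise a crash between the two writes reintroduces the double-counting this guard is meant to prevent.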
Rebuilding Projections
One of the greatest benefits of event sourcing + CQRS: you can rebuild any read model from scratch by replaying all events.
import logging


class ProjectionRebuilder:
    def __init__(self, event_store, projection, read_db):
        self.event_store = event_store
        self.projection = projection
        self.read_db = read_db

    async def rebuild(self, batch_size: int = 1000):
        # Clear existing read model
        await self.read_db.drop_collection(self.projection.collection_name)
        # Replay all events
        position = 0
        while True:
            events = await self.event_store.load_all(
                from_position=position,
                limit=batch_size,
            )
            if not events:
                break
            for event in events:
                handler = getattr(
                    self.projection,
                    f"on_{event.event_type}",
                    None
                )
                if handler:
                    await handler(event.data)
            # Resume just past the last event in this batch
            position = events[-1].position + 1
            logging.info("Rebuilt up to position %d", position)
This is invaluable when you need a new read model (e.g., a search index or a new dashboard) — you replay the event history into the new projection without touching the write side.
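
The rebuild loop depends on a paging contract from the event store: `load_all(from_position, limit)` returns events in position order, and each event carries a global `position`. The sketch below is a hypothetical in-memory store (`InMemoryGlobalLog`, `StoredEvent` are illustrative names) that satisfies that contract, used to check that batched replay visits every event exactly once across batch boundaries.

```python
import asyncio
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class StoredEvent:
    position: int   # global, monotonically increasing log offset
    event_type: str
    data: dict


class InMemoryGlobalLog:
    """Hypothetical store exposing the paging contract a rebuild loop relies on."""

    def __init__(self, events: List[StoredEvent]) -> None:
        self._events = sorted(events, key=lambda e: e.position)

    async def load_all(self, from_position: int, limit: int) -> List[StoredEvent]:
        page = [e for e in self._events if e.position >= from_position]
        return page[:limit]


async def replay(log: InMemoryGlobalLog, batch_size: int) -> List[int]:
    seen: List[int] = []
    position = 0
    while True:
        batch = await log.load_all(from_position=position, limit=batch_size)
        if not batch:
            break
        seen.extend(e.position for e in batch)
        position = batch[-1].position + 1  # resume just past the last event read
    return seen


log = InMemoryGlobalLog(
    [StoredEvent(i, "OrderPlaced", {"order_id": f"o-{i}"}) for i in range(5)]
)
positions = asyncio.run(replay(log, batch_size=2))
```

Advancing by `batch[-1].position + 1` rather than by `batch_size` keeps the loop correct even if positions have gaps.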
Handling Eventual Consistency
Read-Your-Own-Writes
After a user creates an order, they expect to see it immediately. But with async projections, the read model might lag.
from fastapi import Request, Response


@app.post("/orders")
async def create_order(command: PlaceOrderCommand, response: Response):
    order_id = await command_bus.dispatch(command)
    # Return a version token the client can echo back on its next read
    version = await event_store.get_latest_version(order_id)
    response.headers["X-Version"] = str(version)
    return {"order_id": order_id}


@app.get("/orders/{order_id}")
async def get_order(order_id: str, request: Request):
    expected_version = request.headers.get("X-Version")
    order = await query_service.get_order_summary(order_id)
    if expected_version:
        # A missing row counts as stale too: the projection may not have
        # processed the creation event yet.
        stale = order is None or order.get("version", 0) < int(expected_version)
        if stale:
            # Read model hasn't caught up yet
            # Option 1: Read directly from write store (fallback)
            return await write_store_query.get_order(order_id)
            # Option 2: Return 202 Accepted with retry header
            # response.status_code = 202
            # response.headers["Retry-After"] = "1"
    return order
Causal Consistency
Ensure related reads see causally related writes:
class CausalConsistencyMiddleware:
    async def __call__(self, request, call_next):
        # Client sends the last event version it saw
        causal_token = request.headers.get("X-Causal-Token")
        if causal_token:
            # Wait (with timeout) for read model to reach this version
            version = int(causal_token)
            await self.wait_for_projection(version, timeout=5.0)
        response = await call_next(request)
        # Attach current projection version for client to track
        current = await self.get_projection_version()
        response.headers["X-Causal-Token"] = str(current)
        return response
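
The middleware calls `wait_for_projection`, which the article leaves undefined. One simple way to implement it is polling with a deadline, sketched below as a standalone function. The `get_version` callable, the `poll_interval` parameter, and the boolean return value are assumptions for illustration; a production system might instead subscribe to a notification from the projection worker.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_for_projection(
    get_version: Callable[[], Awaitable[int]],
    target_version: int,
    timeout: float = 5.0,
    poll_interval: float = 0.05,
) -> bool:
    """Poll until the projection reaches target_version; return False on timeout."""

    async def _poll() -> None:
        while await get_version() < target_version:
            await asyncio.sleep(poll_interval)

    try:
        await asyncio.wait_for(_poll(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


class FakeProjection:
    """Stand-in read model whose version advances on every poll."""

    def __init__(self) -> None:
        self.version = 0

    async def get_version(self) -> int:
        self.version += 1
        return self.version


async def demo() -> tuple:
    caught_up = await wait_for_projection(
        FakeProjection().get_version, target_version=3, poll_interval=0.001
    )

    async def stuck() -> int:
        return 0  # a projection that never advances

    timed_out = await wait_for_projection(
        stuck, target_version=1, timeout=0.05, poll_interval=0.01
    )
    return caught_up, timed_out


caught_up, timed_out = asyncio.run(demo())
```

Returning a boolean instead of raising lets the caller decide what a timeout means: serve the possibly stale read, fall back to the write store, or answer with a retry hint.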
Multiple Read Models for Different Consumers
# Projection 1: Fast order lookup (MongoDB)
class OrderLookupProjection:
    collection_name = "order_lookup"

    async def on_OrderPlaced(self, data):
        await self.db.insert("order_lookup", {
            "order_id": data["order_id"],
            "customer_id": data["customer_id"],
            "total": data["total"],
            "status": "placed",
        })


# Projection 2: Full-text search (Elasticsearch)
class OrderSearchProjection:
    collection_name = "order_search"

    async def on_OrderPlaced(self, data):
        await self.es.index(
            index="orders",
            id=data["order_id"],
            body={
                "customer_name": data["customer_name"],
                "items": data["item_descriptions"],
                "total": data["total"],
                "status": "placed",
            }
        )


# Projection 3: Analytics (ClickHouse/BigQuery)
class OrderAnalyticsProjection:
    collection_name = "order_analytics"

    async def on_OrderPlaced(self, data):
        await self.analytics.insert("orders_fact", {
            "order_id": data["order_id"],
            "customer_segment": data["customer_segment"],
            "total": data["total"],
            "item_count": data["item_count"],
            "region": data["region"],
            "placed_at": data["timestamp"],
        })
Each projection serves a different consumer: the web app uses the MongoDB lookup, the search page uses Elasticsearch, and the BI team uses the analytics projection. All built from the same event stream.
Testing CQRS Systems
Command Side Tests
import pytest


async def test_create_order_emits_event():
    event_store = InMemoryEventStore()
    publisher = InMemoryPublisher()
    handler = OrderCommandHandler(event_store, publisher)
    order_id = await handler.handle_create_order(CreateOrder(
        customer_id="cust-1",
        items=[{"sku": "WIDGET-1", "qty": 2}],
    ))
    events = await event_store.load(order_id)
    assert len(events) == 1
    assert events[0].event_type == "OrderCreated"
    assert events[0].data["customer_id"] == "cust-1"


async def test_price_update_requires_positive_amount():
    event_store = InMemoryEventStore()
    # Pre-populate with product creation events
    await event_store.save("prod-1", [ProductCreated(price=10.0)], 0)
    handler = ProductCommandHandler(event_store, InMemoryPublisher())
    with pytest.raises(ValueError, match="Price must be positive"):
        await handler.handle_update_price(UpdatePrice(
            product_id="prod-1", new_price=-5.0, reason="test"
        ))
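
The tests above use `InMemoryEventStore` and `InMemoryPublisher` without defining them. A minimal sketch of such test doubles follows. The `ConcurrencyError` exception and the list-per-stream storage are assumptions; the only behavior taken from the article is the `save(aggregate_id, events, expected_version)` and `load(aggregate_id)` call shapes.

```python
import asyncio
from collections import defaultdict
from typing import Dict, List


class ConcurrencyError(Exception):
    """Raised when the stream moved on since the caller last read it."""


class InMemoryEventStore:
    def __init__(self) -> None:
        self._streams: Dict[str, List[object]] = defaultdict(list)

    async def save(self, aggregate_id: str, events: List[object],
                   expected_version: int) -> None:
        stream = self._streams[aggregate_id]
        # Optimistic concurrency: reject the write if another writer got in first.
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"expected version {expected_version}, found {len(stream)}"
            )
        stream.extend(events)

    async def load(self, aggregate_id: str) -> List[object]:
        return list(self._streams[aggregate_id])


class InMemoryPublisher:
    def __init__(self) -> None:
        self.published: List[object] = []

    async def publish(self, event: object) -> None:
        self.published.append(event)


async def demo() -> tuple:
    store = InMemoryEventStore()
    await store.save("prod-1", ["ProductCreated"], expected_version=0)
    try:
        # A stale writer still believes the stream is empty.
        await store.save("prod-1", ["PriceUpdated"], expected_version=0)
        conflict = False
    except ConcurrencyError:
        conflict = True
    return await store.load("prod-1"), conflict


events, conflict = asyncio.run(demo())
```

Keeping the version check in the fake means command-side tests also exercise the `expected_version` arguments that the handlers pass, rather than silently accepting any value.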
Projection Tests
async def test_order_summary_projection():
    read_db = InMemoryReadDB()
    projection = OrderSummaryProjection(read_db)
    await projection.on_OrderPlaced({
        "order_id": "o-1",
        "customer_id": "c-1",
        "total": 99.99,
        "item_count": 3,
        "created_at": "2026-03-27T12:00:00Z",
    })
    summary = await read_db.find_one("order_summaries", {"order_id": "o-1"})
    assert summary["status"] == "placed"
    assert summary["total"] == 99.99

    await projection.on_OrderShipped({"order_id": "o-1", "tracking": "TRK123"})
    summary = await read_db.find_one("order_summaries", {"order_id": "o-1"})
    assert summary["status"] == "shipped"
    assert summary["tracking"] == "TRK123"
Performance Characteristics
| Metric | Traditional CRUD | CQRS |
|---|---|---|
| Write latency | Low (single DB write) | Low (append-only + async publish) |
| Read latency | Medium (joins, aggregations) | Very low (pre-computed views) |
| Read scalability | Limited by write DB | Independent (separate read store) |
| Storage | Efficient (current state) | Higher (events + multiple read models) |
| Complexity | Low | Higher (two models, projections, eventual consistency) |
When to Use CQRS
Good fit:
- Read and write workloads have very different scaling needs
- Complex domain logic on the write side
- Multiple read-optimized views needed for different consumers
- Event sourcing is already in use
- High read-to-write ratio (100:1 or more)
Poor fit:
- Simple CRUD with uniform read/write patterns
- Small team that can’t maintain the operational complexity
- Strong consistency is non-negotiable everywhere
- Fewer than 5-10 entities in the domain
The one thing to remember: CQRS in Python becomes powerful when combined with event sourcing and async projections — the ability to build, rebuild, and optimize multiple read models from a single event stream is the pattern’s true payoff, but it demands disciplined handling of eventual consistency.