Python Event-Driven Architecture — Deep Dive
Event Sourcing
Event sourcing stores the state of an entity as a sequence of events rather than a current snapshot. Instead of a row in a database that says “Order #123: status=shipped, total=$50”, you store every event that happened to that order:
OrderCreated(order_id=123, customer="alice", items=[...], total=50.00)
PaymentReceived(order_id=123, amount=50.00, method="card")
ItemsPacked(order_id=123, warehouse="east")
OrderShipped(order_id=123, carrier="fedex", tracking="ABC123")
The current state is derived by replaying events in order.
Python Implementation
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Protocol
from uuid import uuid4
class DomainEvent(Protocol):
    """Structural interface for domain events.

    Any object exposing these three attributes satisfies the protocol; no
    inheritance is required.
    """

    # Identifier of this individual event.
    event_id: str
    # Moment associated with the event.
    timestamp: datetime
    # Identifier of the aggregate the event belongs to.
    aggregate_id: str
@dataclass
class OrderCreated:
    """Event recording that an order was created.

    ``event_id`` defaults to a fresh UUID4 string and ``timestamp`` defaults
    to the current UTC time as a naive ``datetime``, so callers normally
    pass only the first four fields.
    """

    aggregate_id: str
    customer_id: str
    items: list[dict]
    total: float
    event_id: str = field(default_factory=lambda: str(uuid4()))
    # datetime.utcnow() is deprecated since Python 3.12. This yields the same
    # value shape (naive, UTC) so stored timestamps stay comparable.
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
@dataclass
class PaymentReceived:
    """Event recording that a payment was received for an order.

    ``event_id`` defaults to a fresh UUID4 string and ``timestamp`` defaults
    to the current UTC time as a naive ``datetime``.
    """

    aggregate_id: str
    amount: float
    method: str
    event_id: str = field(default_factory=lambda: str(uuid4()))
    # datetime.utcnow() is deprecated since Python 3.12. This yields the same
    # value shape (naive, UTC) so stored timestamps stay comparable.
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
class Order:
    """Order aggregate whose state is built by applying domain events.

    Events are dispatched by class name to ``_on_<EventClassName>`` methods.
    Every applied event is also appended to ``_events``, the list of events
    not yet persisted.
    """

    def __init__(self):
        self.id = None
        self.status = "new"
        self.total = 0.0
        self.paid = False
        # Events applied since the aggregate was created or loaded.
        self._events: list = []

    def apply(self, event):
        """Apply *event* to the state and record it as uncommitted.

        Events with no matching ``_on_<name>`` handler leave the state
        untouched but are still recorded.
        """
        handler = getattr(self, f"_on_{type(event).__name__}", None)
        if handler:
            handler(event)
        self._events.append(event)

    def _on_OrderCreated(self, event: "OrderCreated"):
        self.id = event.aggregate_id
        self.status = "created"
        self.total = event.total

    def _on_PaymentReceived(self, event: "PaymentReceived"):
        self.paid = True
        self.status = "paid"

    @classmethod
    def from_events(cls, events: list) -> "Order":
        """Rebuild an order by replaying *events* in the given order."""
        order = cls()
        for event in events:
            order.apply(event)
        # Replayed events are already persisted, so none are uncommitted.
        order._events = []
        return order

    def to_snapshot(self) -> dict:
        """Return the current state as a plain dict accepted by ``from_snapshot``."""
        return {
            "id": self.id,
            "status": self.status,
            "total": self.total,
            "paid": self.paid,
        }

    @classmethod
    def from_snapshot(cls, state: dict) -> "Order":
        """Rebuild an order from a dict produced by ``to_snapshot``.

        Keys missing from *state* keep the value a new ``Order`` starts with.
        The result has no uncommitted events.
        """
        order = cls()
        order.id = state.get("id", order.id)
        order.status = state.get("status", order.status)
        order.total = state.get("total", order.total)
        order.paid = state.get("paid", order.paid)
        return order
Event Store
from sqlalchemy import Column, String, JSON, DateTime, Integer
from sqlalchemy.ext.asyncio import AsyncSession
class EventStore(Base):
    """ORM mapping for one persisted event row in the ``events`` table."""

    __tablename__ = "events"
    # An aggregate can hold each version number only once; a second writer
    # inserting the same (aggregate_id, version) pair violates this constraint.
    __table_args__ = (
        UniqueConstraint("aggregate_id", "version", name="uq_aggregate_version"),
    )

    id = Column(Integer, primary_key=True)
    aggregate_id = Column(String, nullable=False, index=True)
    event_type = Column(String, nullable=False)
    event_data = Column(JSON, nullable=False)
    version = Column(Integer, nullable=False)
    timestamp = Column(DateTime, nullable=False)
class EventRepository:
    """Appends domain events to, and reads them back from, ``EventStore`` rows."""

    def __init__(self, session: "AsyncSession"):
        self.session = session

    @staticmethod
    def _payload(event) -> dict:
        """Return ``asdict(event)`` with top-level datetimes as ISO-8601 strings.

        ``event_data`` is a JSON column, and the standard JSON encoder cannot
        serialize ``datetime`` values such as the event timestamp. Datetimes
        nested inside lists or dicts are not converted.
        """
        return {
            key: value.isoformat() if isinstance(value, datetime) else value
            for key, value in asdict(event).items()
        }

    def _events_query(self, aggregate_id: str, after_version=None):
        """Build the version-ordered SELECT for one aggregate's events."""
        query = select(EventStore).where(EventStore.aggregate_id == aggregate_id)
        if after_version is not None:
            query = query.where(EventStore.version > after_version)
        return query.order_by(EventStore.version)

    async def save_events(self, aggregate_id: str, events: list, expected_version: int):
        """Append *events* as versions ``expected_version + 1`` onward and commit.

        The unique constraint on (aggregate_id, version) provides optimistic
        concurrency: if another writer already stored one of these versions,
        the commit fails. The session is rolled back before the error is
        re-raised so it stays usable.
        """
        for offset, event in enumerate(events, start=1):
            self.session.add(
                EventStore(
                    aggregate_id=aggregate_id,
                    event_type=type(event).__name__,
                    event_data=self._payload(event),
                    version=expected_version + offset,
                    timestamp=event.timestamp,
                )
            )
        try:
            await self.session.commit()
        except Exception:
            await self.session.rollback()
            raise

    async def load_events(self, aggregate_id: str) -> list:
        """Return all deserialized events of the aggregate, ordered by version."""
        result = await self.session.execute(self._events_query(aggregate_id))
        return [deserialize_event(row) for row in result.scalars()]

    async def load_events_after(self, aggregate_id: str, version: int) -> list:
        """Return the aggregate's events with a version greater than *version*."""
        result = await self.session.execute(
            self._events_query(aggregate_id, after_version=version)
        )
        return [deserialize_event(row) for row in result.scalars()]
Snapshots for Performance
Replaying thousands of events for every read is slow. Snapshots store periodic state checkpoints:
class SnapshotStore:
    """Saves aggregate state checkpoints and rebuilds aggregates from them.

    NOTE(review): no ``__init__`` is defined here. The methods read
    ``self.session``, ``self.event_repo`` and ``self.get_latest_snapshot``,
    which must be provided elsewhere; confirm how instances are wired up.
    """

    async def save_snapshot(self, aggregate_id: str, state: dict, version: int):
        """Merge a ``Snapshot`` row for the aggregate into the session and commit."""
        await self.session.merge(Snapshot(
            aggregate_id=aggregate_id,
            state=state,
            version=version,
        ))
        await self.session.commit()

    async def load_aggregate(self, aggregate_id: str) -> Order:
        """Rebuild the order from its latest snapshot plus any later events.

        With no snapshot, the order is rebuilt from its full event history.
        The returned order carries no uncommitted events.
        """
        snapshot = await self.get_latest_snapshot(aggregate_id)
        if snapshot:
            order = Order.from_snapshot(snapshot.state)
            events = await self.event_repo.load_events_after(
                aggregate_id, snapshot.version
            )
        else:
            order = Order()
            events = await self.event_repo.load_events(aggregate_id)
        for event in events:
            order.apply(event)
        # apply() records each event as uncommitted, but these came from the
        # store. Clear them so a later save does not persist them again.
        order._events = []
        return order
Idempotent Event Consumers
Network failures, broker retries, and duplicate delivery make idempotency essential. Every consumer must handle the same event being delivered multiple times.
Idempotency Key Pattern
class IdempotentConsumer:
    """Runs a handler at most once per ``event_id``, using a Redis key as marker.

    The key ``processed:<event_id>`` is set to ``"processing"`` while the
    handler runs and to ``"done"`` once it has succeeded.
    """

    def __init__(self, redis_client, handler, *, lock_ttl: int = 86400, done_ttl: int = 86400):
        """
        Args:
            redis_client: Async Redis client providing ``set`` and ``delete``.
            handler: Async callable invoked with the event dict.
            lock_ttl: Seconds the ``"processing"`` marker lives. If a worker
                dies without releasing it, the event is skipped until the
                marker expires, so choose a value above the longest handler
                run time.
            done_ttl: Seconds the ``"done"`` marker lives.
        """
        self.redis = redis_client
        self.handler = handler
        self.lock_ttl = lock_ttl
        self.done_ttl = done_ttl

    async def process(self, event: dict):
        """Handle *event* unless its ``event_id`` is already marked.

        Returns without calling the handler when the marker exists, whether
        the event is done or still in progress. If the handler fails or is
        cancelled, the marker is removed and the error propagates so the
        event can be retried.
        """
        event_id = event["event_id"]
        key = f"processed:{event_id}"

        # SET NX: only the first delivery acquires the marker.
        acquired = await self.redis.set(key, "processing", nx=True, ex=self.lock_ttl)
        if not acquired:
            return

        try:
            await self.handler(event)
        except BaseException:
            # BaseException also covers asyncio.CancelledError, which
            # `except Exception` misses; the marker would otherwise block
            # retries until it expires.
            await self.redis.delete(key)
            raise

        # Outside the try: the handler's effects are already applied, so a
        # failure here must not release the marker and invite a second run.
        await self.redis.set(key, "done", ex=self.done_ttl)
Database-Level Idempotency
For critical operations, use database constraints:
async def process_payment(session: AsyncSession, event: dict):
    """Record a completed payment for the event, tolerating duplicate delivery.

    A unique constraint on (order_id, event_id) makes the insert fail with
    ``IntegrityError`` when the event was already recorded. That case is
    rolled back and treated as success.

    NOTE(review): every ``IntegrityError`` is treated as a duplicate here,
    including violations of other constraints; confirm that is intended.
    """
    data = event["data"]
    session.add(
        Payment(
            order_id=data["order_id"],
            event_id=event["event_id"],
            amount=data["total"],
            status="completed",
        )
    )
    try:
        await session.commit()
    except IntegrityError:
        # Already processed: idempotent success.
        await session.rollback()
Dead Letter Queues and Error Handling
When events fail processing after maximum retries, they land in a dead-letter queue (DLQ) for investigation:
# RabbitMQ dead-letter configuration
async def setup_queues(channel):
    """Declare the ``order-events`` queue and its dead-letter exchange and queue.

    Args:
        channel: aio_pika channel used for the declarations.

    Returns:
        Tuple ``(main_queue, dlq)``.

    NOTE: RabbitMQ rejects redeclaring an existing exchange or queue with
    different properties, so a deployment created with other settings has
    to be migrated before this declaration succeeds.
    """
    # Durable, so the exchange survives a broker restart along with the
    # durable queues. Dead-lettered messages are dropped if it is missing.
    dlx = await channel.declare_exchange(
        "dlx", aio_pika.ExchangeType.DIRECT, durable=True
    )
    dlq = await channel.declare_queue("dead-letters", durable=True)
    await dlq.bind(dlx, "")

    # Main queue with dead-letter routing.
    main_queue = await channel.declare_queue(
        "order-events",
        durable=True,
        arguments={
            # The delivery limit is a quorum-queue feature.
            "x-queue-type": "quorum",
            "x-dead-letter-exchange": "dlx",
            "x-dead-letter-routing-key": "",
            "x-message-ttl": 30000,  # 30s before dead-lettering
            # RabbitMQ's argument for a redelivery cap; "x-max-delivery-count"
            # is not recognised, so no limit was being enforced.
            "x-delivery-limit": 3,
        },
    )
    return main_queue, dlq
DLQ Processing Service
async def process_dead_letters(dlq):
    """Consume the dead-letter queue, logging each failed event and alerting ops.

    For every message, the first ``x-death`` header entry supplies the
    failure reason, the original queue and the retry count. When the header
    is missing or empty, the fallbacks ``"unknown"`` and ``0`` are used.

    Args:
        dlq: Queue object providing an async ``iterator()`` of messages.
    """
    async with dlq.iterator() as messages:
        async for message in messages:
            async with message.process():
                event = json.loads(message.body)
                headers = message.headers or {}
                # `or [{}]` also covers a present-but-empty x-death list,
                # which would otherwise raise IndexError below.
                death_info = headers.get("x-death") or [{}]
                first_death = death_info[0]
                await log_failed_event(
                    event=event,
                    reason=first_death.get("reason", "unknown"),
                    original_queue=first_death.get("queue", "unknown"),
                    retry_count=first_death.get("count", 0),
                )
                await alert_ops_team(event)
Kafka-Based Streaming Architecture
For high-throughput event streaming, Kafka provides durable event logs that are ordered within each partition:
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
# Producer
class KafkaEventPublisher:
    """Publishes JSON-encoded events to Kafka through an ``AIOKafkaProducer``.

    The producer must be started before publishing. Call ``start()`` and
    ``stop()`` explicitly, or use ``async with KafkaEventPublisher() as p:``.
    """

    def __init__(self, bootstrap_servers: str = "kafka:9092"):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode(),
            key_serializer=lambda k: k.encode() if k else None,
            acks="all",  # Wait for all in-sync replicas
            # Prevents duplicates caused by producer retries. This is not
            # end-to-end exactly-once; consumers must still be idempotent.
            enable_idempotence=True,
        )

    async def start(self):
        """Start the underlying producer."""
        await self.producer.start()

    async def stop(self):
        """Stop the underlying producer."""
        await self.producer.stop()

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.stop()

    async def publish(self, topic: str, key: str, event: dict):
        """Send *event* to *topic* and wait for the broker acknowledgement.

        Args:
            topic: Destination topic.
            key: Partition key; events sharing a key go to the same partition.
            event: JSON-serializable event payload.
        """
        await self.producer.send_and_wait(
            topic=topic,
            key=key,
            value=event,
        )
# Consumer with manual offset management
class KafkaEventConsumer:
    """Kafka consumer with manual offset commits for at-least-once processing."""

    def __init__(self, group_id: str, topics: list[str]):
        self.consumer = AIOKafkaConsumer(
            *topics,
            bootstrap_servers="kafka:9092",
            group_id=group_id,
            auto_offset_reset="earliest",
            enable_auto_commit=False,  # Manual commit for at-least-once
            value_deserializer=lambda v: json.loads(v),
        )

    async def consume(self, handler):
        """Feed each message value to *handler*, committing after each success.

        On failure the consumer seeks back to the failed message, so it is
        fetched again after a 5 second pause. Skipping the commit alone is
        not enough: the position has already advanced, and the next
        successful commit would move past the failed message.

        NOTE: a message that always fails is retried indefinitely.

        Args:
            handler: Async callable invoked with the deserialized value.
        """
        # Local import keeps this block self-contained.
        from aiokafka import TopicPartition

        await self.consumer.start()
        try:
            async for message in self.consumer:
                try:
                    await handler(message.value)
                    await self.consumer.commit()
                except Exception as e:
                    logging.error(f"Failed to process: {e}")
                    partition = TopicPartition(message.topic, message.partition)
                    # The partition may have been revoked by a rebalance,
                    # in which case seeking is not allowed.
                    if partition in self.consumer.assignment():
                        self.consumer.seek(partition, message.offset)
                    await asyncio.sleep(5)
        finally:
            await self.consumer.stop()
Kafka Consumer Groups and Partitioning
# Order events partitioned by customer_id
# All events for the same customer land in the same partition
# → ordering guaranteed per customer
await publisher.publish(
topic="order-events",
key=order.customer_id, # Partition key
event={
"type": "OrderPlaced",
"data": {"order_id": order.id, "customer_id": order.customer_id}
}
)
# 3 consumers in the same group = 3 partitions processed in parallel
# Each partition assigned to exactly one consumer
CQRS with Event-Driven Reads
Separate read and write paths using events to build read-optimized projections:
# Write side: stores events
@app.post("/orders")
async def create_order(order: CreateOrderRequest):
    """Create an order: persist its events, publish them, return the order id.

    NOTE(review): storing and publishing are two separate steps, so a failure
    between them leaves events stored but unpublished; confirm whether that
    is acceptable here.
    """
    events = order_service.create_order(order)
    # A new aggregate has no stored events yet, so its expected version is 0.
    # presumably event_store is an EventRepository, whose save_events requires
    # this argument — verify.
    await event_store.save_events(order.id, events, expected_version=0)
    await event_publisher.publish_all(events)
    return {"order_id": order.id}
# Read side: maintains a denormalized view
class OrderSummaryProjection:
    """Maintains the denormalized ``OrderSummary`` read model from order events."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def handle_order_created(self, event: dict):
        """Insert the summary row for a newly created order.

        A repeated delivery of the same event is a no-op: if a summary with
        this order id already exists, nothing is written.
        """
        data = event["data"]
        if await self.db.get(OrderSummary, data["order_id"]) is not None:
            return
        summary = OrderSummary(
            order_id=data["order_id"],
            customer_name=data["customer_name"],
            total=data["total"],
            status="created",
            item_count=len(data["items"]),
        )
        self.db.add(summary)
        await self.db.commit()

    async def handle_order_shipped(self, event: dict):
        """Mark the order's summary as shipped and store its tracking number.

        Raises:
            LookupError: No summary exists for the order, for example when
                this event arrives before the creation event was projected.
        """
        data = event["data"]
        summary = await self.db.get(OrderSummary, data["order_id"])
        if summary is None:
            raise LookupError(
                f"No OrderSummary found for order {data['order_id']!r}"
            )
        summary.status = "shipped"
        summary.tracking_number = data["tracking"]
        await self.db.commit()
Testing Event-Driven Systems
Unit Testing with In-Memory Broker
class InMemoryBroker:
    """Test double for a message broker that keeps everything in memory.

    ``published`` lists every publication in order. ``handlers`` maps a topic
    to the callbacks subscribed to it.
    """

    def __init__(self):
        self.published: list[dict] = []
        self.handlers: dict[str, list] = {}

    def subscribe(self, topic: str, handler):
        """Register *handler* to receive events published on *topic*."""
        if topic not in self.handlers:
            self.handlers[topic] = []
        self.handlers[topic].append(handler)

    async def publish(self, topic: str, event: dict):
        """Record the publication, then await each subscriber of *topic* in turn."""
        record = {"topic": topic, "event": event}
        self.published.append(record)
        subscribers = self.handlers.get(topic)
        if not subscribers:
            return
        for callback in subscribers:
            await callback(event)
# Test
async def test_order_creation_publishes_event():
    """Creating an order publishes exactly one ``order.created`` event."""
    broker = InMemoryBroker()
    order_items = [{"id": "prod-1", "qty": 2}]

    await OrderService(broker=broker).create_order(customer_id="alice", items=order_items)

    assert len(broker.published) == 1
    record = broker.published[0]
    assert record["topic"] == "order.created"
    assert record["event"]["data"]["customer_id"] == "alice"
Contract Testing for Event Schemas
from pydantic import ValidationError
def test_order_created_event_schema():
    """Contract check: the shared schema accepts a representative event payload."""
    line_item = {"product_id": "p1", "quantity": 2, "unit_price": 9.99}
    order_data = {
        "order_id": "order-456",
        "customer_id": "cust-789",
        "items": [line_item],
        "total_amount": 19.98,
        "currency": "USD",
    }
    raw_event = {
        "event_id": "abc-123",
        "event_type": "OrderPlaced",
        "timestamp": "2026-03-27T12:00:00Z",
        "data": order_data,
    }

    # Producer and consumer both validate against this one schema class.
    parsed = OrderPlacedEvent(**raw_event)
    assert parsed.data.total_amount == 19.98
Monitoring Event-Driven Systems
Key metrics to track:
| Metric | What It Tells You | Alert Threshold |
|---|---|---|
| Consumer lag | How far behind a consumer is | > 10,000 messages |
| Event processing time | How long handlers take | p99 > 5 seconds |
| Dead letter queue depth | How many events permanently failed | > 0 (investigate every failure) |
| Event publish rate | System throughput | Sudden drops or spikes |
| Consumer group rebalances | Stability of consumer assignments | > 5 per hour |
from prometheus_client import Counter, Histogram, Gauge
# Prometheus metrics for the event pipeline; label names are listed per metric.
events_published = Counter(
    "events_published_total",
    "Total events published",
    labelnames=["event_type"],
)
events_processed = Counter(
    "events_processed_total",
    "Total events processed",
    labelnames=["event_type", "status"],
)
processing_duration = Histogram(
    "event_processing_seconds",
    "Event processing duration",
    labelnames=["event_type"],
)
consumer_lag = Gauge(
    "consumer_lag_messages",
    "Consumer lag in messages",
    labelnames=["consumer_group", "topic"],
)
The one thing to remember: Production event-driven systems in Python require idempotent consumers, a dead-letter strategy for failed events, and the discipline to treat event schemas as contracts — getting these right determines whether your architecture is resilient or a debugging nightmare.
See Also
- Python Aggregate Pattern — Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts — Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern — Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern — How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture — Why your Python app should look like an onion — and how that saves you from painful rewrites.