Python Event-Driven Architecture — Deep Dive

Event Sourcing

Event sourcing stores the state of an entity as a sequence of events rather than a current snapshot. Instead of a row in a database that says “Order #123: status=shipped, total=$50”, you store every event that happened to that order:

OrderCreated(order_id=123, customer="alice", items=[...], total=50.00)
PaymentReceived(order_id=123, amount=50.00, method="card")
ItemsPacked(order_id=123, warehouse="east")
OrderShipped(order_id=123, carrier="fedex", tracking="ABC123")

The current state is derived by replaying events in order.

Python Implementation

from dataclasses import dataclass, field
from datetime import datetime
from typing import Protocol
from uuid import uuid4

class DomainEvent(Protocol):
    """Structural type for domain events.

    Any object that exposes these three attributes satisfies the protocol;
    inheriting from this class is not required.
    """

    aggregate_id: str  # id of the aggregate the event belongs to
    event_id: str  # id of this individual event
    timestamp: datetime  # creation time of the event

@dataclass
class OrderCreated:
    """Event recording that an order was created.

    ``event_id`` and ``timestamp`` are generated per instance when the caller
    does not pass them explicitly.
    """

    # Payload supplied by the caller.
    aggregate_id: str
    customer_id: str
    items: list[dict]
    total: float
    # Generated metadata: a random UUID4 rendered as text, and a naive
    # datetime taken from the UTC clock.
    event_id: str = field(default_factory=lambda: f"{uuid4()}")
    timestamp: datetime = field(default_factory=lambda: datetime.utcnow())

@dataclass
class PaymentReceived:
    """Event recording that a payment arrived for an order.

    ``event_id`` and ``timestamp`` are generated per instance when the caller
    does not pass them explicitly.
    """

    # Payload supplied by the caller.
    aggregate_id: str
    amount: float
    method: str
    # Generated metadata: a random UUID4 rendered as text, and a naive
    # datetime taken from the UTC clock.
    event_id: str = field(default_factory=lambda: f"{uuid4()}")
    timestamp: datetime = field(default_factory=lambda: datetime.utcnow())

class Order:
    """Order aggregate whose state is derived by applying domain events.

    Every applied event is dispatched to a ``_on_<EventClassName>`` method
    (if one exists) and then recorded in ``_events``, the list of events
    that have not been persisted yet.
    """

    def __init__(self):
        self.id = None
        self.status = "new"
        self.total = 0.0
        self.paid = False
        self._events: list = []

    def apply(self, event):
        """Mutate state from ``event`` and record it as uncommitted.

        Events without a matching ``_on_<EventClassName>`` handler leave the
        state untouched but are still recorded.
        """
        handler = getattr(self, f"_on_{type(event).__name__}", None)
        if handler:
            handler(event)
        self._events.append(event)

    # Annotations are quoted so this class does not require the event
    # classes to be defined before it.
    def _on_OrderCreated(self, event: "OrderCreated"):
        self.id = event.aggregate_id
        self.status = "created"
        self.total = event.total

    def _on_PaymentReceived(self, event: "PaymentReceived"):
        self.paid = True
        self.status = "paid"

    def to_snapshot(self) -> dict:
        """Return the current state as a plain dict suitable for a snapshot."""
        return {
            "id": self.id,
            "status": self.status,
            "total": self.total,
            "paid": self.paid,
        }

    @classmethod
    def from_snapshot(cls, state: dict) -> "Order":
        """Rebuild an order from a dict produced by :meth:`to_snapshot`.

        Missing keys fall back to the values of a freshly created order.
        The returned order has no uncommitted events.
        """
        order = cls()
        order.id = state.get("id", order.id)
        order.status = state.get("status", order.status)
        order.total = state.get("total", order.total)
        order.paid = state.get("paid", order.paid)
        return order

    @classmethod
    def from_events(cls, events: list) -> "Order":
        """Rebuild an order by replaying ``events`` in the given sequence.

        The replayed events are already persisted, so the uncommitted list
        is emptied before the order is returned.
        """
        order = cls()
        for event in events:
            order.apply(event)
        order._events = []  # Clear uncommitted events
        return order

Event Store

from sqlalchemy import Column, String, JSON, DateTime, Integer
from sqlalchemy.ext.asyncio import AsyncSession

class EventStore(Base):
    """ORM mapping for one persisted event row in the ``events`` table.

    NOTE(review): ``Base`` and ``UniqueConstraint`` are not among the names
    imported in the visible snippet; both must be in scope for this class
    definition to execute.
    """

    __tablename__ = "events"

    id = Column(Integer, primary_key=True)
    # Indexed because events are looked up per aggregate.
    aggregate_id = Column(String, index=True, nullable=False)
    # Class name of the event, as written by EventRepository.save_events.
    event_type = Column(String, nullable=False)
    event_data = Column(JSON, nullable=False)
    # Position of the event within its aggregate's stream.
    version = Column(Integer, nullable=False)
    timestamp = Column(DateTime, nullable=False)

    # Two rows for the same aggregate can never share a version number, so a
    # concurrent writer using the same version fails with a constraint error.
    __table_args__ = (
        UniqueConstraint("aggregate_id", "version", name="uq_aggregate_version"),
    )

class EventRepository:
    """Appends and loads ``EventStore`` rows through an async session."""

    def __init__(self, session: "AsyncSession"):
        self.session = session

    @staticmethod
    def _to_payload(event) -> dict:
        """Return the dataclass event's fields as a JSON-serializable dict.

        ``datetime`` values (such as the event ``timestamp``) are converted
        to ISO-8601 strings, because the default JSON serializer cannot
        encode ``datetime`` objects.
        NOTE(review): whatever rebuilds events from rows must parse these
        strings back — confirm against ``deserialize_event``.
        """
        from dataclasses import asdict
        from datetime import datetime

        return {
            name: value.isoformat() if isinstance(value, datetime) else value
            for name, value in asdict(event).items()
        }

    async def save_events(self, aggregate_id: str, events: list, expected_version: int):
        """Append ``events`` to the aggregate's stream and commit.

        Events receive consecutive versions starting at
        ``expected_version + 1``.

        Raises:
            sqlalchemy.exc.IntegrityError: a row with one of those versions
                already exists (another writer got there first). The session
                is rolled back before the error is re-raised so it can be
                reused.
        """
        from sqlalchemy.exc import IntegrityError

        for offset, event in enumerate(events, start=1):
            self.session.add(
                EventStore(
                    aggregate_id=aggregate_id,
                    event_type=type(event).__name__,
                    event_data=self._to_payload(event),
                    version=expected_version + offset,
                    timestamp=event.timestamp,
                )
            )
        # UniqueConstraint on (aggregate_id, version) provides optimistic concurrency
        try:
            await self.session.commit()
        except IntegrityError:
            await self.session.rollback()
            raise

    async def load_events(self, aggregate_id: str) -> list:
        """Return all events of the aggregate, ordered by version."""
        from sqlalchemy import select

        result = await self.session.execute(
            select(EventStore)
            .where(EventStore.aggregate_id == aggregate_id)
            .order_by(EventStore.version)
        )
        return [deserialize_event(row) for row in result.scalars()]

    async def load_events_after(self, aggregate_id: str, version: int) -> list:
        """Return the aggregate's events with a version greater than ``version``.

        Used to replay only the events recorded after a snapshot.
        """
        from sqlalchemy import select

        result = await self.session.execute(
            select(EventStore)
            .where(EventStore.aggregate_id == aggregate_id)
            .where(EventStore.version > version)
            .order_by(EventStore.version)
        )
        return [deserialize_event(row) for row in result.scalars()]

Snapshots for Performance

Replaying thousands of events for every read is slow. Snapshots store periodic state checkpoints:

class SnapshotStore:
    """Saves state snapshots and rebuilds an ``Order`` from snapshot plus events.

    NOTE(review): the methods read ``self.session``, ``self.event_repo`` and
    ``self.get_latest_snapshot``; none of them is defined in this class as
    shown, so they must be provided elsewhere.
    """

    async def save_snapshot(self, aggregate_id: str, state: dict, version: int):
        """Merge a ``Snapshot`` row for the aggregate into the session and commit."""
        await self.session.merge(Snapshot(
            aggregate_id=aggregate_id,
            state=state,
            version=version,
        ))
        await self.session.commit()

    async def load_aggregate(self, aggregate_id: str) -> Order:
        """Rebuild the order from its latest snapshot and the events after it.

        Without a snapshot, the full event history is replayed onto a new
        order. The returned order carries no uncommitted events.
        """
        snapshot = await self.get_latest_snapshot(aggregate_id)
        if snapshot:
            order = Order.from_snapshot(snapshot.state)
            events = await self.event_repo.load_events_after(
                aggregate_id, snapshot.version
            )
        else:
            order = Order()
            events = await self.event_repo.load_events(aggregate_id)

        for event in events:
            order.apply(event)
        # apply() records each event as uncommitted, but these were just read
        # from the store. Clear them (as Order.from_events does) so a later
        # save does not try to persist them again.
        order._events = []
        return order

Idempotent Event Consumers

Network failures, broker retries, and duplicate delivery make idempotency essential. Every consumer must handle the same event being delivered multiple times.

Idempotency Key Pattern

class IdempotentConsumer:
    """Runs a handler at most once per ``event_id`` using a Redis marker key.

    Args:
        redis_client: async Redis client providing ``set`` and ``delete``.
        handler: async callable invoked with the event dict.
        ttl_seconds: lifetime of the marker key; defaults to 24 hours.
    """

    def __init__(self, redis_client, handler, ttl_seconds: int = 86400):
        self.redis = redis_client
        self.handler = handler
        self.ttl_seconds = ttl_seconds

    async def process(self, event: dict):
        """Handle ``event`` unless its id is already marked.

        Returns without calling the handler when the marker key exists
        (already processed, or currently being processed elsewhere). If the
        handler fails or is cancelled, the marker is removed so a redelivery
        can try again, and the error is re-raised.
        """
        marker_key = f"processed:{event['event_id']}"

        # Try to acquire processing lock
        acquired = await self.redis.set(
            marker_key,
            "processing",
            nx=True,              # Only set if not exists
            ex=self.ttl_seconds,  # Expire so a crashed worker cannot block forever
        )

        if not acquired:
            # Already processed or being processed
            return

        try:
            await self.handler(event)
            await self.redis.set(marker_key, "done", ex=self.ttl_seconds)
        except BaseException:
            # BaseException rather than Exception: asyncio.CancelledError is
            # not an Exception subclass, and a cancelled handler must not
            # leave the "processing" marker behind, or the event would be
            # silently skipped until the key expires.
            await self.redis.delete(marker_key)
            raise

Database-Level Idempotency

For critical operations, use database constraints:

async def process_payment(session: AsyncSession, event: dict):
    """Record a completed payment for ``event``; duplicates are a no-op.

    A unique constraint on (order_id, event_id) is expected to reject a
    second insert for the same event. That rejection is treated as success.
    """
    data = event["data"]
    order_id = data["order_id"]
    event_id = event["event_id"]
    amount = data["total"]

    record = Payment(
        order_id=order_id,
        event_id=event_id,
        amount=amount,
        status="completed",
    )
    try:
        session.add(record)
        await session.commit()
    except IntegrityError:
        # Same event seen before: undo the failed insert and carry on.
        await session.rollback()

Dead Letter Queues and Error Handling

When events fail processing after maximum retries, they land in a dead-letter queue (DLQ) for investigation:

# RabbitMQ dead-letter configuration
async def setup_queues(channel):
    """Declare the main queue and its dead-letter exchange/queue.

    Args:
        channel: an open aio_pika channel.

    Returns:
        Tuple ``(main_queue, dlq)``.

    NOTE: if ``dlx`` or ``order-events`` already exist on the broker with
    different properties, redeclaring them with these arguments fails;
    delete or migrate them first.
    """
    # Dead letter exchange. Durable like the queues, so dead-lettering still
    # has a target after a broker restart.
    dlx = await channel.declare_exchange(
        "dlx", aio_pika.ExchangeType.DIRECT, durable=True
    )
    dlq = await channel.declare_queue("dead-letters", durable=True)
    await dlq.bind(dlx, "")

    # Main queue with dead-letter routing
    main_queue = await channel.declare_queue(
        "order-events",
        durable=True,
        arguments={
            # Delivery limits are a quorum-queue feature.
            "x-queue-type": "quorum",
            "x-dead-letter-exchange": "dlx",
            "x-dead-letter-routing-key": "",
            "x-message-ttl": 30000,   # Unconsumed for 30s -> dead-lettered
            "x-delivery-limit": 3,    # Max redeliveries before dead-lettering
        }
    )
    return main_queue, dlq

DLQ Processing Service

async def process_dead_letters(dlq):
    """Log and alert on every message that arrives in the dead-letter queue.

    Each message is acknowledged once it has been logged and the alert has
    been sent (``message.process()`` handles the ack).
    """
    async with dlq.iterator() as messages:
        async for message in messages:
            async with message.process():
                try:
                    event = json.loads(message.body)
                except ValueError:
                    # A malformed payload is a plausible reason for landing
                    # here; keep the raw text so it is still logged instead
                    # of failing before anything is recorded.
                    event = {
                        "raw_body": message.body.decode("utf-8", errors="replace")
                    }

                headers = message.headers or {}
                # Fall back to one empty entry when the header is missing or
                # an empty list, so the lookups below use their defaults.
                death_info = headers.get("x-death") or [{}]
                latest_death = death_info[0]

                await log_failed_event(
                    event=event,
                    reason=latest_death.get("reason", "unknown"),
                    original_queue=latest_death.get("queue", "unknown"),
                    retry_count=latest_death.get("count", 0),
                )
                await alert_ops_team(event)

Kafka-Based Streaming Architecture

For high-throughput event streaming, Kafka provides ordered, durable event logs:

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json

# Producer
class KafkaEventPublisher:
    """Publishes JSON-encoded events to Kafka.

    Call :meth:`start` once before publishing and :meth:`stop` on shutdown,
    or use the instance as an async context manager. As a convenience,
    :meth:`publish` starts the producer on first use; that lazy path is
    meant for sequential callers, not for concurrent first calls.
    """

    def __init__(self):
        self.producer = AIOKafkaProducer(
            bootstrap_servers="kafka:9092",
            value_serializer=lambda v: json.dumps(v).encode(),
            key_serializer=lambda k: k.encode() if k else None,
            acks="all",          # Wait for all replicas
            # Producer retries do not create duplicates. This alone is not
            # end-to-end exactly-once; consumers must still be idempotent.
            enable_idempotence=True,
        )
        self._started = False

    async def start(self):
        """Connect the underlying producer; does nothing if already started."""
        if not self._started:
            await self.producer.start()
            self._started = True

    async def stop(self):
        """Flush and close the underlying producer if it was started."""
        if self._started:
            await self.producer.stop()
            self._started = False

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.stop()

    async def publish(self, topic: str, key: str, event: dict):
        """Send ``event`` to ``topic`` and wait for the broker acknowledgement."""
        if not self._started:
            # The producer cannot send before it has been started.
            await self.start()
        await self.producer.send_and_wait(
            topic=topic,
            key=key,            # Partition key for ordering
            value=event,
        )

# Consumer with manual offset management
class KafkaEventConsumer:
    """Consumes JSON events with manual offset commits (at-least-once)."""

    def __init__(self, group_id: str, topics: list[str]):
        self.consumer = AIOKafkaConsumer(
            *topics,
            bootstrap_servers="kafka:9092",
            group_id=group_id,
            auto_offset_reset="earliest",
            enable_auto_commit=False,  # Manual commit for at-least-once
            value_deserializer=lambda v: json.loads(v),
        )

    async def consume(self, handler):
        """Feed every message value to ``handler``, committing after success.

        When the handler (or the commit) fails, the consumer is rewound to
        the failed message and retries it after a 5 second pause. A message
        that always fails therefore blocks its partition; route such
        messages to a dead-letter topic inside ``handler`` if that is not
        acceptable.
        """
        import asyncio
        import logging

        from aiokafka import TopicPartition

        await self.consumer.start()
        try:
            async for message in self.consumer:
                try:
                    await handler(message.value)
                    await self.consumer.commit()
                except Exception as e:
                    logging.exception("Failed to process: %s", e)
                    # Skipping the commit is not enough: the in-memory fetch
                    # position has already moved past this message, so the
                    # loop would continue with the next one and a later
                    # commit would skip this one. Rewind explicitly.
                    partition = TopicPartition(message.topic, message.partition)
                    # After a rebalance the partition may belong to another
                    # consumer, in which case seeking here is not allowed.
                    if partition in self.consumer.assignment():
                        self.consumer.seek(partition, message.offset)
                    await asyncio.sleep(5)
        finally:
            await self.consumer.stop()

Kafka Consumer Groups and Partitioning

# Order events partitioned by customer_id
# All events for the same customer land in the same partition
# → ordering guaranteed per customer

await publisher.publish(
    topic="order-events",
    key=order.customer_id,   # Partition key
    event={
        "type": "OrderPlaced",
        "data": {"order_id": order.id, "customer_id": order.customer_id}
    }
)

# With 3 partitions, 3 consumers in the same group process them in parallel
# Each partition is assigned to exactly one consumer in the group;
# consumers beyond the partition count stay idle

CQRS with Event-Driven Reads

Separate read and write paths using events to build read-optimized projections:

# Write side: stores events
@app.post("/orders")
async def create_order(order: CreateOrderRequest):
    """Create an order: persist its events, publish them, return the order id.

    NOTE(review): persisting and publishing are two separate steps; a crash
    between them leaves events stored but unpublished. Consider an outbox
    if that matters.
    """
    events = order_service.create_order(order)
    # A new aggregate has no stored events yet, so the expected version is 0
    # (the first event is then stored as version 1).
    # NOTE(review): assumes event_store has the EventRepository.save_events
    # signature, where expected_version is required — confirm.
    await event_store.save_events(order.id, events, expected_version=0)
    await event_publisher.publish_all(events)
    return {"order_id": order.id}

# Read side: maintains a denormalized view
class OrderSummaryProjection:
    """Maintains the denormalized ``OrderSummary`` read model from events."""

    def __init__(self, db: "AsyncSession"):
        self.db = db

    async def handle_order_created(self, event: dict):
        """Insert the summary row for a new order.

        A redelivered event is ignored: if a summary for the order already
        exists, nothing is written.
        """
        data = event["data"]
        if await self.db.get(OrderSummary, data["order_id"]) is not None:
            # Duplicate delivery — the row was created by an earlier attempt.
            return

        summary = OrderSummary(
            order_id=data["order_id"],
            customer_name=data["customer_name"],
            total=data["total"],
            status="created",
            item_count=len(data["items"]),
        )
        self.db.add(summary)
        await self.db.commit()

    async def handle_order_shipped(self, event: dict):
        """Mark the order's summary as shipped and store the tracking number.

        Raises:
            LookupError: no summary exists for the order yet (for example
                the shipped event arrived before the created event), so the
                caller can retry later.
        """
        data = event["data"]
        summary = await self.db.get(OrderSummary, data["order_id"])
        if summary is None:
            raise LookupError(
                f"No order summary for order_id={data['order_id']!r}"
            )
        summary.status = "shipped"
        summary.tracking_number = data["tracking"]
        await self.db.commit()

Testing Event-Driven Systems

Unit Testing with In-Memory Broker

class InMemoryBroker:
    """Test double for a message broker that keeps everything in memory.

    ``published`` records every publish call in order; ``handlers`` maps a
    topic to the callbacks subscribed to it.
    """

    def __init__(self):
        self.handlers: dict[str, list] = {}
        self.published: list[dict] = []

    def subscribe(self, topic: str, handler):
        """Register ``handler`` to receive events published on ``topic``."""
        if topic not in self.handlers:
            self.handlers[topic] = []
        self.handlers[topic].append(handler)

    async def publish(self, topic: str, event: dict):
        """Record the event, then await each subscriber of ``topic`` in turn."""
        record = {"topic": topic, "event": event}
        self.published.append(record)

        subscribers = self.handlers.get(topic)
        if not subscribers:
            return
        for callback in subscribers:
            await callback(event)

# Test
async def test_order_creation_publishes_event():
    """Creating an order publishes exactly one ``order.created`` event."""
    in_memory = InMemoryBroker()
    svc = OrderService(broker=in_memory)

    await svc.create_order(customer_id="alice", items=[{"id": "prod-1", "qty": 2}])

    records = in_memory.published
    assert len(records) == 1
    only_record = records[0]
    assert only_record["topic"] == "order.created"
    assert only_record["event"]["data"]["customer_id"] == "alice"

Contract Testing for Event Schemas

from pydantic import ValidationError

def test_order_created_event_schema():
    """Check that a sample payload parses against the shared event schema."""
    line_item = {"product_id": "p1", "quantity": 2, "unit_price": 9.99}
    order_data = {
        "order_id": "order-456",
        "customer_id": "cust-789",
        "items": [line_item],
        "total_amount": 19.98,
        "currency": "USD",
    }
    envelope = {
        "event_id": "abc-123",
        "event_type": "OrderPlaced",
        "timestamp": "2026-03-27T12:00:00Z",
        "data": order_data,
    }

    # Producer and consumer both validate against this one model.
    parsed = OrderPlacedEvent(**envelope)
    assert parsed.data.total_amount == 19.98

Monitoring Event-Driven Systems

Key metrics to track:

| Metric | What It Tells You | Alert Threshold |
| --- | --- | --- |
| Consumer lag | How far behind a consumer is | > 10,000 messages |
| Event processing time | How long handlers take | p99 > 5 seconds |
| Dead letter queue depth | How many events permanently failed | > 0 (investigate every failure) |
| Event publish rate | System throughput | Sudden drops or spikes |
| Consumer group rebalances | Stability of consumer assignments | > 5 per hour |

from prometheus_client import Counter, Histogram, Gauge

# Throughput: how many events go out and how many are handled, by outcome.
events_published = Counter(
    name="events_published_total",
    documentation="Total events published",
    labelnames=["event_type"],
)
events_processed = Counter(
    name="events_processed_total",
    documentation="Total events processed",
    labelnames=["event_type", "status"],
)
# Latency of individual handlers.
processing_duration = Histogram(
    name="event_processing_seconds",
    documentation="Event processing duration",
    labelnames=["event_type"],
)
# Backlog per consumer group and topic.
consumer_lag = Gauge(
    name="consumer_lag_messages",
    documentation="Consumer lag in messages",
    labelnames=["consumer_group", "topic"],
)

The one thing to remember: Production event-driven systems in Python require idempotent consumers, a dead-letter strategy for failed events, and the discipline to treat event schemas as contracts — getting these right determines whether your architecture is resilient or a debugging nightmare.

Tags: python · architecture · events

See Also