Python Event Bus Patterns — Deep Dive

Production-Grade Event Bus Implementation

A real event bus needs more than subscribe/publish. Production requirements include error isolation, middleware support, type safety, and handler priority.

from dataclasses import dataclass, field
from collections import defaultdict
from typing import Any, Callable, TypeVar
import asyncio
import logging

logger = logging.getLogger(__name__)

T = TypeVar("T")

@dataclass
class Subscription:
    handler: Callable
    priority: int = 0
    error_handler: Callable | None = None

class EventBus:
    def __init__(self):
        self._subscriptions: dict[type, list[Subscription]] = defaultdict(list)
        self._middleware: list[Callable] = []

    def subscribe(
        self,
        event_type: type,
        handler: Callable,
        priority: int = 0,
        on_error: Callable | None = None,
    ):
        sub = Subscription(handler=handler, priority=priority, error_handler=on_error)
        self._subscriptions[event_type].append(sub)
        self._subscriptions[event_type].sort(key=lambda s: s.priority, reverse=True)

    def add_middleware(self, middleware: Callable):
        self._middleware.append(middleware)

    async def publish(self, event: Any) -> list[Exception]:
        errors = []
        processed_event = event
        for mw in self._middleware:
            processed_event = await mw(processed_event)
            if processed_event is None:
                return errors  # Middleware filtered the event

        for sub in self._subscriptions.get(type(event), []):
            try:
                result = sub.handler(processed_event)
                if asyncio.iscoroutine(result):
                    await result
            except Exception as exc:
                errors.append(exc)
                if sub.error_handler:
                    try:
                        sub.error_handler(exc, processed_event)
                    except Exception:
                        logger.exception("Error handler failed")
                else:
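                    logger.exception(
                        "Handler %s failed for %s",
                        # functools.partial objects and callable instances have no __name__
                        getattr(sub.handler, "__name__", repr(sub.handler)),
                        type(event).__name__,
                    )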
        return errors

Key design decisions:

  • Error isolation — one failing handler does not prevent others from running.
  • Priority ordering — high-priority handlers (e.g., validation) run before lower-priority ones (e.g., analytics).
  • Middleware chain — cross-cutting concerns (logging, metrics, authorization) intercept events before handlers.
  • Error collection — publish returns the exceptions it caught so callers can decide how to handle failures.
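
The first two decisions can be seen in a few lines. The following is a minimal sketch, not the full bus above: MiniBus is a condensed stand-in without middleware or error handlers, and UserRegistered, analytics, and validate are hypothetical names used only for illustration.

```python
import asyncio
from collections import defaultdict


class MiniBus:
    """Condensed stand-in for the EventBus above (no middleware, no error handlers)."""

    def __init__(self):
        self._subs = defaultdict(list)  # event type -> [(priority, handler)]

    def subscribe(self, event_type, handler, priority=0):
        self._subs[event_type].append((priority, handler))
        # Highest priority first; the sort is stable, so ties keep registration order.
        self._subs[event_type].sort(key=lambda pair: pair[0], reverse=True)

    async def publish(self, event):
        errors = []
        for _, handler in self._subs.get(type(event), []):
            try:
                result = handler(event)
                if asyncio.iscoroutine(result):
                    await result
            except Exception as exc:
                # Isolate the failure and keep running the remaining handlers.
                errors.append(exc)
        return errors


class UserRegistered:  # hypothetical event type
    pass


def demo():
    calls = []
    bus = MiniBus()

    def analytics(event):
        calls.append("analytics")

    def validate(event):
        calls.append("validate")
        raise ValueError("invalid payload")

    # Registered in the "wrong" order on purpose; priority decides execution order.
    bus.subscribe(UserRegistered, analytics, priority=0)
    bus.subscribe(UserRegistered, validate, priority=10)
    errors = asyncio.run(bus.publish(UserRegistered()))
    return calls, errors
```

In this sketch validate runs first because of its higher priority, its exception is collected rather than raised, and analytics still runs afterwards.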

Middleware Patterns

Middleware transforms or filters events before they reach handlers:

async def logging_middleware(event):
    logger.info(f"Event published: {type(event).__name__}")
    return event

async def authorization_middleware(event):
    if hasattr(event, "user_id"):
        if not await check_permissions(event.user_id, type(event).__name__):
            logger.warning(f"Unauthorized event from {event.user_id}")
            return None  # Filter out unauthorized events
    return event

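async def metrics_middleware(event):
    # Middleware runs before handlers and never awaits them, so it can count
    # events but cannot time handler execution.
    # EVENT_COUNTER is assumed to be a Prometheus-style counter defined elsewhere.
    EVENT_COUNTER.labels(event_type=type(event).__name__).inc()
    return event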

bus.add_middleware(logging_middleware)
bus.add_middleware(authorization_middleware)
bus.add_middleware(metrics_middleware)

Middleware runs in registration order. Returning None from any middleware stops the event from reaching handlers.
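
To make the ordering and filtering rules concrete, here is a small sketch of the chain on its own, outside the bus. It assumes events are plain dicts for brevity (the bus above uses arbitrary objects), and run_middleware, tag_source, and drop_anonymous are hypothetical names.

```python
import asyncio


async def run_middleware(chain, event):
    """Pass the event through each middleware in order; None means it was filtered."""
    for mw in chain:
        event = await mw(event)
        if event is None:
            return None  # later middleware never sees a filtered event
    return event


async def tag_source(event):
    # Transforming middleware: returns a modified copy of the event.
    return {**event, "source": "api"}


async def drop_anonymous(event):
    # Filtering middleware: returns None to stop the event.
    return event if event.get("user_id") else None


def demo():
    chain = [tag_source, drop_anonymous]
    kept = asyncio.run(run_middleware(chain, {"user_id": "u1"}))
    dropped = asyncio.run(run_middleware(chain, {"user_id": None}))
    return kept, dropped
```

Because the chain short-circuits on None, placing cheap filters early avoids running more expensive middleware for events that will be discarded anyway.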

Typed Event Hierarchies

Python’s type system enables event hierarchies where handlers can subscribe to base classes:

import time
from dataclasses import dataclass, field

@dataclass
class DomainEvent:
    timestamp: float = field(default_factory=time.time)

@dataclass
class OrderEvent(DomainEvent):
    order_id: str = ""

@dataclass
class OrderPlaced(OrderEvent):
    user_id: str = ""
    amount: float = 0.0

@dataclass
class OrderCancelled(OrderEvent):
    reason: str = ""

To support hierarchy-based subscription, modify the bus to check parent classes:

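async def publish(self, event):
    # Collect subscriptions for the event's own class and every base class,
    # walking the MRO from most specific to least specific.
    subs = []
    for cls in type(event).__mro__:
        subs.extend(self._subscriptions.get(cls, []))

    # Re-sort so priority holds across types; the sort is stable,
    # so handlers with equal priority keep MRO order.
    subs.sort(key=lambda s: s.priority, reverse=True)
    for sub in subs:
        ...  # execute the handler as in the original publish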

Now a handler subscribed to OrderEvent receives both OrderPlaced and OrderCancelled events. A handler subscribed to DomainEvent receives every event that derives from it. This is powerful for cross-cutting concerns like audit logging.
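
The lookup itself is easy to check in isolation. The sketch below uses trimmed-down versions of the event classes (no timestamp field) and a hypothetical helper, matching_types, that returns which subscribed types would receive a given event.

```python
from dataclasses import dataclass


@dataclass
class DomainEvent:
    pass


@dataclass
class OrderEvent(DomainEvent):
    order_id: str = ""


@dataclass
class OrderPlaced(OrderEvent):
    amount: float = 0.0


def matching_types(event, subscribed_types):
    """Return the subscribed types that match this event, most specific first."""
    # __mro__ lists the event's class, then its bases, ending with object.
    return [cls for cls in type(event).__mro__ if cls in subscribed_types]


def demo():
    subscribed = {DomainEvent, OrderEvent}
    return (
        matching_types(OrderPlaced(order_id="1"), subscribed),
        matching_types(DomainEvent(), {OrderEvent}),
    )
```

Dispatch only flows upward: an OrderPlaced event reaches OrderEvent and DomainEvent subscribers, but a plain DomainEvent does not reach OrderEvent subscribers.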

Distributed Event Bus with Redis

For multi-process or multi-service deployments, extend the in-process bus with Redis Pub/Sub:

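import json

# aioredis was merged into redis-py (4.2+) and is no longer maintained as a
# separate package; this import keeps the aioredis name used below.
from redis import asyncio as aioredis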

class DistributedEventBus:
    def __init__(self, local_bus: EventBus, redis_url: str, service_id: str):
        self.local = local_bus
        self.redis_url = redis_url
        self.service_id = service_id
        # One shared client (and connection pool) instead of a new one per publish.
        self.redis = aioredis.from_url(redis_url)

    async def publish(self, event, local_only=False):
        await self.local.publish(event)
        if not local_only:
            payload = json.dumps({
                "service": self.service_id,
                "event_type": f"{type(event).__module__}.{type(event).__qualname__}",
                "data": event.__dict__,  # assumes every field is JSON-serializable
            })
            await self.redis.publish("events", payload)

    async def listen(self):
        redis = aioredis.from_url(self.redis_url)
        pubsub = redis.pubsub()
        await pubsub.subscribe("events")
        async for msg in pubsub.listen():
            if msg["type"] != "message":
                continue
            payload = json.loads(msg["data"])
            if payload["service"] == self.service_id:
                continue  # Skip own events
            event = reconstruct_event(payload)
            if event:
                await self.local.publish(event)

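This bridges in-process and distributed event handling. Local handlers react immediately; remote services receive events through Redis, typically with little added latency. Note that Redis Pub/Sub is fire-and-forget: a service that is not subscribed at the moment an event is published never sees it.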
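
The listener relies on a reconstruct_event helper that is not shown. One possible shape for it, offered as a sketch under assumptions rather than the original implementation, is an explicit allow-list of event classes keyed by the same qualified name that publish sends. EVENT_REGISTRY, register_event, serialize_event, and roundtrip are hypothetical names, and the events are assumed to be dataclasses with JSON-serializable fields.

```python
import json
from dataclasses import asdict, dataclass, fields

EVENT_REGISTRY = {}  # hypothetical allow-list: qualified name -> event class


def register_event(cls):
    """Class decorator that adds an event type to the allow-list."""
    EVENT_REGISTRY[f"{cls.__module__}.{cls.__qualname__}"] = cls
    return cls


@register_event
@dataclass
class OrderPlaced:
    order_id: str = ""
    user_id: str = ""
    amount: float = 0.0


def serialize_event(event, service_id):
    """Build the same payload shape that DistributedEventBus.publish sends."""
    return json.dumps({
        "service": service_id,
        "event_type": f"{type(event).__module__}.{type(event).__qualname__}",
        "data": asdict(event),
    })


def reconstruct_event(payload):
    """Rebuild an event from a decoded payload, or return None for unknown types."""
    cls = EVENT_REGISTRY.get(payload.get("event_type"))
    if cls is None:
        return None  # never import or instantiate names that were not registered
    known = {f.name for f in fields(cls)}
    # Ignore fields this version of the class does not know about.
    data = {k: v for k, v in payload.get("data", {}).items() if k in known}
    return cls(**data)


def roundtrip(event, service_id="svc-a"):
    """Serialize and rebuild an event, as a remote service would."""
    return reconstruct_event(json.loads(serialize_event(event, service_id)))
```

Resolving types through a registry rather than importing whatever name arrives in the message keeps a malformed or malicious payload from instantiating arbitrary classes, and dropping unknown fields lets older consumers tolerate newer producers.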

Saga Coordination with Events

For multi-step business processes, events coordinate saga steps:

@dataclass
class PaymentProcessed(DomainEvent):
    order_id: str = ""
    transaction_id: str = ""

@dataclass
class InventoryReserved(DomainEvent):
    order_id: str = ""
    warehouse: str = ""

@dataclass
class OrderFulfillmentStarted(DomainEvent):
    order_id: str = ""

class FulfillmentSaga:
    def __init__(self, bus: EventBus):
        self.bus = bus  # kept so _check_complete can publish the follow-up event
        self.pending: dict[str, dict] = {}
        bus.subscribe(PaymentProcessed, self.on_payment)
        bus.subscribe(InventoryReserved, self.on_inventory)

    async def on_payment(self, event: PaymentProcessed):
        state = self.pending.setdefault(event.order_id, {})
        state["payment"] = True
        await self._check_complete(event.order_id)

    async def on_inventory(self, event: InventoryReserved):
        state = self.pending.setdefault(event.order_id, {})
        state["inventory"] = True
        await self._check_complete(event.order_id)

    async def _check_complete(self, order_id: str):
        state = self.pending.get(order_id, {})
        if state.get("payment") and state.get("inventory"):
            del self.pending[order_id]
            # publish is a coroutine, so it must be awaited to actually run.
            await self.bus.publish(OrderFulfillmentStarted(order_id=order_id))

The saga waits for both payment and inventory events before starting fulfillment. The two events can arrive in either order.

Testing Event-Driven Code

Testing event bus systems requires capturing published events:

class TestEventBus(EventBus):
    __test__ = False  # the "Test" prefix would otherwise make pytest try to collect this class

    def __init__(self):
        super().__init__()
        self.published: list = []

    async def publish(self, event):
        self.published.append(event)
        return await super().publish(event)

# In tests (async test functions need a plugin such as pytest-asyncio)
async def test_order_placement_publishes_event():
    bus = TestEventBus()
    service = OrderService(bus)

    await service.place_order(user_id="u1", amount=49.99)

    assert len(bus.published) == 1
    assert isinstance(bus.published[0], OrderPlaced)
    assert bus.published[0].user_id == "u1"

For integration tests, verify that the full chain works:

async def test_order_triggers_email_and_inventory():
    bus = TestEventBus()
    email_sent = []
    inventory_reserved = []

    bus.subscribe(OrderPlaced, lambda e: email_sent.append(e))
    bus.subscribe(OrderPlaced, lambda e: inventory_reserved.append(e))

    await bus.publish(OrderPlaced(order_id="1", user_id="u1", amount=50))

    assert len(email_sent) == 1
    assert len(inventory_reserved) == 1

Anti-Patterns

  1. Event storms — handler A publishes event B, whose handler publishes event C, whose handler publishes event A. Use cycle detection or limit publish depth.
  2. Fat events — events carrying entire database rows instead of just IDs and relevant changes. Keep events small; let handlers fetch additional data if needed.
  3. Synchronous distributed bus — publishing to Redis and waiting for all remote handlers to complete defeats the purpose of async decoupling.
  4. No event versioning — changing an event’s fields without versioning breaks consumers. Include a version field and support multiple versions during migration periods.

Libraries and Frameworks

Library | Use Case | Key Feature
blinker | Flask ecosystem, simple signals | Named signals, weak references
pyee | Node.js-style event emitter | Familiar API, async support
pypubsub | Desktop apps (wxPython) | Topic-based routing
dry-python/events | Domain-driven design | Typed events, integration with dry-python ecosystem
Custom | Full control | Exactly the features you need

For most production applications, a custom implementation (50–100 lines) provides exactly what you need without external dependencies.

The one thing to remember: A production event bus in Python needs error isolation between handlers, middleware for cross-cutting concerns, and clear boundaries between in-process events and distributed messaging — without these, the decoupling benefit turns into debugging chaos.
