Python Event Bus Patterns — Deep Dive
Production-Grade Event Bus Implementation
A real event bus needs more than subscribe/publish. Production requirements include error isolation, middleware support, type safety, and handler priority.
```python
import asyncio
import logging
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class Subscription:
    handler: Callable
    priority: int = 0
    error_handler: Callable | None = None


class EventBus:
    def __init__(self):
        self._subscriptions: dict[type, list[Subscription]] = defaultdict(list)
        self._middleware: list[Callable] = []

    def subscribe(
        self,
        event_type: type,
        handler: Callable,
        priority: int = 0,
        on_error: Callable | None = None,
    ):
        sub = Subscription(handler=handler, priority=priority, error_handler=on_error)
        self._subscriptions[event_type].append(sub)
        # Highest priority first; the sort is stable, so equal priorities keep registration order
        self._subscriptions[event_type].sort(key=lambda s: s.priority, reverse=True)

    def add_middleware(self, middleware: Callable):
        self._middleware.append(middleware)

    async def publish(self, event: Any) -> list[Exception]:
        errors: list[Exception] = []
        processed_event = event
        for mw in self._middleware:
            processed_event = await mw(processed_event)
            if processed_event is None:
                return errors  # Middleware filtered the event
        # Dispatch on the type of the event the middleware produced
        for sub in self._subscriptions.get(type(processed_event), []):
            try:
                result = sub.handler(processed_event)
                if asyncio.iscoroutine(result):
                    await result
            except Exception as exc:
                errors.append(exc)  # Isolate the failure and keep going
                if sub.error_handler:
                    try:
                        outcome = sub.error_handler(exc, processed_event)
                        if asyncio.iscoroutine(outcome):
                            await outcome  # Error handlers may be async too
                    except Exception:
                        logger.exception("Error handler failed")
                else:
                    name = getattr(sub.handler, "__qualname__", repr(sub.handler))
                    logger.exception(
                        "Handler %s failed for %s", name, type(processed_event).__name__
                    )
        return errors
```
Key design decisions:
- Error isolation: one failing handler does not prevent others from running.
- Priority ordering: high-priority handlers (e.g., validation) run before lower-priority ones (e.g., analytics).
- Middleware chain: cross-cutting concerns (logging, metrics, authorization) intercept events before handlers.
- Error collection: `publish` returns exceptions so callers can decide how to handle failures.
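To see error isolation, priority ordering, and error collection interact, here is a trimmed-down, self-contained sketch. `MiniBus` is a hypothetical stand-in for the `EventBus` above with middleware and error callbacks removed; the `OrderPlaced` event and the two handlers are illustrative only.

```python
import asyncio
import logging
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable

logger = logging.getLogger(__name__)


@dataclass
class Subscription:
    handler: Callable
    priority: int = 0


class MiniBus:
    """Hypothetical trimmed-down bus: keeps only priority ordering and error isolation."""

    def __init__(self) -> None:
        self._subs: dict[type, list[Subscription]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable, priority: int = 0) -> None:
        self._subs[event_type].append(Subscription(handler, priority))
        # Higher priority first; Python's sort is stable, so ties keep registration order
        self._subs[event_type].sort(key=lambda s: s.priority, reverse=True)

    async def publish(self, event: Any) -> list[Exception]:
        errors: list[Exception] = []
        for sub in self._subs.get(type(event), []):
            try:
                result = sub.handler(event)
                if asyncio.iscoroutine(result):
                    await result
            except Exception as exc:  # Record the failure and move on to the next handler
                errors.append(exc)
                logger.warning("handler failed: %r", exc)
        return errors


@dataclass
class OrderPlaced:
    order_id: str


calls: list[str] = []


def validate(event: OrderPlaced) -> None:
    calls.append("validate")
    raise ValueError(f"order {event.order_id} rejected")


async def analytics(event: OrderPlaced) -> None:
    calls.append("analytics")


bus = MiniBus()
bus.subscribe(OrderPlaced, analytics, priority=0)
bus.subscribe(OrderPlaced, validate, priority=10)  # Registered later, but runs first
errors = asyncio.run(bus.publish(OrderPlaced("o1")))
```

One consequence is worth noticing: a high-priority validation handler that raises does not stop lower-priority handlers, because every failure is isolated. If validation should veto an event, a filtering middleware (which can return `None`) is the better place for it.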
Middleware Patterns
Middleware transforms or filters events before they reach handlers:
```python
import logging

logger = logging.getLogger(__name__)

# Assumed to exist elsewhere in the application (not defined here):
#   check_permissions(user_id, event_name) -> awaitable bool
#   EVENT_COUNTER: a prometheus_client Counter declared with labelnames=["event_type"]
#   bus: an EventBus instance


async def logging_middleware(event):
    logger.info("Event published: %s", type(event).__name__)
    return event


async def authorization_middleware(event):
    if hasattr(event, "user_id"):
        if not await check_permissions(event.user_id, type(event).__name__):
            logger.warning("Unauthorized event from %s", event.user_id)
            return None  # Filter out unauthorized events
    return event


async def metrics_middleware(event):
    # Middleware runs before handlers and never awaits them, so it can count
    # events but cannot time handler execution
    EVENT_COUNTER.labels(event_type=type(event).__name__).inc()
    return event


bus.add_middleware(logging_middleware)
bus.add_middleware(authorization_middleware)
bus.add_middleware(metrics_middleware)
```
Middleware runs in registration order. Returning None from any middleware stops the event from reaching handlers.
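Because each middleware either returns an event (possibly a modified copy) or `None`, the chain itself is just a loop. The sketch below isolates that loop as a hypothetical `run_middleware` function and pairs it with two illustrative middleware, one that transforms and one that filters; the rounding and anonymous-user rules are assumptions made up for the example.

```python
import asyncio
from dataclasses import dataclass, replace
from typing import Any, Awaitable, Callable, Optional

Middleware = Callable[[Any], Awaitable[Optional[Any]]]


async def run_middleware(chain: list[Middleware], event: Any) -> Optional[Any]:
    """Pass the event through each middleware in registration order.

    Returns the (possibly transformed) event, or None if any middleware filtered it out.
    """
    for mw in chain:
        event = await mw(event)
        if event is None:
            return None  # Later middleware never sees a filtered event
    return event


@dataclass(frozen=True)
class OrderPlaced:
    order_id: str
    user_id: str
    amount: float


async def normalize_amount(event: OrderPlaced) -> OrderPlaced:
    # Transform: return a modified copy instead of mutating the original
    return replace(event, amount=round(event.amount, 2))


async def block_anonymous(event: OrderPlaced) -> Optional[OrderPlaced]:
    # Filter: hypothetical rule that drops events without a user
    return event if event.user_id else None


chain: list[Middleware] = [normalize_amount, block_anonymous]
```

Returning a copy via `dataclasses.replace` on a frozen dataclass keeps middleware from mutating an event object that the publisher may still hold.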
Typed Event Hierarchies
Python’s type system enables event hierarchies where handlers can subscribe to base classes:
```python
import time
from dataclasses import dataclass, field


@dataclass
class DomainEvent:
    timestamp: float = field(default_factory=time.time)


# Subclass fields need defaults because the base class already declares a defaulted field
@dataclass
class OrderEvent(DomainEvent):
    order_id: str = ""


@dataclass
class OrderPlaced(OrderEvent):
    user_id: str = ""
    amount: float = 0.0


@dataclass
class OrderCancelled(OrderEvent):
    reason: str = ""
```
To support hierarchy-based subscription, modify the bus to check parent classes:
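```python
async def publish(self, event):
    # Walk the MRO (most specific class first) and collect every matching subscription
    subs = [
        sub
        for cls in type(event).__mro__
        for sub in self._subscriptions.get(cls, [])
    ]
    # Re-sort across hierarchy levels so priority still decides execution order
    subs.sort(key=lambda s: s.priority, reverse=True)
    for sub in subs:
        ...  # run the handler with the same error isolation as EventBus.publish
```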
Now a handler subscribed to OrderEvent receives both OrderPlaced and OrderCancelled events. A handler subscribed to DomainEvent receives everything. This is powerful for cross-cutting concerns like audit logging.
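A self-contained sketch of MRO-based dispatch follows. `HierarchicalBus` is a hypothetical minimal bus without priorities or middleware; the event classes mirror the hierarchy above, trimmed to a few fields.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class DomainEvent:
    pass


@dataclass
class OrderEvent(DomainEvent):
    order_id: str = ""


@dataclass
class OrderPlaced(OrderEvent):
    amount: float = 0.0


@dataclass
class OrderCancelled(OrderEvent):
    reason: str = ""


class HierarchicalBus:
    def __init__(self) -> None:
        self._subs: dict[type, list[Callable]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable) -> None:
        self._subs[event_type].append(handler)

    async def publish(self, event: Any) -> None:
        # __mro__ lists the class itself first, then its bases up to object
        for cls in type(event).__mro__:
            for handler in self._subs.get(cls, []):
                result = handler(event)
                if asyncio.iscoroutine(result):
                    await result


audit_log: list[str] = []
order_log: list[str] = []
bus = HierarchicalBus()
bus.subscribe(DomainEvent, lambda e: audit_log.append(type(e).__name__))
bus.subscribe(OrderEvent, lambda e: order_log.append(e.order_id))


async def main() -> None:
    await bus.publish(OrderPlaced(order_id="o1", amount=20.0))
    await bus.publish(OrderCancelled(order_id="o2", reason="customer request"))


asyncio.run(main())
```

Because dispatch walks `__mro__`, a handler subscribed at two levels of the same hierarchy (for example to both `OrderEvent` and `DomainEvent`) runs twice for one event; deduplicate subscriptions if that matters.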
Distributed Event Bus with Redis
For multi-process or multi-service deployments, extend the in-process bus with Redis Pub/Sub:
```python
import dataclasses
import json

# redis-py >= 4.2 ships the asyncio client; the standalone aioredis package is deprecated
from redis import asyncio as aioredis


class DistributedEventBus:
    def __init__(self, local_bus: EventBus, redis_url: str, service_id: str):
        self.local = local_bus
        self.service_id = service_id
        # One client (and connection pool) for the lifetime of the bus, not one per publish
        self.redis = aioredis.from_url(redis_url)

    async def publish(self, event, local_only=False):
        # Local handlers run first, in-process
        await self.local.publish(event)
        if not local_only:
            payload = json.dumps({
                "service": self.service_id,
                "event_type": f"{type(event).__module__}.{type(event).__qualname__}",
                "data": dataclasses.asdict(event),
            })
            await self.redis.publish("events", payload)

    async def listen(self):
        pubsub = self.redis.pubsub()
        await pubsub.subscribe("events")
        async for msg in pubsub.listen():
            if msg["type"] != "message":
                continue  # Skip subscribe confirmations and other control messages
            payload = json.loads(msg["data"])
            if payload["service"] == self.service_id:
                continue  # Skip own events
            # reconstruct_event is an application-specific helper (not shown here) that
            # maps payload["event_type"] back to a class and rebuilds it from payload["data"]
            event = reconstruct_event(payload)
            if event:
                await self.local.publish(event)
```
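This bridges in-process and distributed event handling. Local handlers react immediately; remote services receive the event asynchronously through Redis. Redis Pub/Sub is fire-and-forget: a subscriber that is disconnected when a message is published never receives it, so events that must not be lost need a durable transport such as Redis Streams or a message broker.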
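The listener above depends on `reconstruct_event`, which is not defined in this section. One possible approach, sketched here as an assumption rather than a prescribed design, is an explicit registry keyed by the same `module.qualname` string the publisher writes. `EVENT_REGISTRY`, `register_event`, and this `reconstruct_event` are hypothetical names. Resolving types only through an allow-list like this, never via `pickle` or dynamic imports of arbitrary names, keeps a malicious message from instantiating arbitrary classes.

```python
import dataclasses
import json
import time
from dataclasses import dataclass, field
from typing import Any, Optional

# Hypothetical allow-list: "module.qualname" -> event class
EVENT_REGISTRY: dict[str, type] = {}


def register_event(cls: type) -> type:
    """Class decorator that records an event class under the name the publisher sends."""
    EVENT_REGISTRY[f"{cls.__module__}.{cls.__qualname__}"] = cls
    return cls


@register_event
@dataclass
class OrderPlaced:
    order_id: str = ""
    user_id: str = ""
    amount: float = 0.0
    timestamp: float = field(default_factory=time.time)


def reconstruct_event(payload: dict[str, Any]) -> Optional[Any]:
    """Rebuild an event from a decoded payload, or return None for unknown types."""
    cls = EVENT_REGISTRY.get(payload.get("event_type", ""))
    if cls is None:
        return None  # Unknown or unexpected type: drop it rather than guess
    # Keep only the fields this service's version of the class knows about
    known = {f.name for f in dataclasses.fields(cls)}
    data = {k: v for k, v in payload.get("data", {}).items() if k in known}
    return cls(**data)


# Round trip: the payload shape DistributedEventBus.publish sends, rebuilt as listen() would
original = OrderPlaced(order_id="o1", user_id="u1", amount=49.99)
wire = json.dumps({
    "service": "orders",
    "event_type": f"{OrderPlaced.__module__}.{OrderPlaced.__qualname__}",
    "data": dataclasses.asdict(original),
})
restored = reconstruct_event(json.loads(wire))
```

Dropping unknown fields lets an older consumer accept events from a newer producer that has added fields, which eases the versioning problem listed under anti-patterns.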
Saga Coordination with Events
For multi-step business processes, events coordinate saga steps:
```python
@dataclass
class PaymentProcessed(DomainEvent):
    order_id: str = ""
    transaction_id: str = ""


@dataclass
class InventoryReserved(DomainEvent):
    order_id: str = ""
    warehouse: str = ""


@dataclass
class OrderFulfillmentStarted(DomainEvent):
    order_id: str = ""


class FulfillmentSaga:
    def __init__(self, bus: EventBus):
        self.bus = bus  # Keep a reference; _check_complete publishes through it
        self.pending: dict[str, dict] = {}
        bus.subscribe(PaymentProcessed, self.on_payment)
        bus.subscribe(InventoryReserved, self.on_inventory)

    async def on_payment(self, event: PaymentProcessed):
        state = self.pending.setdefault(event.order_id, {})
        state["payment"] = True
        await self._check_complete(event.order_id)

    async def on_inventory(self, event: InventoryReserved):
        state = self.pending.setdefault(event.order_id, {})
        state["inventory"] = True
        await self._check_complete(event.order_id)

    async def _check_complete(self, order_id: str):
        state = self.pending.get(order_id, {})
        if state.get("payment") and state.get("inventory"):
            del self.pending[order_id]
            # EventBus.publish is a coroutine, so it must be awaited
            await self.bus.publish(OrderFulfillmentStarted(order_id=order_id))
```
The saga waits for both payment and inventory events before starting fulfillment. Each event can arrive in any order.
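The order-independence claim is easy to check in isolation. The sketch below uses a hypothetical minimal async bus and a condensed saga that records completed steps in a set; the `run_saga` helper publishes two events in a chosen order and returns the order IDs that reached fulfillment.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable


class Bus:
    """Hypothetical minimal async bus: exact-type dispatch, awaits coroutine handlers."""

    def __init__(self) -> None:
        self._subs: dict[type, list[Callable]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable) -> None:
        self._subs[event_type].append(handler)

    async def publish(self, event: Any) -> None:
        for handler in self._subs.get(type(event), []):
            result = handler(event)
            if asyncio.iscoroutine(result):
                await result


@dataclass
class PaymentProcessed:
    order_id: str


@dataclass
class InventoryReserved:
    order_id: str


@dataclass
class OrderFulfillmentStarted:
    order_id: str


class FulfillmentSaga:
    def __init__(self, bus: Bus) -> None:
        self.bus = bus
        self.pending: dict[str, set[str]] = {}  # order_id -> steps completed so far
        bus.subscribe(PaymentProcessed, lambda e: self._mark(e.order_id, "payment"))
        bus.subscribe(InventoryReserved, lambda e: self._mark(e.order_id, "inventory"))

    async def _mark(self, order_id: str, step: str) -> None:
        done = self.pending.setdefault(order_id, set())
        done.add(step)
        if done == {"payment", "inventory"}:
            del self.pending[order_id]
            await self.bus.publish(OrderFulfillmentStarted(order_id))


async def run_saga(first: Any, second: Any) -> list[str]:
    bus = Bus()
    FulfillmentSaga(bus)  # Subscribes itself; the bus's handlers keep it alive
    started: list[str] = []
    bus.subscribe(OrderFulfillmentStarted, lambda e: started.append(e.order_id))
    await bus.publish(first)
    await bus.publish(second)
    return started
```

Like the saga above, this keeps `pending` in memory, so in-flight sagas are lost if the process restarts; a durable saga would persist that state.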
Testing Event-Driven Code
Testing event bus systems requires capturing published events:
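```python
import pytest


class TestEventBus(EventBus):
    __test__ = False  # Stop pytest from collecting this helper as a test class

    def __init__(self):
        super().__init__()
        self.published: list = []

    async def publish(self, event):
        self.published.append(event)  # Records every event, even ones middleware filters
        return await super().publish(event)


# In tests (async test functions need a plugin such as pytest-asyncio)
@pytest.mark.asyncio
async def test_order_placement_publishes_event():
    bus = TestEventBus()
    service = OrderService(bus)  # The application service under test (not shown)
    await service.place_order(user_id="u1", amount=49.99)
    assert len(bus.published) == 1
    assert isinstance(bus.published[0], OrderPlaced)
    assert bus.published[0].user_id == "u1"
```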
For integration tests, verify that the full chain works:
```python
@pytest.mark.asyncio  # Requires the pytest-asyncio plugin
async def test_order_triggers_email_and_inventory():
    bus = TestEventBus()
    email_sent = []
    inventory_reserved = []
    bus.subscribe(OrderPlaced, lambda e: email_sent.append(e))
    bus.subscribe(OrderPlaced, lambda e: inventory_reserved.append(e))
    await bus.publish(OrderPlaced(order_id="1", user_id="u1", amount=50))
    assert len(email_sent) == 1
    assert len(inventory_reserved) == 1
```
Anti-Patterns
- Event storms: handler A publishes event B, whose handler publishes event C, whose handler publishes event A again. Use cycle detection or limit publish depth.
- Fat events: events carrying entire database rows instead of just IDs and relevant changes. Keep events small; let handlers fetch additional data if needed.
- Synchronous distributed bus: publishing to Redis and waiting for all remote handlers to complete defeats the purpose of async decoupling.
- No event versioning: changing an event's fields without versioning breaks consumers. Include a `version` field and support multiple versions during migration periods.
Libraries and Frameworks
| Library | Use Case | Key Feature |
|---|---|---|
| blinker | Flask ecosystem, simple signals | Named signals, weak references |
| pyee | Node.js-style event emitter | Familiar API, async support |
| pypubsub | Desktop apps (wxPython) | Topic-based routing |
| dry-python/events | Domain-driven design | Typed events, integration with dry-python ecosystem |
| Custom | Full control | Exactly the features you need |
For most production applications, a custom implementation (50–100 lines) provides exactly what you need without external dependencies.
The one thing to remember: A production event bus in Python needs error isolation between handlers, middleware for cross-cutting concerns, and clear boundaries between in-process events and distributed messaging — without these, the decoupling benefit turns into debugging chaos.