Python Event Emitter Patterns — Deep Dive
Building a Production Event Emitter
The minimal dict-of-lists approach works for prototypes but breaks down in production. Here’s a robust implementation addressing real-world concerns:
from __future__ import annotations
import asyncio
import weakref
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable, Coroutine
@dataclass
class Listener:
callback: Callable
once: bool = False
priority: int = 0
weak: bool = False
_ref: weakref.ref | None = field(default=None, repr=False)
class EventEmitter:
def __init__(self):
self._listeners: dict[str, list[Listener]] = defaultdict(list)
self._wildcard_listeners: list[Listener] = []
def on(
self,
event: str,
callback: Callable,
priority: int = 0,
weak: bool = False,
) -> Callable:
listener = Listener(
callback=callback,
priority=priority,
weak=weak,
)
if weak:
listener._ref = weakref.ref(callback, lambda _: self.off(event, callback))
self._listeners[event].append(listener)
self._listeners[event].sort(key=lambda l: l.priority, reverse=True)
return callback # allows use as decorator
def once(self, event: str, callback: Callable) -> Callable:
listener = Listener(callback=callback, once=True)
self._listeners[event].append(listener)
return callback
def off(self, event: str, callback: Callable) -> None:
self._listeners[event] = [
l for l in self._listeners[event] if l.callback != callback
]
def emit(self, event: str, *args: Any, **kwargs: Any) -> int:
listeners = self._listeners.get(event, []) + self._wildcard_listeners
to_remove = []
for listener in listeners:
listener.callback(*args, **kwargs)
if listener.once:
to_remove.append(listener)
for listener in to_remove:
if listener in self._listeners.get(event, []):
self._listeners[event].remove(listener)
return len(listeners)
def listener_count(self, event: str) -> int:
return len(self._listeners.get(event, []))
def remove_all_listeners(self, event: str | None = None) -> None:
if event:
self._listeners.pop(event, None)
else:
self._listeners.clear()
self._wildcard_listeners.clear()
Async Event Emitting
When listeners are coroutines, you need an async dispatch strategy. Three approaches, each with different tradeoffs:
Sequential Async Dispatch
Listeners run one at a time. Predictable order, but slow if listeners do I/O:
async def emit_async(self, event: str, *args, **kwargs) -> int:
listeners = self._listeners.get(event, [])
for listener in listeners:
result = listener.callback(*args, **kwargs)
if asyncio.iscoroutine(result):
await result
return len(listeners)
Concurrent Async Dispatch
All listeners run concurrently via gather. Fast, but execution order is nondeterministic:
async def emit_concurrent(self, event: str, *args, **kwargs) -> list:
listeners = self._listeners.get(event, [])
coros = []
for listener in listeners:
result = listener.callback(*args, **kwargs)
if asyncio.iscoroutine(result):
coros.append(result)
return await asyncio.gather(*coros, return_exceptions=True)
Fire-and-Forget Dispatch
Schedule listeners as tasks without waiting. The emitter returns immediately:
def emit_fire_and_forget(self, event: str, *args, **kwargs) -> int:
listeners = self._listeners.get(event, [])
loop = asyncio.get_running_loop()
for listener in listeners:
result = listener.callback(*args, **kwargs)
if asyncio.iscoroutine(result):
loop.create_task(result)
return len(listeners)
Choose based on your needs: sequential for pipelines with ordering dependencies, concurrent for independent I/O-bound listeners, fire-and-forget for non-critical side effects.
Typed Events with Dataclasses
String event names are error-prone — a typo silently fails. Typed events catch mistakes at development time:
from dataclasses import dataclass
from typing import TypeVar, Generic, Callable
T = TypeVar("T")
@dataclass(frozen=True)
class Event(Generic[T]):
name: str
# Define events as typed constants
@dataclass
class UserData:
user_id: int
email: str
USER_CREATED = Event[UserData]("user.created")
USER_DELETED = Event[int]("user.deleted") # just the user_id
class TypedEmitter:
def __init__(self):
self._listeners: dict[str, list[Callable]] = defaultdict(list)
def on(self, event: Event[T], callback: Callable[[T], None]) -> None:
self._listeners[event.name].append(callback)
def emit(self, event: Event[T], data: T) -> None:
for callback in self._listeners.get(event.name, []):
callback(data)
# Usage — IDE autocompletes the payload type
emitter = TypedEmitter()
def handle_user(data: UserData):
print(f"Welcome {data.email}")
emitter.on(USER_CREATED, handle_user)
emitter.emit(USER_CREATED, UserData(user_id=1, email="alice@example.com"))
Memory Leak Prevention
Event emitters are one of the most common sources of memory leaks in long-running Python applications. Three defense mechanisms:
Weak Reference Listeners
import weakref
class SafeEmitter:
def on_weak(self, event: str, obj: object, method_name: str):
ref = weakref.ref(obj, lambda _: self._cleanup(event, ref))
self._listeners[event].append((ref, method_name))
def _cleanup(self, event, dead_ref):
self._listeners[event] = [
(r, m) for r, m in self._listeners[event] if r is not dead_ref
]
Listener Limits
class BoundedEmitter(EventEmitter):
MAX_LISTENERS = 20
def on(self, event, callback, **kwargs):
if len(self._listeners.get(event, [])) >= self.MAX_LISTENERS:
import warnings
warnings.warn(
f"Possible memory leak: {event} has {self.MAX_LISTENERS}+ listeners",
ResourceWarning,
stacklevel=2,
)
return super().on(event, callback, **kwargs)
Scoped Subscriptions
Tie listener lifetime to a context manager:
from contextlib import contextmanager
@contextmanager
def scoped_listener(emitter, event, callback):
emitter.on(event, callback)
try:
yield
finally:
emitter.off(event, callback)
# Listener automatically removed when scope exits
with scoped_listener(emitter, "data", process_data):
run_data_pipeline()
Plugin Architecture with Events
Events enable plugin systems where third-party code extends behavior without modifying the core:
class Application:
def __init__(self):
self.events = EventEmitter()
self._plugins: list = []
def register_plugin(self, plugin):
self._plugins.append(plugin)
plugin.setup(self.events)
def handle_request(self, request):
self.events.emit("before_request", request)
response = self._process(request)
self.events.emit("after_request", request, response)
return response
# Plugin authors implement setup()
class LoggingPlugin:
def setup(self, events: EventEmitter):
events.on("before_request", self._log_request)
events.on("after_request", self._log_response)
def _log_request(self, request):
print(f"→ {request.method} {request.path}")
def _log_response(self, request, response):
print(f"← {response.status_code}")
class MetricsPlugin:
def setup(self, events: EventEmitter):
events.on("after_request", self._record_metrics)
def _record_metrics(self, request, response):
statsd.increment(f"http.{response.status_code}")
This pattern is used by Flask (signals), pytest (hooks), and Scrapy (signals) in production.
Event Middleware / Interceptors
Add cross-cutting concerns without modifying individual listeners:
class MiddlewareEmitter(EventEmitter):
def __init__(self):
super().__init__()
self._middleware: list[Callable] = []
def use(self, middleware: Callable):
self._middleware.append(middleware)
def emit(self, event, *args, **kwargs):
context = {"event": event, "args": args, "kwargs": kwargs, "cancelled": False}
for mw in self._middleware:
mw(context)
if context["cancelled"]:
return 0
return super().emit(event, *context["args"], **context["kwargs"])
# Example: rate-limiting middleware
def rate_limit_middleware(context):
event = context["event"]
now = time.monotonic()
if now - last_emit.get(event, 0) < 0.1:
context["cancelled"] = True
else:
last_emit[event] = now
Performance Benchmarks
For a simple emit with 10 listeners:
| Approach | Operations/sec | Notes |
|---|---|---|
| Direct function calls | ~5,000,000 | Baseline |
| Dict-of-lists emitter | ~1,200,000 | Minimal overhead |
| pyee synchronous | ~800,000 | Extra features cost |
| blinker signals | ~600,000 | Weak ref overhead |
| Async emit (sequential) | ~50,000 | Coroutine creation + await |
For most applications, emitter overhead is negligible compared to actual listener work (database queries, HTTP calls, file I/O). Optimize the listeners, not the dispatch.
Debugging Event-Driven Code
Event-driven code is notoriously hard to debug because the call stack doesn’t show why a listener was called. Mitigations:
- Event logging middleware — log every event emission with timestamp and payload summary
- Listener registration tracing — record the call site where each listener was registered (use
inspect.stack()) - Event flow diagrams — maintain documentation of which events trigger which listeners
- Correlation IDs — pass a unique ID through event payloads so you can trace a single user action across all its handlers
import inspect
import logging
class DebugEmitter(EventEmitter):
def on(self, event, callback, **kwargs):
frame = inspect.stack()[1]
logging.debug(
f"Listener registered: {event} → {callback.__qualname__} "
f"(from {frame.filename}:{frame.lineno})"
)
return super().on(event, callback, **kwargs)
One thing to remember: Event emitters are deceptively simple to implement but require discipline in production — typed events prevent silent failures, weak references prevent memory leaks, and middleware enables cross-cutting concerns without spaghetti listener code.
See Also
- Python Observer Vs Pubsub Two ways Python code can share news — one is like telling your friends directly, the other is like posting on a bulletin board for anyone to read.
- Python Rxpy Reactive Programming How RxPY lets Python code react to streams of data the way a news ticker reacts to breaking stories — automatically and in real time.
- Python State Machines Transitions How the transitions library helps Python code manage things that change between clear stages — like a traffic light that only goes green → yellow → red.
- Ci Cd Why big apps can ship updates every day without turning your phone into a glitchy mess — CI/CD is the behind-the-scenes quality gate and delivery truck.
- Containerization Why does software that works on your computer break on everyone else's? Containers fix that — and they're why Netflix can deploy 100 updates a day without the site going down.