Python Event Emitter Patterns — Deep Dive

Building a Production Event Emitter

The minimal dict-of-lists approach works for prototypes but breaks down in production. The implementation below adds listener priorities, one-shot listeners, weakly referenced callbacks, and bulk removal:

from __future__ import annotations
import asyncio
import weakref
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable, Coroutine

@dataclass
class Listener:
    callback: Callable
    once: bool = False
    priority: int = 0
    weak: bool = False
    _ref: weakref.ref | None = field(default=None, repr=False)

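class _WeakCallback:
    """Callable proxy that holds its target weakly (helper for weak=True)."""

    def __init__(self, target: Callable, on_collect: Callable[[], None]):
        try:
            # Bound methods need WeakMethod; a plain ref to one dies at once.
            self._ref = weakref.WeakMethod(target, lambda _: on_collect())
        except TypeError:  # not a bound method
            self._ref = weakref.ref(target, lambda _: on_collect())

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        target = self._ref()
        return None if target is None else target(*args, **kwargs)

    def __eq__(self, other: object) -> bool:
        # Equal to itself and to its live target, so off(event, target) works.
        return other is self or self._ref() == other

class EventEmitter:
    def __init__(self):
        self._listeners: dict[str, list[Listener]] = defaultdict(list)
        self._wildcard_listeners: list[Listener] = []

    def on(
        self,
        event: str,
        callback: Callable,
        priority: int = 0,
        weak: bool = False,
    ) -> Callable:
        listener = Listener(callback=callback, priority=priority, weak=weak)
        if weak:
            # Swap the strong reference for a proxy. When the target is
            # garbage-collected, the proxy's weakref callback removes it.
            proxy = _WeakCallback(callback, lambda: self.off(event, proxy))
            listener.callback = proxy
            listener._ref = proxy._ref

        self._listeners[event].append(listener)
        self._listeners[event].sort(key=lambda l: l.priority, reverse=True)
        return callback  # returned so functools.partial(emitter.on, event) works as a decorator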

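    def once(self, event: str, callback: Callable, priority: int = 0) -> Callable:
        listener = Listener(callback=callback, once=True, priority=priority)
        self._listeners[event].append(listener)
        # Re-sort so one-shot listeners honor the same priority order as on().
        self._listeners[event].sort(key=lambda l: l.priority, reverse=True)
        return callback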

    def off(self, event: str, callback: Callable) -> None:
        if event not in self._listeners:
            return  # don't let the defaultdict create an empty entry
        self._listeners[event] = [
            l for l in self._listeners[event] if l.callback != callback
        ]

    def emit(self, event: str, *args: Any, **kwargs: Any) -> int:
        # The concatenation builds a new list, so listeners may safely call
        # on()/off() while we iterate.
        listeners = self._listeners.get(event, []) + self._wildcard_listeners

        for listener in listeners:
            if listener.once:
                # Remove before calling, so a re-entrant emit or a raising
                # callback cannot leave the one-shot listener registered.
                registered = self._listeners.get(event, [])
                if listener in registered:
                    registered.remove(listener)
            listener.callback(*args, **kwargs)

        return len(listeners)

    def listener_count(self, event: str) -> int:
        return len(self._listeners.get(event, []))

    def remove_all_listeners(self, event: str | None = None) -> None:
        if event is not None:  # an empty-string event name is still a name
            self._listeners.pop(event, None)
        else:
            self._listeners.clear()
            self._wildcard_listeners.clear()

Async Event Emitting

When listeners are coroutine functions, calling one only creates a coroutine object; the emitter still has to await or schedule it. There are three dispatch strategies, each with different tradeoffs:

Sequential Async Dispatch

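Listeners run one at a time, in priority order. The order is predictable, but total latency is the sum of every listener's latency, which is slow when listeners do I/O:

async def emit_async(self, event: str, *args, **kwargs) -> int:
    # Iterate over a copy so listeners can call on()/off() while we await.
    listeners = list(self._listeners.get(event, []))
    for listener in listeners:
        result = listener.callback(*args, **kwargs)
        if asyncio.iscoroutine(result):  # plain functions have already run
            await result
    return len(listeners)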

Concurrent Async Dispatch

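All coroutine listeners run concurrently via asyncio.gather. This is fast for I/O-bound listeners, but the order in which they finish is not deterministic, and with return_exceptions=True failures come back as values in the result list instead of being raised:

async def emit_concurrent(self, event: str, *args, **kwargs) -> list:
    listeners = list(self._listeners.get(event, []))
    coros = []
    for listener in listeners:
        # Synchronous listeners run right here; only coroutines are gathered.
        result = listener.callback(*args, **kwargs)
        if asyncio.iscoroutine(result):
            coros.append(result)
    # Results (or exception objects) come back in listener order.
    return await asyncio.gather(*coros, return_exceptions=True)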

Fire-and-Forget Dispatch

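Schedule listeners as tasks without waiting. The emitter returns immediately, and exceptions raised inside the tasks never reach the caller:

_background_tasks: set[asyncio.Task] = set()

def emit_fire_and_forget(self, event: str, *args, **kwargs) -> int:
    listeners = list(self._listeners.get(event, []))
    loop = asyncio.get_running_loop()  # raises RuntimeError outside a running loop
    for listener in listeners:
        result = listener.callback(*args, **kwargs)
        if asyncio.iscoroutine(result):
            task = loop.create_task(result)
            # The loop keeps only weak references to tasks, so hold a strong
            # one until the task finishes or it may be garbage-collected early.
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
    return len(listeners)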

Choose based on your needs: sequential for pipelines with ordering dependencies, concurrent for independent I/O-bound listeners, fire-and-forget for non-critical side effects.
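
To make the ordering tradeoff concrete, here is a minimal standalone sketch (separate from the emitter class above) that runs the same two coroutine listeners first sequentially and then concurrently, recording when each starts and finishes. The names `slow`, `fast`, `dispatch_sequential`, and `dispatch_concurrent` are invented for this illustration.

```python
import asyncio

log: list[str] = []

async def slow(tag: str) -> None:
    log.append(f"slow:start:{tag}")
    await asyncio.sleep(0.05)
    log.append(f"slow:end:{tag}")

async def fast(tag: str) -> None:
    log.append(f"fast:start:{tag}")
    await asyncio.sleep(0.01)
    log.append(f"fast:end:{tag}")

LISTENERS = [slow, fast]  # registration order

async def dispatch_sequential(tag: str) -> None:
    # Each listener finishes before the next one starts.
    for listener in LISTENERS:
        await listener(tag)

async def dispatch_concurrent(tag: str) -> None:
    # Listeners start in order, then interleave at their await points.
    await asyncio.gather(*(listener(tag) for listener in LISTENERS))

async def main() -> None:
    await dispatch_sequential("seq")
    await dispatch_concurrent("con")

asyncio.run(main())
for entry in log:
    print(entry)
```

In the sequential run `slow` ends before `fast` starts; in the concurrent run both start back to back and `fast` ends first, which is exactly the reordering a pipeline with ordering dependencies cannot tolerate.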

Typed Events with Dataclasses

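String event names are error-prone: a typo in a name registers or emits an event nobody listens to, and nothing fails. Typed event constants let a type checker or IDE catch misspelled names and wrong payload types during development:

from collections import defaultdict
from dataclasses import dataclass
from typing import TypeVar, Generic, Callable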

T = TypeVar("T")

@dataclass(frozen=True)
class Event(Generic[T]):
    name: str

# Define events as typed constants
@dataclass
class UserData:
    user_id: int
    email: str

USER_CREATED = Event[UserData]("user.created")
USER_DELETED = Event[int]("user.deleted")  # just the user_id

class TypedEmitter:
    def __init__(self):
        self._listeners: dict[str, list[Callable]] = defaultdict(list)

    def on(self, event: Event[T], callback: Callable[[T], None]) -> None:
        self._listeners[event.name].append(callback)

    def emit(self, event: Event[T], data: T) -> None:
        for callback in self._listeners.get(event.name, []):
            callback(data)

# Usage — IDE autocompletes the payload type
emitter = TypedEmitter()

def handle_user(data: UserData):
    print(f"Welcome {data.email}")

emitter.on(USER_CREATED, handle_user)
emitter.emit(USER_CREATED, UserData(user_id=1, email="alice@example.com"))
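
The difference in failure modes can be shown with a small standalone sketch. It uses a bare dict-of-lists `emit` helper and a simplified, non-generic `Event` class, both written only for this illustration; the misspellings are deliberate.

```python
from collections import defaultdict
from dataclasses import dataclass

listeners = defaultdict(list)
received = []
listeners["user.created"].append(received.append)

def emit(name, payload):
    for callback in listeners.get(name, []):
        callback(payload)

# A misspelled string matches no listeners and raises nothing.
emit("user.craeted", {"user_id": 1})
silent_failure = received == []

@dataclass(frozen=True)
class Event:
    name: str

USER_CREATED = Event("user.created")

# A misspelled constant is an undefined name: Python raises NameError here,
# and linters or type checkers flag it before the code ever runs.
try:
    emit(USER_CRAETED.name, {"user_id": 1})
    loud_failure = False
except NameError:
    loud_failure = True

print(silent_failure, loud_failure)
```

The string typo goes unnoticed until someone wonders why a handler never ran, while the constant typo fails at the exact line that contains it.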

Memory Leak Prevention

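Event emitters are a common source of memory leaks in long-running Python applications: the emitter holds a strong reference to every registered callback, and a bound-method callback in turn keeps its whole object alive until off() is called. Three defense mechanisms: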

Weak Reference Listeners

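import weakref
from collections import defaultdict

class SafeEmitter:
    def __init__(self):
        self._listeners = defaultdict(list)  # event -> [(weakref, method_name)]

    def on_weak(self, event: str, obj: object, method_name: str):
        # Store a weak reference to the owner plus the method name, so the
        # emitter never keeps obj alive; _cleanup runs when obj is collected.
        ref = weakref.ref(obj, lambda dead: self._cleanup(event, dead))
        self._listeners[event].append((ref, method_name))

    def _cleanup(self, event, dead_ref):
        self._listeners[event] = [
            (r, m) for r, m in self._listeners[event] if r is not dead_ref
        ]

    def emit(self, event: str, *args, **kwargs):
        for ref, method_name in list(self._listeners.get(event, [])):
            obj = ref()
            if obj is not None:  # skip owners collected mid-dispatch
                getattr(obj, method_name)(*args, **kwargs)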
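
Bound methods are the awkward case for weak references, which is why the emitter above stores the owning object and a method name instead of the method itself. The standalone sketch below shows the three behaviors involved; `Handler` is a made-up class, and the "dead immediately" result relies on CPython's reference counting.

```python
import gc
import weakref

class Handler:
    def handle(self, payload):
        return payload

# 1. A strong reference to a bound method keeps its owner alive.
h1 = Handler()
strong_listeners = [h1.handle]
owner1 = weakref.ref(h1)
del h1
gc.collect()
kept_alive = owner1() is not None  # the list still pins the Handler

# 2. A plain weakref.ref to a bound method is dead on arrival, because
#    h2.handle creates a temporary method object nobody else references.
h2 = Handler()
plain = weakref.ref(h2.handle)
dead_immediately = plain() is None

# 3. weakref.WeakMethod tracks the owner instead of the temporary object.
method_ref = weakref.WeakMethod(h2.handle)
alive_before = method_ref() is not None
del h2
gc.collect()
dead_after = method_ref() is None

print(kept_alive, dead_immediately, alive_before, dead_after)
```

`weakref.WeakMethod` is the standard-library alternative to the owner-plus-name tuple when you would rather register the method object directly.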

Listener Limits

import warnings

class BoundedEmitter(EventEmitter):
    MAX_LISTENERS = 20

    def on(self, event, callback, **kwargs):
        if len(self._listeners.get(event, [])) >= self.MAX_LISTENERS:
            # RuntimeWarning is shown by default; ResourceWarning is ignored
            # unless warning filters or -X dev enable it.
            warnings.warn(
                f"Possible memory leak: {event!r} already has "
                f"{self.MAX_LISTENERS}+ listeners",
                RuntimeWarning,
                stacklevel=2,
            )
        return super().on(event, callback, **kwargs)

Scoped Subscriptions

Tie listener lifetime to a context manager:

from contextlib import contextmanager

@contextmanager
def scoped_listener(emitter, event, callback):
    emitter.on(event, callback)
    try:
        yield
    finally:
        emitter.off(event, callback)

# Listener automatically removed when scope exits
with scoped_listener(emitter, "data", process_data):
    run_data_pipeline()
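
When one scope needs several subscriptions, `contextlib.ExitStack` can manage them together. The sketch below is self-contained: `TinyEmitter` is a hypothetical stand-in with only `on`, `off`, `emit`, and a `count` helper, and `scoped_listener` is repeated from above so the block runs on its own.

```python
from collections import defaultdict
from contextlib import ExitStack, contextmanager

class TinyEmitter:
    """Stand-in emitter for this sketch only."""

    def __init__(self):
        self._listeners = defaultdict(list)

    def on(self, event, callback):
        self._listeners[event].append(callback)

    def off(self, event, callback):
        self._listeners[event].remove(callback)

    def emit(self, event, *args):
        for callback in list(self._listeners[event]):
            callback(*args)

    def count(self, event):
        return len(self._listeners[event])

@contextmanager
def scoped_listener(emitter, event, callback):
    emitter.on(event, callback)
    try:
        yield
    finally:
        emitter.off(event, callback)

emitter = TinyEmitter()
seen = []

with ExitStack() as stack:
    # Every subscription entered here is removed when the block exits,
    # even if the body raises.
    for event in ("data", "error"):
        stack.enter_context(scoped_listener(emitter, event, seen.append))
    emitter.emit("data", "row-1")
    during = (emitter.count("data"), emitter.count("error"))

after = (emitter.count("data"), emitter.count("error"))
emitter.emit("data", "row-2")  # no listeners left, so nothing is recorded
print(during, after, seen)
```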

Plugin Architecture with Events

Events enable plugin systems where third-party code extends behavior without modifying the core:

class Application:
    def __init__(self):
        self.events = EventEmitter()
        self._plugins: list = []

    def register_plugin(self, plugin):
        self._plugins.append(plugin)
        plugin.setup(self.events)

    def handle_request(self, request):
        self.events.emit("before_request", request)
        response = self._process(request)
        self.events.emit("after_request", request, response)
        return response

# Plugin authors implement setup()
class LoggingPlugin:
    def setup(self, events: EventEmitter):
        events.on("before_request", self._log_request)
        events.on("after_request", self._log_response)

    def _log_request(self, request):
        print(f"→ {request.method} {request.path}")

    def _log_response(self, request, response):
        print(f"← {response.status_code}")

class MetricsPlugin:
    def setup(self, events: EventEmitter):
        events.on("after_request", self._record_metrics)

    def _record_metrics(self, request, response):
        statsd.increment(f"http.{response.status_code}")

Flask (signals, via blinker), pytest (hooks, via pluggy), and Scrapy (signals) all use this pattern in production.

Event Middleware / Interceptors

Add cross-cutting concerns without modifying individual listeners:

class MiddlewareEmitter(EventEmitter):
    def __init__(self):
        super().__init__()
        self._middleware: list[Callable] = []

    def use(self, middleware: Callable):
        self._middleware.append(middleware)

    def emit(self, event, *args, **kwargs):
        context = {"event": event, "args": args, "kwargs": kwargs, "cancelled": False}

        for mw in self._middleware:
            mw(context)
            if context["cancelled"]:
                return 0

        return super().emit(event, *context["args"], **context["kwargs"])

# Example: rate-limiting middleware (drops repeats of an event within 100 ms)
import time

last_emit: dict[str, float] = {}

def rate_limit_middleware(context):
    event = context["event"]
    now = time.monotonic()
    if now - last_emit.get(event, float("-inf")) < 0.1:
        context["cancelled"] = True
    else:
        last_emit[event] = now

Performance Benchmarks

For a simple emit with 10 listeners:

Approach                  Operations/sec   Notes
------------------------  ---------------  --------------------------
Direct function calls     ~5,000,000       Baseline
Dict-of-lists emitter     ~1,200,000       Minimal overhead
pyee synchronous          ~800,000         Extra features cost
blinker signals           ~600,000         Weak ref overhead
Async emit (sequential)   ~50,000          Coroutine creation + await

For most applications, emitter overhead is negligible compared to actual listener work (database queries, HTTP calls, file I/O). Optimize the listeners, not the dispatch.
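
Throughput figures like these depend heavily on hardware and Python version, so it is worth measuring on your own setup. Below is a minimal sketch using `timeit` that compares direct calls with a bare dict-of-lists emit for 10 no-op listeners. The names `noop`, `direct`, and `via_emitter` are illustrative, and pyee, blinker, and async dispatch are not measured here.

```python
import timeit
from collections import defaultdict

def noop(payload):
    pass

callbacks = [noop] * 10

listeners = defaultdict(list)
listeners["tick"].extend(callbacks)

def direct():
    # Baseline: call the 10 functions with no lookup in between.
    for cb in callbacks:
        cb(1)

def emit(event, *args):
    for cb in listeners.get(event, []):
        cb(*args)

def via_emitter():
    emit("tick", 1)

N = 20_000
results = {}
for name, fn in [("direct", direct), ("emitter", via_emitter)]:
    # Take the best of three runs to reduce scheduling noise.
    seconds = min(timeit.repeat(fn, number=N, repeat=3))
    results[name] = N / seconds
    print(f"{name:8s} {results[name]:12,.0f} emits/sec")
```

Swapping `noop` for a listener that does real work usually makes the gap between the two rows disappear into the noise, which is the point of the paragraph above.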

Debugging Event-Driven Code

Event-driven code is notoriously hard to debug because the call stack doesn’t show why a listener was called. Mitigations:

  1. Event logging middleware — log every event emission with timestamp and payload summary
  2. Listener registration tracing — record the call site where each listener was registered (use inspect.stack())
  3. Event flow diagrams — maintain documentation of which events trigger which listeners
  4. Correlation IDs — pass a unique ID through event payloads so you can trace a single user action across all its handlers
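
Registration tracing, the second mitigation, takes only a few lines:

import inspect
import logging

class DebugEmitter(EventEmitter):
    def on(self, event, callback, **kwargs):
        # stack()[1] is the caller of on(), i.e. the registration site.
        frame = inspect.stack()[1]
        # functools.partial objects and callable instances lack __qualname__.
        name = getattr(callback, "__qualname__", repr(callback))
        logging.debug(
            "Listener registered: %s -> %s (from %s:%s)",
            event, name, frame.filename, frame.lineno,
        )
        return super().on(event, callback, **kwargs)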
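
Event logging and correlation IDs (mitigations 1 and 4) combine naturally. The following is a minimal standalone sketch, assuming a bare dict-of-lists `emit` helper that passes a `correlation_id` to every listener; the event names, handler names, and the 8-character ID format are all invented for illustration.

```python
import logging
import uuid
from collections import defaultdict

logging.basicConfig(level=logging.DEBUG, format="%(message)s")
log = logging.getLogger("events")

listeners = defaultdict(list)
trace = []  # (correlation_id, event) pairs, kept only for this demo

def emit(event, payload, correlation_id=None):
    # Reuse the caller's ID so nested emits stay on the same trace.
    correlation_id = correlation_id or uuid.uuid4().hex[:8]
    trace.append((correlation_id, event))
    log.debug("[%s] emit %s payload=%r", correlation_id, event, payload)
    for callback in listeners.get(event, []):
        callback(payload, correlation_id)
    return correlation_id

def on_signup(payload, correlation_id):
    # A handler that emits a follow-up event forwards the ID it received.
    emit("email.welcome", {"to": payload["email"]}, correlation_id)

def on_welcome(payload, correlation_id):
    log.debug("[%s] sending mail to %s", correlation_id, payload["to"])

listeners["user.signup"].append(on_signup)
listeners["email.welcome"].append(on_welcome)

cid = emit("user.signup", {"email": "alice@example.com"})
```

Filtering the log for one ID then reconstructs the full chain of events behind a single user action, which the call stack alone cannot show.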

One thing to remember: Event emitters are deceptively simple to implement but require discipline in production — typed events prevent silent failures, weak references prevent memory leaks, and middleware enables cross-cutting concerns without spaghetti listener code.

