Event Sourcing in Python — Deep Dive

Building an event-sourced system in Python

This guide implements event sourcing for a banking account system. We will build the event model, aggregate, event store, projections, and snapshot mechanism from scratch using standard Python libraries.

Event base class

from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal
from uuid import UUID, uuid4
import json

@dataclass(frozen=True)
class Event:
    """Immutable base record shared by every domain event.

    Holds the envelope metadata (identity, timestamp, version number).
    Subclasses supply the event-specific payload via ``_data_dict``.
    """

    # Unique identifier; a fresh UUID4 is generated unless one is supplied.
    event_id: UUID = field(default_factory=uuid4)
    # Timezone-aware UTC timestamp captured at construction unless supplied.
    occurred_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    # Version number carried by the event; 0 when not set.
    version: int = 0

    def _data_dict(self) -> dict:
        """Return the event-specific payload. Subclasses must override."""
        raise NotImplementedError

    def to_dict(self) -> dict:
        """Return the envelope and payload as a plain dict.

        The id is rendered as a string and the timestamp in ISO 8601 form;
        the payload from ``_data_dict`` is stored under ``"data"``.
        """
        envelope = {
            "event_type": type(self).__name__,
            "event_id": str(self.event_id),
            "occurred_at": self.occurred_at.isoformat(),
            "version": self.version,
        }
        envelope["data"] = self._data_dict()
        return envelope

Domain events

@dataclass(frozen=True)
class AccountOpened(Event):
    """Event recording that an account was opened with an initial deposit."""

    # NOTE: every field carries a default because the inherited Event fields
    # all have defaults, and dataclass fields without defaults may not follow.
    account_id: UUID = None
    owner_name: str = ""
    initial_deposit: Decimal = Decimal("0")

    def _data_dict(self) -> dict:
        """Return the payload with the id and amount rendered as strings."""
        payload = dict(
            account_id=str(self.account_id),
            owner_name=self.owner_name,
            initial_deposit=str(self.initial_deposit),
        )
        return payload

@dataclass(frozen=True)
class MoneyDeposited(Event):
    """Event recording a deposit into an account."""

    # NOTE: defaults are required on every field because the inherited Event
    # fields all have defaults.
    account_id: UUID = None
    amount: Decimal = Decimal("0")
    reference: str = ""

    def _data_dict(self) -> dict:
        """Return the payload with the id and amount rendered as strings."""
        payload = dict(
            account_id=str(self.account_id),
            amount=str(self.amount),
            reference=self.reference,
        )
        return payload

@dataclass(frozen=True)
class MoneyWithdrawn(Event):
    """Event recording a withdrawal from an account."""

    # NOTE: defaults are required on every field because the inherited Event
    # fields all have defaults.
    account_id: UUID = None
    amount: Decimal = Decimal("0")
    reference: str = ""

    def _data_dict(self) -> dict:
        """Return the payload with the id and amount rendered as strings."""
        payload = dict(
            account_id=str(self.account_id),
            amount=str(self.amount),
            reference=self.reference,
        )
        return payload

@dataclass(frozen=True)
class AccountFrozen(Event):
    """Event recording that an account was frozen, with the stated reason."""

    # NOTE: defaults are required on every field because the inherited Event
    # fields all have defaults.
    account_id: UUID = None
    reason: str = ""

    def _data_dict(self) -> dict:
        """Return the payload with the id rendered as a string."""
        payload = dict(account_id=str(self.account_id), reason=self.reason)
        return payload

@dataclass(frozen=True)
class AccountClosed(Event):
    """Event recording that an account was closed at a given balance."""

    # NOTE: defaults are required on every field because the inherited Event
    # fields all have defaults.
    account_id: UUID = None
    closing_balance: Decimal = Decimal("0")

    def _data_dict(self) -> dict:
        """Return the payload with the id and balance rendered as strings."""
        payload = dict(
            account_id=str(self.account_id),
            closing_balance=str(self.closing_balance),
        )
        return payload

The aggregate

The BankAccount aggregate rebuilds its state from events and validates business rules before emitting new events.

from enum import Enum

class AccountStatus(Enum):
    """Lifecycle states of a bank account; each value is its lowercase name."""

    # The only state in which deposit, withdraw and freeze are accepted.
    ACTIVE = "active"
    # Set when an AccountFrozen event is applied.
    FROZEN = "frozen"
    # Set when an AccountClosed event is applied; close() rejects this state.
    CLOSED = "closed"

class BankAccount:
    """Event-sourced bank account aggregate.

    State changes only by applying events. Command methods validate the
    business rules, build the matching event, apply it, and queue it in the
    pending list so the caller can persist it.
    """

    def __init__(self, account_id: UUID) -> None:
        """Create a blank, active account with zero balance at version 0."""
        self.id = account_id
        self.owner_name = ""
        self.balance = Decimal("0")
        self.status = AccountStatus.ACTIVE
        self.version = 0
        self._pending_events: list[Event] = []

    # --- Factory ---

    @classmethod
    def open(cls, account_id: UUID, owner: str, deposit: Decimal) -> "BankAccount":
        """Open a new account and queue its AccountOpened event (version 1).

        Raises:
            ValueError: if ``deposit`` is below the 100 minimum.
        """
        if deposit < Decimal("100"):
            raise ValueError("Minimum opening deposit is $100")
        account = cls(account_id)
        account._record(
            AccountOpened(
                account_id=account_id,
                owner_name=owner,
                initial_deposit=deposit,
                version=1,
            )
        )
        return account

    # --- Event application (state reconstruction) ---

    def apply(self, event: Event) -> None:
        """Mutate state from ``event`` and adopt the event's version.

        The handler is looked up by name as ``_apply_<EventClassName>``.

        Raises:
            ValueError: if no handler exists for the event's class.
        """
        event_name = type(event).__name__
        handler = getattr(self, "_apply_" + event_name, None)
        if handler is None:
            raise ValueError(f"Unknown event type: {event_name}")
        handler(event)
        self.version = event.version

    def _apply_AccountOpened(self, event: AccountOpened) -> None:
        self.owner_name = event.owner_name
        self.balance = event.initial_deposit
        self.status = AccountStatus.ACTIVE

    def _apply_MoneyDeposited(self, event: MoneyDeposited) -> None:
        self.balance = self.balance + event.amount

    def _apply_MoneyWithdrawn(self, event: MoneyWithdrawn) -> None:
        self.balance = self.balance - event.amount

    def _apply_AccountFrozen(self, event: AccountFrozen) -> None:
        self.status = AccountStatus.FROZEN

    def _apply_AccountClosed(self, event: AccountClosed) -> None:
        # Closing zeroes the balance as well as changing the status.
        self.status = AccountStatus.CLOSED
        self.balance = Decimal("0")

    def _record(self, event: Event) -> None:
        """Apply a newly created event, then queue it as pending."""
        self.apply(event)
        self._pending_events.append(event)

    # --- Commands (business logic) ---

    def deposit(self, amount: Decimal, reference: str = "") -> None:
        """Add ``amount`` to the balance.

        Raises:
            ValueError: if the account is not active or ``amount`` is not
                positive.
        """
        if self.status != AccountStatus.ACTIVE:
            raise ValueError(f"Cannot deposit to {self.status.value} account")
        if amount <= 0:
            raise ValueError("Deposit amount must be positive")
        self._record(
            MoneyDeposited(
                account_id=self.id,
                amount=amount,
                reference=reference,
                version=self.version + 1,
            )
        )

    def withdraw(self, amount: Decimal, reference: str = "") -> None:
        """Take ``amount`` out of the balance.

        Raises:
            ValueError: if the account is not active, ``amount`` is not
                positive, or ``amount`` exceeds the balance.
        """
        if self.status != AccountStatus.ACTIVE:
            raise ValueError(f"Cannot withdraw from {self.status.value} account")
        if amount <= 0:
            raise ValueError("Withdrawal amount must be positive")
        if amount > self.balance:
            raise ValueError(
                f"Insufficient funds: balance={self.balance}, requested={amount}"
            )
        self._record(
            MoneyWithdrawn(
                account_id=self.id,
                amount=amount,
                reference=reference,
                version=self.version + 1,
            )
        )

    def freeze(self, reason: str) -> None:
        """Freeze the account.

        Raises:
            ValueError: if the account is not active.
        """
        if self.status != AccountStatus.ACTIVE:
            raise ValueError("Only active accounts can be frozen")
        self._record(
            AccountFrozen(
                account_id=self.id, reason=reason, version=self.version + 1
            )
        )

    def close(self) -> None:
        """Close the account, recording the balance held at closing time.

        Raises:
            ValueError: if the account is already closed.
        """
        if self.status == AccountStatus.CLOSED:
            raise ValueError("Account is already closed")
        self._record(
            AccountClosed(
                account_id=self.id,
                closing_balance=self.balance,
                version=self.version + 1,
            )
        )

    @property
    def pending_events(self) -> list[Event]:
        """Return a copy of the events queued since the last clear."""
        return [*self._pending_events]

    def clear_pending(self) -> None:
        """Empty the pending-event queue in place."""
        del self._pending_events[:]

The key pattern: each command method validates business rules, creates an event, applies it to update state, and stores it in the pending list.

Event store

PostgreSQL implementation

import psycopg2
from psycopg2.extras import execute_values

class ConcurrencyError(Exception):
    """Raised when a stream's stored version is not the version expected."""


class PostgresEventStore:
    """Append-only event store backed by a PostgreSQL ``events`` table.

    Each call opens its own connection and closes it when done. psycopg2's
    connection context manager only commits or rolls back the transaction,
    so the connection is closed explicitly in a ``finally`` block.
    """

    # Payload fields are stored as JSON strings (see the events' _data_dict);
    # these converters restore the Python types the aggregate works with.
    _FIELD_PARSERS = {
        "account_id": UUID,
        "initial_deposit": Decimal,
        "amount": Decimal,
        "closing_balance": Decimal,
    }

    def __init__(self, connection_string: str) -> None:
        """Remember the connection string; no connection is opened here."""
        self._conn_str = connection_string

    def setup(self) -> None:
        """Create the ``events`` table and its stream index if missing."""
        conn = psycopg2.connect(self._conn_str)
        try:
            # psycopg2 connections have no execute(); statements go through
            # a cursor. `with conn` commits on success, rolls back on error.
            with conn, conn.cursor() as cur:
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS events (
                        id UUID PRIMARY KEY,
                        stream_id UUID NOT NULL,
                        event_type VARCHAR(100) NOT NULL,
                        version INT NOT NULL,
                        data JSONB NOT NULL,
                        occurred_at TIMESTAMPTZ NOT NULL,
                        UNIQUE(stream_id, version)
                    );
                    CREATE INDEX IF NOT EXISTS idx_events_stream
                        ON events(stream_id, version);
                """)
        finally:
            conn.close()

    def append(self, stream_id: UUID, events: list[Event],
               expected_version: int) -> None:
        """Append events with optimistic concurrency control.

        Args:
            stream_id: Stream (aggregate) the events belong to.
            events: Events to insert; each carries its own version.
            expected_version: Highest version the caller believes is stored
                (0 for a new stream).

        Raises:
            ConcurrencyError: if the stored version differs from
                ``expected_version``, or if a concurrent writer inserted a
                conflicting row between the check and the insert.
        """
        conn = psycopg2.connect(self._conn_str)
        try:
            with conn, conn.cursor() as cur:
                # Check current version for optimistic locking
                cur.execute(
                    "SELECT MAX(version) FROM events WHERE stream_id = %s",
                    (str(stream_id),)
                )
                current = cur.fetchone()[0] or 0
                if current != expected_version:
                    raise ConcurrencyError(
                        f"Expected version {expected_version}, "
                        f"found {current}"
                    )

                rows = [
                    (str(e.event_id), str(stream_id), type(e).__name__,
                     e.version, json.dumps(e._data_dict()),
                     e.occurred_at.isoformat())
                    for e in events
                ]
                try:
                    execute_values(cur, """
                        INSERT INTO events (id, stream_id, event_type, version,
                                           data, occurred_at)
                        VALUES %s
                    """, rows)
                except psycopg2.IntegrityError as exc:
                    # The check above is not atomic with the insert; the
                    # table's uniqueness constraints catch the race, so
                    # report it as the same conflict callers already handle.
                    raise ConcurrencyError(
                        f"Conflicting write to stream {stream_id} at "
                        f"expected version {expected_version}"
                    ) from exc
        finally:
            conn.close()

    def load(self, stream_id: UUID, after_version: int = 0) -> list[Event]:
        """Return the stream's events with version > ``after_version``.

        Events are returned in ascending version order; an unknown stream
        yields an empty list.
        """
        conn = psycopg2.connect(self._conn_str)
        try:
            with conn, conn.cursor() as cur:
                cur.execute("""
                    SELECT event_type, version, data, occurred_at, id
                    FROM events
                    WHERE stream_id = %s AND version > %s
                    ORDER BY version
                """, (str(stream_id), after_version))

                return [
                    self._deserialize(event_type, version, data,
                                      occurred_at, event_id)
                    for event_type, version, data, occurred_at, event_id
                    in cur.fetchall()
                ]
        finally:
            conn.close()

    def _deserialize(self, event_type, version, data, occurred_at,
                     event_id=None) -> Event:
        """Rebuild an event object from one stored row.

        Args:
            event_type: Class name stored with the row.
            version: Stored version.
            data: Payload, either an already decoded mapping or JSON text.
            occurred_at: Stored timestamp.
            event_id: Stored id; when omitted the event gets a new id.

        Raises:
            KeyError: if ``event_type`` is not a known event class.
        """
        # Built here rather than at class level so the event classes are
        # resolved only when a row is actually deserialized.
        registry = {
            "AccountOpened": AccountOpened,
            "MoneyDeposited": MoneyDeposited,
            "MoneyWithdrawn": MoneyWithdrawn,
            "AccountFrozen": AccountFrozen,
            "AccountClosed": AccountClosed,
        }
        cls = registry[event_type]

        payload = json.loads(data) if isinstance(data, (str, bytes)) else data
        kwargs = {}
        for name, value in payload.items():
            parser = self._FIELD_PARSERS.get(name)
            # Restore UUID/Decimal values; other fields pass through as-is.
            if parser is not None and value is not None:
                value = parser(value)
            kwargs[name] = value

        if event_id is not None:
            # Keep the stored identity instead of generating a new one.
            kwargs["event_id"] = (
                event_id if isinstance(event_id, UUID) else UUID(str(event_id))
            )
        return cls(version=version, occurred_at=occurred_at, **kwargs)

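The UNIQUE(stream_id, version) constraint enforces optimistic concurrency control at the database level: even if two writers both pass the version check at the same moment, only one of their inserts can succeed.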

Reconstituting aggregates

class AccountRepository:
    """Loads and persists BankAccount aggregates through an event store."""

    def __init__(self, event_store: PostgresEventStore) -> None:
        """Keep a reference to the event store used for loads and appends."""
        self._store = event_store

    def get(self, account_id: UUID) -> BankAccount:
        """Rebuild the account by replaying all of its stored events.

        Raises:
            KeyError: if the store holds no events for ``account_id``.
        """
        history = self._store.load(account_id)
        if not history:
            raise KeyError(f"Account {account_id} not found")
        aggregate = BankAccount(account_id)
        for past_event in history:
            aggregate.apply(past_event)
        return aggregate

    def save(self, account: BankAccount) -> None:
        """Append the account's pending events, then clear them.

        Does nothing when there are no pending events.
        """
        uncommitted = account.pending_events
        if uncommitted:
            # The version before the pending events were applied is what
            # the store is expected to hold.
            base_version = account.version - len(uncommitted)
            self._store.append(account.id, uncommitted, base_version)
            account.clear_pending()

Projections

Projections transform the event stream into queryable read models.

class AccountBalanceProjection:
    """Maintains a denormalized balance table for fast lookups.

    Each handled event becomes one statement against ``account_balances``.
    Events with no matching ``_on_<EventClassName>`` method are ignored.
    """

    def __init__(self, conn_string: str) -> None:
        """Remember the connection string; no connection is opened here."""
        self._conn_str = conn_string

    def handle(self, event: Event) -> None:
        """Dispatch ``event`` to its ``_on_<EventClassName>`` handler, if any."""
        handler = getattr(self, f"_on_{type(event).__name__}", None)
        if handler:
            handler(event)

    def _execute(self, sql: str, params: tuple) -> None:
        """Run one statement in its own transaction and close the connection.

        psycopg2 connections have no ``execute`` method, and their context
        manager only ends the transaction, so a cursor is used and the
        connection is closed explicitly.
        """
        conn = psycopg2.connect(self._conn_str)
        try:
            with conn, conn.cursor() as cur:
                cur.execute(sql, params)
        finally:
            conn.close()

    def _on_AccountOpened(self, event: AccountOpened) -> None:
        self._execute("""
            INSERT INTO account_balances (id, owner, balance, status)
            VALUES (%s, %s, %s, 'active')
        """, (str(event.account_id), event.owner_name,
              str(event.initial_deposit)))

    def _on_MoneyDeposited(self, event: MoneyDeposited) -> None:
        self._execute("""
            UPDATE account_balances
            SET balance = balance + %s
            WHERE id = %s
        """, (str(event.amount), str(event.account_id)))

    def _on_MoneyWithdrawn(self, event: MoneyWithdrawn) -> None:
        self._execute("""
            UPDATE account_balances
            SET balance = balance - %s
            WHERE id = %s
        """, (str(event.amount), str(event.account_id)))

    def _on_AccountFrozen(self, event: AccountFrozen) -> None:
        # Keep the status column in step with the aggregate's state.
        self._execute("""
            UPDATE account_balances
            SET status = 'frozen'
            WHERE id = %s
        """, (str(event.account_id),))

    def _on_AccountClosed(self, event: AccountClosed) -> None:
        # Mirrors the aggregate: closing zeroes the balance.
        self._execute("""
            UPDATE account_balances
            SET status = 'closed', balance = 0
            WHERE id = %s
        """, (str(event.account_id),))

Projections can be rebuilt from scratch by replaying all events — useful when adding new read models or fixing projection bugs.

Snapshots

For accounts with thousands of transactions, replaying from the beginning becomes slow. Snapshots solve this.

@dataclass
class AccountSnapshot:
    """Point-in-time copy of an account's state, used to shorten replay."""

    account_id: UUID
    owner_name: str
    balance: Decimal
    # The AccountStatus value string (e.g. "active"), not the enum member.
    status: str
    # Account version at snapshot time; event loading resumes after it.
    version: int
    # When the snapshot was taken.
    created_at: datetime

class SnapshotStore:
    """Interface stub for snapshot persistence; both methods are no-ops."""

    def save(self, snapshot: AccountSnapshot) -> None:
        """Persist ``snapshot``. This stub does nothing."""

    def load(self, account_id: UUID) -> AccountSnapshot | None:
        """Return the snapshot for ``account_id``. This stub returns None."""

class SnapshottingAccountRepo:
    """Account repository that uses snapshots to avoid full event replay."""

    def __init__(self, event_store, snapshot_store, snapshot_interval=100):
        """Store the collaborators and the snapshot interval.

        Args:
            event_store: Object providing ``load`` and ``append``.
            snapshot_store: Object providing ``load`` and ``save``.
            snapshot_interval: Take a snapshot each time the version reaches
                or passes a multiple of this number.

        Raises:
            ValueError: if ``snapshot_interval`` is not positive.
        """
        if snapshot_interval <= 0:
            raise ValueError("snapshot_interval must be positive")
        self._events = event_store
        self._snapshots = snapshot_store
        self._interval = snapshot_interval

    def get(self, account_id: UUID) -> BankAccount:
        """Rebuild the account from its snapshot plus any later events.

        Raises:
            KeyError: if there is neither a snapshot nor any event for
                ``account_id``.
        """
        snapshot = self._snapshots.load(account_id)

        account = BankAccount(account_id)
        start_version = 0

        if snapshot is not None:
            account.owner_name = snapshot.owner_name
            account.balance = snapshot.balance
            account.status = AccountStatus(snapshot.status)
            account.version = snapshot.version
            start_version = snapshot.version

        events = self._events.load(account_id, after_version=start_version)
        if snapshot is None and not events:
            # Match AccountRepository.get: an unknown id is an error, not a
            # blank account.
            raise KeyError(f"Account {account_id} not found")
        for event in events:
            account.apply(event)

        return account

    def save(self, account: BankAccount) -> None:
        """Append pending events and snapshot when an interval is crossed.

        Does nothing when there are no pending events.
        """
        pending = account.pending_events
        if not pending:
            return
        expected = account.version - len(pending)
        self._events.append(account.id, pending, expected)
        account.clear_pending()

        # Snapshot whenever this save reached or passed a multiple of the
        # interval. A plain `version % interval == 0` test would miss the
        # boundary when several events are saved at once (e.g. 98 -> 101).
        if account.version // self._interval > expected // self._interval:
            self._snapshots.save(AccountSnapshot(
                account_id=account.id,
                owner_name=account.owner_name,
                balance=account.balance,
                status=account.status.value,
                version=account.version,
                created_at=datetime.now(timezone.utc),
            ))

Event versioning and upcasting

When event schemas evolve, you need a strategy to handle old events.

class EventUpcaster:
    """Transforms old event versions to current schema during loading."""

    def upcast(self, event_type: str, version: int, data: dict) -> dict:
        """Return ``data`` upgraded to the current schema.

        The input dict is never modified; a shallow copy is upgraded and
        returned, so the caller's stored payload stays as it was read.

        Args:
            event_type: Event class name the payload belongs to.
            version: Version of the payload being read.
            data: Payload as stored.

        Returns:
            A new dict with any missing fields filled in.
        """
        upgraded = dict(data)
        if (event_type, version) == ("MoneyDeposited", 1):
            # V1 didn't have 'reference' field — add default
            upgraded.setdefault("reference", "legacy-deposit")
        return upgraded

Upcasting transforms events at read time, keeping the stored events immutable while supporting schema evolution.

Testing

def test_deposit_and_withdraw():
    """Open, deposit and withdraw; check the net balance and event count."""
    opening, paid_in, paid_out = Decimal("500"), Decimal("200"), Decimal("50")
    acct = BankAccount.open(uuid4(), "Alice", opening)
    acct.deposit(paid_in, "salary")
    acct.withdraw(paid_out, "coffee")
    # 500 + 200 - 50
    assert acct.balance == Decimal("650")
    # One event each for open, deposit and withdraw.
    assert len(acct.pending_events) == 3

def test_reconstruct_from_events():
    """Replaying an account's events onto a blank account restores its state."""
    stream_id = uuid4()
    source = BankAccount.open(stream_id, "Bob", Decimal("1000"))
    source.deposit(Decimal("500"))
    source.withdraw(Decimal("200"))

    # Simulate loading from event store
    replayed = BankAccount(stream_id)
    for recorded in source.pending_events:
        replayed.apply(recorded)

    assert replayed.balance == source.balance
    assert replayed.version == source.version

def test_concurrent_modification_detected():
    """Two writers start from the same version; the second save must fail."""
    # NOTE(review): test_conn and pytest are not defined in this snippet;
    # they are assumed to come from the surrounding test module.
    store = PostgresEventStore(test_conn)
    # The original listing used `repo` without ever creating it.
    repo = AccountRepository(store)
    account = BankAccount.open(uuid4(), "Eve", Decimal("100"))
    store.append(account.id, account.pending_events, 0)

    # Simulate two concurrent loads
    a1 = repo.get(account.id)
    a2 = repo.get(account.id)
    a1.deposit(Decimal("50"))
    a2.deposit(Decimal("75"))

    repo.save(a1)  # Succeeds
    with pytest.raises(ConcurrencyError):
        repo.save(a2)  # Fails — version conflict

When event sourcing fits

| Good fit | Poor fit |
| --- | --- |
| Audit requirements (finance, healthcare) | Simple CRUD with no history needs |
| Temporal queries (“state at time T”) | High-write, low-read workloads |
| Event-driven microservices | Small teams unfamiliar with the pattern |
| Complex domain with rich business rules | Domains where current state is all that matters |

Performance considerations

  • Event loading: For aggregates with few events (under 1000), replaying is fast. Beyond that, use snapshots.
  • Projection lag: Projections update asynchronously. Design UIs to tolerate brief delays or use synchronous projections for critical reads.
  • Storage: Events are append-only and grow indefinitely. Plan for archiving old events or compacting streams.

The one thing to remember: Event sourcing in Python stores every state change as an immutable event, uses aggregate replay for consistency, and builds projections for queries — giving you a complete, auditable history with the flexibility to derive any view of the data.

python · architecture · event-sourcing

See Also