Event Sourcing in Python — Deep Dive
Building an event-sourced system in Python
This guide implements event sourcing for a banking account system. We will build the event model, aggregate, event store, projections, and snapshot mechanism from scratch, using the Python standard library plus psycopg2 for the PostgreSQL-backed parts.
Event base class
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal
from uuid import UUID, uuid4
import json
@dataclass(frozen=True)
class Event:
    """Immutable base record shared by every domain event.

    Subclasses add their payload fields and override ``_data_dict`` to say
    how that payload is serialized.
    """

    # Identity of this event occurrence; a fresh UUID4 unless one is supplied.
    event_id: UUID = field(default_factory=uuid4)
    # Creation time; timezone-aware UTC "now" unless one is supplied.
    occurred_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Version number carried by the event; defaults to 0.
    version: int = 0

    def to_dict(self) -> dict:
        """Return the serializable envelope (metadata plus payload)."""
        envelope = dict(
            event_type=type(self).__name__,
            event_id=str(self.event_id),
            occurred_at=self.occurred_at.isoformat(),
            version=self.version,
        )
        # The payload comes last, exactly as in the literal form of the dict.
        envelope["data"] = self._data_dict()
        return envelope

    def _data_dict(self) -> dict:
        """Return the event-specific payload.

        Raises:
            NotImplementedError: always, on the base class; subclasses override.
        """
        raise NotImplementedError
Domain events
@dataclass(frozen=True)
class AccountOpened(Event):
    """Event recording that an account was opened with an initial deposit."""

    # The default is None, so the annotation has to admit None as well;
    # a bare ``UUID`` annotation is rejected by type checkers.
    account_id: UUID | None = None
    owner_name: str = ""
    initial_deposit: Decimal = Decimal("0")

    def _data_dict(self) -> dict:
        """Return the payload with the id and the amount rendered as strings."""
        return {
            "account_id": str(self.account_id),
            "owner_name": self.owner_name,
            "initial_deposit": str(self.initial_deposit),
        }
@dataclass(frozen=True)
class MoneyDeposited(Event):
    """Event recording a deposit into an account."""

    # The default is None, so the annotation has to admit None as well;
    # a bare ``UUID`` annotation is rejected by type checkers.
    account_id: UUID | None = None
    amount: Decimal = Decimal("0")
    # Free-text label supplied by the caller; empty when none was given.
    reference: str = ""

    def _data_dict(self) -> dict:
        """Return the payload with the id and the amount rendered as strings."""
        return {
            "account_id": str(self.account_id),
            "amount": str(self.amount),
            "reference": self.reference,
        }
@dataclass(frozen=True)
class MoneyWithdrawn(Event):
    """Event recording a withdrawal from an account."""

    # The default is None, so the annotation has to admit None as well;
    # a bare ``UUID`` annotation is rejected by type checkers.
    account_id: UUID | None = None
    amount: Decimal = Decimal("0")
    # Free-text label supplied by the caller; empty when none was given.
    reference: str = ""

    def _data_dict(self) -> dict:
        """Return the payload with the id and the amount rendered as strings."""
        return {
            "account_id": str(self.account_id),
            "amount": str(self.amount),
            "reference": self.reference,
        }
@dataclass(frozen=True)
class AccountFrozen(Event):
    """Event recording that an account was frozen, with the stated reason."""

    # The default is None, so the annotation has to admit None as well;
    # a bare ``UUID`` annotation is rejected by type checkers.
    account_id: UUID | None = None
    reason: str = ""

    def _data_dict(self) -> dict:
        """Return the payload with the id rendered as a string."""
        return {"account_id": str(self.account_id), "reason": self.reason}
@dataclass(frozen=True)
class AccountClosed(Event):
    """Event recording that an account was closed and its balance at that time."""

    # The default is None, so the annotation has to admit None as well;
    # a bare ``UUID`` annotation is rejected by type checkers.
    account_id: UUID | None = None
    closing_balance: Decimal = Decimal("0")

    def _data_dict(self) -> dict:
        """Return the payload with the id and the balance rendered as strings."""
        return {
            "account_id": str(self.account_id),
            "closing_balance": str(self.closing_balance),
        }
The aggregate
The BankAccount aggregate rebuilds its state from events and validates business rules before emitting new events.
from enum import Enum
class AccountStatus(Enum):
    """The three states an account can be in; values are the stored strings."""

    ACTIVE = "active"
    FROZEN = "frozen"
    CLOSED = "closed"
class BankAccount:
    """Event-sourced bank account aggregate.

    State changes only through ``apply``.  Command methods check their rules,
    build an event numbered ``version + 1``, apply it, and queue it in the
    pending list.
    """

    def __init__(self, account_id: UUID) -> None:
        """Create an empty, active account at version 0 with no pending events."""
        self.id = account_id
        self.owner_name = ""
        self.balance = Decimal("0")
        self.status = AccountStatus.ACTIVE
        self.version = 0
        self._pending_events: list[Event] = []

    # --- Event application (state reconstruction) ---

    def apply(self, event: Event) -> None:
        """Route ``event`` to its ``_apply_<ClassName>`` method and adopt its version.

        Raises:
            ValueError: if no handler exists for the event's class name.
        """
        event_name = type(event).__name__
        handler = getattr(self, f"_apply_{event_name}", None)
        if handler is None:
            raise ValueError(f"Unknown event type: {event_name}")
        handler(event)
        self.version = event.version

    def _apply_AccountOpened(self, event: AccountOpened) -> None:
        """Take owner and starting balance from the event; mark the account active."""
        self.owner_name = event.owner_name
        self.balance = event.initial_deposit
        self.status = AccountStatus.ACTIVE

    def _apply_MoneyDeposited(self, event: MoneyDeposited) -> None:
        """Add the deposited amount to the balance."""
        self.balance += event.amount

    def _apply_MoneyWithdrawn(self, event: MoneyWithdrawn) -> None:
        """Subtract the withdrawn amount from the balance."""
        self.balance -= event.amount

    def _apply_AccountFrozen(self, event: AccountFrozen) -> None:
        """Mark the account frozen."""
        self.status = AccountStatus.FROZEN

    def _apply_AccountClosed(self, event: AccountClosed) -> None:
        """Mark the account closed and zero its balance."""
        self.status = AccountStatus.CLOSED
        self.balance = Decimal("0")

    def _record(self, event: Event) -> None:
        """Apply a newly created event, then queue it as pending."""
        self.apply(event)
        self._pending_events.append(event)

    # --- Commands (business logic) ---

    def deposit(self, amount: Decimal, reference: str = "") -> None:
        """Record a deposit.

        Raises:
            ValueError: if the account is not active or ``amount`` is not positive.
        """
        if self.status is not AccountStatus.ACTIVE:
            raise ValueError(f"Cannot deposit to {self.status.value} account")
        if amount <= 0:
            raise ValueError("Deposit amount must be positive")
        self._record(
            MoneyDeposited(
                account_id=self.id,
                amount=amount,
                reference=reference,
                version=self.version + 1,
            )
        )

    def withdraw(self, amount: Decimal, reference: str = "") -> None:
        """Record a withdrawal.

        Raises:
            ValueError: if the account is not active, ``amount`` is not
                positive, or ``amount`` exceeds the balance.
        """
        if self.status is not AccountStatus.ACTIVE:
            raise ValueError(f"Cannot withdraw from {self.status.value} account")
        if amount <= 0:
            raise ValueError("Withdrawal amount must be positive")
        if amount > self.balance:
            raise ValueError(
                f"Insufficient funds: balance={self.balance}, requested={amount}"
            )
        self._record(
            MoneyWithdrawn(
                account_id=self.id,
                amount=amount,
                reference=reference,
                version=self.version + 1,
            )
        )

    def freeze(self, reason: str) -> None:
        """Record that the account was frozen.

        Raises:
            ValueError: if the account is not active.
        """
        if self.status is not AccountStatus.ACTIVE:
            raise ValueError("Only active accounts can be frozen")
        next_version = self.version + 1
        self._record(
            AccountFrozen(account_id=self.id, reason=reason, version=next_version)
        )

    def close(self) -> None:
        """Record that the account was closed, capturing the balance at that moment.

        Raises:
            ValueError: if the account is already closed.
        """
        if self.status is AccountStatus.CLOSED:
            raise ValueError("Account is already closed")
        self._record(
            AccountClosed(
                account_id=self.id,
                closing_balance=self.balance,
                version=self.version + 1,
            )
        )

    @property
    def pending_events(self) -> list[Event]:
        """A copy of the events queued since the last ``clear_pending``."""
        return list(self._pending_events)

    def clear_pending(self) -> None:
        """Empty the pending-event queue."""
        self._pending_events.clear()

    # --- Factory ---

    @classmethod
    def open(cls, account_id: UUID, owner: str, deposit: Decimal) -> "BankAccount":
        """Create an account whose first pending event is ``AccountOpened`` (version 1).

        Raises:
            ValueError: if ``deposit`` is below 100.
        """
        if deposit < Decimal("100"):
            raise ValueError("Minimum opening deposit is $100")
        account = cls(account_id)
        account._record(
            AccountOpened(
                account_id=account_id,
                owner_name=owner,
                initial_deposit=deposit,
                version=1,
            )
        )
        return account
The key pattern: each command method validates business rules, creates an event, applies it to update state, and stores it in the pending list.
Event store
PostgreSQL implementation
import psycopg2
from psycopg2.extras import execute_values
class ConcurrencyError(Exception):
    """Raised when an append finds the stream at an unexpected version."""


class PostgresEventStore:
    """Event store that persists events in a PostgreSQL ``events`` table.

    Every public method opens its own connection and closes it before
    returning.  With psycopg2, ``with connection:`` only commits or rolls back
    the transaction, so the close is done explicitly in ``finally``.
    """

    # SQLSTATE that PostgreSQL reports for a unique-constraint violation.
    _UNIQUE_VIOLATION = "23505"

    # Payload fields that the events' ``_data_dict`` renders via ``str(Decimal)``.
    _DECIMAL_FIELDS = frozenset({"amount", "initial_deposit", "closing_balance"})

    def __init__(self, connection_string: str) -> None:
        """Remember the connection string; no connection is opened here."""
        self._conn_str = connection_string

    def setup(self) -> None:
        """Create the ``events`` table and its index if they do not exist."""
        conn = psycopg2.connect(self._conn_str)
        try:
            # psycopg2 connections have no execute(); statements go through a cursor.
            with conn, conn.cursor() as cur:
                # NOTE(review): UNIQUE(stream_id, version) already gets an
                # index from PostgreSQL, so idx_events_stream looks redundant.
                # Kept so the schema is unchanged; confirm before removing.
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS events (
                        id UUID PRIMARY KEY,
                        stream_id UUID NOT NULL,
                        event_type VARCHAR(100) NOT NULL,
                        version INT NOT NULL,
                        data JSONB NOT NULL,
                        occurred_at TIMESTAMPTZ NOT NULL,
                        UNIQUE(stream_id, version)
                    );
                    CREATE INDEX IF NOT EXISTS idx_events_stream
                        ON events(stream_id, version);
                """)
        finally:
            conn.close()

    def append(self, stream_id: UUID, events: list[Event],
               expected_version: int) -> None:
        """Append events with optimistic concurrency control.

        Args:
            stream_id: Stream (aggregate id) the events belong to.
            events: Events to insert; each carries its own version number.
            expected_version: Version the caller believes the stream is at
                (0 for a stream with no events).

        Raises:
            ConcurrencyError: if the stored version differs from
                ``expected_version``, or if the insert hits a unique
                constraint because another writer got there first.
        """
        rows = [
            (str(e.event_id), str(stream_id), type(e).__name__,
             e.version, json.dumps(e._data_dict()),
             e.occurred_at.isoformat())
            for e in events
        ]
        conn = psycopg2.connect(self._conn_str)
        try:
            with conn, conn.cursor() as cur:
                # Check current version for optimistic locking
                cur.execute(
                    "SELECT MAX(version) FROM events WHERE stream_id = %s",
                    (str(stream_id),)
                )
                current = cur.fetchone()[0] or 0
                if current != expected_version:
                    raise ConcurrencyError(
                        f"Expected version {expected_version}, "
                        f"found {current}"
                    )
                try:
                    execute_values(cur, """
                        INSERT INTO events (id, stream_id, event_type, version,
                                            data, occurred_at)
                        VALUES %s
                    """, rows)
                except psycopg2.IntegrityError as exc:
                    # Two writers can both pass the check above; the unique
                    # constraint rejects the loser.  Report that as a
                    # concurrency conflict, not a raw driver error.
                    if exc.pgcode != self._UNIQUE_VIOLATION:
                        raise
                    raise ConcurrencyError(
                        f"Conflicting append to stream {stream_id} "
                        f"at expected version {expected_version}"
                    ) from exc
        finally:
            conn.close()

    def load(self, stream_id: UUID, after_version: int = 0) -> list[Event]:
        """Return the stream's events with version > ``after_version``, in order."""
        conn = psycopg2.connect(self._conn_str)
        try:
            with conn, conn.cursor() as cur:
                # ``id`` is selected too so reloaded events keep their identity.
                cur.execute("""
                    SELECT event_type, version, data, occurred_at, id
                    FROM events
                    WHERE stream_id = %s AND version > %s
                    ORDER BY version
                """, (str(stream_id), after_version))
                rows = cur.fetchall()
        finally:
            conn.close()
        return [self._deserialize(*row) for row in rows]

    @classmethod
    def _coerce_field(cls, name, value):
        """Convert one stored payload value back to the type its event field uses."""
        if name == "account_id":
            # _data_dict stores str(account_id), which is "None" for a missing id.
            if value is None or value == "None":
                return None
            return value if isinstance(value, UUID) else UUID(str(value))
        if name in cls._DECIMAL_FIELDS and value is not None:
            return Decimal(str(value))
        return value

    def _deserialize(self, event_type, version, data, occurred_at,
                     event_id=None) -> Event:
        """Rebuild an event object from one stored row.

        Args:
            event_type: Class name stored in the ``event_type`` column.
            version: Stored version number.
            data: Payload, as a dict or as JSON text.
            occurred_at: Stored timestamp.
            event_id: Stored event id.  When omitted, the event receives a
                newly generated id, as it did before this parameter existed.

        Raises:
            KeyError: if ``event_type`` is not a known event class name.
        """
        registry = {
            "AccountOpened": AccountOpened,
            "MoneyDeposited": MoneyDeposited,
            "MoneyWithdrawn": MoneyWithdrawn,
            "AccountFrozen": AccountFrozen,
            "AccountClosed": AccountClosed,
        }
        event_cls = registry[event_type]
        if isinstance(data, str):
            data = json.loads(data)
        # The payload was written as strings; without converting back, replay
        # would fail as soon as the aggregate adds a str amount to a Decimal.
        payload = {
            name: self._coerce_field(name, value) for name, value in data.items()
        }
        if event_id is not None:
            payload["event_id"] = (
                event_id if isinstance(event_id, UUID) else UUID(str(event_id))
            )
        return event_cls(version=version, occurred_at=occurred_at, **payload)
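The UNIQUE(stream_id, version) constraint provides optimistic concurrency control at the database level: even if two writers both pass the version check in append at the same moment, only one of them can insert a row for a given (stream_id, version) pair, and the other insert is rejected by the database.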
Reconstituting aggregates
class AccountRepository:
    """Loads accounts by replaying their events and saves their pending events."""

    def __init__(self, event_store: PostgresEventStore) -> None:
        self._store = event_store

    def get(self, account_id: UUID) -> BankAccount:
        """Rebuild the account by replaying every stored event.

        Raises:
            KeyError: if the store has no events for ``account_id``.
        """
        history = self._store.load(account_id)
        if not history:
            raise KeyError(f"Account {account_id} not found")
        account = BankAccount(account_id)
        for past_event in history:
            account.apply(past_event)
        return account

    def save(self, account: BankAccount) -> None:
        """Append the account's pending events, then clear them; no-op if none."""
        unsaved = account.pending_events
        if unsaved:
            # Pending events are already applied, so the version the store
            # should currently hold is the account's version minus their count.
            stored_version = account.version - len(unsaved)
            self._store.append(account.id, unsaved, stored_version)
            account.clear_pending()
Projections
Projections transform the event stream into queryable read models.
class AccountBalanceProjection:
    """Maintains a denormalized balance table for fast lookups."""

    def __init__(self, conn_string: str) -> None:
        """Remember the connection string; no connection is opened here."""
        self._conn_str = conn_string

    def handle(self, event: Event) -> None:
        """Route ``event`` to its ``_on_<ClassName>`` method; ignore others."""
        handler = getattr(self, f"_on_{type(event).__name__}", None)
        if handler:
            handler(event)

    def _execute(self, sql: str, params: tuple) -> None:
        """Run one statement in its own transaction on a short-lived connection."""
        conn = psycopg2.connect(self._conn_str)
        try:
            # psycopg2 connections have no execute(); statements go through a
            # cursor.  ``with conn`` commits on success and rolls back on error.
            with conn, conn.cursor() as cur:
                cur.execute(sql, params)
        finally:
            # ``with conn`` does not close the connection, so close it here.
            conn.close()

    def _on_AccountOpened(self, event: AccountOpened) -> None:
        """Insert the row for a newly opened account."""
        self._execute("""
            INSERT INTO account_balances (id, owner, balance, status)
            VALUES (%s, %s, %s, 'active')
        """, (str(event.account_id), event.owner_name,
              str(event.initial_deposit)))

    def _on_MoneyDeposited(self, event: MoneyDeposited) -> None:
        """Increase the stored balance by the deposited amount."""
        self._execute("""
            UPDATE account_balances
            SET balance = balance + %s
            WHERE id = %s
        """, (str(event.amount), str(event.account_id)))

    def _on_MoneyWithdrawn(self, event: MoneyWithdrawn) -> None:
        """Decrease the stored balance by the withdrawn amount."""
        self._execute("""
            UPDATE account_balances
            SET balance = balance - %s
            WHERE id = %s
        """, (str(event.amount), str(event.account_id)))

    def _on_AccountFrozen(self, event: AccountFrozen) -> None:
        """Update the status column, which otherwise stays at 'active'."""
        self._execute("""
            UPDATE account_balances
            SET status = 'frozen'
            WHERE id = %s
        """, (str(event.account_id),))

    def _on_AccountClosed(self, event: AccountClosed) -> None:
        """Mark the row closed and zero its balance, as the aggregate does."""
        self._execute("""
            UPDATE account_balances
            SET status = 'closed', balance = 0
            WHERE id = %s
        """, (str(event.account_id),))
Projections can be rebuilt from scratch by replaying all events — useful when adding new read models or fixing projection bugs.
Snapshots
For accounts with thousands of transactions, replaying from the beginning becomes slow. Snapshots solve this.
@dataclass
class AccountSnapshot:
    """Captured account state together with the version it was taken at."""

    account_id: UUID
    owner_name: str
    balance: Decimal
    # Stored as the plain string value of the status, not the enum member.
    status: str
    # Version of the account when the snapshot was taken.
    version: int
    created_at: datetime
class SnapshotStore:
    """Interface stub for snapshot persistence; neither method is implemented."""

    def save(self, snapshot: AccountSnapshot) -> None:
        """Persist ``snapshot``.  Placeholder: does nothing."""

    def load(self, account_id: UUID) -> AccountSnapshot | None:
        """Return the snapshot for ``account_id``.  Placeholder: returns None."""
class SnapshottingAccountRepo:
    """Account repository that starts replay from the latest snapshot, if any."""

    def __init__(self, event_store, snapshot_store, snapshot_interval=100):
        """Store the collaborators and the snapshot cadence.

        Args:
            event_store: Object providing ``load`` and ``append``.
            snapshot_store: Object providing ``load`` and ``save``.
            snapshot_interval: Take a snapshot each time the version reaches
                or passes a multiple of this number.

        Raises:
            ValueError: if ``snapshot_interval`` is less than 1.
        """
        # Fail here rather than with ZeroDivisionError on the first save.
        if snapshot_interval < 1:
            raise ValueError("snapshot_interval must be at least 1")
        self._events = event_store
        self._snapshots = snapshot_store
        self._interval = snapshot_interval

    def get(self, account_id: UUID) -> BankAccount:
        """Rebuild the account from its snapshot plus the events after it.

        Raises:
            KeyError: if there is neither a snapshot nor any event for
                ``account_id``.
        """
        snapshot = self._snapshots.load(account_id)
        account = BankAccount(account_id)
        start_version = 0
        if snapshot is not None:
            account.owner_name = snapshot.owner_name
            account.balance = snapshot.balance
            account.status = AccountStatus(snapshot.status)
            account.version = snapshot.version
            start_version = snapshot.version
        events = self._events.load(account_id, after_version=start_version)
        if snapshot is None and not events:
            # Nothing was ever stored for this id; an empty account must not
            # pass for an existing one.
            raise KeyError(f"Account {account_id} not found")
        for event in events:
            account.apply(event)
        return account

    def save(self, account: BankAccount) -> None:
        """Append pending events and snapshot when an interval boundary is crossed."""
        pending = account.pending_events
        if not pending:
            return
        expected = account.version - len(pending)
        self._events.append(account.id, pending, expected)
        account.clear_pending()
        # Compare interval "buckets" before and after the save.  A plain
        # ``version % interval == 0`` test misses the boundary when several
        # events are saved at once (for example 99 -> 101 at interval 100).
        if account.version // self._interval > expected // self._interval:
            self._snapshots.save(AccountSnapshot(
                account_id=account.id,
                owner_name=account.owner_name,
                balance=account.balance,
                status=account.status.value,
                version=account.version,
                created_at=datetime.now(timezone.utc),
            ))
Event versioning and upcasting
When event schemas evolve, you need a strategy to handle old events.
class EventUpcaster:
    """Transforms old event versions to current schema during loading."""

    def upcast(self, event_type: str, version: int, data: dict) -> dict:
        """Fill in fields missing from older payloads and return ``data``.

        The dict passed in is updated in place and is also the return value.
        """
        is_v1_deposit = event_type == "MoneyDeposited" and version == 1
        # V1 didn't have 'reference' field — add default
        if is_v1_deposit and "reference" not in data:
            data["reference"] = "legacy-deposit"
        return data
Upcasting transforms events at read time, keeping the stored events immutable while supporting schema evolution.
Testing
def test_deposit_and_withdraw():
    """Open with 500, deposit 200, withdraw 50: balance 650, three pending events."""
    alice = BankAccount.open(uuid4(), "Alice", Decimal("500"))
    alice.deposit(Decimal("200"), "salary")
    alice.withdraw(Decimal("50"), "coffee")

    assert alice.balance == Decimal("650")
    recorded = alice.pending_events
    assert len(recorded) == 3
def test_reconstruct_from_events():
    """Replaying an account's pending events onto a blank account reproduces it."""
    account_id = uuid4()
    original = BankAccount.open(account_id, "Bob", Decimal("1000"))
    original.deposit(Decimal("500"))
    original.withdraw(Decimal("200"))

    # Simulate loading from event store
    restored = BankAccount(account_id)
    for recorded in original.pending_events:
        restored.apply(recorded)

    assert restored.balance == original.balance
    assert restored.version == original.version
def test_concurrent_modification_detected():
    """Two accounts loaded at the same version: the second save must conflict."""
    # pytest.raises is used below, but no pytest import is visible in this file.
    import pytest

    # NOTE(review): test_conn is not defined in the visible source —
    # presumably a connection string supplied by the test setup; confirm.
    # The same applies to ConcurrencyError, which must be importable here.
    store = PostgresEventStore(test_conn)
    # The repository was used below without ever being created.
    repo = AccountRepository(store)

    account = BankAccount.open(uuid4(), "Eve", Decimal("100"))
    store.append(account.id, account.pending_events, 0)

    # Simulate two concurrent loads
    a1 = repo.get(account.id)
    a2 = repo.get(account.id)
    a1.deposit(Decimal("50"))
    a2.deposit(Decimal("75"))

    repo.save(a1)  # Succeeds
    with pytest.raises(ConcurrencyError):
        repo.save(a2)  # Fails — version conflict
When event sourcing fits
| Good fit | Poor fit |
|---|---|
| Audit requirements (finance, healthcare) | Simple CRUD with no history needs |
| Temporal queries (“state at time T”) | High-write, low-read workloads |
| Event-driven microservices | Small teams unfamiliar with the pattern |
| Complex domain with rich business rules | Domains where current state is all that matters |
Performance considerations
- Event loading: For aggregates with few events (under 1000), replaying is fast. Beyond that, use snapshots.
- Projection lag: Projections update asynchronously. Design UIs to tolerate brief delays or use synchronous projections for critical reads.
- Storage: Events are append-only and grow indefinitely. Plan for archiving old events or compacting streams.
The one thing to remember: Event sourcing in Python stores every state change as an immutable event, uses aggregate replay for consistency, and builds projections for queries — giving you a complete, auditable history with the flexibility to derive any view of the data.
See Also
- Python Aggregate Pattern Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture Why your Python app should look like an onion — and how that saves you from painful rewrites.