Domain-Driven Design in Python — Deep Dive
Building a DDD system in Python
This guide implements DDD patterns for an insurance policy management system — a domain complex enough to justify the investment. We will build entities, value objects, aggregates, domain events, repositories, and show how bounded contexts communicate.
Value objects
Value objects are immutable, equality-based-on-attributes, and self-validating.
from dataclasses import dataclass
from decimal import Decimal
@dataclass(frozen=True)
class Money:
amount: Decimal
currency: str
def __post_init__(self):
if self.amount < 0:
raise ValueError("Money amount cannot be negative")
if len(self.currency) != 3:
raise ValueError("Currency must be a 3-letter ISO code")
def add(self, other: "Money") -> "Money":
if self.currency != other.currency:
raise ValueError(f"Cannot add {self.currency} and {other.currency}")
return Money(self.amount + other.amount, self.currency)
def multiply(self, factor: Decimal) -> "Money":
return Money((self.amount * factor).quantize(Decimal("0.01")), self.currency)
@dataclass(frozen=True)
class Address:
street: str
city: str
state: str
zip_code: str
country: str = "US"
@dataclass(frozen=True)
class DateRange:
start: date
end: date
def __post_init__(self):
if self.end <= self.start:
raise ValueError("End date must be after start date")
@property
def days(self) -> int:
return (self.end - self.start).days
def overlaps(self, other: "DateRange") -> bool:
return self.start < other.end and other.start < self.end
Frozen dataclasses give immutability automatically. The __post_init__ hook provides validation at creation time — invalid value objects can never exist.
Entities
Entities have identity. Two policies with different IDs are different policies even if all other attributes match.
from datetime import date
from uuid import UUID, uuid4
@dataclass
class Policyholder:
id: UUID
name: str
address: Address
date_of_birth: date
def __eq__(self, other):
if not isinstance(other, Policyholder):
return NotImplemented
return self.id == other.id
def __hash__(self):
return hash(self.id)
@property
def age(self) -> int:
today = date.today()
return today.year - self.date_of_birth.year - (
(today.month, today.day) < (self.date_of_birth.month, self.date_of_birth.day)
)
Domain events
Events capture what happened in the domain. They are immutable records.
from datetime import datetime, timezone
@dataclass(frozen=True)
class DomainEvent:
occurred_at: datetime
@dataclass(frozen=True)
class PolicyIssued(DomainEvent):
policy_id: UUID
holder_id: UUID
premium: Money
coverage_period: DateRange
@dataclass(frozen=True)
class ClaimFiled(DomainEvent):
claim_id: UUID
policy_id: UUID
amount: Money
description: str
@dataclass(frozen=True)
class ClaimApproved(DomainEvent):
claim_id: UUID
approved_amount: Money
@dataclass(frozen=True)
class PolicyCancelled(DomainEvent):
policy_id: UUID
reason: str
effective_date: date
Aggregates
The InsurancePolicy aggregate enforces all business invariants. External code never modifies child entities directly — everything goes through the aggregate root.
from enum import Enum
class PolicyStatus(Enum):
DRAFT = "draft"
ACTIVE = "active"
SUSPENDED = "suspended"
CANCELLED = "cancelled"
EXPIRED = "expired"
class ClaimStatus(Enum):
PENDING = "pending"
APPROVED = "approved"
DENIED = "denied"
@dataclass
class Claim:
id: UUID
amount: Money
description: str
status: ClaimStatus = ClaimStatus.PENDING
filed_at: datetime = None
@dataclass
class InsurancePolicy:
"""Aggregate root for insurance policies."""
id: UUID
holder: Policyholder
premium: Money
coverage_period: DateRange
max_claim_amount: Money
status: PolicyStatus = PolicyStatus.DRAFT
claims: list[Claim] = None
_events: list[DomainEvent] = None
def __post_init__(self):
if self.claims is None:
self.claims = []
if self._events is None:
self._events = []
@property
def pending_events(self) -> list[DomainEvent]:
return list(self._events)
def clear_events(self) -> None:
self._events.clear()
# --- Aggregate behaviors ---
def activate(self) -> None:
if self.status != PolicyStatus.DRAFT:
raise ValueError(f"Cannot activate policy in {self.status.value} state")
if self.holder.age < 18:
raise ValueError("Policyholder must be at least 18 years old")
self.status = PolicyStatus.ACTIVE
self._events.append(PolicyIssued(
occurred_at=datetime.now(timezone.utc),
policy_id=self.id,
holder_id=self.holder.id,
premium=self.premium,
coverage_period=self.coverage_period,
))
def file_claim(self, claim_id: UUID, amount: Money, description: str) -> Claim:
if self.status != PolicyStatus.ACTIVE:
raise ValueError("Claims can only be filed on active policies")
if amount.currency != self.premium.currency:
raise ValueError("Claim currency must match policy currency")
if amount.amount > self.max_claim_amount.amount:
raise ValueError(
f"Claim amount {amount.amount} exceeds maximum {self.max_claim_amount.amount}"
)
total_pending = sum(
c.amount.amount for c in self.claims if c.status == ClaimStatus.PENDING
)
if total_pending + amount.amount > self.max_claim_amount.amount:
raise ValueError("Total pending claims would exceed maximum")
claim = Claim(
id=claim_id,
amount=amount,
description=description,
filed_at=datetime.now(timezone.utc),
)
self.claims.append(claim)
self._events.append(ClaimFiled(
occurred_at=datetime.now(timezone.utc),
claim_id=claim_id,
policy_id=self.id,
amount=amount,
description=description,
))
return claim
def approve_claim(self, claim_id: UUID) -> None:
claim = self._find_claim(claim_id)
if claim.status != ClaimStatus.PENDING:
raise ValueError(f"Claim {claim_id} is not pending")
claim.status = ClaimStatus.APPROVED
self._events.append(ClaimApproved(
occurred_at=datetime.now(timezone.utc),
claim_id=claim_id,
approved_amount=claim.amount,
))
def cancel(self, reason: str) -> None:
if self.status == PolicyStatus.CANCELLED:
raise ValueError("Policy is already cancelled")
pending = [c for c in self.claims if c.status == ClaimStatus.PENDING]
if pending:
raise ValueError(
f"Cannot cancel policy with {len(pending)} pending claims"
)
self.status = PolicyStatus.CANCELLED
self._events.append(PolicyCancelled(
occurred_at=datetime.now(timezone.utc),
policy_id=self.id,
reason=reason,
effective_date=date.today(),
))
def _find_claim(self, claim_id: UUID) -> Claim:
for claim in self.claims:
if claim.id == claim_id:
return claim
raise KeyError(f"Claim {claim_id} not found in policy {self.id}")
Key aggregate design decisions:
- All mutations go through the root —
file_claimandapprove_claimare methods onInsurancePolicy, not onClaim. - Invariants are checked at mutation time — You cannot create an invalid state.
- Events are collected internally — The infrastructure layer publishes them after persistence.
Repository pattern
from typing import Protocol
class PolicyRepository(Protocol):
def get(self, policy_id: UUID) -> InsurancePolicy | None: ...
def save(self, policy: InsurancePolicy) -> None: ...
def find_active_by_holder(self, holder_id: UUID) -> list[InsurancePolicy]: ...
SQLAlchemy implementation
class SqlPolicyRepository:
def __init__(self, session: Session) -> None:
self._session = session
def get(self, policy_id: UUID) -> InsurancePolicy | None:
model = self._session.get(PolicyModel, str(policy_id))
return self._to_domain(model) if model else None
def save(self, policy: InsurancePolicy) -> None:
model = self._to_model(policy)
self._session.merge(model)
self._session.flush()
def _to_domain(self, m: PolicyModel) -> InsurancePolicy:
holder = Policyholder(
id=UUID(m.holder_id),
name=m.holder_name,
address=Address(m.street, m.city, m.state, m.zip_code),
date_of_birth=m.holder_dob,
)
return InsurancePolicy(
id=UUID(m.id),
holder=holder,
premium=Money(Decimal(m.premium_amount), m.premium_currency),
coverage_period=DateRange(m.coverage_start, m.coverage_end),
max_claim_amount=Money(Decimal(m.max_claim), m.premium_currency),
status=PolicyStatus(m.status),
claims=[self._claim_to_domain(c) for c in m.claims],
)
Unit of Work with event dispatching
class UnitOfWork:
def __init__(self, session_factory, event_bus: EventBus):
self._session_factory = session_factory
self._event_bus = event_bus
def __enter__(self):
self._session = self._session_factory()
self.policies = SqlPolicyRepository(self._session)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type:
self._session.rollback()
self._session.close()
def commit(self) -> None:
self._session.commit()
# Dispatch events after successful commit
for policy in self._tracked_aggregates():
for event in policy.pending_events:
self._event_bus.publish(event)
policy.clear_events()
Bounded context integration
The billing context needs to know when a policy is issued (to set up payment schedules) but uses its own model.
# billing/event_handlers.py — Anti-corruption layer
class PolicyIssuedHandler:
def __init__(self, billing_service: BillingService):
self._billing = billing_service
def handle(self, event: PolicyIssued) -> None:
# Translate from policy context to billing context
self._billing.create_payment_schedule(
reference_id=str(event.policy_id),
amount=float(event.premium.amount),
currency=event.premium.currency,
start_date=event.coverage_period.start,
frequency="monthly",
)
The handler acts as an anti-corruption layer — it translates between the policy domain model and the billing domain model. Neither context depends on the other’s internal structures.
Testing aggregate behavior
def test_cannot_file_claim_on_inactive_policy():
policy = create_draft_policy()
with pytest.raises(ValueError, match="active policies"):
policy.file_claim(uuid4(), Money(Decimal("500"), "USD"), "Water damage")
def test_claim_total_cannot_exceed_maximum():
policy = create_active_policy(max_claim=Decimal("1000"))
policy.file_claim(uuid4(), Money(Decimal("800"), "USD"), "First claim")
with pytest.raises(ValueError, match="exceed maximum"):
policy.file_claim(uuid4(), Money(Decimal("300"), "USD"), "Second claim")
def test_activate_emits_policy_issued_event():
policy = create_draft_policy()
policy.activate()
events = policy.pending_events
assert len(events) == 1
assert isinstance(events[0], PolicyIssued)
assert events[0].policy_id == policy.id
Aggregate tests are pure unit tests — no database, no framework, no mocking. They verify business invariants directly.
Tradeoffs
- Modeling investment: DDD requires deep domain understanding before writing code. For well-understood CRUD domains, this is overkill.
- Aggregate boundaries: Choosing the wrong aggregate boundary leads to either overly large aggregates (performance) or cross-aggregate consistency issues (complexity).
- Event consistency: Domain events published after commit create eventual consistency scenarios that need careful handling.
- Team alignment: DDD only works when developers and domain experts collaborate regularly. Without that feedback loop, the model drifts from reality.
The one thing to remember: DDD in Python leverages dataclasses for value objects, rich aggregate methods for invariant enforcement, and domain events for cross-context communication — keeping the business model at the center of your architecture.
See Also
- Python Aggregate Pattern Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture Why your Python app should look like an onion — and how that saves you from painful rewrites.