Domain-Driven Design in Python — Deep Dive

Building a DDD system in Python

This guide implements DDD patterns for an insurance policy management system — a domain complex enough to justify the investment. We will build entities, value objects, aggregates, domain events, repositories, and show how bounded contexts communicate.

Value objects

Value objects are immutable, compared by their attributes rather than by identity, and self-validating.

from dataclasses import dataclass
from datetime import date  # needed by DateRange below
from decimal import Decimal

@dataclass(frozen=True)
class Money:
    amount: Decimal
    currency: str

    def __post_init__(self):
        if self.amount < 0:
            raise ValueError("Money amount cannot be negative")
        if len(self.currency) != 3:
            raise ValueError("Currency must be a 3-letter ISO code")

    def add(self, other: "Money") -> "Money":
        if self.currency != other.currency:
            raise ValueError(f"Cannot add {self.currency} and {other.currency}")
        return Money(self.amount + other.amount, self.currency)

    def multiply(self, factor: Decimal) -> "Money":
        return Money((self.amount * factor).quantize(Decimal("0.01")), self.currency)

@dataclass(frozen=True)
class Address:
    street: str
    city: str
    state: str
    zip_code: str
    country: str = "US"

@dataclass(frozen=True)
class DateRange:
    start: date
    end: date

    def __post_init__(self):
        if self.end <= self.start:
            raise ValueError("End date must be after start date")

    @property
    def days(self) -> int:
        return (self.end - self.start).days

    def overlaps(self, other: "DateRange") -> bool:
        return self.start < other.end and other.start < self.end

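Frozen dataclasses block attribute assignment after construction, and the __post_init__ hook validates at creation time, so the constructor never returns an invalid value object. The protection is a strong convention rather than a hard guarantee: object.__setattr__ can still bypass it.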
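As a quick check of these properties, the sketch below uses a trimmed copy of Money (repeated only so the snippet runs on its own) to show attribute-based equality, rejected assignment, and validation at construction. The flag variables are illustrative names, not part of the model.

```python
from dataclasses import dataclass, FrozenInstanceError
from decimal import Decimal


@dataclass(frozen=True)
class Money:
    # Trimmed copy of the Money value object, repeated so this runs standalone.
    amount: Decimal
    currency: str

    def __post_init__(self):
        if self.amount < 0:
            raise ValueError("Money amount cannot be negative")
        if len(self.currency) != 3:
            raise ValueError("Currency must be a 3-letter ISO code")


a = Money(Decimal("10.00"), "USD")
b = Money(Decimal("10.00"), "USD")

# Equality and hashing are based on attributes, not object identity.
same_value = a == b and a is not b
usable_in_sets = len({a, b}) == 1

# Ordinary assignment on a frozen dataclass raises FrozenInstanceError.
try:
    a.amount = Decimal("99")
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True

# Validation runs in __post_init__, so the constructor raises instead of
# handing back an invalid instance.
try:
    Money(Decimal("-1"), "USD")
    negative_rejected = False
except ValueError:
    negative_rejected = True
```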

Entities

Entities have identity. Two policies with different IDs are different policies even if all other attributes match.

from datetime import date
from uuid import UUID, uuid4

@dataclass
class Policyholder:
    id: UUID
    name: str
    address: Address
    date_of_birth: date

    def __eq__(self, other):
        if not isinstance(other, Policyholder):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        return hash(self.id)

    @property
    def age(self) -> int:
        today = date.today()
        return today.year - self.date_of_birth.year - (
            (today.month, today.day) < (self.date_of_birth.month, self.date_of_birth.day)
        )
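The identity rule can be illustrated with a small standalone sketch. The Customer class is hypothetical and stands in for Policyholder; passing eq=False is an optional choice that makes it explicit that the dataclass should not generate field-by-field equality.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass(eq=False)  # eq=False: do not generate field-by-field equality
class Customer:
    # Hypothetical stand-in for Policyholder, kept small for illustration.
    id: UUID
    name: str

    def __eq__(self, other):
        if not isinstance(other, Customer):
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        return hash(self.id)


shared_id = uuid4()
before = Customer(shared_id, "A. Smith")
after = Customer(shared_id, "A. Smith-Jones")  # same entity, renamed
lookalike = Customer(uuid4(), "A. Smith")      # same attributes, different identity

same_entity = before == after
different_entity = before != lookalike
deduplicated = len({before, after, lookalike}) == 2
```

Because hashing follows the id as well, a renamed entity still resolves to the same set member or dictionary key.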

Domain events

Events capture what happened in the domain. They are immutable records.

from datetime import datetime, timezone

@dataclass(frozen=True)
class DomainEvent:
    occurred_at: datetime

@dataclass(frozen=True)
class PolicyIssued(DomainEvent):
    policy_id: UUID
    holder_id: UUID
    premium: Money
    coverage_period: DateRange

@dataclass(frozen=True)
class ClaimFiled(DomainEvent):
    claim_id: UUID
    policy_id: UUID
    amount: Money
    description: str

@dataclass(frozen=True)
class ClaimApproved(DomainEvent):
    claim_id: UUID
    approved_amount: Money

@dataclass(frozen=True)
class PolicyCancelled(DomainEvent):
    policy_id: UUID
    reason: str
    effective_date: date

Aggregates

The InsurancePolicy aggregate enforces all business invariants. External code never modifies child entities directly — everything goes through the aggregate root.

from dataclasses import field
from enum import Enum

class PolicyStatus(Enum):
    DRAFT = "draft"
    ACTIVE = "active"
    SUSPENDED = "suspended"
    CANCELLED = "cancelled"
    EXPIRED = "expired"

class ClaimStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    DENIED = "denied"

@dataclass
class Claim:
    id: UUID
    amount: Money
    description: str
    status: ClaimStatus = ClaimStatus.PENDING
    filed_at: datetime | None = None

@dataclass
class InsurancePolicy:
    """Aggregate root for insurance policies."""
    id: UUID
    holder: Policyholder
    premium: Money
    coverage_period: DateRange
    max_claim_amount: Money
    status: PolicyStatus = PolicyStatus.DRAFT
    # default_factory gives each policy its own fresh list without a None sentinel.
    claims: list[Claim] = field(default_factory=list)
    # Pending events are internal bookkeeping: excluded from __init__, repr, and equality.
    _events: list[DomainEvent] = field(
        default_factory=list, init=False, repr=False, compare=False
    )

    @property
    def pending_events(self) -> list[DomainEvent]:
        return list(self._events)

    def clear_events(self) -> None:
        self._events.clear()

    # --- Aggregate behaviors ---

    def activate(self) -> None:
        if self.status != PolicyStatus.DRAFT:
            raise ValueError(f"Cannot activate policy in {self.status.value} state")
        if self.holder.age < 18:
            raise ValueError("Policyholder must be at least 18 years old")
        self.status = PolicyStatus.ACTIVE
        self._events.append(PolicyIssued(
            occurred_at=datetime.now(timezone.utc),
            policy_id=self.id,
            holder_id=self.holder.id,
            premium=self.premium,
            coverage_period=self.coverage_period,
        ))

    def file_claim(self, claim_id: UUID, amount: Money, description: str) -> Claim:
        if self.status != PolicyStatus.ACTIVE:
            raise ValueError("Claims can only be filed on active policies")
        if amount.currency != self.premium.currency:
            raise ValueError("Claim currency must match policy currency")
        if amount.amount > self.max_claim_amount.amount:
            raise ValueError(
                f"Claim amount {amount.amount} exceeds maximum {self.max_claim_amount.amount}"
            )

        total_pending = sum(
            c.amount.amount for c in self.claims if c.status == ClaimStatus.PENDING
        )
        if total_pending + amount.amount > self.max_claim_amount.amount:
            raise ValueError("Total pending claims would exceed maximum")

        claim = Claim(
            id=claim_id,
            amount=amount,
            description=description,
            filed_at=datetime.now(timezone.utc),
        )
        self.claims.append(claim)

        self._events.append(ClaimFiled(
            occurred_at=datetime.now(timezone.utc),
            claim_id=claim_id,
            policy_id=self.id,
            amount=amount,
            description=description,
        ))
        return claim

    def approve_claim(self, claim_id: UUID) -> None:
        claim = self._find_claim(claim_id)
        if claim.status != ClaimStatus.PENDING:
            raise ValueError(f"Claim {claim_id} is not pending")
        claim.status = ClaimStatus.APPROVED
        self._events.append(ClaimApproved(
            occurred_at=datetime.now(timezone.utc),
            claim_id=claim_id,
            approved_amount=claim.amount,
        ))

    def cancel(self, reason: str) -> None:
        if self.status == PolicyStatus.CANCELLED:
            raise ValueError("Policy is already cancelled")
        pending = [c for c in self.claims if c.status == ClaimStatus.PENDING]
        if pending:
            raise ValueError(
                f"Cannot cancel policy with {len(pending)} pending claims"
            )
        self.status = PolicyStatus.CANCELLED
        self._events.append(PolicyCancelled(
            occurred_at=datetime.now(timezone.utc),
            policy_id=self.id,
            reason=reason,
            effective_date=date.today(),
        ))

    def _find_claim(self, claim_id: UUID) -> Claim:
        for claim in self.claims:
            if claim.id == claim_id:
                return claim
        raise KeyError(f"Claim {claim_id} not found in policy {self.id}")

Key aggregate design decisions:

  1. All mutations go through the root: file_claim and approve_claim are methods on InsurancePolicy, not on Claim.
  2. Invariants are checked at mutation time: each method validates first and raises before changing any state, so the aggregate never ends up invalid.
  3. Events are collected internally: the infrastructure layer publishes them after persistence.

Repository pattern

from typing import Protocol

class PolicyRepository(Protocol):
    def get(self, policy_id: UUID) -> InsurancePolicy | None: ...
    def save(self, policy: InsurancePolicy) -> None: ...
    def find_active_by_holder(self, holder_id: UUID) -> list[InsurancePolicy]: ...

SQLAlchemy implementation

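from sqlalchemy.orm import Session

# PolicyModel is assumed to be a SQLAlchemy ORM class mapped to the policy table.
# The _to_model and _claim_to_domain mappers and find_active_by_holder are omitted here.
class SqlPolicyRepository: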
    def __init__(self, session: Session) -> None:
        self._session = session

    def get(self, policy_id: UUID) -> InsurancePolicy | None:
        model = self._session.get(PolicyModel, str(policy_id))
        return self._to_domain(model) if model else None

    def save(self, policy: InsurancePolicy) -> None:
        model = self._to_model(policy)
        self._session.merge(model)
        self._session.flush()

    def _to_domain(self, m: PolicyModel) -> InsurancePolicy:
        holder = Policyholder(
            id=UUID(m.holder_id),
            name=m.holder_name,
            address=Address(m.street, m.city, m.state, m.zip_code),
            date_of_birth=m.holder_dob,
        )
        return InsurancePolicy(
            id=UUID(m.id),
            holder=holder,
            premium=Money(Decimal(m.premium_amount), m.premium_currency),
            coverage_period=DateRange(m.coverage_start, m.coverage_end),
            max_claim_amount=Money(Decimal(m.max_claim), m.premium_currency),
            status=PolicyStatus(m.status),
            claims=[self._claim_to_domain(c) for c in m.claims],
        )

Unit of Work with event dispatching

class UnitOfWork:
    def __init__(self, session_factory, event_bus: EventBus):
        self._session_factory = session_factory
        self._event_bus = event_bus

    def __enter__(self):
        self._session = self._session_factory()
        self.policies = SqlPolicyRepository(self._session)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            self._session.rollback()
        self._session.close()

    def commit(self) -> None:
        self._session.commit()
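        # Dispatch events only after the commit succeeds, so handlers never see uncommitted state.
        # _tracked_aggregates() is assumed to return the aggregates loaded or saved in this session.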
        for policy in self._tracked_aggregates():
            for event in policy.pending_events:
                self._event_bus.publish(event)
            policy.clear_events()

Bounded context integration

The billing context needs to know when a policy is issued (to set up payment schedules) but uses its own model.

# billing/event_handlers.py — Anti-corruption layer
class PolicyIssuedHandler:
    def __init__(self, billing_service: BillingService):
        self._billing = billing_service

    def handle(self, event: PolicyIssued) -> None:
        # Translate from policy context to billing context
        self._billing.create_payment_schedule(
            reference_id=str(event.policy_id),
            amount=event.premium.amount,  # keep Decimal; float would introduce rounding error
            currency=event.premium.currency,
            start_date=event.coverage_period.start,
            frequency="monthly",
        )

The handler acts as an anti-corruption layer — it translates between the policy domain model and the billing domain model. Neither context depends on the other’s internal structures.
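To make the translation step concrete, here is a standalone sketch in which billing keeps its own model with integer minor units. PolicyIssuedPayload, PaymentSchedule, and to_payment_schedule are hypothetical names, and the conversion assumes a currency with two decimal places.

```python
from dataclasses import dataclass
from datetime import date
from decimal import Decimal


@dataclass(frozen=True)
class PolicyIssuedPayload:
    # Hypothetical flattened form of PolicyIssued as the billing context receives it.
    policy_id: str
    premium_amount: Decimal
    premium_currency: str
    coverage_start: date


@dataclass(frozen=True)
class PaymentSchedule:
    # Hypothetical billing-side model with its own field names and units.
    reference_id: str
    amount_minor_units: int
    currency: str
    first_due: date
    frequency: str


def to_payment_schedule(event: PolicyIssuedPayload) -> PaymentSchedule:
    """Translate policy-context vocabulary into billing-context vocabulary."""
    # Assumes two decimal places per unit (true for USD, not for every currency).
    minor_units = int((event.premium_amount * 100).to_integral_value())
    return PaymentSchedule(
        reference_id=event.policy_id,
        amount_minor_units=minor_units,
        currency=event.premium_currency,
        first_due=event.coverage_start,
        frequency="monthly",
    )


schedule = to_payment_schedule(
    PolicyIssuedPayload("policy-1", Decimal("129.99"), "USD", date(2025, 1, 1))
)
```

Keeping the mapping in one function means a change to either model touches only this translation, not the other context.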

Testing aggregate behavior

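import pytest

# Illustrative factories, not part of the domain model.
def create_draft_policy(max_claim=Decimal("1000")):
    address = Address("1 Main St", "Springfield", "IL", "62701")
    return InsurancePolicy(
        id=uuid4(),
        holder=Policyholder(uuid4(), "Test Holder", address, date(1980, 1, 1)),
        premium=Money(Decimal("100"), "USD"),
        coverage_period=DateRange(date(2024, 1, 1), date(2025, 1, 1)),
        max_claim_amount=Money(max_claim, "USD"),
    )

def create_active_policy(max_claim=Decimal("1000")):
    policy = create_draft_policy(max_claim)
    policy.activate()
    policy.clear_events()  # start each test with no pending events
    return policy

def test_cannot_file_claim_on_inactive_policy():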
    policy = create_draft_policy()
    with pytest.raises(ValueError, match="active policies"):
        policy.file_claim(uuid4(), Money(Decimal("500"), "USD"), "Water damage")

def test_claim_total_cannot_exceed_maximum():
    policy = create_active_policy(max_claim=Decimal("1000"))
    policy.file_claim(uuid4(), Money(Decimal("800"), "USD"), "First claim")
    with pytest.raises(ValueError, match="exceed maximum"):
        policy.file_claim(uuid4(), Money(Decimal("300"), "USD"), "Second claim")

def test_activate_emits_policy_issued_event():
    policy = create_draft_policy()
    policy.activate()
    events = policy.pending_events
    assert len(events) == 1
    assert isinstance(events[0], PolicyIssued)
    assert events[0].policy_id == policy.id

Aggregate tests are pure unit tests — no database, no framework, no mocking. They verify business invariants directly.

Tradeoffs

  • Modeling investment: DDD requires deep domain understanding before writing code. For well-understood CRUD domains, this is overkill.
  • Aggregate boundaries: Choosing the wrong aggregate boundary leads to either overly large aggregates (performance) or cross-aggregate consistency issues (complexity).
  • Event consistency: Domain events published after commit create eventual consistency scenarios that need careful handling.
  • Team alignment: DDD only works when developers and domain experts collaborate regularly. Without that feedback loop, the model drifts from reality.

The one thing to remember: DDD in Python leverages dataclasses for value objects, rich aggregate methods for invariant enforcement, and domain events for cross-context communication — keeping the business model at the center of your architecture.
