Aggregate Pattern in Python — Deep Dive

Building production aggregates in Python

This guide implements the Aggregate Pattern for a project management system. We build a Project aggregate that manages tasks, enforces team capacity rules, and emits domain events — demonstrating how aggregates work in real Python applications.

Domain model

Value objects

from dataclasses import dataclass
from datetime import date, datetime, timezone
from enum import Enum
from uuid import UUID, uuid4

class Priority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

class TaskStatus(Enum):
    TODO = "todo"
    IN_PROGRESS = "in_progress"
    IN_REVIEW = "in_review"
    DONE = "done"

class ProjectStatus(Enum):
    PLANNING = "planning"
    ACTIVE = "active"
    ON_HOLD = "on_hold"
    COMPLETED = "completed"
    CANCELLED = "cancelled"

@dataclass(frozen=True)
class StoryPoints:
    value: int

    def __post_init__(self):
        if self.value < 0:
            raise ValueError("Story points cannot be negative")
        if self.value > 21:
            raise ValueError("Story points cannot exceed 21 (Fibonacci limit)")

    def __add__(self, other: "StoryPoints") -> "StoryPoints":
        return StoryPoints(self.value + other.value)

@dataclass(frozen=True)
class TeamMember:
    id: UUID
    name: str
    max_concurrent_tasks: int = 3

Internal entity: Task

@dataclass
class Task:
    """Internal entity within the Project aggregate.
    Never accessed directly from outside the aggregate."""
    id: UUID
    title: str
    assignee_id: UUID | None
    priority: Priority
    status: TaskStatus
    story_points: StoryPoints
    created_at: datetime
    completed_at: datetime | None = None

    def _mark_in_progress(self) -> None:
        if self.status != TaskStatus.TODO:
            raise ValueError(f"Cannot start task in {self.status.value} state")
        if self.assignee_id is None:
            raise ValueError("Cannot start unassigned task")
        self.status = TaskStatus.IN_PROGRESS

    def _mark_done(self) -> None:
        if self.status not in (TaskStatus.IN_PROGRESS, TaskStatus.IN_REVIEW):
            raise ValueError(f"Cannot complete task in {self.status.value} state")
        self.status = TaskStatus.DONE
        self.completed_at = datetime.now(timezone.utc)

    def _assign(self, member_id: UUID) -> None:
        self.assignee_id = member_id

The methods carry a leading underscore to signal that code outside the aggregate must not call them. Python does not enforce this; it is a naming convention. The aggregate root calls them only after it has checked the cross-entity invariants.

Domain events

@dataclass(frozen=True)
class DomainEvent:
    event_id: UUID
    occurred_at: datetime
    aggregate_id: UUID

@dataclass(frozen=True)
class TaskCreated(DomainEvent):
    task_id: UUID
    title: str
    priority: str

@dataclass(frozen=True)
class TaskAssigned(DomainEvent):
    task_id: UUID
    assignee_id: UUID

@dataclass(frozen=True)
class TaskCompleted(DomainEvent):
    task_id: UUID
    story_points: int

@dataclass(frozen=True)
class SprintCapacityExceeded(DomainEvent):
    current_points: int
    max_points: int

@dataclass(frozen=True)
class ProjectCompleted(DomainEvent):
    total_tasks: int
    total_points: int
    duration_days: int

The aggregate root: Project

from dataclasses import dataclass, field

@dataclass
class Project:
    """Aggregate root. All task operations go through this class."""
    id: UUID
    name: str
    status: ProjectStatus
    team: list[TeamMember]
    sprint_capacity: StoryPoints
    started_at: date | None = None
    # default_factory gives every instance its own list, so the annotations
    # stay accurate and no None placeholder or __post_init__ is needed.
    _tasks: list[Task] = field(default_factory=list)
    _events: list[DomainEvent] = field(default_factory=list)
    _version: int = 0

    # --- Read-only access ---

    @property
    def tasks(self) -> tuple[Task, ...]:
        return tuple(self._tasks)

    @property
    def pending_events(self) -> list[DomainEvent]:
        return list(self._events)

    @property
    def version(self) -> int:
        return self._version

    def clear_events(self) -> None:
        self._events.clear()

    # --- Computed properties ---

    @property
    def total_story_points(self) -> int:
        return sum(t.story_points.value for t in self._tasks)

    @property
    def completed_story_points(self) -> int:
        return sum(
            t.story_points.value for t in self._tasks
            if t.status == TaskStatus.DONE
        )

    @property
    def active_story_points(self) -> int:
        return sum(
            t.story_points.value for t in self._tasks
            if t.status in (TaskStatus.TODO, TaskStatus.IN_PROGRESS, TaskStatus.IN_REVIEW)
        )

    @property
    def completion_percentage(self) -> float:
        total = self.total_story_points
        if total == 0:
            return 0.0
        return round(self.completed_story_points / total * 100, 1)

    # --- Commands (business operations) ---

    def activate(self) -> None:
        if self.status != ProjectStatus.PLANNING:
            raise ValueError("Only planning projects can be activated")
        if not self.team:
            raise ValueError("Cannot activate a project with no team members")
        self.status = ProjectStatus.ACTIVE
        self.started_at = date.today()

    def create_task(
        self,
        title: str,
        priority: Priority,
        story_points: StoryPoints,
        assignee_id: UUID | None = None,
    ) -> UUID:
        if self.status not in (ProjectStatus.PLANNING, ProjectStatus.ACTIVE):
            raise ValueError(f"Cannot add tasks to {self.status.value} project")

        if assignee_id and not self._is_team_member(assignee_id):
            raise ValueError("Assignee is not a team member")

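        # Soft limit: going over sprint capacity records a warning event but
        # does not block task creation. current_points is the total before
        # the new task is added.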
        if self.active_story_points + story_points.value > self.sprint_capacity.value:
            self._events.append(SprintCapacityExceeded(
                event_id=uuid4(),
                occurred_at=datetime.now(timezone.utc),
                aggregate_id=self.id,
                current_points=self.active_story_points,
                max_points=self.sprint_capacity.value,
            ))

        task_id = uuid4()
        task = Task(
            id=task_id,
            title=title,
            assignee_id=assignee_id,
            priority=priority,
            status=TaskStatus.TODO,
            story_points=story_points,
            created_at=datetime.now(timezone.utc),
        )
        self._tasks.append(task)

        self._events.append(TaskCreated(
            event_id=uuid4(),
            occurred_at=datetime.now(timezone.utc),
            aggregate_id=self.id,
            task_id=task_id,
            title=title,
            priority=priority.name,
        ))

        return task_id

    def assign_task(self, task_id: UUID, member_id: UUID) -> None:
        if self.status != ProjectStatus.ACTIVE:
            raise ValueError("Project must be active to assign tasks")

        task = self._find_task(task_id)
        member = self._find_member(member_id)

        # Cross-entity invariant: member workload limit
        active_tasks_for_member = sum(
            1 for t in self._tasks
            if t.assignee_id == member_id
            and t.status in (TaskStatus.IN_PROGRESS, TaskStatus.IN_REVIEW)
        )
        if active_tasks_for_member >= member.max_concurrent_tasks:
            raise ValueError(
                f"{member.name} already has {active_tasks_for_member} active tasks "
                f"(max: {member.max_concurrent_tasks})"
            )

        task._assign(member_id)
        self._events.append(TaskAssigned(
            event_id=uuid4(),
            occurred_at=datetime.now(timezone.utc),
            aggregate_id=self.id,
            task_id=task_id,
            assignee_id=member_id,
        ))

    def start_task(self, task_id: UUID) -> None:
        if self.status != ProjectStatus.ACTIVE:
            raise ValueError("Project must be active to start tasks")
        task = self._find_task(task_id)
        task._mark_in_progress()

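    def complete_task(self, task_id: UUID) -> None:
        # Same guard as start_task: without it, finishing the last open task
        # could flip an on-hold or cancelled project to COMPLETED.
        if self.status != ProjectStatus.ACTIVE:
            raise ValueError("Project must be active to complete tasks")
        task = self._find_task(task_id)
        task._mark_done()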
        self._events.append(TaskCompleted(
            event_id=uuid4(),
            occurred_at=datetime.now(timezone.utc),
            aggregate_id=self.id,
            task_id=task_id,
            story_points=task.story_points.value,
        ))

        # Check if all tasks are done
        if all(t.status == TaskStatus.DONE for t in self._tasks) and self._tasks:
            self.status = ProjectStatus.COMPLETED
            self._events.append(ProjectCompleted(
                event_id=uuid4(),
                occurred_at=datetime.now(timezone.utc),
                aggregate_id=self.id,
                total_tasks=len(self._tasks),
                total_points=self.total_story_points,
                duration_days=(date.today() - self.started_at).days if self.started_at else 0,
            ))

    # --- Internal helpers ---

    def _find_task(self, task_id: UUID) -> Task:
        for task in self._tasks:
            if task.id == task_id:
                return task
        raise KeyError(f"Task {task_id} not found in project {self.id}")

    def _find_member(self, member_id: UUID) -> TeamMember:
        for member in self.team:
            if member.id == member_id:
                return member
        raise KeyError(f"Member {member_id} not on project team")

    def _is_team_member(self, member_id: UUID) -> bool:
        return any(m.id == member_id for m in self.team)

Repository with optimistic concurrency

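from typing import Protocol

class ConcurrencyError(Exception):
    """Raised when the stored version differs from the version that was loaded."""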

class ProjectRepository(Protocol):
    def get(self, project_id: UUID) -> Project: ...
    def save(self, project: Project) -> None: ...

class SqlProjectRepository:
    def __init__(self, session) -> None:
        self._session = session

    def get(self, project_id: UUID) -> Project:
        model = self._session.query(ProjectModel).get(str(project_id))
        if model is None:
            raise KeyError(f"Project {project_id} not found")
        return self._to_domain(model)

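    def save(self, project: Project) -> None:
        model = self._session.query(ProjectModel).get(str(project.id))
        if model is None:
            model = ProjectModel(id=str(project.id))
            # A freshly constructed model may not have its version populated
            # yet, so seed it from the aggregate. Otherwise the check below
            # would reject every new project.
            model.version = project.version
            self._session.add(model)

        # Optimistic concurrency check: the stored version must equal the
        # version this aggregate was loaded with.
        if model.version != project.version:
            raise ConcurrencyError(
                f"Project {project.id} was modified by another process "
                f"(expected version {project.version}, found {model.version})"
            )

        self._update_model(model, project)
        model.version += 1
        project._version = model.version
        self._session.flush()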
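
To see the version check in isolation, here is a minimal sketch that swaps the SQL session for a dictionary. VersionedDoc and InMemoryRepository are hypothetical stand-ins invented for this illustration, not part of the repository above; the sketch only assumes the same rule, namely that a save is rejected when the stored version no longer matches the version that was loaded.

```python
from copy import deepcopy
from dataclasses import dataclass
from uuid import UUID, uuid4


class ConcurrencyError(Exception):
    """Raised when the stored version differs from the version that was loaded."""


@dataclass
class VersionedDoc:
    # Hypothetical stand-in for an aggregate: only id and version matter here.
    id: UUID
    name: str
    version: int = 0


class InMemoryRepository:
    def __init__(self) -> None:
        self._rows: dict[UUID, VersionedDoc] = {}

    def get(self, doc_id: UUID) -> VersionedDoc:
        # Hand out a copy so each caller works on its own snapshot.
        return deepcopy(self._rows[doc_id])

    def save(self, doc: VersionedDoc) -> None:
        stored = self._rows.get(doc.id)
        # A document that was never saved counts as version 0.
        stored_version = stored.version if stored is not None else 0
        if stored_version != doc.version:
            raise ConcurrencyError(
                f"expected version {doc.version}, found {stored_version}"
            )
        doc.version += 1
        self._rows[doc.id] = deepcopy(doc)


repo = InMemoryRepository()
doc = VersionedDoc(id=uuid4(), name="Apollo")
repo.save(doc)  # first save: version 0 -> 1

# Two callers load the same snapshot (both at version 1).
first = repo.get(doc.id)
second = repo.get(doc.id)

first.name = "Apollo v2"
repo.save(first)  # succeeds: version 1 -> 2

second.name = "Apollo (stale)"
try:
    repo.save(second)  # still at version 1, but the store is at version 2
    conflict = False
except ConcurrencyError:
    conflict = True
```

The second writer loses and has to reload before retrying. In a real database the comparison and the write also need to be atomic, for example by putting the expected version in the UPDATE's WHERE clause; this sketch does not model that.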

Unit of Work with event dispatch

class UnitOfWork:
    def __init__(self, session_factory, event_bus):
        self._session_factory = session_factory
        self._event_bus = event_bus
        self._aggregates: list[Project] = []

    def __enter__(self):
        self._session = self._session_factory()
        self.projects = SqlProjectRepository(self._session)
        return self

    def __exit__(self, *args):
        if args[0]:
            self._session.rollback()
        self._session.close()

    def track(self, project: Project) -> None:
        self._aggregates.append(project)

    def commit(self) -> None:
        for agg in self._aggregates:
            self.projects.save(agg)
        self._session.commit()

        # Dispatch events after successful commit
        for agg in self._aggregates:
            for event in agg.pending_events:
                self._event_bus.publish(event)
            agg.clear_events()
        self._aggregates.clear()
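
The UnitOfWork only assumes that the event bus has a publish method. A minimal in-process bus that satisfies this could look like the sketch below. The subscribe method, the InMemoryEventBus name, and the TaskCompletedNotice event are assumptions made for this illustration; a production system might use a message broker instead.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class TaskCompletedNotice:
    # Hypothetical event used only in this sketch.
    task_title: str
    story_points: int


class InMemoryEventBus:
    """Routes each published event to the handlers registered for its exact type."""

    def __init__(self) -> None:
        self._handlers: dict[type, list[Callable[[object], None]]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable[[object], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: object) -> None:
        # .get avoids creating empty entries for event types nobody listens to.
        for handler in self._handlers.get(type(event), ()):
            handler(event)


# Usage: a handler that accumulates completed story points.
completed_points: list[int] = []

bus = InMemoryEventBus()
bus.subscribe(TaskCompletedNotice, lambda e: completed_points.append(e.story_points))

bus.publish(TaskCompletedNotice(task_title="Build login", story_points=5))
bus.publish("an event type with no subscribers is ignored")
```

Because handlers run synchronously inside publish, a failing handler raises after the database commit has already succeeded. Whether to retry, log, or store undelivered events is a design decision this sketch leaves open.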

Use case layer

class AssignTaskUseCase:
    def __init__(self, uow_factory) -> None:
        self._uow_factory = uow_factory

    def execute(self, project_id: UUID, task_id: UUID, member_id: UUID) -> None:
        with self._uow_factory() as uow:
            project = uow.projects.get(project_id)
            uow.track(project)
            project.assign_task(task_id, member_id)
            uow.commit()

The use case is thin: load aggregate, call a method, commit. All business logic lives in the aggregate.
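
Because the use case only orchestrates, it can be checked with hand-written fakes instead of a database. The sketch below repeats AssignTaskUseCase so that it runs on its own; FakeProject, FakeProjects, and FakeUnitOfWork are hypothetical test doubles introduced for this illustration.

```python
from uuid import UUID, uuid4


class AssignTaskUseCase:
    """Repeated from above so the sketch is self-contained."""

    def __init__(self, uow_factory) -> None:
        self._uow_factory = uow_factory

    def execute(self, project_id: UUID, task_id: UUID, member_id: UUID) -> None:
        with self._uow_factory() as uow:
            project = uow.projects.get(project_id)
            uow.track(project)
            project.assign_task(task_id, member_id)
            uow.commit()


class FakeProject:
    """Hypothetical stand-in that records calls instead of enforcing rules."""

    def __init__(self) -> None:
        self.assignments: list[tuple[UUID, UUID]] = []

    def assign_task(self, task_id: UUID, member_id: UUID) -> None:
        self.assignments.append((task_id, member_id))


class FakeProjects:
    def __init__(self, project: FakeProject) -> None:
        self._project = project

    def get(self, project_id: UUID) -> FakeProject:
        return self._project


class FakeUnitOfWork:
    def __init__(self, project: FakeProject) -> None:
        self.projects = FakeProjects(project)
        self.tracked: list[FakeProject] = []
        self.committed = False

    def __enter__(self) -> "FakeUnitOfWork":
        return self

    def __exit__(self, *args) -> None:
        return None

    def track(self, project: FakeProject) -> None:
        self.tracked.append(project)

    def commit(self) -> None:
        self.committed = True


project = FakeProject()
uow = FakeUnitOfWork(project)
task_id, member_id = uuid4(), uuid4()

# The factory returns the prepared fake so the test can inspect it afterwards.
AssignTaskUseCase(lambda: uow).execute(uuid4(), task_id, member_id)
```

The fakes verify only the orchestration: the aggregate was loaded, tracked, called once, and committed. The business rules themselves stay covered by the aggregate's own tests.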

Testing

Aggregate unit tests

import pytest

def make_project(**overrides) -> Project:
    defaults = {
        "id": uuid4(),
        "name": "Test Project",
        "status": ProjectStatus.ACTIVE,
        "team": [TeamMember(id=uuid4(), name="Alice", max_concurrent_tasks=2)],
        "sprint_capacity": StoryPoints(20),
        "started_at": date.today(),
    }
    defaults.update(overrides)
    return Project(**defaults)

def test_create_task_returns_id():
    project = make_project()
    task_id = project.create_task("Build login", Priority.HIGH, StoryPoints(5))
    assert isinstance(task_id, UUID)
    assert len(project.tasks) == 1

def test_cannot_add_task_to_completed_project():
    project = make_project(status=ProjectStatus.COMPLETED)
    with pytest.raises(ValueError, match="Cannot add tasks"):
        project.create_task("Late task", Priority.LOW, StoryPoints(1))

def test_member_workload_limit():
    member_id = uuid4()
    project = make_project(
        team=[TeamMember(id=member_id, name="Bob", max_concurrent_tasks=1)]
    )
    t1 = project.create_task("Task 1", Priority.MEDIUM, StoryPoints(3), member_id)
    project.start_task(t1)

    # Task 2 starts unassigned so that assign_task is the call that hits the limit.
    t2 = project.create_task("Task 2", Priority.MEDIUM, StoryPoints(2))
    with pytest.raises(ValueError, match="already has 1 active tasks"):
        project.assign_task(t2, member_id)

def test_completing_all_tasks_completes_project():
    project = make_project()
    member_id = project.team[0].id
    t1 = project.create_task("Task 1", Priority.HIGH, StoryPoints(5), member_id)
    project.start_task(t1)
    project.complete_task(t1)

    assert project.status == ProjectStatus.COMPLETED
    completed_events = [e for e in project.pending_events if isinstance(e, ProjectCompleted)]
    assert len(completed_events) == 1

def test_sprint_capacity_warning():
    project = make_project(sprint_capacity=StoryPoints(5))
    project.create_task("Big task", Priority.HIGH, StoryPoints(8))
    warnings = [e for e in project.pending_events if isinstance(e, SprintCapacityExceeded)]
    assert len(warnings) == 1

All tests are pure unit tests: no database, no mocking, no infrastructure. The aggregate validates itself.

Aggregate sizing guidelines

Signal | Action
Aggregate has > 15 entities | Consider splitting
Multiple users frequently compete for the same aggregate | Too large: split to reduce contention
Operations on child entities never check other children | The child might be its own aggregate
Loading the aggregate requires joining 5+ tables | Performance concern: consider splitting
Business rules cross the proposed boundary | Keep together: the invariant dictates the boundary

Common pitfalls

  1. Exposing internal collections — Returning self._tasks directly allows bypass. Return tuples or frozen views.
  2. Anemic aggregates — If the aggregate root is just getters and setters with validation in services, you have lost the pattern’s benefit. Put behavior in the aggregate.
  3. Cross-aggregate transactions — Saving two aggregates in one transaction defeats the purpose. Use domain events for cross-aggregate coordination.
  4. Too-large aggregates — Including everything “just in case” causes locking contention and slow loading. Start small and grow only when invariants require it.
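
The first pitfall is easy to reproduce. The sketch below uses two hypothetical classes, LeakyBoard and GuardedBoard, which exist only for this illustration. Both enforce the same limit of two tasks, but only one of them keeps its list private.

```python
class LeakyBoard:
    MAX_TASKS = 2

    def __init__(self) -> None:
        self.tasks: list[str] = []  # public and mutable

    def add_task(self, title: str) -> None:
        if len(self.tasks) >= self.MAX_TASKS:
            raise ValueError("Board is full")
        self.tasks.append(title)


class GuardedBoard:
    MAX_TASKS = 2

    def __init__(self) -> None:
        self._tasks: list[str] = []

    @property
    def tasks(self) -> tuple[str, ...]:
        # Callers get an immutable snapshot, never the internal list.
        return tuple(self._tasks)

    def add_task(self, title: str) -> None:
        if len(self._tasks) >= self.MAX_TASKS:
            raise ValueError("Board is full")
        self._tasks.append(title)


leaky = LeakyBoard()
leaky.add_task("a")
leaky.add_task("b")
leaky.tasks.append("c")  # bypasses add_task, so the limit is silently broken

guarded = GuardedBoard()
guarded.add_task("a")
guarded.add_task("b")
try:
    guarded.tasks.append("c")  # tuples have no append method
    bypassed = True
except AttributeError:
    bypassed = False
```

A tuple only stops callers from adding or removing items. If the items are mutable entities, as Task is in this guide, callers can still change their attributes, so stricter isolation needs copies or read-only views of the entities as well.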

The one thing to remember: Aggregates in Python enforce business invariants by routing all modifications through a root object that checks rules before applying changes — keeping your domain consistent without scattering validation across your codebase.

