Aggregate Pattern in Python — Deep Dive
Building production aggregates in Python
This guide implements the Aggregate Pattern for a project management system. We build a Project aggregate that manages tasks, enforces team capacity rules, and emits domain events — demonstrating how aggregates work in real Python applications.
Domain model
Value objects
from dataclasses import dataclass
from datetime import date, datetime, timezone
from decimal import Decimal
from enum import Enum
from uuid import UUID, uuid4
class Priority(Enum):
    """Urgency levels for a task, numbered from LOW (1) up to CRITICAL (4)."""

    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
class TaskStatus(Enum):
    """Workflow states a task can be in; values are lowercase string codes."""

    TODO = "todo"
    IN_PROGRESS = "in_progress"
    IN_REVIEW = "in_review"
    DONE = "done"
class ProjectStatus(Enum):
    """Lifecycle states of a project; values are lowercase string codes."""

    PLANNING = "planning"
    ACTIVE = "active"
    ON_HOLD = "on_hold"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
@dataclass(frozen=True)
class StoryPoints:
    """Immutable effort estimate restricted to the range 0..21.

    Raises:
        ValueError: On construction, if ``value`` is negative or above 21.
    """

    value: int

    def __post_init__(self):
        """Validate the range as soon as the instance is built."""
        if self.value < 0:
            raise ValueError("Story points cannot be negative")
        if self.value > 21:
            raise ValueError("Story points cannot exceed 21 (Fibonacci limit)")

    def __add__(self, other: "StoryPoints") -> "StoryPoints":
        """Return a new ``StoryPoints`` holding the sum of both values.

        The result is validated like any other instance, so a sum above 21
        raises ``ValueError``.
        """
        # Follow the operator protocol: an unsupported operand yields a
        # TypeError from the interpreter instead of an AttributeError on
        # ``other.value``.
        if not isinstance(other, StoryPoints):
            return NotImplemented
        return StoryPoints(self.value + other.value)
@dataclass(frozen=True)
class TeamMember:
    """Immutable description of a person on a project team."""

    id: UUID
    name: str
    # Workload ceiling for this member; defaults to three tasks.
    max_concurrent_tasks: int = 3
Internal entity: Task
@dataclass
class Task:
    """Task entity that lives inside a Project aggregate.

    Code outside the aggregate is expected to go through the Project root
    instead of calling the underscore-prefixed mutators below.
    """

    id: UUID
    title: str
    assignee_id: UUID | None
    priority: Priority
    status: TaskStatus
    story_points: StoryPoints
    created_at: datetime
    completed_at: datetime | None = None

    def _mark_in_progress(self) -> None:
        """Move a TODO task with an assignee to IN_PROGRESS.

        Raises:
            ValueError: If the task is not in TODO, or has no assignee.
        """
        not_startable = self.status != TaskStatus.TODO
        if not_startable:
            raise ValueError(f"Cannot start task in {self.status.value} state")
        unassigned = self.assignee_id is None
        if unassigned:
            raise ValueError("Cannot start unassigned task")
        self.status = TaskStatus.IN_PROGRESS

    def _mark_done(self) -> None:
        """Move an IN_PROGRESS or IN_REVIEW task to DONE and stamp the UTC time.

        Raises:
            ValueError: If the task is in any other state.
        """
        completable = (
            self.status == TaskStatus.IN_PROGRESS
            or self.status == TaskStatus.IN_REVIEW
        )
        if not completable:
            raise ValueError(f"Cannot complete task in {self.status.value} state")
        self.status = TaskStatus.DONE
        self.completed_at = datetime.now(timezone.utc)

    def _assign(self, member_id: UUID) -> None:
        """Record ``member_id`` as the assignee; performs no validation itself."""
        self.assignee_id = member_id
The methods on Task are prefixed with _ to signal that code outside the aggregate should not call them directly. The aggregate root calls them after checking cross-entity invariants.
Domain events
@dataclass(frozen=True)
class DomainEvent:
    """Immutable base record carrying the fields every domain event shares."""

    # Identifier of this particular event occurrence.
    event_id: UUID
    # When the event was recorded.
    occurred_at: datetime
    # Identifier of the aggregate that produced the event.
    aggregate_id: UUID
@dataclass(frozen=True)
class TaskCreated(DomainEvent):
    """Event recording that a task was added to a project."""

    task_id: UUID
    title: str
    # Project.create_task fills this with the Priority member's name.
    priority: str
@dataclass(frozen=True)
class TaskAssigned(DomainEvent):
    """Event recording that a task was assigned to a team member."""

    task_id: UUID
    assignee_id: UUID
@dataclass(frozen=True)
class TaskCompleted(DomainEvent):
    """Event recording that a task reached DONE."""

    task_id: UUID
    # Plain integer value of the completed task's story points.
    story_points: int
@dataclass(frozen=True)
class SprintCapacityExceeded(DomainEvent):
    """Event recording that a new task pushed the project past sprint capacity."""

    # Project.create_task reports the active points measured before the
    # new task is added.
    current_points: int
    max_points: int
@dataclass(frozen=True)
class ProjectCompleted(DomainEvent):
    """Event recording that every task of a project is done."""

    total_tasks: int
    total_points: int
    # Whole days between the project's start date and completion.
    duration_days: int
The aggregate root: Project
@dataclass
class Project:
    """Aggregate root that owns a project's tasks and guards their invariants.

    Every task change goes through a command method on this class. Each
    command validates project- and team-level rules before touching the task
    and, where applicable, records a domain event in ``pending_events``.
    """

    id: UUID
    name: str
    status: ProjectStatus
    team: list[TeamMember]
    sprint_capacity: StoryPoints
    started_at: date | None = None
    # ``None`` is a sentinel replaced by a fresh list in ``__post_init__`` so
    # that instances never share one mutable default.
    _tasks: list[Task] | None = None
    _events: list[DomainEvent] | None = None
    _version: int = 0

    def __post_init__(self):
        """Give each instance its own task and event lists when none are passed."""
        if self._tasks is None:
            self._tasks = []
        if self._events is None:
            self._events = []

    # --- Read-only access ---

    @property
    def tasks(self) -> tuple[Task, ...]:
        """Snapshot of the tasks as a tuple, so callers cannot add or remove."""
        return tuple(self._tasks)

    @property
    def pending_events(self) -> list[DomainEvent]:
        """Copy of the events recorded since the last ``clear_events`` call."""
        return list(self._events)

    @property
    def version(self) -> int:
        """Version number used by the repository for optimistic concurrency."""
        return self._version

    def clear_events(self) -> None:
        """Drop all recorded events."""
        self._events.clear()

    # --- Computed properties ---

    @property
    def total_story_points(self) -> int:
        """Sum of story points over all tasks, whatever their status."""
        return sum(t.story_points.value for t in self._tasks)

    @property
    def completed_story_points(self) -> int:
        """Sum of story points over tasks that are DONE."""
        return sum(
            t.story_points.value for t in self._tasks
            if t.status == TaskStatus.DONE
        )

    @property
    def active_story_points(self) -> int:
        """Sum of story points over tasks that are not DONE yet."""
        return sum(
            t.story_points.value for t in self._tasks
            if t.status in (TaskStatus.TODO, TaskStatus.IN_PROGRESS, TaskStatus.IN_REVIEW)
        )

    @property
    def completion_percentage(self) -> float:
        """Completed points as a percentage of all points, to one decimal.

        Returns 0.0 when the project has no story points at all.
        """
        total = self.total_story_points
        if total == 0:
            return 0.0
        return round(self.completed_story_points / total * 100, 1)

    # --- Commands (business operations) ---

    def activate(self) -> None:
        """Move a PLANNING project to ACTIVE and stamp today's date.

        Raises:
            ValueError: If the project is not in PLANNING or has no team.
        """
        if self.status != ProjectStatus.PLANNING:
            raise ValueError("Only planning projects can be activated")
        if not self.team:
            raise ValueError("Cannot activate a project with no team members")
        self.status = ProjectStatus.ACTIVE
        self.started_at = date.today()

    def create_task(
        self,
        title: str,
        priority: Priority,
        story_points: StoryPoints,
        assignee_id: UUID | None = None,
    ) -> UUID:
        """Add a new TODO task and return its id.

        Going over sprint capacity does not block creation; it records a
        ``SprintCapacityExceeded`` event ahead of the ``TaskCreated`` event.

        Raises:
            ValueError: If the project is neither PLANNING nor ACTIVE, or if
                ``assignee_id`` is given but is not a team member.
        """
        if self.status not in (ProjectStatus.PLANNING, ProjectStatus.ACTIVE):
            raise ValueError(f"Cannot add tasks to {self.status.value} project")
        if assignee_id is not None and not self._is_team_member(assignee_id):
            raise ValueError("Assignee is not a team member")

        # Capacity is a warning, not a hard limit: report it and carry on.
        current_points = self.active_story_points
        if current_points + story_points.value > self.sprint_capacity.value:
            self._record(
                SprintCapacityExceeded,
                current_points=current_points,
                max_points=self.sprint_capacity.value,
            )

        task_id = uuid4()
        self._tasks.append(Task(
            id=task_id,
            title=title,
            assignee_id=assignee_id,
            priority=priority,
            status=TaskStatus.TODO,
            story_points=story_points,
            created_at=datetime.now(timezone.utc),
        ))
        self._record(
            TaskCreated,
            task_id=task_id,
            title=title,
            priority=priority.name,
        )
        return task_id

    def assign_task(self, task_id: UUID, member_id: UUID) -> None:
        """Assign a task to a team member and record ``TaskAssigned``.

        Raises:
            ValueError: If the project is not ACTIVE, or the member already
                has ``max_concurrent_tasks`` other tasks in progress or review.
            KeyError: If the task or the member cannot be found.
        """
        if self.status != ProjectStatus.ACTIVE:
            raise ValueError("Project must be active to assign tasks")
        task = self._find_task(task_id)
        member = self._find_member(member_id)
        # Cross-entity invariant: member workload limit. The task being
        # assigned is excluded so that re-assigning an active task to its
        # current owner is not rejected because of itself.
        self._ensure_member_capacity(member, excluding_task_id=task_id)
        task._assign(member_id)
        self._record(TaskAssigned, task_id=task_id, assignee_id=member_id)

    def start_task(self, task_id: UUID) -> None:
        """Move a task to IN_PROGRESS.

        Raises:
            ValueError: If the project is not ACTIVE, the task is not TODO or
                is unassigned, or starting it would put the assignee over
                ``max_concurrent_tasks``.
            KeyError: If the task cannot be found.
        """
        if self.status != ProjectStatus.ACTIVE:
            raise ValueError("Project must be active to start tasks")
        task = self._find_task(task_id)
        # Starting is the moment a task begins to count towards workload, so
        # the limit is enforced here too; otherwise several TODO tasks could
        # be assigned to one member and then all started. The check only
        # runs when the task is otherwise startable, so the task's own
        # errors keep precedence. No limit is known for an assignee who is
        # not on the team, so only team members are checked.
        assignee = next((m for m in self.team if m.id == task.assignee_id), None)
        if task.status == TaskStatus.TODO and assignee is not None:
            self._ensure_member_capacity(assignee, excluding_task_id=task_id)
        task._mark_in_progress()

    def complete_task(self, task_id: UUID) -> None:
        """Mark a task DONE and complete the project once no task is left open.

        Records ``TaskCompleted`` and, when this was the last open task,
        sets the status to COMPLETED and records ``ProjectCompleted``.

        Raises:
            ValueError: If the project is not ACTIVE, or the task is neither
                in progress nor in review.
            KeyError: If the task cannot be found.
        """
        # Same guard as assign/start: without it, finishing the last task of
        # a cancelled or on-hold project would flip it to COMPLETED.
        if self.status != ProjectStatus.ACTIVE:
            raise ValueError("Project must be active to complete tasks")
        task = self._find_task(task_id)
        task._mark_done()
        self._record(
            TaskCompleted,
            task_id=task_id,
            story_points=task.story_points.value,
        )

        # The task was found above, so the task list cannot be empty here.
        if all(t.status == TaskStatus.DONE for t in self._tasks):
            self.status = ProjectStatus.COMPLETED
            self._record(
                ProjectCompleted,
                total_tasks=len(self._tasks),
                total_points=self.total_story_points,
                duration_days=(date.today() - self.started_at).days if self.started_at else 0,
            )

    # --- Internal helpers ---

    def _record(self, event_type: type[DomainEvent], **payload) -> None:
        """Append an event of ``event_type`` with the shared header fields set."""
        self._events.append(event_type(
            event_id=uuid4(),
            occurred_at=datetime.now(timezone.utc),
            aggregate_id=self.id,
            **payload,
        ))

    def _ensure_member_capacity(self, member: TeamMember, excluding_task_id: UUID) -> None:
        """Raise ValueError if ``member`` is at the concurrent-task limit.

        Counts the member's tasks that are in progress or in review, leaving
        out the task identified by ``excluding_task_id``.
        """
        active_tasks_for_member = sum(
            1 for t in self._tasks
            if t.id != excluding_task_id
            and t.assignee_id == member.id
            and t.status in (TaskStatus.IN_PROGRESS, TaskStatus.IN_REVIEW)
        )
        if active_tasks_for_member >= member.max_concurrent_tasks:
            raise ValueError(
                f"{member.name} already has {active_tasks_for_member} active tasks "
                f"(max: {member.max_concurrent_tasks})"
            )

    def _find_task(self, task_id: UUID) -> Task:
        """Return the task with ``task_id`` or raise KeyError."""
        for task in self._tasks:
            if task.id == task_id:
                return task
        raise KeyError(f"Task {task_id} not found in project {self.id}")

    def _find_member(self, member_id: UUID) -> TeamMember:
        """Return the team member with ``member_id`` or raise KeyError."""
        for member in self.team:
            if member.id == member_id:
                return member
        raise KeyError(f"Member {member_id} not on project team")

    def _is_team_member(self, member_id: UUID) -> bool:
        """Tell whether ``member_id`` belongs to someone on the team."""
        return any(m.id == member_id for m in self.team)
Repository with optimistic concurrency
from typing import Protocol
class ProjectRepository(Protocol):
    """Structural interface for loading and storing Project aggregates."""

    def get(self, project_id: UUID) -> Project:
        """Return the project identified by ``project_id``."""
        ...

    def save(self, project: Project) -> None:
        """Persist the given project."""
        ...
class SqlProjectRepository:
    """Project repository backed by an ORM session, with a version check.

    NOTE(review): ``ProjectModel``, ``ConcurrencyError``, ``_to_domain`` and
    ``_update_model`` are used here but not defined in this listing; they
    must be provided elsewhere.
    """

    def __init__(self, session) -> None:
        self._session = session

    def get(self, project_id: UUID) -> Project:
        """Load a project by id.

        Raises:
            KeyError: If no row exists for ``project_id``.
        """
        model = self._session.query(ProjectModel).get(str(project_id))
        if model is None:
            raise KeyError(f"Project {project_id} not found")
        return self._to_domain(model)

    def save(self, project: Project) -> None:
        """Insert or update the project's row and bump its version.

        On success the aggregate's ``_version`` is updated to the new stored
        version and the session is flushed (not committed).

        Raises:
            ConcurrencyError: If the stored version differs from the
                aggregate's version, or the row is missing for an aggregate
                that has been saved before.
        """
        model = self._session.query(ProjectModel).get(str(project.id))
        if model is None:
            # A missing row is only legitimate for a never-saved aggregate.
            if project.version != 0:
                raise ConcurrencyError(
                    f"Project {project.id} was modified by another process "
                    f"(expected version {project.version}, found no stored row)"
                )
            model = ProjectModel(id=str(project.id))
            # A freshly built model has no meaningful version yet (ORM column
            # defaults are typically applied only at INSERT), so seed it
            # rather than comparing it against the aggregate.
            model.version = project.version
            self._session.add(model)
        elif model.version != project.version:
            # Optimistic concurrency check
            raise ConcurrencyError(
                f"Project {project.id} was modified by another process "
                f"(expected version {project.version}, found {model.version})"
            )
        self._update_model(model, project)
        model.version += 1
        project._version = model.version
        self._session.flush()
Unit of Work with event dispatch
class UnitOfWork:
    """Context manager that saves tracked aggregates and publishes their events.

    Entering opens a session and exposes a repository as ``projects``.
    Leaving rolls back if an exception escaped the block and always closes
    the session. ``commit`` must be called explicitly inside the block.
    """

    def __init__(self, session_factory, event_bus):
        self._session_factory = session_factory
        self._event_bus = event_bus
        self._aggregates: list[Project] = []

    def __enter__(self):
        self._session = self._session_factory()
        self.projects = SqlProjectRepository(self._session)
        return self

    def __exit__(self, *args):
        # args is (exc_type, exc_value, traceback) when called by ``with``.
        exc_type = args[0] if args else None
        try:
            if exc_type:
                self._session.rollback()
        finally:
            # Close even if the rollback itself fails, and forget tracked
            # aggregates so a later block on this instance cannot save them.
            self._session.close()
            self._aggregates.clear()

    def track(self, project: Project) -> None:
        """Register an aggregate to be saved by the next ``commit``."""
        # Compare by identity: Project is a mutable dataclass, so ``in``
        # would compare field by field. Tracking the same object twice would
        # save it twice and bump its version twice.
        if not any(tracked is project for tracked in self._aggregates):
            self._aggregates.append(project)

    def commit(self) -> None:
        """Save all tracked aggregates, commit, then publish their events."""
        for agg in self._aggregates:
            self.projects.save(agg)
        self._session.commit()
        # Dispatch events after successful commit
        for agg in self._aggregates:
            for event in agg.pending_events:
                self._event_bus.publish(event)
            agg.clear_events()
        self._aggregates.clear()
Use case layer
class AssignTaskUseCase:
    """Application service that assigns a task to a team member."""

    def __init__(self, uow_factory) -> None:
        # Called once per execution to obtain a fresh unit of work.
        self._uow_factory = uow_factory

    def execute(self, project_id: UUID, task_id: UUID, member_id: UUID) -> None:
        """Load the project, assign the task inside it, and commit."""
        unit_of_work = self._uow_factory()
        with unit_of_work as active:
            aggregate = active.projects.get(project_id)
            active.track(aggregate)
            aggregate.assign_task(task_id, member_id)
            active.commit()
The use case is thin: load aggregate, call a method, commit. All business logic lives in the aggregate.
Testing
Aggregate unit tests
def make_project(**overrides) -> Project:
    """Build an ACTIVE test project; keyword arguments replace the defaults."""
    base = dict(
        id=uuid4(),
        name="Test Project",
        status=ProjectStatus.ACTIVE,
        team=[TeamMember(id=uuid4(), name="Alice", max_concurrent_tasks=2)],
        sprint_capacity=StoryPoints(20),
        started_at=date.today(),
    )
    return Project(**{**base, **overrides})
def test_create_task_returns_id():
    """Creating a task yields a UUID and stores exactly one task."""
    sut = make_project()
    new_id = sut.create_task("Build login", Priority.HIGH, StoryPoints(5))
    assert isinstance(new_id, UUID)
    stored = project_tasks = sut.tasks
    assert len(stored) == 1
def test_cannot_add_task_to_completed_project():
    """A COMPLETED project rejects new tasks with a ValueError."""
    finished = make_project(status=ProjectStatus.COMPLETED)
    expect_rejection = pytest.raises(ValueError, match="Cannot add tasks")
    with expect_rejection:
        finished.create_task("Late task", Priority.LOW, StoryPoints(1))
def test_member_workload_limit():
    """A member at the concurrent-task limit cannot be assigned another task."""
    bob_id = uuid4()
    bob = TeamMember(id=bob_id, name="Bob", max_concurrent_tasks=1)
    project = make_project(team=[bob])

    # The first task is started, so Bob is at his limit of one active task.
    first = project.create_task("Task 1", Priority.MEDIUM, StoryPoints(3), bob_id)
    project.start_task(first)

    second = project.create_task("Task 2", Priority.MEDIUM, StoryPoints(2), bob_id)
    with pytest.raises(ValueError, match="already has 1 active tasks"):
        project.assign_task(second, bob_id)
def test_completing_all_tasks_completes_project():
    """Finishing the only task completes the project and records one event."""
    project = make_project()
    owner_id = project.team[0].id
    only_task = project.create_task("Task 1", Priority.HIGH, StoryPoints(5), owner_id)

    project.start_task(only_task)
    project.complete_task(only_task)

    assert project.status == ProjectStatus.COMPLETED
    completion_count = sum(
        1 for event in project.pending_events
        if isinstance(event, ProjectCompleted)
    )
    assert completion_count == 1
def test_sprint_capacity_warning():
    """A task larger than the sprint capacity records exactly one warning."""
    small_sprint = make_project(sprint_capacity=StoryPoints(5))
    small_sprint.create_task("Big task", Priority.HIGH, StoryPoints(8))
    warning_count = sum(
        1 for event in small_sprint.pending_events
        if isinstance(event, SprintCapacityExceeded)
    )
    assert warning_count == 1
All tests are pure unit tests: no database, no mocking, no infrastructure. The aggregate validates itself.
Aggregate sizing guidelines
| Signal | Action |
|---|---|
| Aggregate has > 15 entities | Consider splitting |
| Multiple users frequently compete for the same aggregate | Too large — split to reduce contention |
| Operations on child entities never check other children | The child might be its own aggregate |
| Loading the aggregate requires joining 5+ tables | Performance concern — consider splitting |
| Business rules cross the proposed boundary | Keep together — the invariant dictates the boundary |
Common pitfalls
- Exposing internal collections — Returning `self._tasks` directly lets callers bypass the aggregate root's checks. Return tuples or frozen views.
- Anemic aggregates — If the aggregate root is just getters and setters with validation in services, you have lost the pattern’s benefit. Put behavior in the aggregate.
- Cross-aggregate transactions — Saving two aggregates in one transaction defeats the purpose. Use domain events for cross-aggregate coordination.
- Too-large aggregates — Including everything “just in case” causes locking contention and slow loading. Start small and grow only when invariants require it.
The one thing to remember: Aggregates in Python enforce business invariants by routing all modifications through a root object that checks rules before applying changes — keeping your domain consistent without scattering validation across your codebase.
See Also
- Python Bounded Contexts — Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern — Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern — How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture — Why your Python app should look like an onion — and how that saves you from painful rewrites.
- Python Connection Draining — How to shut down a Python server without hanging up on people mid-conversation — like a store that locks the entrance but lets shoppers finish.