Hexagonal Architecture in Python — Deep Dive
Implementing hexagonal architecture end-to-end
This guide builds a notification service using hexagonal architecture. The service accepts notification requests from multiple sources (API, CLI, message queue) and delivers them through multiple channels (email, Slack, SMS). This multi-input, multi-output scenario demonstrates why ports and adapters shine.
Defining the domain
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class Channel(Enum):
EMAIL = "email"
SLACK = "slack"
SMS = "sms"
class Priority(Enum):
LOW = "low"
NORMAL = "normal"
URGENT = "urgent"
@dataclass(frozen=True)
class Notification:
id: str
recipient: str
channel: Channel
subject: str
body: str
priority: Priority = Priority.NORMAL
created_at: datetime = None
def __post_init__(self):
if not self.recipient:
raise ValueError("Recipient cannot be empty")
if not self.body:
raise ValueError("Body cannot be empty")
Frozen dataclass ensures immutability — notifications are value objects that do not change after creation.
Defining ports
Driving port (inbound)
from typing import Protocol
class NotificationService(Protocol):
"""Driving port — how the outside world sends notifications."""
def send(self, notification: Notification) -> str: ...
def get_status(self, notification_id: str) -> str: ...
def cancel(self, notification_id: str) -> bool: ...
Driven ports (outbound)
class NotificationStore(Protocol):
"""Driven port — persistence."""
def save(self, notification: Notification) -> None: ...
def get(self, notification_id: str) -> Notification | None: ...
def update_status(self, notification_id: str, status: str) -> None: ...
class DeliveryChannel(Protocol):
"""Driven port — actual delivery mechanism."""
def deliver(self, notification: Notification) -> bool: ...
class AuditLogger(Protocol):
"""Driven port — compliance audit trail."""
def log(self, event: str, details: dict) -> None: ...
Each port is small and focused — Interface Segregation applied naturally.
Application core (use case implementation)
from datetime import datetime, timezone
class NotificationServiceImpl:
def __init__(
self,
store: NotificationStore,
channels: dict[Channel, DeliveryChannel],
audit: AuditLogger,
) -> None:
self._store = store
self._channels = channels
self._audit = audit
def send(self, notification: Notification) -> str:
# Store first for durability
self._store.save(notification)
self._audit.log("notification.created", {
"id": notification.id,
"channel": notification.channel.value,
})
# Route to correct delivery channel
channel = self._channels.get(notification.channel)
if channel is None:
self._store.update_status(notification.id, "unsupported_channel")
raise ValueError(f"No adapter for channel {notification.channel}")
success = channel.deliver(notification)
status = "delivered" if success else "failed"
self._store.update_status(notification.id, status)
self._audit.log("notification.delivered", {
"id": notification.id,
"status": status,
})
return notification.id
def get_status(self, notification_id: str) -> str:
notification = self._store.get(notification_id)
if notification is None:
raise KeyError(f"Notification {notification_id} not found")
return "stored"
def cancel(self, notification_id: str) -> bool:
self._store.update_status(notification_id, "cancelled")
self._audit.log("notification.cancelled", {"id": notification_id})
return True
The core knows nothing about HTTP, SQL, or Slack APIs. It works exclusively through ports.
Driving adapters
FastAPI adapter
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
router = APIRouter(prefix="/notifications")
class SendRequest(BaseModel):
recipient: str
channel: str
subject: str
body: str
priority: str = "normal"
@router.post("/")
def send_notification(
req: SendRequest,
service: NotificationService = Depends(get_service),
):
notification = Notification(
id=generate_id(),
recipient=req.recipient,
channel=Channel(req.channel),
subject=req.subject,
body=req.body,
priority=Priority(req.priority),
created_at=datetime.now(timezone.utc),
)
try:
nid = service.send(notification)
return {"notification_id": nid, "status": "accepted"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/{notification_id}/status")
def get_status(
notification_id: str,
service: NotificationService = Depends(get_service),
):
try:
status = service.get_status(notification_id)
return {"notification_id": notification_id, "status": status}
except KeyError:
raise HTTPException(status_code=404, detail="Not found")
CLI adapter
import click
@click.command()
@click.option("--recipient", required=True)
@click.option("--channel", type=click.Choice(["email", "slack", "sms"]))
@click.option("--subject", required=True)
@click.option("--body", required=True)
def send(recipient, channel, subject, body):
service = create_notification_service()
notification = Notification(
id=generate_id(),
recipient=recipient,
channel=Channel(channel),
subject=subject,
body=body,
created_at=datetime.now(timezone.utc),
)
nid = service.send(notification)
click.echo(f"Sent: {nid}")
Both adapters call the same NotificationService port. The core does not know (or care) which adapter triggered it.
Driven adapters
PostgreSQL store adapter
class PostgresNotificationStore:
def __init__(self, session_factory):
self._session_factory = session_factory
def save(self, notification: Notification) -> None:
with self._session_factory() as session:
model = NotificationModel(
id=notification.id,
recipient=notification.recipient,
channel=notification.channel.value,
subject=notification.subject,
body=notification.body,
priority=notification.priority.value,
status="pending",
created_at=notification.created_at,
)
session.add(model)
session.commit()
def get(self, notification_id: str) -> Notification | None:
with self._session_factory() as session:
model = session.get(NotificationModel, notification_id)
return self._to_domain(model) if model else None
def update_status(self, notification_id: str, status: str) -> None:
with self._session_factory() as session:
model = session.get(NotificationModel, notification_id)
if model:
model.status = status
session.commit()
Slack delivery adapter
import httpx
class SlackDeliveryChannel:
def __init__(self, webhook_url: str) -> None:
self._webhook_url = webhook_url
def deliver(self, notification: Notification) -> bool:
response = httpx.post(self._webhook_url, json={
"text": f"*{notification.subject}*\n{notification.body}",
})
return response.status_code == 200
Email delivery adapter
import smtplib
from email.message import EmailMessage
class SmtpDeliveryChannel:
def __init__(self, host: str, port: int, username: str, password: str):
self._host = host
self._port = port
self._username = username
self._password = password
def deliver(self, notification: Notification) -> bool:
msg = EmailMessage()
msg["Subject"] = notification.subject
msg["From"] = self._username
msg["To"] = notification.recipient
msg.set_content(notification.body)
try:
with smtplib.SMTP(self._host, self._port) as server:
server.starttls()
server.login(self._username, self._password)
server.send_message(msg)
return True
except smtplib.SMTPException:
return False
Composition root
def create_notification_service() -> NotificationServiceImpl:
store = PostgresNotificationStore(get_session_factory())
channels = {
Channel.EMAIL: SmtpDeliveryChannel(
settings.smtp_host, settings.smtp_port,
settings.smtp_user, settings.smtp_pass,
),
Channel.SLACK: SlackDeliveryChannel(settings.slack_webhook),
}
audit = FileAuditLogger(settings.audit_log_path)
return NotificationServiceImpl(store, channels, audit)
Adding SMS support: write TwilioDeliveryChannel, add Channel.SMS: TwilioDeliveryChannel(...) to the dict. Zero changes to existing code.
Testing at each boundary
Core tests (no infrastructure)
class InMemoryStore:
def __init__(self):
self.notifications = {}
self.statuses = {}
def save(self, n: Notification) -> None:
self.notifications[n.id] = n
self.statuses[n.id] = "pending"
def get(self, nid: str) -> Notification | None:
return self.notifications.get(nid)
def update_status(self, nid: str, status: str) -> None:
self.statuses[nid] = status
class FakeDelivery:
def __init__(self, succeed=True):
self._succeed = succeed
self.delivered = []
def deliver(self, n: Notification) -> bool:
self.delivered.append(n)
return self._succeed
def test_send_routes_to_correct_channel():
store = InMemoryStore()
email = FakeDelivery()
slack = FakeDelivery()
audit = FakeAuditLogger()
service = NotificationServiceImpl(
store, {Channel.EMAIL: email, Channel.SLACK: slack}, audit
)
notification = Notification(
id="1", recipient="user@example.com",
channel=Channel.EMAIL, subject="Test", body="Hello",
)
service.send(notification)
assert len(email.delivered) == 1
assert len(slack.delivered) == 0
assert store.statuses["1"] == "delivered"
Adapter integration tests
Test each driven adapter against a real (containerized) service. The PostgreSQL store gets tested against a test database. The Slack adapter gets tested against a mock webhook. These tests verify serialization and protocol correctness, not business logic.
Enforcing the hexagonal boundary
Use import-linter to prevent core from importing adapters:
[[tool.importlinter.contracts]]
name = "Core independence"
type = "independence"
modules = ["src.core", "src.adapters"]
This fails CI if anyone imports adapter code inside the core package.
Common pitfalls
- Leaking framework types into ports — Accepting
fastapi.Requestorsqlalchemy.Sessionas a port parameter defeats the purpose. - Too many ports — Not every function call needs a port. Ports are for external system boundaries, not internal helper functions.
- Adapter logic in the core — If your use case contains retry logic or HTTP status code handling, adapter concerns have leaked inward.
- Skipping the composition root — Instantiating adapters inside use cases creates hidden coupling.
The one thing to remember: Hexagonal architecture succeeds when every external system — whether it drives the app or is driven by it — connects through an explicit port, keeping your core logic completely infrastructure-free.
See Also
- Python Aggregate Pattern Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture Why your Python app should look like an onion — and how that saves you from painful rewrites.