Hexagonal Architecture in Python — Deep Dive

Implementing hexagonal architecture end-to-end

This guide builds a notification service using hexagonal architecture. The service accepts notification requests from multiple sources (API, CLI, message queue) and delivers them through multiple channels (email, Slack, SMS). This multi-input, multi-output scenario demonstrates why ports and adapters shine.

Defining the domain

from dataclasses import dataclass
from enum import Enum
from datetime import datetime

class Channel(Enum):
    EMAIL = "email"
    SLACK = "slack"
    SMS = "sms"

class Priority(Enum):
    LOW = "low"
    NORMAL = "normal"
    URGENT = "urgent"

@dataclass(frozen=True)
class Notification:
    id: str
    recipient: str
    channel: Channel
    subject: str
    body: str
    priority: Priority = Priority.NORMAL
    created_at: datetime = None

    def __post_init__(self):
        if not self.recipient:
            raise ValueError("Recipient cannot be empty")
        if not self.body:
            raise ValueError("Body cannot be empty")

Frozen dataclass ensures immutability — notifications are value objects that do not change after creation.

Defining ports

Driving port (inbound)

from typing import Protocol

class NotificationService(Protocol):
    """Driving port — how the outside world sends notifications."""
    def send(self, notification: Notification) -> str: ...
    def get_status(self, notification_id: str) -> str: ...
    def cancel(self, notification_id: str) -> bool: ...

Driven ports (outbound)

class NotificationStore(Protocol):
    """Driven port — persistence."""
    def save(self, notification: Notification) -> None: ...
    def get(self, notification_id: str) -> Notification | None: ...
    def update_status(self, notification_id: str, status: str) -> None: ...

class DeliveryChannel(Protocol):
    """Driven port — actual delivery mechanism."""
    def deliver(self, notification: Notification) -> bool: ...

class AuditLogger(Protocol):
    """Driven port — compliance audit trail."""
    def log(self, event: str, details: dict) -> None: ...

Each port is small and focused — Interface Segregation applied naturally.

Application core (use case implementation)

from datetime import datetime, timezone

class NotificationServiceImpl:
    def __init__(
        self,
        store: NotificationStore,
        channels: dict[Channel, DeliveryChannel],
        audit: AuditLogger,
    ) -> None:
        self._store = store
        self._channels = channels
        self._audit = audit

    def send(self, notification: Notification) -> str:
        # Store first for durability
        self._store.save(notification)
        self._audit.log("notification.created", {
            "id": notification.id,
            "channel": notification.channel.value,
        })

        # Route to correct delivery channel
        channel = self._channels.get(notification.channel)
        if channel is None:
            self._store.update_status(notification.id, "unsupported_channel")
            raise ValueError(f"No adapter for channel {notification.channel}")

        success = channel.deliver(notification)
        status = "delivered" if success else "failed"
        self._store.update_status(notification.id, status)

        self._audit.log("notification.delivered", {
            "id": notification.id,
            "status": status,
        })

        return notification.id

    def get_status(self, notification_id: str) -> str:
        notification = self._store.get(notification_id)
        if notification is None:
            raise KeyError(f"Notification {notification_id} not found")
        return "stored"

    def cancel(self, notification_id: str) -> bool:
        self._store.update_status(notification_id, "cancelled")
        self._audit.log("notification.cancelled", {"id": notification_id})
        return True

The core knows nothing about HTTP, SQL, or Slack APIs. It works exclusively through ports.

Driving adapters

FastAPI adapter

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel

router = APIRouter(prefix="/notifications")

class SendRequest(BaseModel):
    recipient: str
    channel: str
    subject: str
    body: str
    priority: str = "normal"

@router.post("/")
def send_notification(
    req: SendRequest,
    service: NotificationService = Depends(get_service),
):
    notification = Notification(
        id=generate_id(),
        recipient=req.recipient,
        channel=Channel(req.channel),
        subject=req.subject,
        body=req.body,
        priority=Priority(req.priority),
        created_at=datetime.now(timezone.utc),
    )
    try:
        nid = service.send(notification)
        return {"notification_id": nid, "status": "accepted"}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@router.get("/{notification_id}/status")
def get_status(
    notification_id: str,
    service: NotificationService = Depends(get_service),
):
    try:
        status = service.get_status(notification_id)
        return {"notification_id": notification_id, "status": status}
    except KeyError:
        raise HTTPException(status_code=404, detail="Not found")

CLI adapter

import click

@click.command()
@click.option("--recipient", required=True)
@click.option("--channel", type=click.Choice(["email", "slack", "sms"]))
@click.option("--subject", required=True)
@click.option("--body", required=True)
def send(recipient, channel, subject, body):
    service = create_notification_service()
    notification = Notification(
        id=generate_id(),
        recipient=recipient,
        channel=Channel(channel),
        subject=subject,
        body=body,
        created_at=datetime.now(timezone.utc),
    )
    nid = service.send(notification)
    click.echo(f"Sent: {nid}")

Both adapters call the same NotificationService port. The core does not know (or care) which adapter triggered it.

Driven adapters

PostgreSQL store adapter

class PostgresNotificationStore:
    def __init__(self, session_factory):
        self._session_factory = session_factory

    def save(self, notification: Notification) -> None:
        with self._session_factory() as session:
            model = NotificationModel(
                id=notification.id,
                recipient=notification.recipient,
                channel=notification.channel.value,
                subject=notification.subject,
                body=notification.body,
                priority=notification.priority.value,
                status="pending",
                created_at=notification.created_at,
            )
            session.add(model)
            session.commit()

    def get(self, notification_id: str) -> Notification | None:
        with self._session_factory() as session:
            model = session.get(NotificationModel, notification_id)
            return self._to_domain(model) if model else None

    def update_status(self, notification_id: str, status: str) -> None:
        with self._session_factory() as session:
            model = session.get(NotificationModel, notification_id)
            if model:
                model.status = status
                session.commit()

Slack delivery adapter

import httpx

class SlackDeliveryChannel:
    def __init__(self, webhook_url: str) -> None:
        self._webhook_url = webhook_url

    def deliver(self, notification: Notification) -> bool:
        response = httpx.post(self._webhook_url, json={
            "text": f"*{notification.subject}*\n{notification.body}",
        })
        return response.status_code == 200

Email delivery adapter

import smtplib
from email.message import EmailMessage

class SmtpDeliveryChannel:
    def __init__(self, host: str, port: int, username: str, password: str):
        self._host = host
        self._port = port
        self._username = username
        self._password = password

    def deliver(self, notification: Notification) -> bool:
        msg = EmailMessage()
        msg["Subject"] = notification.subject
        msg["From"] = self._username
        msg["To"] = notification.recipient
        msg.set_content(notification.body)

        try:
            with smtplib.SMTP(self._host, self._port) as server:
                server.starttls()
                server.login(self._username, self._password)
                server.send_message(msg)
            return True
        except smtplib.SMTPException:
            return False

Composition root

def create_notification_service() -> NotificationServiceImpl:
    store = PostgresNotificationStore(get_session_factory())
    channels = {
        Channel.EMAIL: SmtpDeliveryChannel(
            settings.smtp_host, settings.smtp_port,
            settings.smtp_user, settings.smtp_pass,
        ),
        Channel.SLACK: SlackDeliveryChannel(settings.slack_webhook),
    }
    audit = FileAuditLogger(settings.audit_log_path)
    return NotificationServiceImpl(store, channels, audit)

Adding SMS support: write TwilioDeliveryChannel, add Channel.SMS: TwilioDeliveryChannel(...) to the dict. Zero changes to existing code.

Testing at each boundary

Core tests (no infrastructure)

class InMemoryStore:
    def __init__(self):
        self.notifications = {}
        self.statuses = {}

    def save(self, n: Notification) -> None:
        self.notifications[n.id] = n
        self.statuses[n.id] = "pending"

    def get(self, nid: str) -> Notification | None:
        return self.notifications.get(nid)

    def update_status(self, nid: str, status: str) -> None:
        self.statuses[nid] = status

class FakeDelivery:
    def __init__(self, succeed=True):
        self._succeed = succeed
        self.delivered = []

    def deliver(self, n: Notification) -> bool:
        self.delivered.append(n)
        return self._succeed

def test_send_routes_to_correct_channel():
    store = InMemoryStore()
    email = FakeDelivery()
    slack = FakeDelivery()
    audit = FakeAuditLogger()

    service = NotificationServiceImpl(
        store, {Channel.EMAIL: email, Channel.SLACK: slack}, audit
    )

    notification = Notification(
        id="1", recipient="user@example.com",
        channel=Channel.EMAIL, subject="Test", body="Hello",
    )
    service.send(notification)

    assert len(email.delivered) == 1
    assert len(slack.delivered) == 0
    assert store.statuses["1"] == "delivered"

Adapter integration tests

Test each driven adapter against a real (containerized) service. The PostgreSQL store gets tested against a test database. The Slack adapter gets tested against a mock webhook. These tests verify serialization and protocol correctness, not business logic.

Enforcing the hexagonal boundary

Use import-linter to prevent core from importing adapters:

[[tool.importlinter.contracts]]
name = "Core independence"
type = "independence"
modules = ["src.core", "src.adapters"]

This fails CI if anyone imports adapter code inside the core package.

Common pitfalls

  1. Leaking framework types into ports — Accepting fastapi.Request or sqlalchemy.Session as a port parameter defeats the purpose.
  2. Too many ports — Not every function call needs a port. Ports are for external system boundaries, not internal helper functions.
  3. Adapter logic in the core — If your use case contains retry logic or HTTP status code handling, adapter concerns have leaked inward.
  4. Skipping the composition root — Instantiating adapters inside use cases creates hidden coupling.

The one thing to remember: Hexagonal architecture succeeds when every external system — whether it drives the app or is driven by it — connects through an explicit port, keeping your core logic completely infrastructure-free.

pythonarchitectureclean-code

See Also