Python Microservices Architecture — Deep Dive
Domain-Driven Design for Service Boundaries
The most reliable way to define microservice boundaries comes from Domain-Driven Design (DDD). The core concept is the Bounded Context — a boundary within which a particular domain model applies.
Identifying Bounded Contexts
Map your business processes and look for natural seams:
E-commerce Domain Map:

```text
┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐
│ Catalog Context  │  │ Order Context    │  │ Shipping Context │
│                  │  │                  │  │                  │
│ - Product        │  │ - Order          │  │ - Shipment       │
│ - Category       │  │ - LineItem       │  │ - Carrier        │
│ - Pricing        │  │ - Payment        │  │ - Tracking       │
│ - Review         │  │ - Discount       │  │ - Address        │
└──────────────────┘  └──────────────────┘  └──────────────────┘
```
Notice that “Product” means different things in different contexts. In Catalog, a Product has descriptions, images, and reviews. In Order, a Product is just an ID, name, and price. Each context has its own model of the same real-world thing.
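A minimal sketch of the same idea in code. The class names `CatalogProduct` and `OrderedProduct` and the `to_ordered_product` translator are hypothetical. Each context defines its own model, and the Order context copies only the fields it needs at the moment an order is placed:

```python
from dataclasses import dataclass, field
from decimal import Decimal


# Catalog context: the rich model used for browsing and merchandising
@dataclass
class CatalogProduct:
    product_id: str
    name: str
    description: str
    price: Decimal
    image_urls: list[str] = field(default_factory=list)
    review_ids: list[str] = field(default_factory=list)


# Order context: only what an order needs, captured at purchase time
@dataclass(frozen=True)
class OrderedProduct:
    product_id: str
    name: str
    unit_price: Decimal


def to_ordered_product(product: CatalogProduct) -> OrderedProduct:
    """Translate the Catalog model into the Order context's own model."""
    return OrderedProduct(
        product_id=product.product_id,
        name=product.name,
        unit_price=product.price,
    )


mug = CatalogProduct("p-1", "Mug", "A 350 ml stoneware mug", Decimal("12.50"))
snapshot = to_ordered_product(mug)
print(snapshot)  # OrderedProduct(product_id='p-1', name='Mug', unit_price=Decimal('12.50'))
```

Because `OrderedProduct` is a copy rather than a reference, a later price change in the Catalog does not rewrite orders that were already placed. That is a design choice made for this sketch.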
Python Service Structure
```text
order-service/
├── src/
│   └── order/
│       ├── __init__.py
│       ├── domain/
│       │   ├── models.py        # Order, LineItem (business objects)
│       │   ├── events.py        # OrderCreated, OrderCancelled
│       │   ├── commands.py      # CreateOrder, CancelOrder
│       │   └── services.py      # Domain logic (no I/O)
│       ├── application/
│       │   ├── handlers.py      # Command and event handlers
│       │   └── queries.py       # Read-side query handlers
│       ├── infrastructure/
│       │   ├── repository.py    # Database access
│       │   ├── messaging.py     # Broker integration
│       │   └── http_client.py   # External service calls
│       └── api/
│           ├── routes.py        # FastAPI routes
│           └── schemas.py       # API request/response models
├── tests/
├── Dockerfile
├── pyproject.toml
└── alembic/                     # Database migrations
```
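The domain layer has zero dependencies on frameworks or infrastructure. Nothing in `domain/` imports FastAPI, SQLAlchemy, or the message broker. Its business rules can therefore be tested without a database or network, and they stay portable if the framework or broker changes.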
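A minimal sketch of what `domain/models.py` could contain. The `OrderStatus` enum, the `OrderError` exception, and the rule that shipped orders cannot be cancelled are assumptions for illustration. The point is that the module imports only the standard library, so each rule can be exercised with plain function calls:

```python
from dataclasses import dataclass, field
from decimal import Decimal
from enum import Enum


class OrderStatus(Enum):
    PLACED = "placed"
    SHIPPED = "shipped"
    CANCELLED = "cancelled"


class OrderError(Exception):
    """Raised when a business rule is violated."""


@dataclass(frozen=True)
class LineItem:
    product_id: str
    quantity: int
    unit_price: Decimal

    @property
    def subtotal(self) -> Decimal:
        return self.unit_price * self.quantity


@dataclass
class Order:
    order_id: str
    customer_id: str
    items: list[LineItem] = field(default_factory=list)
    status: OrderStatus = OrderStatus.PLACED

    @property
    def total(self) -> Decimal:
        return sum((item.subtotal for item in self.items), Decimal("0"))

    def add_item(self, item: LineItem) -> None:
        if self.status is not OrderStatus.PLACED:
            raise OrderError("items can only be added to a placed order")
        if item.quantity <= 0:
            raise OrderError("quantity must be positive")
        self.items.append(item)

    def cancel(self) -> None:
        # Hypothetical business rule: a shipped order cannot be cancelled
        if self.status is OrderStatus.SHIPPED:
            raise OrderError("cannot cancel an order that has shipped")
        self.status = OrderStatus.CANCELLED


order = Order("o-1", "c-42")
order.add_item(LineItem("p-1", 2, Decimal("12.50")))
print(order.total)   # 25.00
order.cancel()
print(order.status)  # OrderStatus.CANCELLED
```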
Inter-Service Communication Patterns
Synchronous with Circuit Breakers
Direct HTTP calls between services need protection against cascading failures:
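```python
import httpx
from circuitbreaker import circuit
from tenacity import retry, stop_after_attempt, wait_exponential


# Outer decorator: the circuit counts one failure per call whose retries are all exhausted
@circuit(failure_threshold=5, recovery_timeout=30)
# Inner decorator: up to 3 attempts, exponential backoff clamped to 0.5 to 5 seconds;
# reraise=True surfaces the last httpx error instead of tenacity's RetryError
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=0.5, max=5), reraise=True)
async def get_product_details(product_id: str) -> dict:
    async with httpx.AsyncClient(timeout=5.0) as client:
        response = await client.get(
            f"http://catalog-service/api/products/{product_id}"
        )
        response.raise_for_status()
        return response.json()
```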
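Because `@circuit` wraps `@retry`, each failure the breaker counts is a call that has already used up its three attempts. After 5 such consecutive failures the circuit opens. For the next 30 seconds, calls fail immediately with an error instead of waiting on timeouts against an unhealthy service. After that window, a trial call is allowed through to test whether the service has recovered. The retry decorator, meanwhile, absorbs transient failures such as a dropped connection.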
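To make the trip-and-recover cycle concrete, here is a minimal synchronous circuit breaker state machine. It is an illustrative sketch, not the `circuitbreaker` package's code. The names `SimpleCircuitBreaker` and `CircuitOpenError` are hypothetical, and the injectable `clock` parameter exists only so the timing can be tested without sleeping:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    """Raised instead of calling the dependency while the circuit is open."""


class SimpleCircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failure_count = 0
        self.opened_at: float | None = None  # None means the circuit is closed

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.recovery_timeout:
            return "half-open"  # timeout elapsed: let a trial call through
        return "open"

    def call(self, func: Callable[[], T]) -> T:
        if self.state == "open":
            raise CircuitOpenError("failing fast; dependency marked unhealthy")
        try:
            result = func()
        except Exception:
            self.failure_count += 1
            # Trip at the threshold, or re-open immediately if a half-open trial fails
            if self.opened_at is not None or self.failure_count >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the consecutive-failure count
        self.failure_count = 0
        self.opened_at = None
        return result
```

With `failure_threshold=5` and `recovery_timeout=30`, this mirrors the parameters passed to the decorator above. The closed, open, and half-open states are the standard circuit breaker cycle.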
Asynchronous with Event Bus
For events that don’t need immediate responses:
```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import uuid4

import aio_pika


# Domain event definition
@dataclass
class OrderCreated:
    order_id: str
    customer_id: str
    items: list[dict]
    total_amount: float
    event_id: str = field(default_factory=lambda: str(uuid4()))
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


# Publishing
class RabbitMQPublisher:
    def __init__(self, connection: aio_pika.Connection):
        self.connection = connection

    async def publish(self, event: OrderCreated):
        # One channel per publish keeps the example short; long-lived
        # publishers usually open a channel once and reuse it
        channel = await self.connection.channel()
        try:
            exchange = await channel.declare_exchange(
                "domain_events", aio_pika.ExchangeType.TOPIC, durable=True
            )
            message = aio_pika.Message(
                body=json.dumps({
                    "event_type": type(event).__name__,
                    "event_id": event.event_id,
                    "timestamp": event.timestamp,
                    "data": {
                        "order_id": event.order_id,
                        "customer_id": event.customer_id,
                        "items": event.items,
                        "total_amount": event.total_amount,
                    },
                }).encode(),
                content_type="application/json",
                # Survives a broker restart when routed to a durable queue
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
            )
            await exchange.publish(message, routing_key="order.created")
        finally:
            await channel.close()  # don't leak a channel per event


# Consuming (in the notification service)
class NotificationConsumer:
    async def on_order_created(self, message: aio_pika.IncomingMessage):
        # process() acks on success and rejects the message if the block raises
        async with message.process():
            event = json.loads(message.body)
            await self.send_confirmation_email(
                event["data"]["customer_id"],
                event["data"]["order_id"],
            )

    async def send_confirmation_email(self, customer_id: str, order_id: str):
        ...  # placeholder: call the email provider here
```
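The publisher above uses a topic exchange, so each consumer chooses which events it receives by binding its queue with a pattern. In AMQP topic routing, keys are dot-separated words: `*` matches exactly one word and `#` matches zero or more words. RabbitMQ does this matching inside the broker. The function below is only an illustrative re-implementation of the rule (`topic_matches` is a hypothetical helper), useful for reasoning about which bindings will see `order.created`:

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding key matches a routing key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            return w == len(words)  # pattern used up: match only if words are too
        token = pattern[p]
        if token == "#":
            # '#' can swallow zero words (move past it) or one more word (stay on it)
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        if token == "*" or token == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


for binding in ["order.created", "order.*", "order.#", "#", "*.created", "payment.*"]:
    print(binding, topic_matches(binding, "order.created"))
```

A notification service could bind with `order.*` to receive every order event, while an analytics service binds with `#` to receive everything.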
gRPC for Internal High-Throughput Communication
When services make many calls to each other, gRPC’s binary Protocol Buffers encoding and HTTP/2 multiplexing (many concurrent calls over one connection) typically outperform JSON-over-HTTP REST:
```protobuf
// inventory.proto
syntax = "proto3";

service InventoryService {
  rpc CheckStock (StockRequest) returns (StockResponse);
  rpc ReserveItems (ReserveRequest) returns (ReserveResponse);
  rpc StreamUpdates (StockQuery) returns (stream StockUpdate);
}

message StockRequest {
  string product_id = 1;
}

message StockResponse {
  string product_id = 1;
  int32 available = 2;
  bool in_stock = 3;
}

// ReserveRequest, ReserveResponse, StockQuery and StockUpdate are omitted here;
// protoc will not compile this file until they are defined.
```
```python
# gRPC server implementation
# (inventory_pb2 and inventory_pb2_grpc are generated from inventory.proto by grpcio-tools)
import grpc

import inventory_pb2 as pb2
import inventory_pb2_grpc as pb2_grpc


class InventoryServicer(pb2_grpc.InventoryServiceServicer):
    def __init__(self, repo):
        # repo: any object with an async get_stock(product_id) returning .quantity
        self.repo = repo

    async def CheckStock(self, request, context):
        stock = await self.repo.get_stock(request.product_id)
        return pb2.StockResponse(
            product_id=request.product_id,
            available=stock.quantity,
            in_stock=stock.quantity > 0,
        )


async def serve(repo):
    server = grpc.aio.server()
    pb2_grpc.add_InventoryServiceServicer_to_server(InventoryServicer(repo), server)
    server.add_insecure_port("[::]:50051")  # plaintext: only inside a trusted network
    await server.start()
    await server.wait_for_termination()

# Start with: asyncio.run(serve(repo))
```
Distributed Data Patterns
The Outbox Pattern
When a service needs to update its database AND publish an event, you face a consistency problem. The database write might succeed but the event publish might fail (or vice versa).
The outbox pattern solves this:
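```python
import asyncio
import json
from datetime import datetime, timezone

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# Order and OutboxMessage are the service's SQLAlchemy models; get_session()
# and broker are its session factory and broker client (defined elsewhere)


async def create_order(session: AsyncSession, order_data: dict):
    # 1. Create the order
    order = Order(**order_data)
    session.add(order)
    await session.flush()  # assigns a database-generated order.id before it is used below

    # 2. Write event to outbox table (same transaction)
    outbox_entry = OutboxMessage(
        event_type="OrderCreated",
        payload=json.dumps({
            "order_id": str(order.id),
            "customer_id": order.customer_id,
            "total": float(order.total),
        }),
        created_at=datetime.now(timezone.utc),  # assumes a timezone-aware column
    )
    session.add(outbox_entry)

    # 3. Single commit: both rows are written or neither is
    await session.commit()


# Separate process polls the outbox and publishes
async def outbox_relay():
    while True:
        async with get_session() as session:
            entries = await session.execute(
                select(OutboxMessage)
                .where(OutboxMessage.published.is_(False))
                .order_by(OutboxMessage.created_at)
                .limit(100)
            )
            for entry in entries.scalars():
                # A crash after publish but before commit republishes this entry,
                # so delivery is at-least-once and consumers must be idempotent
                await broker.publish(entry.event_type, json.loads(entry.payload))
                entry.published = True
            await session.commit()
        await asyncio.sleep(1)
```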
API Composition for Cross-Service Queries
When a frontend needs data from multiple services (order details + product info + customer name), use an API composition layer:
```python
# BFF (Backend for Frontend) or API Gateway
import asyncio

import httpx
from fastapi import FastAPI

app = FastAPI()


@app.get("/api/order-details/{order_id}")
async def get_order_details(order_id: str):
    async with httpx.AsyncClient() as client:
        # The order comes first: it says which products and customer to fetch
        order_resp = await client.get(f"http://order-service/api/orders/{order_id}")
        order_resp.raise_for_status()
        order = order_resp.json()

        # Then fan out to the other services concurrently
        product_tasks = [
            client.get(f"http://catalog-service/api/products/{item['product_id']}")
            for item in order["items"]
        ]
        customer_task = client.get(
            f"http://customer-service/api/customers/{order['customer_id']}"
        )
        results = await asyncio.gather(*product_tasks, customer_task)

    for r in results:
        r.raise_for_status()
    products = {p["id"]: p for p in (r.json() for r in results[:-1])}
    customer = results[-1].json()

    return {
        "order": order,
        "products": products,
        "customer": {"name": customer["name"], "email": customer["email"]},
    }
```
Observability
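Without observability, debugging a distributed system is largely guesswork. A single user request can cross several services, and no single service's logs show the whole path.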
Distributed Tracing with OpenTelemetry
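```python
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Setup once at startup; service.name is how tracing backends label this service's spans
provider = TracerProvider(resource=Resource.create({"service.name": "order-service"}))
# OTLPSpanExporter() sends to localhost:4317 unless OTEL_EXPORTER_OTLP_ENDPOINT is set
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)

# Auto-instrument frameworks (`app` is the service's FastAPI instance)
FastAPIInstrumentor.instrument_app(app)  # spans for incoming requests
HTTPXClientInstrumentor().instrument()   # outgoing httpx calls carry the trace context
```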
This propagates trace IDs across service boundaries automatically. A single user request generates a trace that spans all services it touches, visible in Jaeger or Grafana Tempo.
Structured Logging
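```python
from uuid import uuid4

import structlog

# `app` is the service's FastAPI instance
logger = structlog.get_logger()


@app.middleware("http")
async def log_requests(request, call_next):
    # Reuse the caller's trace ID if one was sent, otherwise start a new one
    trace_id = request.headers.get("x-trace-id", str(uuid4()))
    # Drop values bound by an earlier request handled in the same context
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(trace_id=trace_id)
    logger.info("request_started", method=request.method, path=request.url.path)
    response = await call_next(request)
    logger.info("request_completed", status=response.status_code)
    return response
```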
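Binding the trace ID once per request works because Python's `contextvars` give each asyncio task its own copy of the context, so concurrent requests cannot overwrite each other's values. A stdlib-only sketch of the same mechanism shows why. It uses a `logging.Filter` that stamps the current trace ID onto every record, and `trace_id_var`, `TraceIdFilter`, and `ListHandler` are hypothetical names:

```python
import asyncio
import contextvars
import logging

trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id", default="-")


class TraceIdFilter(logging.Filter):
    """Copy the current context's trace ID onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = trace_id_var.get()
        return True


class ListHandler(logging.Handler):
    """Collect formatted records in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(self.format(record))


handler = ListHandler()
handler.addFilter(TraceIdFilter())
handler.setFormatter(logging.Formatter("trace_id=%(trace_id)s %(message)s"))
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False


async def handle_request(trace_id: str) -> None:
    trace_id_var.set(trace_id)  # visible only inside this task's context
    logger.info("request_started")
    await asyncio.sleep(0)      # yield so the two requests interleave
    logger.info("request_completed")


async def main() -> None:
    await asyncio.gather(handle_request("aaa"), handle_request("bbb"))


asyncio.run(main())
print("\n".join(handler.lines))
```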
Deployment with Docker and Kubernetes
Each service gets its own Dockerfile:
```dockerfile
FROM python:3.12-slim
WORKDIR /app
# pip install . needs the package source as well as pyproject.toml
COPY pyproject.toml .
COPY src/ src/
RUN pip install --no-cache-dir .
EXPOSE 8000
CMD ["uvicorn", "order.api.routes:app", "--host", "0.0.0.0", "--port", "8000"]
```
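Kubernetes manifests define how services discover each other. The Service gives the pods a stable in-cluster DNS name, `order-service`, which other services use as the hostname in URLs such as `http://order-service/api/orders/{order_id}`. The Deployment runs and health-checks the replicas: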
```yaml
apiVersion: v1
kind: Service
metadata:
  name: order-service
spec:
  selector:
    app: order-service
  ports:
    - port: 80
      targetPort: 8000
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-service
  template:
    metadata:
      labels:
        app: order-service  # must match spec.selector and the Service selector
    spec:
      containers:
        - name: order
          image: registry/order-service:v1.2.3
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: order-secrets
                  key: database-url
          readinessProbe:
            httpGet:
              path: /health  # the app must serve this endpoint
              port: 8000
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
```
When to Use Microservices
| Signal | Monolith | Microservices |
|---|---|---|
| Team size | < 20 engineers | 20+ engineers |
| Deployment frequency | Weekly/monthly | Multiple times daily |
| Scaling needs | Uniform | Different services need different scaling |
| Domain complexity | Single domain | Multiple distinct business domains |
| Organizational structure | One team | Multiple autonomous teams |
The most successful microservices migrations happen incrementally: extract one service at a time from the monolith, starting with the domain that changes most frequently or has the most distinct scaling needs.
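One common way to extract services one at a time is the strangler fig pattern. A routing layer in front of the monolith sends the path prefixes of already-extracted services to those services and everything else to the monolith, so each extraction becomes a routing change. A minimal sketch of that routing decision, with hypothetical upstream URLs and a hypothetical `upstream_for` helper:

```python
MONOLITH = "http://monolith"

# Path prefixes already extracted, mapped to their new services (hypothetical URLs)
EXTRACTED_ROUTES: dict[str, str] = {
    "/api/orders": "http://order-service",
    "/api/products": "http://catalog-service",
}


def upstream_for(path: str) -> str:
    """Pick the backend for a request path; the longest matching prefix wins."""
    best = ""
    for prefix in EXTRACTED_ROUTES:
        # Match whole path segments so /api/orders-legacy does not hit /api/orders
        if (path == prefix or path.startswith(prefix + "/")) and len(prefix) > len(best):
            best = prefix
    return EXTRACTED_ROUTES[best] if best else MONOLITH


print(upstream_for("/api/orders/123"))   # http://order-service
print(upstream_for("/api/customers/7"))  # http://monolith
```

Rolling back an extraction is the same one-line change in reverse, which keeps each step of the migration low-risk.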
The one thing to remember: Python microservices succeed when service boundaries follow business domains, communication patterns match consistency needs (synchronous calls when the caller needs an answer now, asynchronous events when it does not), and observability is built in from day one rather than bolted on later.
See Also
- Python Aggregate Pattern: Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts: Why the same word means different things in different parts of your code, and why that is perfectly fine.
- Python Bulkhead Pattern: Why smart Python apps put walls between their parts, like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern: How a circuit breaker saves your app from crashing, explained with a home electrical fuse analogy.
- Python Clean Architecture: Why your Python app should look like an onion, and how that saves you from painful rewrites.