Python Microservices Architecture — Deep Dive

Domain-Driven Design for Service Boundaries

The most reliable way to define microservice boundaries comes from Domain-Driven Design (DDD). The core concept is the Bounded Context — a boundary within which a particular domain model applies.

Identifying Bounded Contexts

Map your business processes and look for natural seams:

E-commerce Domain Map:
┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐
│  Catalog Context │  │  Order Context   │  │ Shipping Context │
│                  │  │                  │  │                  │
│ - Product        │  │ - Order          │  │ - Shipment       │
│ - Category       │  │ - LineItem       │  │ - Carrier        │
│ - Pricing        │  │ - Payment        │  │ - Tracking       │
│ - Review         │  │ - Discount       │  │ - Address        │
└──────────────────┘  └──────────────────┘  └──────────────────┘

Notice that “Product” means different things in different contexts. In Catalog, a Product has descriptions, images, and reviews. In Order, a Product is just an ID, name, and price. Each context has its own model of the same real-world thing.
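A minimal sketch of this idea follows. The class names `CatalogProduct` and `OrderedProduct` and the translation function `to_ordered_product` are hypothetical, chosen only for illustration: each context keeps its own model, and the Order context copies just the fields it needs at the moment of purchase.

```python
from dataclasses import dataclass, field
from decimal import Decimal


@dataclass
class CatalogProduct:
    """Catalog context: the rich, editable view of a product (hypothetical model)."""
    product_id: str
    name: str
    description: str
    price: Decimal
    image_urls: list[str] = field(default_factory=list)
    review_ids: list[str] = field(default_factory=list)


@dataclass(frozen=True)
class OrderedProduct:
    """Order context: an immutable snapshot taken when the order is placed."""
    product_id: str
    name: str
    unit_price: Decimal


def to_ordered_product(product: CatalogProduct) -> OrderedProduct:
    """Translate the Catalog model into the Order model at the context boundary."""
    return OrderedProduct(
        product_id=product.product_id,
        name=product.name,
        unit_price=product.price,
    )


catalog_item = CatalogProduct(
    product_id="p-1",
    name="Mechanical keyboard",
    description="Tenkeyless, hot-swappable switches",
    price=Decimal("89.90"),
    image_urls=["https://example.com/kb.png"],
)
snapshot = to_ordered_product(catalog_item)

# A later price change in Catalog does not alter the order's snapshot.
catalog_item.price = Decimal("99.90")
```

Keeping the snapshot immutable is one possible design choice: an order then records what the customer actually bought, regardless of later catalog edits.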

Python Service Structure

order-service/
├── src/
│   └── order/
│       ├── __init__.py
│       ├── domain/
│       │   ├── models.py       # Order, LineItem (business objects)
│       │   ├── events.py       # OrderCreated, OrderCancelled
│       │   ├── commands.py     # CreateOrder, CancelOrder
│       │   └── services.py     # Domain logic (no I/O)
│       ├── application/
│       │   ├── handlers.py     # Command and event handlers
│       │   └── queries.py      # Read-side query handlers
│       ├── infrastructure/
│       │   ├── repository.py   # Database access
│       │   ├── messaging.py    # Broker integration
│       │   └── http_client.py  # External service calls
│       └── api/
│           ├── routes.py       # FastAPI routes
│           └── schemas.py      # API request/response models
├── tests/
├── Dockerfile
├── pyproject.toml
└── alembic/                    # Database migrations

The domain layer has zero dependencies on frameworks or infrastructure. This makes it testable and portable.
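As a rough illustration of what "no I/O" buys you, the sketch below shows a domain rule that uses only the standard library. The names `LineItem` and `order_total` and the discount rule itself are assumptions made for this example, not part of any real service.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class LineItem:
    # Hypothetical domain object; a real one would live in domain/models.py
    product_id: str
    unit_price: Decimal
    quantity: int


def order_total(items: list[LineItem], discount_percent: Decimal = Decimal("0")) -> Decimal:
    """Pure domain rule: sum the line items, then apply a percentage discount."""
    if not Decimal("0") <= discount_percent <= Decimal("100"):
        raise ValueError("discount_percent must be between 0 and 100")
    subtotal = sum((i.unit_price * i.quantity for i in items), Decimal("0"))
    discounted = subtotal * (Decimal("100") - discount_percent) / Decimal("100")
    return discounted.quantize(Decimal("0.01"))


# The rule is exercised with plain objects: no database, broker, or HTTP client.
items = [
    LineItem("p-1", Decimal("10.00"), 2),
    LineItem("p-2", Decimal("5.50"), 1),
]
total = order_total(items, discount_percent=Decimal("10"))
```

Because the function touches nothing outside its arguments, a unit test for it needs no fixtures, mocks, or running containers.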

Inter-Service Communication Patterns

Synchronous with Circuit Breakers

Direct HTTP calls between services need protection against cascading failures:

import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=30)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=0.5, max=5))
async def get_product_details(product_id: str) -> dict:
    async with httpx.AsyncClient(timeout=5.0) as client:
        response = await client.get(
            f"http://catalog-service/api/products/{product_id}"
        )
        response.raise_for_status()
        return response.json()

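With this decorator order, the retry wrapper sits inside the circuit breaker: each call makes up to 3 attempts with exponential backoff, and only a call that exhausts its retries counts as one failure toward the breaker. After 5 consecutive failed calls the breaker opens and rejects calls immediately for 30 seconds instead of waiting on timeouts, then lets a trial call through. As written, the retry also repeats on non-transient errors such as a 404 raised by raise_for_status(), so in practice you would restrict it to connection errors, timeouts, and 5xx responses.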
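To make the closed, open, and half-open states concrete, here is a minimal hand-rolled sketch. It is not how the circuitbreaker library is implemented; the class name `SimpleCircuitBreaker`, the injected `clock`, and the synchronous `call()` method are assumptions made for illustration, and the sketch ignores thread safety.

```python
import time
from typing import Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class SimpleCircuitBreaker:
    """Illustrative breaker: closed -> open after N failures -> half-open after a timeout."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock              # injectable so the demo can control time
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.recovery_timeout:
            return "half_open"
        return "open"

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            raise CircuitOpenError("circuit open; failing fast")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # Trip on reaching the threshold, or re-open if a half-open trial call failed.
            if self._failures >= self.failure_threshold or self._opened_at is not None:
                self._opened_at = self._clock()
            raise
        # Any success closes the circuit and resets the consecutive-failure count.
        self._failures = 0
        self._opened_at = None
        return result


def flaky():
    raise ConnectionError("catalog-service unreachable")


now = [0.0]  # fake clock, advanced by hand
breaker = SimpleCircuitBreaker(failure_threshold=2, recovery_timeout=30.0, clock=lambda: now[0])

for _ in range(2):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass

state_after_failures = breaker.state
now[0] = 31.0                      # recovery timeout has elapsed
state_after_timeout = breaker.state
recovered = breaker.call(lambda: "ok")
```

In the demo the breaker opens after two failures, reports half-open once the fake clock passes the recovery timeout, and closes again after the successful trial call.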

Asynchronous with Event Bus

For events that don’t need immediate responses:

# Domain event definition
from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import uuid4

@dataclass
class OrderCreated:
    order_id: str
    customer_id: str
    items: list[dict]
    total_amount: float
    # Unique ID so consumers can detect duplicate deliveries
    event_id: str = field(default_factory=lambda: str(uuid4()))
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated as of Python 3.12)
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

# Publishing
import json
import aio_pika

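class RabbitMQPublisher:
    def __init__(self, connection: aio_pika.Connection):
        self.connection = connection
        self._exchange = None

    async def _get_exchange(self):
        # Open one channel and declare the exchange once, then reuse them;
        # opening a new channel on every publish would leak channels.
        if self._exchange is None:
            channel = await self.connection.channel()
            self._exchange = await channel.declare_exchange(
                "domain_events", aio_pika.ExchangeType.TOPIC, durable=True
            )
        return self._exchange

    async def publish(self, event: OrderCreated):
        exchange = await self._get_exchange()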
        message = aio_pika.Message(
            body=json.dumps({
                "event_type": type(event).__name__,
                "event_id": event.event_id,
                "timestamp": event.timestamp,
                "data": {
                    "order_id": event.order_id,
                    "customer_id": event.customer_id,
                    "items": event.items,
                    "total_amount": event.total_amount,
                }
            }).encode(),
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        )
        await exchange.publish(message, routing_key="order.created")

# Consuming (in the notification service)
class NotificationConsumer:
    async def on_order_created(self, message: aio_pika.IncomingMessage):
        async with message.process():
            event = json.loads(message.body)
            await self.send_confirmation_email(
                event["data"]["customer_id"],
                event["data"]["order_id"]
            )
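With acknowledgements enabled, a broker can redeliver a message, for example when a consumer crashes after doing its work but before acking. The sketch below shows one way a consumer might use the `event_id` field to skip duplicates. The class name `IdempotentHandler` is hypothetical, and the in-memory set is an assumption for illustration; a real service would more likely record processed IDs in a database table with a unique constraint.

```python
import json


class IdempotentHandler:
    """Wraps a side-effecting handler and skips events whose event_id was already seen."""

    def __init__(self, handler):
        self._handler = handler
        self._seen_event_ids: set[str] = set()  # assumption: in-memory only, for illustration

    def handle(self, raw_body: bytes) -> bool:
        event = json.loads(raw_body)
        event_id = event["event_id"]
        if event_id in self._seen_event_ids:
            return False  # duplicate delivery: do nothing
        self._handler(event)
        # Record the ID only after the handler succeeds, so a failed attempt can be retried.
        self._seen_event_ids.add(event_id)
        return True


sent_emails: list[tuple[str, str]] = []


def send_confirmation(event: dict) -> None:
    # Stand-in for the real email call
    sent_emails.append((event["data"]["customer_id"], event["data"]["order_id"]))


handler = IdempotentHandler(send_confirmation)
body = json.dumps({
    "event_type": "OrderCreated",
    "event_id": "evt-123",
    "data": {"customer_id": "c-1", "order_id": "o-1"},
}).encode()

first = handler.handle(body)
second = handler.handle(body)  # simulated redelivery of the same message
```

The second call returns without invoking the handler, so the customer receives one confirmation even though the message arrived twice.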

gRPC for Internal High-Throughput Communication

When services make many calls to each other, gRPC's compact Protocol Buffers encoding and HTTP/2 connection multiplexing typically give lower latency and less overhead than JSON over REST:

// inventory.proto
syntax = "proto3";

service InventoryService {
    rpc CheckStock (StockRequest) returns (StockResponse);
    rpc ReserveItems (ReserveRequest) returns (ReserveResponse);
    rpc StreamUpdates (StockQuery) returns (stream StockUpdate);
}

message StockRequest {
    string product_id = 1;
}

message StockResponse {
    string product_id = 1;
    int32 available = 2;
    bool in_stock = 3;
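}

// ReserveRequest, ReserveResponse, StockQuery, and StockUpdate are omitted here;
// they must be defined before this file will compile.

# gRPC server implementation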
import grpc
import inventory_pb2_grpc as pb2_grpc
import inventory_pb2 as pb2

class InventoryServicer(pb2_grpc.InventoryServiceServicer):
    def __init__(self, repo):
        # repo: any object exposing an async get_stock(product_id) method
        self.repo = repo

    async def CheckStock(self, request, context):
        stock = await self.repo.get_stock(request.product_id)
        return pb2.StockResponse(
            product_id=request.product_id,
            available=stock.quantity,
            in_stock=stock.quantity > 0
        )

async def serve(repo):
    server = grpc.aio.server()
    pb2_grpc.add_InventoryServiceServicer_to_server(InventoryServicer(repo), server)
    # Insecure port: suitable only inside a trusted network or behind a mesh that adds TLS
    server.add_insecure_port("[::]:50051")
    await server.start()
    await server.wait_for_termination()

Distributed Data Patterns

The Outbox Pattern

When a service needs to update its database AND publish an event, you face a consistency problem. The database write might succeed but the event publish might fail (or vice versa).

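The outbox pattern solves this by writing the event to an outbox table in the same database transaction as the business data. A separate relay process then reads the table and publishes the events: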

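import json
from datetime import datetime, timezone

from sqlalchemy.ext.asyncio import AsyncSession

# Order and OutboxMessage are the service's SQLAlchemy models (not shown)

async def create_order(session: AsyncSession, order_data: dict):
    # 1. Create the order
    order = Order(**order_data)
    session.add(order)
    # Flush so database-generated values such as order.id are populated
    await session.flush()

    # 2. Write event to outbox table (same transaction)
    outbox_entry = OutboxMessage(
        event_type="OrderCreated",
        payload=json.dumps({
            "order_id": str(order.id),
            "customer_id": order.customer_id,
            "total": float(order.total),
        }),
        # Timezone-aware UTC; assumes created_at is declared as DateTime(timezone=True)
        created_at=datetime.now(timezone.utc),
    )
    session.add(outbox_entry)

    # 3. Single commit: both rows are written or neither is
    await session.commit()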

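# Separate process polls the outbox and publishes
import asyncio

from sqlalchemy import select

# get_session and broker are application-provided helpers (not shown)

async def outbox_relay():
    while True:
        async with get_session() as session:
            entries = await session.execute(
                select(OutboxMessage)
                .where(OutboxMessage.published.is_(False))
                .order_by(OutboxMessage.created_at)
                .limit(100)
            )
            for entry in entries.scalars():
                # If publishing fails partway, the batch is not committed and is sent
                # again on the next pass. Delivery is at-least-once, so consumers
                # must tolerate duplicates.
                await broker.publish(entry.event_type, json.loads(entry.payload))
                entry.published = True
            await session.commit()
        await asyncio.sleep(1)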
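The same idea can be tried end to end with nothing but the standard library. The sketch below swaps in SQLite for the real database and a plain callback for the broker; the table layout, the function names, and the `crash_midway` flag are all assumptions made for this demonstration.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id TEXT PRIMARY KEY, customer_id TEXT NOT NULL, total TEXT NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        event_type TEXT NOT NULL,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0
    );
""")


def create_order(order_id: str, customer_id: str, total: str, *, crash_midway: bool = False) -> None:
    # "with conn" commits if the block succeeds and rolls back if it raises.
    with conn:
        conn.execute("INSERT INTO orders VALUES (?, ?, ?)", (order_id, customer_id, total))
        if crash_midway:
            # Demonstration only: simulate a failure between the two writes.
            raise RuntimeError("simulated crash between the two writes")
        payload = json.dumps({"order_id": order_id, "customer_id": customer_id, "total": total})
        conn.execute(
            "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
            ("OrderCreated", payload),
        )


def relay_once(publish) -> int:
    """Publish pending outbox rows in insertion order, marking each row after it is sent."""
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, event_type, payload in rows:
        publish(event_type, json.loads(payload))
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)


create_order("o-1", "c-1", "42.00")
try:
    create_order("o-2", "c-2", "10.00", crash_midway=True)
except RuntimeError:
    pass  # the order row for o-2 was rolled back along with the missing event

order_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
outbox_count = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]

published: list[tuple[str, dict]] = []
first_pass = relay_once(lambda event_type, data: published.append((event_type, data)))
second_pass = relay_once(lambda event_type, data: published.append((event_type, data)))
```

After the simulated crash there is still exactly one order and one outbox row, which is the guarantee the pattern relies on: an order never exists without its event, and an event never exists without its order.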

API Composition for Cross-Service Queries

When a frontend needs data from multiple services (order details + product info + customer name), use an API composition layer:

# BFF (Backend for Frontend) or API Gateway
import asyncio

import httpx
from fastapi import FastAPI

app = FastAPI()

@app.get("/api/order-details/{order_id}")
async def get_order_details(order_id: str):
    async with httpx.AsyncClient(timeout=5.0) as client:
        # The order is fetched first: it names the products and customer to load
        order_resp = await client.get(f"http://order-service/api/orders/{order_id}")
        order_resp.raise_for_status()
        order = order_resp.json()

        # Fetch each distinct product once, in parallel with the customer lookup
        product_ids = {item["product_id"] for item in order["items"]}
        product_tasks = [
            client.get(f"http://catalog-service/api/products/{product_id}")
            for product_id in product_ids
        ]
        customer_task = client.get(
            f"http://customer-service/api/customers/{order['customer_id']}"
        )

        results = await asyncio.gather(*product_tasks, customer_task)
        for r in results:
            r.raise_for_status()
        bodies = [r.json() for r in results]
        products = {p["id"]: p for p in bodies[:-1]}
        customer = bodies[-1]

    return {
        "order": order,
        "products": products,
        "customer": {"name": customer["name"], "email": customer["email"]},
    }
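A composition layer also has to decide what happens when one downstream call fails. The sketch below is one possible policy, not the only one: treat the customer as required and let missing products degrade to None. The fetch functions are stand-ins for HTTP calls and return hypothetical data, so the example runs without a network.

```python
import asyncio


async def fetch_product(product_id: str) -> dict:
    # Stand-in for an HTTP call to the catalog service (hypothetical data).
    if product_id == "p-missing":
        raise LookupError(f"product {product_id} not found")
    return {"id": product_id, "name": f"Product {product_id}"}


async def fetch_customer(customer_id: str) -> dict:
    # Stand-in for an HTTP call to the customer service (hypothetical data).
    return {"id": customer_id, "name": "Ada", "email": "ada@example.com"}


async def compose_order_details(order: dict) -> dict:
    # De-duplicate product IDs (keeping order) so each product is fetched once.
    product_ids = list(dict.fromkeys(item["product_id"] for item in order["items"]))
    *product_results, customer = await asyncio.gather(
        *(fetch_product(pid) for pid in product_ids),
        fetch_customer(order["customer_id"]),
        return_exceptions=True,  # collect failures instead of raising on the first one
    )
    if isinstance(customer, BaseException):
        raise customer  # the customer is treated as required in this sketch
    products = {
        pid: (None if isinstance(result, BaseException) else result)
        for pid, result in zip(product_ids, product_results)
    }
    return {"order": order, "products": products, "customer": {"name": customer["name"]}}


order = {
    "id": "o-1",
    "customer_id": "c-1",
    "items": [{"product_id": "p-1"}, {"product_id": "p-missing"}, {"product_id": "p-1"}],
}
details = asyncio.run(compose_order_details(order))
```

Here the response still renders the order and customer, and the frontend can show a placeholder for the product that could not be loaded.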

Observability

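Without observability, debugging a distributed system comes down to guesswork: a single request can cross several services, and no one service's logs show the whole path.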

Distributed Tracing with OpenTelemetry

from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
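from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Setup once at startup; service.name labels this service's spans in the tracing backend
provider = TracerProvider(resource=Resource.create({"service.name": "order-service"}))
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)

# Auto-instrument frameworks (app is the service's FastAPI instance)
FastAPIInstrumentor.instrument_app(app)
HTTPXClientInstrumentor().instrument()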

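The instrumentation adds the trace context to outgoing httpx requests (by default in the W3C traceparent header) and reads it from incoming FastAPI requests, so trace IDs cross service boundaries without manual plumbing. A single user request produces one trace spanning every instrumented service it touches, which you can view in Jaeger or Grafana Tempo.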
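To show what actually travels between services, the sketch below parses and regenerates a W3C Trace Context traceparent header by hand. The instrumentation libraries do this for you; the function names here are hypothetical, and the sketch handles only version 00 of the header and ignores tracestate.

```python
import re
import secrets
from typing import NamedTuple, Optional


class TraceContext(NamedTuple):
    trace_id: str   # 32 lowercase hex characters, shared by every span in the trace
    span_id: str    # 16 lowercase hex characters, identifies the caller's span
    sampled: bool


# version "00", then trace-id, parent span-id, and trace-flags, separated by dashes
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def parse_traceparent(header: str) -> Optional[TraceContext]:
    """Return the trace context from an incoming header, or None if it is malformed."""
    match = _TRACEPARENT.match(header.strip())
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    if trace_id == "0" * 32 or span_id == "0" * 16:
        return None  # all-zero IDs are invalid
    return TraceContext(trace_id, span_id, sampled=bool(int(flags, 16) & 0x01))


def child_traceparent(parent: TraceContext) -> str:
    """Header for an outgoing call: same trace ID, fresh span ID."""
    flags = "01" if parent.sampled else "00"
    return f"00-{parent.trace_id}-{secrets.token_hex(8)}-{flags}"


incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
ctx = parse_traceparent(incoming)
outgoing = child_traceparent(ctx)
```

The trace ID stays constant along the whole call chain while each hop contributes its own span ID, which is what lets a tracing backend stitch the spans into one tree.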

Structured Logging

from uuid import uuid4

import structlog

logger = structlog.get_logger()

@app.middleware("http")
async def log_requests(request, call_next):
    # Drop any values bound by a previous request handled in the same context
    structlog.contextvars.clear_contextvars()
    # Reuse the caller's trace ID if present, otherwise start a new one
    trace_id = request.headers.get("x-trace-id", str(uuid4()))
    structlog.contextvars.bind_contextvars(trace_id=trace_id)

    logger.info("request_started", method=request.method, path=request.url.path)
    response = await call_next(request)
    logger.info("request_completed", status=response.status_code)

    return response
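Binding the trace ID through context variables matters because one process handles many requests concurrently. The sketch below reproduces the mechanism with the standard library's contextvars module rather than structlog; the `log` helper and `handle_request` function are hypothetical stand-ins.

```python
import asyncio
import contextvars
import json

trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id", default="-")
log_lines: list[str] = []


def log(event: str, **fields) -> None:
    """Emit one JSON object per line, always including the bound trace_id."""
    record = {"event": event, "trace_id": trace_id_var.get(), **fields}
    log_lines.append(json.dumps(record, sort_keys=True))


async def handle_request(trace_id: str, path: str) -> None:
    trace_id_var.set(trace_id)   # visible only within this task's context
    log("request_started", path=path)
    await asyncio.sleep(0)       # yield so the two requests interleave
    log("request_completed", status=200)


async def main() -> None:
    # gather wraps each coroutine in a task, and each task gets its own copy of the context.
    await asyncio.gather(
        handle_request("trace-a", "/orders/1"),
        handle_request("trace-b", "/orders/2"),
    )


asyncio.run(main())
```

Even though the two handlers interleave, every log line carries the trace ID of the request that produced it, and nothing leaks into the outer context.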

Deployment with Docker and Kubernetes

Each service gets its own Dockerfile:

FROM python:3.12-slim

WORKDIR /app
# pip install . builds the package, so the source must be copied before installing
COPY pyproject.toml .
COPY src/ src/
RUN pip install --no-cache-dir .

EXPOSE 8000
CMD ["uvicorn", "order.api.routes:app", "--host", "0.0.0.0", "--port", "8000"]

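Kubernetes manifests define how services run and discover each other. The Service gives the pods a stable DNS name (order-service) that other services call, and the Deployment runs the replicas behind it: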

apiVersion: v1
kind: Service
metadata:
  name: order-service
spec:
  selector:
    app: order-service
  ports:
    - port: 80
      targetPort: 8000
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-service
  template:
    metadata:
      labels:
        app: order-service    # must match spec.selector.matchLabels
    spec:
      containers:
        - name: order
          image: registry/order-service:v1.2.3
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: order-secrets
                  key: database-url
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"

When to Use Microservices

Signal                   | Monolith        | Microservices
-------------------------|-----------------|------------------------------------------
Team size                | < 20 engineers  | 20+ engineers
Deployment frequency     | Weekly/monthly  | Multiple times daily
Scaling needs            | Uniform         | Different services need different scaling
Domain complexity        | Single domain   | Multiple distinct business domains
Organizational structure | One team        | Multiple autonomous teams

The most successful microservices migrations happen incrementally: extract one service at a time from the monolith, starting with the domain that changes most frequently or has the most distinct scaling needs.

The one thing to remember: Python microservices succeed when service boundaries follow business domains, communication patterns match consistency needs (sync for queries, async for commands), and observability is built in from day one — not bolted on later.

