Python Saga Pattern — Deep Dive
Building a Persistent Saga Orchestrator
A production saga orchestrator must survive crashes. The key is persisting state before executing each step so recovery is always possible.
Step Definition DSL
from dataclasses import dataclass, field
from typing import Callable, Awaitable, Optional
from enum import Enum
@dataclass
class SagaStep:
    """One unit of work in a saga, optionally paired with its undo action.

    Attributes:
        name: Identifier of the step. The engine in this file uses it as the
            key for persisted step status and stores the action's return
            value in the context under ``f"{name}_result"``.
        action: Callable invoked with the saga context; must return an
            awaitable.
        compensation: Callable invoked with the saga context to undo
            ``action``; ``None`` when the step has nothing to undo.
        timeout_seconds: Per-attempt time limit for ``action``. The engine in
            this file allows compensations twice this value.
        max_retries: Total number of attempts for ``action`` (the engine
            loops over ``range(1, max_retries + 1)``), so it must be >= 1.

    Raises:
        ValueError: If ``max_retries`` is below 1 or ``timeout_seconds`` is
            not positive.
    """

    name: str
    action: Callable[..., Awaitable]
    compensation: Optional[Callable[..., Awaitable]] = None
    timeout_seconds: float = 30.0
    max_retries: int = 3

    def __post_init__(self) -> None:
        # With fewer than one attempt the action would never be called and the
        # step would "fail" without having run; reject that misconfiguration
        # where it is introduced instead of at execution time.
        if self.max_retries < 1:
            raise ValueError(
                f"SagaStep {self.name!r}: max_retries must be >= 1, "
                f"got {self.max_retries}"
            )
        # A zero or negative timeout makes every attempt time out immediately.
        if self.timeout_seconds <= 0:
            raise ValueError(
                f"SagaStep {self.name!r}: timeout_seconds must be > 0, "
                f"got {self.timeout_seconds}"
            )
class StepStatus(Enum):
    """Lifecycle states recorded for a single saga step.

    The string values are what gets persisted / compared, so they must stay
    stable.
    """

    # Forward path
    PENDING = "pending"  # presumably the initial state; not set by the engine code in this file
    RUNNING = "running"  # written before the step's action is executed
    COMPLETED = "completed"  # action returned successfully
    FAILED = "failed"  # action raised or exhausted its attempts

    # Rollback path
    COMPENSATING = "compensating"  # written before the compensation is executed
    COMPENSATED = "compensated"  # compensation finished successfully
    COMPENSATION_FAILED = "compensation_failed"  # compensation raised or timed out
@dataclass
class StepResult:
    """Outcome record for one saga step.

    Attributes:
        step_name: Name of the step this record describes.
        status: State the step ended up in.
        result: Payload returned by the step, if any.
        error: Error description, if the step failed.
        attempts: Number of attempts made (defaults to 0).
    """

    # Required fields
    step_name: str
    status: StepStatus

    # Optional details; all default to "nothing recorded yet"
    result: Optional[dict] = None
    error: Optional[str] = None
    attempts: int = 0
The Saga Engine
import asyncio
import json
import logging
from datetime import datetime, timedelta
from uuid import uuid4
class SagaEngine:
    """Runs saga steps in order and compensates completed ones on failure.

    The engine writes each step's status to ``saga_store`` before and after
    running it, and publishes ``saga.completed`` / ``saga.failed`` /
    ``saga.compensation_failed`` events through ``event_publisher``.
    """

    def __init__(self, saga_store, event_publisher):
        self.store = saga_store
        self.publisher = event_publisher

    async def execute(self, saga_id: str, steps: list[SagaStep], context: dict) -> dict:
        """Execute ``steps`` sequentially, rolling back on the first failure.

        Args:
            saga_id: Identifier under which the saga is persisted.
            steps: Steps to run, in order.
            context: Mutable dict shared by all steps. The engine adds
                ``"saga_id"`` (if absent) and ``f"{step.name}_result"`` for
                every step whose action succeeds.

        Returns:
            ``{"status": "completed", "saga_id": ...}`` on success, or
            ``{"status": "failed", "failed_step": ...}`` after compensation.
        """
        # Steps only receive `context`, so the saga id has to live there for
        # them to derive idempotency keys from it. An id supplied by the
        # caller is left untouched.
        context.setdefault("saga_id", saga_id)

        # Initialize saga record
        await self.store.create_saga(saga_id, context, steps)

        for i, step in enumerate(steps):
            # Persist intent before executing
            await self.store.update_step(saga_id, step.name, StepStatus.RUNNING)
            action_succeeded = False
            try:
                result = await self._execute_with_retry(step, context)
                action_succeeded = True
                context[f"{step.name}_result"] = result
                await self.store.update_step(
                    saga_id, step.name, StepStatus.COMPLETED, result=result
                )
            except Exception as e:
                logging.error("Saga %s failed at step %s: %s", saga_id, step.name, e)
                await self.store.update_step(
                    saga_id, step.name, StepStatus.FAILED, error=str(e)
                )
                # Compensate completed steps in reverse. If the action itself
                # succeeded and only recording the result failed, its side
                # effects exist and must be undone as well.
                # NOTE(review): an action that timed out may still have taken
                # effect remotely; that case is not compensated here.
                undo_count = i + 1 if action_succeeded else i
                await self._compensate(saga_id, steps[:undo_count], context)
                await self.publisher.publish("saga.failed", {
                    "saga_id": saga_id,
                    "failed_step": step.name,
                    "error": str(e),
                })
                return {"status": "failed", "failed_step": step.name}

        await self.publisher.publish("saga.completed", {"saga_id": saga_id})
        return {"status": "completed", "saga_id": saga_id}

    async def _execute_with_retry(self, step: SagaStep, context: dict) -> dict:
        """Run ``step.action`` with a per-attempt timeout and backoff.

        Returns:
            Whatever the action returns on its first successful attempt.

        Raises:
            RuntimeError: When every attempt failed or timed out; the last
                underlying exception is attached as ``__cause__``.
        """
        # Always make at least one attempt; with max_retries <= 0 the loop
        # would otherwise never call the action.
        attempts = max(1, step.max_retries)
        last_error = None
        last_exc = None
        for attempt in range(1, attempts + 1):
            try:
                return await asyncio.wait_for(
                    step.action(context),
                    timeout=step.timeout_seconds,
                )
            except asyncio.TimeoutError as e:
                last_exc = e
                last_error = f"Timeout after {step.timeout_seconds}s"
                logging.warning("Step %s attempt %s timed out", step.name, attempt)
            except Exception as e:
                last_exc = e
                last_error = str(e)
                logging.warning(
                    "Step %s attempt %s failed: %s", step.name, attempt, e
                )
            if attempt < attempts:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
        raise RuntimeError(
            f"Step {step.name} failed after {attempts} attempts: {last_error}"
        ) from last_exc

    async def _compensate(self, saga_id: str, completed_steps: list[SagaStep], context: dict):
        """Run compensations for ``completed_steps`` in reverse order.

        Each compensation is attempted once with twice the step's timeout.
        A failing compensation is logged at CRITICAL, persisted as
        ``COMPENSATION_FAILED`` and published as ``saga.compensation_failed``;
        the remaining steps are still compensated.
        """
        await self.store.mark_compensating(saga_id)
        for step in reversed(completed_steps):
            if not step.compensation:
                continue
            await self.store.update_step(saga_id, step.name, StepStatus.COMPENSATING)
            try:
                await asyncio.wait_for(
                    step.compensation(context),
                    timeout=step.timeout_seconds * 2,  # More time for compensation
                )
                await self.store.update_step(saga_id, step.name, StepStatus.COMPENSATED)
            except Exception as e:
                # Timeouts carry an empty message; fall back to the repr so the
                # stored error and the alert are never blank.
                error = str(e) or repr(e)
                logging.critical(
                    "COMPENSATION FAILED for saga %s, step %s: %s",
                    saga_id, step.name, error,
                )
                await self.store.update_step(
                    saga_id, step.name, StepStatus.COMPENSATION_FAILED, error=error
                )
                # Alert ops team — manual intervention needed
                await self.publisher.publish("saga.compensation_failed", {
                    "saga_id": saga_id,
                    "step": step.name,
                    "error": error,
                })
Using the Engine
# Define saga steps
async def charge_payment(context):
    """Charge the customer for the order total.

    Reads ``customer_id``, ``total`` and ``saga_id`` from ``context``; the
    idempotency key sent to the payment client is derived from the saga id.

    Returns:
        Dict with the ``payment_id`` and ``amount`` reported by the client.
    """
    charge_request = {
        "customer_id": context["customer_id"],
        "amount": context["total"],
        "idempotency_key": f"{context['saga_id']}-payment",
    }
    charge = await payment_client.charge(**charge_request)
    return {"payment_id": charge.id, "amount": charge.amount}
async def refund_payment(context):
    """Compensation for ``charge_payment``: refund the recorded payment.

    Looks up the payment id in ``context["charge_payment_result"]`` and sends
    a refund request keyed by the saga id.
    """
    charged = context["charge_payment_result"]
    refund_request = {
        "payment_id": charged["payment_id"],
        "idempotency_key": f"{context['saga_id']}-refund",
    }
    await payment_client.refund(**refund_request)
async def reserve_inventory(context):
    """Reserve stock for the order's items.

    Reads ``items`` and ``saga_id`` from ``context``.

    Returns:
        Dict with the ``reservation_id`` reported by the inventory client.
    """
    reserve_request = {
        "items": context["items"],
        "idempotency_key": f"{context['saga_id']}-inventory",
    }
    reservation = await inventory_client.reserve(**reserve_request)
    return {"reservation_id": reservation.id}
async def release_inventory(context):
    """Compensation for ``reserve_inventory``: release the reservation.

    Looks up the reservation id in ``context["reserve_inventory_result"]`` and
    sends a release request keyed by the saga id.
    """
    reserved = context["reserve_inventory_result"]
    release_request = {
        "reservation_id": reserved["reservation_id"],
        "idempotency_key": f"{context['saga_id']}-release",
    }
    await inventory_client.release(**release_request)
async def create_shipment(context):
    """Create a shipment for the order.

    Reads ``order_id``, ``shipping_address`` and ``saga_id`` from ``context``.

    Returns:
        Dict with the ``shipment_id`` and ``tracking`` number reported by the
        shipping client.
    """
    shipment_request = {
        "order_id": context["order_id"],
        "address": context["shipping_address"],
        "idempotency_key": f"{context['saga_id']}-shipment",
    }
    shipment = await shipping_client.create_shipment(**shipment_request)
    return {"shipment_id": shipment.id, "tracking": shipment.tracking_number}
async def cancel_shipment(context):
    """Compensation for ``create_shipment``: cancel the created shipment.

    Looks up the shipment id in ``context["create_shipment_result"]`` and
    sends a cancel request keyed by the saga id.
    """
    created = context["create_shipment_result"]
    cancel_request = {
        "shipment_id": created["shipment_id"],
        "idempotency_key": f"{context['saga_id']}-cancel-shipment",
    }
    await shipping_client.cancel(**cancel_request)
# Compose and execute
order_saga_steps = [
SagaStep("charge_payment", charge_payment, refund_payment, timeout_seconds=15),
SagaStep("reserve_inventory", reserve_inventory, release_inventory, timeout_seconds=10),
SagaStep("create_shipment", create_shipment, cancel_shipment, timeout_seconds=20),
]
result = await saga_engine.execute(
saga_id=str(uuid4()),
steps=order_saga_steps,
context={
"order_id": "order-123",
"customer_id": "cust-456",
"items": [{"sku": "W-001", "qty": 2}],
"total": 49.99,
"shipping_address": "123 Main St",
}
)
Idempotent Compensations
Compensations may be called multiple times (due to retries or recovery). Every compensation must be idempotent.
Idempotency Key Pattern
class PaymentService:
    """Refund handling that de-duplicates requests by idempotency key."""

    async def refund(self, payment_id: str, idempotency_key: str):
        """Refund ``payment_id`` unless this key was already processed.

        Returns:
            The stored refund record when the key is already known, otherwise
            the object returned by the gateway for the new refund.
        """
        # Check if this refund was already processed
        prior = await self.db.find_one(
            "refunds", {"idempotency_key": idempotency_key}
        )
        if prior:
            return prior  # Already processed

        # NOTE(review): lookup and insert are separate calls, so two concurrent
        # requests with the same key could both reach the gateway — confirm
        # whether the store enforces uniqueness on idempotency_key.
        # Process refund
        refund = await self.gateway.refund(payment_id)

        # Record with idempotency key
        refund_record = {
            "idempotency_key": idempotency_key,
            "payment_id": payment_id,
            "refund_id": refund.id,
            "status": "completed",
        }
        await self.db.insert("refunds", refund_record)
        return refund
State-Based Idempotency
class InventoryService:
    """Reservation release that is safe to repeat, based on stored state."""

    async def release(self, reservation_id: str, idempotency_key: str):
        """Return reserved stock and mark the reservation released.

        A missing or already-released reservation is treated as success, so
        repeated calls are no-ops. ``idempotency_key`` is accepted but not
        used; the reservation's own status is the guard.
        """
        reservation = await self.db.find_one(
            "reservations", {"id": reservation_id}
        )
        # Nothing to do: never existed, already removed, or already compensated.
        if not reservation or reservation["status"] == "released":
            return

        # Release the stock
        stock_filter = {"sku": reservation["sku"]}
        stock_change = {"$inc": {"available": reservation["quantity"]}}
        await self.db.update("inventory", stock_filter, stock_change)

        # NOTE(review): the stock increment and the status change are two
        # separate writes — confirm what happens if the second one fails.
        await self.db.update(
            "reservations",
            {"id": reservation_id},
            {"status": "released", "released_at": datetime.utcnow()},
        )
Choreography-Based Saga Implementation
For teams that prefer event-driven choreography:
# Each service manages its own saga participation
class PaymentSagaParticipant:
    """Payment side of the choreographed order saga.

    Charges on ``order.created`` and refunds when inventory reservation
    fails, announcing each outcome on the broker.
    """

    def __init__(self, broker, payment_gateway, db):
        self.broker = broker
        self.gateway = payment_gateway
        self.db = db

    async def on_order_created(self, event: dict):
        """Charge the customer and publish ``payment.completed`` or ``payment.failed``.

        ``payment.failed`` is only published when no charge remains in place:
        either the charge itself failed, or it succeeded and was refunded
        because recording or announcing it failed.
        """
        order_id = event["data"]["order_id"]
        try:
            payment = await self.gateway.charge(
                amount=event["data"]["total"],
                customer_id=event["data"]["customer_id"],
            )
        except Exception as e:
            await self.broker.publish("payment.failed", {
                "order_id": order_id,
                "error": str(e),
            })
            return

        try:
            await self.db.insert("payments", {
                "order_id": order_id,
                "payment_id": payment.id,
                "status": "charged",
            })
            await self.broker.publish("payment.completed", {
                "order_id": order_id,
                "payment_id": payment.id,
            })
        except Exception as e:
            # The money has already been taken. Reporting "payment.failed"
            # without undoing the charge would leave the customer billed for
            # an order the rest of the saga treats as unpaid.
            await self._undo_charge(order_id, payment.id)
            await self.broker.publish("payment.failed", {
                "order_id": order_id,
                "error": str(e),
            })

    async def _undo_charge(self, order_id, payment_id):
        """Best-effort refund of a charge that could not be recorded or announced."""
        try:
            await self.gateway.refund(payment_id)
            await self.db.update(
                "payments",
                {"order_id": order_id},
                {"status": "refunded"},
            )
        except Exception:
            # Nothing more can be done automatically; surface it loudly.
            logging.critical(
                "Could not fully roll back payment %s for order %s; "
                "manual reconciliation needed",
                payment_id, order_id, exc_info=True,
            )

    async def on_inventory_reservation_failed(self, event: dict):
        """Compensate a charged order by refunding it and publishing ``payment.refunded``.

        Does nothing when no payment is stored for the order or its status is
        not ``"charged"``, so a repeated event does not refund twice.
        """
        # Compensate: refund the payment
        order_id = event["data"]["order_id"]
        payment = await self.db.find_one("payments", {"order_id": order_id})
        if payment and payment["status"] == "charged":
            await self.gateway.refund(payment["payment_id"])
            await self.db.update(
                "payments",
                {"order_id": order_id},
                {"status": "refunded"},
            )
            await self.broker.publish("payment.refunded", {
                "order_id": order_id,
            })
Saga Routing Table
To manage choreography complexity, maintain an explicit routing table:
# Event name -> list of (service, handler) pairs that react to it.
SAGA_ROUTING = {
    # Forward path
    "order.created": [("payment-service", "on_order_created")],
    "payment.completed": [("inventory-service", "on_payment_completed")],
    # Payment could not be taken: only the order needs to hear about it
    "payment.failed": [("order-service", "on_payment_failed")],
    "inventory.reserved": [("shipping-service", "on_inventory_reserved")],
    # Reservation failed after payment: payment and order both react
    "inventory.reservation_failed": [
        ("payment-service", "on_inventory_reservation_failed"),
        ("order-service", "on_inventory_reservation_failed"),
    ],
}
Document this table — it’s the only place the full workflow is visible in a choreography saga.
Handling Timeout and Partial Failures
Saga Timeout Watchdog
class SagaTimeoutWatchdog:
    """Finds sagas that have not been updated recently and triggers recovery."""

    def __init__(self, saga_store, saga_engine, max_saga_duration=300, publisher=None):
        """
        Args:
            saga_store: Store queried for stale sagas.
            saga_engine: Engine asked to recover or retry compensation.
            max_saga_duration: Seconds without an update after which a saga
                counts as stuck.
            publisher: Where ``saga.timeout`` events are sent; defaults to
                ``saga_engine.publisher``.
        """
        self.store = saga_store
        self.engine = saga_engine
        self.max_duration = max_saga_duration  # seconds
        # check_stale_sagas publishes through self.publisher, so it has to be
        # set here; reuse the engine's publisher unless one is given.
        self.publisher = publisher if publisher is not None else saga_engine.publisher

    async def check_stale_sagas(self):
        """Run periodically (e.g., every 60 seconds).

        Recovers every saga in ``running`` or ``compensating`` state whose
        last update is older than ``max_duration`` and publishes a
        ``saga.timeout`` event for each.
        """
        # NOTE(review): naive UTC timestamp — assumes the store records
        # updated_at the same way; confirm.
        cutoff = datetime.utcnow() - timedelta(seconds=self.max_duration)
        stale_sagas = await self.store.find_sagas(
            status__in=["running", "compensating"],
            updated_at__lt=cutoff,
        )
        for saga in stale_sagas:
            logging.warning(f"Saga {saga['saga_id']} appears stuck, triggering recovery")
            # NOTE(review): recover_and_compensate / retry_compensations are
            # not defined on the SagaEngine shown in this file; the engine
            # passed in must provide them.
            if saga["status"] == "running":
                # Trigger compensation from the last completed step
                await self.engine.recover_and_compensate(saga["saga_id"])
            elif saga["status"] == "compensating":
                # Retry failed compensations
                await self.engine.retry_compensations(saga["saga_id"])
            await self.publisher.publish("saga.timeout", {
                "saga_id": saga["saga_id"],
                "stuck_at": saga["current_step"],
            })
The “Pending” State Pattern
For irreversible steps, use a two-phase approach:
# Reversible work first; the one irreversible action goes last.
order_saga_steps = [
    SagaStep(
        name="charge_payment",
        action=charge_payment,
        compensation=refund_payment,
    ),
    SagaStep(
        name="reserve_inventory",
        action=reserve_inventory,
        compensation=release_inventory,
    ),
    # Email is irreversible — use pending confirmation
    SagaStep(
        name="prepare_confirmation",
        action=prepare_email,
        compensation=delete_draft,
    ),
    SagaStep(
        name="create_shipment",
        action=create_shipment,
        compensation=cancel_shipment,
    ),
    # Only send the email after all reversible steps complete
    SagaStep(
        name="send_confirmation",
        action=send_prepared_email,
        compensation=None,
    ),
]
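The email is drafted (reversible) early in the saga but only sent (irreversible) at the very end. If a step fails after the draft has been prepared, compensation deletes the draft, so no confirmation is ever sent for an order that was rolled back.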
Testing Sagas
async def test_saga_compensates_on_inventory_failure():
    """Inventory failure must refund the payment and never reach shipping."""
    # Arrange: payment works, inventory is the step that breaks.
    payments = MockPaymentClient(should_succeed=True)
    inventory = MockInventoryClient(should_succeed=False)
    shipping = MockShippingClient(should_succeed=True)
    engine = SagaEngine(InMemorySagaStore(), InMemoryPublisher())
    steps = build_order_saga_steps(payments, inventory, shipping)
    order_context = {
        "order_id": "o-1",
        "customer_id": "c-1",
        "items": [],
        "total": 50.0,
        "shipping_address": "123 St",
    }

    # Act
    outcome = await engine.execute("test-saga-1", steps, order_context)

    # Assert
    assert outcome["status"] == "failed"
    assert outcome["failed_step"] == "reserve_inventory"
    assert payments.refund_called  # Payment was compensated
    assert not shipping.create_called  # Shipping never started
async def test_saga_handles_compensation_failure():
    """A failing refund must be recorded as ``compensation_failed`` on the saga."""
    payment_client = MockPaymentClient(should_succeed=True, refund_fails=True)
    inventory_client = MockInventoryClient(should_succeed=False)
    engine = SagaEngine(InMemorySagaStore(), InMemoryPublisher())
    steps = build_order_saga_steps(payment_client, inventory_client, MockShippingClient())
    # The engine needs a real dict here: it stores step results in the
    # context, and `{...}` would be a set containing Ellipsis.
    result = await engine.execute("test-saga-2", steps, {
        "order_id": "o-2", "customer_id": "c-2",
        "items": [], "total": 50.0, "shipping_address": "123 St"
    })
    assert result["status"] == "failed"
    # Verify compensation failure was logged and alerted
    saga = await engine.store.get_saga("test-saga-2")
    assert any(
        s["status"] == "compensation_failed" for s in saga["steps"]
    )
Monitoring and Alerting
| Metric | Purpose | Alert |
|---|---|---|
| saga_completed_total | Successful sagas | Sudden drops |
| saga_failed_total | Failed sagas (compensated) | Rate > 5% |
| saga_compensation_failed_total | Manual intervention needed | Any occurrence |
| saga_duration_seconds | End-to-end saga time | p99 > 30s |
| saga_stale_count | Stuck sagas | Count > 0 |
The one thing to remember: Production sagas in Python require persistent state for crash recovery, idempotent compensations for safe retries, and a timeout watchdog for stuck executions — the saga engine is only as reliable as its weakest compensation handler.
See Also
- Python Aggregate Pattern — Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts — Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern — Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern — How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture — Why your Python app should look like an onion — and how that saves you from painful rewrites.