Python Saga Pattern — Deep Dive

Building a Persistent Saga Orchestrator

A production saga orchestrator must survive crashes. The key is persisting state before executing each step so recovery is always possible.

Step Definition DSL

from dataclasses import dataclass, field
from typing import Callable, Awaitable, Optional
from enum import Enum

@dataclass
class SagaStep:
    """One unit of work in a saga, paired with the action that undoes it."""

    # Identifier used when persisting step state; SagaEngine also stores the
    # action's return value in the context under "<name>_result".
    name: str
    # Async callable that performs the step; SagaEngine calls it with the
    # shared context dict.
    action: Callable[..., Awaitable]
    # Async callable that undoes the step. When None, SagaEngine skips this
    # step during rollback.
    compensation: Optional[Callable[..., Awaitable]] = None
    # Time limit for a single attempt of the action; SagaEngine allows twice
    # this long for the compensation.
    timeout_seconds: float = 30.0
    # Number of times SagaEngine attempts the action before giving up.
    max_retries: int = 3

class StepStatus(Enum):
    """Lifecycle states recorded for a saga step in the saga store."""

    # Not written by the engine code in this file; presumably the initial
    # state assigned by the store — TODO confirm.
    PENDING = "pending"
    # Written before the step's action is invoked.
    RUNNING = "running"
    # Written after the action returned, together with its result.
    COMPLETED = "completed"
    # Written when the action failed on every attempt.
    FAILED = "failed"
    # Written before the step's compensation is invoked.
    COMPENSATING = "compensating"
    # Written after the compensation finished without raising.
    COMPENSATED = "compensated"
    # Written when the compensation raised or timed out.
    COMPENSATION_FAILED = "compensation_failed"

@dataclass
class StepResult:
    """Outcome record for a single saga step.

    NOTE(review): nothing in this file constructs or reads StepResult;
    presumably it is the shape the saga store persists — confirm.
    """

    # Name of the SagaStep this record belongs to.
    step_name: str
    # Current lifecycle state of the step.
    status: StepStatus
    # Value returned by the step's action, if any.
    result: Optional[dict] = None
    # Error text for a failed step, if any.
    error: Optional[str] = None
    # Attempt counter; starts at 0.
    attempts: int = 0

The Saga Engine

import asyncio
import json
import logging
from datetime import datetime, timedelta
from uuid import uuid4

class SagaEngine:
    """Orchestrates a saga: runs steps in order and rolls back on failure.

    Step state is written to ``saga_store`` before and after every action so
    that the persisted record always reflects how far the saga got, and
    lifecycle events are sent through ``event_publisher``.
    """

    def __init__(self, saga_store, event_publisher):
        self.store = saga_store
        self.publisher = event_publisher

    async def execute(self, saga_id: str, steps: list[SagaStep], context: dict) -> dict:
        """Run ``steps`` in order, compensating completed steps on failure.

        Args:
            saga_id: Identifier of this saga run.
            steps: Steps to execute, in order.
            context: Mutable dict shared by all steps. ``saga_id`` is added
                under ``"saga_id"`` when absent, and each action's return
                value is stored under ``"<step name>_result"``.

        Returns:
            ``{"status": "completed", "saga_id": ...}`` when every step
            succeeded, otherwise ``{"status": "failed", "failed_step": ...}``.
        """
        # Steps and compensations build their idempotency keys from
        # context["saga_id"]; make sure it exists before the context is
        # persisted or handed to any step.
        context.setdefault("saga_id", saga_id)

        # Initialize saga record
        await self.store.create_saga(saga_id, context, steps)

        for i, step in enumerate(steps):
            # Persist intent before executing
            await self.store.update_step(saga_id, step.name, StepStatus.RUNNING)

            try:
                result = await self._execute_with_retry(step, context)
                # Later steps and compensations read earlier results from here.
                context[f"{step.name}_result"] = result

                await self.store.update_step(
                    saga_id, step.name, StepStatus.COMPLETED, result=result
                )

            except Exception as e:
                logging.error(f"Saga {saga_id} failed at step {step.name}: {e}")
                await self.store.update_step(
                    saga_id, step.name, StepStatus.FAILED, error=str(e)
                )

                # Compensate completed steps in reverse. steps[:i] excludes the
                # failing step itself.
                # NOTE(review): an action that timed out may still have taken
                # effect remotely; confirm whether it needs compensating too.
                await self._compensate(saga_id, steps[:i], context)

                await self.publisher.publish("saga.failed", {
                    "saga_id": saga_id,
                    "failed_step": step.name,
                    "error": str(e),
                })
                return {"status": "failed", "failed_step": step.name}

        await self.publisher.publish("saga.completed", {"saga_id": saga_id})
        return {"status": "completed", "saga_id": saga_id}

    async def _execute_with_retry(self, step: SagaStep, context: dict) -> dict:
        """Call ``step.action(context)`` with a timeout, retrying on failure.

        Returns:
            Whatever the action returns on the first successful attempt.

        Raises:
            RuntimeError: When every attempt failed or timed out; the last
                underlying exception is attached as ``__cause__``.
        """
        # Always try at least once, even if max_retries is 0 or negative.
        attempts = max(1, step.max_retries)
        last_error = None
        last_exception = None

        for attempt in range(1, attempts + 1):
            try:
                return await asyncio.wait_for(
                    step.action(context),
                    timeout=step.timeout_seconds,
                )
            except asyncio.TimeoutError as e:
                last_exception = e
                last_error = f"Timeout after {step.timeout_seconds}s"
                logging.warning(
                    f"Step {step.name} attempt {attempt} timed out"
                )
            except Exception as e:
                last_exception = e
                last_error = str(e)
                logging.warning(
                    f"Step {step.name} attempt {attempt} failed: {e}"
                )
            if attempt < attempts:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

        raise RuntimeError(
            f"Step {step.name} failed after {attempts} attempts: {last_error}"
        ) from last_exception

    async def _compensate(self, saga_id: str, completed_steps: list[SagaStep], context: dict):
        """Run the compensations of ``completed_steps`` in reverse order.

        A compensation that raises or times out is recorded as
        COMPENSATION_FAILED and announced on ``saga.compensation_failed``;
        the remaining compensations still run.
        """
        await self.store.mark_compensating(saga_id)

        for step in reversed(completed_steps):
            if not step.compensation:
                continue

            await self.store.update_step(saga_id, step.name, StepStatus.COMPENSATING)

            try:
                await asyncio.wait_for(
                    step.compensation(context),
                    timeout=step.timeout_seconds * 2,  # More time for compensation
                )
                await self.store.update_step(saga_id, step.name, StepStatus.COMPENSATED)
            except Exception as e:
                # A timeout has an empty message; fall back to the exception
                # type so the log line and stored error are never blank.
                detail = str(e) or type(e).__name__
                logging.critical(
                    f"COMPENSATION FAILED for saga {saga_id}, step {step.name}: {detail}"
                )
                await self.store.update_step(
                    saga_id, step.name, StepStatus.COMPENSATION_FAILED, error=detail
                )
                # Alert ops team — manual intervention needed
                await self.publisher.publish("saga.compensation_failed", {
                    "saga_id": saga_id,
                    "step": step.name,
                    "error": detail,
                })

Using the Engine

# Define saga steps
async def charge_payment(context):
    """Charge the customer for the order total.

    Reads ``customer_id``, ``total`` and ``saga_id`` from ``context`` and
    returns the payment id and amount reported by the payment client.
    """
    charge_args = {
        "customer_id": context["customer_id"],
        "amount": context["total"],
        # Same key on every retry of this saga's payment step.
        "idempotency_key": f"{context['saga_id']}-payment",
    }
    payment = await payment_client.charge(**charge_args)
    return dict(payment_id=payment.id, amount=payment.amount)

async def refund_payment(context):
    """Compensation for ``charge_payment``: refund the recorded payment."""
    # The charge step's return value is stored under "charge_payment_result".
    charged = context["charge_payment_result"]
    await payment_client.refund(
        payment_id=charged["payment_id"],
        idempotency_key=f"{context['saga_id']}-refund",
    )

async def reserve_inventory(context):
    """Reserve stock for the order's items and return the reservation id."""
    order_items = context["items"]
    dedupe_key = f"{context['saga_id']}-inventory"
    reservation = await inventory_client.reserve(
        items=order_items,
        idempotency_key=dedupe_key,
    )
    return {"reservation_id": reservation.id}

async def release_inventory(context):
    """Compensation for ``reserve_inventory``: release the reservation."""
    # The reserve step's return value is stored under "reserve_inventory_result".
    reserved = context["reserve_inventory_result"]
    await inventory_client.release(
        reservation_id=reserved["reservation_id"],
        idempotency_key=f"{context['saga_id']}-release",
    )

async def create_shipment(context):
    """Create a shipment for the order and return its id and tracking number."""
    shipment_args = {
        "order_id": context["order_id"],
        "address": context["shipping_address"],
        "idempotency_key": f"{context['saga_id']}-shipment",
    }
    shipment = await shipping_client.create_shipment(**shipment_args)
    return dict(shipment_id=shipment.id, tracking=shipment.tracking_number)

async def cancel_shipment(context):
    """Compensation for ``create_shipment``: cancel the created shipment."""
    # The shipment step's return value is stored under "create_shipment_result".
    created = context["create_shipment_result"]
    await shipping_client.cancel(
        shipment_id=created["shipment_id"],
        idempotency_key=f"{context['saga_id']}-cancel-shipment",
    )

# Compose and execute
order_saga_steps = [
    SagaStep("charge_payment", charge_payment, refund_payment, timeout_seconds=15),
    SagaStep("reserve_inventory", reserve_inventory, release_inventory, timeout_seconds=10),
    SagaStep("create_shipment", create_shipment, cancel_shipment, timeout_seconds=20),
]

result = await saga_engine.execute(
    saga_id=str(uuid4()),
    steps=order_saga_steps,
    context={
        "order_id": "order-123",
        "customer_id": "cust-456",
        "items": [{"sku": "W-001", "qty": 2}],
        "total": 49.99,
        "shipping_address": "123 Main St",
    }
)

Idempotent Compensations

Compensations may be called multiple times (due to retries or recovery). Every compensation must be idempotent.

Idempotency Key Pattern

class PaymentService:
    """Payment operations that can be repeated with the same idempotency key."""

    async def refund(self, payment_id: str, idempotency_key: str):
        """Refund ``payment_id`` unless this idempotency key was already handled.

        Returns:
            The stored ``refunds`` record when the key has been seen before,
            otherwise the object returned by the gateway.
        """
        # NOTE(review): the lookup and the insert are separate awaits, so two
        # concurrent calls with the same key could both reach the gateway —
        # confirm whether the store enforces uniqueness on idempotency_key.
        prior = await self.db.find_one(
            "refunds", {"idempotency_key": idempotency_key}
        )
        if not prior:
            gateway_refund = await self.gateway.refund(payment_id)

            # Remember the key so a repeated call returns early.
            record = {
                "idempotency_key": idempotency_key,
                "payment_id": payment_id,
                "refund_id": gateway_refund.id,
                "status": "completed",
            }
            await self.db.insert("refunds", record)
            return gateway_refund

        # Seen before: hand back what was recorded the first time.
        return prior

State-Based Idempotency

class InventoryService:
    """Inventory operations whose repeat calls are no-ops based on stored state."""

    async def release(self, reservation_id: str, idempotency_key: str):
        """Return reserved stock to inventory and mark the reservation released.

        Does nothing when the reservation is missing or already released.
        ``idempotency_key`` is accepted but not read here; the reservation's
        stored status decides whether any work is done.
        """
        reservation = await self.db.find_one(
            "reservations", {"id": reservation_id}
        )

        # Missing or already released: nothing left to undo.
        if not reservation or reservation["status"] == "released":
            return

        # Put the reserved quantity back on the shelf.
        sku_filter = {"sku": reservation["sku"]}
        restock = {"$inc": {"available": reservation["quantity"]}}
        await self.db.update("inventory", sku_filter, restock)

        # Mark the reservation so a repeated call returns early.
        # NOTE(review): the two updates are separate awaits; a failure between
        # them leaves stock restored but the reservation unmarked — confirm
        # whether the store offers a transaction.
        released_state = {"status": "released", "released_at": datetime.utcnow()}
        await self.db.update("reservations", {"id": reservation_id}, released_state)

Choreography-Based Saga Implementation

For teams that prefer event-driven choreography:

# Each service manages its own saga participation

class PaymentSagaParticipant:
    """The payment service's side of the choreographed order saga.

    Reacts to ``order.created`` by charging the customer, and to
    ``inventory.reservation_failed`` by refunding that charge.
    """

    def __init__(self, broker, payment_gateway, db):
        self.broker = broker
        self.gateway = payment_gateway
        self.db = db

    async def on_order_created(self, event: dict):
        """Charge the customer for a new order and announce the outcome.

        Publishes ``payment.completed`` on success and ``payment.failed`` on
        any error. A redelivered event for an order that already has a
        payment record does not charge again.
        """
        data = event["data"]
        order_id = data["order_id"]
        # Set only while a charge exists at the gateway but has no stored
        # record yet.
        unrecorded_payment_id = None

        try:
            existing = await self.db.find_one("payments", {"order_id": order_id})
            if existing:
                if existing["status"] != "charged":
                    # Already compensated (e.g. refunded); charging again on a
                    # redelivered event would undo that.
                    return
                # Redelivery of an event that was already charged: re-announce
                # the stored payment instead of charging twice.
                payment_id = existing["payment_id"]
            else:
                payment = await self.gateway.charge(
                    amount=data["total"],
                    customer_id=data["customer_id"],
                )
                payment_id = unrecorded_payment_id = payment.id
                await self.db.insert("payments", {
                    "order_id": order_id,
                    "payment_id": payment_id,
                    "status": "charged",
                })
                unrecorded_payment_id = None

            await self.broker.publish("payment.completed", {
                "order_id": order_id,
                "payment_id": payment_id,
            })
        except Exception as e:
            if unrecorded_payment_id is not None:
                # Without a stored record the refund handler below cannot find
                # this charge, so undo it before reporting failure.
                await self._refund_unrecorded_charge(order_id, unrecorded_payment_id)
            await self.broker.publish("payment.failed", {
                "order_id": order_id,
                "error": str(e),
            })

    async def _refund_unrecorded_charge(self, order_id, payment_id):
        """Refund a charge that could not be recorded; log loudly if that fails."""
        try:
            await self.gateway.refund(payment_id)
        except Exception:
            logging.critical(
                "Could not refund unrecorded payment %s for order %s",
                payment_id, order_id, exc_info=True,
            )

    async def on_inventory_reservation_failed(self, event: dict):
        """Compensate a failed reservation by refunding the order's payment.

        Only a payment whose stored status is ``charged`` is refunded, so a
        repeated event after the refund was recorded does nothing.
        """
        # Compensate: refund the payment
        order_id = event["data"]["order_id"]
        payment = await self.db.find_one("payments", {"order_id": order_id})

        if payment and payment["status"] == "charged":
            await self.gateway.refund(payment["payment_id"])
            await self.db.update(
                "payments",
                {"order_id": order_id},
                {"status": "refunded"},
            )
            await self.broker.publish("payment.refunded", {
                "order_id": order_id,
            })

Saga Routing Table

To manage choreography complexity, maintain an explicit routing table:

# Event name -> list of (service, handler) pairs that react to that event.
SAGA_ROUTING = {
    # Forward path
    "order.created": [("payment-service", "on_order_created")],
    "payment.completed": [("inventory-service", "on_payment_completed")],
    "payment.failed": [("order-service", "on_payment_failed")],
    "inventory.reserved": [("shipping-service", "on_inventory_reserved")],
    # Failure fan-out: two services react to a failed reservation.
    "inventory.reservation_failed": [
        ("payment-service", "on_inventory_reservation_failed"),
        ("order-service", "on_inventory_reservation_failed"),
    ],
}

Document this table — it’s the only place the full workflow is visible in a choreography saga.

Handling Timeout and Partial Failures

Saga Timeout Watchdog

class SagaTimeoutWatchdog:
    """Finds sagas that stopped making progress and asks the engine to recover them."""

    def __init__(self, saga_store, saga_engine, max_saga_duration=300, event_publisher=None):
        """
        Args:
            saga_store: Store queried for stale sagas via ``find_sagas``.
            saga_engine: Engine asked to recover or re-compensate each saga.
            max_saga_duration: Seconds without an update after which a saga
                counts as stale.
            event_publisher: Where ``saga.timeout`` events are sent. Defaults
                to ``saga_engine.publisher`` when that attribute exists.
        """
        self.store = saga_store
        self.engine = saga_engine
        self.max_duration = max_saga_duration  # seconds
        # check_stale_sagas publishes through self.publisher, so it has to be
        # set here; fall back to the engine's publisher for existing callers.
        self.publisher = (
            event_publisher
            if event_publisher is not None
            else getattr(saga_engine, "publisher", None)
        )

    async def check_stale_sagas(self):
        """Run periodically (e.g., every 60 seconds).

        Looks up sagas in ``running`` or ``compensating`` state whose last
        update is older than ``max_duration`` seconds, triggers recovery for
        each, and publishes a ``saga.timeout`` event per saga.

        NOTE(review): ``recover_and_compensate`` and ``retry_compensations``
        are not defined on the SagaEngine shown in this file — confirm the
        engine in use provides them.
        """
        cutoff = datetime.utcnow() - timedelta(seconds=self.max_duration)
        stale_sagas = await self.store.find_sagas(
            status__in=["running", "compensating"],
            updated_at__lt=cutoff,
        )

        for saga in stale_sagas:
            logging.warning(f"Saga {saga['saga_id']} appears stuck, triggering recovery")

            if saga["status"] == "running":
                # Trigger compensation from the last completed step
                await self.engine.recover_and_compensate(saga["saga_id"])
            elif saga["status"] == "compensating":
                # Retry failed compensations
                await self.engine.retry_compensations(saga["saga_id"])

            if self.publisher is not None:
                await self.publisher.publish("saga.timeout", {
                    "saga_id": saga["saga_id"],
                    "stuck_at": saga["current_step"],
                })

The “Pending” State Pattern

For irreversible steps, use a two-phase approach:

order_saga_steps = [
    SagaStep(name="charge_payment", action=charge_payment, compensation=refund_payment),
    SagaStep(name="reserve_inventory", action=reserve_inventory, compensation=release_inventory),
    # Sending email cannot be undone, so only draft it here; the draft can
    # still be deleted if a later step fails.
    SagaStep(name="prepare_confirmation", action=prepare_email, compensation=delete_draft),
    SagaStep(name="create_shipment", action=create_shipment, compensation=cancel_shipment),
    # Last step, with no compensation: the email goes out only once every
    # reversible step has completed.
    SagaStep(name="send_confirmation", action=send_prepared_email, compensation=None),
]

The email is drafted (reversible) early in the saga but only sent (irreversible) at the very end. If any earlier step fails, the draft is deleted.

Testing Sagas

async def test_saga_compensates_on_inventory_failure():
    """A failed inventory reservation refunds the payment and never ships."""
    # Arrange: payment and shipping work, inventory does not.
    payments = MockPaymentClient(should_succeed=True)
    inventory = MockInventoryClient(should_succeed=False)
    shipping = MockShippingClient(should_succeed=True)

    engine = SagaEngine(InMemorySagaStore(), InMemoryPublisher())
    saga_steps = build_order_saga_steps(payments, inventory, shipping)
    order_context = {
        "order_id": "o-1", "customer_id": "c-1",
        "items": [], "total": 50.0, "shipping_address": "123 St"
    }

    # Act
    outcome = await engine.execute("test-saga-1", saga_steps, order_context)

    # Assert: the saga stopped at inventory and rolled the payment back.
    assert outcome["status"] == "failed"
    assert outcome["failed_step"] == "reserve_inventory"
    assert payments.refund_called  # Payment was compensated
    assert not shipping.create_called  # Shipping never started

async def test_saga_handles_compensation_failure():
    """A refund that fails during rollback is recorded as compensation_failed."""
    payment_client = MockPaymentClient(should_succeed=True, refund_fails=True)
    inventory_client = MockInventoryClient(should_succeed=False)

    engine = SagaEngine(InMemorySagaStore(), InMemoryPublisher())
    steps = build_order_saga_steps(payment_client, inventory_client, MockShippingClient())

    # Pass a real order context. The literal `{...}` is a set containing
    # Ellipsis, which would break the first step instead of letting the saga
    # reach the inventory failure this test is about.
    result = await engine.execute("test-saga-2", steps, {
        # The step functions shown in this file build idempotency keys from
        # context["saga_id"].
        "saga_id": "test-saga-2",
        "order_id": "o-2", "customer_id": "c-2",
        "items": [], "total": 50.0, "shipping_address": "123 St",
    })

    assert result["status"] == "failed"
    # The payment step must have succeeded for its refund to be attempted.
    assert result["failed_step"] == "reserve_inventory"
    # Verify compensation failure was logged and alerted
    saga = await engine.store.get_saga("test-saga-2")
    assert any(
        s["status"] == "compensation_failed" for s in saga["steps"]
    )

Monitoring and Alerting

| Metric | Purpose | Alert |
| --- | --- | --- |
| saga_completed_total | Successful sagas | Sudden drops |
| saga_failed_total | Failed sagas (compensated) | Rate > 5% |
| saga_compensation_failed_total | Manual intervention needed | Any occurrence |
| saga_duration_seconds | End-to-end saga time | p99 > 30s |
| saga_stale_count | Stuck sagas | Count > 0 |

The one thing to remember: Production sagas in Python require persistent state for crash recovery, idempotent compensations for safe retries, and a timeout watchdog for stuck executions — the saga engine is only as reliable as its weakest compensation handler.

Tags: python, architecture, patterns

See Also