Python Task Chaining Workflows — Deep Dive

Celery Chain Internals

When you construct chain(a.s(), b.s(), c.s()), Celery doesn’t publish three independent tasks up front. It builds an ordered list of signatures, and each task message carries the steps that still have to run. When a succeeds, the worker that executed it dispatches b with a’s result prepended to b’s arguments. The chain advances from worker to worker rather than through a central coordinator.

# What chain(a.s(1), b.s(2), c.s(3)) actually does:
# 1. a is called with args=(1,)
# 2. a's result (say, 10) triggers b with args=(10, 2)
# 3. b's result (say, 20) triggers c with args=(20, 3)

This means:

  • The chain has no single “owner” process: each step triggers the next
  • If a worker dies while running a step, the next step is never triggered unless the broker redelivers the unacknowledged message, which requires acks_late
  • The AsyncResult returned by the chain refers to the last task; earlier results are reached through its parent attribute and only exist if a result backend stores them
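
The argument-passing rule can be illustrated without a broker. The following is a minimal pure-Python sketch, not Celery's implementation: make_sig and run_chain are hypothetical helpers, and the loop here is a central driver purely for readability, whereas Celery hands the remaining steps from worker to worker.

```python
def make_sig(func, *args):
    """Hypothetical stand-in for a signature: a function plus bound args."""
    return (func, args)


def run_chain(*sigs):
    """Run signatures in order, prepending each result to the next step's args."""
    first_func, first_args = sigs[0]
    result = first_func(*first_args)
    for func, args in sigs[1:]:
        # The previous result becomes the first positional argument.
        result = func(result, *args)
    return result


def add(x, y):
    return x + y


def mul(x, y):
    return x * y


# add(1, 9) gives 10, mul(10, 2) gives 20, add(20, 3) gives 23
final = run_chain(make_sig(add, 1, 9), make_sig(mul, 2), make_sig(add, 3))
```

Each step after the first receives one more positional argument than its signature binds, which is why a task in the middle of a chain must accept the previous result as its first parameter.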

Immutable Signatures

Sometimes you don’t want the previous result injected:

from celery import chain

# `.si()` creates an immutable signature
workflow = chain(
    prepare_data.s(raw_input),
    process.si(config),    # ignores prepare_data result
    cleanup.si()           # ignores process result
)

Use immutable signatures when steps are sequential but don’t share data — they just need ordering guarantees.
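
To make the difference concrete, here is a small simulation of mutable versus immutable steps. It is a sketch under assumptions: Sig, s, si, and run_chain are hypothetical names that only mimic the behavior described above and are not part of Celery.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Sig:
    """Hypothetical signature: a function, its bound args, and a flag."""
    func: Callable[..., object]
    args: tuple = ()
    immutable: bool = False


def s(func, *args):
    return Sig(func, args)


def si(func, *args):
    return Sig(func, args, immutable=True)


def run_chain(*sigs):
    result = None
    for index, sig in enumerate(sigs):
        if index == 0 or sig.immutable:
            # Immutable steps run with exactly the args they were given.
            result = sig.func(*sig.args)
        else:
            # Mutable steps receive the previous result first.
            result = sig.func(result, *sig.args)
    return result


calls = []


def prepare(raw):
    calls.append(("prepare", raw))
    return raw.upper()


def process(config):
    calls.append(("process", config))
    return "processed"


def cleanup():
    calls.append(("cleanup",))
    return "clean"


final = run_chain(s(prepare, "raw"), si(process, "cfg"), si(cleanup))
```

The recorded calls show that process and cleanup never see the upstream result, yet they still run strictly after the step before them.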

Building a Custom Workflow Engine

For complex workflows with conditional branching, parallel steps, and checkpoint-resume, Celery primitives become limiting. Here’s a lightweight DAG-based engine:

import asyncio
import uuid
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

class StepStatus(Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    COMPLETED = 'completed'
    FAILED = 'failed'
    SKIPPED = 'skipped'

@dataclass
class Step:
    name: str
    handler: Callable
    depends_on: list[str] = field(default_factory=list)
    condition: Optional[Callable] = None
    max_retries: int = 0
    retry_delay: float = 5.0
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: Optional[str] = None
    attempts: int = 0

class WorkflowEngine:
    def __init__(self, name):
        self.id = str(uuid.uuid4())
        self.name = name
        self.steps: dict[str, Step] = {}
        self.context: dict[str, Any] = {}

    def add_step(self, name, handler, depends_on=None,
                 condition=None, max_retries=0):
        self.steps[name] = Step(
            name=name,
            handler=handler,
            depends_on=depends_on or [],
            condition=condition,
            max_retries=max_retries,
        )

    def _ready_steps(self):
        ready = []
        for step in self.steps.values():
            if step.status != StepStatus.PENDING:
                continue
            deps_met = all(
                self.steps[d].status == StepStatus.COMPLETED
                for d in step.depends_on
            )
            if deps_met:
                if step.condition and not step.condition(
                    self.context
                ):
                    step.status = StepStatus.SKIPPED
                    continue
                ready.append(step)
        return ready

    async def _run_step(self, step):
        step.status = StepStatus.RUNNING
        step.attempts += 1
        try:
            result = await step.handler(self.context)
            step.result = result
            step.status = StepStatus.COMPLETED
            self.context[step.name] = result
        except Exception as e:
            if step.attempts <= step.max_retries:
                step.status = StepStatus.PENDING
                await asyncio.sleep(step.retry_delay)
            else:
                step.error = str(e)
                step.status = StepStatus.FAILED
                raise

    async def run(self, initial_context=None):
        if initial_context:
            self.context.update(initial_context)

        while True:
            ready = self._ready_steps()
            if not ready:
                # Nothing is runnable: we are either finished or stuck.
                # gather() below has already awaited every started step,
                # so no step can still be RUNNING here.
                failed = [s.name for s in self.steps.values()
                          if s.status == StepStatus.FAILED]
                blocked = [s.name for s in self.steps.values()
                           if s.status == StepStatus.PENDING]
                if failed or blocked:
                    # A step failed, or pending steps depend on a
                    # failed or skipped step, or on a cycle
                    raise RuntimeError(
                        f"Workflow stuck. Failed: {failed}, "
                        f"blocked: {blocked}"
                    )
                break  # All done

            # Run ready steps in parallel
            await asyncio.gather(
                *[self._run_step(s) for s in ready],
                return_exceptions=True
            )

        return self.context

# Usage
wf = WorkflowEngine('order-processing')

async def validate(ctx):
    return validate_order(ctx['order'])

async def charge(ctx):
    return charge_card(ctx['validate']['card_token'],
                       ctx['order']['total'])

async def reserve(ctx):
    return reserve_inventory(ctx['order']['items'])

async def notify(ctx):
    return send_confirmation(ctx['order']['email'])

wf.add_step('validate', validate)
wf.add_step('charge', charge, depends_on=['validate'],
            max_retries=2)
wf.add_step('reserve', reserve, depends_on=['validate'])
wf.add_step('notify', notify,
            depends_on=['charge', 'reserve'])

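# order_data comes from the caller; asyncio.run() drives the
# engine from synchronous code (top-level await is not valid in a script)
result = asyncio.run(wf.run({'order': order_data}))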
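
The engine above only discovers an unsatisfiable dependency graph once it is already running. One possible complement, sketched here as an assumption rather than as part of the engine, is to validate the graph before execution with the standard library's graphlib (Python 3.9+). The function name validate_dependencies is hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def validate_dependencies(deps: dict[str, list[str]]) -> list[str]:
    """Return one valid execution order, or raise ValueError for a bad graph.

    deps maps each step name to the names it depends on.
    """
    unknown = {d for ds in deps.values() for d in ds} - set(deps)
    if unknown:
        raise ValueError(f"Unknown dependencies: {sorted(unknown)}")
    try:
        # static_order() yields every node after all of its predecessors.
        return list(TopologicalSorter(deps).static_order())
    except CycleError as exc:
        # The second element of args holds the detected cycle.
        raise ValueError(f"Dependency cycle: {exc.args[1]}") from exc


order = validate_dependencies({
    "validate": [],
    "charge": ["validate"],
    "reserve": ["validate"],
    "notify": ["charge", "reserve"],
})
```

With the engine above, the same mapping could be derived from each Step's depends_on list before calling run().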

Saga Pattern for Compensation

When a chain involves multiple services and a later step fails, you need to undo earlier steps. The saga pattern defines compensating actions for each step.

@dataclass
class SagaStep:
    name: str
    action: Callable
    compensate: Callable
    result: Any = None

class Saga:
    def __init__(self):
        self.steps: list[SagaStep] = []
        self.completed: list[SagaStep] = []

    def add_step(self, name, action, compensate):
        self.steps.append(SagaStep(name, action, compensate))

    async def execute(self, context):
        for step in self.steps:
            try:
                step.result = await step.action(context)
                context[step.name] = step.result
                self.completed.append(step)
            except Exception as e:
                # Compensate in reverse order, then re-raise with the
                # original exception attached as the cause
                await self._compensate(context)
                raise RuntimeError(
                    f"Saga failed at {step.name}: {e}. "
                    f"Compensated {len(self.completed)} steps."
                ) from e
        return context

    async def _compensate(self, context):
        for step in reversed(self.completed):
            try:
                await step.compensate(context)
            except Exception as comp_error:
                # Log but continue compensating
                print(f"Compensation failed for "
                      f"{step.name}: {comp_error}")

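# Usage (charge_card, reserve_items, and the other helpers must be
# coroutine functions, because Saga.execute awaits what each lambda returns)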
saga = Saga()
saga.add_step(
    'payment',
    action=lambda ctx: charge_card(ctx['card'], ctx['amount']),
    compensate=lambda ctx: refund_card(ctx['payment']),
)
saga.add_step(
    'inventory',
    action=lambda ctx: reserve_items(ctx['items']),
    compensate=lambda ctx: release_items(ctx['inventory']),
)
saga.add_step(
    'shipping',
    action=lambda ctx: create_shipment(ctx['address']),
    compensate=lambda ctx: cancel_shipment(ctx['shipping']),
)
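
To show the rollback order end to end, the following self-contained sketch runs a three-step saga whose last step fails. It condenses the pattern into a single function, run_saga, and every handler is a hypothetical stub that only records what it was asked to do.

```python
import asyncio


async def run_saga(steps, context):
    """steps is a list of (name, action, compensate) coroutine functions."""
    completed = []
    for name, action, compensate in steps:
        try:
            context[name] = await action(context)
            completed.append(compensate)
        except Exception as exc:
            # Undo the finished steps, most recent first.
            for undo in reversed(completed):
                await undo(context)
            raise RuntimeError(f"Saga failed at {name}") from exc
    return context


log = []


async def charge(ctx):
    log.append("charge")
    return "txn-1"


async def refund(ctx):
    log.append(f"refund {ctx['payment']}")


async def reserve(ctx):
    log.append("reserve")
    return "res-1"


async def release(ctx):
    log.append(f"release {ctx['inventory']}")


async def ship(ctx):
    raise ConnectionError("carrier unavailable")


async def cancel(ctx):
    log.append("cancel")


def demo():
    steps = [
        ("payment", charge, refund),
        ("inventory", reserve, release),
        ("shipping", ship, cancel),
    ]
    try:
        asyncio.run(run_saga(steps, {}))
    except RuntimeError as exc:
        return str(exc)
    return None


message = demo()
```

The log ends with the release followed by the refund, and cancel never runs because the shipping step did not complete.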

Checkpoint and Resume

For long-running workflows, persist state so they can resume after crashes:

import json
import time

class CheckpointedWorkflow:
    def __init__(self, workflow_id, storage):
        self.id = workflow_id
        self.storage = storage  # Redis, DB, etc.

    def save_checkpoint(self, step_name, context):
        checkpoint = {
            'workflow_id': self.id,
            'completed_step': step_name,
            'context': context,
            'timestamp': time.time(),
        }
        self.storage.set(
            f"workflow:{self.id}:checkpoint",
            json.dumps(checkpoint)
        )

    def load_checkpoint(self):
        raw = self.storage.get(
            f"workflow:{self.id}:checkpoint"
        )
        if raw:
            return json.loads(raw)
        return None

    async def run(self, steps, initial_context):
        checkpoint = self.load_checkpoint()
        context = initial_context

        # Fast-forward past completed steps
        start_index = 0
        if checkpoint:
            context = checkpoint['context']
            completed = checkpoint['completed_step']
            for i, (name, _) in enumerate(steps):
                if name == completed:
                    start_index = i + 1
                    break

        # Execute remaining steps
        for name, handler in steps[start_index:]:
            result = await handler(context)
            context[name] = result
            self.save_checkpoint(name, context)

        return context
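
The resume behavior can be exercised with an in-memory stand-in for the storage backend. This is a sketch under assumptions: DictStorage is a hypothetical object exposing only the get and set methods used above, run_with_checkpoints condenses the same fast-forward logic into one function, and the crash is simulated by raising an exception once.

```python
import asyncio
import json


class DictStorage:
    """Hypothetical storage with the same get/set surface as the text assumes."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)


async def run_with_checkpoints(workflow_id, storage, steps, initial_context):
    key = f"workflow:{workflow_id}:checkpoint"
    context = dict(initial_context)
    start = 0
    raw = storage.get(key)
    if raw:
        checkpoint = json.loads(raw)
        context = checkpoint["context"]
        names = [name for name, _ in steps]
        start = names.index(checkpoint["completed_step"]) + 1
    for name, handler in steps[start:]:
        context[name] = await handler(context)
        # Persist after every step so a crash loses at most one step.
        storage.set(key, json.dumps({"completed_step": name, "context": context}))
    return context


runs = []
crash_once = {"armed": True}


async def fetch(ctx):
    runs.append("fetch")
    return [1, 2, 3]


async def transform(ctx):
    runs.append("transform")
    if crash_once["armed"]:
        crash_once["armed"] = False
        raise RuntimeError("simulated crash")
    return [x * 2 for x in ctx["fetch"]]


async def load(ctx):
    runs.append("load")
    return len(ctx["transform"])


STEPS = [("fetch", fetch), ("transform", transform), ("load", load)]


def demo():
    storage = DictStorage()
    try:
        asyncio.run(run_with_checkpoints("wf-1", storage, STEPS, {}))
    except RuntimeError:
        pass  # first run dies inside transform
    return asyncio.run(run_with_checkpoints("wf-1", storage, STEPS, {}))


final = demo()
```

On the second run fetch is skipped and transform is retried, so a step that was interrupted partway runs again from its start. Handlers therefore need to be idempotent, and the context must stay JSON-serializable.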

Distributed Chain Patterns

Event-Driven Chaining

Instead of direct task-to-task linking, emit events. Each service subscribes to the events it cares about:

OrderCreated → PaymentService → PaymentCompleted
PaymentCompleted → InventoryService → InventoryReserved
InventoryReserved → ShippingService → ShipmentCreated

Advantages: loose coupling, each service owns its own logic. Disadvantages: harder to trace the full chain, eventual consistency.
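
A toy in-process version of this flow is sketched below. It is only an illustration: EventBus is a hypothetical synchronous class, whereas a real deployment would put a message broker between separately deployed services, and the service functions are stubs.

```python
from collections import defaultdict


class EventBus:
    """Hypothetical synchronous bus; handlers run inline on publish."""

    def __init__(self):
        self._handlers = defaultdict(list)
        self.history = []

    def subscribe(self, event_type, handler):
        self._handlers[event_type].append(handler)

    def publish(self, event_type, payload):
        self.history.append(event_type)
        for handler in list(self._handlers[event_type]):
            handler(payload)


bus = EventBus()
shipments = []


def payment_service(order):
    bus.publish("PaymentCompleted", {**order, "paid": True})


def inventory_service(order):
    bus.publish("InventoryReserved", {**order, "reserved": True})


def shipping_service(order):
    shipments.append(order["id"])
    bus.publish("ShipmentCreated", order)


# Each service only knows the event it reacts to, not who emitted it.
bus.subscribe("OrderCreated", payment_service)
bus.subscribe("PaymentCompleted", inventory_service)
bus.subscribe("InventoryReserved", shipping_service)

bus.publish("OrderCreated", {"id": "order-1"})
```

No function in this sketch lists the whole sequence. The chain exists only as the sum of the subscriptions, which is the source of both the loose coupling and the tracing difficulty.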

Orchestrator vs Choreography

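Orchestrator (centralized): One service coordinates the chain, telling each step when to run. Easier to understand and debug. Celery chains resemble this model in that the whole sequence is declared in one place, even though each worker triggers the next step at run time.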

Choreography (decentralized): Each service knows what to do when it receives an event. No central coordinator. More resilient but harder to track.

For most Python applications, start with orchestration (Celery chains, custom workflow engine). Move to choreography when you need independent service deployment and the team is comfortable with event-driven debugging.

Performance Considerations

Pattern            Overhead per Step   Resumable          Distributed
Function calls     ~microseconds       No                 No
asyncio chain      ~microseconds       No                 No
Celery chain       ~10-100 ms          Partially          Yes
Custom DAG engine  ~1-10 ms            With checkpoints   With broker
Event-driven       ~10-100 ms          With event store   Yes

The overhead matters when chains have many small steps. If each step takes milliseconds, Celery’s per-step overhead (broker round-trip) can dominate. For sub-millisecond steps, use in-process chaining and distribute at a coarser granularity.
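
A back-of-the-envelope calculation makes the trade-off visible. The figures below are illustrative assumptions: 10 ms is the low end of the Celery range in the table above, and the 2 ms step time is invented for the example.

```python
def overhead_share(step_ms: float, overhead_ms: float) -> float:
    """Fraction of wall time per step that is spent on dispatch overhead."""
    return overhead_ms / (step_ms + overhead_ms)


def chain_total_ms(n_steps: int, step_ms: float, overhead_ms: float) -> float:
    """Total time for a sequential chain with a fixed per-step overhead."""
    return n_steps * (step_ms + overhead_ms)


# Twenty 2 ms steps, each dispatched as its own task with 10 ms overhead
fine_grained = chain_total_ms(20, step_ms=2, overhead_ms=10)

# The same work batched into a single 40 ms task
coarse_grained = chain_total_ms(1, step_ms=40, overhead_ms=10)

share = overhead_share(step_ms=2, overhead_ms=10)
```

Under these assumptions the fine-grained chain takes 240 ms against 50 ms for the batched task, and more than four fifths of the fine-grained time is dispatch overhead.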

One thing to remember: The saga pattern with compensating transactions is the production answer for multi-service chains. When step 5 of 7 fails, you don’t want to figure out manually what to undo — define compensations upfront, and the saga handles rollback automatically.
