Python Task Chaining Workflows — Deep Dive
Celery Chain Internals
When you construct chain(a.s(), b.s(), c.s()), Celery doesn’t schedule three independent tasks. It builds a linked sequence of signatures in which each task message carries the remainder of the chain. When a succeeds, the worker that ran it dispatches b with a’s result prepended to b’s arguments. The chain advances through these success callbacks rather than through a central coordinator.
# What chain(a.s(1), b.s(2), c.s(3)) actually does:
# 1. a is called with args=(1,)
# 2. a's result (say, 10) triggers b with args=(10, 2)
# 3. b's result (say, 20) triggers c with args=(20, 3)
This means:
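- The chain has no single “owner” process: each step triggers the next
- If a worker dies after starting a step but before dispatching the next one, the chain stops there (unless acks_late is enabled, so the broker can redeliver the unacknowledged task)
- The chain’s AsyncResult refers to the last task; intermediate results are reached through its .parent links and are only available when a result backend is configured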
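The hand-off described above can be imitated in plain Python. The sketch below is a simulation, not Celery’s implementation: Sig and make_chain are hypothetical names, and everything runs in one process with no broker. It only illustrates how each step passes its result to the next one without a coordinator.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Sig:
    """Hypothetical stand-in for a signature: a function plus partial args."""
    func: Callable[..., Any]
    args: tuple = ()
    link: Optional["Sig"] = None  # the step to trigger on success

    def run(self, *prepended):
        # The previous result (if any) goes in front of the stored args.
        result = self.func(*prepended, *self.args)
        if self.link is None:
            return result
        # No coordinator: this step hands its result to the next one.
        return self.link.run(result)


def make_chain(*sigs):
    """Link each signature to the one that follows it; return the head."""
    for current, following in zip(sigs, sigs[1:]):
        current.link = following
    return sigs[0]


def a(x):
    return x * 10


def b(prev, y):
    return prev * y


def c(prev, z):
    return prev + z


head = make_chain(Sig(a, (1,)), Sig(b, (2,)), Sig(c, (3,)))
final = head.run()  # a(1) -> 10, b(10, 2) -> 20, c(20, 3) -> 23
```

In real Celery the hand-off goes through the broker, so a lost message or worker between two steps breaks the link in a way this in-process version cannot show.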
Immutable Signatures
Sometimes you don’t want the previous result injected:
from celery import chain

# `.si()` creates an immutable signature
workflow = chain(
    prepare_data.s(raw_input),
    process.si(config),  # ignores prepare_data result
    cleanup.si(),        # ignores process result
)
Use immutable signatures when steps are sequential but don’t share data — they just need ordering guarantees.
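To make the difference concrete, here is a small in-process imitation of the rule. It is a sketch under stated assumptions: Sig and run_chain are hypothetical helpers, not Celery classes, and the three step functions are toy stand-ins for the tasks named above.

```python
from typing import Any, Callable, NamedTuple


class Sig(NamedTuple):
    """Hypothetical signature: function, stored args, immutability flag."""
    func: Callable[..., Any]
    args: tuple = ()
    immutable: bool = False


def run_chain(*sigs):
    """Run steps in order; only mutable steps receive the previous result."""
    result = None
    for index, sig in enumerate(sigs):
        if index == 0 or sig.immutable:
            result = sig.func(*sig.args)
        else:
            result = sig.func(result, *sig.args)
    return result


seen = []  # records the arguments each step actually received


def prepare_data(raw):
    seen.append(('prepare_data', raw))
    return raw.strip()


def process(config):
    seen.append(('process', config))
    return 'processed'


def cleanup():
    seen.append(('cleanup',))
    return 'clean'


outcome = run_chain(
    Sig(prepare_data, ('  raw  ',)),
    Sig(process, ('cfg',), immutable=True),  # previous result not injected
    Sig(cleanup, (), immutable=True),        # previous result not injected
)
```

Without the immutable flag, process would have been called with two arguments and cleanup with one, which is the usual source of unexpected TypeError failures in chains that only need ordering.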
Building a Custom Workflow Engine
For complex workflows with conditional branching, parallel steps, and checkpoint-resume, Celery primitives become limiting. Here’s a lightweight DAG-based engine:
import asyncio
import uuid
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


class StepStatus(Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    COMPLETED = 'completed'
    FAILED = 'failed'
    SKIPPED = 'skipped'


@dataclass
class Step:
    name: str
    handler: Callable
    depends_on: list[str] = field(default_factory=list)
    condition: Optional[Callable] = None
    max_retries: int = 0
    retry_delay: float = 5.0
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: Optional[str] = None
    attempts: int = 0
class WorkflowEngine:
    def __init__(self, name):
        self.id = str(uuid.uuid4())
        self.name = name
        self.steps: dict[str, Step] = {}
        self.context: dict[str, Any] = {}

    def add_step(self, name, handler, depends_on=None,
                 condition=None, max_retries=0):
        self.steps[name] = Step(
            name=name,
            handler=handler,
            depends_on=depends_on or [],
            condition=condition,
            max_retries=max_retries,
        )

    def _ready_steps(self):
        """Return pending steps whose dependencies have all completed."""
        ready = []
        for step in self.steps.values():
            if step.status != StepStatus.PENDING:
                continue
            deps_met = all(
                self.steps[d].status == StepStatus.COMPLETED
                for d in step.depends_on
            )
            if deps_met:
                # A false condition skips the step. Steps that depend on
                # a skipped step never become ready.
                if step.condition and not step.condition(
                    self.context
                ):
                    step.status = StepStatus.SKIPPED
                    continue
                ready.append(step)
        return ready

    async def _run_step(self, step):
        step.status = StepStatus.RUNNING
        step.attempts += 1
        try:
            result = await step.handler(self.context)
            step.result = result
            step.status = StepStatus.COMPLETED
            self.context[step.name] = result
        except Exception as e:
            if step.attempts <= step.max_retries:
                # Back to PENDING so the next loop pass retries the step
                step.status = StepStatus.PENDING
                await asyncio.sleep(step.retry_delay)
            else:
                step.error = str(e)
                step.status = StepStatus.FAILED
                raise

    async def run(self, initial_context=None):
        if initial_context:
            self.context.update(initial_context)
        while True:
            ready = self._ready_steps()
            if not ready:
                # gather() below waits for every started step, so nothing
                # is still running here: we are either done or stuck.
                failed = [s.name for s in self.steps.values()
                          if s.status == StepStatus.FAILED]
                blocked = [s.name for s in self.steps.values()
                           if s.status == StepStatus.PENDING]
                if failed or blocked:
                    # Raise even when nothing is blocked, so a failed
                    # leaf step is not silently ignored.
                    raise RuntimeError(
                        f"Workflow did not complete. "
                        f"Failed: {failed}, blocked: {blocked}"
                    )
                break  # All done
            # Run ready steps in parallel. Failures are recorded on each
            # step, so return_exceptions=True keeps one failing step from
            # propagating out of gather() before the others finish.
            await asyncio.gather(
                *[self._run_step(s) for s in ready],
                return_exceptions=True
            )
        return self.context
# Usage
# validate_order, charge_card, reserve_inventory, send_confirmation, and
# order_data are application-specific and not defined here.
wf = WorkflowEngine('order-processing')

async def validate(ctx):
    return validate_order(ctx['order'])

async def charge(ctx):
    return charge_card(ctx['validate']['card_token'],
                       ctx['order']['total'])

async def reserve(ctx):
    return reserve_inventory(ctx['order']['items'])

async def notify(ctx):
    return send_confirmation(ctx['order']['email'])

wf.add_step('validate', validate)
wf.add_step('charge', charge, depends_on=['validate'],
            max_retries=2)
wf.add_step('reserve', reserve, depends_on=['validate'])
wf.add_step('notify', notify,
            depends_on=['charge', 'reserve'])

# `await` only works inside a coroutine; from synchronous code,
# start the event loop with asyncio.run().
result = asyncio.run(wf.run({'order': order_data}))
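The order in which the engine releases these four steps can be checked independently with the standard library’s graphlib module (Python 3.9+). This is a cross-check rather than part of the engine: execution_waves is a hypothetical helper, and the dependency mapping simply restates the depends_on arguments from the usage example.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it depends on.
dependencies = {
    'validate': set(),
    'charge': {'validate'},
    'reserve': {'validate'},
    'notify': {'charge', 'reserve'},
}


def execution_waves(graph):
    """Group steps into waves; steps within a wave can run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted for stable output
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as finished
    return waves


waves = execution_waves(dependencies)
# waves == [['validate'], ['charge', 'reserve'], ['notify']]
```

Running prepare() before a workflow starts is also a cheap way to reject cyclic definitions, which the engine above would otherwise only report as a stuck workflow at runtime.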
Saga Pattern for Compensation
When a chain involves multiple services and a later step fails, you need to undo earlier steps. The saga pattern defines compensating actions for each step.
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class SagaStep:
    name: str
    action: Callable      # coroutine function: performs the step
    compensate: Callable  # coroutine function: undoes the step
    result: Any = None


class Saga:
    def __init__(self):
        self.steps: list[SagaStep] = []
        self.completed: list[SagaStep] = []

    def add_step(self, name, action, compensate):
        self.steps.append(SagaStep(name, action, compensate))

    async def execute(self, context):
        for step in self.steps:
            try:
                step.result = await step.action(context)
                context[step.name] = step.result
                self.completed.append(step)
            except Exception as e:
                # Compensate in reverse order
                await self._compensate(context)
                # Chain the original exception so its traceback is kept
                raise RuntimeError(
                    f"Saga failed at {step.name}: {e}. "
                    f"Compensated {len(self.completed)} steps."
                ) from e
        return context

    async def _compensate(self, context):
        for step in reversed(self.completed):
            try:
                await step.compensate(context)
            except Exception as comp_error:
                # Log but continue compensating
                print(f"Compensation failed for "
                      f"{step.name}: {comp_error}")
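# Usage
# Saga awaits every action and compensation, so charge_card, refund_card,
# and the other helpers are assumed to be application-defined coroutine
# functions.
saga = Saga()
saga.add_step(
    'payment',
    action=lambda ctx: charge_card(ctx['card'], ctx['amount']),
    compensate=lambda ctx: refund_card(ctx['payment']),
)
saga.add_step(
    'inventory',
    action=lambda ctx: reserve_items(ctx['items']),
    compensate=lambda ctx: release_items(ctx['inventory']),
)
saga.add_step(
    'shipping',
    action=lambda ctx: create_shipment(ctx['address']),
    compensate=lambda ctx: cancel_shipment(ctx['shipping']),
)
# Run from inside a coroutine, with a context that provides the keys the
# actions read ('card', 'amount', 'items', 'address'):
#     context = await saga.execute(context)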
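The failure path is the part worth testing. The sketch below is a compact, self-contained variant of the same idea rather than the Saga class itself: run_saga and the toy step functions are hypothetical, and the third step is made to fail on purpose so the reverse-order compensation is visible in the log.

```python
import asyncio


async def run_saga(steps, ctx):
    """Run (name, action, compensate) triples; undo in reverse on failure."""
    completed = []
    for name, action, compensate in steps:
        try:
            ctx[name] = await action(ctx)
            completed.append(compensate)
        except Exception:
            # Undo only the steps that finished, newest first.
            for undo in reversed(completed):
                await undo(ctx)
            return False
    return True


log = []  # records every action and compensation in call order


async def charge(ctx):
    log.append('charge')
    return 'payment-1'


async def refund(ctx):
    log.append('refund')


async def reserve(ctx):
    log.append('reserve')
    return 'reservation-1'


async def release(ctx):
    log.append('release')


async def ship(ctx):
    log.append('ship')
    raise RuntimeError('carrier unavailable')  # simulated failure


async def cancel(ctx):
    log.append('cancel')


steps = [
    ('payment', charge, refund),
    ('inventory', reserve, release),
    ('shipping', ship, cancel),
]
succeeded = asyncio.run(run_saga(steps, {}))
# log == ['charge', 'reserve', 'ship', 'release', 'refund']
```

Note that cancel never runs: the shipping step did not complete, so there is nothing to compensate for it. Compensations should also be idempotent, since a crash during rollback may cause them to be retried.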
Checkpoint and Resume
For long-running workflows, persist state so they can resume after crashes:
import json
import time


class CheckpointedWorkflow:
    def __init__(self, workflow_id, storage):
        self.id = workflow_id
        self.storage = storage  # Redis, DB, etc.

    def save_checkpoint(self, step_name, context):
        # The context must be JSON-serializable for this to work
        checkpoint = {
            'workflow_id': self.id,
            'completed_step': step_name,
            'context': context,
            'timestamp': time.time(),
        }
        self.storage.set(
            f"workflow:{self.id}:checkpoint",
            json.dumps(checkpoint)
        )

    def load_checkpoint(self):
        raw = self.storage.get(
            f"workflow:{self.id}:checkpoint"
        )
        if raw:
            return json.loads(raw)
        return None

    async def run(self, steps, initial_context):
        """Run (name, handler) pairs in order, resuming after a crash."""
        checkpoint = self.load_checkpoint()
        # Copy so the caller's dict is not mutated
        context = dict(initial_context)
        # Fast-forward past completed steps
        start_index = 0
        if checkpoint:
            context = checkpoint['context']
            completed = checkpoint['completed_step']
            for i, (name, _) in enumerate(steps):
                if name == completed:
                    start_index = i + 1
                    break
        # Execute remaining steps
        for name, handler in steps[start_index:]:
            result = await handler(context)
            context[name] = result
            self.save_checkpoint(name, context)
        return context
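A quick way to see resume behaviour is to simulate a crash against an in-memory store. The following is a sketch, not the class above: MemoryStore, run_with_checkpoints, and the three toy steps are hypothetical, and the store only mimics the get/set interface that the storage object is assumed to offer.

```python
import asyncio
import json


class MemoryStore:
    """Hypothetical stand-in for Redis or a database table."""
    def __init__(self):
        self.data = {}

    def set(self, key, value):
        self.data[key] = value

    def get(self, key):
        return self.data.get(key)


async def run_with_checkpoints(workflow_id, steps, initial_context, storage):
    key = f"workflow:{workflow_id}:checkpoint"
    context = dict(initial_context)
    start = 0
    raw = storage.get(key)
    if raw:
        checkpoint = json.loads(raw)
        context = checkpoint['context']
        names = [name for name, _ in steps]
        start = names.index(checkpoint['completed_step']) + 1
    for name, handler in steps[start:]:
        context[name] = await handler(context)
        # Persist only after the step has finished successfully.
        storage.set(key, json.dumps(
            {'completed_step': name, 'context': context}))
    return context


calls = []                # every handler invocation, in order
crash = {'armed': True}   # makes 'transform' fail exactly once


async def extract(ctx):
    calls.append('extract')
    return [1, 2, 3]


async def transform(ctx):
    calls.append('transform')
    if crash['armed']:
        crash['armed'] = False
        raise RuntimeError('simulated crash')
    return [x * 2 for x in ctx['extract']]


async def load(ctx):
    calls.append('load')
    return sum(ctx['transform'])


steps = [('extract', extract), ('transform', transform), ('load', load)]
store = MemoryStore()

try:
    asyncio.run(run_with_checkpoints('wf-1', steps, {}, store))
except RuntimeError:
    pass  # first run dies in 'transform'; the 'extract' checkpoint survives

final = asyncio.run(run_with_checkpoints('wf-1', steps, {}, store))
# calls == ['extract', 'transform', 'transform', 'load']
```

The second run skips extract but repeats transform, because a step that crashes before its checkpoint is written always runs again. Handlers therefore need to be safe to re-run.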
Distributed Chain Patterns
Event-Driven Chaining
Instead of direct task-to-task linking, emit events. Each service subscribes to the events it cares about:
OrderCreated → PaymentService → PaymentCompleted
PaymentCompleted → InventoryService → InventoryReserved
InventoryReserved → ShippingService → ShipmentCreated
Advantages: loose coupling, each service owns its own logic. Disadvantages: harder to trace the full chain, eventual consistency.
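The flow above can be sketched with a minimal in-process event bus. This is an illustration only: EventBus and the three service functions are hypothetical, delivery is synchronous, and a real system would put a message broker between the services.

```python
from collections import defaultdict


class EventBus:
    """Hypothetical in-process bus; handlers run synchronously on emit."""
    def __init__(self):
        self.handlers = defaultdict(list)
        self.trace = []  # event types in the order they were emitted

    def subscribe(self, event_type, handler):
        self.handlers[event_type].append(handler)

    def emit(self, event_type, payload):
        self.trace.append(event_type)
        for handler in list(self.handlers[event_type]):
            handler(self, payload)


# Each service knows only the event it consumes and the event it emits.
def payment_service(bus, order):
    bus.emit('PaymentCompleted', {**order, 'paid': True})


def inventory_service(bus, order):
    bus.emit('InventoryReserved', {**order, 'reserved': True})


shipments = []


def shipping_service(bus, order):
    shipments.append(order)
    bus.emit('ShipmentCreated', {**order, 'shipped': True})


bus = EventBus()
bus.subscribe('OrderCreated', payment_service)
bus.subscribe('PaymentCompleted', inventory_service)
bus.subscribe('InventoryReserved', shipping_service)

bus.emit('OrderCreated', {'order_id': 42})
# bus.trace == ['OrderCreated', 'PaymentCompleted',
#               'InventoryReserved', 'ShipmentCreated']
```

No function here describes the whole chain, which is the tracing difficulty mentioned above: the sequence only exists as the sum of the subscriptions. Carrying a correlation identifier such as order_id in every payload is a common way to reconstruct it from logs.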
Orchestrator vs Choreography
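Orchestrator (centralized): One service coordinates the chain, telling each step when to run. Easier to understand and debug. The custom workflow engine above uses this model. Celery chains sit close to it: the whole sequence is declared in one place, even though each task hands off to the next at runtime.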
Choreography (decentralized): Each service knows what to do when it receives an event. No central coordinator. More resilient but harder to track.
For most Python applications, start with orchestration (Celery chains, custom workflow engine). Move to choreography when you need independent service deployment and the team is comfortable with event-driven debugging.
Performance Considerations
| Pattern | Overhead per Step | Resumable | Distributed |
|---|---|---|---|
| Function calls | ~microseconds | No | No |
| asyncio chain | ~microseconds | No | No |
| Celery chain | ~10-100ms | Partially | Yes |
| Custom DAG engine | ~1-10ms | With checkpoints | With broker |
| Event-driven | ~10-100ms | With event store | Yes |
The overhead matters when chains have many small steps. If each step takes milliseconds, Celery’s per-step overhead (broker round-trip) can dominate. For sub-millisecond steps, use in-process chaining and distribute at a coarser granularity.
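A back-of-the-envelope calculation shows how quickly the overhead dominates. The numbers below are illustrative assumptions: 2 ms of work per step and 10 ms of per-task overhead, the low end of the Celery range in the table. overhead_fraction is a hypothetical helper.

```python
def overhead_fraction(step_ms, overhead_ms, steps_per_task=1):
    """Share of wall time spent on dispatch rather than on useful work."""
    work_ms = step_ms * steps_per_task
    return overhead_ms / (overhead_ms + work_ms)


# One task per 2 ms step, 10 ms overhead each: 10 / 12 of the time is overhead.
fine_grained = overhead_fraction(step_ms=2, overhead_ms=10)

# Batch 100 steps into each task: 10 / 210 of the time is overhead.
coarse_grained = overhead_fraction(step_ms=2, overhead_ms=10,
                                   steps_per_task=100)

print(f"fine-grained:   {fine_grained:.1%}")
print(f"coarse-grained: {coarse_grained:.1%}")
```

Under these assumptions, batching 100 small steps into one task drops the overhead share from roughly 83% to under 5%, which is the sense in which distributing at a coarser granularity pays off.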
One thing to remember: for chains that span multiple services, define a compensating action for every step upfront. When step 5 of 7 fails, the saga then rolls back the completed steps in reverse order, so nobody has to work out manually what to undo.