Python Task Chaining Workflows — Core Concepts
What Task Chaining Solves
Many workflows are sequential: step B needs the result of step A, step C needs B’s result. Running these steps as independent tasks with manual coordination is error-prone. Task chaining formalizes the dependency: you declare the sequence once, and the system ensures each step runs in order with the right inputs.
Common examples:
- ETL pipelines: extract → transform → load
- Media processing: download → transcode → thumbnail → upload
- Order fulfillment: validate → charge → reserve inventory → notify
Celery Chains
Celery’s chain primitive links tasks so each one passes its result to the next:
from celery import chain

# Assumes `app` is an already configured Celery application, and that
# fetch(), resize(), and upload_to_s3() are helpers defined elsewhere.

@app.task
def download_image(url):
    return fetch(url)  # returns file path

@app.task
def resize_image(path, size=(800, 600)):
    return resize(path, size)  # returns new path

@app.task
def upload_image(path, bucket='images'):
    return upload_to_s3(path, bucket)  # returns URL

# Build the chain
workflow = chain(
    download_image.s('https://example.com/photo.jpg'),
    resize_image.s(),
    upload_image.s(),
)

# Execute
result = workflow.apply_async()
The .s() method creates a “signature” — a serializable description of the task call. In a chain, each task’s return value is prepended to the next task’s arguments.
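To make the prepending rule concrete, here is a toy model in plain Python. It is a sketch, not Celery's implementation: the Sig class and the run_local_chain helper are hypothetical names invented for this illustration, and the task bodies are stand-ins that only build strings.

```python
class Sig:
    """Toy stand-in for a signature: a function plus stored arguments."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def call(self, *parent_result):
        # The parent's return value (if any) goes in front of the stored args.
        return self.func(*parent_result, *self.args, **self.kwargs)


def run_local_chain(*sigs):
    """Run signatures in order, feeding each result to the next one."""
    result = sigs[0].call()          # the first step has no parent result
    for sig in sigs[1:]:
        result = sig.call(result)    # prepend the previous return value
    return result


# Stand-in task bodies (assumptions for illustration only).
def download_image(url):
    return url.rsplit("/", 1)[-1]

def resize_image(path, size=(800, 600)):
    return f"{size[0]}x{size[1]}-{path}"

def upload_image(path, bucket="images"):
    return f"s3://{bucket}/{path}"


final = run_local_chain(
    Sig(download_image, "https://example.com/photo.jpg"),
    Sig(resize_image, size=(400, 300)),
    Sig(upload_image, "thumbnails"),  # called as upload_image(path, "thumbnails")
)
print(final)
```

The last signature stores "thumbnails" as its only argument, yet the function receives the path first. That ordering is why a chained task should take the previous result as its first positional parameter.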
Asyncio Pipelines
For in-process chaining with async code:
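import asyncio

async def run_chain(initial_input, *steps):
    result = initial_input
    for step in steps:
        result = await step(result)
    return result

# Usage: each step must be a coroutine function (async def), not a Celery task
async def main():
    return await run_chain(
        'https://example.com/photo.jpg',
        download_image,
        resize_image,
        upload_image,
    )

final = asyncio.run(main())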
This is simpler but doesn’t distribute across machines or survive process restarts.
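A signature can carry extra arguments, and an in-process pipeline can get a similar effect with functools.partial. The sketch below is a runnable variant of the pipeline with stub coroutines. The stub bodies and the thumbnails bucket name are assumptions made for illustration.

```python
import asyncio
from functools import partial


async def run_chain(initial_input, *steps):
    result = initial_input
    for step in steps:
        result = await step(result)
    return result


# Stub coroutines standing in for real I/O (assumptions for illustration).
async def download_image(url):
    await asyncio.sleep(0)
    return url.rsplit("/", 1)[-1]

async def resize_image(path, size=(800, 600)):
    await asyncio.sleep(0)
    return f"{size[0]}x{size[1]}-{path}"

async def upload_image(path, bucket="images"):
    await asyncio.sleep(0)
    return f"https://{bucket}.example.com/{path}"


async def main():
    return await run_chain(
        "https://example.com/photo.jpg",
        download_image,
        partial(resize_image, size=(400, 300)),      # keyword bound ahead of time
        partial(upload_image, bucket="thumbnails"),
    )


final = asyncio.run(main())
print(final)
```

Binding keywords with partial leaves the first positional slot free for the previous step's result, so run_chain itself does not need to change.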
Error Handling in Chains
When a step in the chain fails, what happens?
Celery Behavior
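By default, a chain stops at the first failure, and the remaining tasks never execute. The AsyncResult returned by apply_async() refers to the last task in the chain; calling get() on it re-raises the error from the failed step, and you can walk result.parent to find which step failed.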
You can attach error callbacks:
workflow = chain(
    step_one.s(),
    step_two.s(),
    step_three.s()
).on_error(handle_chain_failure.s())
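The stop-at-first-failure behavior can be modeled in plain Python. This is a simplified sketch, not Celery's implementation: run_chain_with_errback is a hypothetical helper, and its callback signature is deliberately simpler than what Celery passes to a real error callback.

```python
def run_chain_with_errback(steps, initial_input, on_error):
    """Run steps in order; on the first exception, call on_error and stop."""
    executed = []
    result = initial_input
    for step in steps:
        try:
            result = step(result)
        except Exception as exc:
            on_error(step.__name__, exc)
            return executed, None      # later steps are never called
        executed.append(step.__name__)
    return executed, result


def step_one(x):
    return x + 1

def step_two(x):
    raise ValueError(f"cannot process {x}")

def step_three(x):
    return x * 10


failures = []

def handle_chain_failure(step_name, exc):
    failures.append((step_name, str(exc)))


executed, result = run_chain_with_errback(
    [step_one, step_two, step_three], 1, handle_chain_failure
)
print(executed, result, failures)
```

Only step_one completes. The callback records which step failed and why, and step_three is skipped entirely.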
Retry Within Chains
Individual tasks can retry without breaking the chain:
@app.task(bind=True, max_retries=3)
def step_two(self, data):
    try:
        return process(data)
    except TemporaryError as exc:
        raise self.retry(exc=exc, countdown=30)
The next step runs only after the retry succeeds. If the retries are exhausted, the task fails and the chain stops there.
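The interaction between retries and the following step can be sketched without a broker. The example below is a plain-Python approximation under stated assumptions: run_with_retries is a hypothetical helper, TemporaryError is defined locally, and the loop blocks in place where Celery would reschedule the task.

```python
import time


class TemporaryError(Exception):
    """Stand-in for a transient failure such as a network timeout."""


def run_with_retries(func, arg, max_retries=3, countdown=0.0):
    """Call func(arg); on TemporaryError, retry up to max_retries more times."""
    attempts = 0
    while True:
        try:
            return func(arg)
        except TemporaryError:
            if attempts >= max_retries:
                raise                  # retries exhausted: the chain fails here
            attempts += 1
            time.sleep(countdown)      # Celery would reschedule instead of blocking


calls = {"count": 0}

def step_two(data):
    calls["count"] += 1
    if calls["count"] < 3:             # fail on the first two attempts
        raise TemporaryError("try again")
    return data.upper()

def step_three(data):
    return f"{data}!"


# step_three only runs once step_two has finally returned a value.
final = step_three(run_with_retries(step_two, "ok", max_retries=3))
print(final, calls["count"])
```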
Beyond Linear Chains
Celery Groups + Chains (Chords)
Combine parallel and sequential execution:
from celery import chain, group, chord

# Process multiple images in parallel, then notify
workflow = chord(
    [process_image.s(url) for url in image_urls],
    notify_completion.s()
)
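The chord shape (a parallel header, then a single callback that receives every result) has a rough in-process analog in the standard library. The sketch below uses a thread pool. The run_local_chord helper and the stub task bodies are assumptions for illustration and do not reflect how Celery schedules a chord.

```python
from concurrent.futures import ThreadPoolExecutor


def process_image(url):
    # Stand-in for real work; returns a fake output name.
    return url.rsplit("/", 1)[-1].replace(".jpg", ".webp")


def notify_completion(results):
    return f"processed {len(results)} images"


def run_local_chord(header_func, inputs, callback):
    """Run header_func over inputs in parallel, then call callback once."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(header_func, inputs))  # map preserves input order
    return results, callback(results)


image_urls = [
    "https://example.com/a.jpg",
    "https://example.com/b.jpg",
    "https://example.com/c.jpg",
]
results, message = run_local_chord(process_image, image_urls, notify_completion)
print(results, message)
```

As in a chord, the callback takes a single argument: the list of results from the parallel part.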
Conditional Branching
Choose the next step based on the previous result:
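@app.task
def route_order(order_data):
    # Return the dispatched task's ID: an AsyncResult object itself cannot be
    # stored as a task result with the default JSON serializer.
    if order_data['total'] > 1000:
        return high_value_processing.delay(order_data).id
    else:
        return standard_processing.delay(order_data).id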
This breaks the chain abstraction — you’re manually dispatching instead of declaring. For complex conditional workflows, consider dedicated workflow engines.
Workflow State
Where does the chain’s state live?
- Celery chains — state is in the result backend (Redis, database). Each task stores its result, and the chain tracks progress via task IDs.
- Asyncio — state is in memory. Lost on crash.
- Database-backed workflows — each step records its status in a workflow table. Resumable after crashes.
For business-critical workflows, prefer database-backed state so you can inspect, audit, and resume workflows.
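A database-backed workflow can be sketched with sqlite3. The example below is a minimal illustration, not a production design: the workflow_steps table layout and the run_workflow helper are hypothetical, step outputs are assumed to be strings, and an in-memory database stands in for a durable one.

```python
import sqlite3


def init_db(conn):
    conn.execute(
        """CREATE TABLE IF NOT EXISTS workflow_steps (
               workflow_id TEXT NOT NULL,
               step_index  INTEGER NOT NULL,
               name        TEXT NOT NULL,
               status      TEXT NOT NULL,
               output      TEXT,
               PRIMARY KEY (workflow_id, step_index)
           )"""
    )


def record(conn, workflow_id, index, name, status, output):
    conn.execute(
        "INSERT OR REPLACE INTO workflow_steps VALUES (?, ?, ?, ?, ?)",
        (workflow_id, index, name, status, output),
    )
    conn.commit()


def run_workflow(conn, workflow_id, steps, initial_input):
    """Run steps in order, skipping steps already recorded as 'done'."""
    value = initial_input
    for index, step in enumerate(steps):
        row = conn.execute(
            "SELECT status, output FROM workflow_steps "
            "WHERE workflow_id = ? AND step_index = ?",
            (workflow_id, index),
        ).fetchone()
        if row is not None and row[0] == "done":
            value = row[1]             # reuse the stored output
            continue
        try:
            value = step(value)
        except Exception:
            record(conn, workflow_id, index, step.__name__, "failed", None)
            raise
        record(conn, workflow_id, index, step.__name__, "done", value)
    return value


calls = []
gateway = {"up": False}                # simulated external dependency

def validate(order):
    calls.append("validate")
    return f"{order}:valid"

def charge(order):
    calls.append("charge")
    if not gateway["up"]:
        raise RuntimeError("gateway down")
    return f"{order}:charged"


conn = sqlite3.connect(":memory:")     # a real deployment would use a durable database
init_db(conn)

try:
    run_workflow(conn, "order-1", [validate, charge], "order-1")
except RuntimeError:
    pass                               # the first run stops at 'charge'

gateway["up"] = True
final = run_workflow(conn, "order-1", [validate, charge], "order-1")
print(final, calls)
```

On the second run, validate is not called again because its row is already marked done. The same table can also be queried to inspect or audit a workflow.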
Common Misconception
“Task chaining is just calling functions in sequence.” It’s not — chaining adds distribution (steps run on different workers/machines), persistence (the chain survives process restarts), retry semantics (individual steps can retry independently), and observability (you can inspect chain progress). Regular function calls give you none of this.
One thing to remember: Celery’s chain(a.s(), b.s(), c.s()) is a widely used way to build distributed task pipelines in Python. It handles argument passing and error propagation out of the box, and each task keeps its own retry settings.
See Also
- Python Dead Letter Queues: What happens to messages that can't be delivered, and why Python systems need a lost-and-found box.
- Python Delayed Task Execution: How Python programs schedule tasks to run later, like setting an alarm for your code.
- Python Distributed Locks: How Python programs take turns with shared resources, like a bathroom door lock, but for computers.
- Python Fan Out Fan In Pattern: How Python splits big jobs into small pieces, runs them all at once, then puts the results back together.
- Python Message Deduplication: Why computer messages sometimes get delivered twice, and how Python stops them from doing double damage.