Python Fan-Out Fan-In Pattern — Deep Dive
Pattern Variants
Fan-out/fan-in isn’t a single pattern — it’s a family of patterns with different collection semantics:
- Scatter-Gather — fan out to all workers, wait for all responses (classic)
- First-Response — fan out to multiple, use whichever responds first (redundancy)
- Quorum — fan out to N, proceed once M of them (M < N) respond (consensus)
- Streaming — fan out and process results as they arrive (pipeline)
Each variant has different error handling, timeout, and cancellation semantics.
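For a baseline to compare the variants against, the classic scatter-gather can be sketched with plain asyncio.gather. This is a minimal sketch: fake_fetch is a hypothetical stand-in for real I/O, and using return_exceptions=True is one design choice among several. It keeps a single failure from hiding everyone else's results.

```python
import asyncio


async def fake_fetch(item):
    # Hypothetical worker: simulates I/O and fails for negative input
    await asyncio.sleep(0.01)
    if item < 0:
        raise ValueError(f"bad item {item}")
    return item * 10


async def scatter_gather(items, handler):
    # Fan out: one coroutine per item, all awaited together
    outcomes = await asyncio.gather(
        *(handler(item) for item in items), return_exceptions=True
    )
    # Fan in: gather preserves input order, so zip pairs items with outcomes
    results, errors = {}, {}
    for item, outcome in zip(items, outcomes):
        if isinstance(outcome, BaseException):
            errors[item] = outcome
        else:
            results[item] = outcome
    return results, errors


results, errors = asyncio.run(scatter_gather([1, 2, -3], fake_fetch))
print(results)         # {1: 10, 2: 20}
print(sorted(errors))  # [-3]
```

The caller waits for the slowest worker, and failures come back as values to inspect rather than as an exception that aborts the whole fan-in.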
Advanced asyncio Patterns
TaskGroup (Python 3.11+)
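asyncio.TaskGroup provides structured concurrency: if any task fails, the remaining sibling tasks are cancelled, and the failures are re-raised together as an ExceptionGroup when the async with block exits: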
import asyncio

async def fan_out_structured(items, handler):
    results = {}
    async with asyncio.TaskGroup() as tg:
        for item in items:
            # Default argument binds the current item to this closure
            async def process(i=item):
                results[i] = await handler(i)
            tg.create_task(process())
    return results
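This is safer than gather when partial results are useless. With gather's default return_exceptions=False, the first exception propagates to the caller but the other tasks keep running; TaskGroup cancels them instead, so one failure aborts everything rather than wasting resources on the rest.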
First-Response with Cancellation
async def first_response(coros):
    """Run multiple coroutines, return the first successful
    result, cancel the rest."""
    tasks = [asyncio.create_task(c) for c in coros]
    pending = set(tasks)
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            # A fast failure shouldn't win the race; keep waiting
            for task in done:
                if task.exception() is None:
                    return task.result()
        raise RuntimeError("All coroutines failed")
    finally:
        # Cancel remaining tasks
        for task in pending:
            task.cancel()
        # Suppress cancellation exceptions
        await asyncio.gather(*pending, return_exceptions=True)

# Usage (inside an async function): query multiple DNS servers,
# use the fastest successful response. resolve_dns is a placeholder
# for your own async resolver.
result = await first_response([
    resolve_dns('8.8.8.8', 'example.com'),
    resolve_dns('1.1.1.1', 'example.com'),
    resolve_dns('9.9.9.9', 'example.com'),
])
Quorum Collection
async def quorum_gather(coros, quorum_size):
    """Wait until quorum_size tasks complete successfully."""
    tasks = [asyncio.create_task(c) for c in coros]
    results = []
    remaining = set(tasks)
    while len(results) < quorum_size and remaining:
        done, remaining = await asyncio.wait(
            remaining, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            if task.exception() is None:
                results.append(task.result())
    # Cancel stragglers
    for task in remaining:
        task.cancel()
    await asyncio.gather(*remaining, return_exceptions=True)
    if len(results) < quorum_size:
        raise RuntimeError(
            f"Only {len(results)}/{quorum_size} succeeded"
        )
    return results
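One refinement worth considering: quorum_gather keeps waiting even after so many tasks have failed that the quorum can no longer be met. The sketch below is a variant, not the original function, that gives up as soon as successes plus still-running tasks fall below quorum_size. The flaky coroutine is a hypothetical test double, and the delays are arbitrary.

```python
import asyncio


async def quorum_gather_fail_fast(coros, quorum_size):
    tasks = [asyncio.create_task(c) for c in coros]
    results = []
    remaining = set(tasks)
    try:
        while len(results) < quorum_size:
            # Even if every remaining task succeeds, quorum is unreachable
            if len(results) + len(remaining) < quorum_size:
                raise RuntimeError(
                    f"Quorum unreachable: {len(results)} succeeded, "
                    f"{len(remaining)} still running, need {quorum_size}"
                )
            done, remaining = await asyncio.wait(
                remaining, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                if task.exception() is None:
                    results.append(task.result())
        return results[:quorum_size]
    finally:
        # Cancel stragglers whether we reached quorum or gave up
        for task in remaining:
            task.cancel()
        await asyncio.gather(*remaining, return_exceptions=True)


async def flaky(delay, ok):
    # Hypothetical replica: succeeds or fails after `delay` seconds
    await asyncio.sleep(delay)
    if not ok:
        raise ConnectionError("replica down")
    return delay


async def demo():
    ok = await quorum_gather_fail_fast(
        [flaky(0.01, True), flaky(0.05, True), flaky(1, True)], 2
    )
    try:
        # Two early failures make 2-of-3 impossible; no need to wait 1 s
        await quorum_gather_fail_fast(
            [flaky(0.01, False), flaky(0.02, False), flaky(1, True)], 2
        )
    except RuntimeError:
        return sorted(ok), "aborted"
    return sorted(ok), "unexpected"


outcome = asyncio.run(demo())
print(outcome)  # ([0.01, 0.05], 'aborted')
```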
Backpressure-Aware Fan-Out
When fan-out produces results faster than fan-in can consume them, you need backpressure. Use a bounded queue between the fan-out and fan-in stages:
async def pipeline_with_backpressure(
    items, processor, aggregator,
    max_concurrent=20, buffer_size=50
):
    queue = asyncio.Queue(maxsize=buffer_size)
    semaphore = asyncio.Semaphore(max_concurrent)

    async def worker(item):
        async with semaphore:
            result = await processor(item)
            await queue.put(result)  # blocks if buffer full

    async def fan_in():
        collected = []
        processed = 0
        while processed < len(items):
            result = await queue.get()
            collected.append(result)
            processed += 1
        return aggregator(collected)

    consumer = asyncio.create_task(fan_in())
    try:
        await asyncio.gather(*[worker(item) for item in items])
    except BaseException:
        # A failed worker means fan_in() would wait forever; cancel it
        consumer.cancel()
        raise
    return await consumer
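The bounded queue (maxsize=buffer_size, 50 by default) makes workers block on put() when the buffer is full. Because a blocked worker still holds its semaphore slot, new items can't start processing either, which naturally throttles fan-out speed to match fan-in consumption.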
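A quick way to watch backpressure kick in is to instrument a tiny producer/consumer pair. In the sketch below the sizes and delays are arbitrary assumptions: twenty producers push into a queue of size 3 while one deliberately slow consumer drains it. The interesting number is blocked_puts, the producers that found the buffer full and had to wait.

```python
import asyncio


async def demo(n_items=20, buffer_size=3):
    queue = asyncio.Queue(maxsize=buffer_size)
    peak = 0     # largest queue size the consumer ever observed
    blocked = 0  # puts that found the buffer already full

    async def producer(i):
        nonlocal blocked
        if queue.full():
            blocked += 1
        await queue.put(i)  # suspends until the consumer frees a slot

    async def slow_consumer():
        nonlocal peak
        consumed = []
        for _ in range(n_items):
            peak = max(peak, queue.qsize())
            consumed.append(await queue.get())
            await asyncio.sleep(0.001)  # simulated slow fan-in work
        return consumed

    consumer = asyncio.create_task(slow_consumer())
    await asyncio.gather(*(producer(i) for i in range(n_items)))
    consumed = await consumer
    return peak, blocked, sorted(consumed)


peak, blocked, consumed = asyncio.run(demo())
print(f"peak={peak}, blocked_puts={blocked}")
```

Every item still arrives, but the buffer never holds more than buffer_size results, so memory stays bounded no matter how many producers there are.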
Distributed Fan-Out with Celery
Chord Pattern
Celery’s chord is the canonical distributed fan-out/fan-in: a group of tasks followed by a callback that receives all results.
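from celery import chord

# Assumes `app` is your Celery application, and `analyze` and
# `data_chunks` are defined elsewhere
@app.task
def process_chunk(chunk_data):
    return analyze(chunk_data)

@app.task
def aggregate_results(results):
    # `results` is the list of the header tasks' return values
    return {
        # .get() so error results without a 'count' don't raise KeyError
        'total': sum(r.get('count', 0) for r in results),
        'errors': [r for r in results if r.get('error')],
    }

# Fan out one task per chunk, fan in with aggregate_results
job = chord(
    [process_chunk.s(chunk) for chunk in data_chunks],
    aggregate_results.s()
)
result = job.apply_async()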
Chord Pitfalls
- Single point of failure — if the callback task fails, all fan-out work is wasted. Add retry logic to the callback.
- Memory pressure — the result backend must hold all intermediate results simultaneously. For large fan-outs (1000+ tasks), results can exhaust Redis memory.
- Never nest chords — Celery chords inside chords cause deadlocks with certain backends.
Alternative: Manual Result Tracking
For more control, skip chords and track results yourself:
import json

# Assumes `app`, `redis_client` (a redis-py client), `process`
# and `combine` are defined elsewhere
@app.task(bind=True)
def process_and_track(self, item, job_id, total_count):
    result = process(item)
    # LPUSH returns the list's new length atomically, so storing the
    # result and counting completions happen in one step. A separate
    # INCR-then-LPUSH lets the last incrementer trigger aggregation
    # before another worker's result has been pushed.
    completed = redis_client.lpush(f"job:{job_id}:results",
                                   json.dumps(result))
    if completed == total_count:
        # Last worker triggers aggregation (exactly once)
        aggregate_job.delay(job_id)

@app.task
def aggregate_job(job_id):
    results = redis_client.lrange(
        f"job:{job_id}:results", 0, -1
    )
    parsed = [json.loads(r) for r in results]
    final = combine(parsed)
    # Store final result and clean up
    redis_client.set(f"job:{job_id}:final",
                     json.dumps(final))
    redis_client.delete(f"job:{job_id}:results")
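This approach handles partial failures better because the completion condition is yours to define: lower the threshold from total_count to N and aggregation fires once N of M items have reported, rather than requiring all M. Note that a task that raises never pushes a result, so with the all-M threshold shown above, a single failure means aggregation never triggers.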
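The completion logic can be rehearsed without Redis. The sketch below is an in-process analogue, not the Redis version: a threading.Lock stands in for the atomicity of LPUSH (whose return value is the new list length), and the worker whose push brings the count to exactly the threshold triggers aggregation. The Tracker class, the failing-item rule and the 6-of-10 threshold are all hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class Tracker:
    """In-process stand-in for the Redis list used by process_and_track."""

    def __init__(self, threshold):
        self.threshold = threshold
        self.results = []
        self.lock = threading.Lock()
        self.triggered = 0
        self.final = None

    def push(self, result):
        # Append and read the new length atomically, like LPUSH's reply
        with self.lock:
            self.results.append(result)
            count = len(self.results)
        # '==' rather than '>=' so exactly one worker triggers aggregation
        if count == self.threshold:
            self.aggregate()

    def aggregate(self):
        with self.lock:
            snapshot = sorted(self.results)
        self.triggered += 1
        self.final = snapshot  # at least `threshold` results


def process(item, tracker):
    # Hypothetical worker: items divisible by 5 fail and never report
    if item % 5 == 0:
        raise ValueError(item)
    tracker.push(item * 2)


tracker = Tracker(threshold=6)  # N of M: proceed once 6 of 10 report
with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(10):
        pool.submit(process, i, tracker)

print(tracker.triggered)  # 1
```

Two items fail, yet aggregation still fires exactly once, because completion is defined as "6 of 10" rather than "all 10".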
Streaming Fan-In with Async Generators
For cases where you want to process results as they arrive rather than waiting for all:
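import asyncio

_DONE = object()  # unique sentinel: None may be a legitimate result

async def streaming_fan_out(items, handler, max_concurrent=20):
    semaphore = asyncio.Semaphore(max_concurrent)
    queue = asyncio.Queue()

    async def worker(item):
        async with semaphore:
            result = await handler(item)
            await queue.put(result)

    async def producer():
        try:
            await asyncio.gather(
                *[worker(item) for item in items]
            )
        finally:
            queue.put_nowait(_DONE)  # sentinel, sent even on failure

    # Keep a reference so the task can't be garbage-collected mid-run
    producer_task = asyncio.create_task(producer())
    try:
        while True:
            result = await queue.get()
            if result is _DONE:
                break
            yield result
        await producer_task  # re-raise any worker exception
    finally:
        producer_task.cancel()  # no-op if already finished

# Usage (inside an async function): process results as they stream in
async for result in streaming_fan_out(urls, fetch_url):
    update_progress(result)
    save_to_database(result)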
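When every task can be created up front, the standard library's asyncio.as_completed gives the same completion-order streaming with less machinery. This is a swapped-in alternative, not the queue-based design above; the semaphore bound and the fake_fetch helper are illustrative assumptions.

```python
import asyncio


async def fake_fetch(delay):
    # Hypothetical handler: the delay doubles as the result
    await asyncio.sleep(delay)
    return delay


async def stream_as_completed(items, handler, max_concurrent=20):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(item):
        async with semaphore:
            return await handler(item)

    tasks = [asyncio.create_task(bounded(item)) for item in items]
    try:
        # as_completed yields awaitables in the order results become ready
        for next_done in asyncio.as_completed(tasks):
            yield await next_done
    finally:
        # If the consumer stops early, don't leave work running
        for task in tasks:
            task.cancel()


async def main():
    return [r async for r in stream_as_completed([0.15, 0.05, 0.1], fake_fetch)]


streamed = asyncio.run(main())
print(streamed)  # [0.05, 0.1, 0.15]
```

The trade-off: as_completed needs the full item list before it starts, whereas the queue-based version can be adapted to producers that discover work incrementally.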
Performance Characteristics
| Pattern | Latency | Throughput | Complexity |
|---|---|---|---|
| asyncio.gather (unbounded) | Slowest task (max of all) | Limited by event loop | Low |
| asyncio.gather + semaphore | Grows as the limit shrinks | Controlled | Low |
| TaskGroup (structured) | Slowest task (max of all) | Limited by event loop | Low |
| ProcessPoolExecutor | Process startup + pickling overhead | N workers × single-core | Medium |
| Celery chord | Broker + worker overhead | Cluster-wide | High |
| Manual Redis tracking | Broker overhead | Cluster-wide | High |
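The latency column can be sanity-checked with simulated tasks. In this rough timing sketch, six equal 0.1 s sleeps are an assumption standing in for real I/O: unbounded gather finishes in about one task time (the slowest task), while a semaphore of 2 needs about ceil(6/2) = 3 rounds, roughly 0.3 s.

```python
import asyncio
import time


async def sim_task(semaphore=None):
    # Simulated I/O-bound task taking ~0.1 s
    if semaphore is None:
        await asyncio.sleep(0.1)
    else:
        async with semaphore:
            await asyncio.sleep(0.1)


async def timed(n, limit=None):
    semaphore = asyncio.Semaphore(limit) if limit else None
    start = time.perf_counter()
    await asyncio.gather(*(sim_task(semaphore) for _ in range(n)))
    return time.perf_counter() - start


unbounded = asyncio.run(timed(6))
bounded = asyncio.run(timed(6, limit=2))
print(f"unbounded: {unbounded:.2f}s, limit=2: {bounded:.2f}s")
```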
Error Recovery Strategies
Partial Retry
Don’t retry the entire fan-out if a few tasks fail:
async def fan_out_with_retry(items, handler,
                             max_retries=3):
    results = {}
    pending = list(items)  # items must be hashable (used as dict keys)
    for _ in range(max_retries + 1):
        if not pending:
            break  # asyncio.wait() rejects an empty set
        tasks = {
            asyncio.create_task(handler(item)): item
            for item in pending
        }
        await asyncio.wait(tasks.keys())
        failed = []
        for task, item in tasks.items():
            if task.exception() is not None:
                failed.append(item)
            else:
                results[item] = task.result()
        # Only this round's failures are retried (and reported)
        pending = failed
    return results, pending  # results + still-failed items
Timeout per Task
Individual task timeouts prevent one slow task from blocking the entire fan-in:
async def timed_handler(item, handler, timeout=30):
    try:
        return await asyncio.wait_for(
            handler(item), timeout=timeout
        )
    except asyncio.TimeoutError:
        # Turn a slow item into an error record instead of stalling fan-in
        return {'item': item, 'error': 'timeout'}
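Plugged into a gather, the wrapper caps fan-in latency at roughly the timeout, and slow items come back as error records instead of holding up everything else. A minimal sketch: the delays, the 0.1 s timeout and the sleepy handler are illustrative assumptions, and timed_handler is repeated so the block runs on its own.

```python
import asyncio
import time


async def timed_handler(item, handler, timeout=30):
    try:
        return await asyncio.wait_for(handler(item), timeout=timeout)
    except asyncio.TimeoutError:
        return {'item': item, 'error': 'timeout'}


async def sleepy(delay):
    # Hypothetical handler whose run time equals its input
    await asyncio.sleep(delay)
    return {'item': delay, 'ok': True}


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        *(timed_handler(d, sleepy, timeout=0.1) for d in [0.01, 0.02, 5])
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results[2])  # {'item': 5, 'error': 'timeout'}
```

The 5-second item is cut off at 0.1 s, so the whole fan-in finishes in well under a second.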
One thing to remember: The key design decision in fan-out/fan-in is your collection semantics: do you need all results (scatter-gather), the first result (redundancy), or enough results (quorum)? This choice determines your error handling, timeout strategy, and cancellation behavior.
See Also
- Python Dead Letter Queues: What happens to messages that can't be delivered, and why Python systems need a lost-and-found box.
- Python Delayed Task Execution: How Python programs schedule tasks to run later, like setting an alarm for your code.
- Python Distributed Locks: How Python programs take turns with shared resources, like a bathroom door lock, but for computers.
- Python Message Deduplication: Why computer messages sometimes get delivered twice, and how Python stops them from doing double damage.
- Python Priority Queue Patterns: Why some tasks cut the line in Python, and how priority queues decide who goes first.