Python Fan-Out Fan-In Pattern — Core Concepts
What Fan-Out/Fan-In Solves
Many problems follow the same shape: you have a collection of independent work items, each can be processed separately, and you need all results before proceeding. Processing them sequentially wastes time, because each item waits for the previous one to finish even though none depends on another. Fan-out/fan-in exploits this independence.
The pattern has two phases:
- Fan-out: Distribute N tasks to workers running in parallel
- Fan-in: Collect all N results and combine them
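A minimal sketch of both phases, assuming an I/O-bound per-item step. `score_item` and its doubling logic are hypothetical placeholders. The fan-in step here does more than collect: it combines the partial results into a single total.

```python
import asyncio

async def score_item(item):
    # Hypothetical per-item work; the sleep stands in for an I/O call
    await asyncio.sleep(0.01)
    return item * 2

async def fan_out_fan_in(items):
    # Fan-out: one task per item, all scheduled on the event loop at once
    tasks = [asyncio.create_task(score_item(i)) for i in items]
    # Fan-in: wait for every task, then combine the partial results
    partials = await asyncio.gather(*tasks)
    return sum(partials)

if __name__ == "__main__":
    print(asyncio.run(fan_out_fan_in([1, 2, 3, 4])))  # 20
```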
Fan-Out/Fan-In in Python
With asyncio.gather
The simplest form for I/O-bound work:
import asyncio

async def fetch_price(product_id):
    # Simulate API call
    await asyncio.sleep(0.1)
    return {'id': product_id, 'price': product_id * 9.99}

async def get_all_prices(product_ids):
    # Fan-out: create one coroutine per product; gather schedules them all at once
    tasks = [fetch_price(pid) for pid in product_ids]
    # Fan-in: wait for all results
    results = await asyncio.gather(*tasks)
    return results
All fetches run concurrently on the event loop. gather returns results in the same order as the input tasks.
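Both properties are easy to observe. The sketch below gives each call a different latency through a hypothetical `delay` parameter, which is an assumption and not part of `fetch_price`. Results come back in input order even though the last call finishes first, and total wall time is close to the slowest single call rather than the sum of all three.

```python
import asyncio
import time

async def fetch_with_delay(product_id, delay):
    # Hypothetical variant of fetch_price with a per-call latency
    await asyncio.sleep(delay)
    return product_id

async def main():
    start = time.perf_counter()
    # The first call is the slowest; the last is the fastest
    results = await asyncio.gather(
        fetch_with_delay(1, 0.3),
        fetch_with_delay(2, 0.2),
        fetch_with_delay(3, 0.1),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results)            # [1, 2, 3]: input order, not completion order
    print(f"{elapsed:.2f}s")  # roughly 0.3s rather than 0.6s
```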
With concurrent.futures
For CPU-bound work or when you need process-level parallelism:
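import concurrent.futures
from concurrent.futures import ProcessPoolExecutor

def count_words(doc_path):
    # Hypothetical placeholder for the real analysis: whitespace-separated word count
    with open(doc_path, encoding='utf-8') as f:
        return len(f.read().split())

def analyze_document(doc_path):
    # CPU-intensive text analysis
    return {'path': doc_path, 'word_count': count_words(doc_path)}

def analyze_all(doc_paths):
    # Call this under `if __name__ == '__main__':` on platforms that
    # start worker processes with spawn (Windows, macOS)
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Fan-out: submit one job per document, keyed back to its path
        futures = {executor.submit(analyze_document, p): p
                   for p in doc_paths}
        # Fan-in: collect results in completion order
        results = []
        for future in concurrent.futures.as_completed(futures):
            results.append(future.result())
    return results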
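as_completed yields each future as soon as its worker finishes, so results arrive in completion order rather than submission order. That is useful when you want to process results incrementally instead of waiting for all of them. If you need to know which input a result came from, look it up in the futures dict, which maps each future back to its path.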
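A minimal sketch of incremental fan-in. It swaps in ThreadPoolExecutor for ProcessPoolExecutor so it runs without a `__main__` guard or picklable functions; as_completed handles futures from either executor the same way. `slow_square` and the per-job delays are hypothetical. The dict lookup recovers which input each result belongs to when results arrive out of order.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_square(n, delay):
    # Hypothetical work item with a configurable duration
    time.sleep(delay)
    return n * n

def squares_as_they_finish(jobs, max_workers=4):
    """jobs is a list of (n, delay) pairs; returns (n, n*n) in completion order."""
    finished = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Fan-out: remember which input produced each future
        futures = {executor.submit(slow_square, n, d): n for n, d in jobs}
        # Fan-in, incrementally: handle each result the moment it is ready
        for future in as_completed(futures):
            finished.append((futures[future], future.result()))
    return finished

if __name__ == "__main__":
    # The first job is the slowest, so it is reported last
    print(squares_as_they_finish([(1, 0.15), (2, 0.10), (3, 0.05)]))
    # [(3, 9), (2, 4), (1, 1)]
```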
With Message Queues
For distributed systems where workers run on different machines:
- Fan-out: Producer publishes N messages to a queue (or topic)
- Workers: Multiple consumers pull messages and process independently
- Fan-in: Workers publish results to a result queue; an aggregator collects them
This scales beyond a single machine but adds complexity: you need to track how many results to expect, handle worker failures, and deal with timeouts for slow workers.
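The same shape can be sketched in a single process with asyncio.Queue standing in for a real broker. This is an in-process stand-in, not how Celery or RQ work internally; the `worker`, `aggregate`, and `run_pipeline` names, the None shutdown sentinel, and the 1-second timeout are all assumptions for illustration. It shows two of the complications listed above: the aggregator knows how many results to expect because the producer knows how many messages it published, and it stops waiting after a timeout instead of hanging on a lost result.

```python
import asyncio

SENTINEL = None  # assumed shutdown signal for this sketch

async def worker(jobs, results):
    # Consumer: pull messages until the shutdown sentinel arrives
    while True:
        msg = await jobs.get()
        if msg is SENTINEL:
            break
        await asyncio.sleep(0.01)            # simulate processing
        await results.put((msg, msg * msg))  # publish to the result queue

async def aggregate(results, expected, timeout):
    # Fan-in: collect `expected` results, or stop after `timeout` seconds
    collected = {}

    async def collect():
        while len(collected) < expected:
            key, value = await results.get()
            collected[key] = value

    try:
        await asyncio.wait_for(collect(), timeout)
    except asyncio.TimeoutError:
        pass  # a worker died or is too slow; the caller gets a partial dict
    return collected

async def run_pipeline(messages, num_workers=3, timeout=1.0):
    jobs, results = asyncio.Queue(), asyncio.Queue()
    # Fan-out: publish every message, then one sentinel per worker
    for m in messages:
        jobs.put_nowait(m)
    for _ in range(num_workers):
        jobs.put_nowait(SENTINEL)
    workers = [asyncio.create_task(worker(jobs, results))
               for _ in range(num_workers)]
    # The producer knows the count, so the aggregator knows what to expect
    collected = await aggregate(results, len(messages), timeout)
    # Stop any worker still running so a timeout never leaves us hanging
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return collected
```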
Controlling Concurrency
Unbounded fan-out is dangerous. Launching 10,000 tasks simultaneously can exhaust memory, overload APIs, or trigger rate limits.
Use a semaphore to limit concurrent work:
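import asyncio

async def bounded_fan_out(items, handler, max_concurrent=50):
    # At most max_concurrent handlers hold the semaphore at any moment
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(item):
        async with semaphore:
            return await handler(item)

    # gather still creates one task per item; the semaphore caps how many run at once
    return await asyncio.gather(*[limited(i) for i in items])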
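One way to check that the cap holds is to count how many handlers are in flight. The sketch repeats the helper so it runs on its own; `tracked_handler`, 20 items, and a cap of 5 are hypothetical choices. Results still come back in input order, because gather does the fan-in.

```python
import asyncio

async def bounded_fan_out(items, handler, max_concurrent=50):
    # Same helper as above: at most max_concurrent handlers run at once
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(item):
        async with semaphore:
            return await handler(item)

    return await asyncio.gather(*[limited(i) for i in items])

async def demo(num_items=20, cap=5):
    in_flight = 0
    peak = 0

    async def tracked_handler(item):
        # Hypothetical handler that records how many copies run concurrently
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return item

    results = await bounded_fan_out(range(num_items), tracked_handler,
                                    max_concurrent=cap)
    return results, peak

if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([0, 1, ..., 19], 5)
```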
Error Handling
asyncio.gather has a subtle default: if one task raises, that first exception propagates immediately to the code awaiting gather, but nothing is cancelled. The other tasks keep running in the background, and their results are lost to you. Pass return_exceptions=True to get exceptions back as values in the results list instead of having the first one raised:
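results = await asyncio.gather(*tasks, return_exceptions=True)
# BaseException also matches asyncio.CancelledError, which is not an Exception subclass since Python 3.8
successes = [r for r in results if not isinstance(r, BaseException)]
failures = [r for r in results if isinstance(r, BaseException)]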
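The default behavior is easy to confirm. In this sketch, with hypothetical `slow_ok` and `fails_fast` coroutines, gather raises while the slower sibling is still running, and that sibling completes afterwards because it was never cancelled.

```python
import asyncio

async def slow_ok(log):
    await asyncio.sleep(0.05)
    log.append("slow_ok finished")
    return "ok"

async def fails_fast():
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def demo():
    log = []
    caught = None
    try:
        await asyncio.gather(slow_ok(log), fails_fast())
    except ValueError as exc:
        # Raised as soon as fails_fast() fails, while slow_ok() is still running
        caught = str(exc)
    log_when_raised = list(log)
    await asyncio.sleep(0.1)  # the sibling was not cancelled, so it finishes
    return caught, log_when_raised, log

if __name__ == "__main__":
    print(asyncio.run(demo()))
    # ('boom', [], ['slow_ok finished'])
```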
For concurrent.futures, wrap future.result() in a try/except per future.
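A minimal sketch of per-future error handling. It uses ThreadPoolExecutor so it runs without a `__main__` guard; the pattern is the same with ProcessPoolExecutor. `parse_record` and its input format are hypothetical. future.result() re-raises whatever exception the worker raised, so catching it per future keeps one bad item from discarding the rest.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def parse_record(text):
    # Hypothetical work item: raises ValueError on malformed input
    return int(text)

def parse_all(records):
    successes, failures = {}, {}
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(parse_record, r): r for r in records}
        for future in as_completed(futures):
            record = futures[future]
            try:
                successes[record] = future.result()
            except ValueError as exc:
                # The worker's exception is re-raised here, in the caller
                failures[record] = exc
    return successes, failures

if __name__ == "__main__":
    ok, bad = parse_all(["1", "2", "x"])
    print(ok)         # {'1': 1, '2': 2} (key order depends on completion order)
    print(list(bad))  # ['x']
```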
Common Misconception
“Fan-out always makes things faster.” It doesn’t if the bottleneck is shared. If all tasks hit the same database, API, or file system, fan-out just moves the queue from your code to the bottleneck. The speedup comes from parallelizing genuinely independent I/O paths or utilizing multiple CPU cores.
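A small simulation of the point above. An asyncio.Lock stands in for a shared resource that serves one request at a time (for example, a database with a single connection); the 0.05-second service time and five tasks are arbitrary assumptions. Fanning out over the shared resource takes about as long as running the tasks one by one, while independent work overlaps.

```python
import asyncio
import time

async def timed_fan_out(n, shared_lock=None):
    async def task():
        if shared_lock is None:
            await asyncio.sleep(0.05)    # independent I/O path
        else:
            async with shared_lock:      # every task queues on one resource
                await asyncio.sleep(0.05)

    start = time.perf_counter()
    await asyncio.gather(*[task() for _ in range(n)])
    return time.perf_counter() - start

async def compare(n=5):
    independent = await timed_fan_out(n)
    shared = await timed_fan_out(n, asyncio.Lock())
    return independent, shared

if __name__ == "__main__":
    independent, shared = asyncio.run(compare())
    print(f"independent: {independent:.2f}s, shared: {shared:.2f}s")
    # roughly 0.05s versus 0.25s
```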
When to Use Each Approach
| Scenario | Best Tool |
|---|---|
| Async I/O (HTTP, DB queries) | asyncio.gather with semaphore |
| CPU-bound work | ProcessPoolExecutor |
| Cross-machine distribution | Celery, RQ, or message queues |
| Streaming results | as_completed or async generators |
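A minimal sketch of the streaming row on the asyncio side: an async generator that wraps asyncio.as_completed and yields each result as soon as it is ready. `fetch` and `stream_results` are hypothetical names, and the delays are arbitrary. Because results arrive in completion order, each one carries its input so the caller can match them up.

```python
import asyncio

async def fetch(item, delay):
    # Hypothetical I/O call; returns its input so results stay identifiable
    await asyncio.sleep(delay)
    return item

async def stream_results(jobs):
    # Fan-out: start every job; fan-in: yield each result as it lands
    tasks = [asyncio.create_task(fetch(item, delay)) for item, delay in jobs]
    for next_done in asyncio.as_completed(tasks):
        yield await next_done

async def main():
    seen = []
    async for result in stream_results([("a", 0.15), ("b", 0.05), ("c", 0.10)]):
        seen.append(result)  # process each result incrementally here
    return seen

if __name__ == "__main__":
    print(asyncio.run(main()))  # ['b', 'c', 'a']
```

If the consumer stops iterating early, the remaining tasks keep running, so a real version may want to cancel them.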
One thing to remember: Fan-out without concurrency limits is a denial-of-service attack on your own infrastructure. Always cap the parallelism with a semaphore, pool size, or queue depth.
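Capping by queue depth looks different from a semaphore. In this sketch, a fixed pool of workers drains an asyncio.Queue created with maxsize, so the producer pauses on put() whenever the queue is full instead of creating every task up front. The worker count, queue size, and `handle` function are illustrative assumptions.

```python
import asyncio

async def handle(item):
    # Hypothetical per-item work
    await asyncio.sleep(0.01)
    return item * 10

async def capped_pipeline(items, num_workers=3, max_queue=5):
    queue = asyncio.Queue(maxsize=max_queue)
    results = {}
    peak_depth = 0

    async def worker():
        while True:
            item = await queue.get()
            try:
                results[item] = await handle(item)
            finally:
                queue.task_done()

    # Parallelism is capped by the number of workers, not the number of items
    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    for item in items:
        await queue.put(item)  # blocks while the queue is full
        peak_depth = max(peak_depth, queue.qsize())
    await queue.join()         # fan-in: wait until every item is processed
    for w in workers:
        w.cancel()             # workers loop forever; stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results, peak_depth

if __name__ == "__main__":
    results, peak = asyncio.run(capped_pipeline(range(20)))
    print(len(results), peak)  # 20 items processed; peak depth never exceeds 5
```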