Python Fan-Out Fan-In Pattern — Core Concepts

What Fan-Out/Fan-In Solves

Many problems follow the same shape: you have a collection of independent work items, each can be processed separately, and you need all results before proceeding. Sequential processing wastes time because items don’t depend on each other. Fan-out/fan-in exploits this independence.

The pattern has two phases:

  • Fan-out: Distribute N tasks to workers running in parallel
  • Fan-in: Collect all N results and combine them

Fan-Out/Fan-In in Python

With asyncio.gather

The simplest form for I/O-bound work:

import asyncio

async def fetch_price(product_id):
    # Simulate API call
    await asyncio.sleep(0.1)
    return {'id': product_id, 'price': product_id * 9.99}

async def get_all_prices(product_ids):
    # Fan-out: launch all fetches concurrently
    tasks = [fetch_price(pid) for pid in product_ids]
    # Fan-in: wait for all results
    results = await asyncio.gather(*tasks)
    return results

All fetches run concurrently on the event loop. gather returns results in the same order as the input tasks.
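
The ordering guarantee is easy to check with a small runnable sketch. The version of fetch_price below is a hypothetical stand-in whose delays are invented for illustration: higher ids finish sooner, so completion order is the reverse of submission order, yet gather still returns results in input order.

```python
import asyncio

async def fetch_price(product_id):
    # Hypothetical stand-in for an API call: higher ids finish sooner,
    # so tasks complete in the reverse of the order they were submitted.
    await asyncio.sleep(0.05 / product_id)
    return {'id': product_id, 'price': round(product_id * 9.99, 2)}

async def get_all_prices(product_ids):
    # Fan-out and fan-in in one step; gather preserves input order.
    return await asyncio.gather(*(fetch_price(pid) for pid in product_ids))

def main():
    prices = asyncio.run(get_all_prices([1, 2, 3]))
    return [p['id'] for p in prices]

if __name__ == "__main__":
    print(main())  # [1, 2, 3]
```

If completion order matters more than input order, asyncio.as_completed is the usual alternative.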

With concurrent.futures

For CPU-bound work or when you need process-level parallelism:

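from concurrent.futures import ProcessPoolExecutor, as_completed

def count_words(doc_path):
    # Placeholder analysis: count whitespace-separated words in the file
    with open(doc_path, encoding='utf-8') as f:
        return len(f.read().split())

def analyze_document(doc_path):
    # CPU-intensive text analysis
    return {'path': doc_path, 'word_count': count_words(doc_path)}

def analyze_all(doc_paths):
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Fan-out: submit one task per document, remembering its path
        futures = {executor.submit(analyze_document, p): p
                   for p in doc_paths}
        # Fan-in: collect results in completion order
        results = []
        for future in as_completed(futures):
            results.append(future.result())
    return results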

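as_completed yields futures as workers finish, so results arrive in completion order rather than submission order. This is useful when you want to process results incrementally rather than waiting for all of them. On platforms that start worker processes with spawn (the default on Windows and macOS), call analyze_all from under an if __name__ == '__main__': guard.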
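
Because completion order differs from submission order, it often helps to map each future back to its input. The sketch below is one way to do that, not the only one; fan_out_fan_in and word_count are hypothetical names introduced here. It uses ThreadPoolExecutor so the demo runs anywhere without a main-module guard. The same function should work with a ProcessPoolExecutor as long as func and the items are picklable.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def fan_out_fan_in(executor, func, items):
    """Submit func(item) for every item and return {item: result}."""
    # Fan-out: remember which input each future belongs to.
    future_to_item = {executor.submit(func, item): item for item in items}
    results = {}
    # Fan-in: futures arrive in completion order, so look up the input.
    for future in as_completed(future_to_item):
        results[future_to_item[future]] = future.result()
    return results

def word_count(text):
    # Hypothetical stand-in for the CPU-intensive analysis.
    return len(text.split())

if __name__ == "__main__":
    docs = ["a b c", "d e", "f"]
    with ThreadPoolExecutor(max_workers=2) as pool:
        print(fan_out_fan_in(pool, word_count, docs))
```

Items are used as dictionary keys here, so they must be hashable and unique; for other inputs, keying by index is a reasonable variation.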

With Message Queues

For distributed systems where workers run on different machines:

  1. Fan-out: Producer publishes N messages to a queue (or topic)
  2. Workers: Multiple consumers pull messages and process independently
  3. Fan-in: Workers publish results to a result queue; an aggregator collects them

This scales beyond a single machine but adds complexity: you need to track how many results to expect, handle worker failures, and deal with timeouts for slow workers.
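
The three steps can be sketched in a single process, with queue.Queue standing in for a real broker and threads standing in for remote workers. This is only an illustration under those assumptions: run_pipeline, worker, and the doubling "work" are hypothetical, and a production system would also need retries and durable delivery. It does show the two bookkeeping concerns mentioned above: knowing how many results to expect, and giving up on slow workers after a timeout.

```python
import queue
import threading

def worker(task_q, result_q):
    # Pull tasks until a None sentinel arrives.
    while True:
        task = task_q.get()
        if task is None:
            break
        task_id, payload = task
        result_q.put((task_id, payload * 2))  # placeholder "work"

def run_pipeline(payloads, num_workers=3, timeout=2.0):
    task_q, result_q = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(task_q, result_q))
               for _ in range(num_workers)]
    for t in threads:
        t.start()

    # Fan-out: publish one message per payload, then one sentinel per worker.
    for task_id, payload in enumerate(payloads):
        task_q.put((task_id, payload))
    for _ in threads:
        task_q.put(None)

    # Fan-in: the aggregator knows it expects len(payloads) results.
    results = {}
    try:
        while len(results) < len(payloads):
            # The timeout applies to each wait, not to the whole run.
            task_id, value = result_q.get(timeout=timeout)
            results[task_id] = value
    except queue.Empty:
        pass  # timed out: keep whatever has arrived so far

    for t in threads:
        t.join()
    missing = sorted(set(range(len(payloads))) - set(results))
    return results, missing

if __name__ == "__main__":
    print(run_pipeline([1, 2, 3, 4]))
```

Tagging each message with an id lets the aggregator report exactly which tasks never came back, which is what a retry mechanism would need.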

Controlling Concurrency

Unbounded fan-out is dangerous. Launching 10,000 tasks simultaneously can exhaust memory, overload APIs, or trigger rate limits.

Use a semaphore to limit concurrent work:

async def bounded_fan_out(items, handler, max_concurrent=50):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(item):
        async with semaphore:
            return await handler(item)

    return await asyncio.gather(*[limited(i) for i in items])
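
One way to convince yourself that the limit holds is to count how many handlers are active at once. The sketch below repeats bounded_fan_out so that it is self-contained; measure_peak and its counters are hypothetical additions for the demonstration.

```python
import asyncio

async def bounded_fan_out(items, handler, max_concurrent=50):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(item):
        async with semaphore:
            return await handler(item)

    return await asyncio.gather(*[limited(i) for i in items])

async def measure_peak(num_items=20, max_concurrent=3):
    active = 0
    peak = 0

    async def handler(item):
        nonlocal active, peak
        active += 1                 # entered the guarded section
        peak = max(peak, active)
        await asyncio.sleep(0.01)   # simulated I/O
        active -= 1
        return item * 2

    results = await bounded_fan_out(range(num_items), handler, max_concurrent)
    return results, peak

if __name__ == "__main__":
    results, peak = asyncio.run(measure_peak())
    print(f"peak concurrency: {peak}")
```

Note that the semaphore caps how many handlers run at once, not how many coroutines exist: all N wrappers are still created up front. For very large inputs, processing in batches or using a fixed set of workers reading from an asyncio.Queue keeps memory bounded as well.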

Error Handling

asyncio.gather has a subtle default: if one task raises, the first exception propagates immediately to the code awaiting gather, but the other tasks are not cancelled. They keep running in the background, and the caller never sees their results. Pass return_exceptions=True to get exceptions as return values instead of having them raised:

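results = await asyncio.gather(*tasks, return_exceptions=True)
# BaseException also covers asyncio.CancelledError, which is not an Exception since Python 3.8
successes = [r for r in results if not isinstance(r, BaseException)]
failures = [r for r in results if isinstance(r, BaseException)]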

For concurrent.futures, wrap future.result() in a try/except per future.
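
A minimal sketch of that per-future handling follows. The risky function and the run_all helper are hypothetical, and a thread pool is used only to keep the example easy to run; the try/except pattern is the same with a process pool.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def risky(n):
    # Hypothetical task: fails for negative input.
    if n < 0:
        raise ValueError(f"negative input: {n}")
    return n * n

def run_all(inputs, max_workers=4):
    successes, failures = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_input = {executor.submit(risky, n): n for n in inputs}
        for future in as_completed(future_to_input):
            n = future_to_input[future]
            try:
                # result() re-raises whatever the worker raised.
                successes[n] = future.result()
            except Exception as exc:
                failures[n] = exc
    return successes, failures

if __name__ == "__main__":
    ok, failed = run_all([1, -2, 3])
    print(ok, failed)
```

Keeping failures keyed by input makes it straightforward to retry only the items that failed.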

Common Misconception

“Fan-out always makes things faster.” It doesn’t if the bottleneck is shared. If all tasks hit the same database, API, or file system, fan-out just moves the queue from your code to the bottleneck. The speedup comes from parallelizing genuinely independent I/O paths or utilizing multiple CPU cores.
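
The effect can be reproduced with a toy model. In the sketch below, an asyncio.Lock plays the role of a shared resource that serves one request at a time. The function names and timings are hypothetical, and a real database or API would degrade less abruptly than a single lock.

```python
import asyncio
import time

async def timed_fan_out(num_tasks, shared_lock=None, work=0.05):
    async def task():
        if shared_lock is None:
            await asyncio.sleep(work)      # independent I/O path
        else:
            async with shared_lock:        # every task queues on one resource
                await asyncio.sleep(work)

    start = time.perf_counter()
    await asyncio.gather(*(task() for _ in range(num_tasks)))
    return time.perf_counter() - start

async def compare(num_tasks=5):
    independent = await timed_fan_out(num_tasks)
    serialized = await timed_fan_out(num_tasks, shared_lock=asyncio.Lock())
    return independent, serialized

if __name__ == "__main__":
    independent, serialized = asyncio.run(compare())
    print(f"independent: {independent:.2f}s, shared bottleneck: {serialized:.2f}s")
```

With independent paths the total time is roughly one unit of work; behind the shared lock it is roughly num_tasks units, the same as running sequentially.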

When to Use Each Approach

Scenario                        Best Tool
Async I/O (HTTP, DB queries)    asyncio.gather with semaphore
CPU-bound work                  ProcessPoolExecutor
Cross-machine distribution      Celery, RQ, or message queues
Streaming results               as_completed or async generators
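
For the streaming case, one possible shape is an async generator wrapped around asyncio.as_completed. This is a sketch rather than a canonical recipe; stream_results and slow_square are hypothetical names, and the delays are invented so that tasks finish out of order.

```python
import asyncio

async def stream_results(items, handler):
    """Async generator: yield each result as soon as it is ready."""
    # Fan-out: start every task immediately.
    tasks = [asyncio.create_task(handler(item)) for item in items]
    # Fan-in: hand results to the consumer in completion order.
    for next_done in asyncio.as_completed(tasks):
        yield await next_done

async def slow_square(n):
    # Hypothetical handler: larger inputs finish sooner.
    await asyncio.sleep(0.06 / n)
    return n * n

async def main():
    seen = []
    async for value in stream_results([1, 2, 4], slow_square):
        seen.append(value)  # each result can be handled as it arrives
    return seen

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The consumer sees results in completion order, so it should not assume they line up with the input sequence.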

One thing to remember: Fan-out without concurrency limits is a denial-of-service attack on your own infrastructure. Always cap the parallelism with a semaphore, pool size, or queue depth.
