Python Fan-Out Fan-In Pattern — Deep Dive

Pattern Variants

Fan-out/fan-in isn’t a single pattern — it’s a family of patterns with different collection semantics:

  1. Scatter-Gather — fan out to all workers, wait for all responses (classic)
  2. First-Response — fan out to multiple, use whichever responds first (redundancy)
  3. Quorum — fan out to N, proceed once M < N respond (consensus)
  4. Streaming — fan out and process results as they arrive (pipeline)

Each variant has different error handling, timeout, and cancellation semantics.

Advanced asyncio Patterns

TaskGroup (Python 3.11+)

asyncio.TaskGroup provides structured concurrency: if any task fails, all sibling tasks are cancelled, and the failures are re-raised together as an ExceptionGroup when the async with block exits:

import asyncio

async def fan_out_structured(items, handler):
    results = {}
    async with asyncio.TaskGroup() as tg:
        for item in items:
            async def process(i=item):
                results[i] = await handler(i)
            tg.create_task(process())
    return results

This is safer than gather for cases where partial results are useless — if one fails, you want to abort everything rather than waste resources on the rest.

First-Response with Cancellation

async def first_response(coros):
    """Run multiple coroutines, return first result,
    cancel the rest."""
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel remaining tasks
    for task in pending:
        task.cancel()
    # Suppress cancellation exceptions
    await asyncio.gather(*pending, return_exceptions=True)

    result_task = done.pop()
    return result_task.result()

# Usage: query multiple DNS servers, use fastest response
result = await first_response([
    resolve_dns('8.8.8.8', 'example.com'),
    resolve_dns('1.1.1.1', 'example.com'),
    resolve_dns('9.9.9.9', 'example.com'),
])
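Note that the task that finishes first may have finished by raising, in which case first_response propagates that error even if a slower server would have answered. If you want the first successful response instead, one possible variant is sketched below. The names first_success and fake_resolver are hypothetical, and fake_resolver merely simulates a lookup with a delay.

```python
import asyncio

async def first_success(coros):
    """Return the first result that is not an exception; cancel the rest."""
    tasks = [asyncio.create_task(c) for c in coros]
    errors = []
    try:
        # as_completed yields awaitables in completion order.
        for next_done in asyncio.as_completed(tasks):
            try:
                return await next_done
            except Exception as exc:
                errors.append(exc)  # remember the failure, try the next one
        raise RuntimeError(f"all {len(errors)} attempts failed")
    finally:
        # Cancel whatever is still running and collect the outcomes
        # so no exception is left unretrieved.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

async def fake_resolver(name, delay, fail=False):
    """Stand-in for a real lookup (assumption for this sketch)."""
    await asyncio.sleep(delay)
    if fail:
        raise ConnectionError(f"{name} unreachable")
    return name
```

The trade-off is latency: a fast failure no longer ends the race, so the call takes as long as the fastest server that actually succeeds.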

Quorum Collection

async def quorum_gather(coros, quorum_size):
    """Wait until quorum_size tasks complete successfully."""
    tasks = [asyncio.create_task(c) for c in coros]
    results = []
    remaining = set(tasks)

    while len(results) < quorum_size and remaining:
        done, remaining = await asyncio.wait(
            remaining, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            if task.exception() is None:
                results.append(task.result())

    # Cancel stragglers
    for task in remaining:
        task.cancel()
    await asyncio.gather(*remaining, return_exceptions=True)

    if len(results) < quorum_size:
        raise RuntimeError(
            f"Only {len(results)}/{quorum_size} succeeded"
        )
    return results
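quorum_gather waits as long as it takes for the quorum to form. If you also need an overall deadline, asyncio.wait accepts a timeout. The sketch below is one way to combine the two, not the only one; quorum_with_deadline and replica are illustrative names, and the deadline is measured in seconds from the call.

```python
import asyncio

async def quorum_with_deadline(coros, quorum_size, deadline):
    """Collect quorum_size successes, or give up after `deadline` seconds."""
    tasks = [asyncio.create_task(c) for c in coros]
    results = []
    remaining = set(tasks)
    loop = asyncio.get_running_loop()
    end = loop.time() + deadline
    try:
        while len(results) < quorum_size and remaining:
            timeout = end - loop.time()
            if timeout <= 0:
                break  # deadline reached before the quorum formed
            done, remaining = await asyncio.wait(
                remaining, timeout=timeout,
                return_when=asyncio.FIRST_COMPLETED,
            )
            for task in done:
                if task.exception() is None:
                    results.append(task.result())
    finally:
        # Cancel stragglers whether we succeeded, timed out, or were cancelled.
        for task in remaining:
            task.cancel()
        await asyncio.gather(*remaining, return_exceptions=True)

    if len(results) < quorum_size:
        raise TimeoutError(
            f"Only {len(results)}/{quorum_size} succeeded in time"
        )
    return results

async def replica(name, delay):
    """Simulated replica that answers after `delay` seconds (assumption)."""
    await asyncio.sleep(delay)
    return name
```

Because several tasks can finish in the same wait call, the returned list may contain more than quorum_size results.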

Backpressure-Aware Fan-Out

When fan-out produces results faster than fan-in can consume them, you need backpressure. Use a bounded queue between the fan-out and fan-in stages:

async def pipeline_with_backpressure(
    items, processor, aggregator,
    max_concurrent=20, buffer_size=50
):
    queue = asyncio.Queue(maxsize=buffer_size)
    semaphore = asyncio.Semaphore(max_concurrent)

    async def worker(item):
        async with semaphore:
            result = await processor(item)
            await queue.put(result)  # blocks if buffer full

    async def fan_in():
        collected = []
        processed = 0
        while processed < len(items):
            result = await queue.get()
            collected.append(result)
            processed += 1
        return aggregator(collected)

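    consumer = asyncio.create_task(fan_in())
    try:
        await asyncio.gather(
            *[worker(item) for item in items]
        )
    except BaseException:
        # If a worker fails, fan_in would wait forever for the
        # missing result, so cancel it before re-raising.
        consumer.cancel()
        raise
    return await consumer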

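The bounded queue (maxsize=buffer_size, 50 by default) means workers block in queue.put() when the buffer is full, naturally throttling fan-out speed to match fan-in consumption. A blocked worker still holds its semaphore slot, so no new processing starts until the consumer catches up.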
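The blocking behaviour is easy to observe in isolation. In this small sketch a producer that never pauses is paired with a deliberately slow consumer, and the queue depth is recorded after every put. The function name demo_backpressure and the sizes are arbitrary choices for illustration.

```python
import asyncio

async def demo_backpressure(n_items=10, buffer_size=3):
    queue = asyncio.Queue(maxsize=buffer_size)
    peak = 0  # deepest queue level observed by the producer

    async def fast_producer():
        nonlocal peak
        for i in range(n_items):
            await queue.put(i)  # suspends here whenever the buffer is full
            peak = max(peak, queue.qsize())
        await queue.put(None)   # sentinel: nothing more to consume

    async def slow_consumer():
        seen = []
        while True:
            item = await queue.get()
            if item is None:
                break
            await asyncio.sleep(0.001)  # simulate slow fan-in work
            seen.append(item)
        return seen

    _, seen = await asyncio.gather(fast_producer(), slow_consumer())
    return peak, seen
```

The recorded peak never exceeds buffer_size, even though the producer would otherwise enqueue all items immediately.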

Distributed Fan-Out with Celery

Chord Pattern

Celery’s chord is the canonical distributed fan-out/fan-in: a group of tasks followed by a callback that receives all results.

from celery import chord

@app.task
def process_chunk(chunk_data):
    return analyze(chunk_data)

@app.task
def aggregate_results(results):
    return {
        'total': sum(r['count'] for r in results),
        'errors': [r for r in results if r.get('error')],
    }

# Fan-out 100 chunks, fan-in with aggregate
job = chord(
    [process_chunk.s(chunk) for chunk in data_chunks],
    aggregate_results.s()
)
result = job.apply_async()

Chord Pitfalls

  1. Single point of failure — if the callback task fails, all fan-out work is wasted. Add retry logic to the callback.
  2. Memory pressure — the result backend must hold all intermediate results simultaneously. For large fan-outs (1000+ tasks), results can exhaust Redis memory.
  3. Never nest chords — Celery chords inside chords cause deadlocks with certain backends.

Alternative: Manual Result Tracking

For more control, skip chords and track results yourself:

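import json

@app.task(bind=True)
def process_and_track(self, item, job_id, total_count):
    result = process(item)

    # Store the result BEFORE bumping the counter. Otherwise the
    # worker that sees the final count could trigger aggregation
    # while another worker's result is still missing from the list.
    redis_client.lpush(f"job:{job_id}:results",
                       json.dumps(result))
    # Atomic increment in Redis
    completed = redis_client.incr(f"job:{job_id}:completed")

    if completed == total_count:
        # Last worker triggers aggregation, exactly once
        aggregate_job.delay(job_id)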

@app.task
def aggregate_job(job_id):
    results = redis_client.lrange(
        f"job:{job_id}:results", 0, -1
    )
    parsed = [json.loads(r) for r in results]
    final = combine(parsed)
    # Store final result and clean up
    redis_client.set(f"job:{job_id}:final",
                     json.dumps(final))
    redis_client.delete(f"job:{job_id}:completed",
                        f"job:{job_id}:results")

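This approach can handle partial failures better: you can define completion as “N of M” rather than “all M” by triggering aggregation at a lower threshold. As written, a task that raises never increments the counter, so count failures as well (or add a timeout) if the job must always reach aggregation.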

Streaming Fan-In with Async Generators

For cases where you want to process results as they arrive rather than waiting for all:

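async def streaming_fan_out(items, handler, max_concurrent=20):
    semaphore = asyncio.Semaphore(max_concurrent)
    queue = asyncio.Queue()
    done = object()  # unique sentinel, so a handler may return None

    async def worker(item):
        async with semaphore:
            result = await handler(item)
            await queue.put(result)

    async def producer():
        try:
            await asyncio.gather(
                *[worker(item) for item in items]
            )
        finally:
            # Always unblock the consumer, even if a worker failed
            queue.put_nowait(done)

    # Keep a reference: the event loop only holds weak references to tasks
    producer_task = asyncio.create_task(producer())

    try:
        while True:
            result = await queue.get()
            if result is done:
                break
            yield result
        await producer_task  # re-raise the first worker failure, if any
    finally:
        producer_task.cancel()  # stop the fan-out if the consumer exits early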

# Usage: process results as they stream in
async for result in streaming_fan_out(urls, fetch_url):
    update_progress(result)
    save_to_database(result)

Performance Characteristics

Pattern                    | Latency                      | Throughput            | Complexity
---------------------------|------------------------------|-----------------------|-----------
asyncio.gather (unbounded) | Max of all tasks (slowest)   | Limited by event loop | Low
asyncio.gather + semaphore | Varies with limit            | Controlled            | Low
TaskGroup (structured)     | Max of all tasks (slowest)   | Limited by event loop | Low
ProcessPoolExecutor        | Overhead per process         | N × single-core       | Medium
Celery chord               | Broker + worker overhead     | Cluster-wide          | High
Manual Redis tracking      | Broker overhead              | Cluster-wide          | High
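The "asyncio.gather + semaphore" row is the simplest way to cap concurrency without a queue. A minimal sketch follows; bounded_gather and demo_bounded are hypothetical names, and the demo handler only sleeps briefly while counting how many calls are in flight.

```python
import asyncio

async def bounded_gather(items, handler, max_concurrent=5):
    """Run handler over items with at most max_concurrent calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item):
        async with semaphore:
            return await handler(item)

    # gather returns results in input order, regardless of completion order.
    return await asyncio.gather(*(guarded(i) for i in items))

async def demo_bounded(n=20, limit=4):
    active = 0
    peak = 0  # highest number of handlers running at once

    async def handler(i):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.001)  # simulated I/O
        active -= 1
        return i * 2

    results = await bounded_gather(range(n), handler, max_concurrent=limit)
    return results, peak
```

Total latency then depends on the limit: roughly the number of items divided by max_concurrent, times the typical task duration, which is why the table lists it as "varies with limit".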

Error Recovery Strategies

Partial Retry

Don’t retry the entire fan-out if a few tasks fail:

async def fan_out_with_retry(items, handler,
                              max_retries=3):
    results = {}
    pending = list(items)

    for attempt in range(max_retries + 1):
        tasks = {
            asyncio.create_task(handler(item)): item
            for item in pending
        }
        done, _ = await asyncio.wait(tasks.keys())

        failed = []
        for task in done:
            item = tasks[task]
            if task.exception():
                failed.append(item)
            else:
                results[item] = task.result()

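        # Update pending before the exit check; otherwise a fully
        # successful attempt would still report its items as failed.
        pending = failed
        if not pending:
            break

    return results, pending  # results + still-failed items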

Timeout per Task

Individual task timeouts prevent one slow task from blocking the entire fan-in:

async def timed_handler(item, handler, timeout=30):
    try:
        return await asyncio.wait_for(
            handler(item), timeout=timeout
        )
    except asyncio.TimeoutError:
        return {'item': item, 'error': 'timeout'}
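To apply a per-task timeout across a whole fan-out, wrap each call before handing it to gather. The sketch below repeats the same wait_for idea in a self-contained form; with_timeout, fan_out_with_timeouts, and sleepy are illustrative names, and sleepy stands in for a real handler.

```python
import asyncio

async def with_timeout(item, handler, timeout):
    """Return handler(item), or an error record if it takes too long."""
    try:
        return await asyncio.wait_for(handler(item), timeout=timeout)
    except asyncio.TimeoutError:
        return {"item": item, "error": "timeout"}

async def fan_out_with_timeouts(items, handler, timeout=0.05):
    # Each task gets its own clock, so one slow item cannot delay
    # the others beyond `timeout` seconds.
    return await asyncio.gather(
        *(with_timeout(item, handler, timeout) for item in items)
    )

async def sleepy(delay):
    """Simulated handler that takes `delay` seconds (assumption)."""
    await asyncio.sleep(delay)
    return {"item": delay, "ok": True}
```

Returning an error record instead of raising keeps the fan-in simple: the aggregator sees one entry per item and can decide which timeouts to retry.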

One thing to remember: The key design decision in fan-out/fan-in is your collection semantics: do you need all results (scatter-gather), the first result (redundancy), or enough results (quorum)? This choice determines your error handling, timeout strategy, and cancellation behavior.
