Python Bulkhead Pattern — Deep Dive

Semaphore-Based Bulkhead

The simplest bulkhead in Python uses asyncio.Semaphore to cap concurrent calls to a dependency:

import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable, Coroutine

@dataclass
class AsyncBulkhead:
    """Limits concurrent calls to a dependency."""
    name: str
    max_concurrent: int
    _semaphore: asyncio.Semaphore = field(init=False)
    _rejected: int = field(default=0, init=False)

    def __post_init__(self):
        self._semaphore = asyncio.Semaphore(self.max_concurrent)

    async def execute(self, func: Callable[..., Coroutine], *args: Any, **kwargs: Any) -> Any:
        if self._semaphore.locked():
            self._rejected += 1
            raise BulkheadFullError(
                f"Bulkhead '{self.name}' is full "
                f"({self.max_concurrent} concurrent calls)"
            )
        async with self._semaphore:
            return await func(*args, **kwargs)

    @property
    def available(self) -> int:
        # _value is a private attribute of asyncio.Semaphore, not public API;
        # it may change between Python versions.
        return self._semaphore._value

    @property
    def rejected_count(self) -> int:
        return self._rejected


class BulkheadFullError(Exception):
    pass

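The key design choice: when the semaphore is locked (all permits taken), we reject immediately instead of queuing. This is the “fail fast” behavior that prevents cascading resource exhaustion. The check-then-acquire sequence is safe here because asyncio runs tasks on a single thread and acquiring a free permit does not yield to the event loop, so no other task can take the permit between the locked() check and the async with.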
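
To make the fail-fast behavior concrete, here is a minimal sketch that fires five calls at a bulkhead with two permits. TinyBulkhead is a condensed stand-in for AsyncBulkhead, and slow_call with its 50 ms latency is a hypothetical dependency invented for the demonstration.

```python
import asyncio


class BulkheadFullError(Exception):
    """Raised when no permit is free."""


class TinyBulkhead:
    """Condensed stand-in for AsyncBulkhead, for demonstration only."""

    def __init__(self, max_concurrent: int):
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def execute(self, func, *args):
        if self._semaphore.locked():  # all permits taken: reject, do not wait
            raise BulkheadFullError("bulkhead is full")
        async with self._semaphore:
            return await func(*args)


async def slow_call(i: int) -> int:
    await asyncio.sleep(0.05)  # hypothetical dependency latency
    return i


async def demo() -> tuple:
    bulkhead = TinyBulkhead(max_concurrent=2)
    # Start five calls at once; exceptions are returned instead of raised.
    results = await asyncio.gather(
        *(bulkhead.execute(slow_call, i) for i in range(5)),
        return_exceptions=True,
    )
    rejected = sum(isinstance(r, BulkheadFullError) for r in results)
    return len(results) - rejected, rejected


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (completed, rejected)
```

With two permits and five simultaneous calls, the first two calls take the permits and the remaining three are rejected at once, so demo() returns (2, 3).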

Thread Pool Bulkhead for Blocking I/O

For synchronous libraries (database drivers, HTTP clients without async support), wrap calls in a dedicated thread pool:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

class ThreadPoolBulkhead:
    def __init__(self, name: str, max_workers: int):
        self.name = name
        self._executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=f"bulkhead-{name}",
        )
        self._max_workers = max_workers

    async def execute(self, func, *args, **kwargs):
        loop = asyncio.get_running_loop()
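        # ThreadPoolExecutor queues excess work without bound, so we
        # inspect the backlog and reject once more than max_workers
        # tasks are waiting. Note that _work_queue is a private
        # attribute (a CPython implementation detail), not public API.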
        if self._executor._work_queue.qsize() > self._max_workers:
            raise BulkheadFullError(
                f"Thread bulkhead '{self.name}' queue is overloaded"
            )
        return await loop.run_in_executor(
            self._executor, partial(func, *args, **kwargs)
        )

    def shutdown(self):
        self._executor.shutdown(wait=False)
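
If relying on the executor's private queue is a concern, one alternative is to count in-flight calls yourself. The sketch below is an assumption-laden variant, not the article's implementation: CountingThreadBulkhead, max_queued, and blocking_call are hypothetical names, and the counter is only safe because it is read and written from the event loop thread.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


class BulkheadFullError(Exception):
    pass


class CountingThreadBulkhead:
    """Rejects once max_workers + max_queued calls are in flight."""

    def __init__(self, max_workers: int, max_queued: int = 0):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._limit = max_workers + max_queued
        self._in_flight = 0  # only touched from the event loop thread

    async def execute(self, func, *args, **kwargs):
        if self._in_flight >= self._limit:
            raise BulkheadFullError("thread bulkhead is full")
        self._in_flight += 1
        try:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                self._executor, partial(func, *args, **kwargs)
            )
        finally:
            self._in_flight -= 1

    def shutdown(self):
        self._executor.shutdown(wait=False)


def blocking_call(i: int) -> int:
    time.sleep(0.05)  # hypothetical blocking I/O
    return i


async def demo() -> tuple:
    bulkhead = CountingThreadBulkhead(max_workers=2, max_queued=1)
    results = await asyncio.gather(
        *(bulkhead.execute(blocking_call, i) for i in range(5)),
        return_exceptions=True,
    )
    bulkhead.shutdown()
    rejected = sum(isinstance(r, BulkheadFullError) for r in results)
    return len(results) - rejected, rejected
```

One caveat: if the awaiting task is cancelled, the counter is decremented even though the worker thread keeps running until the blocking call returns, so the count can briefly understate real thread usage.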

Combining Bulkheads with Circuit Breakers

Bulkheads and circuit breakers solve different problems, but they’re powerful together. The bulkhead limits concurrent calls; the circuit breaker stops calling a service that’s clearly broken:

from enum import Enum
import time

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class ProtectedService:
    """Combines bulkhead isolation with circuit breaking."""

    def __init__(self, name: str, max_concurrent: int,
                 failure_threshold: int = 5,
                 recovery_timeout: float = 30.0):
        self.bulkhead = AsyncBulkhead(name, max_concurrent)
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout
        self._last_failure_time = 0.0

    async def call(self, func, *args, **kwargs):
        # Circuit breaker check first (cheaper than acquiring semaphore)
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time > self._recovery_timeout:
                self._state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError(f"Circuit for '{self.bulkhead.name}' is open")

        try:
            result = await self.bulkhead.execute(func, *args, **kwargs)
            self._on_success()
            return result
        except BulkheadFullError:
            raise  # Bulkhead rejection isn't a service failure
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self._failure_count = 0
        self._state = CircuitState.CLOSED

    def _on_failure(self):
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._failure_count >= self._failure_threshold:
            self._state = CircuitState.OPEN

class CircuitOpenError(Exception):
    pass
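
The state transitions are easier to follow with a controllable clock. The sketch below condenses the breaker logic into a hypothetical TinyBreaker (state names as plain strings, no bulkhead) and injects a fake clock so the recovery timeout can be crossed without sleeping. It illustrates the transitions only and is not a replacement for ProtectedService.

```python
import asyncio


class CircuitOpenError(Exception):
    pass


class TinyBreaker:
    """Condensed breaker with the same transitions as ProtectedService."""

    def __init__(self, failure_threshold, recovery_timeout, clock):
        self.state = "closed"
        self._failures = 0
        self._threshold = failure_threshold
        self._timeout = recovery_timeout
        self._last_failure = 0.0
        self._clock = clock  # injectable so the demo need not sleep

    async def call(self, func):
        if self.state == "open":
            if self._clock() - self._last_failure > self._timeout:
                self.state = "half_open"  # let a trial call through
            else:
                raise CircuitOpenError("circuit is open")
        try:
            result = await func()
        except Exception:
            self._failures += 1
            self._last_failure = self._clock()
            if self._failures >= self._threshold:
                self.state = "open"
            raise
        self._failures = 0
        self.state = "closed"
        return result


async def demo() -> list:
    now = [0.0]  # fake clock, advanced by hand
    breaker = TinyBreaker(2, 30.0, clock=lambda: now[0])
    seen = []

    async def failing():
        raise ConnectionError("dependency down")

    async def healthy():
        return "ok"

    for _ in range(2):  # two failures reach the threshold
        try:
            await breaker.call(failing)
        except ConnectionError:
            pass
    seen.append(breaker.state)
    try:
        await breaker.call(healthy)  # still inside the recovery timeout
    except CircuitOpenError:
        seen.append("rejected")
    now[0] = 31.0  # past the timeout: the next call is a trial
    await breaker.call(healthy)
    seen.append(breaker.state)
    return seen
```

Running demo() yields ["open", "rejected", "closed"]: the breaker opens after two failures, rejects calls during the timeout, and closes again after one successful trial call.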

FastAPI Integration

Here’s a practical setup for a FastAPI application with bulkheaded dependencies:

from fastapi import FastAPI, HTTPException

# process_payment, send_email, and queue_for_retry are assumed to be
# async functions defined elsewhere in the application.

# Define bulkheads at module level
payment_bulkhead = AsyncBulkhead("payment", max_concurrent=20)
email_bulkhead = AsyncBulkhead("email", max_concurrent=10)
search_bulkhead = AsyncBulkhead("search", max_concurrent=15)

app = FastAPI()

@app.post("/checkout")
async def checkout(order: dict):
    try:
        result = await payment_bulkhead.execute(
            process_payment, order
        )
    except BulkheadFullError:
        raise HTTPException(
            status_code=503,
            detail="Payment service is at capacity. Please retry.",
            headers={"Retry-After": "5"},
        )
    return {"status": "paid", "transaction_id": result}

@app.post("/notify")
async def notify(user_id: str, message: str):
    try:
        await email_bulkhead.execute(send_email, user_id, message)
    except BulkheadFullError:
        # Non-critical: queue for later instead of failing
        await queue_for_retry(user_id, message)
        return {"status": "queued"}
    return {"status": "sent"}

Monitoring and Observability

A bulkhead without monitoring is a black box. Export metrics for each bulkhead:

from prometheus_client import Counter, Gauge

# Register each metric once at module level. Creating a metric with the
# same name per instance raises a duplicate-timeseries error as soon as
# a second bulkhead is constructed.
ACTIVE_CALLS = Gauge(
    "bulkhead_active_calls",
    "Currently active calls",
    ["bulkhead_name"],
)
REJECTED_CALLS = Counter(
    "bulkhead_rejected_total",
    "Total rejected calls",
    ["bulkhead_name"],
)

class ObservableBulkhead(AsyncBulkhead):
    async def execute(self, func, *args, **kwargs):
        try:
            # Route the call through _tracked_execute so the gauge
            # reflects calls that actually hold a permit.
            return await super().execute(
                self._tracked_execute, func, *args, **kwargs
            )
        except BulkheadFullError:
            REJECTED_CALLS.labels(bulkhead_name=self.name).inc()
            raise

    async def _tracked_execute(self, func, *args, **kwargs):
        active = ACTIVE_CALLS.labels(bulkhead_name=self.name)
        active.inc()
        try:
            return await func(*args, **kwargs)
        finally:
            active.dec()

Sizing Bulkheads

Choosing the right size is the hardest part. Too small and you reject legitimate traffic; too large and you don’t get real isolation.

Rules of thumb:

  • Start with the P99 concurrent call count under normal traffic (check your APM tool)
  • Add 20-30% headroom for traffic spikes
  • Set it well below your total thread/connection pool size
  • Monitor rejection rates in production and adjust

Service                   Normal P99 Concurrency   Bulkhead Size   Total Pool
Payment                   12                       20              100 threads
Email                     5                        10              100 threads
Search                    8                        15              100 threads
Reserved for other work   -                        55              -

The reserved capacity (55 threads) ensures your app can always handle health checks, static content, and admin endpoints regardless of what happens with external services.
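
The arithmetic behind these rules of thumb can be sketched as two small helpers. The function names and the 25% default headroom are assumptions chosen for illustration. The table above uses more generous sizes than 25% headroom would give, so treat the output as a starting point to tune against rejection metrics.

```python
import math


def bulkhead_size(p99_concurrency: int, headroom: float = 0.25) -> int:
    """Measured P99 concurrency plus fractional headroom, rounded up."""
    return math.ceil(p99_concurrency * (1 + headroom))


def reserved_capacity(total_pool: int, sizes: dict) -> int:
    """Capacity left over for health checks, admin endpoints, and so on."""
    reserved = total_pool - sum(sizes.values())
    if reserved <= 0:
        raise ValueError("bulkheads consume the entire pool")
    return reserved


if __name__ == "__main__":
    print(bulkhead_size(12))  # payment, with 25% headroom
    print(reserved_capacity(100, {"payment": 20, "email": 10, "search": 15}))
```

With the table's bulkhead sizes (20, 10, 15) and a pool of 100, reserved_capacity returns the 55 threads mentioned above.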

Process-Level Bulkheads

For the strongest isolation, run each dependency interaction in its own process. Python’s multiprocessing or separate container services give true memory isolation:

from multiprocessing import Process, Queue

# process_risky_operation is assumed to be defined elsewhere.

def isolated_worker(task_queue: Queue, result_queue: Queue):
    """Runs in a separate process — crashes don't affect the parent."""
    while True:
        task = task_queue.get()
        if task is None:
            break
        try:
            result = process_risky_operation(task)
            result_queue.put(("ok", result))
        except Exception as exc:
            result_queue.put(("error", str(exc)))

This is heavier than semaphore isolation, but it protects against memory leaks, segfaults in C extensions, and truly pathological failures that corrupt process state.

Tradeoffs

Approach            Isolation Strength   Overhead      Complexity
Semaphore           Low                  Minimal       Simple
Thread pool         Medium               Moderate      Moderate
Process             High                 Significant   Complex
Container/service   Highest              Highest       Architectural

Most Python web applications get the best value from semaphore or thread-pool bulkheads. Process-level isolation is best reserved for cases where a dependency uses unreliable native code or where a crash in the dependency must not take down the main process.

One thing to remember: Size your bulkheads based on measured concurrency, not guesses. Too tight starves legitimate traffic; too loose defeats the purpose. Monitor rejection rates in production and adjust iteratively.

