Python Bulkhead Pattern — Deep Dive
Semaphore-Based Bulkhead
The simplest bulkhead in Python uses asyncio.Semaphore to cap concurrent calls to a dependency:
```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable, Coroutine


class BulkheadFullError(Exception):
    pass


@dataclass
class AsyncBulkhead:
    """Limits concurrent calls to a dependency."""

    name: str
    max_concurrent: int
    _semaphore: asyncio.Semaphore = field(init=False)
    _rejected: int = field(default=0, init=False)

    def __post_init__(self):
        self._semaphore = asyncio.Semaphore(self.max_concurrent)

    async def execute(self, func: Callable[..., Coroutine], *args: Any, **kwargs: Any) -> Any:
        # All permits taken: reject now instead of waiting in line
        if self._semaphore.locked():
            self._rejected += 1
            raise BulkheadFullError(
                f"Bulkhead '{self.name}' is full "
                f"({self.max_concurrent} concurrent calls)"
            )
        async with self._semaphore:
            return await func(*args, **kwargs)

    @property
    def available(self) -> int:
        # _value is Semaphore's private permit counter (a CPython implementation detail)
        return self._semaphore._value

    @property
    def rejected_count(self) -> int:
        return self._rejected
```
The key design choice: when the semaphore is locked (all permits taken), we reject immediately instead of queuing. This is the “fail fast” behavior that prevents cascading resource exhaustion.
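Immediate rejection is not the only option. A common variant lets callers wait a short, bounded time for a permit before failing. This absorbs brief bursts while still guaranteeing that no caller waits longer than a fixed limit. The sketch below is a hypothetical TimedBulkhead built on asyncio.wait_for; it is not the design above, and max_wait is an assumed parameter.

```python
import asyncio


class BulkheadFullError(Exception):
    pass


class TimedBulkhead:
    """Hypothetical variant: wait up to max_wait seconds for a permit, then reject."""

    def __init__(self, name: str, max_concurrent: int, max_wait: float):
        self.name = name
        self.max_wait = max_wait
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def execute(self, func, *args, **kwargs):
        try:
            # wait_for cancels the pending acquire() if the timeout expires
            await asyncio.wait_for(self._semaphore.acquire(), timeout=self.max_wait)
        except asyncio.TimeoutError:
            raise BulkheadFullError(
                f"Bulkhead '{self.name}': no permit within {self.max_wait}s"
            ) from None
        try:
            return await func(*args, **kwargs)
        finally:
            self._semaphore.release()


async def demo():
    bulkhead = TimedBulkhead("demo", max_concurrent=1, max_wait=0.05)

    async def slow_call():
        await asyncio.sleep(0.2)
        return "done"

    holder = asyncio.create_task(bulkhead.execute(slow_call))
    await asyncio.sleep(0)  # let the holder start and claim the only permit
    try:
        await bulkhead.execute(slow_call)
        second = "accepted"
    except BulkheadFullError:
        second = "rejected"
    return await holder, second
```

Running asyncio.run(demo()) lets the first call finish while the second is rejected after about 50 ms. This version limits how long callers wait, but it does not limit how many callers are waiting at once.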
Thread Pool Bulkhead for Blocking I/O
For synchronous libraries (database drivers, HTTP clients without async support), wrap calls in a dedicated thread pool:
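```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

# BulkheadFullError is defined in the first example.


class ThreadPoolBulkhead:
    def __init__(self, name: str, max_workers: int):
        self.name = name
        self._max_workers = max_workers
        self._in_flight = 0  # calls submitted but not yet finished
        self._executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=f"bulkhead-{name}",
        )

    async def execute(self, func, *args, **kwargs):
        # ThreadPoolExecutor queues excess work without limit, so we count
        # in-flight calls ourselves and reject once every worker is busy.
        # The counter is only touched from the event loop thread, so no lock is needed.
        if self._in_flight >= self._max_workers:
            raise BulkheadFullError(
                f"Thread bulkhead '{self.name}' is full "
                f"({self._max_workers} workers busy)"
            )
        self._in_flight += 1
        try:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                self._executor, partial(func, *args, **kwargs)
            )
        finally:
            # If the caller is cancelled, the thread keeps running until func
            # returns, but its slot is released here immediately.
            self._in_flight -= 1

    def shutdown(self):
        self._executor.shutdown(wait=False)
```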
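The ThreadPoolBulkhead above still requires an event loop. A purely synchronous application, such as a WSGI app or a worker script, can apply the same fail-fast rule with threading.BoundedSemaphore and a non-blocking acquire. This is a minimal sketch. SyncBulkhead and demo are hypothetical names and do not belong to any library.

```python
import threading


class BulkheadFullError(Exception):
    pass


class SyncBulkhead:
    """Hypothetical fail-fast bulkhead for synchronous, thread-based code."""

    def __init__(self, name: str, max_concurrent: int):
        self.name = name
        # BoundedSemaphore raises ValueError if released more often than
        # acquired, which surfaces accounting bugs early.
        self._semaphore = threading.BoundedSemaphore(max_concurrent)

    def execute(self, func, *args, **kwargs):
        # blocking=False returns False at once instead of parking the thread
        if not self._semaphore.acquire(blocking=False):
            raise BulkheadFullError(f"Bulkhead '{self.name}' is full")
        try:
            return func(*args, **kwargs)
        finally:
            self._semaphore.release()


def demo():
    bulkhead = SyncBulkhead("db", max_concurrent=1)
    started, release = threading.Event(), threading.Event()

    def slow_query():
        started.set()
        release.wait()  # simulate a blocking driver call
        return "rows"

    holder = threading.Thread(target=bulkhead.execute, args=(slow_query,))
    holder.start()
    started.wait()  # the holder now owns the only permit
    try:
        bulkhead.execute(lambda: "rows")
        second = "accepted"
    except BulkheadFullError:
        second = "rejected"
    release.set()
    holder.join()
    third = bulkhead.execute(lambda: "rows")  # the permit is free again
    return second, third
```

This version runs the call on the caller's own thread. It therefore limits concurrency, but it does not give the dependency a dedicated set of threads the way the thread pool version does.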
Combining Bulkheads with Circuit Breakers
Bulkheads and circuit breakers solve different problems, but they’re powerful together. The bulkhead limits concurrent calls; the circuit breaker stops calling a service that’s clearly broken:
```python
import time
from enum import Enum

# AsyncBulkhead and BulkheadFullError are the classes from the first example.


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    pass


class ProtectedService:
    """Combines bulkhead isolation with circuit breaking."""

    def __init__(self, name: str, max_concurrent: int,
                 failure_threshold: int = 5,
                 recovery_timeout: float = 30.0):
        self.bulkhead = AsyncBulkhead(name, max_concurrent)
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout
        self._last_failure_time = 0.0

    async def call(self, func, *args, **kwargs):
        # Circuit breaker check first (cheaper than acquiring the semaphore)
        if self._state == CircuitState.OPEN:
            if time.monotonic() - self._last_failure_time > self._recovery_timeout:
                # Recovery window elapsed: probe the service. This simple
                # version lets every call through while half-open.
                self._state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError(f"Circuit for '{self.bulkhead.name}' is open")
        try:
            result = await self.bulkhead.execute(func, *args, **kwargs)
            self._on_success()
            return result
        except BulkheadFullError:
            raise  # Bulkhead rejection isn't a service failure
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self._failure_count = 0
        self._state = CircuitState.CLOSED

    def _on_failure(self):
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        # The count is not reset on entering HALF_OPEN, so a single failed
        # probe keeps it at or above the threshold and reopens the circuit.
        if self._failure_count >= self._failure_threshold:
            self._state = CircuitState.OPEN
```
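In the ProtectedService above, every request that arrives after the recovery timeout goes through while the circuit is half-open. A service that is still broken can therefore be hit by a full burst. A common refinement is to admit only a single trial call. The sketch below is a hypothetical, dependency-free SingleTrialBreaker, not the class above. It also accepts an injectable clock so the recovery timeout can be tested without sleeping. The caller has to report the outcome of every admitted call through on_success, on_failure, or on_not_attempted.

```python
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    pass


class SingleTrialBreaker:
    """Hypothetical breaker that admits one trial call while half-open."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock  # injectable so tests can fake the passage of time
        self.state = CircuitState.CLOSED
        self._failures = 0
        self._opened_at = 0.0
        self._trial_in_flight = False

    def before_call(self):
        """Raise CircuitOpenError unless this call may proceed."""
        if self.state is CircuitState.OPEN:
            if self._clock() - self._opened_at < self.recovery_timeout:
                raise CircuitOpenError("circuit is open")
            self.state = CircuitState.HALF_OPEN
        if self.state is CircuitState.HALF_OPEN:
            if self._trial_in_flight:
                raise CircuitOpenError("a trial call is already in progress")
            self._trial_in_flight = True

    def on_success(self):
        self._trial_in_flight = False
        self._failures = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        self._trial_in_flight = False
        self._failures += 1
        # A failed trial reopens immediately; otherwise wait for the threshold
        if self.state is CircuitState.HALF_OPEN or self._failures >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self._opened_at = self._clock()

    def on_not_attempted(self):
        """Admitted call never reached the service (e.g. the bulkhead was full)."""
        self._trial_in_flight = False


def demo():
    now = [0.0]  # fake clock value, advanced by hand
    breaker = SingleTrialBreaker(failure_threshold=2, recovery_timeout=10.0,
                                 clock=lambda: now[0])
    for _ in range(2):
        breaker.before_call()
        breaker.on_failure()
    states = [breaker.state]
    now[0] = 11.0  # past the recovery timeout
    breaker.before_call()  # admitted as the single trial
    try:
        breaker.before_call()  # a concurrent second caller
        second = "admitted"
    except CircuitOpenError:
        second = "refused"
    breaker.on_success()
    states.append(breaker.state)
    return states, second
```

To wire this into ProtectedService.call, replace the inline state check with before_call. Then call on_success or on_failure in the matching branches, and call on_not_attempted in the BulkheadFullError branch.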
FastAPI Integration
Here’s a practical setup for a FastAPI application with bulkheaded dependencies:
```python
from fastapi import FastAPI, HTTPException

# AsyncBulkhead and BulkheadFullError are the classes from the first example.
# process_payment, send_email and queue_for_retry stand in for your own
# async functions.

# Define bulkheads at module level. On Python 3.10+ an asyncio.Semaphore binds
# to an event loop on first use, so creating one at import time is safe.
payment_bulkhead = AsyncBulkhead("payment", max_concurrent=20)
email_bulkhead = AsyncBulkhead("email", max_concurrent=10)
search_bulkhead = AsyncBulkhead("search", max_concurrent=15)

app = FastAPI()


@app.post("/checkout")
async def checkout(order: dict):
    try:
        result = await payment_bulkhead.execute(process_payment, order)
    except BulkheadFullError:
        # 503 plus Retry-After tells well-behaved clients to back off and retry
        raise HTTPException(
            status_code=503,
            detail="Payment service is at capacity. Please retry.",
            headers={"Retry-After": "5"},
        )
    return {"status": "paid", "transaction_id": result}


@app.post("/notify")
async def notify(user_id: str, message: str):
    try:
        await email_bulkhead.execute(send_email, user_id, message)
    except BulkheadFullError:
        # Non-critical: queue for later instead of failing
        await queue_for_retry(user_id, message)
        return {"status": "queued"}
    return {"status": "sent"}
```
Monitoring and Observability
A bulkhead without monitoring is a black box. Export metrics for each bulkhead:
```python
from prometheus_client import Counter, Gauge

# Register each metric once per process and tell bulkheads apart by label.
# Creating the metrics inside __init__ would fail with a duplicate-registration
# error as soon as a second bulkhead is constructed.
ACTIVE_CALLS = Gauge(
    "bulkhead_active_calls",
    "Currently active calls",
    ["bulkhead_name"],
)
REJECTED_CALLS = Counter(
    "bulkhead_rejected_total",
    "Total rejected calls",
    ["bulkhead_name"],
)


class ObservableBulkhead(AsyncBulkhead):
    def __init__(self, name: str, max_concurrent: int):
        super().__init__(name, max_concurrent)
        self._active_gauge = ACTIVE_CALLS.labels(bulkhead_name=name)
        self._rejected_counter = REJECTED_CALLS.labels(bulkhead_name=name)

    async def execute(self, func, *args, **kwargs):
        try:
            # Route the call through _tracked_execute so the gauge only
            # counts calls that actually obtained a permit
            return await super().execute(self._tracked_execute, func, *args, **kwargs)
        except BulkheadFullError:
            self._rejected_counter.inc()
            raise

    async def _tracked_execute(self, func, *args, **kwargs):
        self._active_gauge.inc()
        try:
            return await func(*args, **kwargs)
        finally:
            self._active_gauge.dec()
```
Sizing Bulkheads
Choosing the right size is the hardest part. Too small and you reject legitimate traffic; too large and you don’t get real isolation.
Rules of thumb:
- Start with the P99 concurrent call count under normal traffic (check your APM tool)
- Add 20-30% headroom for traffic spikes
- Keep the combined size of all bulkheads well below your total thread/connection pool size
- Monitor rejection rates in production and adjust
| Service | Normal P99 Concurrency | Bulkhead Size | Total Pool |
|---|---|---|---|
| Payment | 12 | 20 | 100 threads |
| Email | 5 | 10 | 100 threads |
| Search | 8 | 15 | 100 threads |
| Reserved for other work | — | 55 | — |
The reserved capacity (55 threads) ensures your app can always handle health checks, static content, and admin endpoints regardless of what happens with external services.
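The rules of thumb come down to a little integer arithmetic. The sketch below assumes you already know the P99 concurrency for each dependency from your APM tool. suggest_size and reserved_capacity are hypothetical helper names, and the 30% default is just the top of the headroom range above.

```python
from typing import Mapping


def suggest_size(p99_concurrency: int, headroom_pct: int = 30) -> int:
    """Minimum bulkhead size: P99 concurrency plus headroom, rounded up."""
    if p99_concurrency < 0 or headroom_pct < 0:
        raise ValueError("inputs must be non-negative")
    # Integer ceiling division avoids floating-point rounding surprises
    return -(-p99_concurrency * (100 + headroom_pct) // 100)


def reserved_capacity(bulkhead_sizes: Mapping[str, int], total_pool: int) -> int:
    """Capacity left for work outside any bulkhead (health checks, admin, ...)."""
    reserved = total_pool - sum(bulkhead_sizes.values())
    if reserved <= 0:
        raise ValueError("bulkheads consume the entire pool; nothing is reserved")
    return reserved


# Figures from the table above
sizes = {"payment": 20, "email": 10, "search": 15}
p99 = {"payment": 12, "email": 5, "search": 8}
print(reserved_capacity(sizes, total_pool=100))  # 55
print({name: suggest_size(value) for name, value in p99.items()})
# {'payment': 16, 'email': 7, 'search': 11}
```

With 30% headroom the minimum sizes are 16, 7 and 11. The table's 20, 10 and 15 are therefore more generous than the rule strictly requires, while the 55-thread reserve matches the table.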
Process-Level Bulkheads
For the strongest isolation, run each dependency interaction in its own process. Python’s multiprocessing or separate container services give true memory isolation:
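```python
from multiprocessing import Process, Queue


def isolated_worker(task_queue: Queue, result_queue: Queue):
    """Runs in a separate process, so a crash here doesn't take down the parent."""
    while True:
        task = task_queue.get()
        if task is None:  # sentinel: exit cleanly
            break
        try:
            # process_risky_operation is your own (possibly native-code) function
            result = process_risky_operation(task)
            result_queue.put(("ok", result))
        except Exception as exc:
            result_queue.put(("error", str(exc)))


if __name__ == "__main__":
    tasks, results = Queue(), Queue()
    worker = Process(target=isolated_worker, args=(tasks, results), daemon=True)
    worker.start()
    tasks.put("task-payload")  # placeholder task
    # Time out so a worker that dies mid-task can't hang the parent
    status, payload = results.get(timeout=10)  # raises queue.Empty on timeout
    tasks.put(None)
    worker.join()
```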
This is heavier than semaphore isolation, but it protects against memory leaks, segfaults in C extensions, and truly pathological failures that corrupt process state.
Tradeoffs
| Approach | Isolation Strength | Overhead | Complexity |
|---|---|---|---|
| Semaphore | Low | Minimal | Simple |
| Thread pool | Medium | Moderate | Moderate |
| Process | High | Significant | Complex |
| Container/service | Highest | Highest | Architectural |
Most Python web applications get the best value from semaphore or thread-pool bulkheads. Process-level isolation is reserved for cases where a dependency uses unreliable native code or you need crash fault isolation.
One thing to remember: size your bulkheads based on measured concurrency, not guesses. Too tight starves legitimate traffic; too loose defeats the purpose. Monitor rejection rates in production and adjust iteratively.
See Also
- Python Aggregate Pattern: Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts: Why the same word means different things in different parts of your code, and why that is perfectly fine.
- Python Circuit Breaker Pattern: How a circuit breaker saves your app from crashing, explained with a home electrical fuse analogy.
- Python Clean Architecture: Why your Python app should look like an onion, and how that saves you from painful rewrites.
- Python Connection Draining: How to shut down a Python server without hanging up on people mid-conversation, like a store that locks the entrance but lets shoppers finish.