Python Circuit Breaker Pattern — Deep Dive
Building a Circuit Breaker from Scratch
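A production-grade implementation needs safe concurrent access, configurable thresholds, and proper state transitions. The version below guards its state with an `asyncio.Lock`, which serializes coroutines on one event loop (it is not a thread lock):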
import asyncio
import functools
import inspect
import logging
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Optional
_logger = logging.getLogger(__name__)


class CircuitState(str, Enum):
    """Lifecycle states of a :class:`CircuitBreaker`."""

    CLOSED = "closed"  # calls flow through; failures are counted
    OPEN = "open"  # calls are rejected with CircuitOpenError until the timeout passes
    HALF_OPEN = "half_open"  # trial calls decide whether to close or re-open


@dataclass
class CircuitBreakerConfig:
    """Tuning knobs for :class:`CircuitBreaker`."""

    failure_threshold: int = 5  # Failures inside the window that trip a closed circuit
    success_threshold: int = 2  # Successes needed in half-open to close
    timeout: float = 30.0  # Seconds to stay open before allowing a half-open trial
    window_size: float = 60.0  # Sliding window (seconds) for failure counting
    # NOTE: the two slow-call settings are not consulted by CircuitBreaker yet;
    # the measured call duration is already passed to _on_success so a
    # slow-call check can be added there.
    slow_call_threshold: float = 10.0  # Seconds
    slow_call_rate_threshold: float = 0.5


class CircuitOpenError(Exception):
    """Raised by :meth:`CircuitBreaker.call` while the circuit is open."""


class CircuitBreaker:
    """Async circuit breaker with a time-based sliding failure window.

    Transitions:
      * CLOSED -> OPEN once ``failure_threshold`` failures fall within the
        last ``window_size`` seconds.
      * OPEN -> HALF_OPEN on the first call made at least ``timeout`` seconds
        after the circuit opened.
      * HALF_OPEN -> CLOSED after ``success_threshold`` successes; any failure
        while HALF_OPEN re-opens the circuit.

    Every call is let through while HALF_OPEN (concurrent trial calls are not
    capped). State is guarded by an ``asyncio.Lock``, which serializes
    coroutines on one event loop; it is not a thread lock.
    """

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self._failures: deque = deque()  # monotonic timestamps of recent failures
        self._successes_in_half_open = 0
        self._last_failure_time = 0.0
        # Monotonic time of the latest transition to OPEN. The open timeout is
        # measured from here, so late failures from in-flight calls do not
        # keep pushing recovery back.
        self._opened_at = 0.0
        self._lock = asyncio.Lock()
        self._listeners: list[Callable] = []

    def on_state_change(self, listener: Callable):
        """Register ``listener(name, old_state, new_state)``; sync or async.

        Listeners run while the breaker's lock is held, so they must not call
        back into this breaker. Exceptions they raise are logged and ignored.
        """
        self._listeners.append(listener)

    async def _notify(self, old_state, new_state):
        """Invoke every listener, awaiting any awaitable it returns."""
        for listener in self._listeners:
            try:
                outcome = listener(self.name, old_state, new_state)
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception:
                # A broken metrics/alerting hook must not fail the protected
                # call or be counted against the downstream service.
                _logger.exception(
                    "circuit %r: state-change listener %r failed", self.name, listener
                )

    async def _transition(self, new_state: CircuitState):
        """Move to ``new_state`` (no-op if already there) and notify listeners."""
        if self.state == new_state:
            return
        old_state = self.state
        self.state = new_state
        if new_state == CircuitState.OPEN:
            self._opened_at = time.monotonic()
        elif new_state == CircuitState.HALF_OPEN:
            self._successes_in_half_open = 0
        await self._notify(old_state, new_state)

    def _clean_window(self):
        """Drop failure timestamps older than ``window_size`` seconds."""
        cutoff = time.monotonic() - self.config.window_size
        while self._failures and self._failures[0] < cutoff:
            self._failures.popleft()

    async def call(self, func: Callable, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` under the breaker.

        ``func`` may be sync or async; any awaitable it returns is awaited.

        Returns:
            Whatever ``func`` returns (awaited if it is awaitable).

        Raises:
            CircuitOpenError: the circuit is open and ``timeout`` seconds have
                not yet passed since it opened.
            Exception: anything ``func`` raises is recorded as a failure and
                re-raised unchanged. Exceptions not derived from ``Exception``
                (e.g. ``asyncio.CancelledError``) are not recorded.
        """
        async with self._lock:
            if self.state == CircuitState.OPEN:
                elapsed = time.monotonic() - self._opened_at
                if elapsed < self.config.timeout:
                    raise CircuitOpenError(
                        f"Circuit '{self.name}' is open. "
                        f"Retry after {self.config.timeout - elapsed:.1f}s"
                    )
                await self._transition(CircuitState.HALF_OPEN)

        start = time.monotonic()
        # Only the protected call belongs in the try: bookkeeping errors after
        # a successful call must not be recorded as a downstream failure.
        try:
            result = func(*args, **kwargs)
            if inspect.isawaitable(result):
                result = await result
        except Exception:
            await self._on_failure()
            raise
        await self._on_success(time.monotonic() - start)
        return result

    async def _on_success(self, duration: float):
        """Count a half-open success and close the circuit at the threshold.

        ``duration`` is measured for future slow-call accounting; unused today.
        """
        async with self._lock:
            if self.state != CircuitState.HALF_OPEN:
                return
            self._successes_in_half_open += 1
            if self._successes_in_half_open >= self.config.success_threshold:
                self._failures.clear()
                await self._transition(CircuitState.CLOSED)

    async def _on_failure(self):
        """Record a failure and open the circuit when warranted."""
        async with self._lock:
            now = time.monotonic()
            self._failures.append(now)
            self._last_failure_time = now
            self._clean_window()
            if self.state == CircuitState.HALF_OPEN:
                # A single failed trial call re-opens the circuit.
                await self._transition(CircuitState.OPEN)
            elif (
                self.state == CircuitState.CLOSED
                and len(self._failures) >= self.config.failure_threshold
            ):
                await self._transition(CircuitState.OPEN)
Using the Circuit Breaker
As a Wrapper
# Breaker guarding the external payment API: opens after 3 failures inside
# the (default) sliding window and waits 30 seconds before a half-open trial.
payment_breaker = CircuitBreaker(
    name="payment-api",
    config=CircuitBreakerConfig(timeout=30.0, failure_threshold=3),
)
async def charge_customer(amount: int, token: str):
    """Charge a customer through ``payment_breaker``.

    While the breaker is open, the charge is handed to ``queue_payment`` and a
    "queued" status dict is returned instead of the payment API's response.
    Any other exception from the API call propagates to the caller.
    """
    try:
        response = await payment_breaker.call(
            _call_payment_api, amount=amount, token=token
        )
    except CircuitOpenError:
        # Fallback: defer the charge rather than failing the request.
        await queue_payment(amount, token)
        return {"status": "queued", "message": "Payment will be processed shortly"}
    return response
async def _call_payment_api(amount: int, token: str):
    """POST a charge to the payment API and return the decoded JSON body.

    ``raise_for_status()`` raises on 4xx/5xx responses, so the breaker wrapping
    this call records them as failures.
    NOTE(review): client errors (e.g. a declined charge) therefore also count
    toward tripping the payment breaker — confirm that is intended.
    """
    payload = {"amount": amount, "token": token}
    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.post("https://payments.example.com/charge", json=payload)
        response.raise_for_status()
        return response.json()
As a Decorator
def circuit_breaker(breaker: CircuitBreaker):
    """Build a decorator that routes every call of the decorated function
    through ``breaker.call``.

    The wrapper produced is always a coroutine function (callers must
    ``await`` it), even when the decorated function is synchronous;
    ``breaker.call`` decides whether the target itself needs awaiting.
    """

    def apply(target):
        async def guarded(*args, **kwargs):
            return await breaker.call(target, *args, **kwargs)

        # Copy __name__, __doc__, __wrapped__ etc. from the target.
        return functools.wraps(target)(guarded)

    return apply
# Breaker for the internal recommendations service: opens after 5 failures
# inside the (default) sliding window and waits 60 seconds before a trial.
recommendation_breaker = CircuitBreaker(
    name="recommendations",
    config=CircuitBreakerConfig(timeout=60.0, failure_threshold=5),
)
@circuit_breaker(recommendation_breaker)
async def get_recommendations(user_id: int):
    """Fetch recommendations for *user_id* from the internal service.

    Error statuses raise via ``raise_for_status()`` and, because the function
    is wrapped by ``recommendation_breaker``, count as breaker failures.
    """
    url = f"https://recs.internal/users/{user_id}"
    async with httpx.AsyncClient(timeout=5) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()
Using pybreaker
The pybreaker library provides a battle-tested implementation:
import pybreaker

# Database breaker: opens after 5 consecutive failures and allows a trial call
# after 30 seconds. An explicit name makes anything that reports ``cb.name``
# (listeners, logs, metrics) identify it; pybreaker's default name is None.
db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=30,
    # Don't count these as failures — presumably because a ValueError reflects
    # bad input rather than database health.
    exclude=[ValueError],
    name="database",
)


@db_breaker
def query_database(sql: str):
    """Run *sql* through ``db_breaker`` and return ``db.execute``'s result.

    NOTE(review): ``sql`` is passed straight to ``db.execute``; callers must
    not build it by string-formatting untrusted input.
    """
    return db.execute(sql)
pybreaker supports listeners for monitoring:
class CircuitMonitor(pybreaker.CircuitBreakerListener):
    """pybreaker listener that mirrors breaker activity into logs and metrics."""

    # Gauge encoding shared with the asyncio breaker's monitor: 0 = closed,
    # 0.5 = half-open, 1 = open. pybreaker spells the half-open state
    # "half-open"; unknown names fall back to 0.
    _STATE_LEVELS = {"closed": 0, "half-open": 0.5, "open": 1}

    def state_change(self, cb, old_state, new_state):
        """Log the transition and publish the new state as a gauge."""
        # Lazy %-style args: the message is only formatted if it is emitted.
        logger.warning(
            "Circuit '%s' changed: %s → %s", cb.name, old_state.name, new_state.name
        )
        metrics.gauge(
            f"circuit.{cb.name}.state", self._STATE_LEVELS.get(new_state.name, 0)
        )

    def failure(self, cb, exc):
        """Count a failed call."""
        metrics.increment(f"circuit.{cb.name}.failures")

    def success(self, cb):
        """Count a successful call."""
        metrics.increment(f"circuit.{cb.name}.successes")
# Monitored variant of the database breaker.
# - ``name`` gives the listener's metric keys a real value instead of
#   "circuit.None.*" (pybreaker's default name is None).
# - ``exclude=[ValueError]`` keeps the earlier example's rule that ValueError
#   is not counted as a failure.
# NOTE: this rebinds ``db_breaker``; functions already decorated with an
# earlier ``db_breaker`` object keep using that earlier object.
db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=30,
    exclude=[ValueError],
    listeners=[CircuitMonitor()],
    name="database",
)
FastAPI Integration
Keep a registry with one circuit breaker per downstream service, and look the breaker up inside each route handler:
from fastapi import FastAPI, Depends, HTTPException

app = FastAPI()

# One breaker per downstream service, created lazily on first lookup.
breakers: dict[str, CircuitBreaker] = {}


def get_breaker(service: str) -> CircuitBreaker:
    """Return the shared breaker for *service*.

    The breaker is created with the default config the first time *service*
    is requested and reused afterwards.
    """
    try:
        return breakers[service]
    except KeyError:
        created = breakers[service] = CircuitBreaker(service)
        return created
@app.get("/products/{product_id}")
async def get_product(product_id: int):
    """Return a product, falling back to the cache while the circuit is open.

    Successful upstream responses are written to the cache (1 hour expiry) so
    they can serve as the fallback later. When the circuit is open and nothing
    is cached, respond with 503.
    """
    cache_key = f"product:{product_id}"
    breaker = get_breaker("product-service")
    try:
        product = await breaker.call(fetch_product, product_id)
    except CircuitOpenError:
        cached = await cache.get(cache_key)
        if not cached:
            raise HTTPException(503, "Product service unavailable")
        return {**cached, "_source": "cache", "_circuit": "open"}

    # Refresh the fallback copy on every successful fetch.
    await cache.set(cache_key, product, ex=3600)
    return product
Health Check Integration
Expose circuit breaker states in your health endpoint:
@app.get("/health/circuits")
async def circuit_health():
    """Report each registered breaker's state and its failures in the window.

    A breaker's ``_failures`` deque is only pruned by age when a new failure
    is recorded, so its raw length can include failures older than
    ``window_size``. Count only the timestamps still inside the window
    (without mutating the breaker).
    """
    now = time.monotonic()

    def recent_failures(breaker) -> int:
        cutoff = now - breaker.config.window_size
        return sum(1 for ts in breaker._failures if ts >= cutoff)

    return {
        name: {
            "state": breaker.state.value,
            "failure_count": recent_failures(breaker),
        }
        for name, breaker in breakers.items()
    }
Circuit Breaker with Retry
Combine both patterns correctly — retries happen inside the circuit breaker:
import httpx
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential_jitter,
)


@circuit_breaker(payment_breaker)
@retry(
    stop=stop_after_attempt(3),  # 3 attempts in total (initial call + 2 retries)
    wait=wait_exponential_jitter(initial=0.5, max=5),
    # Retry only failures where the request never reached the server. httpx's
    # ConnectError/ConnectTimeout do not subclass the builtin ConnectionError,
    # so filtering on ConnectionError alone never matched them. Read timeouts
    # are deliberately not retried: the POST /charge may already have charged
    # the card.
    retry=retry_if_exception_type(
        (ConnectionError, httpx.ConnectError, httpx.ConnectTimeout)
    ),
    # Surface the last real exception instead of tenacity.RetryError, so the
    # breaker and callers see the underlying error.
    reraise=True,
)
async def call_payment_service(data: dict):
    """POST *data* to the payment service and return the decoded JSON body.

    Connection failures are retried inside the breaker; the breaker records one
    failure only if every attempt fails. ``raise_for_status()`` raises on
    4xx/5xx responses, and those are not retried.
    """
    async with httpx.AsyncClient(timeout=5) as client:
        resp = await client.post("https://payments.internal/charge", json=data)
        resp.raise_for_status()
        return resp.json()
The retry decorator is applied first (inner), so retries happen before the circuit breaker sees a failure. If all 3 attempts fail (the initial call plus two retries), the circuit breaker records one failure. After enough of these compound failures, the circuit trips.
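Never put the circuit breaker inside the retry. Every individual attempt would then count as a separate breaker failure, tripping the circuit too early. With a broad retry condition, it would also keep retrying against an open circuit, which is pointless.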
Sliding Window Implementations
Count-Based Window
Track the last N calls. If more than X% failed, trip the circuit:
class CountBasedWindow:
    """Remember the outcomes of the most recent ``size`` calls.

    ``failure_rate`` is the fraction of remembered outcomes that are falsy
    (failures); it is 0.0 while nothing has been recorded.
    """

    def __init__(self, size: int = 10):
        self.size = size
        # deque(maxlen=...) discards the oldest outcome once the window is full.
        self.results: deque[bool] = deque(maxlen=size)

    def record(self, success: bool):
        """Append one call outcome (True for success)."""
        self.results.append(success)

    @property
    def failure_rate(self) -> float:
        total = len(self.results)
        if total == 0:
            return 0.0
        succeeded = sum(map(bool, self.results))
        return (total - succeeded) / total
Time-Based Window
Track calls within a time period. More responsive to sudden changes:
class TimeBasedWindow:
    """Remember call outcomes from the last ``duration`` seconds.

    ``failure_rate`` is the fraction of in-window outcomes that are falsy
    (failures); it is 0.0 when the window is empty.
    """

    def __init__(self, duration: float = 60.0):
        self.duration = duration
        # (monotonic timestamp, success) pairs, oldest first.
        self.calls: deque[tuple[float, bool]] = deque()

    def record(self, success: bool):
        """Append one call outcome stamped with the current monotonic time."""
        self.calls.append((time.monotonic(), success))
        self._clean()

    def _clean(self):
        """Evict entries older than ``duration`` seconds from the left end."""
        oldest_allowed = time.monotonic() - self.duration
        calls = self.calls
        while calls and calls[0][0] < oldest_allowed:
            calls.popleft()

    @property
    def failure_rate(self) -> float:
        self._clean()
        total = len(self.calls)
        if not total:
            return 0.0
        succeeded = sum(bool(ok) for _, ok in self.calls)
        return (total - succeeded) / total
Observability
Circuit breaker state changes are critical operational signals. Emit them as metrics and structured logs:
async def monitor_state_change(name: str, old_state, new_state):
    """Publish a circuit state change as a log line, a gauge and an alert.

    Emits a structured warning log, sets the Prometheus gauge for the circuit
    (0 = closed, 0.5 = half-open, 1 = open), and sends a warning alert when
    the circuit has just opened. Registered below as a listener on
    ``payment_breaker``.
    """
    transition = {"circuit": name, "from": old_state.value, "to": new_state.value}
    logger.warning("circuit_state_change", extra=transition)

    # Prometheus metrics
    level = {"closed": 0, "half_open": 0.5, "open": 1}[new_state.value]
    circuit_state_gauge.labels(circuit=name).set(level)

    # Alert only when the circuit trips open.
    if new_state != CircuitState.OPEN:
        return
    await alerting.send(
        severity="warning",
        message=f"Circuit breaker '{name}' tripped to OPEN",
    )


payment_breaker.on_state_change(monitor_state_change)
Dashboard essentials for circuit breakers:
- State per circuit (closed/open/half-open) as a gauge
- Failure rate per circuit over time
- Number of requests rejected by open circuits
- Time spent in open state per circuit
Testing Circuit Breakers
@pytest.mark.asyncio
async def test_circuit_trips_after_threshold():
    """Three failures open the circuit, and the next call is short-circuited."""
    threshold = 3
    breaker = CircuitBreaker("test", CircuitBreakerConfig(failure_threshold=threshold))

    async def failing_call():
        raise ConnectionError("down")

    for _attempt in range(threshold):
        with pytest.raises(ConnectionError):
            await breaker.call(failing_call)

    # Tripped: the state is OPEN and calls are rejected without running.
    assert breaker.state == CircuitState.OPEN
    with pytest.raises(CircuitOpenError):
        await breaker.call(failing_call)
@pytest.mark.asyncio
async def test_circuit_recovers_in_half_open():
    """After the open timeout, one success (success_threshold=1) closes it."""
    config = CircuitBreakerConfig(failure_threshold=2, timeout=0.1, success_threshold=1)
    breaker = CircuitBreaker("test", config)

    async def failing():
        raise ConnectionError()

    async def succeeding():
        return "ok"

    # Trip the circuit with two failures.
    for _attempt in range(config.failure_threshold):
        with pytest.raises(ConnectionError):
            await breaker.call(failing)
    assert breaker.state == CircuitState.OPEN

    # Sleep past the 0.1s open timeout so the next call is a half-open trial.
    await asyncio.sleep(0.15)

    assert await breaker.call(succeeding) == "ok"
    assert breaker.state == CircuitState.CLOSED
The one thing to remember: A production circuit breaker needs a sliding window for failure tracking, state change listeners for observability, proper ordering with retries (retries inside, circuit breaker outside), and fallback strategies for every protected call.
See Also
- Python Aggregate Pattern Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Clean Architecture Why your Python app should look like an onion — and how that saves you from painful rewrites.
- Python Connection Draining How to shut down a Python server without hanging up on people mid-conversation — like a store that locks the entrance but lets shoppers finish.