Python Circuit Breaker Pattern — Deep Dive

Building a Circuit Breaker from Scratch

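A production-grade implementation needs concurrency safety, configurable thresholds, and proper state transitions. The version below guards its state with an asyncio.Lock, which makes it safe across coroutines on one event loop (not across OS threads):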

import asyncio
import functools
import inspect
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable

class CircuitState(str, Enum):
    """Lifecycle states of a circuit breaker.

    The ``str`` mix-in makes each member compare equal to its string value.
    """

    CLOSED = "closed"  # Calls pass through; failures are counted
    OPEN = "open"  # Calls are rejected with CircuitOpenError
    HALF_OPEN = "half_open"  # Trial calls decide whether to close or re-open


@dataclass
class CircuitBreakerConfig:
    """Tunable thresholds for a :class:`CircuitBreaker`."""

    failure_threshold: int = 5  # Failures inside the window that open the circuit
    success_threshold: int = 2  # Successes needed in half-open to close
    timeout: float = 30.0  # Seconds to wait before half-open
    window_size: float = 60.0  # Sliding window for failure counting
    # NOTE: the two slow-call settings below are accepted but are not
    # consulted by CircuitBreaker yet.
    slow_call_threshold: float = 10.0  # Seconds
    slow_call_rate_threshold: float = 0.5


class CircuitOpenError(Exception):
    """Raised by :meth:`CircuitBreaker.call` when the circuit is open."""


class CircuitBreaker:
    """Asyncio circuit breaker that counts failures in a sliding time window.

    State machine:

    * CLOSED -> OPEN once ``failure_threshold`` failures fall inside the last
      ``window_size`` seconds.
    * OPEN -> HALF_OPEN on the first call made at least ``timeout`` seconds
      after the most recently recorded failure.
    * HALF_OPEN -> CLOSED after ``success_threshold`` successes;
      HALF_OPEN -> OPEN on any failure.

    State is guarded by an ``asyncio.Lock``. The lock is released while the
    protected callable runs, so protected calls may overlap.
    """

    def __init__(self, name: str, config: "CircuitBreakerConfig | None" = None):
        """Create a closed breaker called *name*; *config* defaults to
        ``CircuitBreakerConfig()``."""
        self.name = name
        self.config = config or CircuitBreakerConfig()
        self.state = CircuitState.CLOSED
        self._failures: deque = deque()  # time.monotonic() stamp of each failure
        self._successes_in_half_open = 0
        self._last_failure_time = 0.0
        self._lock = asyncio.Lock()
        self._listeners: list[Callable] = []

    def on_state_change(self, listener: Callable):
        """Register *listener*, invoked as ``listener(name, old_state, new_state)``.

        The listener may be a plain function or return an awaitable. It runs
        while the breaker's lock is held, so it must not call :meth:`call` on
        this same breaker.
        """
        self._listeners.append(listener)

    async def _notify(self, old_state, new_state):
        """Invoke every listener in registration order."""
        for listener in self._listeners:
            outcome = listener(self.name, old_state, new_state)
            # Covers coroutine functions as well as partials/wrappers that
            # merely return an awaitable.
            if inspect.isawaitable(outcome):
                await outcome

    async def _transition(self, new_state: CircuitState):
        """Switch to *new_state* and notify listeners; no-op if already there."""
        if self.state != new_state:
            old = self.state
            self.state = new_state
            await self._notify(old, new_state)

    def _clean_window(self):
        """Drop failure timestamps older than ``window_size`` seconds."""
        cutoff = time.monotonic() - self.config.window_size
        while self._failures and self._failures[0] < cutoff:
            self._failures.popleft()

    async def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` under the protection of this breaker.

        *func* may be a coroutine function or a plain callable. If the value
        it returns is awaitable, it is awaited so that its outcome (not merely
        its creation) is what the breaker records.

        Returns:
            Whatever *func* returns (after awaiting, when awaitable).

        Raises:
            CircuitOpenError: the circuit is open and ``timeout`` has not yet
                elapsed since the last recorded failure.
            Exception: anything *func* raises is re-raised after being
                recorded as a failure. Exceptions that do not derive from
                ``Exception`` propagate without being recorded.
        """
        async with self._lock:
            if self.state == CircuitState.OPEN:
                elapsed = time.monotonic() - self._last_failure_time
                if elapsed < self.config.timeout:
                    remaining = self.config.timeout - elapsed
                    raise CircuitOpenError(
                        f"Circuit '{self.name}' is open. "
                        f"Retry after {remaining:.1f}s"
                    )
                # Reset the counter before notifying so a raising listener
                # cannot leave a stale half-open success count behind.
                self._successes_in_half_open = 0
                await self._transition(CircuitState.HALF_OPEN)

        start = time.monotonic()
        try:
            result = func(*args, **kwargs)
            if inspect.isawaitable(result):
                result = await result
        except Exception:
            await self._on_failure()
            raise

        # Recorded outside the try block: an error raised by a state-change
        # listener must not be counted as a failure of the protected call.
        await self._on_success(time.monotonic() - start)
        return result

    async def _on_success(self, duration: float):
        """Record a success; in half-open this may close the circuit.

        *duration* is accepted for slow-call tracking but is currently unused.
        """
        async with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self._successes_in_half_open += 1
                if self._successes_in_half_open >= self.config.success_threshold:
                    self._failures.clear()
                    await self._transition(CircuitState.CLOSED)

    async def _on_failure(self):
        """Record a failure and open the circuit when warranted."""
        async with self._lock:
            now = time.monotonic()
            self._failures.append(now)
            self._last_failure_time = now
            self._clean_window()

            if self.state == CircuitState.HALF_OPEN:
                # Any failed trial call re-opens the circuit immediately.
                await self._transition(CircuitState.OPEN)
            elif len(self._failures) >= self.config.failure_threshold:
                await self._transition(CircuitState.OPEN)

Using the Circuit Breaker

As a Wrapper

# Breaker guarding the payment API: three failures inside the sliding window
# open it, and a trial call is allowed again after 30 seconds.
payment_breaker = CircuitBreaker(
    name="payment-api",
    config=CircuitBreakerConfig(timeout=30.0, failure_threshold=3),
)

async def charge_customer(amount: int, token: str):
    """Charge through ``payment_breaker``; queue the payment if the circuit is open.

    Returns the payment API response, or a ``{"status": "queued", ...}``
    placeholder when the call was rejected with ``CircuitOpenError``. Any
    other exception from the payment call propagates to the caller.
    """
    try:
        receipt = await payment_breaker.call(
            _call_payment_api, amount=amount, token=token
        )
    except CircuitOpenError:
        # Fallback: queue for later processing
        await queue_payment(amount, token)
        return {"status": "queued", "message": "Payment will be processed shortly"}
    else:
        return receipt

async def _call_payment_api(amount: int, token: str):
    """POST the charge to the payments endpoint and return the decoded JSON body.

    A fresh ``httpx.AsyncClient`` with a 10 second timeout is used per call;
    ``raise_for_status()`` turns error responses into exceptions.
    """
    payload = {"amount": amount, "token": token}
    async with httpx.AsyncClient(timeout=10) as http:
        response = await http.post(
            "https://payments.example.com/charge",
            json=payload,
        )
        response.raise_for_status()
        return response.json()

As a Decorator

def circuit_breaker(breaker: CircuitBreaker):
    """Decorator factory: route every call of the decorated function through *breaker*.

    The decorated callable is always a coroutine function, because the call
    is delegated to the awaitable ``breaker.call``.
    """

    def decorator(func):
        async def guarded(*args, **kwargs):
            return await breaker.call(func, *args, **kwargs)

        # Copy name, docstring, etc. from the wrapped function.
        return functools.wraps(func)(guarded)

    return decorator

# Breaker for the recommendations service: five failures inside the sliding
# window open it, and a trial call is allowed again after 60 seconds.
recommendation_breaker = CircuitBreaker(
    name="recommendations",
    config=CircuitBreakerConfig(timeout=60.0, failure_threshold=5),
)

@circuit_breaker(recommendation_breaker)
async def get_recommendations(user_id: int):
    """Fetch recommendations for *user_id* and return the decoded JSON body.

    Runs under ``recommendation_breaker``; error responses are raised via
    ``raise_for_status()``.
    """
    url = f"https://recs.internal/users/{user_id}"
    async with httpx.AsyncClient(timeout=5) as http:
        response = await http.get(url)
        response.raise_for_status()
        return response.json()

Using pybreaker

The pybreaker library provides a battle-tested implementation:

import pybreaker

# pybreaker breaker for database calls, configured with a failure limit of 5
# and a 30 second reset timeout.
db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=30,
    exclude=[ValueError],  # Don't count these as failures
)

@db_breaker
def query_database(sql: str):
    """Execute *sql* via ``db.execute`` under ``db_breaker`` and return its result."""
    outcome = db.execute(sql)
    return outcome

pybreaker supports listeners for monitoring:

class CircuitMonitor(pybreaker.CircuitBreakerListener):
    """pybreaker listener that logs state changes and emits metrics."""

    def state_change(self, cb, old_state, new_state):
        """Log the transition and set the state gauge (1 when open, else 0)."""
        # The separator keeps the two state names from running together
        # (previously rendered as e.g. "closedopen").
        logger.warning(
            f"Circuit '{cb.name}' changed: {old_state.name} -> {new_state.name}"
        )
        metrics.gauge(f"circuit.{cb.name}.state", 1 if new_state.name == "open" else 0)

    def failure(self, cb, exc):
        """Count one failed call for breaker *cb*."""
        metrics.increment(f"circuit.{cb.name}.failures")

    def success(self, cb):
        """Count one successful call for breaker *cb*."""
        metrics.increment(f"circuit.{cb.name}.successes")

# Same limits as before, now with a CircuitMonitor listener attached.
# NOTE: this rebinds the name db_breaker to a new breaker object; anything
# already decorated with the previous object keeps using that one.
db_breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=30,
    listeners=[CircuitMonitor()],
)

FastAPI Integration

Keep a registry of circuit breakers, one per downstream service, and use it from your route handlers:

from fastapi import FastAPI, Depends, HTTPException

# Application instance that the route decorators attach to.
app = FastAPI()
# Registry of circuit breakers keyed by service name; filled lazily by get_breaker().
breakers: dict[str, CircuitBreaker] = {}

def get_breaker(service: str) -> CircuitBreaker:
    """Return the breaker registered for *service*, creating it on first use.

    New breakers are built with the default ``CircuitBreakerConfig`` and
    stored in the module-level ``breakers`` registry.
    """
    try:
        return breakers[service]
    except KeyError:
        created = breakers[service] = CircuitBreaker(service)
        return created

@app.get("/products/{product_id}")
async def get_product(product_id: int):
    """Return a product, falling back to the cache while the circuit is open.

    A successful fetch is cached for 3600 seconds. When the breaker rejects
    the call, the cached copy (tagged with ``_source`` and ``_circuit``) is
    returned if one exists; otherwise the handler responds with HTTP 503.
    Other exceptions raised by the fetch are not handled here.
    """
    cache_key = f"product:{product_id}"
    breaker = get_breaker("product-service")

    try:
        product = await breaker.call(fetch_product, product_id)
    except CircuitOpenError:
        # Return cached version
        cached = await cache.get(cache_key)
        if not cached:
            raise HTTPException(503, "Product service unavailable")
        return {**cached, "_source": "cache", "_circuit": "open"}

    # Cache successful responses for fallback
    await cache.set(cache_key, product, ex=3600)
    return product

Health Check Integration

Expose circuit breaker states in your health endpoint:

@app.get("/health/circuits")
async def circuit_health():
    """Report the state and stored failure count of every registered breaker.

    ``failure_count`` is the number of failure timestamps the breaker
    currently holds; the breaker prunes that list only when it records a new
    failure, so the figure can include entries older than the window.
    """
    report = {}
    for name, breaker in breakers.items():
        report[name] = {
            "state": breaker.state.value,
            "failure_count": len(breaker._failures),
        }
    return report

Circuit Breaker with Retry

Combine both patterns correctly — retries happen inside the circuit breaker:

from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential_jitter,
)

@circuit_breaker(payment_breaker)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=0.5, max=5),
    # httpx reports connection failures as httpx.ConnectError, which does not
    # derive from the built-in ConnectionError, so both are listed. Only
    # connection failures are retried; the POST is not assumed idempotent.
    retry=retry_if_exception_type((ConnectionError, httpx.ConnectError)),
)
async def call_payment_service(data: dict):
    """POST *data* to the payments service and return the decoded JSON body.

    Connection failures are retried for up to 3 attempts with jittered
    exponential backoff. The circuit breaker wraps the retrying function, so
    it records a single failure only once the retries are exhausted.
    """
    async with httpx.AsyncClient(timeout=5) as client:
        resp = await client.post("https://payments.internal/charge", json=data)
        resp.raise_for_status()
        return resp.json()

The retry decorator is applied first (inner), so retries happen before the circuit breaker sees a failure. If all 3 attempts fail, the circuit breaker records one failure. After enough of these compound failures, the circuit trips.

Never put the circuit breaker inside the retry — that would retry against an open circuit, which is pointless.

Sliding Window Implementations

Count-Based Window

Track the last N calls. If more than X% failed, trip the circuit:

class CountBasedWindow:
    """Fixed-size window holding the outcomes of the most recent calls."""

    def __init__(self, size: int = 10):
        """Keep at most *size* outcomes; older ones are discarded automatically."""
        self.size = size
        self.results: deque[bool] = deque(maxlen=size)

    def record(self, success: bool):
        """Append one call outcome (``True`` for success)."""
        self.results.append(success)

    @property
    def failure_rate(self) -> float:
        """Fraction of recorded outcomes that are failures; 0.0 when empty."""
        total = len(self.results)
        if total == 0:
            return 0.0
        failed = sum(not ok for ok in self.results)
        return failed / total

Time-Based Window

Track calls within a time period. More responsive to sudden changes:

class TimeBasedWindow:
    """Window holding the outcomes of calls made in the last *duration* seconds."""

    def __init__(self, duration: float = 60.0):
        """Create an empty window spanning *duration* seconds."""
        self.duration = duration
        self.calls: deque[tuple[float, bool]] = deque()

    def record(self, success: bool):
        """Store one outcome stamped with ``time.monotonic()``, then prune."""
        stamped = (time.monotonic(), success)
        self.calls.append(stamped)
        self._clean()

    def _clean(self):
        """Discard entries whose timestamp is older than the window."""
        oldest_allowed = time.monotonic() - self.duration
        entries = self.calls
        while entries and entries[0][0] < oldest_allowed:
            entries.popleft()

    @property
    def failure_rate(self) -> float:
        """Fraction of in-window outcomes that are failures; 0.0 when empty."""
        self._clean()
        total = len(self.calls)
        if total == 0:
            return 0.0
        failed = sum(1 for _, ok in self.calls if not ok)
        return failed / total

Observability

Circuit breaker state changes are critical operational signals. Emit them as metrics and structured logs:

async def monitor_state_change(name: str, old_state, new_state):
    """State-change listener: log the transition, set a gauge, alert on OPEN.

    The gauge is set to 0 for closed, 0.5 for half-open and 1 for open.
    """
    transition = {
        "circuit": name,
        "from": old_state.value,
        "to": new_state.value,
    }
    logger.warning("circuit_state_change", extra=transition)

    # Prometheus metrics
    gauge_levels = {"closed": 0, "half_open": 0.5, "open": 1}
    gauge = circuit_state_gauge.labels(circuit=name)
    gauge.set(gauge_levels[new_state.value])

    # Alert on open circuits
    if new_state == CircuitState.OPEN:
        alert_text = f"Circuit breaker '{name}' tripped to OPEN"
        await alerting.send(severity="warning", message=alert_text)

# Register the monitor so it is called on every payment_breaker state transition.
payment_breaker.on_state_change(monitor_state_change)

Dashboard essentials for circuit breakers:

  • State per circuit (closed/open/half-open) as a gauge
  • Failure rate per circuit over time
  • Number of requests rejected by open circuits
  • Time spent in open state per circuit

Testing Circuit Breakers

@pytest.mark.asyncio
async def test_circuit_trips_after_threshold():
    """Three failures open the circuit, and the next call is rejected."""
    config = CircuitBreakerConfig(failure_threshold=3)
    breaker = CircuitBreaker("test", config)

    async def failing_call():
        raise ConnectionError("down")

    attempts = 0
    while attempts < 3:
        with pytest.raises(ConnectionError):
            await breaker.call(failing_call)
        attempts += 1

    # Circuit should now be open
    assert breaker.state == CircuitState.OPEN
    with pytest.raises(CircuitOpenError):
        await breaker.call(failing_call)

@pytest.mark.asyncio
async def test_circuit_recovers_in_half_open():
    """After the timeout, a single successful trial call closes the circuit."""
    config = CircuitBreakerConfig(
        failure_threshold=2, timeout=0.1, success_threshold=1
    )
    breaker = CircuitBreaker("test", config)

    async def failing():
        raise ConnectionError()

    async def succeeding():
        return "ok"

    # Trip the circuit
    for _attempt in (1, 2):
        with pytest.raises(ConnectionError):
            await breaker.call(failing)

    assert breaker.state == CircuitState.OPEN

    # Wait for timeout
    await asyncio.sleep(0.15)

    # Half-open: one success should close it
    outcome = await breaker.call(succeeding)
    assert outcome == "ok"
    assert breaker.state == CircuitState.CLOSED

The one thing to remember: A production circuit breaker needs a sliding window for failure tracking, state change listeners for observability, proper ordering with retries (retries inside, circuit breaker outside), and fallback strategies for every protected call.

Tags: python, reliability, patterns

See Also

  • Python Aggregate Pattern — Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
  • Python Bounded Contexts — Why the same word means different things in different parts of your code — and why that is perfectly fine.
  • Python Bulkhead Pattern — Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
  • Python Clean Architecture — Why your Python app should look like an onion — and how that saves you from painful rewrites.
  • Python Connection Draining — How to shut down a Python server without hanging up on people mid-conversation — like a store that locks the entrance but lets shoppers finish.