Python Health Check Patterns — Deep Dive

A Composable Health Check Framework

Rather than scattering health logic across endpoints, build a registry of check functions that can be composed and queried:

import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Awaitable

class Status(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

@dataclass
class CheckResult:
    name: str
    status: Status
    duration_ms: float
    detail: str = ""
    critical: bool = True

@dataclass
class HealthRegistry:
    checks: dict[str, Callable[[], Awaitable[CheckResult]]] = field(
        default_factory=dict
    )

    def register(self, name: str, check_fn, critical: bool = True):
        async def wrapped():
            start = time.monotonic()
            try:
                result = await asyncio.wait_for(check_fn(), timeout=5.0)
                duration = (time.monotonic() - start) * 1000
                return CheckResult(
                    name=name, status=result, 
                    duration_ms=duration, critical=critical,
                )
            except asyncio.TimeoutError:
                duration = (time.monotonic() - start) * 1000
                return CheckResult(
                    name=name, status=Status.UNHEALTHY,
                    duration_ms=duration, detail="Timeout",
                    critical=critical,
                )
            except Exception as e:
                duration = (time.monotonic() - start) * 1000
                return CheckResult(
                    name=name, status=Status.UNHEALTHY,
                    duration_ms=duration, detail=str(e),
                    critical=critical,
                )
        self.checks[name] = wrapped

    async def run_all(self) -> dict:
        results = await asyncio.gather(
            *(fn() for fn in self.checks.values())
        )
        overall = Status.HEALTHY
        for r in results:
            if r.status == Status.UNHEALTHY and r.critical:
                overall = Status.UNHEALTHY
                break
            if r.status != Status.HEALTHY:
                overall = Status.DEGRADED
        return {
            "status": overall.value,
            "checks": {r.name: {
                "status": r.status.value,
                "duration_ms": round(r.duration_ms, 2),
                "detail": r.detail,
            } for r in results},
        }

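Each check is independently timed and wrapped in its own 5-second timeout. Because the checks run concurrently under asyncio.gather, the health response takes about as long as the slowest check, and a hung dependency delays it by at most the timeout instead of blocking it indefinitely.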
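
The aggregation rule inside run_all is easy to miss in the loop, so here is a minimal sketch that isolates it as a pure function. The name aggregate and the (status, critical) tuple shape are assumptions made for illustration; they are not part of the registry above.

```python
from enum import Enum


class Status(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


def aggregate(results: list[tuple[Status, bool]]) -> Status:
    """Fold (status, critical) pairs into one overall status.

    Hypothetical helper mirroring the loop in run_all.
    """
    overall = Status.HEALTHY
    for status, critical in results:
        if status == Status.UNHEALTHY and critical:
            return Status.UNHEALTHY  # a critical failure dominates everything
        if status != Status.HEALTHY:
            overall = Status.DEGRADED  # degraded check or non-critical failure
    return overall


if __name__ == "__main__":
    # A failing non-critical check only degrades the service.
    print(aggregate([(Status.HEALTHY, True), (Status.UNHEALTHY, False)]).value)
    # A failing critical check makes the whole service unhealthy.
    print(aggregate([(Status.DEGRADED, True), (Status.UNHEALTHY, True)]).value)
```

Keeping the rule in a pure function like this would let you unit test the critical versus non-critical behavior without running any real checks.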

Implementing Individual Checks

Database Check with Connection Pool Awareness

import asyncpg

async def check_postgres(pool: asyncpg.Pool) -> Status:
    idle = pool.get_idle_size()
    total = pool.get_size()
    
    # Every connection is in use and the pool cannot grow: report degraded
    # without queuing for a connection, even though queries may still succeed
    if idle == 0 and total >= pool.get_max_size():
        return Status.DEGRADED
    
    async with pool.acquire() as conn:
        await conn.fetchval("SELECT 1")
    return Status.HEALTHY

This goes beyond SELECT 1 — it also monitors connection pool saturation, which is an early warning signal before queries start timing out.
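
The saturation branch can be exercised without a running database. The sketch below assumes a hand-written stub: StubPool and StubConnection are hypothetical stand-ins that expose only the methods the check calls, and check_postgres is repeated with a loose type so the stub can be passed in.

```python
import asyncio
from contextlib import asynccontextmanager
from enum import Enum


class Status(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


class StubConnection:
    """Hypothetical stand-in for a database connection."""

    async def fetchval(self, query: str) -> int:
        return 1


class StubPool:
    """Hypothetical stand-in exposing only what check_postgres uses."""

    def __init__(self, idle: int, size: int, max_size: int):
        self._idle, self._size, self._max_size = idle, size, max_size

    def get_idle_size(self) -> int:
        return self._idle

    def get_size(self) -> int:
        return self._size

    def get_max_size(self) -> int:
        return self._max_size

    @asynccontextmanager
    async def acquire(self):
        yield StubConnection()


async def check_postgres(pool) -> Status:
    # Same logic as the check above, untyped so the stub is accepted.
    if pool.get_idle_size() == 0 and pool.get_size() >= pool.get_max_size():
        return Status.DEGRADED
    async with pool.acquire() as conn:
        await conn.fetchval("SELECT 1")
    return Status.HEALTHY


# Saturated pool: no idle connections and no room to grow.
saturated = asyncio.run(check_postgres(StubPool(idle=0, size=10, max_size=10)))
# Pool with spare connections: the query path runs.
roomy = asyncio.run(check_postgres(StubPool(idle=4, size=10, max_size=10)))
```

With this stub, the saturated pool should come back degraded and the roomy one healthy, which confirms the branch order without touching PostgreSQL.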

Redis Check with Latency Threshold

import redis.asyncio as redis

async def check_redis(client: redis.Redis) -> Status:
    start = time.monotonic()
    await client.ping()
    latency_ms = (time.monotonic() - start) * 1000
    
    if latency_ms > 50:
        return Status.DEGRADED  # Slow but working
    return Status.HEALTHY
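
The same "slow but working" rule applies to other dependencies, so it may be worth factoring out. The following is a sketch only: with_latency_threshold, fast_probe, and slow_probe are hypothetical names, and the 20 ms threshold is chosen just to make the demo deterministic.

```python
import asyncio
import time
from enum import Enum
from typing import Awaitable, Callable


class Status(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


def with_latency_threshold(
    probe: Callable[[], Awaitable[object]], degraded_above_ms: float
) -> Callable[[], Awaitable[Status]]:
    """Wrap a probe so that a slow success is reported as DEGRADED."""

    async def check() -> Status:
        start = time.monotonic()
        await probe()  # exceptions propagate to the caller, e.g. a registry
        latency_ms = (time.monotonic() - start) * 1000
        if latency_ms > degraded_above_ms:
            return Status.DEGRADED
        return Status.HEALTHY

    return check


async def fast_probe() -> None:
    await asyncio.sleep(0)  # stands in for a quick PING


async def slow_probe() -> None:
    await asyncio.sleep(0.05)  # stands in for a 50 ms round trip


fast = asyncio.run(with_latency_threshold(fast_probe, 20)())
slow = asyncio.run(with_latency_threshold(slow_probe, 20)())
```

A wrapper like this keeps the timing logic in one place, so each dependency only has to supply its probe and a threshold.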

Disk Space Check

import shutil

async def check_disk() -> Status:
    usage = shutil.disk_usage("/")
    percent_used = usage.used / usage.total * 100
    
    if percent_used > 95:
        return Status.UNHEALTHY
    if percent_used > 85:
        return Status.DEGRADED
    return Status.HEALTHY
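
One caveat with the disk check: shutil.disk_usage is a blocking call, and on a stalled filesystem (a hung network mount, for example) it could hold up the event loop. A possible variant, assuming Python 3.9+ for asyncio.to_thread, moves the call to a worker thread and separates the thresholds into a pure function. The name classify_disk is hypothetical.

```python
import asyncio
import shutil
from enum import Enum


class Status(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


def classify_disk(percent_used: float) -> Status:
    """Apply the same 85% / 95% thresholds as the check above."""
    if percent_used > 95:
        return Status.UNHEALTHY
    if percent_used > 85:
        return Status.DEGRADED
    return Status.HEALTHY


async def check_disk(path: str = "/") -> Status:
    # Run the blocking call in a worker thread so the event loop stays free
    # to serve other requests while the filesystem responds.
    usage = await asyncio.to_thread(shutil.disk_usage, path)
    return classify_disk(usage.used / usage.total * 100)
```

Note that a timeout around this check stops the wait but does not stop the worker thread itself, so this limits the damage of a stalled filesystem rather than eliminating it.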

FastAPI Integration

Wire the registry into FastAPI with separate liveness and readiness endpoints:

from fastapi import FastAPI, Response
from contextlib import asynccontextmanager

registry = HealthRegistry()

@asynccontextmanager
async def lifespan(app: FastAPI):
    pool = await asyncpg.create_pool(dsn="postgresql://...")
    redis_client = redis.Redis.from_url("redis://...")
    
    registry.register("postgres", lambda: check_postgres(pool))
    registry.register("redis", lambda: check_redis(redis_client), critical=False)
    registry.register("disk", check_disk, critical=False)
    
    yield
    await pool.close()
    await redis_client.aclose()

app = FastAPI(lifespan=lifespan)

@app.get("/healthz")
async def liveness():
    """Lightweight — just confirms the process is responsive."""
    return {"status": "healthy"}

@app.get("/readyz")
async def readiness(response: Response):
    """Deep check — verifies all dependencies."""
    result = await registry.run_all()
    if result["status"] == "unhealthy":
        response.status_code = 503
    return result

Note the endpoint naming convention: /healthz for liveness (Kubernetes convention), /readyz for readiness. The z suffix avoids conflicts with user-facing /health routes.

Circuit-Broken Health Checks

If a dependency is known to be down, repeatedly probing it wastes time and can slow your health endpoint. Apply a mini circuit breaker:

class CircuitBrokenCheck:
    def __init__(self, check_fn, failure_threshold=3, reset_after=30.0):
        self._check_fn = check_fn
        self._failures = 0
        self._threshold = failure_threshold
        self._reset_after = reset_after
        self._tripped_at: float | None = None

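    def _record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self._threshold:
            self._tripped_at = time.monotonic()

    async def __call__(self) -> Status:
        if self._tripped_at is not None:
            if time.monotonic() - self._tripped_at > self._reset_after:
                self._tripped_at = None  # Cooldown over: try again
            else:
                return Status.UNHEALTHY  # Known bad, skip probe

        try:
            result = await self._check_fn()
        except asyncio.CancelledError:
            # An outer timeout (e.g. the registry's wait_for) cancels this
            # call; count it as a failure, then let cancellation propagate.
            self._record_failure()
            raise
        except Exception:
            self._record_failure()
            return Status.UNHEALTHY
        if result == Status.UNHEALTHY:
            self._record_failure()  # A reported failure counts like a raised one
        else:
            self._failures = 0
        return result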

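After 3 consecutive failures, the check short-circuits for 30 seconds, returning unhealthy instantly without hitting the dependency. Once the 30 seconds elapse, the next call probes the dependency again. Because only a success resets the failure count, a single failed trial probe re-trips the breaker for another 30 seconds.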
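
To see the short-circuit in action without waiting 30 real seconds, here is a condensed variant of the breaker with an injectable clock. The Breaker class, its clock parameter, and the always_down check are assumptions added for the demonstration; the trip and reset rules follow the description above.

```python
import asyncio
from enum import Enum


class Status(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


class Breaker:
    """Condensed breaker variant; `clock` is a hypothetical test hook."""

    def __init__(self, check_fn, clock, failure_threshold=3, reset_after=30.0):
        self._check_fn = check_fn
        self._clock = clock
        self._threshold = failure_threshold
        self._reset_after = reset_after
        self._failures = 0
        self._tripped_at = None

    async def __call__(self) -> Status:
        if self._tripped_at is not None:
            if self._clock() - self._tripped_at <= self._reset_after:
                return Status.UNHEALTHY  # open: skip the probe entirely
            self._tripped_at = None  # cooldown over: allow a trial probe
        try:
            result = await self._check_fn()
        except Exception:
            self._failures += 1
            if self._failures >= self._threshold:
                self._tripped_at = self._clock()
            return Status.UNHEALTHY
        self._failures = 0
        return result


async def demo() -> tuple[int, list[Status]]:
    now = 0.0   # fake clock value, read by the lambda below
    probes = 0  # how many times the dependency was actually contacted

    async def always_down() -> Status:
        nonlocal probes
        probes += 1
        raise ConnectionError("dependency down")

    breaker = Breaker(always_down, clock=lambda: now)
    statuses = [await breaker() for _ in range(5)]  # 3 real probes, 2 skipped
    now = 31.0  # advance the fake clock past reset_after
    statuses.append(await breaker())  # trial probe reaches the dependency
    return probes, statuses


probes, statuses = asyncio.run(demo())
```

Six calls result in only four real probes: three to trip the breaker, none while it is open, and one trial after the cooldown.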

Kubernetes Configuration

containers:
  - name: api
    livenessProbe:
      httpGet:
        path: /healthz
        port: 8080
      initialDelaySeconds: 5
      periodSeconds: 10
      failureThreshold: 3
    readinessProbe:
      httpGet:
        path: /readyz
        port: 8080
      initialDelaySeconds: 10
      periodSeconds: 5
      failureThreshold: 2
    startupProbe:
      httpGet:
        path: /healthz
        port: 8080
      initialDelaySeconds: 0
      periodSeconds: 2
      failureThreshold: 30

The startup probe gives the app up to 60 seconds to initialize (30 attempts × 2 seconds). Until the startup probe succeeds, liveness and readiness probes are disabled — preventing premature restarts of slow-starting apps.
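
The same arithmetic applies to the other two probes and is worth checking before tuning them. The helper below is a rough sketch: failure_window_s is a hypothetical name, and the result is an approximation that ignores each probe's timeoutSeconds and scheduling jitter.

```python
def failure_window_s(
    period_s: int, failure_threshold: int, initial_delay_s: int = 0
) -> int:
    """Approximate seconds until a continuously failing probe takes effect."""
    return initial_delay_s + period_s * failure_threshold


# Startup: time the app has to come up before the container is killed.
startup_budget = failure_window_s(period_s=2, failure_threshold=30)

# Liveness (steady state): how long a hung process survives before a restart.
liveness_window = failure_window_s(period_s=10, failure_threshold=3)

# Readiness (steady state): how long a pod with a failed dependency keeps
# receiving traffic before it is marked not ready.
readiness_window = failure_window_s(period_s=5, failure_threshold=2)
```

With the values from the manifest above, this gives roughly 60 seconds for startup, 30 seconds for liveness, and 10 seconds for readiness.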

Exposing Metrics from Health Checks

Health check results are valuable metrics. Export them to Prometheus:

from prometheus_client import Gauge, Histogram

health_status = Gauge(
    "app_health_check_status",
    "Health check result (1=healthy, 0.5=degraded, 0=unhealthy)",
    ["check_name"],
)
health_duration = Histogram(
    "app_health_check_duration_seconds",
    "Time spent running health check",
    ["check_name"],
)

# Numeric encoding of each status for the gauge
STATUS_VALUES = {"healthy": 1.0, "degraded": 0.5, "unhealthy": 0.0}

async def run_and_export(registry: HealthRegistry):
    result = await registry.run_all()
    for name, check in result["checks"].items():
        health_status.labels(check_name=name).set(
            STATUS_VALUES[check["status"]]
        )
        health_duration.labels(check_name=name).observe(
            check["duration_ms"] / 1000  # Histogram is in seconds
        )
    return result

Now you can alert on app_health_check_status < 1 and graph check latency trends over time.

Anti-Patterns to Avoid

Health check that does real work. Never run migrations, cache warming, or data processing inside a health check. It should be read-only and side-effect-free.

Shared timeout for all checks. If you give one asyncio.wait_for to the entire check suite, one slow check eats the budget for all others. Timeout each check individually.

Checking transitive dependencies. Your app talks to an API gateway that talks to a payment service. If you check the payment service health from your app, you’re creating a fragile dependency chain. Check only what your code directly connects to.
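
The shared-timeout anti-pattern above is easy to reproduce. In this sketch, fast and hung are hypothetical checks and the 0.1 second budget is arbitrary; the point is only to contrast one budget around the whole suite with one budget per check.

```python
import asyncio
from typing import Optional


async def fast() -> str:
    await asyncio.sleep(0.01)
    return "healthy"


async def hung() -> str:
    await asyncio.sleep(10)  # simulates a dependency that never answers in time
    return "healthy"


async def shared_timeout(budget: float) -> Optional[list[str]]:
    """Anti-pattern: one budget wrapped around the whole suite."""
    try:
        return await asyncio.wait_for(asyncio.gather(fast(), hung()), budget)
    except asyncio.TimeoutError:
        return None  # the fast check's result is discarded as well


async def per_check_timeout(budget: float) -> list[str]:
    """Each check gets its own budget and fails independently."""

    async def guarded(check) -> str:
        try:
            return await asyncio.wait_for(check(), budget)
        except asyncio.TimeoutError:
            return "unhealthy"

    return list(await asyncio.gather(guarded(fast), guarded(hung)))


shared = asyncio.run(shared_timeout(0.1))
individual = asyncio.run(per_check_timeout(0.1))
```

With the shared budget, the hung check takes the fast check's result down with it. With per-check budgets, the response still reports which dependency is healthy and which one timed out.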

One thing to remember: Health checks are a production observability tool — layer them (liveness/readiness/startup), timeout each probe individually, and only check direct dependencies to avoid cascading false-unhealthy states.
