Python Health Check Patterns — Deep Dive
A Composable Health Check Framework
Rather than scattering health logic across endpoints, build a registry of check functions that can be composed and queried:
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Awaitable

class Status(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

@dataclass
class CheckResult:
    name: str
    status: Status
    duration_ms: float
    detail: str = ""
    critical: bool = True

@dataclass
class HealthRegistry:
    checks: dict[str, Callable[[], Awaitable[CheckResult]]] = field(
        default_factory=dict
    )

    def register(self, name: str, check_fn, critical: bool = True):
        async def wrapped():
            start = time.monotonic()
            try:
                result = await asyncio.wait_for(check_fn(), timeout=5.0)
                duration = (time.monotonic() - start) * 1000
                return CheckResult(
                    name=name, status=result,
                    duration_ms=duration, critical=critical,
                )
            except asyncio.TimeoutError:
                duration = (time.monotonic() - start) * 1000
                return CheckResult(
                    name=name, status=Status.UNHEALTHY,
                    duration_ms=duration, detail="Timeout",
                    critical=critical,
                )
            except Exception as e:
                duration = (time.monotonic() - start) * 1000
                return CheckResult(
                    name=name, status=Status.UNHEALTHY,
                    duration_ms=duration, detail=str(e),
                    critical=critical,
                )
        self.checks[name] = wrapped

    async def run_all(self) -> dict:
        results = await asyncio.gather(
            *(fn() for fn in self.checks.values())
        )
        overall = Status.HEALTHY
        for r in results:
            if r.status == Status.UNHEALTHY and r.critical:
                overall = Status.UNHEALTHY
                break
            if r.status != Status.HEALTHY:
                overall = Status.DEGRADED
        return {
            "status": overall.value,
            "checks": {r.name: {
                "status": r.status.value,
                "duration_ms": round(r.duration_ms, 2),
                "detail": r.detail,
            } for r in results},
        }
Each check is independently timed and wrapped with a timeout. No single slow dependency can block the entire health response.
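The roll-up rule inside run_all is easy to misread, so here is a minimal sketch of it as a pure function. The name roll_up and the (status, critical) tuple shape are illustrative assumptions, not part of the registry above; the logic is meant to mirror the loop in run_all.

```python
from enum import Enum


class Status(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


def roll_up(results: list[tuple[Status, bool]]) -> Status:
    """Combine (status, critical) pairs the same way run_all does.

    Hypothetical helper for illustration only.
    """
    overall = Status.HEALTHY
    for status, critical in results:
        if status == Status.UNHEALTHY and critical:
            # A failing critical check decides the outcome immediately.
            return Status.UNHEALTHY
        if status != Status.HEALTHY:
            # Degraded checks and failing non-critical checks only degrade.
            overall = Status.DEGRADED
    return overall
```

Under this rule a failing non-critical check (for example a cache) leaves the service degraded but still able to take traffic, while a failing critical check marks it unhealthy regardless of the other results.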
Implementing Individual Checks
Database Check with Connection Pool Awareness
import asyncpg

async def check_postgres(pool: asyncpg.Pool) -> Status:
    idle = pool.get_idle_size()
    total = pool.get_size()
    # If pool is exhausted, we're degraded even if queries work
    if idle == 0 and total >= pool.get_max_size():
        return Status.DEGRADED
    async with pool.acquire() as conn:
        await conn.fetchval("SELECT 1")
    return Status.HEALTHY
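This goes beyond SELECT 1: it first looks at connection pool saturation, which is an early warning signal before queries start timing out. When the pool is exhausted it reports degraded right away instead of queuing for a connection; otherwise it runs the query.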
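To see why the saturation test needs both conditions, the rule can be pulled out into a small pure function. This is a sketch only; pool_is_saturated is a hypothetical helper, and its three arguments stand in for the values returned by get_idle_size(), get_size(), and get_max_size().

```python
def pool_is_saturated(idle: int, size: int, max_size: int) -> bool:
    """True when the pool has grown to its cap and every connection is in use.

    Hypothetical helper; the arguments mirror the asyncpg pool getters.
    """
    return idle == 0 and size >= max_size
```

A pool with no idle connections that is still below its maximum can open more, so it is not treated as saturated; only a full pool with nothing idle is.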
Redis Check with Latency Threshold
import redis.asyncio as redis

async def check_redis(client: redis.Redis) -> Status:
    start = time.monotonic()
    await client.ping()
    latency_ms = (time.monotonic() - start) * 1000
    if latency_ms > 50:
        return Status.DEGRADED  # Slow but working
    return Status.HEALTHY
Disk Space Check
import shutil

async def check_disk() -> Status:
    usage = shutil.disk_usage("/")
    percent_used = usage.used / usage.total * 100
    if percent_used > 95:
        return Status.UNHEALTHY
    if percent_used > 85:
        return Status.DEGRADED
    return Status.HEALTHY
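One possible variation: shutil.disk_usage is a blocking call, so on slow or network-backed filesystems it may be worth running it off the event loop thread. The sketch below assumes Python 3.9+ for asyncio.to_thread; classify_disk and check_disk_in_thread are hypothetical names, and plain strings stand in for the Status enum to keep the example self-contained.

```python
import asyncio
import shutil


def classify_disk(percent_used: float) -> str:
    """Apply the same 85% / 95% thresholds as the check above."""
    if percent_used > 95:
        return "unhealthy"
    if percent_used > 85:
        return "degraded"
    return "healthy"


async def check_disk_in_thread(path: str = "/") -> str:
    # Run the blocking call in a worker thread so the event loop stays free.
    usage = await asyncio.to_thread(shutil.disk_usage, path)
    return classify_disk(usage.used / usage.total * 100)
```

Separating the threshold logic from the system call also makes the thresholds testable without touching a real disk.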
FastAPI Integration
Wire the registry into FastAPI with separate liveness and readiness endpoints:
from fastapi import FastAPI, Response
from contextlib import asynccontextmanager

registry = HealthRegistry()

@asynccontextmanager
async def lifespan(app: FastAPI):
    pool = await asyncpg.create_pool(dsn="postgresql://...")
    redis_client = redis.Redis.from_url("redis://...")
    registry.register("postgres", lambda: check_postgres(pool))
    registry.register("redis", lambda: check_redis(redis_client), critical=False)
    registry.register("disk", check_disk, critical=False)
    yield
    await pool.close()
    await redis_client.aclose()

app = FastAPI(lifespan=lifespan)

@app.get("/healthz")
async def liveness():
    """Lightweight: just confirms the process is responsive."""
    return {"status": "healthy"}

@app.get("/readyz")
async def readiness(response: Response):
    """Deep check: verifies all dependencies."""
    result = await registry.run_all()
    if result["status"] == "unhealthy":
        response.status_code = 503
    return result
Note the endpoint naming convention: /healthz for liveness (Kubernetes convention), /readyz for readiness. The z suffix avoids conflicts with user-facing /health routes.
Circuit-Broken Health Checks
If a dependency is known to be down, repeatedly probing it wastes time and can slow your health endpoint. Apply a mini circuit breaker:
class CircuitBrokenCheck:
    def __init__(self, check_fn, failure_threshold=3, reset_after=30.0):
        self._check_fn = check_fn
        self._failures = 0
        self._threshold = failure_threshold
        self._reset_after = reset_after
        self._tripped_at: float | None = None

    async def __call__(self) -> Status:
        # Compare against None explicitly: a monotonic timestamp of 0.0 is falsy
        if self._tripped_at is not None:
            if time.monotonic() - self._tripped_at > self._reset_after:
                self._tripped_at = None  # Try again
            else:
                return Status.UNHEALTHY  # Known bad, skip probe
        try:
            result = await self._check_fn()
            self._failures = 0
            return result
        except Exception:
            self._failures += 1
            if self._failures >= self._threshold:
                self._tripped_at = time.monotonic()
            return Status.UNHEALTHY
After 3 consecutive failures, the check short-circuits for 30 seconds, returning unhealthy instantly without hitting the dependency.
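The short-circuit behaviour can be exercised without waiting 30 seconds by injecting a fake clock. The sketch below is a compact restatement of the breaker, not the original class: the clock parameter is a hypothetical test hook, and plain strings replace the Status enum so the example runs on its own.

```python
import asyncio
import time


class CircuitBrokenCheck:
    """Restatement of the breaker above; `clock` is a hypothetical test hook."""

    def __init__(self, check_fn, failure_threshold=3, reset_after=30.0,
                 clock=time.monotonic):
        self._check_fn = check_fn
        self._failures = 0
        self._threshold = failure_threshold
        self._reset_after = reset_after
        self._tripped_at = None
        self._clock = clock

    async def __call__(self) -> str:
        if self._tripped_at is not None:
            if self._clock() - self._tripped_at > self._reset_after:
                self._tripped_at = None  # cool-down elapsed, probe again
            else:
                return "unhealthy"  # breaker open, skip the probe
        try:
            result = await self._check_fn()
            self._failures = 0
            return result
        except Exception:
            self._failures += 1
            if self._failures >= self._threshold:
                self._tripped_at = self._clock()
            return "unhealthy"


async def demo() -> list[int]:
    now = [100.0]  # fake clock value, advanced by hand
    calls = [0]    # number of times the dependency was really probed

    async def always_down() -> str:
        calls[0] += 1
        raise ConnectionError("dependency down")

    check = CircuitBrokenCheck(always_down, clock=lambda: now[0])
    seen = []
    for _ in range(5):   # three real probes trip the breaker, two are skipped
        await check()
        seen.append(calls[0])
    now[0] = 131.0       # move past reset_after
    await check()        # one probe is let through again
    seen.append(calls[0])
    return seen
```

Running asyncio.run(demo()) records the cumulative probe count after each call: the count stops growing once the breaker trips and increases by one after the cool-down.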
Kubernetes Configuration
containers:
  - name: api
    livenessProbe:
      httpGet:
        path: /healthz
        port: 8080
      initialDelaySeconds: 5
      periodSeconds: 10
      failureThreshold: 3
    readinessProbe:
      httpGet:
        path: /readyz
        port: 8080
      initialDelaySeconds: 10
      periodSeconds: 5
      failureThreshold: 2
    startupProbe:
      httpGet:
        path: /healthz
        port: 8080
      initialDelaySeconds: 0
      periodSeconds: 2
      failureThreshold: 30
The startup probe gives the app up to 60 seconds to initialize (30 attempts × 2 seconds). Until the startup probe succeeds, liveness and readiness probes are disabled — preventing premature restarts of slow-starting apps.
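The same arithmetic applies to the other probes. As a rough sketch, the helper below (failure_window_seconds is a hypothetical name) estimates how long a probe can keep failing before Kubernetes acts on it. It deliberately ignores timeoutSeconds and scheduling jitter, so treat the numbers as approximations.

```python
def failure_window_seconds(period: int, failure_threshold: int,
                           initial_delay: int = 0) -> int:
    """Approximate time until a continuously failing probe triggers action.

    Hypothetical helper; ignores per-probe timeouts and kubelet jitter.
    """
    return initial_delay + period * failure_threshold


startup_budget = failure_window_seconds(period=2, failure_threshold=30)
liveness_window = failure_window_seconds(period=10, failure_threshold=3)
readiness_window = failure_window_seconds(period=5, failure_threshold=2)
```

With the values from the manifest, the startup budget is 60 seconds, a hung container is restarted after roughly 30 seconds of failed liveness probes, and a pod with a failing dependency is removed from service endpoints after roughly 10 seconds.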
Exposing Metrics from Health Checks
Health check results are valuable metrics. Export them to Prometheus:
from prometheus_client import Gauge, Histogram

health_status = Gauge(
    "app_health_check_status",
    "Health check result (1=healthy, 0.5=degraded, 0=unhealthy)",
    ["check_name"],
)
health_duration = Histogram(
    "app_health_check_duration_seconds",
    "Time spent running health check",
    ["check_name"],
)

async def run_and_export(registry: HealthRegistry):
    result = await registry.run_all()
    # Numeric encoding of each status, built once rather than per check
    status_val = {"healthy": 1.0, "degraded": 0.5, "unhealthy": 0.0}
    for name, check in result["checks"].items():
        health_status.labels(check_name=name).set(
            status_val[check["status"]]
        )
        health_duration.labels(check_name=name).observe(
            check["duration_ms"] / 1000
        )
    return result
Now you can alert on app_health_check_status < 1 and graph check latency trends over time.
Anti-Patterns to Avoid
Health check that does real work. Never run migrations, cache warming, or data processing inside a health check. It should be read-only and side-effect-free.
Shared timeout for all checks. If you give one asyncio.wait_for to the entire check suite, one slow check eats the budget for all others. Timeout each check individually.
Checking transitive dependencies. Your app talks to an API gateway that talks to a payment service. If you check the payment service health from your app, you’re creating a fragile dependency chain. Check only what your code directly connects to.
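The shared-timeout anti-pattern above can be reproduced in a few lines. This is a minimal sketch with stand-in checks: fast, slow, guarded, shared_budget, and per_check_budget are hypothetical names, and the sleeps simulate dependency latency.

```python
import asyncio


async def fast() -> str:
    await asyncio.sleep(0.01)  # simulated quick dependency
    return "healthy"


async def slow() -> str:
    await asyncio.sleep(1.0)   # simulated hanging dependency
    return "healthy"


async def shared_budget(timeout: float) -> list[str]:
    # One wait_for around the whole suite: the slow check takes the fast
    # check's result down with it when the shared budget runs out.
    try:
        return list(await asyncio.wait_for(asyncio.gather(fast(), slow()), timeout))
    except asyncio.TimeoutError:
        return ["unhealthy", "unhealthy"]


async def guarded(check, timeout: float) -> str:
    # Each check gets its own timeout and its own fallback result.
    try:
        return await asyncio.wait_for(check(), timeout)
    except asyncio.TimeoutError:
        return "unhealthy"


async def per_check_budget(timeout: float) -> list[str]:
    return list(await asyncio.gather(guarded(fast, timeout), guarded(slow, timeout)))
```

With a 0.1 second budget, shared_budget loses the fast check's result entirely, while per_check_budget still reports it as healthy and flags only the slow one.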
One thing to remember: Health checks are a production observability tool — layer them (liveness/readiness/startup), timeout each probe individually, and only check direct dependencies to avoid cascading false-unhealthy states.