Python Readiness & Liveness Probes — Deep Dive
Liveness: Detecting the Undetectable
The hardest failures to catch are silent ones — a deadlocked event loop, a thread stuck on a lock, a process that’s technically alive but doing nothing. A good liveness probe detects these.
Event Loop Deadlock Detection
For asyncio-based apps, if the event loop is stuck, it can’t serve the liveness endpoint either. One approach: run the liveness server on a separate thread:
import asyncio
import threading
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
class LivenessState:
    """Heartbeat timestamp used to decide whether the process is still alive.

    ``beat()`` records the current monotonic time; ``is_alive`` reports
    whether the most recent beat is younger than ``max_staleness`` seconds.
    """

    def __init__(self, max_staleness: float = 30.0):
        # Seconds a heartbeat may age before the state counts as dead.
        self.max_staleness = max_staleness
        # Construction counts as the first heartbeat.
        self.last_heartbeat = time.monotonic()

    def beat(self):
        """Record a heartbeat at the current monotonic time."""
        self.last_heartbeat = time.monotonic()

    @property
    def is_alive(self) -> bool:
        """Return True while the latest heartbeat is younger than ``max_staleness``."""
        age = time.monotonic() - self.last_heartbeat
        return age < self.max_staleness
# Module-level heartbeat state: read by LivenessHandler, refreshed by heartbeat_loop.
liveness = LivenessState()
class LivenessHandler(BaseHTTPRequestHandler):
    """Serve the liveness probe from the module-level ``liveness`` state.

    Responds 200 while the heartbeat is fresh and 503 once it is stale. The
    request path is not inspected, so every GET receives the same answer.
    """

    def do_GET(self):
        """Answer a GET with the current liveness status as a JSON body."""
        if liveness.is_alive:
            status, body = 200, b'{"status":"alive"}'
        else:
            status, body = 503, b'{"status":"deadlocked"}'
        self.send_response(status)
        # Declare the payload type and size so clients can parse the body
        # without relying on the connection being closed to delimit it.
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Suppress the per-request access log lines."""
def start_liveness_server(port=8081):
    """Start the liveness HTTP server on a background daemon thread.

    Binds to ``0.0.0.0`` on ``port`` and returns the ``HTTPServer`` instance
    so the caller keeps a handle on it.
    """
    httpd = HTTPServer(("0.0.0.0", port), LivenessHandler)
    # Daemon thread: it does not keep the process alive at interpreter exit.
    worker = threading.Thread(target=httpd.serve_forever, daemon=True)
    worker.start()
    return httpd
The main event loop periodically calls liveness.beat():
async def heartbeat_loop(interval: float = 5.0):
    """Refresh the liveness heartbeat forever from inside the event loop.

    This coroutine only advances while the event loop is making progress, so
    a stalled loop stops the heartbeat from being refreshed.

    Args:
        interval: Seconds to sleep between beats. Keep it well below the
            liveness state's ``max_staleness`` so a healthy loop never
            looks stale.
    """
    while True:
        liveness.beat()
        await asyncio.sleep(interval)
# In your app startup (inside the running event loop):
# Keep a reference to the task. The event loop only holds weak references to
# tasks, so an unreferenced task can be garbage-collected mid-execution,
# which would silently stop the heartbeat.
heartbeat_task = asyncio.create_task(heartbeat_loop())
start_liveness_server(port=8081)
If the event loop freezes, heartbeat_loop stops updating the timestamp; once the last heartbeat is older than max_staleness, the liveness endpoint (running on a separate thread) returns 503. After failureThreshold consecutive failed probes, Kubernetes restarts the container.
Thread Pool Exhaustion Detection
For thread-based synchronous apps (for example, Gunicorn with gthread workers), detect when every worker thread is busy:
import os
import psutil
def check_thread_health() -> bool:
    """Report whether the request counter has advanced past the last check.

    NOTE(review): ``request_counter`` and ``last_check_counter`` are not
    defined in this snippet; presumably they are module-level state kept by
    the request path — confirm before use.
    """
    current = psutil.Process(os.getpid())
    worker_threads = current.threads()
    # Count threads in specific states
    # If all threads are in I/O wait, the process is effectively stuck
    # NOTE(review): the threshold below is computed but never consulted.
    stuck_threshold = 0.9 * len(worker_threads)
    # Simplified: check if the process has handled
    # any requests recently via a counter
    return request_counter.value > last_check_counter
Readiness: Dependency-Aware Traffic Gating
Structured Readiness with Weighted Dependencies
Not all dependencies are equal. A failed cache degrades performance; a failed database stops everything:
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from enum import Enum
from typing import Any
class Impact(Enum):
    """How strongly a dependency failure affects the service."""

    # Must be healthy for readiness
    CRITICAL = "critical"
    # App works but impaired
    DEGRADED = "degraded"
    # Nice to have
    OPTIONAL = "optional"
@dataclass
class DependencyCheck:
    """A named health check for one external dependency."""

    # Label identifying the dependency.
    name: str
    # Zero-argument coroutine function that performs the check. ``callable``
    # is a builtin function rather than a type, so the annotation is spelled
    # with ``Callable``.
    check_fn: Callable[[], Awaitable[Any]]
    # Severity of a failure of this dependency.
    impact: Impact
    # Seconds allowed for the check to finish.
    timeout: float = 3.0
class ReadinessChecker:
    """Run registered dependency checks and derive overall readiness.

    Only a failing ``Impact.CRITICAL`` dependency makes the service not
    ready; failures at other impact levels are reported but do not change
    the ``ready`` flag.
    """

    def __init__(self):
        self.dependencies: list[DependencyCheck] = []
        # Outcome of the most recent check(); False until one has run.
        self._ready = False

    def add(self, name: str, check_fn, impact: Impact, timeout: float = 3.0):
        """Register a dependency check.

        Args:
            name: Key under which the result is reported.
            check_fn: Zero-argument coroutine function; raising or timing
                out marks the dependency unhealthy.
            impact: How a failure of this dependency affects readiness.
            timeout: Seconds allowed for ``check_fn`` to complete.
        """
        self.dependencies.append(
            DependencyCheck(name, check_fn, impact, timeout)
        )

    async def check(self) -> dict:
        """Run every registered check concurrently and build a report.

        Returns:
            ``{"ready": bool, "dependencies": {name: {"healthy": bool,
            "impact": str, "detail": str}}}``.
        """
        outcomes = await asyncio.gather(
            *(self._run_check(dep) for dep in self.dependencies)
        )
        results = {}
        is_ready = True
        for dep, (healthy, detail) in zip(self.dependencies, outcomes):
            results[dep.name] = {
                "healthy": healthy,
                "impact": dep.impact.value,
                "detail": detail,
            }
            if not healthy and dep.impact == Impact.CRITICAL:
                is_ready = False
        self._ready = is_ready
        return {"ready": is_ready, "dependencies": results}

    async def _run_check(self, dep: DependencyCheck) -> tuple[bool, str]:
        """Run one check and return ``(healthy, detail)`` without raising.

        The value returned by the check function is ignored; only raising or
        exceeding ``dep.timeout`` counts as a failure.
        """
        try:
            await asyncio.wait_for(dep.check_fn(), timeout=dep.timeout)
        except asyncio.TimeoutError:
            return (False, f"timeout after {dep.timeout}s")
        except Exception as e:
            # Probe boundary: any failure becomes an unhealthy result. Some
            # exceptions stringify to "", so fall back to the type name to
            # keep the detail informative.
            return (False, str(e) or type(e).__name__)
        return (True, "ok")
Registration Example
# Shared checker instance that the check functions below are registered on.
readiness = ReadinessChecker()
async def check_db():
    """Run ``SELECT 1`` on a connection acquired from ``db_pool``.

    Any exception from acquiring the connection or running the query
    propagates to the caller.
    """
    async with db_pool.acquire() as connection:
        await connection.fetchval("SELECT 1")
async def check_redis():
    """Await ``redis_client.ping()``; any exception propagates to the caller."""
    await redis_client.ping()
async def check_s3():
    """Await ``head_bucket`` on the bucket; any exception propagates to the caller."""
    # NOTE(review): the bucket name is hard-coded; presumably a placeholder.
    await s3_client.head_bucket(Bucket="my-bucket")
# Register each dependency with its impact level; redis and s3 override the
# default 3.0 s timeout. Only postgres is registered as CRITICAL.
readiness.add("postgres", check_db, Impact.CRITICAL)
readiness.add("redis", check_redis, Impact.DEGRADED, timeout=2.0)
readiness.add("s3", check_s3, Impact.OPTIONAL, timeout=5.0)
Startup Probes: Gating Initialization
For apps that load ML models or perform migrations:
class StartupGate:
    """Track named startup steps and report when all of them are done."""

    def __init__(self):
        # Step name -> whether that step has completed.
        self._checks: dict[str, bool] = {}

    def require(self, name: str):
        """Declare ``name`` as a step that must complete (initially not done)."""
        self._checks[name] = False

    def mark_ready(self, name: str):
        """Mark a previously required step as done; unknown names are ignored."""
        if name not in self._checks:
            return
        self._checks[name] = True

    @property
    def is_started(self) -> bool:
        """True when every required step is done (also True if none are required)."""
        return all(self._checks.values())

    @property
    def status(self) -> dict:
        """Return the overall flag plus a copy of the per-step states."""
        snapshot = self._checks.copy()
        return {"started": self.is_started, "checks": snapshot}
# Declare every initialization step that must finish before the gate reports
# the app as started.
startup_gate = StartupGate()
startup_gate.require("database_migrations")
startup_gate.require("model_loading")
startup_gate.require("cache_warmup")
# During initialization:
async def init_app():
    """Run each startup step in order, marking its gate entry once it returns."""
    steps = (
        (run_migrations, "database_migrations"),
        (load_ml_model, "model_loading"),
        (warm_cache, "cache_warmup"),
    )
    for run_step, gate_name in steps:
        await run_step()
        # Marked only after the step returns, so a raised error leaves it unmet.
        startup_gate.mark_ready(gate_name)
The startup endpoint:
@app.get("/startupz")
async def startup_check(response: Response):
    """Return the startup gate status, answering 503 until every step is done."""
    report = startup_gate.status
    if report["started"]:
        return report
    response.status_code = 503
    return report
Complete Kubernetes Configuration
apiVersion: apps/v1
kind: Deployment
metadata:
  name: python-api
spec:
  replicas: 3
  # apps/v1 Deployments require a selector that matches the pod template labels.
  selector:
    matchLabels:
      app: python-api
  template:
    metadata:
      labels:
        app: python-api
    spec:
      terminationGracePeriodSeconds: 35
      containers:
        - name: api
          image: myapp:latest
          ports:
            - containerPort: 8080
              name: http
            - containerPort: 8081
              name: liveness
          startupProbe:
            httpGet:
              path: /startupz
              port: http
            periodSeconds: 3
            failureThreshold: 40 # 120s max startup
            timeoutSeconds: 2
          livenessProbe:
            httpGet:
              path: /healthz
              port: liveness # Separate port!
            periodSeconds: 10
            failureThreshold: 3 # 30s tolerance
            timeoutSeconds: 2
          readinessProbe:
            httpGet:
              path: /readyz
              port: http
            periodSeconds: 5
            failureThreshold: 2 # 10s to remove from LB
            timeoutSeconds: 3
          lifecycle:
            preStop:
              exec:
                command: ["sleep", "5"]
Key decisions:
- Liveness on a separate port (8081) — if the main HTTP server is stuck, the liveness server on a separate thread still responds
- Startup probe with high failureThreshold — allows up to 120 seconds for model loading
- Readiness with low failureThreshold — removes unhealthy pods from traffic quickly
- preStop sleep — allows kube-proxy to update before the app begins shutdown
Probe Response Caching
Running dependency checks every 5 seconds under load can be wasteful. Cache results briefly:
import time
class CachedReadiness:
    """Serve readiness reports from a short-lived cache.

    Wraps a checker exposing ``async check() -> dict`` and re-runs it only
    when the cached report is older than ``ttl`` seconds. Concurrent callers
    that find the cache stale share a single refresh.
    """

    def __init__(self, checker: "ReadinessChecker", ttl: float = 3.0):
        self._checker = checker
        self._ttl = ttl
        self._cached: dict | None = None
        self._cached_at: float = 0
        # Created lazily inside check() so the lock is always built while an
        # event loop is running.
        self._lock = None

    def _fresh(self):
        """Return the cached report if it is still within the TTL, else None."""
        if self._cached is None:
            return None
        if (time.monotonic() - self._cached_at) < self._ttl:
            return self._cached
        return None

    async def check(self) -> dict:
        """Return the cached report if fresh; otherwise run the checks once."""
        cached = self._fresh()
        if cached is not None:
            return cached
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:
            # Another caller may have refreshed while we waited for the lock.
            cached = self._fresh()
            if cached is not None:
                return cached
            result = await self._checker.check()
            self._cached = result
            # Stamp after the check completes so the time spent running the
            # checks does not count against the TTL.
            self._cached_at = time.monotonic()
            return result
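A 3-second TTL means the dependency checks run at most about once every 3 seconds no matter how many callers hit the endpoint (kubelet probes, load balancers, monitoring); requests that arrive while the cached result is still fresh reuse it instead of re-running the checks.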
Debugging Probe Failures
When probes fail in production, you need visibility. Add structured logging:
import structlog
# Module-level structlog logger used by the readiness endpoint below.
logger = structlog.get_logger()
async def readiness_endpoint(response: Response):
    """Run the readiness checks and return the report.

    When the report is not ready, every unhealthy dependency (of any impact
    level) is logged and the response status is set to 503.
    """
    result = await readiness.check()
    if result["ready"]:
        return result
    unhealthy = {}
    for dep_name, info in result["dependencies"].items():
        if not info["healthy"]:
            unhealthy[dep_name] = info
    logger.warning("readiness_check_failed", failed_dependencies=unhealthy)
    response.status_code = 503
    return result
Kubernetes also records events when probes fail; inspect the pod's conditions and events with:
# Quick debug: check probe status
# Print the "Conditions:" section of the pod description (plus 5 following lines).
kubectl describe pod <pod-name> | grep -A5 "Conditions:"
# List events whose involved object is this pod.
kubectl get events --field-selector involvedObject.name=<pod-name>
Anti-Pattern: The Thundering Herd
If all pods check the database at the same time (synchronized probe intervals), you get a spike of SELECT 1 queries. Add jitter to your probe configuration or your check timing:
import random
async def jittered_check(check_fn, max_jitter: float = 1.0):
    """Await ``check_fn()`` after a random delay and return its result.

    Args:
        check_fn: Zero-argument coroutine function to run.
        max_jitter: Upper bound, in seconds, of the uniformly drawn delay.
    """
    delay = random.uniform(0, max_jitter)
    await asyncio.sleep(delay)
    outcome = await check_fn()
    return outcome
One thing to remember: Liveness detects stuck processes (keep it simple, dependency-free, possibly on a separate thread). Readiness gates traffic based on dependency health. Startup prevents premature probing during initialization. Each probe has a distinct purpose — mixing them up causes outages.
See Also
- Python A/B Testing Framework — How tech companies test two versions of something to see which one wins — explained with a lemonade stand experiment.
- Python Configuration Hierarchy — How your Python app decides which settings to use — explained like layers of clothing on a cold day.
- Python Feature Flag Strategies — How developers turn features on and off without redeploying — explained with a TV remote control analogy.
- Python Graceful Shutdown — Why your Python app needs to say goodbye properly before it stops — explained with a restaurant closing analogy.
- Python Health Check Patterns — Why your Python app needs regular check-ups — explained like a doctor's visit for software.