Python Readiness & Liveness Probes — Deep Dive

Liveness: Detecting the Undetectable

The hardest failures to catch are silent ones — a deadlocked event loop, a thread stuck on a lock, a process that’s technically alive but doing nothing. A good liveness probe detects these.

Event Loop Deadlock Detection

For asyncio-based apps, if the event loop is stuck, it can’t serve the liveness endpoint either. One approach: run the liveness server on a separate thread:

import asyncio
import threading
import time
from http.server import HTTPServer, BaseHTTPRequestHandler

class LivenessState:
    """Heartbeat timestamp plus the maximum age at which it still counts as live.

    Args:
        max_staleness: Seconds that may elapse since the last heartbeat
            before ``is_alive`` turns False.
    """

    def __init__(self, max_staleness: float = 30.0):
        self.max_staleness = max_staleness
        # Construction counts as the first heartbeat.
        self.last_heartbeat = time.monotonic()

    def beat(self):
        """Record a heartbeat at the current monotonic time."""
        self.last_heartbeat = time.monotonic()

    @property
    def is_alive(self) -> bool:
        """True while the last heartbeat is younger than ``max_staleness``."""
        heartbeat_age = time.monotonic() - self.last_heartbeat
        return heartbeat_age < self.max_staleness

# Shared module-level state: heartbeat_loop writes to it and LivenessHandler reads it.
liveness = LivenessState()

class LivenessHandler(BaseHTTPRequestHandler):
    """HTTP handler that reports the state of the module-level ``liveness`` object."""

    def do_GET(self):
        """Answer any GET with 200 while the heartbeat is fresh, otherwise 503."""
        if liveness.is_alive:
            status_code, payload = 200, b'{"status":"alive"}'
        else:
            status_code, payload = 503, b'{"status":"deadlocked"}'
        self.send_response(status_code)
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format, *args):
        """Discard access-log lines so frequent probes do not flood the logs."""
        return None

def start_liveness_server(port=8081):
    """Serve LivenessHandler on all interfaces from a background daemon thread.

    Args:
        port: TCP port to bind.

    Returns:
        The HTTPServer instance, already serving on its own thread.
    """
    httpd = HTTPServer(("0.0.0.0", port), LivenessHandler)
    # Daemon thread: it must never keep the process alive on its own.
    threading.Thread(target=httpd.serve_forever, daemon=True).start()
    return httpd

The main event loop periodically calls liveness.beat():

async def heartbeat_loop(interval: float = 5.0):
    """Refresh the liveness heartbeat forever from inside the event loop.

    If the event loop stalls, this coroutine stops running and the heartbeat
    timestamp goes stale.

    Args:
        interval: Seconds to sleep between heartbeats. Keep it well below the
            liveness state's ``max_staleness`` so a few delayed beats do not
            flip the probe to unhealthy.
    """
    while True:
        liveness.beat()
        await asyncio.sleep(interval)

# In your app startup (call this from code running inside the event loop):
# Keep a reference to the task. The event loop only holds a weak reference to
# tasks, so an unreferenced heartbeat task may be garbage-collected mid-run,
# which would stop the heartbeat and get a healthy pod restarted.
heartbeat_task = asyncio.create_task(heartbeat_loop())
start_liveness_server(port=8081)

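If the event loop freezes, heartbeat_loop stops updating the timestamp. Once the last heartbeat is older than max_staleness (30 seconds by default), the liveness endpoint (running on a separate thread) starts returning 503, and Kubernetes restarts the container after failureThreshold consecutive failed probes.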

Thread Pool Exhaustion Detection

For sync-threaded apps (Gunicorn with sync workers), detect when all workers are busy:

import os
import psutil

def check_thread_health() -> bool:
    """Report whether the request counter has moved past the last recorded value.

    Simplified sketch: the thread inspection below is computed but not used
    in the result; only the counter comparison decides the return value.

    Returns:
        True if ``request_counter.value`` is greater than ``last_check_counter``.
    """
    # NOTE(review): request_counter and last_check_counter are not defined in
    # this snippet — presumably a shared counter and a module-level baseline;
    # confirm where they live.
    process = psutil.Process(os.getpid())
    threads = process.threads()
    
    # Count threads in specific states
    # If all threads are in I/O wait, the process is effectively stuck
    # NOTE(review): stuck_threshold (90% of the thread count) is never read.
    stuck_threshold = len(threads) * 0.9
    
    # Simplified: check if the process has handled
    # any requests recently via a counter
    # NOTE(review): last_check_counter is not advanced here, so the caller
    # must update it between probes for "recently" to mean anything.
    return request_counter.value > last_check_counter

Readiness: Dependency-Aware Traffic Gating

Structured Readiness with Weighted Dependencies

Not all dependencies are equal. A failed cache degrades performance; a failed database stops everything:

from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from enum import Enum
from typing import Any

class Impact(Enum):
    """How severely the service is affected when a dependency is unhealthy."""

    # Must be healthy for readiness.
    CRITICAL = "critical"
    # App works but impaired.
    DEGRADED = "degraded"
    # Nice to have.
    OPTIONAL = "optional"

@dataclass
class DependencyCheck:
    """One named dependency probe together with its severity and time budget."""

    # Identifier for the dependency; used as the key in readiness reports.
    name: str
    # Zero-argument callable whose result is awaited. `callable` (the builtin
    # function) is not a type, so the annotation spells out the real contract.
    check_fn: Callable[[], Awaitable[Any]]
    # How much a failure of this dependency matters.
    impact: Impact
    # Seconds the check may run before it is treated as failed.
    timeout: float = 3.0

class ReadinessChecker:
    """Runs registered dependency checks concurrently and derives readiness.

    The service counts as ready unless at least one dependency registered
    with ``Impact.CRITICAL`` is unhealthy. Failures of other dependencies are
    reported but do not affect the ``ready`` flag.
    """

    def __init__(self):
        self.dependencies: list[DependencyCheck] = []
        # Outcome of the most recent check(); False until one has run.
        self._ready = False

    def add(self, name: str, check_fn, impact: Impact, timeout: float = 3.0):
        """Register a dependency check.

        Args:
            name: Key under which the result appears in the report.
            check_fn: Zero-argument callable returning an awaitable; it is
                healthy when the awaitable completes without raising.
            impact: Severity of a failure of this dependency.
            timeout: Seconds to wait before the check counts as failed.
        """
        self.dependencies.append(
            DependencyCheck(name, check_fn, impact, timeout)
        )

    async def check(self) -> dict:
        """Run every registered check concurrently and build a report.

        Returns:
            ``{"ready": bool, "dependencies": {name: {"healthy", "impact",
            "detail"}}}``.
        """
        results = {}
        is_ready = True

        checks = [
            self._run_check(dep) for dep in self.dependencies
        ]
        # _run_check converts failures into (False, detail) tuples, so one
        # failing dependency does not abort the others.
        check_results = await asyncio.gather(*checks)

        for dep, (healthy, detail) in zip(self.dependencies, check_results):
            results[dep.name] = {
                "healthy": healthy,
                "impact": dep.impact.value,
                "detail": detail,
            }
            if not healthy and dep.impact == Impact.CRITICAL:
                is_ready = False

        self._ready = is_ready
        return {"ready": is_ready, "dependencies": results}

    async def _run_check(self, dep: DependencyCheck) -> tuple[bool, str]:
        """Await one dependency check under its timeout.

        Returns:
            ``(True, "ok")`` on success, otherwise ``(False, reason)``.
        """
        try:
            # The check's return value is irrelevant: completing without an
            # exception is what marks the dependency healthy.
            await asyncio.wait_for(dep.check_fn(), timeout=dep.timeout)
        except asyncio.TimeoutError:
            return (False, f"timeout after {dep.timeout}s")
        except Exception as exc:
            # Probe boundary: any failure means "unhealthy". Some exceptions
            # stringify to "", so fall back to the class name to keep the
            # report useful.
            return (False, str(exc) or type(exc).__name__)
        return (True, "ok")

Registration Example

# Module-level checker that the dependency checks below are registered on.
readiness = ReadinessChecker()

async def check_db():
    """Acquire a connection from ``db_pool`` and run ``SELECT 1`` on it."""
    # NOTE(review): db_pool is not defined in this snippet — presumably an
    # async connection pool created at startup; confirm.
    async with db_pool.acquire() as connection:
        await connection.fetchval("SELECT 1")

async def check_redis():
    """Ping the cache through ``redis_client``."""
    # NOTE(review): redis_client is not defined in this snippet — presumably
    # an async Redis client; confirm.
    await redis_client.ping()

async def check_s3():
    """Issue a ``head_bucket`` call for ``my-bucket`` through ``s3_client``."""
    # NOTE(review): s3_client is not defined in this snippet — presumably an
    # async S3 client; confirm. The bucket name is hard-coded.
    await s3_client.head_bucket(Bucket="my-bucket")

# Only the CRITICAL dependency can flip readiness to False; the others are
# reported in the result but do not gate traffic.
readiness.add("postgres", check_db, Impact.CRITICAL)
readiness.add("redis", check_redis, Impact.DEGRADED, timeout=2.0)
readiness.add("s3", check_s3, Impact.OPTIONAL, timeout=5.0)

Startup Probes: Gating Initialization

For apps that load ML models or perform migrations:

class StartupGate:
    """Tracks named initialization steps that must all finish before startup."""

    def __init__(self):
        # Step name -> whether that step has been marked complete.
        self._checks: dict[str, bool] = {}

    def require(self, name: str):
        """Register *name* as a pending step (re-registering resets it)."""
        self._checks[name] = False

    def mark_ready(self, name: str):
        """Mark a registered step complete; unregistered names are ignored."""
        if name not in self._checks:
            return
        self._checks[name] = True

    @property
    def is_started(self) -> bool:
        """True when no registered step is pending (including when none exist)."""
        for done in self._checks.values():
            if not done:
                return False
        return True

    @property
    def status(self) -> dict:
        """Overall flag plus a copy of the per-step state."""
        started = self.is_started
        snapshot = dict(self._checks)
        return {"started": started, "checks": snapshot}

# Register every step up front so the gate reports "not started" until each
# one has been marked ready.
startup_gate = StartupGate()
startup_gate.require("database_migrations")
startup_gate.require("model_loading")
startup_gate.require("cache_warmup")

# During initialization:
async def init_app():
    """Run the initialization steps in order, marking each one on the gate."""
    # (step coroutine function, gate name) pairs, executed strictly in sequence.
    steps = (
        (run_migrations, "database_migrations"),
        (load_ml_model, "model_loading"),
        (warm_cache, "cache_warmup"),
    )
    for step, gate_name in steps:
        await step()
        startup_gate.mark_ready(gate_name)

The startup endpoint:

@app.get("/startupz")
async def startup_check(response: Response):
    """Return the startup gate status, with HTTP 503 until every step is done."""
    snapshot = startup_gate.status
    if snapshot["started"]:
        return snapshot
    response.status_code = 503
    return snapshot

Complete Kubernetes Configuration

apiVersion: apps/v1
kind: Deployment
metadata:
  name: python-api
spec:
  replicas: 3
  # apps/v1 Deployments require a selector, and it must match the pod
  # template's labels; without both the manifest is rejected.
  selector:
    matchLabels:
      app: python-api
  template:
    metadata:
      labels:
        app: python-api
    spec:
      # Must cover the preStop sleep (5s) plus the app's own shutdown time.
      terminationGracePeriodSeconds: 35
      containers:
        - name: api
          image: myapp:latest
          ports:
            - containerPort: 8080
              name: http
            - containerPort: 8081
              name: liveness

          # Liveness and readiness probes do not run until this succeeds.
          startupProbe:
            httpGet:
              path: /startupz
              port: http
            periodSeconds: 3
            failureThreshold: 40    # 120s max startup
            timeoutSeconds: 2

          livenessProbe:
            httpGet:
              path: /healthz
              port: liveness        # Separate port!
            periodSeconds: 10
            failureThreshold: 3     # 30s tolerance
            timeoutSeconds: 2

          readinessProbe:
            httpGet:
              path: /readyz
              port: http
            periodSeconds: 5
            failureThreshold: 2     # 10s to remove from LB
            timeoutSeconds: 3

          lifecycle:
            preStop:
              exec:
                # Requires a `sleep` binary in the container image.
                command: ["sleep", "5"]

Key decisions:

  • Liveness on a separate port (8081) — if the main HTTP server is stuck, the liveness server on a separate thread still responds
  • Startup probe with high failureThreshold — allows up to 120 seconds for model loading
  • Readiness with low failureThreshold — removes unhealthy pods from traffic quickly
  • preStop sleep — allows kube-proxy to update before the app begins shutdown

Probe Response Caching

Running dependency checks every 5 seconds under load can be wasteful. Cache results briefly:

import time

class CachedReadiness:
    """Wraps a readiness checker and reuses its last result for a short TTL.

    Args:
        checker: Object whose ``check()`` coroutine produces the report.
        ttl: Seconds a stored report stays valid.
    """

    def __init__(self, checker: ReadinessChecker, ttl: float = 3.0):
        self._checker = checker
        self._ttl = ttl
        self._cached: dict | None = None
        self._cached_at: float = 0

    async def check(self) -> dict:
        """Return the cached report while it is fresh, otherwise re-run checks.

        Calls that arrive while a check is still in flight are not coalesced;
        each one runs the underlying checker.
        """
        # Compare against None rather than truthiness so that a falsy report
        # is cached too.
        if (
            self._cached is not None
            and (time.monotonic() - self._cached_at) < self._ttl
        ):
            return self._cached
        result = await self._checker.check()
        self._cached = result
        # Stamp the entry when the check finishes, not when it started.
        # Otherwise a check that takes about as long as the TTL would be
        # stored already expired and the cache would never hit.
        self._cached_at = time.monotonic()
        return result

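A 3-second TTL means that a probe arriving while the cached result is still fresh reuses it instead of re-running the dependency checks, which helps when several callers (the kubelet, a load balancer, a dashboard) hit the endpoint. Note that requests arriving while a check is still in flight are not coalesced: each one runs its own check.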

Debugging Probe Failures

When probes fail in production, you need visibility. Add structured logging:

import structlog

# Module-level structured logger used by the readiness endpoint below.
logger = structlog.get_logger()

async def readiness_endpoint(response: Response):
    """Run the readiness checks; on failure log the unhealthy ones and set 503.

    Returns:
        The full readiness report, whether or not the service is ready.
    """
    report = await readiness.check()
    if report["ready"]:
        return report

    # Log only the dependencies that failed, so the event is easy to act on.
    unhealthy = {}
    for dep_name, dep_result in report["dependencies"].items():
        if not dep_result["healthy"]:
            unhealthy[dep_name] = dep_result
    logger.warning("readiness_check_failed", failed_dependencies=unhealthy)
    response.status_code = 503
    return report

When failures repeat, also inspect the pod's conditions and the Kubernetes events recorded for it:

# Quick debug: check probe status
# Print the "Conditions:" header from the pod description plus the five lines after it.
kubectl describe pod <pod-name> | grep -A5 "Conditions:"
# List the events whose involved object is this pod (probe failures are typically reported here).
kubectl get events --field-selector involvedObject.name=<pod-name>

Anti-Pattern: The Thundering Herd

If all pods check the database at the same time (synchronized probe intervals), you get a spike of SELECT 1 queries. Add jitter to your probe configuration or your check timing:

import random

async def jittered_check(check_fn, max_jitter: float = 1.0):
    """Sleep for a random delay, then await ``check_fn()`` and return its result.

    Args:
        check_fn: Zero-argument callable returning an awaitable.
        max_jitter: Upper bound, in seconds, of the uniformly drawn delay.
    """
    delay = random.uniform(0, max_jitter)
    await asyncio.sleep(delay)
    outcome = await check_fn()
    return outcome

One thing to remember: Liveness detects stuck processes (keep it simple, dependency-free, possibly on a separate thread). Readiness gates traffic based on dependency health. Startup prevents premature probing during initialization. Each probe has a distinct purpose — mixing them up causes outages.

Tags: python, kubernetes, production

See Also