Python Service Health Dashboards — Deep Dive

Async health check engine

For dashboards monitoring dozens of services, an async check engine outperforms thread pools by handling I/O-bound checks more efficiently:

import asyncio
import httpx
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Awaitable

class Status(Enum):
    """Health state assigned to a single check result.

    The string values are what ends up in the serialized report
    (``overall`` and each service's ``status``).
    """
    # Check succeeded and was not slower than its degraded threshold.
    HEALTHY = "healthy"
    # Check succeeded but took longer than its degraded threshold.
    DEGRADED = "degraded"
    # Check raised, timed out, or returned an unexpected HTTP status.
    UNHEALTHY = "unhealthy"

@dataclass
class HealthResult:
    """Outcome of one health check against one service.

    ``metadata`` defaults to ``None`` in the signature and is swapped for a
    fresh empty dict after construction, so instances never share a dict.
    """
    service: str
    status: Status
    response_time_ms: float
    message: str = ""
    metadata: dict = None

    def __post_init__(self):
        # A caller-supplied mapping is kept untouched.
        if self.metadata is not None:
            return
        self.metadata = {}

class AsyncHealthEngine:
    """Run registered health checks concurrently and build one report.

    A check is a zero-argument coroutine function returning a
    ``HealthResult``. HTTP checks share a single, lazily created
    ``httpx.AsyncClient``; call :meth:`close` to release it.
    """

    def __init__(self):
        self.checks: list[tuple[str, Callable[[], Awaitable[HealthResult]]]] = []
        self._client: httpx.AsyncClient | None = None

    @staticmethod
    def _describe(exc: BaseException) -> str:
        """Return a non-empty, human-readable message for *exc*.

        Some exceptions (notably the timeout raised by ``asyncio.wait_for``)
        stringify to ``""``, which would leave the report with no hint of
        what failed, so fall back to the exception class name.
        """
        return str(exc) or type(exc).__name__

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it on first use or after close."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(timeout=10.0)
        return self._client

    def add_http_check(self, name: str, url: str, timeout: float = 5.0,
                        expected_status: int = 200, degraded_ms: float = 2000):
        """Register a GET check against *url*.

        The check is UNHEALTHY when the request raises or the status code
        differs from *expected_status*, DEGRADED when the response took longer
        than *degraded_ms* milliseconds, and HEALTHY otherwise.
        """
        async def check() -> HealthResult:
            client = await self._get_client()
            start = time.monotonic()
            try:
                resp = await client.get(url, timeout=timeout)
            except Exception as e:
                elapsed = (time.monotonic() - start) * 1000
                return HealthResult(name, Status.UNHEALTHY, elapsed, self._describe(e))

            elapsed = (time.monotonic() - start) * 1000
            if resp.status_code != expected_status:
                return HealthResult(name, Status.UNHEALTHY, elapsed,
                                    f"Expected {expected_status}, got {resp.status_code}")

            status = Status.DEGRADED if elapsed > degraded_ms else Status.HEALTHY
            return HealthResult(name, status, elapsed)

        self.checks.append((name, check))

    def add_tcp_check(self, name: str, host: str, port: int,
                       timeout: float = 3.0, degraded_ms: float = 500):
        """Register a check that opens (and immediately closes) a TCP connection.

        The reported time is the time to establish the connection. The check
        is UNHEALTHY when connecting raises or exceeds *timeout* seconds,
        DEGRADED when it took longer than *degraded_ms* milliseconds, and
        HEALTHY otherwise.
        """
        async def check() -> HealthResult:
            start = time.monotonic()
            try:
                _reader, writer = await asyncio.wait_for(
                    asyncio.open_connection(host, port),
                    timeout=timeout,
                )
            except Exception as e:
                elapsed = (time.monotonic() - start) * 1000
                return HealthResult(name, Status.UNHEALTHY, elapsed, self._describe(e))

            # Stop the clock once the connection is up so that socket teardown
            # does not inflate the measured latency.
            elapsed = (time.monotonic() - start) * 1000

            writer.close()
            try:
                await asyncio.wait_for(writer.wait_closed(), timeout=timeout)
            except Exception:
                # The port accepted the connection, which is all this check
                # verifies; a messy shutdown must not flip the result.
                pass

            status = Status.DEGRADED if elapsed > degraded_ms else Status.HEALTHY
            return HealthResult(name, status, elapsed)

        self.checks.append((name, check))

    def add_custom_check(self, name: str, fn: Callable[[], Awaitable[HealthResult]]):
        """Register an arbitrary coroutine function as a check named *name*."""
        self.checks.append((name, fn))

    async def run_all(self) -> dict:
        """Run every registered check concurrently and return the report dict.

        Returns a dict with ``overall`` (worst status seen), ``services``
        (one entry per check, sorted by service name) and ``checked_at``
        (``time.time()`` at completion). A check that raises, or is cancelled,
        is reported as UNHEALTHY with a response time of 0.
        """
        results = await asyncio.gather(
            *(check_fn() for _, check_fn in self.checks),
            return_exceptions=True,
        )

        health_results = []
        for (name, _), result in zip(self.checks, results):
            # BaseException, not Exception: with return_exceptions=True a
            # cancelled child shows up here as a CancelledError instance,
            # which does not derive from Exception.
            if isinstance(result, BaseException):
                result = HealthResult(name, Status.UNHEALTHY, 0, self._describe(result))
            health_results.append(result)

        statuses = [r.status for r in health_results]
        if Status.UNHEALTHY in statuses:
            overall = Status.UNHEALTHY
        elif Status.DEGRADED in statuses:
            overall = Status.DEGRADED
        else:
            overall = Status.HEALTHY

        return {
            'overall': overall.value,
            'services': [
                {
                    'name': r.service,
                    'status': r.status.value,
                    'response_time_ms': round(r.response_time_ms, 1),
                    'message': r.message,
                    'metadata': r.metadata,
                }
                for r in sorted(health_results, key=lambda r: r.service)
            ],
            'checked_at': time.time(),
        }

    async def close(self):
        """Close the shared HTTP client, if one was created."""
        if self._client is not None and not self._client.is_closed:
            await self._client.aclose()
        # Drop the reference so a later check creates a fresh client.
        self._client = None

SLA tracking and uptime calculation

Tracking historical health data to calculate uptime percentages:

import json
from pathlib import Path
from collections import defaultdict
from datetime import datetime, timedelta

class SLATracker:
    """Persist health reports as daily JSONL files and derive uptime figures.

    Every report passed to :meth:`record` is appended as one JSON line to
    ``checks-YYYY-MM-DD.jsonl`` (UTC date) inside ``data_dir``.
    """

    def __init__(self, data_dir: str = '/var/lib/health-dashboard'):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _utc_today():
        """Return today's date in UTC."""
        # Local import: the surrounding file only imports datetime/timedelta.
        from datetime import timezone
        return datetime.now(timezone.utc).date()

    def _day_file(self, day) -> Path:
        """Return the path of the JSONL file holding the reports of *day*."""
        return self.data_dir / f'checks-{day.strftime("%Y-%m-%d")}.jsonl'

    def _service_entries(self, day, service: str):
        """Yield every recorded entry for *service* on *day*.

        Missing files yield nothing. Blank or unparseable lines (for example
        a line truncated by a crash mid-write) are skipped so that one bad
        record cannot break the statistics for the whole period.
        """
        try:
            handle = open(self._day_file(day), encoding='utf-8', errors='replace')
        except FileNotFoundError:
            return
        with handle:
            for raw in handle:
                line = raw.strip()
                if not line:
                    continue
                try:
                    report = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(report, dict):
                    continue
                for svc in report.get('services', []):
                    if isinstance(svc, dict) and svc.get('name') == service:
                        yield svc

    def record(self, report: dict):
        """Append *report* as one JSON line to today's (UTC) file."""
        with open(self._day_file(self._utc_today()), 'a', encoding='utf-8') as f:
            f.write(json.dumps(report, default=str) + '\n')

    def uptime_percentage(self, service: str, days: int = 30,
                          sla_target: float = 99.9) -> dict:
        """Summarise the last *days* days of checks for *service*.

        ``uptime_pct`` counts only healthy checks; ``availability_pct`` counts
        healthy and degraded checks. ``sla_met`` is true when availability is
        at least *sla_target* percent (default 99.9, i.e. three nines).

        When no checks are recorded the result carries ``uptime: None`` and
        ``reason: 'no data'``, plus ``None``/zero values under the regular
        keys so callers can read either shape.
        """
        # Evaluate "today" once so the window cannot shift if midnight passes
        # while the files are being read.
        today = self._utc_today()
        total_checks = 0
        healthy_checks = 0
        degraded_checks = 0

        for day_offset in range(days):
            day = today - timedelta(days=day_offset)
            for svc in self._service_entries(day, service):
                total_checks += 1
                status = svc.get('status')
                if status == 'healthy':
                    healthy_checks += 1
                elif status == 'degraded':
                    degraded_checks += 1

        if total_checks == 0:
            return {
                'uptime': None,
                'reason': 'no data',
                'uptime_pct': None,
                'availability_pct': None,
                'total_checks': 0,
                'period_days': days,
            }

        uptime = healthy_checks / total_checks * 100
        availability = (healthy_checks + degraded_checks) / total_checks * 100

        return {
            'uptime_pct': round(uptime, 4),
            'availability_pct': round(availability, 4),
            'total_checks': total_checks,
            'healthy': healthy_checks,
            'degraded': degraded_checks,
            'unhealthy': total_checks - healthy_checks - degraded_checks,
            'period_days': days,
            'sla_met': availability >= sla_target,
        }

    def daily_summary(self, service: str, days: int = 7) -> list[dict]:
        """Return one summary dict per day for the last *days* days, newest first.

        Each entry has ``date``, ``uptime_pct``, ``avg_response_ms``,
        ``p95_response_ms`` and ``checks``. The percentage and average are
        ``None`` on days without data; the p95 needs at least two samples.
        """
        today = self._utc_today()
        summaries = []

        for day_offset in range(days):
            day = today - timedelta(days=day_offset)
            total = healthy = 0
            response_times = []

            for svc in self._service_entries(day, service):
                total += 1
                if svc.get('status') == 'healthy':
                    healthy += 1
                response_times.append(svc.get('response_time_ms', 0))

            samples = len(response_times)
            summaries.append({
                'date': day.strftime('%Y-%m-%d'),
                'uptime_pct': round(healthy / total * 100, 2) if total > 0 else None,
                'avg_response_ms': round(sum(response_times) / samples, 1)
                    if samples else None,
                'p95_response_ms': round(sorted(response_times)[int(samples * 0.95)], 1)
                    if samples > 1 else None,
                'checks': total,
            })

        return summaries

Incident detection and tracking

Automatically detecting incidents from health check results:

from dataclasses import dataclass, field

@dataclass
class Incident:
    """One outage of one service, from its start until it is resolved.

    Timestamps are compared against ``time.time()``; ``resolved_at`` stays
    at 0 while the incident is still open.
    """
    service: str
    started_at: float
    resolved_at: float = 0
    check_count: int = 0
    is_active: bool = True

    @property
    def duration_minutes(self) -> float:
        """Minutes elapsed from start to resolution, or to now if unresolved."""
        finished_at = self.resolved_at or time.time()
        elapsed_seconds = finished_at - self.started_at
        return elapsed_seconds / 60

class IncidentDetector:
    """Turn a stream of health reports into incident open/resolve events."""

    def __init__(self, threshold: int = 3):
        """Require `threshold` consecutive failures to declare an incident."""
        self.threshold = threshold
        self.failure_counts: dict[str, int] = defaultdict(int)
        self.active_incidents: dict[str, Incident] = {}
        self.resolved_incidents: list[Incident] = []
        # Timestamp of the first failure in each service's current streak, so
        # an incident can be dated from when the outage actually began.
        self._first_failure_at: dict[str, float] = {}

    def process(self, report: dict) -> dict:
        """Update incident state from *report* and describe what changed.

        A service opens an incident once it has been ``'unhealthy'`` for
        ``threshold`` consecutive reports. The incident starts at the first
        failure of that streak and its ``check_count`` includes every failed
        check in it. Any other status resets the streak and resolves an open
        incident.

        Returns a dict with ``new_incidents``, ``resolved_incidents`` and
        ``active_incidents`` lists.
        """
        new_incidents = []
        resolved = []

        # Fall back to the current time for reports that lack a timestamp.
        checked_at = report.get('checked_at')
        if checked_at is None:
            checked_at = time.time()

        for svc in report.get('services', []):
            name = svc['name']

            if svc['status'] == 'unhealthy':
                self.failure_counts[name] += 1
                self._first_failure_at.setdefault(name, checked_at)

                incident = self.active_incidents.get(name)
                if incident is not None:
                    incident.check_count += 1
                elif self.failure_counts[name] >= self.threshold:
                    incident = Incident(
                        service=name,
                        started_at=self._first_failure_at[name],
                        # Count the failures that led up to the declaration.
                        check_count=self.failure_counts[name],
                    )
                    self.active_incidents[name] = incident
                    new_incidents.append(incident)

            else:
                self.failure_counts[name] = 0
                self._first_failure_at.pop(name, None)

                incident = self.active_incidents.pop(name, None)
                if incident is not None:
                    incident.resolved_at = checked_at
                    incident.is_active = False
                    self.resolved_incidents.append(incident)
                    resolved.append(incident)

        return {
            'new_incidents': [
                {'service': i.service, 'started': i.started_at}
                for i in new_incidents
            ],
            'resolved_incidents': [
                {'service': i.service, 'duration_min': round(i.duration_minutes, 1)}
                for i in resolved
            ],
            'active_incidents': [
                {
                    'service': i.service,
                    'duration_min': round(i.duration_minutes, 1),
                    'checks_failed': i.check_count,
                }
                for i in self.active_incidents.values()
            ],
        }

Dependency graph visualization

Services depend on each other. Visualizing these dependencies helps understand cascading failures:

class ServiceDependencyGraph:
    """Directed graph of services and the services each one depends on."""

    def __init__(self):
        # Maps a service name to the names of the services it depends on.
        self.dependencies: dict[str, list[str]] = {}

    def add_service(self, name: str, depends_on: list[str]):
        """Register *name* with its direct dependencies, replacing any previous entry."""
        # Copy so that later mutation of the caller's list cannot silently
        # rewrite the graph.
        self.dependencies[name] = list(depends_on)

    def affected_by(self, failed_service: str) -> list[str]:
        """Find all services that would be affected if a service fails.

        Returns every direct and transitive dependent of *failed_service*,
        nearest dependents first (breadth-first), each listed once. The failed
        service itself is never included, even when it is part of a cycle.
        """
        # Invert the graph once instead of rescanning it for every node.
        dependents: dict[str, list[str]] = {}
        for svc, deps in self.dependencies.items():
            for dep in deps:
                dependents.setdefault(dep, []).append(svc)

        affected = []
        seen = {failed_service}
        frontier = [failed_service]
        # Iterative traversal: deep chains cannot hit the recursion limit.
        while frontier:
            next_frontier = []
            for current in frontier:
                for svc in dependents.get(current, ()):
                    if svc not in seen:
                        seen.add(svc)
                        affected.append(svc)
                        next_frontier.append(svc)
            frontier = next_frontier

        return affected

    def root_cause(self, unhealthy_services: list[str]) -> list[str]:
        """Identify likely root causes from a set of unhealthy services.

        A service is a likely root cause when none of its *other* direct
        dependencies is unhealthy. Input order is preserved and duplicates
        are reported once. If the unhealthy services form a dependency cycle,
        none of them qualifies and the result may be empty.
        """
        unhealthy_set = set(unhealthy_services)
        roots = []

        for svc in dict.fromkeys(unhealthy_services):
            deps = self.dependencies.get(svc, [])
            # If any dependency is also unhealthy, this service might be a
            # victim; a service listing itself does not count.
            has_unhealthy_dep = any(d != svc and d in unhealthy_set for d in deps)
            if not has_unhealthy_dep:
                roots.append(svc)

        return roots

# Example: model a small web stack and ask what a database outage takes down
graph = ServiceDependencyGraph()
graph.add_service(name="web-frontend", depends_on=["api-gateway"])
graph.add_service(
    name="api-gateway",
    depends_on=["user-service", "product-service", "auth-service"],
)
graph.add_service(name="user-service", depends_on=["postgres", "redis"])
graph.add_service(name="product-service", depends_on=["postgres", "elasticsearch"])
graph.add_service(name="auth-service", depends_on=["postgres", "redis"])
graph.add_service(name="postgres", depends_on=[])
graph.add_service(name="redis", depends_on=[])
graph.add_service(name="elasticsearch", depends_on=[])

# Everything that directly or transitively depends on postgres
affected = graph.affected_by(failed_service="postgres")
# Contains: user-service, product-service, auth-service, api-gateway, web-frontend

Full dashboard with FastAPI and WebSockets

For a real-time dashboard that pushes updates to the browser:

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
import asyncio
import json

# Shared singletons used by the background loop and the request handlers
app = FastAPI()
engine = AsyncHealthEngine()
sla_tracker = SLATracker()
incident_detector = IncidentDetector()

# Register the services to probe (two HTTP endpoints, two raw TCP ports)
engine.add_http_check(name="API Gateway", url="https://api.example.com/health")
engine.add_http_check(name="User Service", url="https://users.internal/health")
engine.add_tcp_check(name="PostgreSQL", host="db.internal", port=5432)
engine.add_tcp_check(name="Redis", host="redis.internal", port=6379)

# WebSocket connections that currently receive pushed reports
connected_clients: list[WebSocket] = []

async def health_check_loop():
    """Background task that runs checks and pushes to WebSocket clients.

    Every 15 seconds: run all checks, persist the report, update incident
    state, then send the report (with an ``incidents`` key added) to every
    connected client. A failure in one cycle is logged and the loop carries
    on; cancellation still propagates so the task can be shut down.
    """
    import logging  # local import: the file-level imports do not include it

    log = logging.getLogger(__name__)

    while True:
        try:
            report = await engine.run_all()
            # Appending to the JSONL file is blocking I/O; keep it off the
            # event loop so WebSocket traffic is not stalled by a slow disk.
            await asyncio.to_thread(sla_tracker.record, report)
            report['incidents'] = incident_detector.process(report)

            message = json.dumps(report, default=str)
            # Iterate over a snapshot: the /ws handler appends and removes
            # clients while this coroutine is suspended in send_text().
            for ws in list(connected_clients):
                try:
                    await ws.send_text(message)
                except Exception:
                    # The handler may already have dropped this client.
                    if ws in connected_clients:
                        connected_clients.remove(ws)
        except Exception:
            # Without this, one bad cycle (e.g. an unwritable data dir) would
            # end the task silently and the dashboard would stop updating.
            log.exception("Health check cycle failed")

        await asyncio.sleep(15)

@app.on_event("startup")
async def startup():
    """Start the background health-check loop when the application boots."""
    # Keep a strong reference to the task: the event loop itself only holds a
    # weak one, so an unreferenced task can be garbage-collected mid-run.
    app.state.health_check_task = asyncio.create_task(health_check_loop())

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a dashboard client and keep it registered until it disconnects.

    Incoming messages are read and discarded; the loop exists only to notice
    when the client goes away.
    """
    await websocket.accept()
    connected_clients.append(websocket)
    try:
        while True:
            await websocket.receive_text()
    except Exception:
        # A disconnect (or any receive error) simply ends this handler.
        pass
    finally:
        # Runs on cancellation too. The broadcast loop may already have
        # removed this socket after a failed send, hence the membership test.
        if websocket in connected_clients:
            connected_clients.remove(websocket)

@app.get("/api/sla/{service}")
async def get_sla(service: str, days: int = 30):
    """Return uptime and availability figures for *service* over *days* days."""
    # The tracker reads one file per day synchronously; run it in a worker
    # thread so the event loop keeps serving WebSockets and health checks.
    return await asyncio.to_thread(sla_tracker.uptime_percentage, service, days)

@app.get("/api/history/{service}")
async def get_history(service: str, days: int = 7):
    """Return per-day summaries for *service* covering the last *days* days."""
    # The tracker reads one file per day synchronously; run it in a worker
    # thread so the event loop keeps serving WebSockets and health checks.
    return await asyncio.to_thread(sla_tracker.daily_summary, service, days)

Check tuning for production

Timeout configuration

Different services need different timeout thresholds:

| Service type | Healthy threshold | Degraded threshold | Timeout |
|---|---|---|---|
| In-memory cache (Redis) | < 50ms | < 500ms | 3s |
| Database query | < 200ms | < 1000ms | 5s |
| Internal HTTP API | < 500ms | < 2000ms | 10s |
| External API | < 1000ms | < 5000ms | 15s |

Check frequency

  • Critical path services (database, auth): every 15 seconds
  • Supporting services (search, analytics): every 30-60 seconds
  • External dependencies (third-party APIs): every 2-5 minutes (avoid rate limits)

Flap detection

Services that rapidly switch between healthy and unhealthy cause alert fatigue. Require several consecutive identical results before changing the reported status:

class FlapDetector:
    """Report a new status only after it was seen several times in a row."""

    def __init__(self, stable_count: int = 3):
        # Number of identical consecutive readings needed to accept a status.
        self.stable_count = stable_count
        # Per service: the most recent raw readings, oldest first.
        self.pending_changes: dict[str, list[str]] = defaultdict(list)
        # Per service: the last status that was accepted as stable.
        self.current_status: dict[str, str] = {}

    def get_stable_status(self, service: str, raw_status: str) -> str:
        """Record *raw_status* for *service* and return its stable status.

        Until a service has had its first stable run, the raw status is
        passed through unchanged.
        """
        window = self.pending_changes[service]
        window.append(raw_status)

        # Keep the window at most stable_count readings long.
        if len(window) > self.stable_count:
            del window[0]

        unanimous = len(set(window)) == 1
        window_full = len(window) >= self.stable_count
        if unanimous and window_full:
            self.current_status[service] = raw_status

        if service in self.current_status:
            return self.current_status[service]
        return raw_status

One thing to remember: A production health dashboard is more than a grid of green and red dots — it is an observability system that combines async health checks, SLA tracking, incident detection with flap suppression, dependency-aware root cause analysis, and real-time push updates so operators can diagnose and resolve problems in seconds, not hours.

Tags: python, monitoring, dashboards, system-administration, devops

See Also

  • Python Crontab Management: How Python can set up automatic timers on your computer — like programming an alarm clock that runs tasks instead of waking you up.
  • Python Disk Usage Monitoring: How Python helps you keep an eye on your computer's storage — like a fuel gauge that warns you before you run out of space.
  • Python Log Rotation Management: Why your program's diary needs page limits — and how Python keeps log files from eating all your disk space.
  • Python Network Interface Monitoring: How Python watches your computer's network connections — like having a traffic counter on every road leading to your house.
  • Python Process Management: How Python lets you see and control all the programs running on your computer — like being the manager of a busy office.