Python Service Health Dashboards — Deep Dive
Async health check engine
For dashboards monitoring dozens of services, an async check engine scales better than a thread pool: the checks are I/O-bound, so a single event loop can run all of them concurrently without dedicating a thread to each one:
import asyncio
import httpx
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Awaitable
class Status(Enum):
    """Health state reported for a single service check.

    The values are the lowercase strings used when a result is serialized.
    """

    # Members are declared from best to worst.
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
@dataclass
class HealthResult:
    """Outcome of one health check against one service.

    Attributes:
        service: Name of the checked service.
        status: Resulting ``Status`` member.
        response_time_ms: How long the check took, in milliseconds.
        message: Optional human-readable detail (empty by default).
        metadata: Free-form extra data; always a dict after construction.
    """

    service: str
    status: Status
    response_time_ms: float
    message: str = ""
    # NOTE(review): the default is None although the annotation says dict;
    # __post_init__ swaps None for a fresh dict so instances never share one.
    metadata: dict = None

    def __post_init__(self):
        """Replace a missing ``metadata`` with a new, empty dict."""
        self.metadata = {} if self.metadata is None else self.metadata
class AsyncHealthEngine:
    """Registry of async health checks that are executed concurrently.

    A check is a zero-argument coroutine function returning a
    ``HealthResult``. HTTP checks share one lazily created
    ``httpx.AsyncClient``; call :meth:`close` to release it.
    """

    def __init__(self):
        # (service name, check coroutine function) pairs in registration order.
        self.checks: list[tuple[str, Callable[[], Awaitable[HealthResult]]]] = []
        self._client: httpx.AsyncClient | None = None

    @staticmethod
    def _describe(exc: BaseException) -> str:
        """Return a non-empty message for *exc*.

        Timeouts and cancellations usually stringify to ``""``, which would
        leave an UNHEALTHY result without any explanation.
        """
        return str(exc) or type(exc).__name__

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it if missing or closed."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(timeout=10.0)
        return self._client

    def add_http_check(self, name: str, url: str, timeout: float = 5.0,
                       expected_status: int = 200, degraded_ms: float = 2000):
        """Register a check that issues ``GET url``.

        The result is UNHEALTHY when the request raises or the status code
        differs from ``expected_status``, DEGRADED when the response took
        longer than ``degraded_ms`` milliseconds, and HEALTHY otherwise.
        """
        async def check() -> HealthResult:
            client = await self._get_client()
            start = time.monotonic()
            try:
                resp = await client.get(url, timeout=timeout)
            except Exception as e:
                elapsed = (time.monotonic() - start) * 1000
                return HealthResult(name, Status.UNHEALTHY, elapsed,
                                    self._describe(e))
            elapsed = (time.monotonic() - start) * 1000
            if resp.status_code != expected_status:
                return HealthResult(
                    name, Status.UNHEALTHY, elapsed,
                    f"Expected {expected_status}, got {resp.status_code}")
            status = Status.DEGRADED if elapsed > degraded_ms else Status.HEALTHY
            return HealthResult(name, status, elapsed)

        self.checks.append((name, check))

    def add_tcp_check(self, name: str, host: str, port: int,
                      timeout: float = 3.0, degraded_ms: float = 500):
        """Register a check that opens a TCP connection to ``host:port``.

        The result is UNHEALTHY when the connection cannot be established
        within ``timeout`` seconds, DEGRADED when connecting took longer than
        ``degraded_ms`` milliseconds, and HEALTHY otherwise.
        """
        async def check() -> HealthResult:
            start = time.monotonic()
            try:
                _reader, writer = await asyncio.wait_for(
                    asyncio.open_connection(host, port),
                    timeout=timeout,
                )
            except Exception as e:
                elapsed = (time.monotonic() - start) * 1000
                return HealthResult(name, Status.UNHEALTHY, elapsed,
                                    self._describe(e))
            # Stop the clock as soon as the connection exists, so teardown
            # time is not reported as service latency.
            elapsed = (time.monotonic() - start) * 1000
            writer.close()
            try:
                await asyncio.wait_for(writer.wait_closed(), timeout=timeout)
            except Exception:
                # The connect already succeeded; a slow or noisy teardown
                # must not turn a reachable service into a failure.
                pass
            status = Status.DEGRADED if elapsed > degraded_ms else Status.HEALTHY
            return HealthResult(name, status, elapsed)

        self.checks.append((name, check))

    def add_custom_check(self, name: str, fn: Callable[[], Awaitable[HealthResult]]):
        """Register a caller-supplied check coroutine function under ``name``."""
        self.checks.append((name, fn))

    async def run_all(self) -> dict:
        """Run every registered check concurrently and build a report.

        Returns:
            A dict with ``'overall'`` (worst status value among all
            services), ``'services'`` (one dict per result, sorted by
            service name) and ``'checked_at'`` (``time.time()`` when the
            report was assembled).
        """
        # Snapshot so that a check registered mid-run cannot misalign names
        # and outcomes.
        checks = list(self.checks)
        outcomes = await asyncio.gather(
            *(check_fn() for _, check_fn in checks),
            return_exceptions=True,
        )

        health_results = []
        for (name, _), outcome in zip(checks, outcomes):
            # BaseException, not Exception: a cancelled check is returned as
            # CancelledError, which does not derive from Exception.
            if isinstance(outcome, BaseException):
                health_results.append(
                    HealthResult(name, Status.UNHEALTHY, 0,
                                 self._describe(outcome))
                )
            else:
                health_results.append(outcome)

        statuses = {r.status for r in health_results}
        if Status.UNHEALTHY in statuses:
            overall = Status.UNHEALTHY
        elif Status.DEGRADED in statuses:
            overall = Status.DEGRADED
        else:
            overall = Status.HEALTHY

        return {
            'overall': overall.value,
            'services': [
                {
                    'name': r.service,
                    'status': r.status.value,
                    'response_time_ms': round(r.response_time_ms, 1),
                    'message': r.message,
                    'metadata': r.metadata,
                }
                for r in sorted(health_results, key=lambda r: r.service)
            ],
            'checked_at': time.time(),
        }

    async def close(self):
        """Close the shared HTTP client if one is open."""
        if self._client is not None and not self._client.is_closed:
            await self._client.aclose()
SLA tracking and uptime calculation
Tracking historical health data to calculate uptime percentages:
import json
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
class SLATracker:
    """Persist health reports as daily JSONL files and derive SLA figures.

    Every report is appended as one JSON line to
    ``checks-YYYY-MM-DD.jsonl`` (UTC date) inside ``data_dir``.
    """

    def __init__(self, data_dir: str = '/var/lib/health-dashboard'):
        """Use ``data_dir`` for storage, creating it if necessary."""
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _utc_today():
        """Return the current date in UTC."""
        return datetime.now(timezone.utc).date()

    def _day_file(self, day) -> Path:
        """Return the path of the JSONL file that holds reports for *day*."""
        return self.data_dir / f'checks-{day.isoformat()}.jsonl'

    def _service_entries(self, day, service: str):
        """Yield each recorded entry for *service* on *day*."""
        try:
            handle = self._day_file(day).open(encoding='utf-8')
        except FileNotFoundError:
            return
        with handle:
            for line in handle:
                line = line.strip()
                if not line:
                    continue
                try:
                    report = json.loads(line)
                except json.JSONDecodeError:
                    # A crash in the middle of an append leaves a truncated
                    # line; skip it instead of failing the whole query.
                    continue
                if not isinstance(report, dict):
                    continue
                for svc in report.get('services', []):
                    if svc.get('name') == service:
                        yield svc

    def record(self, report: dict):
        """Append *report* to today's (UTC) file as a single JSON line."""
        filepath = self._day_file(self._utc_today())
        with open(filepath, 'a', encoding='utf-8') as f:
            f.write(json.dumps(report, default=str) + '\n')

    def uptime_percentage(self, service: str, days: int = 30) -> dict:
        """Aggregate the checks of *service* over the last *days* days.

        Returns:
            ``{'uptime': None, 'reason': 'no data'}`` when nothing was
            recorded; otherwise a dict with ``uptime_pct`` (healthy share),
            ``availability_pct`` (healthy + degraded share), the raw
            counters, ``period_days`` and ``sla_met`` (availability of at
            least 99.9 %).
        """
        # Resolve "today" once so the window cannot shift if the UTC date
        # rolls over while the files are being read.
        today = self._utc_today()
        total_checks = healthy_checks = degraded_checks = 0
        for day_offset in range(days):
            day = today - timedelta(days=day_offset)
            for svc in self._service_entries(day, service):
                total_checks += 1
                status = svc.get('status')
                if status == 'healthy':
                    healthy_checks += 1
                elif status == 'degraded':
                    degraded_checks += 1

        if total_checks == 0:
            return {'uptime': None, 'reason': 'no data'}

        uptime = healthy_checks / total_checks * 100
        availability = (healthy_checks + degraded_checks) / total_checks * 100
        return {
            'uptime_pct': round(uptime, 4),
            'availability_pct': round(availability, 4),
            'total_checks': total_checks,
            'healthy': healthy_checks,
            'degraded': degraded_checks,
            'unhealthy': total_checks - healthy_checks - degraded_checks,
            'period_days': days,
            'sla_met': availability >= 99.9,  # Three nines
        }

    def daily_summary(self, service: str, days: int = 7) -> list[dict]:
        """Summarize *service* per day for the last *days* days, newest first.

        Each entry holds ``date``, ``uptime_pct``, ``avg_response_ms``,
        ``p95_response_ms`` and ``checks``. The statistics are ``None`` when
        the day has no samples; the p95 needs at least two samples.
        """
        today = self._utc_today()
        summaries = []
        for day_offset in range(days):
            day = today - timedelta(days=day_offset)
            total = healthy = 0
            response_times = []
            for svc in self._service_entries(day, service):
                total += 1
                if svc.get('status') == 'healthy':
                    healthy += 1
                response_times.append(svc.get('response_time_ms', 0))

            samples = len(response_times)
            p95 = None
            if samples > 1:
                # Nearest-rank percentile: rank = ceil(0.95 * n), computed
                # with integers to avoid floating-point rounding surprises.
                rank = (95 * samples + 99) // 100
                p95 = round(sorted(response_times)[rank - 1], 1)

            summaries.append({
                'date': day.isoformat(),
                'uptime_pct': round(healthy / total * 100, 2) if total > 0 else None,
                'avg_response_ms': (round(sum(response_times) / samples, 1)
                                    if samples else None),
                'p95_response_ms': p95,
                'checks': total,
            })
        return summaries
Incident detection and tracking
Automatically detecting incidents from health check results:
from dataclasses import dataclass, field
@dataclass
class Incident:
    """One outage of a single service, from declaration to resolution."""

    service: str
    started_at: float
    # 0 doubles as "not resolved yet"; see duration_minutes.
    resolved_at: float = 0
    check_count: int = 0
    is_active: bool = True

    @property
    def duration_minutes(self) -> float:
        """Length of the incident in minutes.

        While ``resolved_at`` is unset (falsy) the duration is measured up
        to the current time.
        """
        finished = self.resolved_at or time.time()
        elapsed_seconds = finished - self.started_at
        return elapsed_seconds / 60
class IncidentDetector:
    """Turn a stream of health reports into incident open/resolve events."""

    def __init__(self, threshold: int = 3):
        """Require `threshold` consecutive failures to declare an incident."""
        self.threshold = threshold
        self.failure_counts: dict[str, int] = defaultdict(int)
        self.active_incidents: dict[str, Incident] = {}
        self.resolved_incidents: list[Incident] = []

    def process(self, report: dict) -> dict:
        """Update incident state from one report.

        A service opens an incident once it has been ``'unhealthy'`` for
        ``threshold`` reports in a row; any other status resets its failure
        streak and resolves its open incident.

        Returns:
            A dict with ``'new_incidents'``, ``'resolved_incidents'`` and
            ``'active_incidents'`` lists describing the state after this
            report.
        """
        opened = []
        closed = []

        for entry in report.get('services', []):
            name = entry['name']

            if entry['status'] != 'unhealthy':
                # Recovery: the streak ends and an open incident is resolved.
                self.failure_counts[name] = 0
                if name in self.active_incidents:
                    finished = self.active_incidents.pop(name)
                    finished.resolved_at = report['checked_at']
                    finished.is_active = False
                    self.resolved_incidents.append(finished)
                    closed.append(finished)
                continue

            self.failure_counts[name] += 1
            ongoing = self.active_incidents.get(name)
            if ongoing is None and self.failure_counts[name] >= self.threshold:
                ongoing = Incident(service=name, started_at=report['checked_at'])
                self.active_incidents[name] = ongoing
                opened.append(ongoing)
            if ongoing is not None:
                # Counts the report that opened the incident as well.
                ongoing.check_count += 1

        new_summary = [
            {'service': inc.service, 'started': inc.started_at}
            for inc in opened
        ]
        resolved_summary = [
            {'service': inc.service, 'duration_min': round(inc.duration_minutes, 1)}
            for inc in closed
        ]
        active_summary = [
            {
                'service': inc.service,
                'duration_min': round(inc.duration_minutes, 1),
                'checks_failed': inc.check_count,
            }
            for inc in self.active_incidents.values()
        ]
        return {
            'new_incidents': new_summary,
            'resolved_incidents': resolved_summary,
            'active_incidents': active_summary,
        }
Dependency graph visualization
Services depend on each other. Visualizing these dependencies helps understand cascading failures:
class ServiceDependencyGraph:
    """Directed graph recording which services each service depends on."""

    def __init__(self):
        # service name -> names of the services it depends on directly
        self.dependencies: dict[str, list[str]] = {}

    def add_service(self, name: str, depends_on: list[str]):
        """Register ``name`` with its direct dependencies (replacing any)."""
        self.dependencies[name] = depends_on

    def affected_by(self, failed_service: str) -> list[str]:
        """Find all services that would be affected if a service fails.

        Returns every direct or transitive dependent of ``failed_service``
        in breadth-first order: direct dependents first, then theirs, and
        so on. The failed service itself is never included, even when the
        graph contains a dependency cycle.
        """
        # Invert the graph once instead of rescanning every service at each
        # step of the traversal.
        dependents: dict[str, list[str]] = {}
        for svc, deps in self.dependencies.items():
            for dep in deps:
                dependents.setdefault(dep, []).append(svc)

        affected = []
        seen = {failed_service}
        frontier = [failed_service]
        # Iterative level-by-level walk: long dependency chains cannot hit
        # the interpreter's recursion limit.
        while frontier:
            next_frontier = []
            for current in frontier:
                for svc in dependents.get(current, ()):
                    if svc not in seen:
                        seen.add(svc)
                        affected.append(svc)
                        next_frontier.append(svc)
            frontier = next_frontier
        return affected

    def root_cause(self, unhealthy_services: list[str]) -> list[str]:
        """Identify likely root causes from a set of unhealthy services.

        A service is reported as a root cause when none of its direct
        dependencies is unhealthy too; otherwise it may just be a victim.
        """
        unhealthy_set = set(unhealthy_services)
        return [
            svc
            for svc in unhealthy_services
            if not any(dep in unhealthy_set
                       for dep in self.dependencies.get(svc, []))
        ]
# Example usage: register each service together with its direct dependencies.
graph = ServiceDependencyGraph()
for _service, _depends_on in (
    ("web-frontend", ["api-gateway"]),
    ("api-gateway", ["user-service", "product-service", "auth-service"]),
    ("user-service", ["postgres", "redis"]),
    ("product-service", ["postgres", "elasticsearch"]),
    ("auth-service", ["postgres", "redis"]),
    ("postgres", []),
    ("redis", []),
    ("elasticsearch", []),
):
    graph.add_service(_service, _depends_on)

# If postgres goes down, what breaks?
affected = graph.affected_by("postgres")
# Every service that depends on postgres, directly or transitively:
# user-service, product-service, auth-service, api-gateway and web-frontend.
Full dashboard with FastAPI and WebSockets
For a real-time dashboard that pushes updates to the browser:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
import asyncio
import json
# Application object and the monitoring components shared by all handlers.
app = FastAPI()
engine = AsyncHealthEngine()
sla_tracker = SLATracker()
incident_detector = IncidentDetector()

# Configure checks
engine.add_http_check(name="API Gateway", url="https://api.example.com/health")
engine.add_http_check(name="User Service", url="https://users.internal/health")
engine.add_tcp_check(name="PostgreSQL", host="db.internal", port=5432)
engine.add_tcp_check(name="Redis", host="redis.internal", port=6379)

# WebSocket connections that receive every new report.
connected_clients: list[WebSocket] = []
async def health_check_loop():
    """Background task that runs checks and pushes to WebSocket clients.

    Every 15 seconds it runs all checks, records the report for SLA
    tracking, attaches the incident summary and broadcasts the JSON report
    to every connected client. A failing cycle is logged and retried on the
    next tick instead of terminating the loop.
    """
    import logging  # local import: the surrounding import block lacks it

    log = logging.getLogger(__name__)
    while True:
        try:
            report = await engine.run_all()
            sla_tracker.record(report)
            report['incidents'] = incident_detector.process(report)
            message = json.dumps(report, default=str)
        except Exception:
            # One bad cycle (e.g. the report file cannot be written) must
            # not silently stop monitoring for good.
            log.exception("health check cycle failed; retrying after the interval")
        else:
            # Push to all connected clients. Iterate over a snapshot because
            # clients connect and disconnect while send_text() is awaited.
            for ws in list(connected_clients):
                try:
                    await ws.send_text(message)
                except Exception:
                    # Treat any send failure as a dead connection. The
                    # endpoint may have removed it already, hence the guard.
                    if ws in connected_clients:
                        connected_clients.remove(ws)
        await asyncio.sleep(15)
@app.on_event("startup")
async def startup():
    """Start the background health-check loop when the application boots."""
    # The event loop keeps only a weak reference to a task, so an unreferenced
    # task can be garbage-collected mid-run. Keep a strong reference on the app.
    app.state.health_check_task = asyncio.create_task(health_check_loop())
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a dashboard client and keep it registered until it leaves.

    Incoming messages are read and discarded; receiving only serves to
    notice when the client disconnects.
    """
    # Local import: the module-level fastapi import does not include it.
    from fastapi import WebSocketDisconnect

    await websocket.accept()
    connected_clients.append(websocket)
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass  # normal end of the session
    finally:
        # Runs on cancellation and on unexpected errors too. The broadcast
        # loop may already have dropped this socket, hence the guard.
        if websocket in connected_clients:
            connected_clients.remove(websocket)
@app.get("/api/sla/{service}")
async def get_sla(service: str, days: int = 30):
    """Return the uptime statistics of `service` for the last `days` days."""
    # NOTE(review): this reads the report files synchronously inside an
    # async handler.
    stats = sla_tracker.uptime_percentage(service, days=days)
    return stats
@app.get("/api/history/{service}")
async def get_history(service: str, days: int = 7):
    """Return the per-day summaries of `service` for the last `days` days."""
    # NOTE(review): this reads the report files synchronously inside an
    # async handler.
    history = sla_tracker.daily_summary(service, days=days)
    return history
Check tuning for production
Timeout configuration
Different services need different timeout thresholds:
| Service type | Healthy threshold | Degraded threshold | Timeout |
|---|---|---|---|
| In-memory cache (Redis) | < 50ms | < 500ms | 3s |
| Database query | < 200ms | < 1000ms | 5s |
| Internal HTTP API | < 500ms | < 2000ms | 10s |
| External API | < 1000ms | < 5000ms | 15s |
Check frequency
- Critical path services (database, auth): every 15 seconds
- Supporting services (search, analytics): every 30-60 seconds
- External dependencies (third-party APIs): every 2-5 minutes (avoid rate limits)
Flap detection
Services that rapidly switch between healthy and unhealthy cause alert fatigue. Require several consecutive identical results before changing the reported status, whether the service is failing or recovering:
class FlapDetector:
    """Suppress status flapping by requiring a run of identical readings."""

    def __init__(self, stable_count: int = 3):
        """Require ``stable_count`` identical readings to change a status."""
        self.stable_count = stable_count
        # Per service: the most recent raw readings (at most stable_count).
        self.pending_changes: dict[str, list[str]] = defaultdict(list)
        # Per service: the last status that was confirmed as stable.
        self.current_status: dict[str, str] = {}

    def get_stable_status(self, service: str, raw_status: str) -> str:
        """Record ``raw_status`` and return the status to report.

        The reported status changes only once the last ``stable_count``
        readings all agree. Until a first stable status exists, the raw
        reading is passed through unchanged.
        """
        window = self.pending_changes[service]
        window.append(raw_status)
        if len(window) > self.stable_count:
            del window[0]

        unanimous = len(set(window)) == 1
        window_full = len(window) >= self.stable_count
        if unanimous and window_full:
            self.current_status[service] = raw_status

        if service in self.current_status:
            return self.current_status[service]
        return raw_status
One thing to remember: A production health dashboard is more than a grid of green and red dots — it is an observability system that combines async health checks, SLA tracking, incident detection with flap suppression, dependency-aware root cause analysis, and real-time push updates so operators can diagnose and resolve problems in seconds, not hours.
See Also
- Python Crontab Management: How Python can set up automatic timers on your computer — like programming an alarm clock that runs tasks instead of waking you up.
- Python Disk Usage Monitoring: How Python helps you keep an eye on your computer's storage — like a fuel gauge that warns you before you run out of space.
- Python Log Rotation Management: Why your program's diary needs page limits — and how Python keeps log files from eating all your disk space.
- Python Network Interface Monitoring: How Python watches your computer's network connections — like having a traffic counter on every road leading to your house.
- Python Process Management: How Python lets you see and control all the programs running on your computer — like being the manager of a busy office.