Python Alerting Patterns — Deep Dive
Production alerting is an engineering discipline, not a configuration exercise. This guide covers multi-window SLO alerting, Alertmanager internals, programmatic alert generation, anomaly detection, and operational patterns that keep on-call teams sane.
Multi-window, multi-burn-rate alerting
Google’s Site Reliability Workbook popularized this pattern. Instead of a single threshold, it combines multiple time windows and burn rates to catch both acute outages and slow degradation.
The math
Given an SLO of 99.9% (error budget = 0.1%), the monthly error budget is:
Budget = 30 days × 24 hours × 60 minutes × 0.001 = 43.2 minutes of downtime
Burn rate is how fast you consume the budget:
- 1x = consuming at exactly the allowed pace; the budget runs out at the end of the 30-day window (normal)
- 14.4x = 2% of the monthly budget consumed in 1 hour; the whole budget is exhausted in 50 hours
- 6x = 5% of the monthly budget consumed in 6 hours; the whole budget is exhausted in 5 days
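The relation between burn rate, budget, and time to exhaustion is simple enough to check in a few lines. This is a minimal sketch assuming a 30-day window and a constant burn rate; the helper names are illustrative and not part of any library.

```python
def error_budget_minutes(slo: float, window_days: int = 30) -> float:
    """Minutes of total downtime the SLO allows over the window."""
    return window_days * 24 * 60 * (1 - slo)


def hours_to_exhaustion(burn_rate: float, window_days: int = 30) -> float:
    """Hours until the whole budget is spent at a constant burn rate."""
    return window_days * 24 / burn_rate


def budget_fraction_consumed(burn_rate: float, hours: float, window_days: int = 30) -> float:
    """Fraction of the budget spent after `hours` at a constant burn rate."""
    return burn_rate * hours / (window_days * 24)


# Burn rate paired with the long alert window (in hours) it is evaluated over
for rate, alert_window_hours in [(14.4, 1), (6, 6), (1, 720)]:
    spent = budget_fraction_consumed(rate, alert_window_hours)
    print(
        f"{rate}x: {spent:.0%} of budget in {alert_window_hours}h, "
        f"exhausted after {hours_to_exhaustion(rate):.0f}h"
    )
```

Read the other way round, the 14.4 figure is the burn rate that spends 2% of a 30-day budget in one hour: 0.02 × 720 h / 1 h = 14.4.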
Multi-window implementation
groups:
  - name: slo-alerts
    rules:
      # Page: 14.4x burn over both the 5-minute and 1-hour windows, held for 2 minutes
      - alert: SLOFastBurn
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[5m]))
            / sum(rate(http_requests_total[5m]))
          ) > (14.4 * 0.001)
          and
          (
            sum(rate(http_requests_total{status=~"5.."}[1h]))
            / sum(rate(http_requests_total[1h]))
          ) > (14.4 * 0.001)
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Rapid SLO budget burn detected"
      # Ticket: 6x burn over both the 30-minute and 6-hour windows, held for 15 minutes
      - alert: SLOSlowBurn
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[30m]))
            / sum(rate(http_requests_total[30m]))
          ) > (6 * 0.001)
          and
          (
            sum(rate(http_requests_total{status=~"5.."}[6h]))
            / sum(rate(http_requests_total[6h]))
          ) > (6 * 0.001)
        for: 15m
        labels:
          severity: warning
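Requiring both the short and the long window to exceed the threshold prevents two problems:
- False positives: A 30-second spike can push the short window over the threshold, but it barely moves the long window, so no alert fires.
- Stale alerts: After an incident ends, the long window stays elevated for a while, but the short window recovers within minutes, so the alert resolves promptly.
Slow leaks that never reach 14.4x are caught by the second rule, which uses a lower burn rate over longer windows.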
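Because both rules share one shape, they can be generated instead of hand-written. The sketch below builds the PromQL expression and a rule dictionary from an SLO, a burn rate, and a window pair. The function names are hypothetical, and the metric name and `status` label are taken from the rules in this section.

```python
def burn_rate_expr(slo: float, burn_rate: float, short_window: str, long_window: str,
                   metric: str = "http_requests_total") -> str:
    """PromQL that is true when both windows burn faster than `burn_rate`."""
    threshold = burn_rate * (1 - slo)

    def error_ratio(window: str) -> str:
        return (
            f'(sum(rate({metric}{{status=~"5.."}}[{window}])) '
            f"/ sum(rate({metric}[{window}])))"
        )

    return (
        f"{error_ratio(short_window)} > {threshold:.6g} "
        f"and {error_ratio(long_window)} > {threshold:.6g}"
    )


def slo_rule(name: str, severity: str, slo: float, burn_rate: float,
             short_window: str, long_window: str, hold: str) -> dict:
    """Rule dictionary in the shape of a Prometheus alerting rule."""
    return {
        "alert": name,
        "expr": burn_rate_expr(slo, burn_rate, short_window, long_window),
        "for": hold,
        "labels": {"severity": severity},
    }


rules = [
    slo_rule("SLOFastBurn", "critical", 0.999, 14.4, "5m", "1h", "2m"),
    slo_rule("SLOSlowBurn", "warning", 0.999, 6, "30m", "6h", "15m"),
]
```

Serializing `rules` under a `groups` entry (for example with PyYAML) would give a rule file equivalent to the hand-written one, which helps once several services each have their own SLO.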
Alertmanager internals
Grouping
Alertmanager groups alerts by label to prevent notification floods:
route:
  group_by: ['alertname', 'service']
  group_wait: 30s       # wait before sending first notification
  group_interval: 5m    # wait before sending updates to a group
  repeat_interval: 4h   # re-send if still firing
If 10 instances of HighLatency fire simultaneously, they’re grouped into one notification. Without grouping, the on-call engineer gets 10 pages.
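As a mental model, grouping is a bucketing step keyed on the values of the `group_by` labels. The sketch below is a simplified illustration, not Alertmanager's implementation; the alert dictionaries and the `group_alerts` helper are assumptions made for the example.

```python
from collections import defaultdict


def group_alerts(alerts: list[dict], group_by: list[str]) -> dict[tuple, list[dict]]:
    """Bucket alerts by the values of the group_by labels."""
    groups: dict[tuple, list[dict]] = defaultdict(list)
    for alert in alerts:
        # A missing label is treated as an empty value in this sketch
        key = tuple(alert["labels"].get(label, "") for label in group_by)
        groups[key].append(alert)
    return dict(groups)


# Ten instances of the same alert for the same service
alerts = [
    {"labels": {"alertname": "HighLatency", "service": "api", "instance": f"api-{i}"}}
    for i in range(10)
]
groups = group_alerts(alerts, ["alertname", "service"])
print(len(groups), "notification(s) for", len(alerts), "alerts")
```

Adding `instance` to `group_by` would split the same ten alerts into ten groups, which is why high-cardinality labels are usually left out of the grouping key.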
Inhibition
Suppress lower-severity alerts when higher ones are firing:
inhibit_rules:
  - source_match:
      severity: critical
    target_match:
      severity: warning
    equal: ['service']
If HighErrorRate (critical) is firing for the payment service, HighLatency (warning) for the same service is suppressed — the engineer already knows something is broken.
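The rule reads as three conditions: the target matches `target_match`, some firing alert matches `source_match`, and the two agree on every label listed in `equal`. A simplified sketch of that logic, with hypothetical alert dictionaries and a hypothetical `is_inhibited` helper (not Alertmanager's code):

```python
def is_inhibited(target: dict, firing: list[dict], source_match: dict,
                 target_match: dict, equal: list[str]) -> bool:
    """True if some firing source alert suppresses the target alert."""
    labels = target["labels"]
    if any(labels.get(k) != v for k, v in target_match.items()):
        return False  # the rule does not apply to this alert
    for source in firing:
        src = source["labels"]
        matches_source = all(src.get(k) == v for k, v in source_match.items())
        same_scope = all(src.get(name, "") == labels.get(name, "") for name in equal)
        if matches_source and same_scope:
            return True
    return False


firing = [{"labels": {"alertname": "HighErrorRate", "severity": "critical", "service": "payment"}}]
rule = dict(source_match={"severity": "critical"},
            target_match={"severity": "warning"},
            equal=["service"])

same_service = {"labels": {"alertname": "HighLatency", "severity": "warning", "service": "payment"}}
other_service = {"labels": {"alertname": "HighLatency", "severity": "warning", "service": "search"}}
print(is_inhibited(same_service, firing, **rule))
print(is_inhibited(other_service, firing, **rule))
```

The `equal` list is what keeps a critical alert on one service from hiding warnings on an unrelated one.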
Silences
Temporary mute for planned maintenance:
import httpx
from datetime import datetime, timedelta, timezone

def silence_alert(alertmanager_url: str, service: str, duration_hours: int = 2) -> str:
    """Create a silence for a service during maintenance and return its ID."""
    # Timezone-aware UTC timestamps; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.now(timezone.utc)
    response = httpx.post(f"{alertmanager_url}/api/v2/silences", json={
        "matchers": [
            {"name": "service", "value": service, "isRegex": False}
        ],
        "startsAt": now.isoformat(),
        "endsAt": (now + timedelta(hours=duration_hours)).isoformat(),
        "createdBy": "deploy-bot",
        "comment": f"Maintenance window for {service}"
    })
    response.raise_for_status()
    return response.json()["silenceID"]
Programmatic alerting from Python
Custom health check framework
from dataclasses import dataclass
from enum import Enum
from typing import Callable
import asyncio
import httpx

class Severity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class AlertRule:
    name: str
    check: Callable[[], bool]
    severity: Severity
    message: str
    runbook: str
    cooldown_seconds: int = 300

class AlertEngine:
    def __init__(self, notification_url: str):
        self.rules: list[AlertRule] = []
        self.last_fired: dict[str, float] = {}
        self.notification_url = notification_url

    def add_rule(self, rule: AlertRule):
        self.rules.append(rule)

    async def evaluate(self):
        now = asyncio.get_running_loop().time()
        for rule in self.rules:
            message = rule.message
            try:
                is_firing = rule.check()
            except Exception as e:
                # A broken check is itself alert-worthy. Use a local message
                # so the rule's own text is not overwritten permanently.
                is_firing = True
                message = f"Alert check itself failed: {e}"
            if not is_firing:
                continue
            # The loop clock has an arbitrary origin, so "never fired" is None, not 0
            last = self.last_fired.get(rule.name)
            if last is not None and now - last < rule.cooldown_seconds:
                continue
            self.last_fired[rule.name] = now
            await self._notify(rule, message)

    async def _notify(self, rule: AlertRule, message: str):
        async with httpx.AsyncClient() as client:
            response = await client.post(self.notification_url, json={
                "text": (
                    f":{self._emoji(rule.severity)}: *[{rule.severity.value.upper()}]* "
                    f"{rule.name}\n{message}\nRunbook: {rule.runbook}"
                )
            })
            # Surface failed deliveries instead of dropping them silently
            response.raise_for_status()

    def _emoji(self, severity: Severity) -> str:
        return {
            Severity.INFO: "information_source",
            Severity.WARNING: "warning",
            Severity.CRITICAL: "rotating_light"
        }[severity]
Usage:
engine = AlertEngine("https://hooks.slack.com/services/...")
engine.add_rule(AlertRule(
    name="payment-failure-rate",
    # get_payment_failure_rate is a placeholder for your own metric query
    check=lambda: get_payment_failure_rate(minutes=10) > 0.05,
    severity=Severity.CRITICAL,
    message="Payment failure rate exceeds 5%",
    runbook="https://wiki.internal/runbooks/payments"
))

async def main():
    # Run every minute
    while True:
        await engine.evaluate()
        await asyncio.sleep(60)

asyncio.run(main())
Anomaly detection alerts
For metrics without obvious thresholds, use statistical anomaly detection:
import numpy as np
from collections import deque

class AnomalyDetector:
    def __init__(self, window_size: int = 60, z_threshold: float = 3.0):
        self.window = deque(maxlen=window_size)
        self.z_threshold = z_threshold

    def observe(self, value: float) -> bool:
        """Returns True if value is anomalous relative to the previous window."""
        is_anomalous = False
        # Score against earlier samples only, so the new value
        # does not dilute its own baseline
        if len(self.window) >= 10:
            mean = np.mean(self.window)
            std = np.std(self.window)
            if std > 0:
                is_anomalous = bool(abs(value - mean) / std > self.z_threshold)
        self.window.append(value)
        return is_anomalous

# Usage (metrics_stream and fire_alert are placeholders for your own code)
latency_detector = AnomalyDetector(window_size=120, z_threshold=3.5)
for sample in metrics_stream():
    if latency_detector.observe(sample.p95_latency):
        fire_alert(f"Anomalous latency detected: {sample.p95_latency:.3f}s")
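This catches sudden deviations from the recent baseline without a hand-tuned threshold. Because the baseline adapts to recent data, slow drift is absorbed into it, so pair the detector with a static threshold or an SLO burn-rate alert to cover gradual degradation.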
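It helps to check what a rolling z-score does and does not flag. The sketch below runs a stdlib-only detector on two synthetic series: a sudden spike and a slow linear ramp. `RollingZScore` is a hypothetical name, and it scores each sample against the previous window only.

```python
from collections import deque
from statistics import fmean, pstdev


class RollingZScore:
    def __init__(self, window_size: int = 60, z_threshold: float = 3.0, min_samples: int = 10):
        self.window: deque = deque(maxlen=window_size)
        self.z_threshold = z_threshold
        self.min_samples = min_samples

    def observe(self, value: float) -> bool:
        """Score value against the previous window, then add it to the window."""
        anomalous = False
        if len(self.window) >= self.min_samples:
            mean = fmean(self.window)
            std = pstdev(self.window)
            anomalous = std > 0 and abs(value - mean) / std > self.z_threshold
        self.window.append(value)
        return anomalous


def count_flags(series: list[float]) -> int:
    detector = RollingZScore()
    return sum(detector.observe(v) for v in series)


# Stable latency with small jitter, then one large spike
spike = [0.100 + 0.001 * (i % 2) for i in range(60)] + [0.5]
# Latency that quadruples, but only 1 ms per sample
ramp = [0.100 + 0.001 * i for i in range(300)]

print("spike flags:", count_flags(spike))
print("ramp flags:", count_flags(ramp))
```

The spike is flagged and the ramp is not. On a steady linear ramp the z-score of each new sample stays below 2 whatever the slope, because the window's mean and spread move with the data.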
Alert fatigue mitigation
Measuring alert quality
Track these metrics for your alerting system itself:
from prometheus_client import Counter, Histogram

ALERT_METRICS = {
    "total_alerts": Counter("alerts_total", "Total alerts fired", ["name", "severity"]),
    "actionable_alerts": Counter("alerts_actionable_total", "Alerts that required action"),
    "time_to_ack": Histogram("alert_ack_seconds", "Time to acknowledge alert"),
    "time_to_resolve": Histogram("alert_resolve_seconds", "Time from fire to resolve"),
}
Target: >80% of alerts should be actionable. If a rule fires 50 times a month and only 5 require action, delete or tune it.
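The actionable rate is straightforward to compute per rule once fired and actioned counts are recorded. A minimal sketch, assuming those counts are already available (for example from a ticketing export); `AlertStats` and `rules_to_review` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class AlertStats:
    name: str
    fired: int        # times the rule fired in the review period
    actionable: int   # of those, how many required action

    @property
    def actionable_rate(self) -> float:
        return self.actionable / self.fired if self.fired else 0.0


def rules_to_review(stats: list[AlertStats], target: float = 0.8) -> list[AlertStats]:
    """Rules that fired but fell below the target rate, worst first."""
    noisy = [s for s in stats if s.fired and s.actionable_rate < target]
    return sorted(noisy, key=lambda s: s.actionable_rate)


month = [
    AlertStats("DiskAlmostFull", fired=50, actionable=5),
    AlertStats("SLOFastBurn", fired=4, actionable=4),
    AlertStats("HighLatency", fired=20, actionable=12),
]
for s in rules_to_review(month):
    print(f"{s.name}: {s.actionable_rate:.0%} actionable over {s.fired} firings")
```

Sorting worst-first gives the review meeting a ready-made agenda: start with the rule that wastes the most attention.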
Alert review process
Monthly review meeting:
- List all alerts that fired in the past month.
- For each: was it actionable? Was the runbook sufficient?
- Delete alerts with <50% action rate.
- Improve runbooks for alerts where resolution took >30 minutes.
- Add alerts for incidents that had no alert.
Deduplication and flapping prevention
# Alertmanager route with dedup
route:
  group_wait: 1m
  group_interval: 10m
  repeat_interval: 4h
For alerts that flap (fire/resolve/fire rapidly), debounce them with a longer "for" duration, so the condition must hold continuously before the alert fires:
# Prometheus rule with longer "for" duration
- alert: HighLatency
  expr: histogram_quantile(0.95, rate(http_request_seconds_bucket[5m])) > 0.5
  for: 10m  # must be high for 10 minutes, not just a spike
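The "for" clause only delays firing. Hysteresis in the stricter sense uses two thresholds: one to start firing and a lower one to clear, so a value hovering around a single threshold cannot toggle the alert. For Python-side checks, a minimal sketch looks like this (`HysteresisAlert` is a hypothetical helper, and the 0.5 and 0.4 thresholds are illustrative):

```python
class HysteresisAlert:
    """Fires above `fire_above`, clears only below `clear_below`."""

    def __init__(self, fire_above: float, clear_below: float):
        if clear_below >= fire_above:
            raise ValueError("clear_below must be lower than fire_above")
        self.fire_above = fire_above
        self.clear_below = clear_below
        self.firing = False

    def update(self, value: float) -> bool:
        if not self.firing and value > self.fire_above:
            self.firing = True
        elif self.firing and value < self.clear_below:
            self.firing = False
        return self.firing


alert = HysteresisAlert(fire_above=0.5, clear_below=0.4)
samples = [0.40, 0.55, 0.48, 0.52, 0.45, 0.30]
states = [alert.update(v) for v in samples]
print(states)
```

With a single 0.5 threshold the same samples would flip state four times; with the 0.4 to 0.5 band the alert fires once and clears once.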
Escalation patterns
Time-based escalation
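# Escalation approximated with two sibling routes. Alertmanager has no
# concept of acknowledgement; that lives in the paging tool (e.g. PagerDuty).
route:
  routes:
    - match:
        severity: critical
      receiver: oncall-primary
      continue: true  # keep evaluating the sibling routes below
    # Delayed copy: the secondary is notified only after a 15-minute wait
    - match:
        severity: critical
      receiver: oncall-secondary
      group_wait: 15m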
Python-based escalation with state tracking
import time

# notify_oncall_primary, notify_oncall_secondary and notify_engineering_manager
# are placeholders for your own notification coroutines.
class EscalationManager:
    def __init__(self):
        self.active_alerts: dict[str, dict] = {}

    async def handle_alert(self, alert_name: str, severity: str):
        now = time.time()
        if alert_name not in self.active_alerts:
            self.active_alerts[alert_name] = {
                "first_fired": now,
                "escalation_level": 0
            }
            await notify_oncall_primary(alert_name)
        else:
            state = self.active_alerts[alert_name]
            elapsed = now - state["first_fired"]
            if elapsed > 900 and state["escalation_level"] < 1:  # 15 min
                state["escalation_level"] = 1
                await notify_oncall_secondary(alert_name)
            elif elapsed > 3600 and state["escalation_level"] < 2:  # 1 hour
                state["escalation_level"] = 2
                await notify_engineering_manager(alert_name)

    def resolve_alert(self, alert_name: str):
        """Forget a resolved alert so the next firing starts at level 0."""
        self.active_alerts.pop(alert_name, None)
Testing alerts
Unit testing alert rules
# test_alert_rules.py
import subprocess

def test_high_error_rate_fires():
    """Verify alert rule fires on synthetic data."""
    result = subprocess.run(
        ["promtool", "test", "rules", "test_data/high_error_rate.yml"],
        capture_output=True, text=True
    )
    assert result.returncode == 0, f"Alert test failed: {result.stderr}"
Prometheus’s promtool can evaluate rules against synthetic time-series data, ensuring your PromQL expressions trigger when expected.
Chaos testing
Periodically inject failures and verify alerts fire:
async def chaos_test_alerting():
    """Inject a known failure and verify alert fires within SLA."""
    inject_500_errors(rate=0.05, duration_seconds=120)
    try:
        alert_fired = await wait_for_alert("HighErrorRate", timeout_seconds=300)
        assert alert_fired, "Alert did not fire within 5 minutes of injected failure"
    finally:
        # Always clean up, even when the assertion fails
        clear_injected_errors()
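The `wait_for_alert` helper is not defined in this guide. One possible shape is a polling loop with a deadline. In the sketch below the alert source is passed in as a `fetch_alerts` coroutine, which is an assumption made so the loop can be tested without a live Alertmanager; in practice it could wrap a GET to Alertmanager's `/api/v2/alerts` endpoint, which returns a JSON list of alerts with a `labels` object.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_for_alert(
    alert_name: str,
    fetch_alerts: Callable[[], Awaitable[list[dict]]],
    timeout_seconds: float = 300,
    poll_seconds: float = 5,
) -> bool:
    """Poll until an alert with the given alertname appears or the timeout passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    while True:
        alerts = await fetch_alerts()
        if any(a.get("labels", {}).get("alertname") == alert_name for a in alerts):
            return True
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(poll_seconds)


# Usage with a fake source that starts returning the alert on the third poll
polls = {"count": 0}

async def fake_fetch() -> list[dict]:
    polls["count"] += 1
    if polls["count"] >= 3:
        return [{"labels": {"alertname": "HighErrorRate"}}]
    return []

fired = asyncio.run(
    wait_for_alert("HighErrorRate", fake_fetch, timeout_seconds=1, poll_seconds=0.01)
)
print("alert fired:", fired)
```

Injecting the fetch function keeps the timing logic separate from the HTTP client, so the same loop works against staging, production, or a fake.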
One thing to remember: Alerting is a feedback loop. Measure the quality of your alerts (actionable rate, time-to-resolve), review them monthly, and ruthlessly delete noisy rules. An alerting system you trust is worth more than one that covers every edge case.