Python Alerting Patterns — Deep Dive

Production alerting is an engineering discipline, not a configuration exercise. This guide covers multi-window SLO alerting, Alertmanager internals, programmatic alert generation, anomaly detection, and operational patterns that keep on-call teams sane.

Multi-window, multi-burn-rate alerting

Google’s Site Reliability Workbook popularized this pattern. Instead of a single threshold, combine multiple time windows and burn rates to catch both acute outages and slow degradation.

The math

Given an SLO of 99.9% (error budget = 0.1%), the monthly error budget is:

Budget = 30 days × 24 hours × 60 minutes × 0.001 = 43.2 minutes of downtime

Burn rate is how fast you consume the budget:

  • 1x = consuming at exactly the allowed pace; the budget runs out at the end of the 30-day month (normal)
  • 14.4x = budget exhausted in about 50 hours; one hour at this rate consumes 2% of the monthly budget
  • 6x = budget exhausted in 5 days; six hours at this rate consumes 5% of the monthly budget
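
The arithmetic above can be checked with a few lines of Python. This is a minimal sketch assuming a 30-day period; the function names are illustrative and do not come from any library.

```python
from datetime import timedelta

PERIOD = timedelta(days=30)  # assumed SLO period

def error_budget(slo: float, period: timedelta = PERIOD) -> timedelta:
    """Total allowed downtime over the period for a given SLO (e.g. 0.999)."""
    return period * (1 - slo)

def time_to_exhaustion(burn_rate: float, period: timedelta = PERIOD) -> timedelta:
    """How long the whole budget lasts at a constant burn rate."""
    return period / burn_rate

def budget_consumed(burn_rate: float, window: timedelta, period: timedelta = PERIOD) -> float:
    """Fraction of the period's budget spent by burning at burn_rate for window."""
    return burn_rate * (window / period)

if __name__ == "__main__":
    print(error_budget(0.999))                         # allowed downtime
    print(time_to_exhaustion(14.4))                    # fast-burn horizon
    print(budget_consumed(6, timedelta(hours=6)))      # share spent in 6 hours
```

The thresholds 14.4 and 6 fall out of this relationship: they are the burn rates that spend 2% of the budget in one hour and 5% in six hours.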

Multi-window implementation

groups:
  - name: slo-alerts
    rules:
      # Page: 14.4x burn over both the 5-minute and 1-hour windows, held for 2 min
      - alert: SLOFastBurn
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[5m]))
            / sum(rate(http_requests_total[5m]))
          ) > (14.4 * 0.001)
          and
          (
            sum(rate(http_requests_total{status=~"5.."}[1h]))
            / sum(rate(http_requests_total[1h]))
          ) > (14.4 * 0.001)
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Rapid SLO budget burn detected"

      # Ticket: 6x burn sustained over 30 min AND 6 hours
      - alert: SLOSlowBurn
        expr: |
          (
            sum(rate(http_requests_total{status=~"5.."}[30m]))
            / sum(rate(http_requests_total[30m]))
          ) > (6 * 0.001)
          and
          (
            sum(rate(http_requests_total{status=~"5.."}[6h]))
            / sum(rate(http_requests_total[6h]))
          ) > (6 * 0.001)
        for: 15m
        labels:
          severity: warning

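Requiring both the short and the long window to exceed the threshold addresses two problems:

  • False positives: A 30-second spike trips the short window but barely moves the long window, so no alert fires.
  • Stale alerts: After recovery, the long window stays elevated for a while, but the short window drops quickly, so the alert resolves soon after the problem ends.

Slow leaks that never reach the fast-burn threshold are covered by the second, lower burn-rate rule.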
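
The two-window condition can be sketched in Python to make the behavior concrete. This is an illustration of the logic only, not how Prometheus evaluates rules; the function names and sample error ratios are hypothetical.

```python
def burn_rate(error_ratio: float, slo: float = 0.999) -> float:
    """Observed error ratio expressed as a multiple of the allowed error ratio."""
    return error_ratio / (1 - slo)

def should_alert(short_ratio: float, long_ratio: float,
                 threshold: float, slo: float = 0.999) -> bool:
    """Fire only when BOTH windows burn faster than the threshold."""
    return (burn_rate(short_ratio, slo) > threshold
            and burn_rate(long_ratio, slo) > threshold)

if __name__ == "__main__":
    # Brief spike: short window is hot, long window has barely moved
    print(should_alert(short_ratio=0.05, long_ratio=0.002, threshold=14.4))
    # Sustained outage: both windows are hot
    print(should_alert(short_ratio=0.05, long_ratio=0.03, threshold=14.4))
    # Recovered: long window still elevated, short window back to normal
    print(should_alert(short_ratio=0.0, long_ratio=0.03, threshold=14.4))
```

Only the sustained case returns True, which is the behavior the combined expression in the rules above is meant to produce.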

Alertmanager internals

Grouping

Alertmanager groups alerts by label to prevent notification floods:

route:
  group_by: ['alertname', 'service']
  group_wait: 30s      # wait before sending first notification
  group_interval: 5m   # wait before sending updates to a group
  repeat_interval: 4h  # re-send if still firing

If 10 instances of HighLatency fire simultaneously, they’re grouped into one notification. Without grouping, the on-call engineer gets 10 pages.
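
Conceptually, grouping buckets alerts by the values of the group_by labels. The sketch below is a simplified model of that idea, not Alertmanager's implementation; the alert dictionaries and the group_alerts helper are hypothetical, and missing labels are treated as empty strings for simplicity.

```python
from collections import defaultdict

def group_alerts(alerts: list[dict], group_by: list[str]) -> dict[tuple, list[dict]]:
    """Bucket alerts by the tuple of their group_by label values."""
    groups: dict[tuple, list[dict]] = defaultdict(list)
    for alert in alerts:
        key = tuple(alert["labels"].get(label, "") for label in group_by)
        groups[key].append(alert)
    return dict(groups)

if __name__ == "__main__":
    alerts = [
        {"labels": {"alertname": "HighLatency", "service": "api", "instance": f"api-{i}"}}
        for i in range(10)
    ]
    grouped = group_alerts(alerts, ["alertname", "service"])
    # Ten firing instances collapse into a single notification group
    for key, members in grouped.items():
        print(key, len(members))
```

Labels left out of group_by (here, instance) do not split the group, which is why choosing them carefully matters.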

Inhibition

Suppress lower-severity alerts when higher ones are firing:

inhibit_rules:
  - source_match:
      severity: critical
    target_match:
      severity: warning
    equal: ['service']

If HighErrorRate (critical) is firing for the payment service, HighLatency (warning) for the same service is suppressed — the engineer already knows something is broken.
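
The inhibition rule above can be modeled as a small predicate. This is a simplified sketch of the matching logic under the assumption of exact-match label matchers; is_inhibited and the label dictionaries are hypothetical and do not reflect Alertmanager's internal API.

```python
def _matches(labels: dict, matchers: dict) -> bool:
    """True when every matcher label has exactly the required value."""
    return all(labels.get(name) == value for name, value in matchers.items())

def is_inhibited(target: dict, firing: list[dict],
                 source_match: dict, target_match: dict, equal: list[str]) -> bool:
    """Return True if some firing source alert suppresses the target alert."""
    if not _matches(target, target_match):
        return False
    return any(
        _matches(source, source_match)
        and all(source.get(label) == target.get(label) for label in equal)
        for source in firing
    )

if __name__ == "__main__":
    firing = [{"alertname": "HighErrorRate", "severity": "critical", "service": "payment"}]
    rule = dict(source_match={"severity": "critical"},
                target_match={"severity": "warning"},
                equal=["service"])
    same_service = {"alertname": "HighLatency", "severity": "warning", "service": "payment"}
    other_service = {"alertname": "HighLatency", "severity": "warning", "service": "checkout"}
    print(is_inhibited(same_service, firing, **rule))
    print(is_inhibited(other_service, firing, **rule))
```

The equal list is what scopes the suppression: a critical alert on one service leaves warnings on other services untouched.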

Silences

Temporary mute for planned maintenance:

import httpx
from datetime import datetime, timedelta, timezone

def silence_alert(alertmanager_url: str, service: str, duration_hours: int = 2) -> str:
    """Create a silence for a service during maintenance and return its ID."""
    # Timezone-aware UTC; isoformat() then includes the +00:00 offset
    now = datetime.now(timezone.utc)
    response = httpx.post(f"{alertmanager_url}/api/v2/silences", json={
        "matchers": [
            {"name": "service", "value": service, "isRegex": False}
        ],
        "startsAt": now.isoformat(),
        "endsAt": (now + timedelta(hours=duration_hours)).isoformat(),
        "createdBy": "deploy-bot",
        "comment": f"Maintenance window for {service}"
    })
    response.raise_for_status()
    return response.json()["silenceID"]

Programmatic alerting from Python

Custom health check framework

from dataclasses import dataclass
from enum import Enum
from typing import Callable
import asyncio
import httpx

class Severity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class AlertRule:
    name: str
    check: Callable[[], bool]
    severity: Severity
    message: str
    runbook: str
    cooldown_seconds: int = 300

class AlertEngine:
    def __init__(self, notification_url: str):
        self.rules: list[AlertRule] = []
        self.last_fired: dict[str, float] = {}
        self.notification_url = notification_url

    def add_rule(self, rule: AlertRule):
        self.rules.append(rule)

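    async def evaluate(self):
        # Monotonic clock of the running loop; unaffected by wall-clock changes
        now = asyncio.get_running_loop().time()
        for rule in self.rules:
            message = rule.message
            try:
                # Note: check() is synchronous and blocks the event loop while it runs
                is_firing = rule.check()
            except Exception as e:
                # A broken check counts as firing, without overwriting the rule's own message
                is_firing = True
                message = f"Alert check itself failed: {e}"

            if not is_firing:
                continue

            # None means "never fired", so the first firing is never swallowed by the cooldown
            last = self.last_fired.get(rule.name)
            if last is not None and now - last < rule.cooldown_seconds:
                continue

            self.last_fired[rule.name] = now
            await self._notify(rule, message)

    async def _notify(self, rule: AlertRule, message: str):
        async with httpx.AsyncClient() as client:
            await client.post(self.notification_url, json={
                "text": (
                    f":{self._emoji(rule.severity)}: *[{rule.severity.value.upper()}]* "
                    f"{rule.name}\n{message}\nRunbook: {rule.runbook}"
                )
            })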

    def _emoji(self, severity: Severity) -> str:
        return {
            Severity.INFO: "information_source",
            Severity.WARNING: "warning",
            Severity.CRITICAL: "rotating_light"
        }[severity]

Usage:

engine = AlertEngine("https://hooks.slack.com/services/...")

engine.add_rule(AlertRule(
    name="payment-failure-rate",
    check=lambda: get_payment_failure_rate(minutes=10) > 0.05,
    severity=Severity.CRITICAL,
    message="Payment failure rate exceeds 5%",
    runbook="https://wiki.internal/runbooks/payments"
))

# Run every minute; await is only valid inside a coroutine
async def main():
    while True:
        await engine.evaluate()
        await asyncio.sleep(60)

asyncio.run(main())

Anomaly detection alerts

For metrics without obvious thresholds, use statistical anomaly detection:

import numpy as np
from collections import deque

class AnomalyDetector:
    def __init__(self, window_size: int = 60, z_threshold: float = 3.0):
        self.window = deque(maxlen=window_size)
        self.z_threshold = z_threshold

    def observe(self, value: float) -> bool:
        """Returns True if value is anomalous relative to the previous samples."""
        is_anomalous = False
        if len(self.window) >= 10:
            mean = np.mean(self.window)
            std = np.std(self.window)
            if std > 0:
                z_score = abs(value - mean) / std
                is_anomalous = bool(z_score > self.z_threshold)

        # Append after scoring so the new sample does not skew its own baseline
        self.window.append(value)
        return is_anomalous

# Usage
latency_detector = AnomalyDetector(window_size=120, z_threshold=3.5)

for sample in metrics_stream():
    if latency_detector.observe(sample.p95_latency):
        fire_alert(f"Anomalous latency detected: {sample.p95_latency:.3f}s")

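This catches sudden deviations from recent behavior, even on metrics whose normal level shifts over time. Because the baseline adapts, slow drift is absorbed into the window, so pair the detector with a static threshold or an SLO alert to catch gradual degradation.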

Alert fatigue mitigation

Measuring alert quality

Track these metrics for your alerting system itself:

from prometheus_client import Counter, Histogram

ALERT_METRICS = {
    "total_alerts": Counter("alerts_total", "Total alerts fired", ["name", "severity"]),
    # Labeled by rule name so the actionable rate can be computed per rule
    "actionable_alerts": Counter("alerts_actionable_total", "Alerts that required action", ["name"]),
    "time_to_ack": Histogram("alert_ack_seconds", "Time to acknowledge alert"),
    "time_to_resolve": Histogram("alert_resolve_seconds", "Time from fire to resolve"),
}

Target: >80% of alerts should be actionable. If a rule fires 50 times a month and only 5 require action, delete or tune it.

Alert review process

Monthly review meeting:

  1. List all alerts that fired in the past month.
  2. For each: was it actionable? Was the runbook sufficient?
  3. Delete alerts with <50% action rate.
  4. Improve runbooks for alerts where resolution took >30 minutes.
  5. Add alerts for incidents that had no alert.
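
Steps 3 and 4 of the review are mechanical enough to script. The following is a minimal sketch assuming you can export per-rule counts and a median resolution time from your incident tooling; RuleStats and review_rules are hypothetical names, and the 50% and 30-minute cutoffs are the ones from the list above.

```python
from dataclasses import dataclass

@dataclass
class RuleStats:
    name: str
    fired: int                     # times the rule fired this month
    actionable: int                # firings that required action
    median_resolve_minutes: float  # median time from fire to resolve

def review_rules(stats: list[RuleStats],
                 min_action_rate: float = 0.5,
                 max_resolve_minutes: float = 30.0) -> tuple[list[str], list[str]]:
    """Return (rules to delete or tune, rules whose runbook needs work)."""
    to_delete: list[str] = []
    needs_runbook: list[str] = []
    for rule in stats:
        if rule.fired == 0:
            continue  # nothing to judge this month
        if rule.actionable / rule.fired < min_action_rate:
            to_delete.append(rule.name)
        elif rule.median_resolve_minutes > max_resolve_minutes:
            needs_runbook.append(rule.name)
    return to_delete, needs_runbook

if __name__ == "__main__":
    month = [
        RuleStats("DiskAlmostFull", fired=50, actionable=5, median_resolve_minutes=4),
        RuleStats("PaymentFailures", fired=3, actionable=3, median_resolve_minutes=75),
        RuleStats("HighErrorRate", fired=4, actionable=4, median_resolve_minutes=12),
    ]
    print(review_rules(month))
```

A script like this only prepares the agenda; whether a noisy rule is deleted or tuned is still a judgment call for the meeting.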

Deduplication and flapping prevention

# Alertmanager route with dedup
route:
  group_wait: 1m
  group_interval: 10m
  repeat_interval: 4h

For alerts that flap (fire/resolve/fire rapidly), require the condition to hold for longer before the alert fires:

# Prometheus rule with longer "for" duration
- alert: HighLatency
  expr: histogram_quantile(0.95, rate(http_request_seconds_bucket[5m])) > 0.5
  for: 10m  # must be high for 10 minutes, not just a spike
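
The effect of a for duration can be pictured as a small state machine: the alert is pending while the condition holds and only becomes firing once it has held long enough. The class below is a simplified sketch of that behavior, not Prometheus's implementation; PendingAlert and its method names are hypothetical.

```python
from typing import Optional

class PendingAlert:
    """Model of an alert that must stay true for for_seconds before firing."""

    def __init__(self, for_seconds: float):
        self.for_seconds = for_seconds
        self.pending_since: Optional[float] = None

    def update(self, condition_true: bool, now: float) -> str:
        """Feed one evaluation result; returns 'inactive', 'pending' or 'firing'."""
        if not condition_true:
            # Any false evaluation resets the timer, which is what filters out spikes
            self.pending_since = None
            return "inactive"
        if self.pending_since is None:
            self.pending_since = now
        if now - self.pending_since >= self.for_seconds:
            return "firing"
        return "pending"

if __name__ == "__main__":
    alert = PendingAlert(for_seconds=600)
    for t, high in [(0, True), (300, True), (400, False), (500, True), (1100, True)]:
        print(t, alert.update(high, now=t))
```

In this run the dip at t=400 resets the timer, so the alert only fires at t=1100, ten minutes after the condition became true again.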

Escalation patterns

Time-based escalation

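# Time-delayed escalation. Alertmanager has no acknowledgment state, so
# ack-based escalation belongs in a paging tool such as PagerDuty.
route:
  routes:
    - match:
        severity: critical
      receiver: oncall-primary
      continue: true   # keep evaluating the sibling route below
    # Secondary is notified only if the alert is still firing after 15 minutes
    - match:
        severity: critical
      receiver: oncall-secondary
      group_wait: 15m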

Python-based escalation with state tracking

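import time

# notify_oncall_primary, notify_oncall_secondary and notify_engineering_manager
# are placeholders for your own async notification functions

class EscalationManager: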
    def __init__(self):
        self.active_alerts: dict[str, dict] = {}

    async def handle_alert(self, alert_name: str, severity: str):
        now = time.time()
        if alert_name not in self.active_alerts:
            self.active_alerts[alert_name] = {
                "first_fired": now,
                "escalation_level": 0
            }
            await notify_oncall_primary(alert_name)
        else:
            state = self.active_alerts[alert_name]
            elapsed = now - state["first_fired"]
            if elapsed > 900 and state["escalation_level"] < 1:  # 15 min
                state["escalation_level"] = 1
                await notify_oncall_secondary(alert_name)
            elif elapsed > 3600 and state["escalation_level"] < 2:  # 1 hour
                state["escalation_level"] = 2
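                await notify_engineering_manager(alert_name)

    def resolve(self, alert_name: str):
        """Forget a resolved alert so its next occurrence pages primary again."""
        self.active_alerts.pop(alert_name, None)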

Testing alerts

Unit testing alert rules

# test_alert_rules.py
import subprocess

def test_high_error_rate_fires():
    """Verify alert rule fires on synthetic data."""
    result = subprocess.run(
        ["promtool", "test", "rules", "test_data/high_error_rate.yml"],
        capture_output=True, text=True
    )
    assert result.returncode == 0, f"Alert test failed: {result.stderr}"

Prometheus’s promtool can evaluate rules against synthetic time-series data, ensuring your PromQL expressions trigger when expected.

Chaos testing

Periodically inject failures and verify alerts fire:

async def chaos_test_alerting():
    """Inject a known failure and verify alert fires within SLA."""
    inject_500_errors(rate=0.05, duration_seconds=120)
    try:
        alert_fired = await wait_for_alert("HighErrorRate", timeout_seconds=300)
        assert alert_fired, "Alert did not fire within 5 minutes of injected failure"
    finally:
        # Always clean up, even when the assertion fails
        clear_injected_errors()

One thing to remember: Alerting is a feedback loop. Measure the quality of your alerts (actionable rate, time-to-resolve), review them monthly, and ruthlessly delete noisy rules. An alerting system you trust is worth more than one that covers every edge case.
