Incident Response Automation with Python — Deep Dive
Alert webhook handler
The foundation of incident automation is a webhook receiver that processes alerts from monitoring systems:
from fastapi import FastAPI, Request
from datetime import datetime, timezone
from enum import Enum
import logging
import uuid
logger = logging.getLogger(__name__)
app = FastAPI()
class Severity(str, Enum):
    """Incident severity levels, listed from most to least urgent.

    Members are also ``str`` instances, so they compare equal to their
    lowercase value (``Severity.HIGH == "high"``).

    Lookup by value tolerates differences in case and surrounding whitespace:
    ``Severity("Critical")`` and ``Severity(" HIGH ")`` resolve to the
    matching member. Values that match no member still raise ``ValueError``.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

    @classmethod
    def _missing_(cls, value):
        """Resolve values that differ from a member only by case/whitespace.

        ``Enum`` calls this hook when the exact value lookup fails. Returning
        ``None`` lets ``Enum`` raise its usual ``ValueError``.
        """
        if isinstance(value, str):
            normalized = value.strip().lower()
            for member in cls:
                if member.value == normalized:
                    return member
        return None
class Incident:
    """A single incident together with its running audit trail.

    Each instance gets an id of the form ``INC-XXXXXXXX`` (eight uppercase hex
    characters taken from a random UUID), a UTC creation time, an empty
    timeline and the initial status ``"triggered"``.
    """

    def __init__(
        self,
        title: str,
        severity: Severity,
        source: str,
        alert_data: dict,
    ):
        """Record the alert details and initialise the bookkeeping fields.

        Args:
            title: Human-readable summary of the incident.
            severity: Severity assigned to the incident.
            source: Name of the system that raised the alert.
            alert_data: The raw alert payload, kept as received.
        """
        # Caller-supplied facts about the alert.
        self.title = title
        self.severity = severity
        self.source = source
        self.alert_data = alert_data

        # Generated bookkeeping.
        short_hex = uuid.uuid4().hex[:8]
        self.id = f"INC-{short_hex.upper()}"
        self.created_at = datetime.now(timezone.utc)
        self.status = "triggered"
        self.timeline: list[dict] = []

    def add_event(self, action: str, details: str = ""):
        """Append a timeline entry stamped with the current UTC time.

        Args:
            action: Short label for what happened.
            details: Optional free-form description; defaults to "".
        """
        stamp = datetime.now(timezone.utc).isoformat()
        entry = {"timestamp": stamp, "action": action, "details": details}
        self.timeline.append(entry)
# In-memory incident store, keyed by incident id (e.g. "INC-1A2B3C4D").
# Entries exist only in this process's memory (use Redis/DB in production).
incidents: dict[str, Incident] = {}
@app.post("/webhooks/alertmanager")
async def handle_alertmanager(request: Request):
    """Process Prometheus Alertmanager webhooks.

    Creates one ``Incident`` per entry in the payload's ``alerts`` list,
    stores it in ``incidents`` and runs the automated response pipeline.

    Every alert in the payload is processed even if the pipeline fails for an
    earlier one. If any pipeline run raised, the first such exception is
    re-raised after the loop so the request still fails visibly.

    Returns:
        ``{"processed": <number of alerts in the payload>}``.
    """
    payload = await request.json()
    alerts = payload.get("alerts") or []
    first_error = None

    for alert in alerts:
        labels = alert.get("labels", {})
        annotations = alert.get("annotations", {})

        # Severity labels are free-form text chosen by whoever wrote the alert
        # rule; an unrecognised value must not reject the whole webhook.
        raw_severity = labels.get("severity", "medium")
        try:
            severity = Severity(str(raw_severity).strip().lower())
        except ValueError:
            logger.warning(
                "Unknown severity label %r; defaulting to %s",
                raw_severity,
                Severity.MEDIUM.value,
            )
            severity = Severity.MEDIUM

        title = annotations.get("summary", labels.get("alertname", "Unknown Alert"))

        incident = Incident(
            title=title,
            severity=severity,
            source="alertmanager",
            alert_data=alert,
        )
        incidents[incident.id] = incident
        incident.add_event("created", f"Alert: {title}")
        logger.info("New incident %s: %s (%s)", incident.id, title, severity.value)

        # Trigger the automated response pipeline. A failure here is recorded
        # and deferred so the remaining alerts in this payload are not dropped.
        try:
            await run_incident_pipeline(incident)
        except Exception as exc:
            logger.exception("Incident pipeline failed for %s", incident.id)
            incident.add_event("pipeline_error", f"{type(exc).__name__}: {exc}")
            if first_error is None:
                first_error = exc

    if first_error is not None:
        raise first_error

    return {"processed": len(alerts)}
Automated runbook engine
Runbooks are encoded as Python classes with diagnostic and remediation steps:
from abc import ABC, abstractmethod
from dataclasses import dataclass
import subprocess
import httpx
import logging
logger = logging.getLogger(__name__)
@dataclass
class RunbookResult:
    """What a runbook's remediation step reports back to its caller."""

    # Whether the remediation step considers itself successful.
    success: bool
    # Short label for the action performed (or that none was available).
    action_taken: str
    # Free-form text accompanying the action.
    details: str
    # Set by a runbook when it applied a fix on its own; False unless given.
    auto_remediated: bool = False
class Runbook(ABC):
    """Contract that every automated incident runbook implements.

    Concrete runbooks supply three hooks: the synchronous ``matches``
    predicate and the ``diagnose`` / ``remediate`` coroutines.
    """

    @abstractmethod
    def matches(self, incident: Incident) -> bool:
        """Tell whether this runbook is applicable to ``incident``."""

    @abstractmethod
    async def diagnose(self, incident: Incident) -> dict:
        """Collect diagnostic information about ``incident``."""

    @abstractmethod
    async def remediate(self, incident: Incident, diagnosis: dict) -> RunbookResult:
        """Try to fix ``incident`` automatically, using ``diagnosis``."""
class HighErrorRateRunbook(Runbook):
    """Handle high API error rate incidents.

    Diagnosis probes the service's health endpoint and asks ``kubectl`` for
    the rollout history and pod status. Remediation rolls the deployment back
    when pods look crash-looped, or restarts it when the service is
    unreachable.

    The ``service`` alert label is external input, so it is validated before
    it is placed in a URL or a ``kubectl`` argument. ``kubectl`` runs in a
    worker thread with a timeout so a slow cluster cannot stall the event loop.
    """

    NAMESPACE = "production"
    KUBECTL_TIMEOUT = 30.0  # seconds allowed for each kubectl invocation
    CRASH_LOOP_RESTARTS = 3  # a pod with more restarts than this counts as crash-looping
    _SERVICE_CHARS = frozenset("abcdefghijklmnopqrstuvwxyz0123456789-")

    def matches(self, incident: Incident) -> bool:
        """Return True when the alert name contains "ErrorRate" or "5xx"."""
        alert_name = incident.alert_data.get("labels", {}).get("alertname", "")
        return "ErrorRate" in alert_name or "5xx" in alert_name

    def _is_actionable_service(self, service) -> bool:
        """Return True when ``service`` is a usable name for automation.

        Accepts 1-63 lowercase letters, digits and hyphens, not starting or
        ending with a hyphen. The "unknown" placeholder used when the alert
        has no ``service`` label is rejected.
        """
        if not isinstance(service, str) or service == "unknown":
            return False
        if not 0 < len(service) <= 63:
            return False
        if service[0] == "-" or service[-1] == "-":
            return False
        return all(ch in self._SERVICE_CHARS for ch in service)

    async def _kubectl(self, *args: str) -> "subprocess.CompletedProcess | None":
        """Run ``kubectl <args> -n NAMESPACE`` without blocking the event loop.

        Returns the completed process, or ``None`` when kubectl could not be
        started or exceeded ``KUBECTL_TIMEOUT``.
        """
        import asyncio

        command = ["kubectl", *args, "-n", self.NAMESPACE]
        try:
            return await asyncio.to_thread(
                subprocess.run,
                command,
                capture_output=True,
                text=True,
                timeout=self.KUBECTL_TIMEOUT,
            )
        except (OSError, subprocess.TimeoutExpired) as exc:
            logger.warning("kubectl %s did not complete: %s", " ".join(args), exc)
            return None

    @staticmethod
    def _summarize_pod(pod: dict) -> dict:
        """Reduce one pod object to its name, phase and total restart count."""
        status = pod.get("status") or {}
        restarts = sum(
            cs.get("restartCount", 0)
            for cs in status.get("containerStatuses") or []
        )
        return {
            "name": (pod.get("metadata") or {}).get("name", "unknown"),
            "phase": status.get("phase", "Unknown"),
            "restarts": restarts,
        }

    async def diagnose(self, incident: Incident) -> dict:
        """Gather health, rollout-history and pod information for the service.

        Returns:
            ``{"service": <label value or "unknown">, "checks": {...}}``.
            ``checks`` may contain ``health``, ``recent_deployments`` and
            ``pods``. When the service label is missing or invalid it holds
            only a ``service`` entry explaining why nothing was probed.
        """
        import json

        service = incident.alert_data.get("labels", {}).get("service", "unknown")
        diagnosis = {"service": service, "checks": {}}
        checks = diagnosis["checks"]

        if not self._is_actionable_service(service):
            checks["service"] = {
                "valid": False,
                "error": "missing or invalid 'service' label",
            }
            return diagnosis

        # Check if the service is responding
        async with httpx.AsyncClient(timeout=5.0) as client:
            try:
                resp = await client.get(f"http://{service}.internal/health")
            except httpx.RequestError as e:
                checks["health"] = {
                    "status": "unreachable",
                    "error": str(e),
                }
            else:
                checks["health"] = {
                    "status": resp.status_code,
                    "healthy": resp.status_code == 200,
                }

        # Check recent deployments (last three lines of the rollout history)
        history = await self._kubectl("rollout", "history", f"deployment/{service}")
        if history is not None and history.returncode == 0:
            checks["recent_deployments"] = history.stdout.strip().split("\n")[-3:]

        # Check pod status
        listing = await self._kubectl(
            "get", "pods", "-l", f"app={service}", "-o", "json"
        )
        if listing is not None and listing.returncode == 0:
            try:
                pods = json.loads(listing.stdout)
            except json.JSONDecodeError as exc:
                logger.warning("Could not parse kubectl pod listing: %s", exc)
            else:
                checks["pods"] = [
                    self._summarize_pod(pod) for pod in pods.get("items") or []
                ]

        return diagnosis

    async def remediate(self, incident: Incident, diagnosis: dict) -> RunbookResult:
        """Roll back or restart the deployment when the diagnosis warrants it.

        Returns:
            A result with ``auto_remediated=True`` when a rollback or restart
            command succeeded, otherwise a failed result asking for manual
            intervention.
        """
        service = diagnosis["service"]
        checks = diagnosis["checks"]

        # Re-validate here: the diagnosis dict may not have come from diagnose().
        if self._is_actionable_service(service):
            # If pods are crash-looping, try rolling back
            pods = checks.get("pods", [])
            crash_looping = any(
                p.get("restarts", 0) > self.CRASH_LOOP_RESTARTS for p in pods
            )
            if crash_looping:
                undo = await self._kubectl(
                    "rollout", "undo", f"deployment/{service}"
                )
                if undo is not None and undo.returncode == 0:
                    return RunbookResult(
                        success=True,
                        action_taken=f"Rolled back deployment/{service}",
                        details=undo.stdout,
                        auto_remediated=True,
                    )

            # If service is unreachable, restart pods
            if checks.get("health", {}).get("status") == "unreachable":
                restart = await self._kubectl(
                    "rollout", "restart", f"deployment/{service}"
                )
                if restart is not None and restart.returncode == 0:
                    return RunbookResult(
                        success=True,
                        action_taken=f"Restarted deployment/{service}",
                        details="Service was unreachable, initiated rolling restart",
                        auto_remediated=True,
                    )

        return RunbookResult(
            success=False,
            action_taken="No automatic remediation available",
            details="Manual intervention required",
        )
class DiskSpaceRunbook(Runbook):
    """Handle disk space alerts.

    Remediation runs a fixed list of cleanup commands on the affected host
    over ``ssh``. The host comes from the alert's ``instance`` label, which is
    external input, so it is validated before being handed to ``ssh``.
    """

    # Safe cleanup commands
    CLEANUP_COMMANDS = (
        "find /tmp -type f -mtime +7 -delete",
        "journalctl --vacuum-time=3d",
        "find /var/log -name '*.gz' -mtime +30 -delete",
    )
    SSH_TIMEOUT = 30  # seconds allowed for each ssh command
    _HOST_PUNCTUATION = frozenset(".-_:")

    def matches(self, incident: Incident) -> bool:
        """Return True when the alert name mentions disk space."""
        alert_name = incident.alert_data.get("labels", {}).get("alertname", "")
        return "DiskSpace" in alert_name or "disk" in alert_name.lower()

    @staticmethod
    def _host_from_instance(instance) -> str:
        """Extract the host from ``host:port``, ``[ipv6]:port`` or a bare host.

        Returns "unknown" when no host can be determined.
        """
        if not isinstance(instance, str) or not instance:
            return "unknown"
        if instance.startswith("["):
            # Bracketed IPv6 literal, e.g. "[::1]:9100".
            host, closing, _ = instance[1:].partition("]")
            return host if closing and host else "unknown"
        if instance.count(":") > 1:
            # Bare IPv6 literal without a port; splitting on ":" would mangle it.
            return instance
        return instance.split(":")[0] or "unknown"

    def _is_safe_host(self, host) -> bool:
        """Return True when ``host`` may be passed to ``ssh`` as a destination.

        Rejects the "unknown" placeholder, empty values, anything starting
        with "-" (ssh would parse it as an option) and any character outside
        ASCII letters, digits and ``. - _ :``.
        """
        if not isinstance(host, str) or not host or host == "unknown":
            return False
        if host.startswith("-"):
            return False
        return all(
            ch.isascii() and (ch.isalnum() or ch in self._HOST_PUNCTUATION)
            for ch in host
        )

    async def diagnose(self, incident: Incident) -> dict:
        """Identify the affected host and capture the alert's reported value.

        Returns:
            ``{"host": <host or "unknown">, "alert_value": <value or "unknown">}``.
        """
        instance = incident.alert_data.get("labels", {}).get("instance", "")
        return {
            "host": self._host_from_instance(instance),
            "alert_value": incident.alert_data.get("value", "unknown"),
        }

    async def remediate(self, incident: Incident, diagnosis: dict) -> RunbookResult:
        """Run the cleanup commands on the diagnosed host over ssh.

        Each command runs in a worker thread with a timeout. A command that
        times out or cannot be started is logged and skipped.

        Returns:
            A result with ``auto_remediated=True`` when at least one command
            exited with status 0, otherwise a failed result.
        """
        import asyncio

        host = diagnosis["host"]
        failure = RunbookResult(
            success=False,
            action_taken="Cleanup commands failed",
            details="Manual disk investigation required",
        )

        if not self._is_safe_host(host):
            logger.warning("Refusing disk cleanup: unusable host %r", host)
            return failure

        cleaned = []
        for cmd in self.CLEANUP_COMMANDS:
            try:
                result = await asyncio.to_thread(
                    subprocess.run,
                    ["ssh", host, cmd],
                    capture_output=True,
                    text=True,
                    timeout=self.SSH_TIMEOUT,
                )
            except (OSError, subprocess.TimeoutExpired) as exc:
                logger.warning("Cleanup %r on %s did not complete: %s", cmd, host, exc)
                continue
            if result.returncode == 0:
                cleaned.append(cmd)

        if cleaned:
            return RunbookResult(
                success=True,
                action_taken=f"Cleaned disk on {host}",
                details=f"Ran {len(cleaned)} cleanup commands",
                auto_remediated=True,
            )

        return failure
Incident pipeline orchestrator
import asyncio
import logging
logger = logging.getLogger(__name__)
# Runbook registry. run_incident_pipeline uses the first entry whose
# matches() returns True, so order matters when several runbooks could apply.
RUNBOOKS = [
HighErrorRateRunbook(),
DiskSpaceRunbook(),
]
async def _notify_best_effort(incident: Incident, message: str) -> None:
    """Post to Slack, logging instead of raising when the notification fails."""
    try:
        await notify_slack(incident, message)
    except Exception:
        # A chat outage must never stop diagnosis, remediation or paging.
        logger.exception("Slack notification failed for %s", incident.id)


async def _execute_runbook(runbook, incident: Incident):
    """Run ``diagnose`` then ``remediate`` for one runbook and return its result."""
    incident.add_event("diagnosing", f"Running {runbook.__class__.__name__}")
    diagnosis = await runbook.diagnose(incident)
    incident.add_event("diagnosed", str(diagnosis.get("checks", {}))[:500])
    await _notify_best_effort(
        incident,
        f"📋 Diagnosis complete:\n```{format_diagnosis(diagnosis)}```",
    )

    incident.add_event("remediating", "Attempting automated fix")
    return await runbook.remediate(incident, diagnosis)


async def run_incident_pipeline(incident: Incident):
    """Execute the full incident response pipeline.

    Steps: announce the incident, pick the first runbook in ``RUNBOOKS`` that
    matches, run its diagnosis and remediation, then either mark the incident
    ``"auto-resolved"`` or page the on-call engineer.

    Slack notifications are best-effort. If the runbook itself raises, the
    failure is recorded on the timeline and on-call is paged rather than the
    incident being left unattended. Errors from ``page_oncall`` propagate.
    """
    # 1. Notify the incident channel
    await _notify_best_effort(incident, "🚨 New incident created")

    # 2. Find matching runbook
    matching_runbook = next(
        (runbook for runbook in RUNBOOKS if runbook.matches(incident)),
        None,
    )
    if matching_runbook is None:
        incident.add_event("triage", "No matching runbook found — manual response needed")
        await _notify_best_effort(
            incident,
            "No automated runbook available. Manual investigation required.",
        )
        await page_oncall(incident)
        return

    # 3-4. Run diagnostics and attempt remediation
    try:
        result = await _execute_runbook(matching_runbook, incident)
    except Exception as exc:
        runbook_name = matching_runbook.__class__.__name__
        logger.exception("%s failed for %s", runbook_name, incident.id)
        incident.add_event(
            "escalated",
            f"{runbook_name} raised {type(exc).__name__}: {exc}",
        )
        await _notify_best_effort(
            incident,
            f"⚠️ Automated runbook {runbook_name} crashed.\nPaging on-call engineer.",
        )
        await page_oncall(incident)
        return

    if result.auto_remediated:
        incident.status = "auto-resolved"
        incident.add_event("resolved", result.action_taken)
        await _notify_best_effort(
            incident,
            f"✅ Auto-remediated: {result.action_taken}\n{result.details}",
        )
    else:
        incident.add_event("escalated", result.details)
        await _notify_best_effort(
            incident,
            f"⚠️ Auto-remediation failed: {result.details}\nPaging on-call engineer.",
        )
        await page_oncall(incident)
def format_diagnosis(diagnosis: dict) -> str:
    """Render the ``checks`` section of a diagnosis as ``key: value`` lines.

    Returns "No diagnostic data" when the section is absent or renders to an
    empty string.
    """
    checks = diagnosis.get("checks", {})
    rendered = "\n".join(f"{name}: {outcome}" for name, outcome in checks.items())
    if rendered:
        return rendered
    return "No diagnostic data"
Slack notification integration
import httpx
# URL that notify_slack posts to. The value here is a placeholder.
# NOTE(review): the real URL is presumably a secret; confirm it is loaded from
# configuration rather than committed to source.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/T.../B.../..."
# Sent as the "channel" field of every notification payload.
SLACK_CHANNEL = "#incidents"
async def notify_slack(incident: Incident, message: str):
    """Post an incident update to Slack via ``SLACK_WEBHOOK_URL``.

    The payload has a header (severity emoji, incident id, title), a fields
    section (severity, status, source, creation time) and a section holding
    ``message``.

    Header and message text are clipped to Slack's Block Kit limits (150
    characters for a header, 3000 for section text) so that long titles or
    diagnostics do not get the whole notification rejected.

    Delivery is best-effort: transport errors and non-2xx responses are logged
    as warnings rather than raised, so a Slack outage cannot interrupt
    incident handling.
    """
    header_limit = 150
    section_limit = 3000

    def clip(text: str, limit: int) -> str:
        """Shorten ``text`` to ``limit`` characters, closing any open code fence."""
        if len(text) <= limit:
            return text
        # Reserve room for the ellipsis plus a closing fence.
        clipped = text[: limit - 4] + "…"
        if clipped.count("```") % 2:
            clipped += "```"
        return clipped

    severity_emoji = {
        Severity.CRITICAL: "🔴",
        Severity.HIGH: "🟠",
        Severity.MEDIUM: "🟡",
        Severity.LOW: "🔵",
    }
    emoji = severity_emoji.get(incident.severity, "⚪")

    payload = {
        "channel": SLACK_CHANNEL,
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": clip(f"{emoji} {incident.id}: {incident.title}", header_limit),
                },
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*Severity:* {incident.severity.value}"},
                    {"type": "mrkdwn", "text": f"*Status:* {incident.status}"},
                    {"type": "mrkdwn", "text": f"*Source:* {incident.source}"},
                    {"type": "mrkdwn", "text": f"*Time:* {incident.created_at:%H:%M UTC}"},
                ],
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": clip(message, section_limit)},
            },
        ],
    }

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(SLACK_WEBHOOK_URL, json=payload)
            resp.raise_for_status()
    except httpx.HTTPError as exc:
        logger.warning("Slack notification for %s failed: %s", incident.id, exc)
Post-mortem generation
from datetime import datetime, timezone
from pathlib import Path
def generate_postmortem(incident: Incident) -> str:
    """Generate a structured post-mortem document from incident data.

    Produces Markdown with a summary, a timeline table built from
    ``incident.timeline`` and placeholder sections for the investigating
    engineer. Duration is the gap between the first and last timeline entries,
    or "Unknown" when the timeline is empty.

    Timeline cells are flattened to a single line and have ``|`` escaped, so
    multi-line command output or pipes in alert text cannot break the table.
    """

    def cell(text, limit=None) -> str:
        """Make ``text`` safe for a Markdown table cell, optionally truncated."""
        flat = " ".join(str(text).split())  # newlines/tabs would end the row
        if limit is not None:
            flat = flat[:limit]
        # Escape after truncating so an escape sequence is never cut in half.
        return flat.replace("|", "\\|")

    duration = "Unknown"
    if incident.timeline:
        first = datetime.fromisoformat(incident.timeline[0]["timestamp"])
        last = datetime.fromisoformat(incident.timeline[-1]["timestamp"])
        minutes = (last - first).total_seconds() / 60
        duration = f"{minutes:.0f} minutes"

    timeline_md = "\n".join(
        f"| {e['timestamp'][:19]} | {cell(e['action'])} | {cell(e['details'], 80)} |"
        for e in incident.timeline
    )

    if incident.status == "auto-resolved":
        went_well = "Auto-remediation resolved the issue"
    else:
        went_well = "Runbook provided diagnostic context for responders"

    return f"""# Post-Mortem: {incident.id}
## Summary
- **Title:** {incident.title}
- **Severity:** {incident.severity.value}
- **Duration:** {duration}
- **Date:** {incident.created_at:%Y-%m-%d}
- **Status:** {incident.status}
## Timeline
| Time (UTC) | Action | Details |
|------------|--------|---------|
{timeline_md}
## Root Cause
_To be filled in by the investigating engineer._
## Impact
_Describe user-facing impact, affected services, and blast radius._
## What Went Well
- Automated detection triggered within monitoring thresholds
- {went_well}
## What Could Be Improved
_Identify gaps in monitoring, automation, or response process._
## Action Items
| Action | Owner | Priority | Status |
|--------|-------|----------|--------|
| _Add action items here_ | | | |
---
_Generated automatically by incident response automation._
_Review and complete within 48 hours of resolution._
"""
def save_postmortem(incident: Incident, output_dir: str = "postmortems") -> str:
    """Write the incident's post-mortem to ``<output_dir>/<date>-<id>.md``.

    The directory is created if needed. The file is written as UTF-8 because
    the document can contain non-ASCII text (for example em dashes from
    timeline entries), which a platform-default encoding may not represent.

    Args:
        incident: Incident to document.
        output_dir: Directory for post-mortem files; defaults to "postmortems".

    Returns:
        The path of the written file, as a string.
    """
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    path = target_dir / f"{incident.created_at:%Y-%m-%d}-{incident.id}.md"
    path.write_text(generate_postmortem(incident), encoding="utf-8")
    return str(path)
PagerDuty escalation
import httpx
# Token sent in the Authorization header by page_oncall. Placeholder value.
# NOTE(review): presumably the real key should come from a secret store or
# environment variable rather than source; confirm before deploying.
PAGERDUTY_API_KEY = "your-api-key"
# Id of the PagerDuty service that incidents are created against. Placeholder.
PAGERDUTY_SERVICE_ID = "PXXXXXX"
# Email of the PagerDuty user the request is made on behalf of. PagerDuty's
# "Create an Incident" endpoint documents the ``From`` header as required.
# Placeholder value: replace with a real account user.
PAGERDUTY_FROM_EMAIL = "incident-automation@example.com"


async def page_oncall(incident: Incident):
    """Create a PagerDuty incident to page the on-call engineer.

    Urgency is "high" for CRITICAL and HIGH severities and "low" otherwise.
    The incident body carries the current status, source and the most recent
    timeline entries so the responder sees what automation already did.

    Raises:
        httpx.HTTPStatusError: If PagerDuty answers with a 4xx/5xx status.
        httpx.RequestError: If the request cannot be delivered.
    """
    # Summarise the latest events instead of asserting that a diagnosis ran:
    # this function is also called when no runbook matched.
    recent_events = "\n".join(
        f"- {event['action']}: {event['details'][:200]}"
        for event in incident.timeline[-5:]
    )
    details = (
        f"Status: {incident.status}\n"
        f"Source: {incident.source}\n"
        f"Timeline events: {len(incident.timeline)}\n"
        f"{recent_events}"
    )
    urgency = (
        "high"
        if incident.severity in (Severity.CRITICAL, Severity.HIGH)
        else "low"
    )

    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://api.pagerduty.com/incidents",
            headers={
                "Authorization": f"Token token={PAGERDUTY_API_KEY}",
                "Accept": "application/vnd.pagerduty+json;version=2",
                "Content-Type": "application/json",
                "From": PAGERDUTY_FROM_EMAIL,
            },
            json={
                "incident": {
                    "type": "incident",
                    "title": f"{incident.id}: {incident.title}",
                    "service": {
                        "id": PAGERDUTY_SERVICE_ID,
                        "type": "service_reference",
                    },
                    "urgency": urgency,
                    "body": {
                        "type": "incident_body",
                        "details": details,
                    },
                }
            },
        )
        # Failing to page is serious; surface it to the caller.
        resp.raise_for_status()
Tradeoffs
| Approach | MTTR reduction | Setup effort | Risk |
|---|---|---|---|
| Alert routing only | 10-20% | Low | None |
| Auto-diagnosis + manual fix | 30-50% | Medium | None |
| Auto-remediation (safe ops) | 50-70% | Medium | Low (idempotent ops) |
| Full auto-remediation | 70-90% | High | Medium (needs guardrails) |
Start with automated diagnosis — gathering context and posting it to the incident channel costs nothing and helps every incident. Add auto-remediation gradually, starting with the safest operations (restart a service, clear temp files) and expanding to riskier ones (rollback, scale changes) only after building confidence.
The one thing to remember: The highest-value automation isn’t the remediation itself — it’s the automated context gathering. Having logs, metrics, recent deployments, and pod status available in the incident channel within 30 seconds saves more time than any auto-fix, because it eliminates the most frustrating phase of incident response: figuring out what’s happening.
See Also
- Python Blue Green Deployments: How Python helps teams switch between two identical server environments so updates never cause downtime
- Python Canary Releases: Why teams send new code to just a few users first — and how Python manages the gradual rollout
- Python Chaos Engineering: Why engineers deliberately break their own systems using Python — and how it prevents real disasters
- Python Compliance As Code: How Python turns security rules and regulations into automated checks that run every time code changes
- Python Feature Branch Deployments: How teams give every code branch its own live preview website using Python automation