Incident Response Automation with Python — Deep Dive

Alert webhook handler

The foundation of incident automation is a webhook receiver that processes alerts from monitoring systems:

from fastapi import FastAPI, Request
from datetime import datetime, timezone
from enum import Enum
import logging
import uuid

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# FastAPI application instance; the webhook route below registers itself on it.
app = FastAPI()


class Severity(str, Enum):
    """Incident severity levels.

    The ``str`` mix-in makes every member a plain string equal to its value,
    so ``Severity("high")`` looks a member up from a label string.
    """
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"


class Incident:
    """One incident opened from an incoming alert.

    Stores the caller-supplied fields, a generated ``INC-XXXXXXXX`` id, the
    UTC creation time, a timeline list of events, and a status that starts
    out as ``"triggered"``.
    """

    def __init__(
        self,
        title: str,
        severity: Severity,
        source: str,
        alert_data: dict,
    ):
        # Eight upper-case hex characters taken from a random UUID.
        token = uuid.uuid4().hex[:8].upper()
        self.id = "INC-" + token
        self.title = title
        self.severity = severity
        self.source = source
        self.alert_data = alert_data
        self.created_at = datetime.now(timezone.utc)
        self.timeline: list[dict] = []
        self.status = "triggered"

    def add_event(self, action: str, details: str = ""):
        """Append an entry stamped with the current UTC time to the timeline."""
        stamp = datetime.now(timezone.utc).isoformat()
        entry = {"timestamp": stamp, "action": action, "details": details}
        self.timeline.append(entry)


# In-memory store (use Redis/DB in production).
# Maps incident id -> Incident; being a plain dict, its contents do not
# survive a process restart.
incidents: dict[str, Incident] = {}


@app.post("/webhooks/alertmanager")
async def handle_alertmanager(request: Request):
    """Process Prometheus Alertmanager webhooks.

    Opens one incident per firing alert in the payload, stores it in
    ``incidents`` and runs the automated response pipeline for it.

    Handling rules:
      * Alerts whose ``status`` is ``"resolved"`` are skipped, so a recovery
        notification never opens a new incident or triggers remediation.
      * A missing or unrecognized ``severity`` label falls back to
        ``Severity.MEDIUM`` instead of failing the whole request.
      * If the pipeline raises for one alert, the error is logged and noted
        on that incident's timeline; the remaining alerts are still handled.

    Returns:
        ``{"processed": <number of alerts in the payload>}``.
    """
    payload = await request.json()
    alerts = payload.get("alerts") or []

    for alert in alerts:
        if alert.get("status") == "resolved":
            logger.debug("Ignoring resolved alert notification")
            continue

        labels = alert.get("labels", {})
        annotations = alert.get("annotations", {})

        # Label values are free-form; anything outside the enum (for example
        # "warning") must not turn into an unhandled ValueError.
        raw_severity = labels.get("severity", Severity.MEDIUM.value)
        try:
            severity = Severity(str(raw_severity).lower())
        except ValueError:
            logger.warning(
                "Unrecognized severity label %r; defaulting to %s",
                raw_severity,
                Severity.MEDIUM.value,
            )
            severity = Severity.MEDIUM

        title = annotations.get("summary", labels.get("alertname", "Unknown Alert"))

        incident = Incident(
            title=title,
            severity=severity,
            source="alertmanager",
            alert_data=alert,
        )

        incidents[incident.id] = incident
        incident.add_event("created", f"Alert: {title}")

        logger.info("New incident %s: %s (%s)", incident.id, title, severity.value)

        # Trigger the automated response pipeline. A failure here is isolated
        # to this alert so the rest of the batch is not dropped.
        try:
            await run_incident_pipeline(incident)
        except Exception:
            logger.exception("Incident pipeline failed for %s", incident.id)
            incident.add_event(
                "pipeline_error",
                "Automated response pipeline raised an exception",
            )

    return {"processed": len(alerts)}

Automated runbook engine

Runbooks are encoded as Python classes with diagnostic and remediation steps:

from abc import ABC, abstractmethod
from dataclasses import dataclass
import subprocess
import httpx
import logging

logger = logging.getLogger(__name__)


@dataclass
class RunbookResult:
    """Outcome of a runbook's remediation attempt."""
    # Whether the remediation step reports that it worked.
    success: bool
    # Short description of what was done (or that nothing could be done).
    action_taken: str
    # Extra detail for responders, e.g. command output.
    details: str
    # True when the fix was applied automatically; the pipeline resolves the
    # incident when this is set and escalates to on-call otherwise.
    auto_remediated: bool = False


class Runbook(ABC):
    """Base class for incident runbooks.

    A runbook has three steps: ``matches`` decides whether it applies to an
    incident, ``diagnose`` collects information, and ``remediate`` attempts a
    fix using that information. Subclasses must implement all three.
    """
    
    @abstractmethod
    def matches(self, incident: Incident) -> bool:
        """Return True if this runbook applies to the incident."""
        ...
    
    @abstractmethod
    async def diagnose(self, incident: Incident) -> dict:
        """Gather diagnostic information.

        Returns a dict; the pipeline reports the contents of its ``"checks"``
        entry and passes the whole dict on to ``remediate``.
        """
        ...
    
    @abstractmethod
    async def remediate(self, incident: Incident, diagnosis: dict) -> RunbookResult:
        """Attempt automated remediation.

        ``diagnosis`` is the dict returned by ``diagnose`` for the same
        incident. Returns a ``RunbookResult`` describing what was done.
        """
        ...


class HighErrorRateRunbook(Runbook):
    """Handle high API error rate incidents.

    Diagnosis probes the service's ``/health`` endpoint and inspects rollout
    history and pod status through ``kubectl``. Remediation rolls the
    deployment back when pods are crash-looping, or restarts it when the
    service is unreachable.

    All ``kubectl`` calls run in a worker thread with a timeout, so a slow or
    missing binary neither blocks the event loop nor crashes the pipeline.
    """

    NAMESPACE = "production"
    KUBECTL_TIMEOUT = 30.0  # seconds allowed per kubectl invocation
    CRASH_LOOP_RESTARTS = 3  # more restarts than this counts as crash-looping

    def matches(self, incident: Incident) -> bool:
        """Return True when the alert name mentions ``ErrorRate`` or ``5xx``."""
        alert_name = incident.alert_data.get("labels", {}).get("alertname", "")
        return "ErrorRate" in alert_name or "5xx" in alert_name

    @classmethod
    async def _kubectl(cls, *args: str):
        """Run ``kubectl <args> -n <NAMESPACE>`` off the event loop.

        Returns the ``subprocess.CompletedProcess``, or ``None`` when the
        command could not be started or exceeded ``KUBECTL_TIMEOUT``.
        """
        import asyncio  # local import: only this helper needs it

        command = ["kubectl", *args, "-n", cls.NAMESPACE]
        try:
            return await asyncio.to_thread(
                subprocess.run,
                command,
                capture_output=True,
                text=True,
                timeout=cls.KUBECTL_TIMEOUT,
            )
        except (OSError, subprocess.TimeoutExpired) as exc:
            logger.warning("%s did not complete: %s", " ".join(command), exc)
            return None

    @staticmethod
    def _summarize_pod(pod: dict) -> dict:
        """Reduce one pod object to its name, phase and total restart count."""
        status = pod.get("status", {})
        restarts = sum(
            cs.get("restartCount", 0)
            for cs in status.get("containerStatuses") or []
        )
        return {
            "name": pod.get("metadata", {}).get("name", "unknown"),
            "phase": status.get("phase", "Unknown"),
            "restarts": restarts,
        }

    @staticmethod
    def _describe_failure(action: str, result) -> str:
        """Describe a failed kubectl action for the escalation message."""
        if result is None:
            return f"{action} could not be run"
        return f"{action} exited {result.returncode}: {result.stderr.strip()[:200]}"

    async def diagnose(self, incident: Incident) -> dict:
        """Collect health, recent-deployment and pod information.

        Returns:
            ``{"service": <name>, "checks": {...}}`` where ``checks`` may hold
            ``"health"``, ``"recent_deployments"`` and ``"pods"``. A check is
            left out when its ``kubectl`` command fails.
        """
        import json

        service = incident.alert_data.get("labels", {}).get("service", "unknown")

        diagnosis = {"service": service, "checks": {}}
        checks = diagnosis["checks"]

        # Check if the service is responding
        async with httpx.AsyncClient(timeout=5.0) as client:
            try:
                resp = await client.get(f"http://{service}.internal/health")
            except httpx.RequestError as e:
                checks["health"] = {"status": "unreachable", "error": str(e)}
            else:
                checks["health"] = {
                    "status": resp.status_code,
                    "healthy": resp.status_code == 200,
                }

        # Check recent deployments (last three lines of the history output)
        history = await self._kubectl("rollout", "history", f"deployment/{service}")
        if history is not None and history.returncode == 0:
            checks["recent_deployments"] = history.stdout.strip().split("\n")[-3:]

        # Check pod status
        listing = await self._kubectl(
            "get", "pods", "-l", f"app={service}", "-o", "json"
        )
        if listing is not None and listing.returncode == 0:
            try:
                pods = json.loads(listing.stdout)
            except ValueError:
                logger.warning("kubectl returned unparsable pod JSON for %s", service)
            else:
                checks["pods"] = [
                    self._summarize_pod(pod) for pod in pods.get("items", [])
                ]

        return diagnosis

    async def remediate(self, incident: Incident, diagnosis: dict) -> RunbookResult:
        """Roll back or restart the deployment based on the diagnosis.

        Returns:
            A successful, auto-remediated result when a rollback or restart
            command succeeds. Otherwise an unsuccessful result whose details
            list any remediation command that was tried and failed.
        """
        service = diagnosis["service"]
        health = diagnosis["checks"].get("health", {})
        pods = diagnosis["checks"].get("pods", [])
        failed_attempts = []

        # If pods are crash-looping, try rolling back
        crash_looping = any(
            p.get("restarts", 0) > self.CRASH_LOOP_RESTARTS for p in pods
        )
        if crash_looping:
            rollback = await self._kubectl("rollout", "undo", f"deployment/{service}")
            if rollback is not None and rollback.returncode == 0:
                return RunbookResult(
                    success=True,
                    action_taken=f"Rolled back deployment/{service}",
                    details=rollback.stdout,
                    auto_remediated=True,
                )
            failed_attempts.append(self._describe_failure("rollback", rollback))

        # If service is unreachable, restart pods
        if health.get("status") == "unreachable":
            restart = await self._kubectl(
                "rollout", "restart", f"deployment/{service}"
            )
            if restart is not None and restart.returncode == 0:
                return RunbookResult(
                    success=True,
                    action_taken=f"Restarted deployment/{service}",
                    details="Service was unreachable, initiated rolling restart",
                    auto_remediated=True,
                )
            failed_attempts.append(self._describe_failure("restart", restart))

        if failed_attempts:
            # Tell responders what was already tried instead of hiding it.
            return RunbookResult(
                success=False,
                action_taken="Automatic remediation failed",
                details="Manual intervention required ("
                + "; ".join(failed_attempts)
                + ")",
            )

        return RunbookResult(
            success=False,
            action_taken="No automatic remediation available",
            details="Manual intervention required",
        )


class DiskSpaceRunbook(Runbook):
    """Handle disk space alerts.

    Diagnosis extracts the host from the alert's ``instance`` label;
    remediation runs a fixed list of cleanup commands on that host over ssh.
    """

    SSH_TIMEOUT = 30  # seconds allowed per cleanup command

    # Safe cleanup commands
    CLEANUP_COMMANDS = (
        "find /tmp -type f -mtime +7 -delete",
        "journalctl --vacuum-time=3d",
        "find /var/log -name '*.gz' -mtime +30 -delete",
    )

    def matches(self, incident: Incident) -> bool:
        """Return True when the alert name contains "disk" in any letter case."""
        alert_name = incident.alert_data.get("labels", {}).get("alertname", "")
        # The case-insensitive test also covers names containing "DiskSpace".
        return "disk" in alert_name.lower()

    @staticmethod
    def _host_from_instance(instance: str) -> str:
        """Return the host part of ``host:port``; ``"unknown"`` if there is none.

        Bracketed IPv6 literals such as ``[fe80::1]:9100`` yield the address
        between the brackets rather than being cut at the first colon.
        """
        if not instance:
            return "unknown"
        if instance.startswith("["):
            return instance[1:].partition("]")[0] or "unknown"
        return instance.split(":")[0] or "unknown"

    async def diagnose(self, incident: Incident) -> dict:
        """Return the affected host and the alert's ``value`` field."""
        instance = incident.alert_data.get("labels", {}).get("instance", "")

        return {
            "host": self._host_from_instance(instance),
            "alert_value": incident.alert_data.get("value", "unknown"),
        }

    async def remediate(self, incident: Incident, diagnosis: dict) -> RunbookResult:
        """Run the cleanup commands on the diagnosed host over ssh.

        Returns:
            A successful, auto-remediated result when at least one command
            exits 0; otherwise an unsuccessful result. No command is run when
            the host is missing, ``"unknown"`` or starts with ``-``.
        """
        import asyncio  # local import: only needed to off-load blocking ssh calls

        host = diagnosis["host"]

        # The host comes from an alert label. A value starting with "-" would
        # be parsed by ssh as an option rather than a destination.
        if not host or host == "unknown" or host.startswith("-"):
            return RunbookResult(
                success=False,
                action_taken="Cleanup skipped",
                details=f"No usable host in alert (got {host!r}); "
                "manual disk investigation required",
            )

        cleaned = []
        for cmd in self.CLEANUP_COMMANDS:
            try:
                # Worker thread keeps the event loop responsive during ssh.
                result = await asyncio.to_thread(
                    subprocess.run,
                    ["ssh", host, cmd],
                    capture_output=True,
                    text=True,
                    timeout=self.SSH_TIMEOUT,
                )
            except (OSError, subprocess.TimeoutExpired) as exc:
                # One hung or unstartable command must not abort the rest.
                logger.warning("Cleanup %r on %s did not complete: %s", cmd, host, exc)
                continue
            if result.returncode == 0:
                cleaned.append(cmd)

        if cleaned:
            return RunbookResult(
                success=True,
                action_taken=f"Cleaned disk on {host}",
                details=f"Ran {len(cleaned)} cleanup commands",
                auto_remediated=True,
            )

        return RunbookResult(
            success=False,
            action_taken="Cleanup commands failed",
            details="Manual disk investigation required",
        )

Incident pipeline orchestrator

import asyncio
import logging

logger = logging.getLogger(__name__)

# Register all runbooks.
# Order matters: the pipeline uses the first entry whose matches() returns True.
RUNBOOKS = [
    HighErrorRateRunbook(),
    DiskSpaceRunbook(),
]


async def run_incident_pipeline(incident: Incident):
    """Execute the full incident response pipeline.

    Steps: announce the incident in Slack, pick the first matching runbook,
    run its diagnosis and remediation, then either mark the incident
    ``"auto-resolved"`` or page the on-call engineer.

    Slack posts are best-effort, and an exception raised by a runbook is
    treated as a failed remediation: in both cases the on-call engineer is
    still paged rather than the pipeline aborting silently.
    """

    # 1. Notify the incident channel
    await _notify_best_effort(incident, "🚨 New incident created")

    # 2. Find matching runbook
    matching_runbook = next(
        (runbook for runbook in RUNBOOKS if runbook.matches(incident)),
        None,
    )

    if not matching_runbook:
        incident.add_event("triage", "No matching runbook found — manual response needed")
        await _notify_best_effort(
            incident,
            "No automated runbook available. Manual investigation required.",
        )
        await page_oncall(incident)
        return

    runbook_name = matching_runbook.__class__.__name__
    try:
        # 3. Run diagnostics
        incident.add_event("diagnosing", f"Running {runbook_name}")
        diagnosis = await matching_runbook.diagnose(incident)
        incident.add_event("diagnosed", str(diagnosis.get("checks", {}))[:500])

        await _notify_best_effort(
            incident,
            f"📋 Diagnosis complete:\n```{format_diagnosis(diagnosis)}```",
        )

        # 4. Attempt remediation
        incident.add_event("remediating", "Attempting automated fix")
        result = await matching_runbook.remediate(incident, diagnosis)
    except Exception as exc:
        # A crashing runbook must never leave the incident unattended.
        logger.exception("%s failed for %s", runbook_name, incident.id)
        incident.add_event("escalated", f"{runbook_name} raised {exc!r}")
        await _notify_best_effort(
            incident,
            "⚠️ Automated runbook failed unexpectedly.\nPaging on-call engineer.",
        )
        await page_oncall(incident)
        return

    if result.auto_remediated:
        incident.status = "auto-resolved"
        incident.add_event("resolved", result.action_taken)
        await _notify_best_effort(
            incident,
            f"✅ Auto-remediated: {result.action_taken}\n{result.details}",
        )
    else:
        incident.add_event("escalated", result.details)
        await _notify_best_effort(
            incident,
            f"⚠️ Auto-remediation failed: {result.details}\nPaging on-call engineer.",
        )
        await page_oncall(incident)


async def _notify_best_effort(incident: Incident, message: str) -> None:
    """Post to Slack; log instead of raising so later pipeline steps still run."""
    try:
        await notify_slack(incident, message)
    except Exception:
        logger.exception("Slack notification failed for %s", incident.id)


def format_diagnosis(diagnosis: dict) -> str:
    """Render every entry of ``diagnosis["checks"]`` as a ``name: value`` line.

    Returns ``"No diagnostic data"`` when there are no checks to show.
    """
    checks = diagnosis.get("checks", {})
    rendered = "\n".join(f"{name}: {outcome}" for name, outcome in checks.items())
    return rendered if rendered else "No diagnostic data"

Slack notification integration

import httpx

# URL that notify_slack posts its payload to (placeholder value).
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/T.../B.../..."
# Sent as the "channel" field of every Slack payload.
SLACK_CHANNEL = "#incidents"


async def notify_slack(incident: Incident, message: str):
    """Post an incident update to Slack as a Block Kit message.

    The message has a header (severity emoji, incident id and title), a
    field grid (severity, status, source, creation time) and ``message`` as
    the body.

    Header and body text are clipped to Slack's documented length limits so
    that a long title or diagnosis does not get the whole message rejected.
    Delivery is best-effort: transport errors and non-2xx responses are
    logged and not raised, so a Slack outage cannot interrupt the caller.
    """
    header_limit = 150  # max characters in a header block's text
    section_limit = 3000  # max characters in a section block's text

    def clip(text: str, limit: int) -> str:
        """Shorten ``text`` to ``limit`` characters, marking the cut with an ellipsis."""
        return text if len(text) <= limit else text[: limit - 1] + "…"

    severity_emoji = {
        Severity.CRITICAL: "🔴",
        Severity.HIGH: "🟠",
        Severity.MEDIUM: "🟡",
        Severity.LOW: "🔵",
    }

    emoji = severity_emoji.get(incident.severity, "⚪")

    payload = {
        "channel": SLACK_CHANNEL,
        "blocks": [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": clip(f"{emoji} {incident.id}: {incident.title}", header_limit),
                },
            },
            {
                "type": "section",
                "fields": [
                    {"type": "mrkdwn", "text": f"*Severity:* {incident.severity.value}"},
                    {"type": "mrkdwn", "text": f"*Status:* {incident.status}"},
                    {"type": "mrkdwn", "text": f"*Source:* {incident.source}"},
                    {"type": "mrkdwn", "text": f"*Time:* {incident.created_at:%H:%M UTC}"},
                ],
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": clip(message, section_limit)},
            },
        ],
    }

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(SLACK_WEBHOOK_URL, json=payload)
            # Without this check a 4xx/5xx from Slack would pass unnoticed.
            resp.raise_for_status()
    except httpx.HTTPError as exc:
        logger.warning("Slack notification for %s failed: %s", incident.id, exc)

Post-mortem generation

from datetime import datetime, timezone
from pathlib import Path


def generate_postmortem(incident: Incident) -> str:
    """Generate a structured post-mortem document from incident data.

    The document is Markdown: a summary, the incident timeline as a table,
    and placeholder sections for the investigating engineer to fill in.

    Duration is the time between the first and last timeline events, or
    ``"Unknown"`` when the timeline is empty. Timeline text is flattened to
    one line and has ``|`` escaped, so multi-line command output or a pipe in
    an alert title cannot break the table layout.
    """

    duration = "Unknown"
    if incident.timeline:
        first = datetime.fromisoformat(incident.timeline[0]["timestamp"])
        last = datetime.fromisoformat(incident.timeline[-1]["timestamp"])
        minutes = (last - first).total_seconds() / 60
        duration = f"{minutes:.0f} minutes"

    # Timestamps are cut to 19 characters: date and time without the UTC offset.
    timeline_md = "\n".join(
        f"| {e['timestamp'][:19]} | {_md_table_cell(e['action'])} "
        f"| {_md_table_cell(e['details'], 80)} |"
        for e in incident.timeline
    )

    if incident.status == "auto-resolved":
        went_well = "Auto-remediation resolved the issue"
    else:
        went_well = "Runbook provided diagnostic context for responders"

    return f"""# Post-Mortem: {incident.id}

## Summary
- **Title:** {incident.title}
- **Severity:** {incident.severity.value}
- **Duration:** {duration}
- **Date:** {incident.created_at:%Y-%m-%d}
- **Status:** {incident.status}

## Timeline

| Time (UTC) | Action | Details |
|------------|--------|---------|
{timeline_md}

## Root Cause

_To be filled in by the investigating engineer._

## Impact

_Describe user-facing impact, affected services, and blast radius._

## What Went Well

- Automated detection triggered within monitoring thresholds
- {went_well}

## What Could Be Improved

_Identify gaps in monitoring, automation, or response process._

## Action Items

| Action | Owner | Priority | Status |
|--------|-------|----------|--------|
| _Add action items here_ | | | |

---
_Generated automatically by incident response automation._
_Review and complete within 48 hours of resolution._
"""


def _md_table_cell(text, limit=None) -> str:
    """Make ``text`` safe for one Markdown table cell.

    Collapses all whitespace (including newlines) to single spaces, optionally
    truncates to ``limit`` characters, then escapes ``|``. Escaping comes last
    so truncation can never cut an escape sequence in half.
    """
    flat = " ".join(str(text).split())
    if limit is not None:
        flat = flat[:limit]
    return flat.replace("|", "\\|")


def save_postmortem(incident: Incident, output_dir: str = "postmortems") -> str:
    """Write the incident's post-mortem to ``<output_dir>/<date>-<id>.md``.

    Creates ``output_dir`` (and any missing parents) if needed. The file is
    written as UTF-8 explicitly, so titles containing non-ASCII characters do
    not depend on the platform's default encoding.

    Returns:
        The path of the written file, as a string.
    """
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    path = target_dir / f"{incident.created_at:%Y-%m-%d}-{incident.id}.md"
    path.write_text(generate_postmortem(incident), encoding="utf-8")
    return str(path)

PagerDuty escalation

import httpx

# API key sent in the Authorization header by page_oncall (placeholder value;
# a real key should not be committed to source control).
PAGERDUTY_API_KEY = "your-api-key"
# Id of the PagerDuty service that new incidents are opened against (placeholder).
PAGERDUTY_SERVICE_ID = "PXXXXXX"


async def page_oncall(incident: Incident):
    """Create a PagerDuty incident to page the on-call engineer.

    Urgency is ``"high"`` for CRITICAL and HIGH severities and ``"low"``
    otherwise. The incident body states whether an automated diagnosis
    actually ran (this function is also called when no runbook matched) and
    lists the most recent timeline events to give the responder context.

    Raises:
        httpx.HTTPStatusError: if PagerDuty responds with a 4xx/5xx status.
    """
    # Only claim a diagnosis when the timeline shows that one finished.
    diagnosed = any(e.get("action") == "diagnosed" for e in incident.timeline)
    if diagnosed:
        summary = "Automated diagnosis completed."
    else:
        summary = "No automated diagnosis was completed."

    recent_events = "".join(
        f"- {e.get('action', '')}: {str(e.get('details', ''))[:200]}\n"
        for e in incident.timeline[-5:]
    )

    details = (
        f"{summary}\n"
        f"Timeline events: {len(incident.timeline)}\n"
        f"Source: {incident.source}\n"
    )
    if recent_events:
        details += f"Recent events:\n{recent_events}"

    if incident.severity in (Severity.CRITICAL, Severity.HIGH):
        urgency = "high"
    else:
        urgency = "low"

    # NOTE(review): PagerDuty's REST API documents a `From` header (a valid
    # user's email) for incident creation — confirm and add it if required.
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://api.pagerduty.com/incidents",
            headers={
                "Authorization": f"Token token={PAGERDUTY_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "incident": {
                    "type": "incident",
                    "title": f"{incident.id}: {incident.title}",
                    "service": {
                        "id": PAGERDUTY_SERVICE_ID,
                        "type": "service_reference",
                    },
                    "urgency": urgency,
                    "body": {
                        "type": "incident_body",
                        "details": details,
                    },
                }
            },
        )
        resp.raise_for_status()

Tradeoffs

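| Approach | MTTR reduction | Setup effort | Risk |
|----------|----------------|--------------|------|
| Alert routing only | 10-20% | Low | None |
| Auto-diagnosis + manual fix | 30-50% | Medium | None |
| Auto-remediation (safe ops) | 50-70% | Medium | Low (idempotent ops) |
| Full auto-remediation | 70-90% | High | Medium (needs guardrails) |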

Start with automated diagnosis — gathering context and posting it to the incident channel costs nothing and helps every incident. Add auto-remediation gradually, starting with the safest operations (restart a service, clear temp files) and expanding to riskier ones (rollback, scale changes) only after building confidence.

The one thing to remember: The highest-value automation isn’t the remediation itself — it’s the automated context gathering. Having logs, metrics, recent deployments, and pod status available in the incident channel within 30 seconds saves more time than any auto-fix, because it eliminates the most frustrating phase of incident response: figuring out what’s happening.

Tags: python, incident-response, automation, sre

See Also