Chaos Engineering with Python — Deep Dive

Chaos Toolkit architecture

The Chaos Toolkit (CTK) is the de facto open-source framework for chaos engineering in Python. It models experiments as a series of probes (observations) and actions (fault injections), with steady-state hypotheses that define pass/fail criteria.

Install and verify:

pip install chaostoolkit chaostoolkit-kubernetes chaostoolkit-aws
chaos --version

An experiment file (experiment.json) defines the full lifecycle:

{
  "title": "Pod termination resilience",
  "description": "Verify the service recovers when a pod is killed",
  "steady-state-hypothesis": {
    "title": "Service responds normally",
    "probes": [
      {
        "type": "probe",
        "name": "api-responds-200",
        "tolerance": 200,
        "provider": {
          "type": "http",
          "url": "https://api.example.com/health"
        }
      }
    ]
  },
  "method": [
    {
      "type": "action",
      "name": "kill-random-pod",
      "provider": {
        "type": "python",
        "module": "chaosk8s.pod.actions",
        "func": "terminate_pods",
        "arguments": {
          "label_selector": "app=payment-service",
          "ns": "production",
          "qty": 1
        }
      },
      "pauses": {
        "after": 30
      }
    }
  ],
  "rollbacks": [
    {
      "type": "action",
      "name": "scale-back-up",
      "provider": {
        "type": "python",
        "module": "chaosk8s.deployment.actions",
        "func": "scale_deployment",
        "arguments": {
          "name": "payment-service",
          "replicas": 3,
          "ns": "production"
        }
      }
    }
  ]
}

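Run it with chaos run experiment.json. CTK checks the steady-state hypothesis before the method and again after it. If the hypothesis already fails beforehand, the experiment stops without injecting anything. Once the method has been played, the rollbacks run by default (the --rollback-strategy option changes this). The run produces a journal file, journal.json by default, with detailed results.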
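The control flow behind that lifecycle can be sketched in a few lines of plain Python. This is a simplified model for illustration, not CTK's actual implementation: the function name run_experiment and the callables passed to it are hypothetical, rollbacks here run whenever the method has started, and real CTK adds tolerances, controls, and configurable rollback strategies on top.

```python
from typing import Callable


def run_experiment(
    steady_state: Callable[[], bool],
    method: list[Callable[[], None]],
    rollbacks: list[Callable[[], None]],
) -> dict:
    """Toy model of the hypothesis -> method -> hypothesis -> rollback flow."""
    journal = {"status": "completed", "deviated": False, "rollbacks_run": 0}

    # 1. Refuse to inject faults into a system that is already unhealthy.
    if not steady_state():
        journal["status"] = "failed"
        return journal

    try:
        # 2. Inject the faults.
        for action in method:
            action()
        # 3. Re-check the hypothesis: did the system absorb the fault?
        if not steady_state():
            journal["deviated"] = True
    finally:
        # 4. Always attempt cleanup once the method has started.
        for rollback in rollbacks:
            rollback()
            journal["rollbacks_run"] += 1
    return journal


# Simulated service: losing one of three replicas keeps it healthy.
replicas = {"count": 3}
result = run_experiment(
    steady_state=lambda: replicas["count"] >= 2,
    method=[lambda: replicas.update(count=replicas["count"] - 1)],
    rollbacks=[lambda: replicas.update(count=3)],
)
print(result)
```

Tightening the hypothesis to require all three replicas would flip "deviated" to True while still running the rollback, which is the signal a chaos experiment is designed to surface.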

Writing custom fault injectors

When off-the-shelf plugins don’t cover your failure modes, you write Python extensions. CTK discovers functions by module path:

# file: chaosext/network.py
import subprocess
import logging

logger = logging.getLogger("chaostoolkit")


def inject_latency(
    interface: str = "eth0",
    latency_ms: int = 300,
    jitter_ms: int = 50,
    duration_seconds: int = 60,
) -> dict:
    """Add network latency using tc (traffic control).

    Requires root or CAP_NET_ADMIN capability. The delay stays in place until
    it is removed: duration_seconds is only reported back to the caller, so
    pair this action with remove_latency as a rollback.
    """
    add_cmd = [
        "tc", "qdisc", "add", "dev", interface, "root", "netem",
        "delay", f"{latency_ms}ms", f"{jitter_ms}ms", "distribution", "normal",
    ]
    remove_cmd = [
        "tc", "qdisc", "del", "dev", interface, "root", "netem",
    ]

    logger.info(
        "Injecting %sms ± %sms latency on %s", latency_ms, jitter_ms, interface
    )
    # check=True raises CalledProcessError if tc fails (e.g. missing privileges).
    subprocess.run(add_cmd, check=True)

    return {
        "injected": True,
        "interface": interface,
        "latency_ms": latency_ms,
        "jitter_ms": jitter_ms,
        "duration_seconds": duration_seconds,
        "cleanup_cmd": " ".join(remove_cmd),
    }


def remove_latency(interface: str = "eth0") -> dict:
    """Remove injected latency — use as a rollback action."""
    subprocess.run(
        ["tc", "qdisc", "del", "dev", interface, "root", "netem"],
        check=True,
    )
    return {"removed": True, "interface": interface}

Reference it in your experiment:

{
  "type": "action",
  "name": "add-network-latency",
  "provider": {
    "type": "python",
    "module": "chaosext.network",
    "func": "inject_latency",
    "arguments": {"latency_ms": 500, "duration_seconds": 120}
  }
}
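Because inject_latency shells out with elevated privileges, it can help to validate the arguments and inspect the exact commands before anything runs. A minimal sketch, assuming a hypothetical helper build_netem_commands (not part of CTK or of the module above) that only builds the argument lists and never executes them. The validation bounds are conservative choices made for this sketch, not requirements of tc.

```python
def build_netem_commands(
    interface: str, latency_ms: int, jitter_ms: int = 0
) -> tuple[list[str], list[str]]:
    """Return (add_cmd, remove_cmd) for a netem delay, without running them."""
    if latency_ms <= 0:
        raise ValueError("latency_ms must be positive")
    if not 0 <= jitter_ms <= latency_ms:
        # Sanity bound chosen for this sketch.
        raise ValueError("jitter_ms must be between 0 and latency_ms")
    if not interface.isalnum():
        # Conservative check; real interface names may also contain
        # characters such as '-', '.', or '@'.
        raise ValueError(f"unexpected interface name: {interface!r}")

    add_cmd = ["tc", "qdisc", "add", "dev", interface, "root", "netem",
               "delay", f"{latency_ms}ms"]
    if jitter_ms:
        add_cmd += [f"{jitter_ms}ms", "distribution", "normal"]
    remove_cmd = ["tc", "qdisc", "del", "dev", interface, "root", "netem"]
    return add_cmd, remove_cmd


add_cmd, remove_cmd = build_netem_commands("eth0", latency_ms=500, jitter_ms=50)
print(" ".join(add_cmd))
print(" ".join(remove_cmd))
```

Keeping command construction separate from execution also makes it easy to log the planned commands in a dry run before granting the process CAP_NET_ADMIN.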

AWS fault injection with boto3

For cloud infrastructure, direct boto3 calls give fine-grained control:

import boto3
import random
import time
from datetime import datetime


class AWSChaosExperiment:
    def __init__(self, region: str = "us-east-1"):
        self.ec2 = boto3.client("ec2", region_name=region)
        self.cloudwatch = boto3.client("cloudwatch", region_name=region)
        self.experiment_id = f"chaos-{datetime.now():%Y%m%d-%H%M%S}"
    
    def get_instances_by_tag(self, tag_key: str, tag_value: str) -> list[str]:
        # Paginate: a single describe_instances call may not return every match.
        paginator = self.ec2.get_paginator("describe_instances")
        pages = paginator.paginate(
            Filters=[
                {"Name": f"tag:{tag_key}", "Values": [tag_value]},
                {"Name": "instance-state-name", "Values": ["running"]},
            ]
        )
        instance_ids = []
        for page in pages:
            for reservation in page["Reservations"]:
                for instance in reservation["Instances"]:
                    instance_ids.append(instance["InstanceId"])
        return instance_ids
    
    def terminate_random_instance(
        self, tag_key: str, tag_value: str, dry_run: bool = False
    ) -> dict:
        instances = self.get_instances_by_tag(tag_key, tag_value)
        if not instances:
            return {"action": "none", "reason": "no matching instances"}
        
        target = random.choice(instances)
        
        if dry_run:
            return {"action": "dry_run", "would_terminate": target}
        
        self.ec2.terminate_instances(InstanceIds=[target])
        return {
            "action": "terminated",
            "instance_id": target,
            "experiment_id": self.experiment_id,
            "pool_size": len(instances),
            "timestamp": datetime.now().isoformat(),
        }
    
    def verify_recovery(
        self,
        alarm_name: str,
        timeout_seconds: int = 300,
        poll_interval: int = 15,
    ) -> dict:
        """Poll a CloudWatch alarm to verify the system recovers."""
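        start = time.monotonic()  # monotonic clock: immune to wall-clock jumps
        while time.monotonic() - start < timeout_seconds:
            response = self.cloudwatch.describe_alarms(AlarmNames=[alarm_name])
            alarms = response["MetricAlarms"]
            if not alarms:
                # No metric alarm with that name: fail fast instead of
                # raising IndexError on an empty list.
                return {"recovered": False, "reason": f"alarm not found: {alarm_name}"}
            state = alarms[0]["StateValue"]

            if state == "OK":
                elapsed = time.monotonic() - start
                return {"recovered": True, "seconds": round(elapsed, 1)}

            time.sleep(poll_interval)

        return {"recovered": False, "timeout_seconds": timeout_seconds}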
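The polling logic in verify_recovery can be exercised without touching AWS by injecting the client and the clock. A minimal sketch, assuming a hypothetical wait_for_alarm_ok helper, a stub FakeCloudWatch class that mimics only the describe_alarms response shape used above, and a made-up alarm name "api-5xx":

```python
import time
from typing import Callable


def wait_for_alarm_ok(
    cloudwatch,
    alarm_name: str,
    timeout_seconds: float = 300,
    poll_interval: float = 15,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Poll until the alarm reports OK or the timeout elapses."""
    start = clock()
    polls = 0
    while clock() - start < timeout_seconds:
        polls += 1
        alarms = cloudwatch.describe_alarms(AlarmNames=[alarm_name])["MetricAlarms"]
        if alarms and alarms[0]["StateValue"] == "OK":
            return {"recovered": True, "polls": polls}
        sleep(poll_interval)
    return {"recovered": False, "polls": polls}


class FakeCloudWatch:
    """Stub that replays a fixed sequence of alarm states (test double only)."""

    def __init__(self, states: list[str]):
        self._states = iter(states)

    def describe_alarms(self, AlarmNames: list[str]) -> dict:
        # Once the scripted states run out, keep reporting ALARM.
        state = next(self._states, "ALARM")
        return {"MetricAlarms": [{"AlarmName": AlarmNames[0], "StateValue": state}]}


class FakeClock:
    """Deterministic clock: time advances only when sleep() is called."""

    def __init__(self):
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


clock = FakeClock()
fake = FakeCloudWatch(["ALARM", "INSUFFICIENT_DATA", "OK"])
outcome = wait_for_alarm_ok(fake, "api-5xx", sleep=clock.sleep, clock=clock)
print(outcome, "simulated seconds:", clock.now)
```

Injecting sleep and clock keeps the test instantaneous; in production the defaults fall back to the real time module and a real boto3 CloudWatch client.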

Kubernetes chaos with the Python client

from kubernetes import client, config
import random
import logging

logger = logging.getLogger(__name__)


class K8sChaos:
    def __init__(self, kubeconfig: str | None = None):
        if kubeconfig:
            config.load_kube_config(config_file=kubeconfig)
        else:
            config.load_incluster_config()
        self.v1 = client.CoreV1Api()
        self.apps = client.AppsV1Api()
    
    def kill_random_pod(self, namespace: str, label_selector: str) -> dict:
        pods = self.v1.list_namespaced_pod(
            namespace=namespace,
            label_selector=label_selector,
            field_selector="status.phase=Running",
        )
        if not pods.items:
            return {"killed": False, "reason": "no running pods match selector"}
        
        target = random.choice(pods.items)
        pod_name = target.metadata.name
        
        logger.info(f"Killing pod {pod_name} in {namespace}")
        self.v1.delete_namespaced_pod(
            name=pod_name,
            namespace=namespace,
            grace_period_seconds=0,
        )
        return {
            "killed": True,
            "pod": pod_name,
            "namespace": namespace,
            "remaining_pods": len(pods.items) - 1,
        }
    
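    def simulate_node_pressure(
        self, node_name: str, taint_effect: str = "NoSchedule"
    ) -> dict:
        """Taint a node so new pods avoid it, without actually stressing it."""
        # Patching spec.taints replaces the whole list, so carry over the
        # node's existing taints instead of overwriting them.
        node = self.v1.read_node(node_name)
        taints = [
            t for t in (node.spec.taints or []) if t.key != "chaos-experiment"
        ]
        taints.append(
            client.V1Taint(key="chaos-experiment", value="true", effect=taint_effect)
        )
        self.v1.patch_node(node_name, {"spec": {"taints": taints}})
        return {"tainted": True, "node": node_name, "effect": taint_effect}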
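A taint applied during an experiment has to come off again, and any taints already on the node should survive both steps. A minimal sketch of that bookkeeping as pure functions over plain dicts. The helpers add_taint and remove_taint are hypothetical, they match taints by key only for simplicity, and the resulting list would be sent as the node's spec.taints:

```python
CHAOS_TAINT_KEY = "chaos-experiment"  # same key as in simulate_node_pressure


def add_taint(taints: list[dict], effect: str = "NoSchedule") -> list[dict]:
    """Return a new taint list containing the chaos taint exactly once."""
    kept = [t for t in taints if t["key"] != CHAOS_TAINT_KEY]
    return kept + [{"key": CHAOS_TAINT_KEY, "value": "true", "effect": effect}]


def remove_taint(taints: list[dict]) -> list[dict]:
    """Return a new taint list without the chaos taint (the rollback step)."""
    return [t for t in taints if t["key"] != CHAOS_TAINT_KEY]


# A node that already carries an unrelated taint (illustrative values).
existing = [{"key": "dedicated", "value": "gpu", "effect": "NoSchedule"}]
during = add_taint(existing)
after = remove_taint(during)
print(during)
print(after)
```

Because both functions return new lists and are idempotent, running the action or the rollback twice leaves the node in the same state as running it once.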

Resource exhaustion experiments

For testing how applications handle resource pressure:

import multiprocessing
import os
import tempfile
import time


def consume_memory(target_mb: int, hold_seconds: int = 30) -> dict:
    """Allocate memory to simulate memory pressure."""
    block_size = 1024 * 1024  # 1 MB
    blocks = []

    try:
        for _ in range(target_mb):
            blocks.append(bytearray(block_size))

        time.sleep(hold_seconds)
        return {"consumed_mb": target_mb, "held_seconds": hold_seconds}
    finally:
        # Drop the references so the memory can be reclaimed.
        blocks.clear()


def _burn(stop_event) -> None:
    """Busy-loop in a worker process until told to stop."""
    while not stop_event.is_set():
        _ = sum(i * i for i in range(10_000))


def consume_cpu(cores: int = 2, duration_seconds: int = 30) -> dict:
    """Spin CPU cores to simulate CPU pressure.

    Uses processes rather than threads: in standard CPython builds the GIL
    lets only one thread run Python bytecode at a time, so threads would
    load a single core no matter how many are started.
    """
    stop_event = multiprocessing.Event()
    workers = [
        multiprocessing.Process(target=_burn, args=(stop_event,), daemon=True)
        for _ in range(cores)
    ]
    for w in workers:
        w.start()

    time.sleep(duration_seconds)
    stop_event.set()
    for w in workers:
        w.join(timeout=5)

    return {"cores_burned": cores, "duration_seconds": duration_seconds}


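def fill_disk(
    target_mb: int, directory: str = "/tmp", hold_seconds: int = 30
) -> dict:
    """Create a temporary file to simulate disk pressure."""
    chunk = b"\0" * (1024 * 1024)  # write 1 MB at a time to keep memory flat
    fd, path = tempfile.mkstemp(dir=directory, prefix="chaos_disk_")
    try:
        # fdopen takes ownership of fd and closes it even if a write fails.
        with os.fdopen(fd, "wb") as f:
            for _ in range(target_mb):
                f.write(chunk)
            f.flush()
            os.fsync(f.fileno())  # make sure the data actually reaches the disk

        time.sleep(hold_seconds)
        return {"filled_mb": target_mb, "path": path}
    finally:
        if os.path.exists(path):
            os.remove(path)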
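Resource exhaustion is easy to overdo, so a pre-flight check that caps the request against what is actually free can be useful. A minimal sketch using shutil.disk_usage from the standard library. The helper names safe_fill_mb and free_space_mb and the 512 MB default reserve are assumptions chosen for illustration:

```python
import shutil


def safe_fill_mb(requested_mb: int, free_mb: int, reserve_mb: int = 512) -> int:
    """Cap a disk-fill request so at least reserve_mb stays free."""
    if requested_mb < 0 or free_mb < 0 or reserve_mb < 0:
        raise ValueError("sizes must be non-negative")
    return max(0, min(requested_mb, free_mb - reserve_mb))


def free_space_mb(directory: str = "/tmp") -> int:
    """Free space on the filesystem holding `directory`, in whole MB."""
    return shutil.disk_usage(directory).free // (1024 * 1024)


# Pure arithmetic first: 10 GB requested, 4 GB free, 512 MB kept in reserve.
capped = safe_fill_mb(10_240, free_mb=4_096)
print(capped)

# Then against the real filesystem of the current directory.
print(safe_fill_mb(100, free_mb=free_space_mb(".")))
```

The capped value could then be passed as target_mb to a disk-fill function, so the experiment degrades to a smaller fill (or to nothing) rather than taking the host down.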

Automated chaos in CI/CD

Mature teams run chaos experiments as part of their deployment pipeline. A Python script orchestrates the full flow:

import subprocess
import sys
import json
from pathlib import Path


def run_chaos_suite(experiments_dir: str, environment: str) -> dict:
    """Run every chaos experiment in a directory; exit non-zero if any fails."""
    results = {}
    experiments = sorted(Path(experiments_dir).glob("*.json"))

    for exp_file in experiments:
        print(f"\n{'='*60}")
        print(f"Running: {exp_file.name} (environment: {environment})")
        print(f"{'='*60}")

        journal_path = Path(f"/tmp/{exp_file.stem}-journal.json")
        # Remove any journal left by a previous run so it cannot be misread.
        journal_path.unlink(missing_ok=True)

        try:
            result = subprocess.run(
                ["chaos", "run", str(exp_file), "--journal-path", str(journal_path)],
                capture_output=True,
                text=True,
                timeout=600,
            )
            return_code = result.returncode
        except subprocess.TimeoutExpired:
            # Treat a hung experiment as a failure instead of crashing the suite.
            return_code = None

        if return_code is None:
            status = "timeout"
        elif journal_path.exists():
            journal = json.loads(journal_path.read_text())
            status = journal.get("status", "unknown")
        else:
            status = "error"

        results[exp_file.name] = {
            "status": status,
            "return_code": return_code,
        }

        if status != "completed":
            print(f"FAILED: {exp_file.name} (status: {status})")

    failed = [k for k, v in results.items() if v["status"] != "completed"]

    if failed:
        print(f"\n{len(failed)} experiment(s) failed: {failed}")
        sys.exit(1)

    print(f"\nAll {len(results)} experiments passed")
    return results
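The pass/fail decision can be separated from the subprocess call, which makes it testable against saved journals. A minimal sketch: it assumes the journal exposes a top-level "status" string (as the script above does) and a boolean "deviated" flag, which is worth verifying against the journals your CTK version writes. The helper evaluate_journal is hypothetical.

```python
import json


def evaluate_journal(journal: dict, return_code: int) -> dict:
    """Classify one experiment run from its journal and process exit code."""
    status = journal.get("status", "unknown")
    deviated = bool(journal.get("deviated", False))
    # Pass only when every signal agrees that the run was clean.
    passed = status == "completed" and not deviated and return_code == 0
    return {"status": status, "deviated": deviated, "passed": passed}


# Journals as they might look after json.loads(path.read_text()).
ok = json.loads('{"status": "completed", "deviated": false}')
bad = json.loads('{"status": "completed", "deviated": true}')

print(evaluate_journal(ok, return_code=0))
print(evaluate_journal(bad, return_code=1))
print(evaluate_journal({}, return_code=1))
```

Requiring all three signals to agree is a deliberately strict choice for a deployment gate: an ambiguous run blocks the pipeline rather than slipping through.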

Tradeoffs and safety rails

| Approach | Pros | Cons |
|---|---|---|
| Chaos Toolkit (CTK) | Declarative experiments, rich plugin ecosystem, journal logging | Steeper learning curve, YAML/JSON verbosity |
| Custom boto3/k8s scripts | Full control, easy to debug, fits exact needs | No standardized experiment format, manual rollbacks |
| Chaos platforms (Gremlin, LitmusChaos) | UI dashboards, team collaboration, guardrails built in | Cost and vendor lock-in (for commercial offerings), less flexibility |

Safety is paramount. Every chaos experiment should have:

  • Blast radius limits: only affect a subset of instances, never the entire fleet
  • Automatic rollbacks: CTK's rollbacks block or custom cleanup functions
  • Kill switch: a mechanism to stop the experiment immediately (a flag file, a feature toggle, or a manual stop command)
  • Monitoring correlation: tag metrics with experiment IDs so you can separate experiment impact from real incidents
  • Gradual escalation: start in dev, then staging, then production off-peak, then production during normal traffic
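Two of these rails, the blast radius limit and the kill switch, are small enough to sketch directly. The names pick_targets and kill_switch_engaged, the 25% default, the instance IDs, and the flag-file name are all assumptions made for illustration:

```python
import math
import random
import tempfile
from pathlib import Path
from typing import Optional


def pick_targets(
    pool: list[str],
    max_fraction: float = 0.25,
    rng: Optional[random.Random] = None,
) -> list[str]:
    """Choose random targets without exceeding max_fraction of the pool."""
    if not 0 < max_fraction < 1:
        raise ValueError("max_fraction must be between 0 and 1 (exclusive)")
    # Rounding down means a very small pool yields no targets at all.
    limit = math.floor(len(pool) * max_fraction)
    rng = rng or random.Random()
    return rng.sample(pool, limit)


def kill_switch_engaged(flag_file: Path) -> bool:
    """The experiment must stop as soon as this flag file exists."""
    return flag_file.exists()


instances = [f"i-{n:04d}" for n in range(10)]
targets = pick_targets(instances, max_fraction=0.25, rng=random.Random(7))
print(targets)  # two instances: floor(10 * 0.25)

with tempfile.TemporaryDirectory() as tmp:
    flag = Path(tmp) / "chaos.stop"
    before = kill_switch_engaged(flag)  # no flag yet
    flag.touch()                        # an operator pulls the switch
    after_touch = kill_switch_engaged(flag)
    print(before, after_touch)
```

An experiment loop would call kill_switch_engaged between every action and skip straight to its rollbacks when it returns True.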

Real-world example: Netflix and GameDay

Netflix's Chaos Engineering team runs "GameDay" exercises where they simulate major outages. Their failure-injection tooling, FIT (Failure Injection Testing), can inject failures at the service level: force a specific microservice to return errors or add latency to specific API calls. Larger exercises go as far as simulating an entire AWS region going offline. The results show how traffic rerouted, which fallbacks activated, and where the user experience degraded.

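Teams that adopt this practice are often said to see a 40-60% reduction in production incidents within the first year. Treat that figure as indicative rather than measured, since results depend heavily on how incidents are counted. The mechanism is simple: the team has already found and fixed the failure modes that would otherwise have caused those incidents.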

The one thing to remember: Production chaos experiments need safety rails — blast radius limits, automatic rollbacks, and kill switches. The Chaos Toolkit provides a structured Python framework, but even simple custom scripts work when they follow the hypothesis → experiment → learn cycle.
