Chaos Engineering with Python — Deep Dive
Chaos Toolkit architecture
The Chaos Toolkit (CTK) is the de facto open-source framework for chaos engineering in Python. It models experiments as a series of probes (observations) and actions (fault injections), with steady-state hypotheses that define pass/fail criteria.
Install and verify:
pip install chaostoolkit chaostoolkit-kubernetes chaostoolkit-aws
chaos --version
An experiment file (experiment.json) defines the full lifecycle:
{
  "title": "Pod termination resilience",
  "description": "Verify the service recovers when a pod is killed",
  "steady-state-hypothesis": {
    "title": "Service responds normally",
    "probes": [
      {
        "type": "probe",
        "name": "api-responds-200",
        "tolerance": 200,
        "provider": {
          "type": "http",
          "url": "https://api.example.com/health"
        }
      }
    ]
  },
  "method": [
    {
      "type": "action",
      "name": "kill-random-pod",
      "provider": {
        "type": "python",
        "module": "chaosk8s.pod.actions",
        "func": "terminate_pods",
        "arguments": {
          "label_selector": "app=payment-service",
          "ns": "production",
          "qty": 1
        }
      },
      "pauses": {
        "after": 30
      }
    }
  ],
  "rollbacks": [
    {
      "type": "action",
      "name": "scale-back-up",
      "provider": {
        "type": "python",
        "module": "chaosk8s.deployment.actions",
        "func": "scale_deployment",
        "arguments": {
          "name": "payment-service",
          "replicas": 3,
          "ns": "production"
        }
      }
    }
  ]
}
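Run it with chaos run experiment.json. CTK checks the steady-state hypothesis before the method (and stops there if the system is already unhealthy), checks it again after the method, and then runs the rollbacks. By default the rollbacks are skipped when the run is interrupted or the initial hypothesis fails; the --rollback-strategy option changes that. The run produces a journal file (journal.json unless --journal-path says otherwise) with detailed results.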
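Because the journal is plain JSON, a pipeline can summarize it without extra tooling. The sketch below is illustrative rather than an official CTK API: summarize_journal and load_summary are hypothetical helpers, and the field names they read (top-level status, deviated, and a run list whose entries carry an activity name and a status) are assumptions that should be checked against a journal produced by your own CTK version.

```python
import json
from pathlib import Path


def summarize_journal(journal: dict) -> dict:
    """Reduce a journal to the fields a pipeline usually needs.

    Assumed layout: top-level "status" and "deviated", plus a "run" list
    whose entries look like {"activity": {"name": ...}, "status": ...}.
    """
    runs = journal.get("run", [])
    # Collect the names of activities that did not report "succeeded".
    failed = [
        entry.get("activity", {}).get("name", "<unnamed>")
        for entry in runs
        if entry.get("status") != "succeeded"
    ]
    return {
        "status": journal.get("status", "unknown"),
        "deviated": bool(journal.get("deviated", False)),
        "activities": len(runs),
        "failed_activities": failed,
    }


def load_summary(path: str = "journal.json") -> dict:
    """Read a journal file from disk and summarize it."""
    return summarize_journal(json.loads(Path(path).read_text()))
```

Missing keys fall back to neutral defaults, so an incomplete journal from an interrupted run still yields a summary instead of a KeyError.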
Writing custom fault injectors
When off-the-shelf plugins don’t cover your failure modes, you write Python extensions. CTK resolves functions by module path, so the module must be importable in the environment where chaos runs:
# file: chaosext/network.py
import logging
import subprocess

logger = logging.getLogger("chaostoolkit")


def inject_latency(
    interface: str = "eth0",
    latency_ms: int = 300,
    jitter_ms: int = 50,
    duration_seconds: int = 60,
) -> dict:
    """Add network latency using tc (traffic control).

    Requires root or the CAP_NET_ADMIN capability.

    duration_seconds is only reported in the result: this function does not
    remove the delay on its own. Pair it with remove_latency as a rollback.
    """
    add_cmd = [
        "tc", "qdisc", "add", "dev", interface, "root", "netem",
        "delay", f"{latency_ms}ms", f"{jitter_ms}ms", "distribution", "normal",
    ]
    remove_cmd = [
        "tc", "qdisc", "del", "dev", interface, "root", "netem",
    ]
    logger.info(f"Injecting {latency_ms}ms ± {jitter_ms}ms latency on {interface}")
    # check=True raises CalledProcessError if tc exits non-zero
    # (for example, when the process lacks the required privileges).
    subprocess.run(add_cmd, check=True)
    return {
        "injected": True,
        "interface": interface,
        "latency_ms": latency_ms,
        "duration_seconds": duration_seconds,
        "cleanup_cmd": " ".join(remove_cmd),
    }


def remove_latency(interface: str = "eth0") -> dict:
    """Remove injected latency; use as a rollback action."""
    subprocess.run(
        ["tc", "qdisc", "del", "dev", interface, "root", "netem"],
        check=True,
    )
    return {"removed": True, "interface": interface}
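Since these functions run tc with elevated privileges, it can help to validate arguments before anything is executed. The following is a minimal sketch under stated assumptions: build_netem_args is a hypothetical helper (not part of CTK), and the character set it allows for interface names is a deliberately conservative choice rather than the kernel's exact rule.

```python
import re

# Linux interface names are at most 15 characters long. The allowed
# character set below is a conservative assumption, not the kernel's rule.
_IFACE_RE = re.compile(r"[A-Za-z0-9_.:-]{1,15}")


def build_netem_args(interface: str, latency_ms: int, jitter_ms: int = 0) -> list:
    """Return the tc argument list for a netem delay, or raise ValueError."""
    if not _IFACE_RE.fullmatch(interface):
        raise ValueError(f"suspicious interface name: {interface!r}")
    if latency_ms <= 0:
        raise ValueError("latency_ms must be positive")
    if jitter_ms < 0:
        raise ValueError("jitter_ms must not be negative")
    args = [
        "tc", "qdisc", "add", "dev", interface, "root", "netem",
        "delay", f"{latency_ms}ms",
    ]
    if jitter_ms:
        # Jitter and its distribution are only appended when requested.
        args += [f"{jitter_ms}ms", "distribution", "normal"]
    return args
```

Building the command as a list and passing it to subprocess.run without shell=True, as the extension above already does, keeps the arguments from being interpreted by a shell; the validation adds a second line of defense against typos in experiment files.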
Reference it in your experiment:
{
  "type": "action",
  "name": "add-network-latency",
  "provider": {
    "type": "python",
    "module": "chaosext.network",
    "func": "inject_latency",
    "arguments": {"latency_ms": 500, "duration_seconds": 120}
  }
}
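As written, inject_latency never removes the delay itself, so the action is usually paired with a pause and a rollback that calls remove_latency. The sketch below builds such an experiment as a Python dict; latency_experiment, its parameters, and the title text are illustrative assumptions, while the module and function names come from the extension above.

```python
import json


def latency_experiment(
    health_url: str, latency_ms: int = 500, hold_seconds: int = 120
) -> dict:
    """Build an experiment that injects latency, waits, then rolls back."""
    return {
        "title": f"Service tolerates {latency_ms}ms of added latency",
        "description": "Inject latency, hold it, verify health, then clean up",
        "steady-state-hypothesis": {
            "title": "Service responds normally",
            "probes": [
                {
                    "type": "probe",
                    "name": "api-responds-200",
                    "tolerance": 200,
                    "provider": {"type": "http", "url": health_url},
                }
            ],
        },
        "method": [
            {
                "type": "action",
                "name": "add-network-latency",
                "provider": {
                    "type": "python",
                    "module": "chaosext.network",
                    "func": "inject_latency",
                    "arguments": {
                        "latency_ms": latency_ms,
                        "duration_seconds": hold_seconds,
                    },
                },
                # Keep the fault in place before the hypothesis is re-checked.
                "pauses": {"after": hold_seconds},
            }
        ],
        "rollbacks": [
            {
                "type": "action",
                "name": "remove-network-latency",
                "provider": {
                    "type": "python",
                    "module": "chaosext.network",
                    "func": "remove_latency",
                },
            }
        ],
    }


if __name__ == "__main__":
    print(json.dumps(latency_experiment("https://api.example.com/health"), indent=2))
```

In this layout the hypothesis is re-checked while the delay is still active and the rollback runs afterwards, which tests whether the service stays healthy under latency rather than whether it recovers once the latency is gone.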
AWS fault injection with boto3
For cloud infrastructure, direct boto3 calls give fine-grained control:
import random
import time
from datetime import datetime, timezone

import boto3


class AWSChaosExperiment:
    def __init__(self, region: str = "us-east-1"):
        self.ec2 = boto3.client("ec2", region_name=region)
        self.cloudwatch = boto3.client("cloudwatch", region_name=region)
        # UTC timestamps make it easier to correlate with CloudWatch data.
        self.experiment_id = f"chaos-{datetime.now(timezone.utc):%Y%m%d-%H%M%S}"

    def get_instances_by_tag(self, tag_key: str, tag_value: str) -> list[str]:
        # Paginate so large fleets are not silently truncated.
        paginator = self.ec2.get_paginator("describe_instances")
        pages = paginator.paginate(
            Filters=[
                {"Name": f"tag:{tag_key}", "Values": [tag_value]},
                {"Name": "instance-state-name", "Values": ["running"]},
            ]
        )
        instance_ids = []
        for page in pages:
            for reservation in page["Reservations"]:
                for instance in reservation["Instances"]:
                    instance_ids.append(instance["InstanceId"])
        return instance_ids

    def terminate_random_instance(
        self, tag_key: str, tag_value: str, dry_run: bool = False
    ) -> dict:
        instances = self.get_instances_by_tag(tag_key, tag_value)
        if not instances:
            return {"action": "none", "reason": "no matching instances"}
        target = random.choice(instances)
        if dry_run:
            return {"action": "dry_run", "would_terminate": target}
        self.ec2.terminate_instances(InstanceIds=[target])
        return {
            "action": "terminated",
            "instance_id": target,
            "experiment_id": self.experiment_id,
            "pool_size": len(instances),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    def verify_recovery(
        self,
        alarm_name: str,
        timeout_seconds: int = 300,
        poll_interval: int = 15,
    ) -> dict:
        """Poll a CloudWatch alarm until it reports OK or the timeout expires.

        An alarm that has not yet left OK after the fault also counts as
        recovered, so give it time to react before calling this.
        """
        start = time.monotonic()
        while time.monotonic() - start < timeout_seconds:
            response = self.cloudwatch.describe_alarms(AlarmNames=[alarm_name])
            alarms = response["MetricAlarms"]
            if not alarms:
                # Unknown alarm name: report it instead of raising IndexError.
                return {"recovered": False, "reason": f"alarm {alarm_name!r} not found"}
            if alarms[0]["StateValue"] == "OK":
                elapsed = time.monotonic() - start
                return {"recovered": True, "seconds": round(elapsed, 1)}
            time.sleep(poll_interval)
        return {"recovered": False, "timeout_seconds": timeout_seconds}
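Terminating one random instance is safe only when the pool is large enough to absorb the loss. A small, self-contained sketch of a blast-radius guard follows; pick_targets and its max_fraction and min_survivors parameters are hypothetical names introduced here, and the default thresholds are placeholders rather than recommendations.

```python
import math
import random
from typing import Optional, Sequence


def pick_targets(
    instance_ids: Sequence[str],
    max_fraction: float = 0.25,
    min_survivors: int = 2,
    rng: Optional[random.Random] = None,
) -> list:
    """Choose instances to terminate without exceeding the blast radius.

    At most max_fraction of the pool is selected, and at least
    min_survivors instances are always left untouched.
    """
    if not 0 < max_fraction <= 1:
        raise ValueError("max_fraction must be in (0, 1]")
    pool = list(instance_ids)
    by_fraction = math.floor(len(pool) * max_fraction)
    by_survivors = len(pool) - min_survivors
    # The stricter of the two limits wins; never go below zero.
    count = max(0, min(by_fraction, by_survivors))
    rng = rng or random.Random()
    return rng.sample(pool, count)
```

The list returned by get_instances_by_tag could be passed through a guard like this before any terminate_instances call; an empty result means the pool is too small to experiment on.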
Kubernetes chaos with the Python client
import logging
import random

from kubernetes import client, config

logger = logging.getLogger(__name__)


class K8sChaos:
    def __init__(self, kubeconfig: str | None = None):
        if kubeconfig:
            config.load_kube_config(config_file=kubeconfig)
        else:
            config.load_incluster_config()
        self.v1 = client.CoreV1Api()
        self.apps = client.AppsV1Api()

    def kill_random_pod(self, namespace: str, label_selector: str) -> dict:
        pods = self.v1.list_namespaced_pod(
            namespace=namespace,
            label_selector=label_selector,
            field_selector="status.phase=Running",
        )
        if not pods.items:
            return {"killed": False, "reason": "no running pods match selector"}
        target = random.choice(pods.items)
        pod_name = target.metadata.name
        logger.info(f"Killing pod {pod_name} in {namespace}")
        # grace_period_seconds=0 force-deletes the pod with no graceful shutdown.
        self.v1.delete_namespaced_pod(
            name=pod_name,
            namespace=namespace,
            grace_period_seconds=0,
        )
        return {
            "killed": True,
            "pod": pod_name,
            "namespace": namespace,
            "remaining_pods": len(pods.items) - 1,
        }

    def simulate_node_pressure(
        self, node_name: str, taint_effect: str = "NoSchedule"
    ) -> dict:
        """Taint a node so the scheduler avoids it, without actually stressing it.

        A patch replaces the node's whole taints list, so the existing taints
        are read first and the chaos taint is appended to them.
        """
        node = self.v1.read_node(node_name)
        existing = [
            self.v1.api_client.sanitize_for_serialization(taint)
            for taint in (node.spec.taints or [])
            if taint.key != "chaos-experiment"
        ]
        chaos_taint = {
            "key": "chaos-experiment",
            "value": "true",
            "effect": taint_effect,
        }
        body = {"spec": {"taints": existing + [chaos_taint]}}
        self.v1.patch_node(node_name, body)
        return {"tainted": True, "node": node_name, "effect": taint_effect}
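K8sChaos adds a taint but has no counterpart that removes it, so a rollback needs one. The sketch below covers only the pure part of that cleanup: computing which taints should remain. without_chaos_taint and build_untaint_patch are hypothetical helpers, and they assume taints are plain dicts with the key, value, and effect fields used above.

```python
from typing import Optional


def without_chaos_taint(
    taints: Optional[list], key: str = "chaos-experiment"
) -> list:
    """Return the taints that should remain once the chaos taint is removed."""
    return [taint for taint in (taints or []) if taint.get("key") != key]


def build_untaint_patch(taints: Optional[list], key: str = "chaos-experiment") -> dict:
    """Build a node patch body that keeps every taint except the chaos one."""
    return {"spec": {"taints": without_chaos_taint(taints, key)}}


if __name__ == "__main__":
    current = [
        {"key": "dedicated", "value": "gpu", "effect": "NoSchedule"},
        {"key": "chaos-experiment", "value": "true", "effect": "NoSchedule"},
    ]
    print(build_untaint_patch(current))
```

A rollback would typically read the node's current taints, build the body this way, and send it with patch_node; how an empty taints list is handled by the patch type in use is worth verifying on a test cluster first.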
Resource exhaustion experiments
For testing how applications handle resource pressure:
import multiprocessing
import os
import tempfile
import time


def consume_memory(target_mb: int, hold_seconds: int = 30) -> dict:
    """Allocate memory to simulate memory pressure."""
    block_size = 1024 * 1024  # 1 MB
    blocks = []
    try:
        for _ in range(target_mb):
            blocks.append(bytearray(block_size))
        time.sleep(hold_seconds)
        return {"consumed_mb": target_mb, "held_seconds": hold_seconds}
    finally:
        blocks.clear()


def _burn(stop_event) -> None:
    """Busy-loop until the stop event is set."""
    while not stop_event.is_set():
        _ = sum(i * i for i in range(10_000))


def consume_cpu(cores: int = 2, duration_seconds: int = 30) -> dict:
    """Spin CPU cores to simulate CPU pressure.

    Uses processes rather than threads: CPython's GIL lets only one thread
    run Python bytecode at a time, so threads cannot saturate several cores.
    """
    stop_event = multiprocessing.Event()
    workers = [
        multiprocessing.Process(target=_burn, args=(stop_event,), daemon=True)
        for _ in range(cores)
    ]
    for worker in workers:
        worker.start()
    try:
        time.sleep(duration_seconds)
    finally:
        # Always stop the workers, even if the sleep is interrupted.
        stop_event.set()
        for worker in workers:
            worker.join(timeout=5)
    return {"cores_burned": cores, "duration_seconds": duration_seconds}


def fill_disk(
    target_mb: int, directory: str = "/tmp", hold_seconds: int = 30
) -> dict:
    """Create a temporary file to simulate disk pressure."""
    path = None
    chunk = b"\0" * (1024 * 1024)  # write 1 MB at a time to keep memory use flat
    try:
        fd, path = tempfile.mkstemp(dir=directory, prefix="chaos_disk_")
        # The context manager closes the descriptor even if a write fails.
        with os.fdopen(fd, "wb") as handle:
            for _ in range(target_mb):
                handle.write(chunk)
            handle.flush()
            os.fsync(handle.fileno())
        time.sleep(hold_seconds)
        return {"filled_mb": target_mb, "path": path}
    finally:
        if path and os.path.exists(path):
            os.remove(path)
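A disk-fill experiment that consumes the last free byte can take down more than the service under test. One possible guard, sketched below, caps the requested size against the space actually available; max_safe_fill_mb, clamp_fill, and the 1024 MB default reserve are assumptions made for illustration.

```python
import shutil


def max_safe_fill_mb(directory: str = "/tmp", reserve_mb: int = 1024) -> int:
    """Return how many MB can be written while leaving reserve_mb free."""
    free_mb = shutil.disk_usage(directory).free // (1024 * 1024)
    return max(0, free_mb - reserve_mb)


def clamp_fill(target_mb: int, directory: str = "/tmp", reserve_mb: int = 1024) -> int:
    """Cap a requested fill size so the reserve is never consumed."""
    return min(target_mb, max_safe_fill_mb(directory, reserve_mb))
```

A caller could pass clamp_fill(target_mb, directory) to fill_disk instead of the raw target. The check is advisory, since other processes can still consume space between the measurement and the write.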
Automated chaos in CI/CD
Mature teams run chaos experiments as part of their deployment pipeline. A Python script orchestrates the full flow:
import json
import subprocess
import sys
from pathlib import Path


def run_chaos_suite(experiments_dir: str, environment: str) -> dict:
    """Run all chaos experiments in a directory, fail the pipeline if any break."""
    results = {}
    experiments = sorted(Path(experiments_dir).glob("*.json"))
    for exp_file in experiments:
        journal_path = Path(f"/tmp/{exp_file.stem}-journal.json")
        # Remove any journal left by an earlier run so stale results are not read.
        journal_path.unlink(missing_ok=True)
        print(f"\n{'='*60}")
        print(f"Running: {exp_file.name} ({environment})")
        print(f"{'='*60}")
        try:
            result = subprocess.run(
                ["chaos", "run", str(exp_file), "--journal-path", str(journal_path)],
                capture_output=True,
                text=True,
                timeout=600,
            )
            return_code = result.returncode
        except subprocess.TimeoutExpired:
            # Record a hung experiment instead of crashing the whole suite.
            return_code = None
        if return_code is None:
            status = "timeout"
        elif journal_path.exists():
            journal = json.loads(journal_path.read_text())
            status = journal.get("status", "unknown")
        else:
            status = "error"
        results[exp_file.name] = {
            "status": status,
            "return_code": return_code,
        }
        if status != "completed":
            print(f"FAILED: {exp_file.name} (status: {status})")
    failed = [k for k, v in results.items() if v["status"] != "completed"]
    if failed:
        print(f"\n{len(failed)} experiment(s) failed: {failed}")
        sys.exit(1)
    print(f"\nAll {len(results)} experiments passed")
    return results
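Many CI systems can display results from a JUnit-style XML file, which makes failed experiments visible next to ordinary test failures. The sketch below converts a results dict of the shape built above into such a report; results_to_junit is a hypothetical helper, and the element and attribute names follow the commonly used JUnit layout rather than any CTK feature.

```python
import xml.etree.ElementTree as ET


def results_to_junit(results: dict, suite_name: str = "chaos") -> str:
    """Render {name: {"status": ..., "return_code": ...}} as JUnit-style XML."""
    failures = [n for n, r in results.items() if r["status"] != "completed"]
    suite = ET.Element(
        "testsuite",
        name=suite_name,
        tests=str(len(results)),
        failures=str(len(failures)),
    )
    for name, outcome in results.items():
        case = ET.SubElement(suite, "testcase", name=name, classname=suite_name)
        if outcome["status"] != "completed":
            # Anything other than "completed" is reported as a failure.
            failure = ET.SubElement(
                case, "failure", message=f"status: {outcome['status']}"
            )
            failure.text = f"return code: {outcome.get('return_code')}"
    return ET.tostring(suite, encoding="unicode")
```

Because run_chaos_suite exits the process when any experiment fails, a report like this would have to be written before that exit for the CI system to pick it up.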
Tradeoffs and safety rails
| Approach | Pros | Cons |
|---|---|---|
| Chaos Toolkit (CTK) | Declarative experiments, rich plugin ecosystem, journal logging | Steeper learning curve, YAML/JSON verbosity |
| Custom boto3/k8s scripts | Full control, easy to debug, fits exact needs | No standardized experiment format, manual rollbacks |
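| Platforms with a UI (Gremlin SaaS, open-source LitmusChaos) | UI dashboards, team collaboration, guardrails built in | Licensing cost and vendor lock-in (Gremlin), extra infrastructure to operate (LitmusChaos), less flexibility |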
Safety is paramount. Every chaos experiment should have:
- Blast radius limits: only affect a subset of instances, never the entire fleet
- Automatic rollbacks: CTK’s rollback block or custom cleanup functions
- Kill switch: a mechanism to immediately stop the experiment (a flag file, a feature toggle, or a manual chaos stop command)
- Monitoring correlation: tag metrics with experiment IDs so you can separate experiment impact from real incidents
- Gradual escalation: start in dev, then staging, then production off-peak, then production during normal traffic
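The kill switch and automatic rollback items can be combined in a few lines for custom scripts. This is a minimal sketch assuming a flag-file kill switch; KillSwitch, run_steps, and the flag path are hypothetical names, and a real setup might check a feature toggle instead.

```python
from pathlib import Path
from typing import Callable, Iterable, Tuple


class KillSwitch:
    """Flag-file kill switch: the experiment stops once the file exists."""

    def __init__(self, flag_path: str):
        self.flag_path = Path(flag_path)

    def tripped(self) -> bool:
        return self.flag_path.exists()


def run_steps(
    steps: Iterable[Tuple[str, Callable[[], None]]],
    switch: KillSwitch,
    rollback: Callable[[], None],
) -> dict:
    """Run steps in order, checking the kill switch before each one.

    The rollback always runs, whether the steps finish, the switch trips,
    or a step raises.
    """
    completed = []
    aborted = False
    try:
        for name, step in steps:
            if switch.tripped():
                aborted = True
                break
            step()
            completed.append(name)
    finally:
        rollback()
    return {"completed": completed, "aborted": aborted}
```

An operator stops the experiment by creating the flag file (for example with touch); the switch is only consulted between steps, so long-running steps should be kept short or check the switch themselves.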
Real-world example: Netflix and GameDay
Netflix’s chaos engineering team runs “GameDay”-style exercises that simulate major outages. Its FIT (Failure Injection Testing) platform injects failures at the service level: it can force a specific microservice to return errors or add latency to specific API calls, while separate region-level exercises (Chaos Kong) simulate an entire AWS region going offline. The results show how traffic rerouted, which fallbacks activated, and where the user experience degraded.
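Teams that adopt this practice can expect fewer production incidents, because they have already found and fixed the failure modes that would have caused them. Reductions of 40-60% within the first year are sometimes quoted, but treat such figures as anecdotal: the result depends on how incidents are counted and how consistently findings are fixed.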
The one thing to remember: Production chaos experiments need safety rails — blast radius limits, automatic rollbacks, and kill switches. The Chaos Toolkit provides a structured Python framework, but even simple custom scripts work when they follow the hypothesis → experiment → learn cycle.