Infrastructure Testing with Python — Deep Dive
Testinfra: testing live servers
Testinfra is a pytest plugin that lets you write assertions about actual server state. It connects via SSH, Docker, or local execution and provides modules for checking packages, services, files, sockets, and more.
pip install pytest-testinfra paramiko
Basic server tests
# test_webserver.py
import pytest
def test_nginx_installed(host):
nginx = host.package("nginx")
assert nginx.is_installed
assert nginx.version.startswith("1.")
def test_nginx_running(host):
service = host.service("nginx")
assert service.is_running
assert service.is_enabled
def test_nginx_listening(host):
socket = host.socket("tcp://0.0.0.0:443")
assert socket.is_listening
def test_http_redirects_to_https(host):
socket_80 = host.socket("tcp://0.0.0.0:80")
assert socket_80.is_listening
# Verify HTTP redirects
cmd = host.run("curl -s -o /dev/null -w '%{http_code}' http://localhost/")
assert cmd.stdout.strip() == "301"
def test_tls_certificate_valid(host):
cmd = host.run(
"echo | openssl s_client -connect localhost:443 -servername example.com 2>/dev/null "
"| openssl x509 -noout -dates"
)
assert cmd.rc == 0
assert "notAfter" in cmd.stdout
def test_firewall_rules(host):
iptables = host.iptables
rules = iptables.rules("filter", "INPUT")
# Verify SSH is allowed
assert any("dpt:22" in rule and "ACCEPT" in rule for rule in rules)
# Verify default policy is DROP
policy = host.run("iptables -L INPUT | head -1")
assert "DROP" in policy.stdout or "REJECT" in policy.stdout
Run against a remote host:
pytest test_webserver.py --hosts=ssh://deploy@web1.example.com
Security-focused tests
# test_security.py
def test_no_root_ssh(host):
sshd_config = host.file("/etc/ssh/sshd_config")
assert sshd_config.contains("PermitRootLogin no")
def test_password_auth_disabled(host):
sshd_config = host.file("/etc/ssh/sshd_config")
assert sshd_config.contains("PasswordAuthentication no")
def test_no_world_writable_files(host):
cmd = host.run(
"find / -xdev -type f -perm -0002 "
"-not -path '/proc/*' -not -path '/sys/*' 2>/dev/null | head -5"
)
assert cmd.stdout.strip() == "", f"World-writable files found: {cmd.stdout}"
def test_unattended_upgrades_enabled(host):
pkg = host.package("unattended-upgrades")
assert pkg.is_installed
config = host.file("/etc/apt/apt.conf.d/20auto-upgrades")
assert config.exists
assert config.contains("Unattended-Upgrade")
def test_fail2ban_running(host):
service = host.service("fail2ban")
assert service.is_running
assert service.is_enabled
def test_sensitive_files_permissions(host):
sensitive_files = {
"/etc/shadow": {"mode": 0o640, "user": "root"},
"/etc/ssh/sshd_config": {"mode": 0o600, "user": "root"},
}
for path, expected in sensitive_files.items():
f = host.file(path)
assert f.exists, f"{path} does not exist"
assert f.user == expected["user"], f"{path} owned by {f.user}"
assert f.mode == expected["mode"], f"{path} mode is {oct(f.mode)}"
Parameterized tests for multiple servers
# conftest.py
import pytest
import testinfra
def pytest_addoption(parser):
parser.addoption("--inventory", default="hosts.ini")
@pytest.fixture(scope="module")
def hosts_by_role(request):
"""Parse an inventory file and group hosts by role."""
inventory = request.config.getoption("--inventory")
roles = {}
current_role = None
with open(inventory) as f:
for line in f:
line = line.strip()
if line.startswith("[") and line.endswith("]"):
current_role = line[1:-1]
roles[current_role] = []
elif line and current_role:
roles[current_role].append(line)
return roles
# test_by_role.py
import pytest
import testinfra
@pytest.fixture(scope="module")
def web_hosts(hosts_by_role):
return [
testinfra.get_host(f"ssh://deploy@{h}")
for h in hosts_by_role.get("webservers", [])
]
def test_all_web_servers_respond(web_hosts):
for host in web_hosts:
cmd = host.run("curl -s -o /dev/null -w '%{http_code}' http://localhost/health")
assert cmd.stdout.strip() == "200", f"Health check failed on {host}"
Checkov: static analysis for IaC
Checkov scans Terraform, Kubernetes YAML, Dockerfiles, and more for security issues:
pip install checkov
Running Checkov
# Scan Terraform directory
checkov -d ./terraform/ --framework terraform
# Scan Kubernetes manifests
checkov -d ./k8s/ --framework kubernetes
# Output as JSON for programmatic use
checkov -d ./terraform/ -o json > checkov_results.json
Custom Checkov policies in Python
When built-in rules don’t cover your requirements, write custom checks:
# custom_checks/s3_naming.py
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories
class S3BucketNamingConvention(BaseResourceCheck):
def __init__(self):
name = "Ensure S3 bucket follows naming convention"
id = "CUSTOM_S3_001"
supported = ["aws_s3_bucket"]
categories = [CheckCategories.CONVENTION]
super().__init__(name=name, id=id, categories=categories,
supported_resources=supported)
def scan_resource_conf(self, conf: dict) -> CheckResult:
bucket_name = conf.get("bucket", [""])[0]
# Must start with company prefix
if not bucket_name.startswith(("mycompany-prod-", "mycompany-staging-", "mycompany-dev-")):
return CheckResult.FAILED
return CheckResult.PASSED
check = S3BucketNamingConvention()
checkov -d ./terraform/ --external-checks-dir ./custom_checks/
Pulumi unit testing
If your infrastructure is defined in Pulumi (Python), you can unit test resource configurations:
# test_infra.py
import pulumi
import pytest
class MockedMixin:
"""Mock Pulumi resource creation for testing."""
resources = {}
@staticmethod
def set_mocks(mocks):
pulumi.runtime.set_mocks(mocks)
class InfraMocks(pulumi.runtime.Mocks):
def new_resource(self, args):
return [args.name + "_id", args.inputs]
def call(self, args):
return {}
pulumi.runtime.set_mocks(InfraMocks())
# Import after setting mocks
from my_infra import web_server, security_group
@pulumi.runtime.test
def test_server_has_tags():
"""Verify EC2 instance has required tags."""
def check_tags(tags):
assert tags is not None, "Server must have tags"
assert "Environment" in tags, "Must have Environment tag"
assert "Team" in tags, "Must have Team tag"
web_server.tags.apply(check_tags)
@pulumi.runtime.test
def test_security_group_no_open_ingress():
"""Verify security group doesn't allow 0.0.0.0/0 ingress."""
def check_ingress(ingress_rules):
for rule in ingress_rules:
cidrs = rule.get("cidr_blocks", [])
assert "0.0.0.0/0" not in cidrs, (
f"Security group allows unrestricted ingress on port {rule.get('from_port')}"
)
security_group.ingress.apply(check_ingress)
Integration test pipeline
A complete infrastructure test pipeline combines all layers:
# infra_test_pipeline.py
import subprocess
import json
import sys
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
def run_static_analysis(terraform_dir: str) -> dict:
"""Run Checkov on Terraform code."""
result = subprocess.run(
["checkov", "-d", terraform_dir, "-o", "json", "--quiet"],
capture_output=True,
text=True,
)
if result.returncode == 0:
return {"passed": True, "issues": 0}
try:
data = json.loads(result.stdout)
failed = sum(
len(check.get("failed_checks", []))
for check in data
if isinstance(check, dict)
)
return {"passed": False, "issues": failed}
except json.JSONDecodeError:
return {"passed": False, "issues": -1, "raw": result.stdout[:500]}
def run_testinfra(test_dir: str, hosts: list[str]) -> dict:
"""Run Testinfra tests against live hosts."""
host_args = []
for host in hosts:
host_args.extend(["--hosts", f"ssh://{host}"])
result = subprocess.run(
["pytest", test_dir, *host_args, "-v", "--tb=short", "--json-report"],
capture_output=True,
text=True,
)
report_path = Path(".report.json")
if report_path.exists():
report = json.loads(report_path.read_text())
passed = report["summary"].get("passed", 0)
failed = report["summary"].get("failed", 0)
report_path.unlink()
else:
passed, failed = 0, -1
return {
"passed": failed == 0,
"tests_passed": passed,
"tests_failed": failed,
}
def run_pipeline(
terraform_dir: str,
test_dir: str,
hosts: list[str],
) -> None:
"""Run the full infrastructure testing pipeline."""
print("=" * 60)
print("Stage 1: Static Analysis (Checkov)")
print("=" * 60)
static = run_static_analysis(terraform_dir)
print(f" Result: {'PASS' if static['passed'] else 'FAIL'}")
print(f" Issues: {static['issues']}")
if not static["passed"]:
print("\nStatic analysis failed — fix issues before proceeding")
sys.exit(1)
print(f"\n{'=' * 60}")
print("Stage 2: Integration Tests (Testinfra)")
print("=" * 60)
integration = run_testinfra(test_dir, hosts)
print(f" Result: {'PASS' if integration['passed'] else 'FAIL'}")
print(f" Passed: {integration['tests_passed']}")
print(f" Failed: {integration['tests_failed']}")
if not integration["passed"]:
print("\nIntegration tests failed")
sys.exit(1)
print("\n✅ All infrastructure tests passed")
Tradeoffs
| Approach | Speed | Coverage | Cost |
|---|---|---|---|
| Static analysis (Checkov) | Seconds | Config errors, security misconfig | Free |
| Unit tests (Pulumi) | Seconds | Resource properties, logic | Free |
| Container tests (Molecule) | Minutes | Ansible role behavior | Low (Docker) |
| Live tests (Testinfra) | Minutes | Actual server state | Moderate (real infra) |
| E2E tests | 10+ min | Full system behavior | High (full environment) |
Start with static analysis — it’s fast, free, and catches the most common issues. Add Testinfra when you have production servers that need ongoing verification. Use E2E tests for critical paths only, since they’re slow and expensive.
The one thing to remember: Infrastructure testing is most effective as a pipeline — static analysis catches config errors instantly, while Testinfra verifies that live servers match their expected state. Treat infrastructure code with the same testing rigor as application code.
See Also
- Python Blue Green Deployments How Python helps teams switch between two identical server environments so updates never cause downtime
- Python Canary Releases Why teams send new code to just a few users first — and how Python manages the gradual rollout
- Python Chaos Engineering Why engineers deliberately break their own systems using Python — and how it prevents real disasters
- Python Compliance As Code How Python turns security rules and regulations into automated checks that run every time code changes
- Python Feature Branch Deployments How teams give every code branch its own live preview website using Python automation