Infrastructure Testing with Python — Deep Dive

Testinfra: testing live servers

Testinfra is a pytest plugin that lets you write assertions about actual server state. It connects via SSH, Docker, or local execution and provides modules for checking packages, services, files, sockets, and more.

pip install pytest-testinfra paramiko

Basic server tests

# test_webserver.py
import pytest


def test_nginx_installed(host):
    nginx = host.package("nginx")
    assert nginx.is_installed
    assert nginx.version.startswith("1.")


def test_nginx_running(host):
    service = host.service("nginx")
    assert service.is_running
    assert service.is_enabled


def test_nginx_listening(host):
    socket = host.socket("tcp://0.0.0.0:443")
    assert socket.is_listening


def test_http_redirects_to_https(host):
    socket_80 = host.socket("tcp://0.0.0.0:80")
    assert socket_80.is_listening
    
    # Verify HTTP redirects
    cmd = host.run("curl -s -o /dev/null -w '%{http_code}' http://localhost/")
    assert cmd.stdout.strip() == "301"


def test_tls_certificate_valid(host):
    cmd = host.run(
        "echo | openssl s_client -connect localhost:443 -servername example.com 2>/dev/null "
        "| openssl x509 -noout -dates"
    )
    assert cmd.rc == 0
    assert "notAfter" in cmd.stdout


def test_firewall_rules(host):
    iptables = host.iptables
    rules = iptables.rules("filter", "INPUT")
    
    # Verify SSH is allowed
    assert any("dpt:22" in rule and "ACCEPT" in rule for rule in rules)
    
    # Verify default policy is DROP
    policy = host.run("iptables -L INPUT | head -1")
    assert "DROP" in policy.stdout or "REJECT" in policy.stdout

Run against a remote host:

pytest test_webserver.py --hosts=ssh://deploy@web1.example.com

Security-focused tests

# test_security.py


def test_no_root_ssh(host):
    sshd_config = host.file("/etc/ssh/sshd_config")
    assert sshd_config.contains("PermitRootLogin no")


def test_password_auth_disabled(host):
    sshd_config = host.file("/etc/ssh/sshd_config")
    assert sshd_config.contains("PasswordAuthentication no")


def test_no_world_writable_files(host):
    cmd = host.run(
        "find / -xdev -type f -perm -0002 "
        "-not -path '/proc/*' -not -path '/sys/*' 2>/dev/null | head -5"
    )
    assert cmd.stdout.strip() == "", f"World-writable files found: {cmd.stdout}"


def test_unattended_upgrades_enabled(host):
    pkg = host.package("unattended-upgrades")
    assert pkg.is_installed
    
    config = host.file("/etc/apt/apt.conf.d/20auto-upgrades")
    assert config.exists
    assert config.contains("Unattended-Upgrade")


def test_fail2ban_running(host):
    service = host.service("fail2ban")
    assert service.is_running
    assert service.is_enabled


def test_sensitive_files_permissions(host):
    sensitive_files = {
        "/etc/shadow": {"mode": 0o640, "user": "root"},
        "/etc/ssh/sshd_config": {"mode": 0o600, "user": "root"},
    }
    
    for path, expected in sensitive_files.items():
        f = host.file(path)
        assert f.exists, f"{path} does not exist"
        assert f.user == expected["user"], f"{path} owned by {f.user}"
        assert f.mode == expected["mode"], f"{path} mode is {oct(f.mode)}"

Parameterized tests for multiple servers

# conftest.py
import pytest
import testinfra


def pytest_addoption(parser):
    parser.addoption("--inventory", default="hosts.ini")


@pytest.fixture(scope="module")
def hosts_by_role(request):
    """Parse an inventory file and group hosts by role."""
    inventory = request.config.getoption("--inventory")
    roles = {}
    current_role = None
    
    with open(inventory) as f:
        for line in f:
            line = line.strip()
            if line.startswith("[") and line.endswith("]"):
                current_role = line[1:-1]
                roles[current_role] = []
            elif line and current_role:
                roles[current_role].append(line)
    
    return roles


# test_by_role.py
import pytest
import testinfra


@pytest.fixture(scope="module")
def web_hosts(hosts_by_role):
    return [
        testinfra.get_host(f"ssh://deploy@{h}")
        for h in hosts_by_role.get("webservers", [])
    ]


def test_all_web_servers_respond(web_hosts):
    for host in web_hosts:
        cmd = host.run("curl -s -o /dev/null -w '%{http_code}' http://localhost/health")
        assert cmd.stdout.strip() == "200", f"Health check failed on {host}"

Checkov: static analysis for IaC

Checkov scans Terraform, Kubernetes YAML, Dockerfiles, and more for security issues:

pip install checkov

Running Checkov

# Scan Terraform directory
checkov -d ./terraform/ --framework terraform

# Scan Kubernetes manifests
checkov -d ./k8s/ --framework kubernetes

# Output as JSON for programmatic use
checkov -d ./terraform/ -o json > checkov_results.json

Custom Checkov policies in Python

When built-in rules don’t cover your requirements, write custom checks:

# custom_checks/s3_naming.py
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories


class S3BucketNamingConvention(BaseResourceCheck):
    def __init__(self):
        name = "Ensure S3 bucket follows naming convention"
        id = "CUSTOM_S3_001"
        supported = ["aws_s3_bucket"]
        categories = [CheckCategories.CONVENTION]
        super().__init__(name=name, id=id, categories=categories,
                        supported_resources=supported)
    
    def scan_resource_conf(self, conf: dict) -> CheckResult:
        bucket_name = conf.get("bucket", [""])[0]
        
        # Must start with company prefix
        if not bucket_name.startswith(("mycompany-prod-", "mycompany-staging-", "mycompany-dev-")):
            return CheckResult.FAILED
        
        return CheckResult.PASSED


check = S3BucketNamingConvention()
checkov -d ./terraform/ --external-checks-dir ./custom_checks/

Pulumi unit testing

If your infrastructure is defined in Pulumi (Python), you can unit test resource configurations:

# test_infra.py
import pulumi
import pytest


class MockedMixin:
    """Mock Pulumi resource creation for testing."""
    resources = {}
    
    @staticmethod
    def set_mocks(mocks):
        pulumi.runtime.set_mocks(mocks)


class InfraMocks(pulumi.runtime.Mocks):
    def new_resource(self, args):
        return [args.name + "_id", args.inputs]
    
    def call(self, args):
        return {}


pulumi.runtime.set_mocks(InfraMocks())

# Import after setting mocks
from my_infra import web_server, security_group


@pulumi.runtime.test
def test_server_has_tags():
    """Verify EC2 instance has required tags."""
    def check_tags(tags):
        assert tags is not None, "Server must have tags"
        assert "Environment" in tags, "Must have Environment tag"
        assert "Team" in tags, "Must have Team tag"
    
    web_server.tags.apply(check_tags)


@pulumi.runtime.test
def test_security_group_no_open_ingress():
    """Verify security group doesn't allow 0.0.0.0/0 ingress."""
    def check_ingress(ingress_rules):
        for rule in ingress_rules:
            cidrs = rule.get("cidr_blocks", [])
            assert "0.0.0.0/0" not in cidrs, (
                f"Security group allows unrestricted ingress on port {rule.get('from_port')}"
            )
    
    security_group.ingress.apply(check_ingress)

Integration test pipeline

A complete infrastructure test pipeline combines all layers:

# infra_test_pipeline.py
import subprocess
import json
import sys
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def run_static_analysis(terraform_dir: str) -> dict:
    """Run Checkov on Terraform code."""
    result = subprocess.run(
        ["checkov", "-d", terraform_dir, "-o", "json", "--quiet"],
        capture_output=True,
        text=True,
    )
    
    if result.returncode == 0:
        return {"passed": True, "issues": 0}
    
    try:
        data = json.loads(result.stdout)
        failed = sum(
            len(check.get("failed_checks", []))
            for check in data
            if isinstance(check, dict)
        )
        return {"passed": False, "issues": failed}
    except json.JSONDecodeError:
        return {"passed": False, "issues": -1, "raw": result.stdout[:500]}


def run_testinfra(test_dir: str, hosts: list[str]) -> dict:
    """Run Testinfra tests against live hosts."""
    host_args = []
    for host in hosts:
        host_args.extend(["--hosts", f"ssh://{host}"])
    
    result = subprocess.run(
        ["pytest", test_dir, *host_args, "-v", "--tb=short", "--json-report"],
        capture_output=True,
        text=True,
    )
    
    report_path = Path(".report.json")
    if report_path.exists():
        report = json.loads(report_path.read_text())
        passed = report["summary"].get("passed", 0)
        failed = report["summary"].get("failed", 0)
        report_path.unlink()
    else:
        passed, failed = 0, -1
    
    return {
        "passed": failed == 0,
        "tests_passed": passed,
        "tests_failed": failed,
    }


def run_pipeline(
    terraform_dir: str,
    test_dir: str,
    hosts: list[str],
) -> None:
    """Run the full infrastructure testing pipeline."""
    print("=" * 60)
    print("Stage 1: Static Analysis (Checkov)")
    print("=" * 60)
    
    static = run_static_analysis(terraform_dir)
    print(f"  Result: {'PASS' if static['passed'] else 'FAIL'}")
    print(f"  Issues: {static['issues']}")
    
    if not static["passed"]:
        print("\nStatic analysis failed — fix issues before proceeding")
        sys.exit(1)
    
    print(f"\n{'=' * 60}")
    print("Stage 2: Integration Tests (Testinfra)")
    print("=" * 60)
    
    integration = run_testinfra(test_dir, hosts)
    print(f"  Result: {'PASS' if integration['passed'] else 'FAIL'}")
    print(f"  Passed: {integration['tests_passed']}")
    print(f"  Failed: {integration['tests_failed']}")
    
    if not integration["passed"]:
        print("\nIntegration tests failed")
        sys.exit(1)
    
    print("\n✅ All infrastructure tests passed")

Tradeoffs

ApproachSpeedCoverageCost
Static analysis (Checkov)SecondsConfig errors, security misconfigFree
Unit tests (Pulumi)SecondsResource properties, logicFree
Container tests (Molecule)MinutesAnsible role behaviorLow (Docker)
Live tests (Testinfra)MinutesActual server stateModerate (real infra)
E2E tests10+ minFull system behaviorHigh (full environment)

Start with static analysis — it’s fast, free, and catches the most common issues. Add Testinfra when you have production servers that need ongoing verification. Use E2E tests for critical paths only, since they’re slow and expensive.

The one thing to remember: Infrastructure testing is most effective as a pipeline — static analysis catches config errors instantly, while Testinfra verifies that live servers match their expected state. Treat infrastructure code with the same testing rigor as application code.

pythoninfrastructuretestingdevops

See Also