Compliance as Code with Python — Deep Dive

Custom Checkov policies

Checkov’s built-in policies cover common cases, but every organization has custom requirements. Writing custom checks in Python is straightforward:

# custom_checks/encryption_checks.py
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories


class RDSEncryptionEnabled(BaseResourceCheck):
    """Custom Checkov check: ``aws_db_instance`` must enable storage encryption.

    The check passes only when ``storage_encrypted`` is the literal boolean
    ``True``; a missing attribute, ``False``, or any non-boolean value fails.
    """

    def __init__(self):
        name = "Ensure RDS instance has encryption at rest enabled"
        # Named check_id locally so the builtin ``id`` is not shadowed.
        check_id = "CUSTOM_RDS_001"
        supported = ["aws_db_instance"]
        categories = [CheckCategories.ENCRYPTION]
        super().__init__(
            name=name, id=check_id, categories=categories,
            supported_resources=supported,
        )

    def scan_resource_conf(self, conf: dict) -> CheckResult:
        """Return PASSED when the resource sets ``storage_encrypted`` to True.

        Args:
            conf: Parsed resource configuration. Attribute values may arrive
                wrapped in a list; both wrapped and bare values are handled.

        Returns:
            ``CheckResult.PASSED`` or ``CheckResult.FAILED``.
        """
        storage_encrypted = conf.get("storage_encrypted", [False])
        if isinstance(storage_encrypted, list):
            # An empty list carries no value; treat it as "not encrypted"
            # instead of raising IndexError.
            storage_encrypted = storage_encrypted[0] if storage_encrypted else False

        # Identity comparison on purpose: truthy non-boolean values (for
        # example unresolved expressions left as strings) must not pass.
        if storage_encrypted is True:
            return CheckResult.PASSED
        return CheckResult.FAILED


class RDSMultiAZEnabled(BaseResourceCheck):
    """Custom Checkov check: production ``aws_db_instance`` must use Multi-AZ.

    An instance counts as production when its ``Environment`` tag equals
    "production" (compared case-insensitively) or when the tag is absent.
    Instances tagged with any other environment are exempt and pass.
    """

    def __init__(self):
        name = "Ensure RDS instance has Multi-AZ enabled for production"
        # Named check_id locally so the builtin ``id`` is not shadowed.
        check_id = "CUSTOM_RDS_002"
        supported = ["aws_db_instance"]
        categories = [CheckCategories.BACKUP_AND_RECOVERY]
        super().__init__(
            name=name, id=check_id, categories=categories,
            supported_resources=supported,
        )

    def scan_resource_conf(self, conf: dict) -> CheckResult:
        """Return PASSED for non-production instances or when Multi-AZ is on.

        Args:
            conf: Parsed resource configuration. Attribute values may arrive
                wrapped in a list; both wrapped and bare values are handled.

        Returns:
            ``CheckResult.PASSED`` or ``CheckResult.FAILED``.
        """
        tags = conf.get("tags", [{}])
        if isinstance(tags, list):
            tags = tags[0] if tags else {}
        if not isinstance(tags, dict):
            # Tags that are not a mapping (None, an unresolved expression, ...)
            # cannot exempt the instance; fall back to the production default.
            tags = {}

        environment = tags.get("Environment", "production")
        # Case-insensitive so "Production" or "PRODUCTION" is not exempted
        # by accident.
        if str(environment).casefold() != "production":
            return CheckResult.PASSED

        multi_az = conf.get("multi_az", [False])
        if isinstance(multi_az, list):
            # Empty list means no value was provided: treat as disabled
            # rather than raising IndexError.
            multi_az = multi_az[0] if multi_az else False

        return CheckResult.PASSED if multi_az else CheckResult.FAILED


# Register checks
# Each check class is instantiated once when this module is imported. Per the
# note above, creating the instance is what registers the check (presumably
# inside the BaseResourceCheck constructor; confirm against the Checkov docs).
rds_encryption = RDSEncryptionEnabled()
rds_multi_az = RDSMultiAZEnabled()

Graph-based checks for cross-resource compliance

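Checkov also supports graph checks that validate relationships between resources. The example below is a simpler, attribute-level resource check: it inspects the ingress rules of each security group for RDP (port 3389) exposed to the whole internet: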

# custom_checks/network_isolation.py
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories


class SecurityGroupNoPublicRDP(BaseResourceCheck):
    """Custom Checkov check: no ingress from 0.0.0.0/0 may reach port 3389.

    Handles both supported resource shapes:

    * ``aws_security_group``: rules live in nested ``ingress`` blocks.
    * ``aws_security_group_rule``: the rule attributes (``type``,
      ``from_port``, ``to_port``, ``cidr_blocks``) sit at the top level of
      the resource; only rules whose ``type`` is "ingress" are evaluated.
    """

    RDP_PORT = 3389
    OPEN_CIDR = "0.0.0.0/0"
    # Protocol values that mean "all traffic"; the port range is then
    # written as 0-0 even though every port is reachable.
    ALL_PROTOCOLS = ("-1", "all")

    def __init__(self):
        name = "Ensure no security group allows ingress from 0.0.0.0/0 to port 3389"
        # Named check_id locally so the builtin ``id`` is not shadowed.
        check_id = "CUSTOM_NET_001"
        supported = ["aws_security_group", "aws_security_group_rule"]
        categories = [CheckCategories.NETWORKING]
        super().__init__(
            name=name, id=check_id, categories=categories,
            supported_resources=supported,
        )

    def scan_resource_conf(self, conf: dict) -> CheckResult:
        """Return FAILED when any ingress rule exposes RDP to 0.0.0.0/0.

        Args:
            conf: Parsed resource configuration. Attribute values may arrive
                wrapped in a list; both wrapped and bare values are handled.

        Returns:
            ``CheckResult.FAILED`` on the first offending rule, otherwise
            ``CheckResult.PASSED``.
        """
        for rule in self._iter_ingress_rules(conf):
            if self._exposes_rdp(rule):
                return CheckResult.FAILED
        return CheckResult.PASSED

    @staticmethod
    def _first(value, default=None):
        """Unwrap a list-wrapped attribute value; empty lists give *default*."""
        if isinstance(value, list):
            return value[0] if value else default
        return value

    def _iter_ingress_rules(self, conf: dict):
        """Yield every ingress rule dict described by *conf*."""
        if "type" in conf:
            # Standalone aws_security_group_rule: the resource itself is the
            # rule. Egress rules are out of scope for this check.
            if self._first(conf.get("type")) == "ingress":
                yield conf
            return

        ingress = conf.get("ingress", [])
        if not isinstance(ingress, list):
            ingress = [ingress]
        for entry in ingress:
            # Attribute-style `ingress = [{...}]` may add one extra list
            # level around the rule dicts; flatten it.
            for rule in entry if isinstance(entry, list) else [entry]:
                if isinstance(rule, dict):
                    yield rule

    def _exposes_rdp(self, rule: dict) -> bool:
        """Return True when *rule* lets 0.0.0.0/0 reach the RDP port."""
        cidr_blocks = rule.get("cidr_blocks", [])
        if isinstance(cidr_blocks, list) and cidr_blocks and isinstance(cidr_blocks[0], list):
            cidr_blocks = cidr_blocks[0]
        if isinstance(cidr_blocks, str):
            cidr_blocks = [cidr_blocks]
        if not cidr_blocks or self.OPEN_CIDR not in cidr_blocks:
            return False

        protocol = str(self._first(rule.get("protocol"), "")).lower()
        if protocol in self.ALL_PROTOCOLS:
            return True

        try:
            from_port = int(self._first(rule.get("from_port"), 0))
            to_port = int(self._first(rule.get("to_port"), 0))
        except (TypeError, ValueError):
            # Ports that are not numeric (e.g. unresolved expressions) cannot
            # be evaluated statically; do not crash the whole scan on them.
            return False
        return from_port <= self.RDP_PORT <= to_port


# Instantiate the check once at import time. Presumably this is what registers
# it with Checkov (via the BaseResourceCheck constructor); confirm against the
# Checkov custom-policy docs.
check = SecurityGroupNoPublicRDP()

Cloud Custodian policies

Cloud Custodian uses YAML policies backed by a Python engine. It can detect violations and take automatic corrective action:

# policies/s3-compliance.yml
# Two Cloud Custodian policies, both targeting the `s3` resource type.
policies:
  # Policy 1: select buckets for which the `bucket-encryption` filter reports
  # state False, then (a) set AES256 default encryption on them and
  # (b) send a notification through the SNS topic below.
  - name: s3-encryption-required
    resource: s3
    description: "All S3 buckets must have default encryption"
    filters:
      - type: bucket-encryption
        state: False
    actions:
      - type: set-bucket-encryption
        crypto: AES256
      - type: notify
        template: default
        subject: "S3 Bucket Missing Encryption"
        to: ["security-team@example.com"]
        transport:
          type: sns
          # NOTE(review): the account id in this ARN looks like a placeholder;
          # replace it with the real topic ARN before use.
          topic: arn:aws:sns:us-east-1:123456:compliance-alerts
  
  # Policy 2: select buckets matched by the `global-grants` filter and run
  # `delete-global-grants` for the AllUsers grantee URI listed below.
  - name: s3-no-public-access
    resource: s3
    filters:
      - type: global-grants
    actions:
      - type: delete-global-grants
        grantees:
          - "http://acs.amazonaws.com/groups/global/AllUsers"

Driving Cloud Custodian from Python

import subprocess
import json
import logging
from pathlib import Path
from datetime import datetime

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class ComplianceRunner:
    """Run Cloud Custodian policy files and summarise the resources they match.

    Each ``*.yml`` file in ``policy_dir`` is executed with ``custodian run``.
    Output for a file goes to ``output_dir/<file stem>/``; every
    sub-directory there that contains a ``resources.json`` becomes one entry
    in the results.
    """

    # Same logger object as a module-level ``logging.getLogger(__name__)``.
    _log = logging.getLogger(__name__)

    def __init__(self, policy_dir: str, output_dir: str = "/tmp/custodian"):
        """Store the policy and output locations as ``Path`` objects.

        Args:
            policy_dir: Directory that contains the ``*.yml`` policy files.
            output_dir: Directory under which custodian writes its output.
        """
        self.policy_dir = Path(policy_dir)
        self.output_dir = Path(output_dir)

    def run_all_policies(self) -> dict:
        """Execute all Cloud Custodian policies and collect results.

        Returns:
            Mapping of output sub-directory name to a dict with
            ``"violations"`` (number of matched resources) and
            ``"resources"`` (identifiers of at most the first ten).

        Raises:
            FileNotFoundError: If the ``custodian`` executable is not on PATH.
        """
        results = {}

        # Sorted so runs are reproducible regardless of directory order.
        for policy_file in sorted(self.policy_dir.glob("*.yml")):
            policy_output = self.output_dir / policy_file.stem

            completed = subprocess.run(
                [
                    "custodian", "run",
                    "--output-dir", str(policy_output),
                    str(policy_file),
                ],
                capture_output=True,
                text=True,
            )
            if completed.returncode != 0:
                # Surface the failure instead of silently reporting nothing;
                # whatever output was still written is parsed below.
                self._log.error(
                    "custodian run failed for %s (exit code %d): %s",
                    policy_file, completed.returncode, completed.stderr.strip(),
                )

            results.update(self._collect_results(policy_output))

        return results

    def _collect_results(self, policy_output: Path) -> dict:
        """Parse every ``<sub-dir>/resources.json`` below *policy_output*."""
        collected = {}
        if not policy_output.is_dir():
            # custodian wrote nothing (for example it failed before running).
            self._log.warning("No custodian output found in %s", policy_output)
            return collected

        for resource_dir in sorted(policy_output.iterdir()):
            if not resource_dir.is_dir():
                continue
            resources_file = resource_dir / "resources.json"
            if not resources_file.exists():
                continue

            try:
                violations = json.loads(resources_file.read_text())
            except json.JSONDecodeError as exc:
                self._log.warning("Skipping unreadable %s: %s", resources_file, exc)
                continue
            if not isinstance(violations, list):
                self._log.warning("Skipping %s: expected a JSON list", resources_file)
                continue

            collected[resource_dir.name] = {
                "violations": len(violations),
                # Keep the report short: identifiers of the first ten only.
                "resources": [
                    v.get("BucketName", v.get("InstanceId", "unknown"))
                    for v in violations[:10]
                ],
            }
        return collected

    def generate_report(self, results: dict) -> str:
        """Generate a compliance summary report.

        Args:
            results: Mapping as returned by :meth:`run_all_policies`.

        Returns:
            Markdown text: a header with totals, then one bullet per entry
            (sorted by name) with its listed resources when it has violations.
        """
        total_violations = sum(r["violations"] for r in results.values())

        report_lines = [
            f"# Compliance Report — {datetime.now():%Y-%m-%d %H:%M}",
            "",
            f"**Total policies checked:** {len(results)}",
            f"**Total violations:** {total_violations}",
            "",
        ]

        for policy, data in sorted(results.items()):
            count = data["violations"]
            status = "✅ PASS" if count == 0 else f"❌ FAIL ({count})"
            report_lines.append(f"- **{policy}**: {status}")

            if count > 0:
                report_lines.extend(f"  - {resource}" for resource in data["resources"])

        return "\n".join(report_lines)

AWS Config custom rules with Python Lambda

AWS Config evaluates resource configurations against rules. Custom rules use Lambda functions:

# lambda_function.py — AWS Config custom rule
import json
import boto3
from datetime import datetime


# AWS Config client, created once at import time rather than per invocation.
config_client = boto3.client("config")


def lambda_handler(event, context):
    """Check that EBS volumes are encrypted and report the result to AWS Config.

    Args:
        event: AWS Config invocation event. ``invokingEvent`` is a JSON string
            holding the ``configurationItem``; ``resultToken`` is passed back
            with the evaluation.
        context: Lambda context object (unused).

    Returns:
        None. For ``AWS::EC2::Volume`` items one evaluation is submitted via
        ``config_client.put_evaluations``; any other resource type returns
        without reporting.
    """
    invoking_event = json.loads(event["invokingEvent"])

    # The key can be present with a null value, so `.get(key, {})` is not
    # enough to guarantee a dict.
    configuration_item = invoking_event.get("configurationItem") or {}
    resource_type = configuration_item.get("resourceType", "")

    if resource_type != "AWS::EC2::Volume":
        return  # Not applicable

    # Items for deleted / unrecorded resources, or resources that left the
    # rule's scope, cannot be judged on encryption: report NOT_APPLICABLE.
    inapplicable_statuses = (
        "ResourceDeleted",
        "ResourceDeletedNotRecorded",
        "ResourceNotRecorded",
    )
    not_applicable = (
        configuration_item.get("configurationItemStatus") in inapplicable_statuses
        or bool(event.get("eventLeftScope"))
    )

    # `configuration` is null for deleted resources; normalise it to a dict.
    configuration = configuration_item.get("configuration") or {}
    encrypted = configuration.get("encrypted", False)

    if not_applicable:
        compliance_type = "NOT_APPLICABLE"
        annotation = "EBS volume was deleted or is no longer in scope for this rule"
    elif encrypted:
        compliance_type = "COMPLIANT"
        annotation = "EBS volume is encrypted"
    else:
        compliance_type = "NON_COMPLIANT"
        annotation = "EBS volume is NOT encrypted — encryption at rest is required"

    # Order evaluations by when the configuration was captured, not by when
    # this function happened to run; fall back to "now" if the field is absent.
    ordering_timestamp = (
        configuration_item.get("configurationItemCaptureTime") or datetime.now()
    )

    config_client.put_evaluations(
        Evaluations=[
            {
                "ComplianceResourceType": resource_type,
                "ComplianceResourceId": configuration_item["resourceId"],
                "ComplianceType": compliance_type,
                "Annotation": annotation,
                "OrderingTimestamp": ordering_timestamp,
            }
        ],
        ResultToken=event["resultToken"],
    )

Continuous compliance monitoring

A scheduled compliance scanner that tracks drift over time:

import boto3
import json
import logging
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


@dataclass
class ComplianceViolation:
    """A single non-compliant resource found by a scan.

    ``first_seen`` holds an ISO-8601 timestamp. When it is left empty, it is
    filled with the current UTC time as the instance is created.
    """

    resource_id: str
    resource_type: str
    rule: str
    severity: str
    details: str
    first_seen: str = ""

    def __post_init__(self):
        # A falsy value means "not recorded yet": stamp the creation time.
        self.first_seen = self.first_seen or datetime.now(timezone.utc).isoformat()


class ContinuousComplianceMonitor:
    """Scan live AWS resources for compliance violations using boto3.

    Each ``check_*`` method returns a list of ``ComplianceViolation`` objects;
    ``run_full_scan`` runs them all and builds one report dict.
    """

    # Same logger object as a module-level ``logging.getLogger(__name__)``.
    _log = logging.getLogger(__name__)

    def __init__(self, region: str = "us-east-1"):
        """Create the boto3 clients used by the checks.

        Args:
            region: AWS region name passed to every client.
        """
        self.ec2 = boto3.client("ec2", region_name=region)
        self.s3 = boto3.client("s3", region_name=region)
        self.rds = boto3.client("rds", region_name=region)
        self.iam = boto3.client("iam", region_name=region)

    @staticmethod
    def _error_code(error) -> str:
        """Return the AWS error code carried by a ``ClientError`` ('' if none)."""
        response = getattr(error, "response", None) or {}
        return response.get("Error", {}).get("Code", "")

    def check_s3_encryption(self) -> "list[ComplianceViolation]":
        """Return a HIGH violation for each bucket without default encryption.

        Buckets whose encryption configuration cannot be read for another
        reason (for example access denied) are logged and not reported.
        """
        violations = []
        buckets = self.s3.list_buckets()["Buckets"]

        for bucket in buckets:
            name = bucket["Name"]
            try:
                self.s3.get_bucket_encryption(Bucket=name)
            except self.s3.exceptions.ClientError as e:
                # Compare the structured error code rather than searching the
                # rendered message text.
                code = self._error_code(e)
                if code == "ServerSideEncryptionConfigurationNotFoundError":
                    violations.append(ComplianceViolation(
                        resource_id=name,
                        resource_type="S3::Bucket",
                        rule="s3-encryption-at-rest",
                        severity="HIGH",
                        details="Bucket lacks default encryption configuration",
                    ))
                else:
                    # Previously ignored silently; make the gap visible.
                    self._log.warning(
                        "Could not read encryption configuration of bucket %s: %s",
                        name, code or e,
                    )

        return violations

    def check_s3_public_access(self) -> "list[ComplianceViolation]":
        """Return a CRITICAL violation for each bucket not fully blocking public access.

        A bucket is reported when any of the four public-access-block flags
        is off, when no configuration exists, or when the configuration
        could not be read (so an unverifiable bucket is never treated as
        compliant).
        """
        violations = []
        buckets = self.s3.list_buckets()["Buckets"]

        for bucket in buckets:
            name = bucket["Name"]
            try:
                public_access = self.s3.get_public_access_block(Bucket=name)
            except self.s3.exceptions.ClientError as e:
                code = self._error_code(e)
                if code == "NoSuchPublicAccessBlockConfiguration":
                    details = "No public access block configuration found"
                else:
                    # Do not claim the configuration is missing when the
                    # real problem is e.g. missing permissions.
                    details = f"Could not verify public access block configuration ({code or e})"
                violations.append(ComplianceViolation(
                    resource_id=name,
                    resource_type="S3::Bucket",
                    rule="s3-block-public-access",
                    severity="CRITICAL",
                    details=details,
                ))
                continue

            config = public_access["PublicAccessBlockConfiguration"]
            if not all([
                config.get("BlockPublicAcls", False),
                config.get("IgnorePublicAcls", False),
                config.get("BlockPublicPolicy", False),
                config.get("RestrictPublicBuckets", False),
            ]):
                violations.append(ComplianceViolation(
                    resource_id=name,
                    resource_type="S3::Bucket",
                    rule="s3-block-public-access",
                    severity="CRITICAL",
                    details="Public access block not fully enabled",
                ))

        return violations

    def check_ebs_encryption(self) -> "list[ComplianceViolation]":
        """Return a HIGH violation for each unencrypted EBS volume."""
        violations = []
        paginator = self.ec2.get_paginator("describe_volumes")

        for page in paginator.paginate():
            for volume in page["Volumes"]:
                if not volume.get("Encrypted", False):
                    violations.append(ComplianceViolation(
                        resource_id=volume["VolumeId"],
                        resource_type="EC2::Volume",
                        rule="ebs-encryption-at-rest",
                        severity="HIGH",
                        details=f"Unencrypted volume ({volume.get('Size', '?')} GB)",
                    ))

        return violations

    def run_full_scan(self) -> dict:
        """Run all compliance checks and return a structured report.

        A check that raises is logged and listed under ``"failed_checks"``
        so a failed check is not mistaken for "no violations".

        Returns:
            Dict with ``scan_time`` (ISO-8601, UTC), ``total_violations``,
            ``by_severity`` (counts for CRITICAL/HIGH/MEDIUM/LOW),
            ``violations`` (one dict per violation) and ``failed_checks``
            (names of checks that raised).
        """
        all_violations = []
        failed_checks = []

        checks = [
            ("S3 Encryption", self.check_s3_encryption),
            ("S3 Public Access", self.check_s3_public_access),
            ("EBS Encryption", self.check_ebs_encryption),
        ]

        for check_name, check_fn in checks:
            self._log.info("Running: %s", check_name)
            try:
                violations = check_fn()
            except Exception as e:
                # Deliberately broad: one failing check must not abort the
                # remaining checks of the scan.
                self._log.error("  %s failed: %s", check_name, e)
                failed_checks.append(check_name)
                continue
            all_violations.extend(violations)
            self._log.info("  %s: %d violations", check_name, len(violations))

        # Single pass instead of one list scan per severity level.
        by_severity = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
        for violation in all_violations:
            if violation.severity in by_severity:
                by_severity[violation.severity] += 1

        return {
            "scan_time": datetime.now(timezone.utc).isoformat(),
            "total_violations": len(all_violations),
            "by_severity": by_severity,
            "violations": [asdict(v) for v in all_violations],
            "failed_checks": failed_checks,
        }

Tradeoffs

| Approach | Strengths | Weaknesses |
| --- | --- | --- |
| Checkov (static) | Fast, pre-deployment, wide IaC coverage | Only catches config issues, not runtime drift |
| Cloud Custodian | Real-time remediation, multi-cloud | YAML policies can get complex, runtime cost |
| AWS Config Rules | Native AWS integration, continuous | AWS-only, Lambda cold starts, cost per evaluation |
| Custom Python scripts | Full flexibility, exact business rules | Maintenance burden, no standard format |

Most teams combine approaches: Checkov in CI/CD for pre-deployment checks, Cloud Custodian or AWS Config for continuous runtime monitoring, and custom Python scripts for organization-specific rules that don’t fit standard frameworks.

The one thing to remember: Effective compliance as code combines pre-deployment checks (Checkov in CI/CD) with continuous runtime monitoring (Cloud Custodian, AWS Config). The goal is a feedback loop where violations are caught before deployment and drift is detected within hours, not audit cycles.

Tags: python, compliance, security, devops

See Also