Compliance as Code with Python — Deep Dive
Custom Checkov policies
Checkov’s built-in policies cover common cases, but every organization has custom requirements. Writing custom checks in Python is straightforward:
# custom_checks/encryption_checks.py
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories
class RDSEncryptionEnabled(BaseResourceCheck):
def __init__(self):
name = "Ensure RDS instance has encryption at rest enabled"
id = "CUSTOM_RDS_001"
supported = ["aws_db_instance"]
categories = [CheckCategories.ENCRYPTION]
super().__init__(
name=name, id=id, categories=categories,
supported_resources=supported,
)
def scan_resource_conf(self, conf: dict) -> CheckResult:
storage_encrypted = conf.get("storage_encrypted", [False])
if isinstance(storage_encrypted, list):
storage_encrypted = storage_encrypted[0]
if storage_encrypted is True:
return CheckResult.PASSED
return CheckResult.FAILED
class RDSMultiAZEnabled(BaseResourceCheck):
def __init__(self):
name = "Ensure RDS instance has Multi-AZ enabled for production"
id = "CUSTOM_RDS_002"
supported = ["aws_db_instance"]
categories = [CheckCategories.BACKUP_AND_RECOVERY]
super().__init__(
name=name, id=id, categories=categories,
supported_resources=supported,
)
def scan_resource_conf(self, conf: dict) -> CheckResult:
tags = conf.get("tags", [{}])
if isinstance(tags, list):
tags = tags[0] if tags else {}
environment = tags.get("Environment", "production")
if environment != "production":
return CheckResult.PASSED
multi_az = conf.get("multi_az", [False])
if isinstance(multi_az, list):
multi_az = multi_az[0]
return CheckResult.PASSED if multi_az else CheckResult.FAILED
# Register checks
rds_encryption = RDSEncryptionEnabled()
rds_multi_az = RDSMultiAZEnabled()
Graph-based checks for cross-resource compliance
Checkov also supports graph checks that validate relationships between resources:
# custom_checks/network_isolation.py
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.models.enums import CheckResult, CheckCategories
class SecurityGroupNoPublicRDP(BaseResourceCheck):
def __init__(self):
name = "Ensure no security group allows ingress from 0.0.0.0/0 to port 3389"
id = "CUSTOM_NET_001"
supported = ["aws_security_group", "aws_security_group_rule"]
categories = [CheckCategories.NETWORKING]
super().__init__(
name=name, id=id, categories=categories,
supported_resources=supported,
)
def scan_resource_conf(self, conf: dict) -> CheckResult:
ingress_rules = conf.get("ingress", [])
if not isinstance(ingress_rules, list):
ingress_rules = [ingress_rules]
for rule in ingress_rules:
if not isinstance(rule, dict):
continue
from_port = rule.get("from_port", [0])
to_port = rule.get("to_port", [0])
cidr_blocks = rule.get("cidr_blocks", [[]])
if isinstance(from_port, list):
from_port = from_port[0] if from_port else 0
if isinstance(to_port, list):
to_port = to_port[0] if to_port else 0
if isinstance(cidr_blocks, list) and cidr_blocks:
if isinstance(cidr_blocks[0], list):
cidr_blocks = cidr_blocks[0]
port_range = range(int(from_port), int(to_port) + 1)
if 3389 in port_range and "0.0.0.0/0" in cidr_blocks:
return CheckResult.FAILED
return CheckResult.PASSED
check = SecurityGroupNoPublicRDP()
Cloud Custodian policies
Cloud Custodian uses YAML policies backed by a Python engine. It can detect violations and take automatic corrective action:
# policies/s3-compliance.yml
policies:
- name: s3-encryption-required
resource: s3
description: "All S3 buckets must have default encryption"
filters:
- type: bucket-encryption
state: False
actions:
- type: set-bucket-encryption
crypto: AES256
- type: notify
template: default
subject: "S3 Bucket Missing Encryption"
to: ["security-team@example.com"]
transport:
type: sns
topic: arn:aws:sns:us-east-1:123456:compliance-alerts
- name: s3-no-public-access
resource: s3
filters:
- type: global-grants
actions:
- type: delete-global-grants
grantees:
- "http://acs.amazonaws.com/groups/global/AllUsers"
Driving Cloud Custodian from Python
import subprocess
import json
import logging
from pathlib import Path
from datetime import datetime
logger = logging.getLogger(__name__)
class ComplianceRunner:
def __init__(self, policy_dir: str, output_dir: str = "/tmp/custodian"):
self.policy_dir = Path(policy_dir)
self.output_dir = Path(output_dir)
def run_all_policies(self) -> dict:
"""Execute all Cloud Custodian policies and collect results."""
results = {}
for policy_file in self.policy_dir.glob("*.yml"):
policy_output = self.output_dir / policy_file.stem
result = subprocess.run(
[
"custodian", "run",
"--output-dir", str(policy_output),
str(policy_file),
],
capture_output=True,
text=True,
)
# Parse results from output directory
for resource_dir in policy_output.iterdir():
if resource_dir.is_dir():
resources_file = resource_dir / "resources.json"
if resources_file.exists():
violations = json.loads(resources_file.read_text())
results[resource_dir.name] = {
"violations": len(violations),
"resources": [
v.get("BucketName", v.get("InstanceId", "unknown"))
for v in violations[:10]
],
}
return results
def generate_report(self, results: dict) -> str:
"""Generate a compliance summary report."""
total_violations = sum(r["violations"] for r in results.values())
report_lines = [
f"# Compliance Report — {datetime.now():%Y-%m-%d %H:%M}",
f"",
f"**Total policies checked:** {len(results)}",
f"**Total violations:** {total_violations}",
f"",
]
for policy, data in sorted(results.items()):
status = "✅ PASS" if data["violations"] == 0 else f"❌ FAIL ({data['violations']})"
report_lines.append(f"- **{policy}**: {status}")
if data["violations"] > 0:
for resource in data["resources"]:
report_lines.append(f" - {resource}")
return "\n".join(report_lines)
AWS Config custom rules with Python Lambda
AWS Config evaluates resource configurations against rules. Custom rules use Lambda functions:
# lambda_function.py — AWS Config custom rule
import json
import boto3
from datetime import datetime
config_client = boto3.client("config")
def lambda_handler(event, context):
"""Check that EBS volumes are encrypted."""
invoking_event = json.loads(event["invokingEvent"])
rule_parameters = json.loads(event.get("ruleParameters", "{}"))
configuration_item = invoking_event.get("configurationItem", {})
resource_type = configuration_item.get("resourceType", "")
if resource_type != "AWS::EC2::Volume":
return # Not applicable
configuration = configuration_item.get("configuration", {})
encrypted = configuration.get("encrypted", False)
compliance_type = "COMPLIANT" if encrypted else "NON_COMPLIANT"
annotation = (
"EBS volume is encrypted"
if encrypted
else "EBS volume is NOT encrypted — encryption at rest is required"
)
config_client.put_evaluations(
Evaluations=[
{
"ComplianceResourceType": resource_type,
"ComplianceResourceId": configuration_item["resourceId"],
"ComplianceType": compliance_type,
"Annotation": annotation,
"OrderingTimestamp": datetime.now(),
}
],
ResultToken=event["resultToken"],
)
Continuous compliance monitoring
A scheduled compliance scanner that tracks drift over time:
import boto3
import json
import logging
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
logger = logging.getLogger(__name__)
@dataclass
class ComplianceViolation:
resource_id: str
resource_type: str
rule: str
severity: str
details: str
first_seen: str = ""
def __post_init__(self):
if not self.first_seen:
self.first_seen = datetime.now(timezone.utc).isoformat()
class ContinuousComplianceMonitor:
def __init__(self, region: str = "us-east-1"):
self.ec2 = boto3.client("ec2", region_name=region)
self.s3 = boto3.client("s3", region_name=region)
self.rds = boto3.client("rds", region_name=region)
self.iam = boto3.client("iam", region_name=region)
def check_s3_encryption(self) -> list[ComplianceViolation]:
violations = []
buckets = self.s3.list_buckets()["Buckets"]
for bucket in buckets:
name = bucket["Name"]
try:
self.s3.get_bucket_encryption(Bucket=name)
except self.s3.exceptions.ClientError as e:
if "ServerSideEncryptionConfigurationNotFoundError" in str(e):
violations.append(ComplianceViolation(
resource_id=name,
resource_type="S3::Bucket",
rule="s3-encryption-at-rest",
severity="HIGH",
details="Bucket lacks default encryption configuration",
))
return violations
def check_s3_public_access(self) -> list[ComplianceViolation]:
violations = []
buckets = self.s3.list_buckets()["Buckets"]
for bucket in buckets:
name = bucket["Name"]
try:
public_access = self.s3.get_public_access_block(Bucket=name)
config = public_access["PublicAccessBlockConfiguration"]
if not all([
config.get("BlockPublicAcls", False),
config.get("IgnorePublicAcls", False),
config.get("BlockPublicPolicy", False),
config.get("RestrictPublicBuckets", False),
]):
violations.append(ComplianceViolation(
resource_id=name,
resource_type="S3::Bucket",
rule="s3-block-public-access",
severity="CRITICAL",
details="Public access block not fully enabled",
))
except self.s3.exceptions.ClientError:
violations.append(ComplianceViolation(
resource_id=name,
resource_type="S3::Bucket",
rule="s3-block-public-access",
severity="CRITICAL",
details="No public access block configuration found",
))
return violations
def check_ebs_encryption(self) -> list[ComplianceViolation]:
violations = []
paginator = self.ec2.get_paginator("describe_volumes")
for page in paginator.paginate():
for volume in page["Volumes"]:
if not volume.get("Encrypted", False):
violations.append(ComplianceViolation(
resource_id=volume["VolumeId"],
resource_type="EC2::Volume",
rule="ebs-encryption-at-rest",
severity="HIGH",
details=f"Unencrypted volume ({volume.get('Size', '?')} GB)",
))
return violations
def run_full_scan(self) -> dict:
"""Run all compliance checks and return a structured report."""
all_violations = []
checks = [
("S3 Encryption", self.check_s3_encryption),
("S3 Public Access", self.check_s3_public_access),
("EBS Encryption", self.check_ebs_encryption),
]
for check_name, check_fn in checks:
logger.info(f"Running: {check_name}")
try:
violations = check_fn()
all_violations.extend(violations)
logger.info(f" {check_name}: {len(violations)} violations")
except Exception as e:
logger.error(f" {check_name} failed: {e}")
return {
"scan_time": datetime.now(timezone.utc).isoformat(),
"total_violations": len(all_violations),
"by_severity": {
"CRITICAL": len([v for v in all_violations if v.severity == "CRITICAL"]),
"HIGH": len([v for v in all_violations if v.severity == "HIGH"]),
"MEDIUM": len([v for v in all_violations if v.severity == "MEDIUM"]),
"LOW": len([v for v in all_violations if v.severity == "LOW"]),
},
"violations": [asdict(v) for v in all_violations],
}
Tradeoffs
| Approach | Strengths | Weaknesses |
|---|---|---|
| Checkov (static) | Fast, pre-deployment, wide IaC coverage | Only catches config issues, not runtime drift |
| Cloud Custodian | Real-time remediation, multi-cloud | YAML policies can get complex, runtime cost |
| AWS Config Rules | Native AWS integration, continuous | AWS-only, Lambda cold starts, cost per evaluation |
| Custom Python scripts | Full flexibility, exact business rules | Maintenance burden, no standard format |
Most teams combine approaches: Checkov in CI/CD for pre-deployment checks, Cloud Custodian or AWS Config for continuous runtime monitoring, and custom Python scripts for organization-specific rules that don’t fit standard frameworks.
The one thing to remember: Effective compliance as code combines pre-deployment checks (Checkov in CI/CD) with continuous runtime monitoring (Cloud Custodian, AWS Config). The goal is a feedback loop where violations are caught before deployment and drift is detected within hours, not audit cycles.
See Also
- Python Blue Green Deployments How Python helps teams switch between two identical server environments so updates never cause downtime
- Python Canary Releases Why teams send new code to just a few users first — and how Python manages the gradual rollout
- Python Chaos Engineering Why engineers deliberately break their own systems using Python — and how it prevents real disasters
- Python Feature Branch Deployments How teams give every code branch its own live preview website using Python automation
- Python Gitops Patterns How Git becomes the single source of truth for everything running in production — and Python makes it work