Blue-Green Deployments with Python — Deep Dive

AWS blue-green with Python and boto3

The most common blue-green pattern on AWS uses two Auto Scaling Groups (ASGs) behind an Application Load Balancer (ALB). The Python orchestrator manages which target group receives traffic:

import boto3
import time
import logging

logger = logging.getLogger(__name__)


class BlueGreenDeployer:
    """Switch an ALB listener's default traffic between two target groups.

    The deployer inspects the listener's default rule to find the target
    group that currently receives traffic, waits for the other ("new")
    target group to report enough healthy targets, and then repoints the
    listener's default action at it.
    """

    def __init__(self, region: str = "us-east-1"):
        """Create the boto3 clients used by the deployer.

        Args:
            region: AWS region name passed to both clients.
        """
        self.elbv2 = boto3.client("elbv2", region_name=region)
        self.ec2 = boto3.client("ec2", region_name=region)

    @staticmethod
    def _target_group_name(target_group_arn: str) -> str:
        """Return the name segment of a target group ARN.

        The name is the part after the first ``/``. If the string has no
        ``/`` it is returned unchanged, so building the result summary can
        never fail after traffic has already been switched.
        """
        parts = target_group_arn.split("/")
        return parts[1] if len(parts) > 1 else target_group_arn

    @staticmethod
    def _forward_target(action: dict):
        """Return the single target group a forward action points at, or None.

        Uses ``TargetGroupArn`` when present; otherwise falls back to
        ``ForwardConfig.TargetGroups`` and accepts it only if exactly one
        group has a non-zero weight.
        """
        arn = action.get("TargetGroupArn")
        if arn:
            return arn
        groups = action.get("ForwardConfig", {}).get("TargetGroups", [])
        weighted = [g["TargetGroupArn"] for g in groups if g.get("Weight", 1) > 0]
        return weighted[0] if len(weighted) == 1 else None

    def get_active_target_group(self, listener_arn: str) -> str:
        """Find which target group currently receives traffic.

        Args:
            listener_arn: ARN of the listener whose default rule is inspected.

        Returns:
            ARN of the target group the default rule forwards to.

        Raises:
            RuntimeError: If the default rule has no forward action, or its
                forward action does not resolve to exactly one target group.
        """
        response = self.elbv2.describe_rules(ListenerArn=listener_arn)
        for rule in response["Rules"]:
            if not rule.get("IsDefault"):
                continue
            for action in rule["Actions"]:
                if action["Type"] != "forward":
                    continue
                target = self._forward_target(action)
                if target is None:
                    raise RuntimeError(
                        "Default forward action does not resolve to a "
                        "single target group"
                    )
                return target
        raise RuntimeError("No default forwarding rule found")

    def check_target_health(
        self, target_group_arn: str, min_healthy: int = 2
    ) -> bool:
        """Verify enough targets are healthy in the target group.

        Args:
            target_group_arn: Target group to inspect.
            min_healthy: Minimum number of targets in state ``healthy``.

        Returns:
            True when at least ``min_healthy`` targets are healthy.
        """
        response = self.elbv2.describe_target_health(
            TargetGroupArn=target_group_arn
        )
        healthy = sum(
            1
            for t in response["TargetHealthDescriptions"]
            if t["TargetHealth"]["State"] == "healthy"
        )
        logger.info("Healthy targets: %d/%d required", healthy, min_healthy)
        return healthy >= min_healthy

    def switch_traffic(
        self,
        listener_arn: str,
        new_target_group_arn: str,
        old_target_group_arn: str,
    ) -> dict:
        """Point the listener's default action at the new target group.

        The default rule cannot be changed through ``modify_rule``; the
        ELBv2 API requires ``modify_listener`` for default actions, so that
        is what is called here.

        Args:
            listener_arn: Listener to update.
            new_target_group_arn: Target group that should receive traffic.
            old_target_group_arn: Previously active target group; used only
                for the returned summary.

        Returns:
            ``{"switched": True, "from": <old name>, "to": <new name>}``.
        """
        self.elbv2.modify_listener(
            ListenerArn=listener_arn,
            DefaultActions=[
                {
                    "Type": "forward",
                    "TargetGroupArn": new_target_group_arn,
                }
            ],
        )

        return {
            "switched": True,
            "from": self._target_group_name(old_target_group_arn),
            "to": self._target_group_name(new_target_group_arn),
        }

    def deploy(
        self,
        listener_arn: str,
        blue_tg_arn: str,
        green_tg_arn: str,
        health_timeout: int = 300,
        health_interval: int = 15,
        min_healthy: int = 2,
    ) -> dict:
        """Full blue-green deployment flow.

        Args:
            listener_arn: Listener whose default traffic is switched.
            blue_tg_arn: ARN of the blue target group.
            green_tg_arn: ARN of the green target group.
            health_timeout: Maximum seconds to wait for the new target group.
            health_interval: Seconds to sleep between health polls.
            min_healthy: Healthy targets required before switching.

        Returns:
            On success, the ``switch_traffic`` summary plus ``"direction"``
            and ``"deployed": True``. On failure,
            ``{"deployed": False, "reason": ...}``.
        """
        active_tg = self.get_active_target_group(listener_arn)

        if active_tg == blue_tg_arn:
            new_tg, old_tg = green_tg_arn, blue_tg_arn
            label = "blue → green"
        elif active_tg == green_tg_arn:
            new_tg, old_tg = blue_tg_arn, green_tg_arn
            label = "green → blue"
        else:
            # Refuse to guess: switching away from an unrelated target group
            # would silently reroute traffic that this deployer does not own.
            logger.error(
                "Active target group %s is neither blue nor green", active_tg
            )
            return {
                "deployed": False,
                "reason": "active target group is neither blue nor green",
            }

        logger.info("Deploying: %s", label)

        # Poll against a monotonic deadline so time spent inside the API call
        # counts toward the timeout and wall-clock adjustments do not.
        deadline = time.monotonic() + health_timeout
        while not self.check_target_health(new_tg, min_healthy):
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return {"deployed": False, "reason": "health check timeout"}
            time.sleep(min(health_interval, remaining))

        result = self.switch_traffic(listener_arn, new_tg, old_tg)
        result["direction"] = label
        result["deployed"] = True
        return result

Kubernetes blue-green with service selectors

In Kubernetes, blue-green deployments use separate Deployments with different labels, and the Service selector switches between them:

from kubernetes import client, config
import time
import logging

logger = logging.getLogger(__name__)


class K8sBlueGreen:
    """Blue-green deployments driven by a Service's ``slot`` selector.

    Each slot ("blue" / "green") is a separate Deployment named
    ``<app>-<slot>`` whose pods carry the labels ``app=<app>`` and
    ``slot=<slot>``. Traffic is moved by patching the Service selector.
    """

    def __init__(self):
        """Load in-cluster configuration and create the API clients."""
        config.load_incluster_config()
        self.apps = client.AppsV1Api()
        self.core = client.CoreV1Api()

    def deploy_green(
        self,
        namespace: str,
        deployment_name: str,
        new_image: str,
        replicas: int = 3,
        slot: str = "green",
    ) -> dict:
        """Create or update the deployment for one slot with the new image.

        Despite the name, the slot is selectable: ``full_deploy`` passes
        whichever slot is currently inactive. The default keeps the
        original green-only behaviour.

        Args:
            namespace: Namespace of the deployment.
            deployment_name: Application name; the Deployment is named
                ``<deployment_name>-<slot>``.
            new_image: Container image to run.
            replicas: Desired replica count.
            slot: Slot label and name suffix to deploy into.

        Returns:
            ``{"deployment": <name>, "image": <image>, "action": "created" | "updated"}``.

        Raises:
            client.ApiException: For any API error other than a 409
                conflict on create (which triggers a patch instead).
        """
        slot_name = f"{deployment_name}-{slot}"
        labels = {"app": deployment_name, "slot": slot}

        body = client.V1Deployment(
            metadata=client.V1ObjectMeta(
                name=slot_name,
                labels=dict(labels),
            ),
            spec=client.V1DeploymentSpec(
                replicas=replicas,
                selector=client.V1LabelSelector(match_labels=dict(labels)),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels=dict(labels)),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="app",
                                image=new_image,
                                ports=[client.V1ContainerPort(container_port=8000)],
                                readiness_probe=client.V1Probe(
                                    http_get=client.V1HTTPGetAction(
                                        path="/health", port=8000
                                    ),
                                    initial_delay_seconds=10,
                                    period_seconds=5,
                                ),
                            )
                        ]
                    ),
                ),
            ),
        )

        try:
            self.apps.create_namespaced_deployment(namespace, body)
            action = "created"
        except client.ApiException as e:
            # 409 means the deployment already exists: update it in place.
            if e.status != 409:
                raise
            self.apps.patch_namespaced_deployment(slot_name, namespace, body)
            action = "updated"

        return {"deployment": slot_name, "image": new_image, "action": action}

    def wait_for_ready(
        self,
        namespace: str,
        deployment_name: str,
        timeout: int = 300,
        poll_interval: int = 10,
    ) -> bool:
        """Wait until the deployment's current revision is fully rolled out.

        Comparing only ``ready_replicas`` is not enough after a patch: the
        pods of the previous revision are still ready, so the check would
        pass before any new pod has started. The rollout is therefore
        considered complete only when the controller has observed the
        latest spec, every desired replica is on the updated template, no
        extra (old) replicas remain, and enough replicas are ready.

        Args:
            namespace: Namespace of the deployment.
            deployment_name: Name of the Deployment object to poll.
            timeout: Maximum seconds to wait.
            poll_interval: Seconds to sleep between polls.

        Returns:
            True once the rollout is complete, False on timeout.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            dep = self.apps.read_namespaced_deployment(deployment_name, namespace)
            status = dep.status
            desired = dep.spec.replicas
            if desired is None:
                desired = 1  # the API's default when replicas is unset

            ready = status.ready_replicas or 0
            updated = status.updated_replicas or 0
            total = status.replicas or 0
            observed = (status.observed_generation or 0) >= (
                dep.metadata.generation or 0
            )

            if observed and updated >= desired and total <= updated and ready >= desired:
                logger.info(f"{deployment_name}: {ready}/{desired} ready")
                return True

            time.sleep(poll_interval)
        return False

    def switch_service(
        self, namespace: str, service_name: str, target_slot: str
    ) -> dict:
        """Update the Service selector to point to blue or green.

        Only the ``slot`` key of the selector is sent in the patch body.

        Returns:
            ``{"service": <name>, "now_pointing_to": <slot>}``.
        """
        body = {"spec": {"selector": {"slot": target_slot}}}
        self.core.patch_namespaced_service(service_name, namespace, body)
        return {"service": service_name, "now_pointing_to": target_slot}

    def full_deploy(
        self,
        namespace: str,
        app_name: str,
        new_image: str,
    ) -> dict:
        """Orchestrate a full blue-green deployment.

        Reads the Service's current ``slot`` selector (treated as "blue"
        when absent), deploys the new image into the *other* slot, waits
        for that deployment to become ready, then switches the Service.

        Returns:
            On success, the ``switch_service`` result plus
            ``"deployed": True`` and ``"previous_slot"``. If the new slot
            never becomes ready, ``{"deployed": False, "reason": ...}`` and
            the Service is left untouched.
        """
        svc = self.core.read_namespaced_service(app_name, namespace)
        selector = svc.spec.selector or {}
        current_slot = selector.get("slot", "blue")
        new_slot = "green" if current_slot == "blue" else "blue"

        # Deploy into the inactive slot, never the one serving traffic.
        self.deploy_green(namespace, app_name, new_image, slot=new_slot)
        target_name = f"{app_name}-{new_slot}"

        if not self.wait_for_ready(namespace, target_name):
            return {"deployed": False, "reason": f"{target_name} not ready"}

        result = self.switch_service(namespace, app_name, new_slot)
        result["deployed"] = True
        result["previous_slot"] = current_slot
        return result

Health check orchestration

A robust health check goes beyond HTTP 200. This pattern validates multiple layers:

import httpx
import asyncio
import logging

logger = logging.getLogger(__name__)


async def deep_health_check(base_url: str, checks: list[dict]) -> dict:
    """Probe several endpoints of one deployment and summarise the outcome.

    Each entry of ``checks`` describes one GET request:

    checks format: [
        {"path": "/health", "expect_status": 200},
        {"path": "/health/db", "expect_status": 200, "expect_body_contains": "connected"},
        {"path": "/api/v1/status", "expect_status": 200},
    ]

    ``expect_status`` defaults to 200. ``expect_body_contains`` is only
    evaluated when the status already matched. Requests are issued one
    after another through a single client with a 10 second timeout.

    Returns:
        ``{"healthy": bool, "checks": {path: {...}}}`` where each per-path
        entry holds ``passed`` plus either ``status`` or, when the request
        itself failed, ``error``.
    """
    outcomes = {}
    async with httpx.AsyncClient(timeout=10.0) as http:
        for spec in checks:
            endpoint = spec["path"]
            try:
                response = await http.get(f"{base_url}{endpoint}")
                status_ok = response.status_code == spec.get("expect_status", 200)
                # The body is only inspected when the status matched and the
                # check actually asks for a substring.
                ok = status_ok and (
                    "expect_body_contains" not in spec
                    or spec["expect_body_contains"] in response.text
                )
                outcomes[endpoint] = {"passed": ok, "status": response.status_code}
            except httpx.RequestError as exc:
                outcomes[endpoint] = {"passed": False, "error": str(exc)}

    healthy = True
    for outcome in outcomes.values():
        if not outcome["passed"]:
            healthy = False
            break
    return {"healthy": healthy, "checks": outcomes}


async def wait_for_healthy(
    base_url: str,
    checks: list[dict],
    timeout: int = 300,
    interval: int = 10,
) -> bool:
    """Poll ``deep_health_check`` until it reports healthy or time runs out.

    The deadline is measured on the event loop's monotonic clock, so the
    time spent inside each round of checks counts toward ``timeout``
    (previously only the sleep intervals were counted, letting slow or
    timing-out checks stretch the real wait far beyond ``timeout``).

    Args:
        base_url: Base URL passed through to ``deep_health_check``.
        checks: Check definitions passed through to ``deep_health_check``.
        timeout: Maximum total seconds to keep polling.
        interval: Seconds to sleep between polling rounds.

    Returns:
        True as soon as one round reports healthy, False on timeout.
    """
    loop = asyncio.get_running_loop()
    started = loop.time()
    deadline = started + timeout

    while loop.time() < deadline:
        result = await deep_health_check(base_url, checks)
        if result["healthy"]:
            elapsed = loop.time() - started
            logger.info(f"Environment healthy after {elapsed:.0f}s")
            return True

        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        await asyncio.sleep(min(interval, remaining))

    logger.error(f"Environment not healthy after {timeout}s")
    return False

Database migration safety

The critical challenge in blue-green deployments is that both environments share the same database, so every migration must keep working with both the old and the new code:

"""
Backward-compatible migration strategy for blue-green deployments.

Phase 1 (deploy with old code still running):
  - ADD new columns (nullable or with defaults)
  - CREATE new tables
  - ADD new indexes
  
Phase 2 (after switch, old code is idle):
  - DROP old columns
  - RENAME if needed
  - ADD NOT NULL constraints
  
Never in a single deployment:
  - DROP columns that old code reads
  - RENAME columns that old code references
  - Change column types in breaking ways
"""


def validate_migration_safety(migration_sql: str) -> list[str]:
    """Check a migration for blue-green safety violations.

    The SQL is scanned case-insensitively for statements that would break
    code still running against the old schema. Keywords may be separated by
    any whitespace (including newlines). The NOT NULL check stays within a
    single statement (it never crosses a ``;``) and ignores
    ``DROP NOT NULL``, which relaxes a constraint rather than adding one.

    This is a textual heuristic, not a SQL parser: matches inside comments
    or string literals are reported as well.

    Args:
        migration_sql: Raw SQL text of the migration.

    Returns:
        One warning string per kind of dangerous statement found, in a
        fixed order; an empty list when nothing was found.
    """
    import re  # function-local: this snippet has no module-level import of re

    # (label shown in the warning, regex actually searched, explanation)
    dangerous_patterns = [
        (
            "DROP COLUMN",
            r"\bDROP\s+COLUMN\b",
            "Dropping columns breaks the inactive environment",
        ),
        (
            "RENAME COLUMN",
            r"\bRENAME\s+COLUMN\b",
            "Renaming columns breaks the inactive environment",
        ),
        (
            "ALTER COLUMN.*NOT NULL",
            # Consume the rest of the statement lazily, refusing to step
            # over "DROP NOT NULL" or a statement terminator.
            r"\bALTER\s+COLUMN\b(?:(?!\bDROP\s+NOT\s+NULL\b)[^;])*?\bNOT\s+NULL\b",
            "Adding NOT NULL may break old code writing NULLs",
        ),
        (
            "DROP TABLE",
            r"\bDROP\s+TABLE\b",
            "Dropping tables breaks the inactive environment",
        ),
    ]

    warnings = []
    for label, regex, message in dangerous_patterns:
        if re.search(regex, migration_sql, flags=re.IGNORECASE):
            warnings.append(f"⚠️  {message}: found '{label}' in migration")

    return warnings

Tradeoffs compared to other strategies

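| Strategy       | Zero downtime | Instant rollback | Infrastructure cost      | Database complexity                |
|----------------|---------------|------------------|--------------------------|------------------------------------|
| Blue-green     | ✅            | ✅ Seconds       | Higher (2x environments) | High (shared, backward-compatible) |
| Rolling update | ✅            | ❌ Minutes       | Same                     | Medium                             |
| Canary         | ✅            | ✅ Seconds       | Slightly higher          | High                               |
| Recreate       | ❌            | ❌ Full redeploy | Same                     | Low                                |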

Blue-green is the safest strategy when you need instant rollback capability and can afford the infrastructure overhead. For Python web services (Django, FastAPI, Flask), where stateless application servers are cheap to duplicate, it’s often the default choice.

The one thing to remember: The real complexity in blue-green isn’t the traffic switch — it’s ensuring database migrations are backward-compatible so both environments can run against the same database simultaneously.

Tags: python, blue-green, deployment, devops

See Also