Blue-Green Deployments with Python — Deep Dive
AWS blue-green with Python and boto3
The most common blue-green pattern on AWS uses two Auto Scaling Groups (ASGs) behind an Application Load Balancer (ALB), with each ASG registered to its own target group. The Python orchestrator controls which target group the listener forwards traffic to:
import boto3
import time
import logging
logger = logging.getLogger(__name__)
class BlueGreenDeployer:
    """Switch an ALB listener's default action between two target groups.

    The deployer reads the listener's default forward action to find the
    live target group, waits for the idle target group to report enough
    healthy targets, and then repoints the listener's default action.
    """

    def __init__(self, region: str = "us-east-1"):
        """Create the ELBv2 and EC2 clients for ``region``."""
        self.elbv2 = boto3.client("elbv2", region_name=region)
        self.ec2 = boto3.client("ec2", region_name=region)

    @staticmethod
    def _target_group_name(target_group_arn: str) -> str:
        """Return the segment after the first '/' of the ARN.

        Falls back to the full input when it contains no '/', so a
        malformed value cannot raise after traffic was already switched.
        """
        parts = target_group_arn.split("/")
        return parts[1] if len(parts) > 1 else target_group_arn

    def get_active_target_group(self, listener_arn: str) -> str:
        """Find which target group currently receives traffic.

        Returns the target group ARN of the default rule's forward action.
        When the action has no top-level ``TargetGroupArn`` (weighted
        forwarding via ``ForwardConfig``), the target group with the
        highest weight is returned.

        Raises:
            RuntimeError: if the listener has no default forward action.
        """
        response = self.elbv2.describe_rules(ListenerArn=listener_arn)
        for rule in response["Rules"]:
            if not rule.get("IsDefault"):
                continue
            for action in rule["Actions"]:
                if action["Type"] != "forward":
                    continue
                arn = action.get("TargetGroupArn")
                if arn:
                    return arn
                groups = action.get("ForwardConfig", {}).get("TargetGroups", [])
                if groups:
                    heaviest = max(groups, key=lambda g: g.get("Weight", 0))
                    return heaviest["TargetGroupArn"]
        raise RuntimeError("No default forwarding rule found")

    def check_target_health(
        self, target_group_arn: str, min_healthy: int = 2
    ) -> bool:
        """Verify enough targets are healthy in the target group.

        Returns True when at least ``min_healthy`` targets report the
        ``healthy`` state.
        """
        response = self.elbv2.describe_target_health(
            TargetGroupArn=target_group_arn
        )
        healthy = sum(
            1
            for target in response["TargetHealthDescriptions"]
            if target["TargetHealth"]["State"] == "healthy"
        )
        logger.info("Healthy targets: %s/%s required", healthy, min_healthy)
        return healthy >= min_healthy

    def switch_traffic(
        self,
        listener_arn: str,
        new_target_group_arn: str,
        old_target_group_arn: str,
    ) -> dict:
        """Point the listener's default action at the new target group.

        Returns a dict with ``switched``, ``from`` and ``to`` keys, where
        ``from``/``to`` are the target group names taken from the ARNs.
        """
        # The default rule cannot be edited with modify_rule; its actions
        # are changed through the listener's DefaultActions instead.
        self.elbv2.modify_listener(
            ListenerArn=listener_arn,
            DefaultActions=[
                {
                    "Type": "forward",
                    "TargetGroupArn": new_target_group_arn,
                }
            ],
        )
        return {
            "switched": True,
            "from": self._target_group_name(old_target_group_arn),
            "to": self._target_group_name(new_target_group_arn),
        }

    def deploy(
        self,
        listener_arn: str,
        blue_tg_arn: str,
        green_tg_arn: str,
        health_timeout: int = 300,
        health_interval: int = 15,
    ) -> dict:
        """Run the full blue-green flow: detect, wait for health, switch.

        Returns:
            On success, the ``switch_traffic`` result plus ``direction``
            and ``deployed: True``. On failure, ``{"deployed": False,
            "reason": ...}`` and the listener is left untouched.

        Raises:
            ValueError: if ``health_interval`` is not positive.
            RuntimeError: if the listener has no default forward action.
        """
        if health_interval <= 0:
            # A zero interval would never advance the elapsed counter.
            raise ValueError("health_interval must be positive")

        # Determine which is currently active
        active_tg = self.get_active_target_group(listener_arn)
        if active_tg == blue_tg_arn:
            new_tg, old_tg = green_tg_arn, blue_tg_arn
            label = "blue → green"
        elif active_tg == green_tg_arn:
            new_tg, old_tg = blue_tg_arn, green_tg_arn
            label = "green → blue"
        else:
            # Refuse to guess a direction when the listener points at a
            # target group this deployment does not manage.
            return {
                "deployed": False,
                "reason": "active target group is neither blue nor green",
            }
        logger.info("Deploying: %s", label)

        # Wait for new environment to be healthy
        elapsed = 0
        while elapsed < health_timeout:
            if self.check_target_health(new_tg):
                break
            time.sleep(health_interval)
            elapsed += health_interval
        else:
            return {"deployed": False, "reason": "health check timeout"}

        # Switch traffic
        result = self.switch_traffic(listener_arn, new_tg, old_tg)
        result["direction"] = label
        result["deployed"] = True
        return result
Kubernetes blue-green with service selectors
In Kubernetes, blue-green deployments use separate Deployments with different labels, and the Service selector switches between them:
from kubernetes import client, config
import time
import logging
logger = logging.getLogger(__name__)
class K8sBlueGreen:
    """Blue-green deployments driven by a Service's ``slot`` selector.

    Each slot is a separate Deployment named ``<app>-<slot>`` whose pods
    carry ``app`` and ``slot`` labels. Traffic moves by patching the
    Service selector's ``slot`` value.
    """

    _SLOTS = ("blue", "green")

    def __init__(self):
        """Load in-cluster configuration and create the API clients."""
        config.load_incluster_config()
        self.apps = client.AppsV1Api()
        self.core = client.CoreV1Api()

    def deploy_green(
        self,
        namespace: str,
        deployment_name: str,
        new_image: str,
        replicas: int = 3,
        slot: str = "green",
    ) -> dict:
        """Create or update the Deployment for ``slot`` with the new image.

        ``slot`` defaults to "green" so existing callers keep their
        behavior; pass "blue" to roll the new image into the blue slot.

        Returns:
            ``{"deployment": <name>, "image": <image>, "action": ...}``
            where action is "created" or "updated".

        Raises:
            ValueError: if ``slot`` is not "blue" or "green".
            client.ApiException: for any API error other than 409 on create.
        """
        if slot not in self._SLOTS:
            raise ValueError(f"slot must be one of {self._SLOTS}, got {slot!r}")

        target_name = f"{deployment_name}-{slot}"
        labels = {"app": deployment_name, "slot": slot}
        body = client.V1Deployment(
            metadata=client.V1ObjectMeta(
                name=target_name,
                labels=dict(labels),
            ),
            spec=client.V1DeploymentSpec(
                replicas=replicas,
                selector=client.V1LabelSelector(match_labels=dict(labels)),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels=dict(labels)),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="app",
                                image=new_image,
                                ports=[client.V1ContainerPort(container_port=8000)],
                                readiness_probe=client.V1Probe(
                                    http_get=client.V1HTTPGetAction(
                                        path="/health", port=8000
                                    ),
                                    initial_delay_seconds=10,
                                    period_seconds=5,
                                ),
                            )
                        ]
                    ),
                ),
            ),
        )
        try:
            self.apps.create_namespaced_deployment(namespace, body)
            action = "created"
        except client.ApiException as e:
            # 409 means the Deployment already exists: update it in place.
            if e.status != 409:
                raise
            self.apps.patch_namespaced_deployment(target_name, namespace, body)
            action = "updated"
        return {"deployment": target_name, "image": new_image, "action": action}

    @staticmethod
    def _rollout_complete(dep) -> bool:
        """Return True when the Deployment's latest spec is fully rolled out.

        ``ready_replicas`` alone is not enough after an update: pods from
        the previous revision still count as ready. The controller must
        have observed the current generation, every desired replica must
        be on the new revision, and no older pods may remain.
        """
        status = dep.status
        if status is None:
            return False
        desired = dep.spec.replicas
        if desired is None:
            desired = 1  # unset replicas is treated as a single replica
        if (status.observed_generation or 0) < (dep.metadata.generation or 0):
            return False
        updated = status.updated_replicas or 0
        total = status.replicas or 0
        ready = status.ready_replicas or 0
        return updated >= desired and total <= updated and ready >= desired

    def wait_for_ready(
        self,
        namespace: str,
        deployment_name: str,
        timeout: int = 300,
        interval: int = 10,
    ) -> bool:
        """Poll until the deployment's rollout is complete or time runs out.

        Returns True once all desired replicas run the current revision
        and are ready; False if ``timeout`` seconds pass first.
        """
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            dep = self.apps.read_namespaced_deployment(deployment_name, namespace)
            if self._rollout_complete(dep):
                ready = dep.status.ready_replicas or 0
                desired = dep.spec.replicas
                logger.info(f"{deployment_name}: {ready}/{desired} ready")
                return True
            time.sleep(interval)
        return False

    def switch_service(
        self, namespace: str, service_name: str, target_slot: str
    ) -> dict:
        """Update the Service selector to point to blue or green."""
        body = {"spec": {"selector": {"slot": target_slot}}}
        self.core.patch_namespaced_service(service_name, namespace, body)
        return {"service": service_name, "now_pointing_to": target_slot}

    def full_deploy(
        self,
        namespace: str,
        app_name: str,
        new_image: str,
    ) -> dict:
        """Deploy ``new_image`` to the idle slot, then switch the Service.

        The live slot is read from the Service selector's ``slot`` label
        ("blue" when absent). The new image is rolled into the other
        slot; traffic is switched only after that slot is ready.

        Returns:
            On success, the ``switch_service`` result plus ``deployed:
            True`` and ``previous_slot``. Otherwise ``{"deployed": False,
            "reason": ...}`` with the Service left unchanged.
        """
        # Read current service selector
        svc = self.core.read_namespaced_service(app_name, namespace)
        selector = svc.spec.selector or {}
        current_slot = selector.get("slot", "blue")
        new_slot = "green" if current_slot == "blue" else "blue"

        # Deploy to the inactive slot (never the one serving traffic)
        self.deploy_green(namespace, app_name, new_image, slot=new_slot)
        target_name = f"{app_name}-{new_slot}"

        # Wait for readiness
        if not self.wait_for_ready(namespace, target_name):
            return {"deployed": False, "reason": f"{target_name} not ready"}

        # Switch traffic
        result = self.switch_service(namespace, app_name, new_slot)
        result["deployed"] = True
        result["previous_slot"] = current_slot
        return result
Health check orchestration
A robust health check goes beyond HTTP 200. This pattern validates multiple layers:
import httpx
import asyncio
import logging
logger = logging.getLogger(__name__)
async def deep_health_check(base_url: str, checks: list[dict]) -> dict:
    """Probe several endpoints of one deployment and summarize the outcome.

    Each entry of ``checks`` is a dict with a required ``path`` and two
    optional keys: ``expect_status`` (defaults to 200) and
    ``expect_body_contains`` (substring that must appear in the body), e.g.

        [
            {"path": "/health", "expect_status": 200},
            {"path": "/health/db", "expect_status": 200, "expect_body_contains": "connected"},
            {"path": "/api/v1/status", "expect_status": 200},
        ]

    Returns ``{"healthy": <bool>, "checks": {path: outcome}}`` where
    ``healthy`` is True only if every recorded outcome passed. Checks run
    one after another over a single client with a 10 second timeout.
    """
    outcomes = {}
    async with httpx.AsyncClient(timeout=10.0) as session:
        for spec in checks:
            endpoint = spec["path"]
            wanted_status = spec.get("expect_status", 200)
            try:
                response = await session.get(f"{base_url}{endpoint}")
                ok = response.status_code == wanted_status
                # The body is only inspected once the status already matched.
                if ok and "expect_body_contains" in spec:
                    ok = spec["expect_body_contains"] in response.text
                outcomes[endpoint] = {"passed": ok, "status": response.status_code}
            except httpx.RequestError as exc:
                # Transport-level failure: record it as a failed check.
                outcomes[endpoint] = {"passed": False, "error": str(exc)}
    return {
        "healthy": all(entry["passed"] for entry in outcomes.values()),
        "checks": outcomes,
    }
async def wait_for_healthy(
    base_url: str,
    checks: list[dict],
    timeout: int = 300,
    interval: int = 10,
) -> bool:
    """Poll ``deep_health_check`` until it reports healthy or time runs out.

    ``timeout`` is measured on the event loop's clock, so time spent
    inside the health checks themselves counts toward it (previously only
    the sleep intervals were counted, so slow checks could stretch the
    wait far beyond ``timeout``).

    Returns True as soon as a check round is healthy, False once
    ``timeout`` seconds have passed without a healthy round.
    """
    loop = asyncio.get_running_loop()
    started = loop.time()
    deadline = started + timeout
    while loop.time() < deadline:
        result = await deep_health_check(base_url, checks)
        if result["healthy"]:
            logger.info(f"Environment healthy after {loop.time() - started:.0f}s")
            return True
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        await asyncio.sleep(min(interval, remaining))
    logger.error(f"Environment not healthy after {timeout}s")
    return False
Database migration safety
The critical challenge in blue-green deployments is that both environments share the same database, so every migration must work with the old code and the new code at the same time:
"""
Backward-compatible migration strategy for blue-green deployments.
Phase 1 (deploy with old code still running):
- ADD new columns (nullable or with defaults)
- CREATE new tables
- ADD new indexes
Phase 2 (after switch, old code is idle):
- DROP old columns
- RENAME if needed
- ADD NOT NULL constraints
Never in a single deployment:
- DROP columns that old code reads
- RENAME columns that old code references
- Change column types in breaking ways
"""
def validate_migration_safety(migration_sql: str) -> list[str]:
    """Check a migration for blue-green safety violations.

    The SQL is upper-cased and all whitespace runs (including newlines)
    are collapsed to single spaces before matching, so keywords split
    across lines or padded with extra spaces are still detected.

    Returns one warning string per violated rule, in rule order; an empty
    list means no known-dangerous statement was found. This is a textual
    scan, not a SQL parser: comments and string literals are not excluded.
    """
    import re

    # (label shown in the warning, regex run on the normalized SQL, reason)
    dangerous_patterns = [
        (
            "DROP COLUMN",
            r"\bDROP COLUMN\b",
            "Dropping columns breaks the inactive environment",
        ),
        (
            "RENAME COLUMN",
            r"\bRENAME COLUMN\b",
            "Renaming columns breaks the inactive environment",
        ),
        (
            "ALTER COLUMN.*NOT NULL",
            # Stay inside one statement ([^;]) and skip "DROP NOT NULL",
            # which relaxes the constraint and is safe for old code.
            r"\bALTER COLUMN\b[^;]*?(?<!DROP )NOT NULL\b",
            "Adding NOT NULL may break old code writing NULLs",
        ),
        (
            "DROP TABLE",
            r"\bDROP TABLE\b",
            "Dropping tables breaks the inactive environment",
        ),
    ]

    normalized_sql = " ".join(migration_sql.upper().split())
    return [
        f"⚠️ {message}: found '{label}' in migration"
        for label, regex, message in dangerous_patterns
        if re.search(regex, normalized_sql)
    ]
Tradeoffs compared to other strategies
| Strategy | Zero downtime | Instant rollback | Infrastructure cost | Database complexity |
|---|---|---|---|---|
| Blue-green | ✅ | ✅ Seconds | Higher (2x environments) | High (shared, backward-compatible) |
| Rolling update | ✅ | ❌ Minutes | Same | Medium |
| Canary | ✅ | ✅ Seconds | Slightly higher | High |
| Recreate | ❌ | ❌ Full redeploy | Same | Low |
Blue-green is the safest strategy when you need instant rollback capability and can afford the infrastructure overhead. For Python web services (Django, FastAPI, Flask), where stateless application servers are cheap to duplicate, it’s often the default choice.
The one thing to remember: The real complexity in blue-green isn’t the traffic switch — it’s ensuring database migrations are backward-compatible so both environments can run against the same database simultaneously.
See Also
- Python Canary Releases — Why teams send new code to just a few users first, and how Python manages the gradual rollout
- Python Chaos Engineering — Why engineers deliberately break their own systems using Python, and how it prevents real disasters
- Python Compliance as Code — How Python turns security rules and regulations into automated checks that run every time code changes
- Python Feature Branch Deployments — How teams give every code branch its own live preview website using Python automation
- Python GitOps Patterns — How Git becomes the single source of truth for everything running in production, and how Python makes it work