Canary Releases with Python — Deep Dive
Canary analysis with Prometheus
The heart of automated canary releases is statistical comparison between the canary and baseline. This Python implementation queries Prometheus and makes pass/fail decisions:
import math
import statistics
from dataclasses import dataclass, field
from enum import Enum

import httpx
class CanaryVerdict(Enum):
    """Overall outcome of comparing a canary against its baseline."""

    # Every check stayed within its allowed deviation.
    PASS = "pass"
    # At least one check breached its threshold.
    FAIL = "fail"
    # No check failed, but at least one could not be evaluated.
    INCONCLUSIVE = "inconclusive"
@dataclass
class MetricCheck:
    """One metric to compare between the canary and the baseline.

    Attributes:
        name: Key under which the check's result is reported.
        query_canary: PromQL expression selecting the canary's metric.
        query_baseline: PromQL expression selecting the baseline's metric.
        max_deviation_percent: Allowed relative difference, in percent,
            between the canary and the baseline.
        direction: ``"lower_is_better"`` (the canary may not exceed the
            baseline by more than the threshold) or ``"higher_is_better"``
            (the canary may not fall below the baseline by more than the
            threshold).

    Raises:
        ValueError: If ``direction`` is not one of the two supported values.
    """

    name: str
    query_canary: str
    query_baseline: str
    max_deviation_percent: float = 10.0
    direction: str = "lower_is_better"  # or "higher_is_better"

    def __post_init__(self) -> None:
        # A misspelled direction would otherwise be evaluated as
        # "higher_is_better" and silently invert the check.
        if self.direction not in ("lower_is_better", "higher_is_better"):
            raise ValueError(
                f"direction must be 'lower_is_better' or 'higher_is_better', "
                f"got {self.direction!r}"
            )
@dataclass
class CanaryAnalysis:
    """Result of one canary analysis run."""

    # Aggregate verdict across all checks.
    verdict: CanaryVerdict
    # Per-check details, keyed by check name.
    checks: dict = field(default_factory=dict)
    # One-line, human-readable explanation of the verdict.
    summary: str = ""
class CanaryAnalyzer:
    """Compare canary and baseline metrics read from Prometheus.

    The analyzer owns an ``httpx.Client``; call :meth:`close` (or use the
    instance as a context manager) to release it.
    """

    _DIRECTIONS = ("lower_is_better", "higher_is_better")

    def __init__(self, prometheus_url: str):
        self.prom_url = prometheus_url.rstrip("/")
        self.client = httpx.Client(timeout=30.0)

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self.client.close()

    def __enter__(self) -> "CanaryAnalyzer":
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()

    def _query_range(self, query: str, duration: str = "15m") -> list[float]:
        """Return the windowed average of ``query`` for each result series.

        Issues an instant query that wraps ``query`` in
        ``avg_over_time((query)[duration:1m])`` and returns one float per
        series in the response. NaN samples are dropped so they are
        reported as missing data instead of poisoning the mean.

        Raises:
            httpx.HTTPStatusError: If Prometheus answers with an error status.
        """
        resp = self.client.get(
            f"{self.prom_url}/api/v1/query",
            params={"query": f"avg_over_time(({query})[{duration}:1m])"},
        )
        resp.raise_for_status()
        series = resp.json().get("data", {}).get("result", [])
        samples = (float(item["value"][1]) for item in series)
        return [sample for sample in samples if not math.isnan(sample)]

    @staticmethod
    def _deviation_percent(canary_avg: float, baseline_avg: float) -> float:
        """Return the canary's signed relative difference from the baseline."""
        if baseline_avg == 0:
            if canary_avg == 0:
                return 0.0
            # No finite ratio exists; keep the sign of the canary value.
            return math.copysign(math.inf, canary_avg)
        # Divide by the magnitude so a negative baseline does not flip the sign.
        return (canary_avg - baseline_avg) / abs(baseline_avg) * 100

    def analyze(
        self, checks: list[MetricCheck], analysis_duration: str = "15m"
    ) -> CanaryAnalysis:
        """Compare canary and baseline metrics.

        Args:
            checks: Metric comparisons to evaluate.
            analysis_duration: PromQL duration of the look-back window.

        Returns:
            FAIL if any check breached its threshold, INCONCLUSIVE if none
            failed but at least one could not be evaluated, PASS otherwise.

        Raises:
            ValueError: If a check has an unsupported ``direction``.
        """
        # Reject bad configuration before any query is sent.
        for check in checks:
            if check.direction not in self._DIRECTIONS:
                raise ValueError(
                    f"check {check.name!r}: unknown direction {check.direction!r}"
                )

        results = {}
        failures = 0
        for check in checks:
            canary_vals = self._query_range(check.query_canary, analysis_duration)
            baseline_vals = self._query_range(check.query_baseline, analysis_duration)
            if not canary_vals or not baseline_vals:
                results[check.name] = {
                    "verdict": "inconclusive",
                    "reason": "insufficient data",
                }
                continue

            canary_avg = statistics.mean(canary_vals)
            baseline_avg = statistics.mean(baseline_vals)
            deviation = self._deviation_percent(canary_avg, baseline_avg)
            if math.isnan(deviation):
                # e.g. an infinite baseline: every comparison against NaN is
                # false, which would otherwise be reported as a failure.
                results[check.name] = {
                    "verdict": "inconclusive",
                    "reason": "non-finite metric values",
                }
                continue

            if check.direction == "lower_is_better":
                passed = deviation <= check.max_deviation_percent
            else:
                passed = deviation >= -check.max_deviation_percent
            results[check.name] = {
                "canary": round(canary_avg, 4),
                "baseline": round(baseline_avg, 4),
                "deviation_percent": round(deviation, 2),
                "threshold": check.max_deviation_percent,
                "verdict": "pass" if passed else "fail",
            }
            if not passed:
                failures += 1

        if failures > 0:
            verdict = CanaryVerdict.FAIL
            summary = f"{failures} metric(s) exceeded thresholds"
        elif any(r.get("verdict") == "inconclusive" for r in results.values()):
            verdict = CanaryVerdict.INCONCLUSIVE
            summary = "Some metrics had insufficient data"
        else:
            verdict = CanaryVerdict.PASS
            summary = "All metrics within thresholds"
        return CanaryAnalysis(verdict=verdict, checks=results, summary=summary)
Usage with typical SRE metrics:
analyzer = CanaryAnalyzer("http://prometheus:9090")
checks = [
    MetricCheck(
        name="error_rate",
        # Compare the *ratio* of 5xx responses to all responses: absolute
        # rates scale with each version's share of traffic and are not
        # comparable. `or vector(0)` makes "no 5xx series yet" evaluate to
        # zero errors instead of an empty result.
        query_canary=(
            '(sum(rate(http_requests_total{status=~"5..", version="canary"}[1m]))'
            " or vector(0))"
            ' / sum(rate(http_requests_total{version="canary"}[1m]))'
        ),
        query_baseline=(
            '(sum(rate(http_requests_total{status=~"5..", version="stable"}[1m]))'
            " or vector(0))"
            ' / sum(rate(http_requests_total{version="stable"}[1m]))'
        ),
        max_deviation_percent=50.0,  # allow up to 50% higher error rate (on small numbers)
        direction="lower_is_better",
    ),
    MetricCheck(
        name="p99_latency",
        # Aggregate the buckets by `le` first so one quantile is computed per
        # version rather than one per underlying series.
        query_canary=(
            "histogram_quantile(0.99, sum by (le) "
            '(rate(http_duration_seconds_bucket{version="canary"}[1m])))'
        ),
        query_baseline=(
            "histogram_quantile(0.99, sum by (le) "
            '(rate(http_duration_seconds_bucket{version="stable"}[1m])))'
        ),
        max_deviation_percent=15.0,
        direction="lower_is_better",
    ),
    MetricCheck(
        name="throughput",
        # NOTE(review): absolute request rates follow the traffic split, so a
        # canary at a small weight will sit far below the baseline and fail
        # this check. Normalize (e.g. per replica or by weight) before
        # relying on it in a weighted rollout.
        query_canary='sum(rate(http_requests_total{version="canary"}[1m]))',
        query_baseline='sum(rate(http_requests_total{version="stable"}[1m]))',
        max_deviation_percent=20.0,
        direction="higher_is_better",
    ),
]
result = analyzer.analyze(checks, analysis_duration="10m")
print(f"Verdict: {result.verdict.value} — {result.summary}")
Progressive traffic shifting
This orchestrator manages the full canary lifecycle with configurable steps:
import time
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class CanaryStep:
    """One stage of a progressive rollout.

    Attributes:
        weight_percent: Share of traffic sent to the canary, 0-100.
        analysis_duration_minutes: Length of the metric look-back window,
            in whole minutes; must be positive.

    Raises:
        ValueError: If either value is out of range.
    """

    weight_percent: int
    analysis_duration_minutes: int

    def __post_init__(self) -> None:
        # The stable weight is derived as 100 - weight_percent, so anything
        # outside 0-100 would produce a negative weight downstream.
        if not 0 <= self.weight_percent <= 100:
            raise ValueError(
                f"weight_percent must be between 0 and 100, got {self.weight_percent}"
            )
        # The duration is formatted into a PromQL window such as "5m".
        if self.analysis_duration_minutes <= 0:
            raise ValueError(
                "analysis_duration_minutes must be positive, "
                f"got {self.analysis_duration_minutes}"
            )
class CanaryOrchestrator:
    """Drive a canary rollout through a sequence of traffic-weight steps."""

    def __init__(
        self,
        traffic_manager,  # implements set_canary_weight(int)
        analyzer: CanaryAnalyzer,
        checks: list[MetricCheck],
    ):
        self.traffic = traffic_manager
        self.analyzer = analyzer
        self.checks = checks

    def execute(self, steps: list[CanaryStep]) -> dict:
        """Run a full canary rollout with progressive traffic shifting.

        For each step the canary weight is applied, the orchestrator waits
        for stabilization plus one full analysis window, and the metrics are
        analyzed.

        Returns:
            A dict whose ``"outcome"`` is one of:

            * ``"promoted"`` - every step passed; canary weight set to 100.
            * ``"rollback"`` - a step failed; canary weight set to 0.
            * ``"held"`` - a step was inconclusive; the rollout stops and
              the canary stays at that step's weight.

        Raises:
            ValueError: If ``steps`` is empty.
            Exception: Whatever the analyzer raises; the canary weight is
                reset to 0 before the exception propagates.
        """
        if not steps:
            # Without this guard an empty plan would promote immediately.
            raise ValueError("at least one canary step is required")

        for i, step in enumerate(steps):
            logger.info(
                f"Step {i+1}/{len(steps)}: "
                f"setting canary weight to {step.weight_percent}%"
            )
            self.traffic.set_canary_weight(step.weight_percent)

            # Wait for traffic to stabilize
            stabilization = max(60, step.analysis_duration_minutes * 10)
            logger.info(f"Waiting {stabilization}s for stabilization...")
            time.sleep(stabilization)

            # Let a full analysis window elapse at this weight; otherwise the
            # look-back would mostly cover traffic from before the shift.
            window = step.analysis_duration_minutes * 60
            logger.info(f"Collecting metrics for {window}s...")
            time.sleep(window)

            # Analyze metrics
            try:
                analysis = self.analyzer.analyze(
                    self.checks,
                    analysis_duration=f"{step.analysis_duration_minutes}m",
                )
            except Exception:
                # Do not leave an unmonitored canary serving traffic.
                logger.exception("Analysis raised — rolling back")
                self.traffic.set_canary_weight(0)
                raise
            logger.info(
                f"Analysis: {analysis.verdict.value} — {analysis.summary}"
            )

            if analysis.verdict == CanaryVerdict.FAIL:
                logger.error("Canary failed — rolling back")
                self.traffic.set_canary_weight(0)
                return {
                    "outcome": "rollback",
                    "failed_at_step": i + 1,
                    "weight_percent": step.weight_percent,
                    "analysis": analysis.checks,
                }

            if analysis.verdict == CanaryVerdict.INCONCLUSIVE:
                # Stop here instead of advancing: continuing would end in a
                # promotion that no analysis ever backed.
                logger.warning("Inconclusive — holding at current weight")
                # Could retry or alert, depending on policy
                return {
                    "outcome": "held",
                    "held_at_step": i + 1,
                    "weight_percent": step.weight_percent,
                    "analysis": analysis.checks,
                }

        # All steps passed — promote canary to 100%
        logger.info("All steps passed — promoting canary to 100%")
        self.traffic.set_canary_weight(100)
        return {
            "outcome": "promoted",
            "steps_completed": len(steps),
        }
AWS traffic manager implementation
import boto3
class ALBTrafficManager:
    """Manage canary traffic weight via ALB weighted target groups."""

    def __init__(
        self,
        listener_arn: str,
        stable_tg_arn: str,
        canary_tg_arn: str,
        region: str = "us-east-1",
    ):
        self.elbv2 = boto3.client("elbv2", region_name=region)
        self.listener_arn = listener_arn
        self.stable_tg = stable_tg_arn
        self.canary_tg = canary_tg_arn

    def set_canary_weight(self, percent: int) -> None:
        """Send ``percent`` of traffic to the canary and the rest to stable.

        Replaces the listener's default action with a single weighted
        ``forward`` action across the two target groups.

        Raises:
            ValueError: If ``percent`` is outside 0-100.
        """
        if not 0 <= percent <= 100:
            raise ValueError(f"percent must be between 0 and 100, got {percent}")

        # The default rule's actions cannot be changed through modify_rule;
        # the ELBv2 API requires modify_listener for them.
        self.elbv2.modify_listener(
            ListenerArn=self.listener_arn,
            DefaultActions=[
                {
                    "Type": "forward",
                    "ForwardConfig": {
                        "TargetGroups": [
                            {
                                "TargetGroupArn": self.stable_tg,
                                "Weight": 100 - percent,
                            },
                            {
                                "TargetGroupArn": self.canary_tg,
                                "Weight": percent,
                            },
                        ]
                    },
                }
            ],
        )
Kubernetes with Istio VirtualService
from kubernetes import client, config
import json
class IstioTrafficManager:
    """Manage canary traffic via Istio VirtualService weights."""

    def __init__(self, namespace: str, virtualservice_name: str, host: str = ""):
        """Load in-cluster credentials and remember the routing target.

        Args:
            namespace: Namespace of the VirtualService.
            virtualservice_name: Name of the VirtualService to patch.
            host: Destination host used for both subsets. Defaults to the
                VirtualService name when empty.
        """
        config.load_incluster_config()
        self.custom = client.CustomObjectsApi()
        self.namespace = namespace
        self.vs_name = virtualservice_name
        self.host = host or virtualservice_name

    def _route(self, subset: str, weight: int) -> dict:
        """Build one weighted route entry for ``subset``."""
        return {
            "destination": {"host": self.host, "subset": subset},
            "weight": weight,
        }

    def set_canary_weight(self, percent: int) -> None:
        """Send ``percent`` of traffic to the ``canary`` subset.

        The remaining ``100 - percent`` goes to the ``stable`` subset.

        Raises:
            ValueError: If ``percent`` is outside 0-100.
        """
        if not 0 <= percent <= 100:
            raise ValueError(f"percent must be between 0 and 100, got {percent}")

        # NOTE(review): the patch carries a single-entry `http` list; any
        # other http routes on the VirtualService are presumably replaced —
        # confirm against the patch strategy in use.
        patch = {
            "spec": {
                "http": [
                    {
                        "route": [
                            self._route("stable", 100 - percent),
                            self._route("canary", percent),
                        ]
                    }
                ]
            }
        }
        self.custom.patch_namespaced_custom_object(
            group="networking.istio.io",
            version="v1beta1",
            namespace=self.namespace,
            plural="virtualservices",
            name=self.vs_name,
            body=patch,
        )
Putting it all together
# Full canary release pipeline
traffic = ALBTrafficManager(
    listener_arn="arn:aws:elasticloadbalancing:...",
    stable_tg_arn="arn:aws:elasticloadbalancing:.../stable-tg/...",
    canary_tg_arn="arn:aws:elasticloadbalancing:.../canary-tg/...",
)
analyzer = CanaryAnalyzer("http://prometheus:9090")
orchestrator = CanaryOrchestrator(traffic, analyzer, checks)
result = orchestrator.execute([
    CanaryStep(weight_percent=1, analysis_duration_minutes=5),
    CanaryStep(weight_percent=5, analysis_duration_minutes=10),
    CanaryStep(weight_percent=25, analysis_duration_minutes=15),
    CanaryStep(weight_percent=50, analysis_duration_minutes=15),
])
outcome = result["outcome"]
if outcome == "rollback":
    # Alert on-call, create incident ticket
    print(f"Canary failed at {result['weight_percent']}%")
elif outcome == "promoted":
    print("Canary promoted successfully")
else:
    # Report success only for an explicit promotion; anything else needs
    # a human to look at it.
    print(f"Canary not promoted (outcome: {outcome})")
Tradeoffs
| Aspect | Simple (ALB weights) | Istio/Argo Rollouts | Custom Python |
|---|---|---|---|
| Setup complexity | Low | High | Medium |
| Traffic granularity | Per-request | Per-request, header-based | Per-request |
| Metric analysis | Manual/scripted | Built-in (Kayenta) | Full control |
| Rollback speed | Seconds | Seconds | Seconds |
| Learning curve | Low | Steep | Medium |
For most Python teams, starting with ALB weighted routing and a custom analysis script provides 80% of the value with 20% of the complexity. Graduate to Istio or Argo Rollouts when you need header-based routing, automatic analysis, or multi-cluster canaries.
The one thing to remember: Automated canary analysis — comparing canary metrics against baseline using statistical thresholds — is what separates real canary releases from “deploy and hope.” Python’s data analysis capabilities make it natural for building these comparison pipelines.
See Also
- Python Blue Green Deployments — How Python helps teams switch between two identical server environments so updates never cause downtime
- Python Chaos Engineering — Why engineers deliberately break their own systems using Python — and how it prevents real disasters
- Python Compliance As Code — How Python turns security rules and regulations into automated checks that run every time code changes
- Python Feature Branch Deployments — How teams give every code branch its own live preview website using Python automation
- Python Gitops Patterns — How Git becomes the single source of truth for everything running in production — and Python makes it work