Canary Releases with Python — Deep Dive

Canary analysis with Prometheus

The heart of automated canary releases is statistical comparison between the canary and baseline. This Python implementation queries Prometheus and makes pass/fail decisions:

import math
import statistics
from dataclasses import dataclass, field
from enum import Enum

import httpx


class CanaryVerdict(Enum):
    """Overall outcome of a canary analysis run."""

    # Values are the lowercase strings that appear in logs and result dicts.
    PASS = "pass"
    FAIL = "fail"
    INCONCLUSIVE = "inconclusive"


@dataclass
class MetricCheck:
    """One canary-versus-baseline metric comparison.

    Attributes:
        name: Key under which this check's result is reported.
        query_canary: PromQL expression evaluated for the canary.
        query_baseline: PromQL expression evaluated for the baseline.
        max_deviation_percent: Allowed relative deviation of the canary from
            the baseline, in percent.
        direction: ``"lower_is_better"`` or ``"higher_is_better"``.

    Raises:
        ValueError: If ``direction`` is not one of the two supported values.
    """

    name: str
    query_canary: str
    query_baseline: str
    max_deviation_percent: float = 10.0
    direction: str = "lower_is_better"  # or "higher_is_better"

    def __post_init__(self) -> None:
        # Reject typos at configuration time. A consumer that only tests for
        # "lower_is_better" would otherwise treat any other string as
        # "higher_is_better" without warning.
        if self.direction not in ("lower_is_better", "higher_is_better"):
            raise ValueError(
                f"direction must be 'lower_is_better' or 'higher_is_better', "
                f"got {self.direction!r}"
            )


@dataclass
class CanaryAnalysis:
    """Aggregate result of one analysis run."""

    # Overall outcome across every check.
    verdict: CanaryVerdict
    # Per-check result dicts; a fresh dict is created for each instance.
    checks: dict = field(default_factory=dict)
    # One-line human-readable explanation of the verdict.
    summary: str = ""


class CanaryAnalyzer:
    """Compare canary and baseline metrics read from Prometheus.

    The analyzer owns an ``httpx.Client``. Call :meth:`close`, or use the
    instance as a context manager, to release its connections.
    """

    _DIRECTIONS = ("lower_is_better", "higher_is_better")

    def __init__(self, prometheus_url: str):
        self.prom_url = prometheus_url.rstrip("/")
        self.client = httpx.Client(timeout=30.0)

    def close(self) -> None:
        """Close the underlying HTTP client."""
        self.client.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info) -> None:
        self.close()

    def _query_range(self, query: str, duration: str = "15m") -> list[float]:
        """Return the average of ``query`` over ``duration``, one value per series.

        Issues a single instant query against ``/api/v1/query`` that wraps
        ``query`` in an ``avg_over_time`` subquery with 1-minute resolution.
        Samples that are NaN or infinite are dropped so they cannot poison the
        mean computed by the caller.

        Raises:
            httpx.HTTPStatusError: If Prometheus answers with an error status.
        """
        resp = self.client.get(
            f"{self.prom_url}/api/v1/query",
            params={"query": f"avg_over_time(({query})[{duration}:1m])"},
        )
        resp.raise_for_status()
        data = resp.json()

        samples = (
            float(series["value"][1])
            for series in data.get("data", {}).get("result", [])
        )
        return [sample for sample in samples if math.isfinite(sample)]

    def analyze(
        self, checks: list[MetricCheck], analysis_duration: str = "15m"
    ) -> CanaryAnalysis:
        """Compare canary and baseline metrics.

        Args:
            checks: Metric comparisons to evaluate.
            analysis_duration: Prometheus duration string for the look-back
                window, e.g. ``"15m"``.

        Returns:
            A ``CanaryAnalysis`` whose verdict is FAIL if any check failed,
            INCONCLUSIVE if none failed but at least one had no usable data,
            and PASS otherwise.

        Raises:
            ValueError: If a check has an unsupported ``direction``.
            httpx.HTTPStatusError: If a Prometheus query fails.
        """
        # Validate before querying so a misconfigured check is reported
        # immediately instead of being scored with the wrong comparison.
        for check in checks:
            if check.direction not in self._DIRECTIONS:
                raise ValueError(
                    f"check {check.name!r}: unsupported direction "
                    f"{check.direction!r}"
                )

        results = {}
        failures = 0

        for check in checks:
            canary_vals = self._query_range(check.query_canary, analysis_duration)
            baseline_vals = self._query_range(check.query_baseline, analysis_duration)

            if not canary_vals or not baseline_vals:
                results[check.name] = {
                    "verdict": "inconclusive",
                    "reason": "insufficient data",
                }
                continue

            canary_avg = statistics.mean(canary_vals)
            baseline_avg = statistics.mean(baseline_vals)

            if baseline_avg == 0:
                # A relative deviation is undefined against a zero baseline:
                # report 0 when both are zero, otherwise an infinite deviation
                # carrying the sign of the canary value.
                if canary_avg == 0:
                    deviation = 0.0
                else:
                    deviation = math.copysign(float("inf"), canary_avg)
            else:
                # abs() keeps the sign meaning "canary is higher (+) or
                # lower (-)" even when the baseline itself is negative.
                deviation = ((canary_avg - baseline_avg) / abs(baseline_avg)) * 100

            if check.direction == "lower_is_better":
                passed = deviation <= check.max_deviation_percent
            else:
                passed = deviation >= -check.max_deviation_percent

            results[check.name] = {
                "canary": round(canary_avg, 4),
                "baseline": round(baseline_avg, 4),
                "deviation_percent": round(deviation, 2),
                "threshold": check.max_deviation_percent,
                "verdict": "pass" if passed else "fail",
            }

            if not passed:
                failures += 1

        if failures > 0:
            verdict = CanaryVerdict.FAIL
            summary = f"{failures} metric(s) exceeded thresholds"
        elif any(r.get("verdict") == "inconclusive" for r in results.values()):
            verdict = CanaryVerdict.INCONCLUSIVE
            summary = "Some metrics had insufficient data"
        else:
            verdict = CanaryVerdict.PASS
            summary = "All metrics within thresholds"

        return CanaryAnalysis(verdict=verdict, checks=results, summary=summary)

Usage with typical SRE metrics:

analyzer = CanaryAnalyzer("http://prometheus:9090")

# During a rollout the canary and stable versions receive very different
# shares of traffic, so raw request counts are not comparable between them.
# Every check therefore compares a traffic-independent quantity: a ratio of
# rates or a latency quantile.
checks = [
    MetricCheck(
        name="error_rate",
        # Fraction of requests answered with a 5xx status.
        query_canary=(
            'sum(rate(http_requests_total{status=~"5..", version="canary"}[1m]))'
            ' / sum(rate(http_requests_total{version="canary"}[1m]))'
        ),
        query_baseline=(
            'sum(rate(http_requests_total{status=~"5..", version="stable"}[1m]))'
            ' / sum(rate(http_requests_total{version="stable"}[1m]))'
        ),
        max_deviation_percent=50.0,  # allow up to 50% higher error rate (on small numbers)
        direction="lower_is_better",
    ),
    MetricCheck(
        name="p99_latency",
        # Aggregate the buckets by `le` so one quantile is computed per
        # version instead of one per scraped series.
        query_canary=(
            "histogram_quantile(0.99, sum by (le) "
            '(rate(http_duration_seconds_bucket{version="canary"}[1m])))'
        ),
        query_baseline=(
            "histogram_quantile(0.99, sum by (le) "
            '(rate(http_duration_seconds_bucket{version="stable"}[1m])))'
        ),
        max_deviation_percent=15.0,
        direction="lower_is_better",
    ),
    MetricCheck(
        name="success_rate",
        # Fraction of requests NOT answered with a 5xx status. Raw throughput
        # is deliberately not compared: it scales with the canary weight and
        # would fail at every low-traffic step.
        query_canary=(
            'sum(rate(http_requests_total{status!~"5..", version="canary"}[1m]))'
            ' / sum(rate(http_requests_total{version="canary"}[1m]))'
        ),
        query_baseline=(
            'sum(rate(http_requests_total{status!~"5..", version="stable"}[1m]))'
            ' / sum(rate(http_requests_total{version="stable"}[1m]))'
        ),
        max_deviation_percent=1.0,  # success rate may drop by at most 1% relative
        direction="higher_is_better",
    ),
]

result = analyzer.analyze(checks, analysis_duration="10m")
print(f"Verdict: {result.verdict.value} — {result.summary}")

Progressive traffic shifting

This orchestrator manages the full canary lifecycle with configurable steps:

import time
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class CanaryStep:
    """A single stage of a progressive rollout."""

    # Traffic weight handed to the traffic manager for this stage.
    weight_percent: int
    # Length of the metric window analyzed at this stage, in minutes.
    analysis_duration_minutes: int
    
    
class CanaryOrchestrator:
    """Drive a canary rollout through a sequence of traffic-weight steps.

    After each weight change the orchestrator waits, analyzes the configured
    metric checks, and either advances, or rolls the canary back to 0%.
    """

    def __init__(
        self,
        traffic_manager,  # implements set_canary_weight(int)
        analyzer: CanaryAnalyzer,
        checks: list[MetricCheck],
    ):
        self.traffic = traffic_manager
        self.analyzer = analyzer
        self.checks = checks

    def execute(
        self, steps: list[CanaryStep], max_inconclusive_retries: int = 2
    ) -> dict:
        """Run a full canary rollout with progressive traffic shifting.

        Args:
            steps: Rollout stages, applied in order. Must not be empty.
            max_inconclusive_retries: How many times an INCONCLUSIVE analysis
                is repeated at the same weight before the rollout is aborted.

        Returns:
            ``{"outcome": "promoted", "steps_completed": n}`` when every step
            passed, otherwise a dict with ``"outcome": "rollback"`` plus
            ``failed_at_step``, ``weight_percent``, ``analysis`` and
            ``reason`` (``"failed"`` or ``"inconclusive"``).

        Raises:
            ValueError: If ``steps`` is empty (promoting with zero analysis
                would defeat the purpose of a canary).
            Exception: Any error raised by the analyzer is re-raised after
                the canary weight has been reset to 0.
        """
        if not steps:
            raise ValueError("steps must contain at least one CanaryStep")

        for i, step in enumerate(steps):
            logger.info(
                "Step %s/%s: setting canary weight to %s%%",
                i + 1,
                len(steps),
                step.weight_percent,
            )
            self.traffic.set_canary_weight(step.weight_percent)

            # Wait for traffic to stabilize
            stabilization = max(60, step.analysis_duration_minutes * 10)
            logger.info("Waiting %ss for stabilization...", stabilization)
            time.sleep(stabilization)

            # The analysis looks back over the whole window, so let a full
            # window elapse at this weight; otherwise most of the analyzed
            # data would predate the weight change.
            window_seconds = step.analysis_duration_minutes * 60
            logger.info(
                "Collecting %ss of metrics at %s%% canary weight...",
                window_seconds,
                step.weight_percent,
            )
            time.sleep(window_seconds)

            analysis = self._analyze_with_retries(
                step, stabilization, max_inconclusive_retries
            )

            if analysis.verdict != CanaryVerdict.PASS:
                if analysis.verdict == CanaryVerdict.FAIL:
                    reason = "failed"
                    logger.error("Canary failed — rolling back")
                else:
                    # Never advance or promote without evidence that the
                    # canary is healthy.
                    reason = "inconclusive"
                    logger.error("Canary still inconclusive — rolling back")
                self.traffic.set_canary_weight(0)
                return {
                    "outcome": "rollback",
                    "failed_at_step": i + 1,
                    "weight_percent": step.weight_percent,
                    "analysis": analysis.checks,
                    "reason": reason,
                }

        # All steps passed — promote canary to 100%
        logger.info("All steps passed — promoting canary to 100%")
        self.traffic.set_canary_weight(100)

        return {
            "outcome": "promoted",
            "steps_completed": len(steps),
        }

    def _analyze_with_retries(self, step, retry_delay, max_retries):
        """Analyze one step, repeating while the verdict is INCONCLUSIVE."""
        retries_left = max(0, max_retries)
        while True:
            try:
                analysis = self.analyzer.analyze(
                    self.checks,
                    analysis_duration=f"{step.analysis_duration_minutes}m",
                )
            except Exception:
                # Do not leave live traffic on an unverified canary when the
                # analysis itself cannot run.
                logger.exception("Analysis raised — rolling back")
                self.traffic.set_canary_weight(0)
                raise

            logger.info("Analysis: %s — %s", analysis.verdict.value, analysis.summary)

            if analysis.verdict != CanaryVerdict.INCONCLUSIVE or retries_left == 0:
                return analysis

            retries_left -= 1
            logger.warning("Inconclusive — holding at current weight")
            time.sleep(retry_delay)

AWS traffic manager implementation

import boto3


class ALBTrafficManager:
    """Manage canary traffic weight via ALB weighted target groups."""

    def __init__(
        self,
        listener_arn: str,
        stable_tg_arn: str,
        canary_tg_arn: str,
        region: str = "us-east-1",
    ):
        self.elbv2 = boto3.client("elbv2", region_name=region)
        self.listener_arn = listener_arn
        self.stable_tg = stable_tg_arn
        self.canary_tg = canary_tg_arn

    def set_canary_weight(self, percent: int) -> None:
        """Route ``percent`` of the listener's default traffic to the canary.

        The remaining ``100 - percent`` goes to the stable target group.

        Raises:
            ValueError: If ``percent`` is outside 0..100, which would yield a
                negative target-group weight.
        """
        if not 0 <= percent <= 100:
            raise ValueError(f"percent must be between 0 and 100, got {percent!r}")

        # A listener's default rule cannot be changed through ModifyRule; its
        # actions are replaced through ModifyListener instead.
        self.elbv2.modify_listener(
            ListenerArn=self.listener_arn,
            DefaultActions=[
                {
                    "Type": "forward",
                    "ForwardConfig": {
                        "TargetGroups": [
                            {
                                "TargetGroupArn": self.stable_tg,
                                "Weight": 100 - percent,
                            },
                            {
                                "TargetGroupArn": self.canary_tg,
                                "Weight": percent,
                            },
                        ]
                    },
                }
            ],
        )

Kubernetes with Istio VirtualService

from kubernetes import client, config
import json


class IstioTrafficManager:
    """Manage canary traffic via Istio VirtualService weights."""

    def __init__(
        self,
        namespace: str,
        virtualservice_name: str,
        host: "str | None" = None,
    ):
        """Load in-cluster credentials and remember which VirtualService to patch.

        Args:
            namespace: Namespace of the VirtualService.
            virtualservice_name: Name of the VirtualService to patch.
            host: Destination host used in both routes. Defaults to
                ``virtualservice_name``, so existing callers are unaffected;
                pass it explicitly when the service host differs from the
                VirtualService name.
        """
        config.load_incluster_config()
        self.custom = client.CustomObjectsApi()
        self.namespace = namespace
        self.vs_name = virtualservice_name
        self.host = host or virtualservice_name

    def set_canary_weight(self, percent: int) -> None:
        """Send ``percent`` of traffic to the ``canary`` subset, the rest to ``stable``.

        Raises:
            ValueError: If ``percent`` is outside 0..100, which would yield a
                negative route weight.
        """
        if not 0 <= percent <= 100:
            raise ValueError(f"percent must be between 0 and 100, got {percent!r}")

        def route(subset: str, weight: int) -> dict:
            return {
                "destination": {"host": self.host, "subset": subset},
                "weight": weight,
            }

        # NOTE(review): the patch supplies the complete `http` list, so any
        # other fields on the existing http routes are presumably replaced —
        # confirm against the patch type used by the client.
        patch = {
            "spec": {
                "http": [
                    {
                        "route": [
                            route("stable", 100 - percent),
                            route("canary", percent),
                        ]
                    }
                ]
            }
        }

        self.custom.patch_namespaced_custom_object(
            group="networking.istio.io",
            version="v1beta1",
            namespace=self.namespace,
            plural="virtualservices",
            name=self.vs_name,
            body=patch,
        )

Putting it all together

# End-to-end canary release: wire the traffic manager, analyzer and
# orchestrator together, then walk the rollout plan.
traffic = ALBTrafficManager(
    listener_arn="arn:aws:elasticloadbalancing:...",
    stable_tg_arn="arn:aws:elasticloadbalancing:.../stable-tg/...",
    canary_tg_arn="arn:aws:elasticloadbalancing:.../canary-tg/...",
)

analyzer = CanaryAnalyzer("http://prometheus:9090")

orchestrator = CanaryOrchestrator(traffic, analyzer, checks)

# (canary weight in percent, analysis window in minutes) for each stage.
_rollout_plan = [(1, 5), (5, 10), (25, 15), (50, 15)]

result = orchestrator.execute(
    [
        CanaryStep(weight_percent=weight, analysis_duration_minutes=minutes)
        for weight, minutes in _rollout_plan
    ]
)

if result["outcome"] != "rollback":
    print("Canary promoted successfully")
else:
    # Alert on-call, create incident ticket
    print(f"Canary failed at {result['weight_percent']}%")

Tradeoffs

| Aspect | Simple (ALB weights) | Istio/Argo Rollouts | Custom Python |
| --- | --- | --- | --- |
| Setup complexity | Low | High | Medium |
| Traffic granularity | Per-request | Per-request, header-based | Per-request |
| Metric analysis | Manual/scripted | Built-in (Kayenta) | Full control |
| Rollback speed | Seconds | Seconds | Seconds |
| Learning curve | Low | Steep | Medium |

For most Python teams, starting with ALB weighted routing and a custom analysis script provides 80% of the value with 20% of the complexity. Graduate to Istio or Argo Rollouts when you need header-based routing, automatic analysis, or multi-cluster canaries.

The one thing to remember: Automated canary analysis — comparing canary metrics against baseline using statistical thresholds — is what separates real canary releases from “deploy and hope.” Python’s data analysis capabilities make it natural for building these comparison pipelines.

Tags: python, canary-release, deployment, devops

See Also