Feature Branch Deployments with Python — Deep Dive

Webhook handler with FastAPI

The entry point for feature branch deployments is a webhook that receives events from your Git provider. This FastAPI handler processes GitHub pull request events:

import hashlib
import hmac
import logging
from fastapi import FastAPI, Request, HTTPException

logger = logging.getLogger(__name__)
app = FastAPI()

WEBHOOK_SECRET = "your-webhook-secret"


def verify_github_signature(payload: bytes, signature: str) -> bool:
    expected = "sha256=" + hmac.new(
        WEBHOOK_SECRET.encode(), payload, hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signature)


@app.post("/webhooks/github")
async def handle_github_webhook(request: Request):
    body = await request.body()
    signature = request.headers.get("X-Hub-Signature-256", "")
    
    if not verify_github_signature(body, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    
    event = request.headers.get("X-GitHub-Event")
    payload = await request.json()
    
    if event == "pull_request":
        action = payload["action"]
        pr_number = payload["pull_request"]["number"]
        branch = payload["pull_request"]["head"]["ref"]
        repo = payload["repository"]["full_name"]
        
        if action in ("opened", "synchronize", "reopened"):
            logger.info(f"Deploying preview for PR #{pr_number} ({branch})")
            env = await deploy_preview(repo, pr_number, branch)
            await post_preview_url(repo, pr_number, env["url"])
            return {"status": "deployed", "url": env["url"]}
        
        elif action == "closed":
            logger.info(f"Destroying preview for PR #{pr_number}")
            await destroy_preview(pr_number)
            return {"status": "destroyed"}
    
    return {"status": "ignored"}

Kubernetes namespace-per-branch

Each preview gets its own Kubernetes namespace with isolated resources:

from kubernetes import client, config
import yaml
import logging

logger = logging.getLogger(__name__)


class PreviewEnvironment:
    def __init__(self):
        config.load_incluster_config()
        self.core = client.CoreV1Api()
        self.apps = client.AppsV1Api()
        self.networking = client.NetworkingV1Api()
    
    def namespace_name(self, pr_number: int) -> str:
        return f"preview-pr-{pr_number}"
    
    def create(
        self,
        pr_number: int,
        image: str,
        base_domain: str = "preview.example.com",
    ) -> dict:
        ns_name = self.namespace_name(pr_number)
        hostname = f"pr-{pr_number}.{base_domain}"
        
        # Create namespace
        self.core.create_namespace(
            client.V1Namespace(
                metadata=client.V1ObjectMeta(
                    name=ns_name,
                    labels={
                        "preview": "true",
                        "pr-number": str(pr_number),
                    },
                    annotations={
                        "preview/created-at": self._now_iso(),
                    },
                )
            )
        )
        
        # Deploy application
        self.apps.create_namespaced_deployment(
            ns_name,
            self._build_deployment(ns_name, image, pr_number),
        )
        
        # Create service
        self.core.create_namespaced_service(
            ns_name,
            self._build_service(ns_name),
        )
        
        # Create ingress
        self.networking.create_namespaced_ingress(
            ns_name,
            self._build_ingress(ns_name, hostname),
        )
        
        logger.info(f"Preview environment created: https://{hostname}")
        return {"namespace": ns_name, "url": f"https://{hostname}"}
    
    def destroy(self, pr_number: int) -> dict:
        ns_name = self.namespace_name(pr_number)
        try:
            self.core.delete_namespace(ns_name)
            return {"destroyed": True, "namespace": ns_name}
        except client.ApiException as e:
            if e.status == 404:
                return {"destroyed": False, "reason": "not found"}
            raise
    
    def _build_deployment(
        self, namespace: str, image: str, pr_number: int
    ) -> client.V1Deployment:
        return client.V1Deployment(
            metadata=client.V1ObjectMeta(name="app"),
            spec=client.V1DeploymentSpec(
                replicas=1,  # single replica for previews
                selector=client.V1LabelSelector(
                    match_labels={"app": "preview"}
                ),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(
                        labels={"app": "preview"}
                    ),
                    spec=client.V1PodSpec(
                        containers=[
                            client.V1Container(
                                name="app",
                                image=image,
                                ports=[
                                    client.V1ContainerPort(container_port=8000)
                                ],
                                env=[
                                    client.V1EnvVar(
                                        name="DATABASE_URL",
                                        value=f"postgresql://preview:pass@db/pr_{pr_number}",
                                    ),
                                    client.V1EnvVar(
                                        name="ENVIRONMENT",
                                        value="preview",
                                    ),
                                ],
                                resources=client.V1ResourceRequirements(
                                    requests={"cpu": "100m", "memory": "128Mi"},
                                    limits={"cpu": "500m", "memory": "512Mi"},
                                ),
                                readiness_probe=client.V1Probe(
                                    http_get=client.V1HTTPGetAction(
                                        path="/health", port=8000
                                    ),
                                    initial_delay_seconds=15,
                                    period_seconds=10,
                                ),
                            )
                        ]
                    ),
                ),
            ),
        )
    
    def _build_service(self, namespace: str) -> client.V1Service:
        return client.V1Service(
            metadata=client.V1ObjectMeta(name="app"),
            spec=client.V1ServiceSpec(
                selector={"app": "preview"},
                ports=[client.V1ServicePort(port=80, target_port=8000)],
            ),
        )
    
    def _build_ingress(
        self, namespace: str, hostname: str
    ) -> client.V1Ingress:
        return client.V1Ingress(
            metadata=client.V1ObjectMeta(
                name="app",
                annotations={
                    "cert-manager.io/cluster-issuer": "letsencrypt-prod",
                },
            ),
            spec=client.V1IngressSpec(
                tls=[
                    client.V1IngressTLS(
                        hosts=[hostname],
                        secret_name=f"tls-{namespace}",
                    )
                ],
                rules=[
                    client.V1IngressRule(
                        host=hostname,
                        http=client.V1HTTPIngressRuleValue(
                            paths=[
                                client.V1HTTPIngressPath(
                                    path="/",
                                    path_type="Prefix",
                                    backend=client.V1IngressBackend(
                                        service=client.V1IngressServiceBackend(
                                            name="app",
                                            port=client.V1ServicePort(
                                                number=80
                                            ),
                                        )
                                    ),
                                )
                            ]
                        ),
                    )
                ],
            ),
        )
    
    def _now_iso(self) -> str:
        from datetime import datetime, timezone
        return datetime.now(timezone.utc).isoformat()

Database provisioning

Each preview needs its own database. This approach uses PostgreSQL schemas:

import psycopg2
from contextlib import contextmanager
import logging

logger = logging.getLogger(__name__)


class PreviewDatabaseManager:
    def __init__(self, admin_dsn: str):
        self.admin_dsn = admin_dsn
    
    @contextmanager
    def _conn(self):
        conn = psycopg2.connect(self.admin_dsn)
        conn.autocommit = True
        try:
            yield conn.cursor()
        finally:
            conn.close()
    
    def create_database(self, pr_number: int, template: str = "app_template") -> str:
        """Create an isolated database from a template for a preview environment."""
        db_name = f"pr_{pr_number}"
        
        with self._conn() as cur:
            # Terminate connections to template (required for CREATE DATABASE ... TEMPLATE)
            cur.execute(
                "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
                "WHERE datname = %s AND pid <> pg_backend_pid()",
                (template,),
            )
            cur.execute(
                f'CREATE DATABASE "{db_name}" TEMPLATE "{template}"'
            )
        
        logger.info(f"Created preview database: {db_name}")
        return db_name
    
    def destroy_database(self, pr_number: int) -> None:
        db_name = f"pr_{pr_number}"
        
        with self._conn() as cur:
            cur.execute(
                "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
                "WHERE datname = %s AND pid <> pg_backend_pid()",
                (db_name,),
            )
            cur.execute(f'DROP DATABASE IF EXISTS "{db_name}"')
        
        logger.info(f"Destroyed preview database: {db_name}")
    
    def seed_data(self, pr_number: int, seed_file: str) -> None:
        """Apply seed data to a preview database."""
        db_name = f"pr_{pr_number}"
        dsn = self.admin_dsn.rsplit("/", 1)[0] + f"/{db_name}"
        
        conn = psycopg2.connect(dsn)
        try:
            with conn.cursor() as cur:
                with open(seed_file) as f:
                    cur.execute(f.read())
            conn.commit()
        finally:
            conn.close()

GitHub PR comment with preview URL

import httpx


async def post_preview_url(repo: str, pr_number: int, url: str) -> None:
    """Post or update a comment on the PR with the preview URL."""
    token = "ghp_..."
    headers = {
        "Authorization": f"token {token}",
        "Accept": "application/vnd.github.v3+json",
    }
    
    marker = "<!-- preview-environment -->"
    body = (
        f"{marker}\n"
        f"## 🚀 Preview Environment\n\n"
        f"**URL:** [{url}]({url})\n\n"
        f"This environment updates automatically when you push to this branch.\n"
        f"It will be destroyed when this PR is closed or merged."
    )
    
    async with httpx.AsyncClient() as client:
        # Check for existing preview comment
        comments = await client.get(
            f"https://api.github.com/repos/{repo}/issues/{pr_number}/comments",
            headers=headers,
        )
        
        existing = None
        for comment in comments.json():
            if marker in comment.get("body", ""):
                existing = comment["id"]
                break
        
        if existing:
            await client.patch(
                f"https://api.github.com/repos/{repo}/issues/comments/{existing}",
                headers=headers,
                json={"body": body},
            )
        else:
            await client.post(
                f"https://api.github.com/repos/{repo}/issues/{pr_number}/comments",
                headers=headers,
                json={"body": body},
            )

Automatic cleanup for stale environments

from kubernetes import client, config
from datetime import datetime, timezone, timedelta
import logging

logger = logging.getLogger(__name__)


def cleanup_stale_previews(max_age_hours: int = 72) -> list[str]:
    """Find and destroy preview namespaces older than max_age_hours."""
    config.load_incluster_config()
    core = client.CoreV1Api()
    
    namespaces = core.list_namespace(label_selector="preview=true")
    cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
    destroyed = []
    
    for ns in namespaces.items:
        created_str = ns.metadata.annotations.get("preview/created-at", "")
        if not created_str:
            continue
        
        created = datetime.fromisoformat(created_str)
        if created < cutoff:
            logger.info(
                f"Destroying stale preview: {ns.metadata.name} "
                f"(age: {(datetime.now(timezone.utc) - created).days} days)"
            )
            core.delete_namespace(ns.metadata.name)
            destroyed.append(ns.metadata.name)
    
    return destroyed

Docker Compose alternative for smaller teams

Not every team needs Kubernetes. For smaller setups, Docker Compose per branch works:

import subprocess
import os
from pathlib import Path
import logging

logger = logging.getLogger(__name__)


class DockerComposePreview:
    def __init__(self, base_dir: str = "/opt/previews"):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)
    
    def create(self, pr_number: int, image: str, port_offset: int = 0) -> dict:
        preview_dir = self.base_dir / f"pr-{pr_number}"
        preview_dir.mkdir(exist_ok=True)
        
        port = 9000 + pr_number + port_offset
        
        compose = f"""
version: '3.8'
services:
  app:
    image: {image}
    ports:
      - "{port}:8000"
    environment:
      - DATABASE_URL=postgresql://app:app@db/app
      - ENVIRONMENT=preview
    depends_on:
      db:
        condition: service_healthy
  db:
    image: postgres:16
    environment:
      - POSTGRES_DB=app
      - POSTGRES_USER=app
      - POSTGRES_PASSWORD=app
    healthcheck:
      test: pg_isready -U app
      interval: 5s
      retries: 5
    volumes:
      - pgdata:/var/lib/postgresql/data
volumes:
  pgdata:
"""
        
        (preview_dir / "docker-compose.yml").write_text(compose)
        
        subprocess.run(
            ["docker", "compose", "up", "-d"],
            cwd=preview_dir,
            check=True,
        )
        
        url = f"http://preview.example.com:{port}"
        logger.info(f"Preview PR #{pr_number} at {url}")
        return {"url": url, "port": port}
    
    def destroy(self, pr_number: int) -> None:
        preview_dir = self.base_dir / f"pr-{pr_number}"
        if preview_dir.exists():
            subprocess.run(
                ["docker", "compose", "down", "-v"],
                cwd=preview_dir,
                check=True,
            )
            import shutil
            shutil.rmtree(preview_dir)

Cost optimization strategies

StrategyMonthly savingsImplementation effort
Auto-shutdown after 2h inactivity40-60%Medium — needs access log monitoring
Smaller instance sizes for previews30-50%Low — just resource limits
Shared database with schema isolation20-30%Medium — migration changes
Spot/preemptible instances60-80%Low — tolerate occasional restarts
Scale to zero (Knative/Cloud Run)70-90%High — needs serverless-compatible app

The biggest cost savings come from not running environments nobody is looking at. A Python cron job that checks Nginx access logs and shuts down idle previews pays for itself quickly.

The one thing to remember: Feature branch deployments are most valuable when they’re cheap and automatic. Start with Docker Compose for small teams, graduate to Kubernetes namespaces at scale, and always implement automatic cleanup to avoid runaway costs.

pythonfeature-branchesdeploymentdevops

See Also