Feature Branch Deployments with Python — Deep Dive
Webhook handler with FastAPI
The entry point for feature branch deployments is a webhook that receives events from your Git provider. This FastAPI handler processes GitHub pull request events:
import hashlib
import hmac
import logging
from fastapi import FastAPI, Request, HTTPException
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# FastAPI application instance; the webhook route is registered on it.
app = FastAPI()
# NOTE(review): placeholder value. A real deployment should load the secret
# from configuration or the environment rather than committing it to source.
WEBHOOK_SECRET = "your-webhook-secret"


def verify_github_signature(payload: bytes, signature: str) -> bool:
    """Check a GitHub ``X-Hub-Signature-256`` header against the raw body.

    Args:
        payload: Raw request body exactly as received.
        signature: Header value, expected as ``sha256=<hex digest>``.

    Returns:
        True when ``signature`` equals the HMAC-SHA256 of ``payload`` keyed
        with ``WEBHOOK_SECRET``; False otherwise, including for an empty or
        malformed header.
    """
    if not signature:
        return False

    digest = hmac.new(
        WEBHOOK_SECRET.encode(), payload, hashlib.sha256
    ).hexdigest()
    expected = f"sha256={digest}"

    # compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, and this header is attacker-controlled. Comparing
    # bytes keeps the constant-time check and turns garbage into plain False.
    return hmac.compare_digest(
        expected.encode(), signature.encode("utf-8", "replace")
    )
@app.post("/webhooks/github")
async def handle_github_webhook(request: Request):
    """Handle a GitHub webhook and create or tear down a PR preview.

    The request is rejected with HTTP 401 unless its ``X-Hub-Signature-256``
    header validates against the raw body. Only ``pull_request`` events are
    acted on: ``opened``/``synchronize``/``reopened`` deploy a preview and
    comment its URL on the PR, ``closed`` destroys it. Everything else is
    acknowledged with ``{"status": "ignored"}``.
    """
    raw_body = await request.body()
    provided_signature = request.headers.get("X-Hub-Signature-256", "")
    signature_ok = verify_github_signature(raw_body, provided_signature)
    if not signature_ok:
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = request.headers.get("X-GitHub-Event")
    payload = await request.json()
    if event != "pull_request":
        return {"status": "ignored"}

    # All fields are read up front, for every action, as the handler always did.
    action = payload["action"]
    pull_request = payload["pull_request"]
    pr_number = pull_request["number"]
    branch = pull_request["head"]["ref"]
    repo = payload["repository"]["full_name"]

    if action == "closed":
        logger.info(f"Destroying preview for PR #{pr_number}")
        await destroy_preview(pr_number)
        return {"status": "destroyed"}

    if action not in ("opened", "synchronize", "reopened"):
        return {"status": "ignored"}

    logger.info(f"Deploying preview for PR #{pr_number} ({branch})")
    env = await deploy_preview(repo, pr_number, branch)
    await post_preview_url(repo, pr_number, env["url"])
    return {"status": "deployed", "url": env["url"]}
Kubernetes namespace-per-branch
Each preview gets its own Kubernetes namespace with isolated resources:
from kubernetes import client, config
import yaml
import logging
logger = logging.getLogger(__name__)
class PreviewEnvironment:
    """Manage one Kubernetes namespace per pull-request preview.

    A preview consists of a labelled namespace that holds a single-replica
    Deployment, a Service and a TLS-enabled Ingress, each named ``app``.
    """

    def __init__(self):
        """Load in-cluster configuration and build the API clients."""
        config.load_incluster_config()
        self.core = client.CoreV1Api()
        self.apps = client.AppsV1Api()
        self.networking = client.NetworkingV1Api()

    def namespace_name(self, pr_number: int) -> str:
        """Return the namespace name for a PR, e.g. ``preview-pr-42``."""
        return f"preview-pr-{pr_number}"

    def create(
        self,
        pr_number: int,
        image: str,
        base_domain: str = "preview.example.com",
    ) -> dict:
        """Create the namespace, deployment, service and ingress for a PR.

        Args:
            pr_number: Pull request number; determines namespace and hostname.
            image: Container image to run.
            base_domain: Parent domain; the preview is served at
                ``pr-<pr_number>.<base_domain>``.

        Returns:
            ``{"namespace": <name>, "url": "https://<hostname>"}``.

        Any exception raised by a Kubernetes client call propagates to the
        caller; resources created before the failure are not rolled back.
        """
        ns_name = self.namespace_name(pr_number)
        hostname = f"pr-{pr_number}.{base_domain}"

        # Labels let the namespace be selected with "preview=true"; the
        # created-at annotation records when the preview was made.
        self.core.create_namespace(
            client.V1Namespace(
                metadata=client.V1ObjectMeta(
                    name=ns_name,
                    labels={
                        "preview": "true",
                        "pr-number": str(pr_number),
                    },
                    annotations={
                        "preview/created-at": self._now_iso(),
                    },
                )
            )
        )

        self.apps.create_namespaced_deployment(
            ns_name,
            self._build_deployment(ns_name, image, pr_number),
        )
        self.core.create_namespaced_service(
            ns_name,
            self._build_service(ns_name),
        )
        self.networking.create_namespaced_ingress(
            ns_name,
            self._build_ingress(ns_name, hostname),
        )

        logger.info(f"Preview environment created: https://{hostname}")
        return {"namespace": ns_name, "url": f"https://{hostname}"}

    def destroy(self, pr_number: int) -> dict:
        """Delete the preview namespace for a PR.

        Returns:
            ``{"destroyed": True, "namespace": <name>}`` on success, or
            ``{"destroyed": False, "reason": "not found"}`` when the API
            answers 404.

        Raises:
            client.ApiException: For any API error other than 404.
        """
        ns_name = self.namespace_name(pr_number)
        try:
            self.core.delete_namespace(ns_name)
            return {"destroyed": True, "namespace": ns_name}
        except client.ApiException as e:
            if e.status == 404:
                return {"destroyed": False, "reason": "not found"}
            raise

    def _build_deployment(
        self, namespace: str, image: str, pr_number: int
    ) -> client.V1Deployment:
        """Build the single-replica ``app`` Deployment for a preview.

        ``namespace`` is accepted for signature symmetry with the other
        builders but is not used in the manifest.
        """
        pod_labels = {"app": "preview"}
        container = client.V1Container(
            name="app",
            image=image,
            ports=[client.V1ContainerPort(container_port=8000)],
            env=[
                # NOTE(review): credentials are embedded in the manifest;
                # consider sourcing them from a Secret instead.
                client.V1EnvVar(
                    name="DATABASE_URL",
                    value=f"postgresql://preview:pass@db/pr_{pr_number}",
                ),
                client.V1EnvVar(
                    name="ENVIRONMENT",
                    value="preview",
                ),
            ],
            resources=client.V1ResourceRequirements(
                requests={"cpu": "100m", "memory": "128Mi"},
                limits={"cpu": "500m", "memory": "512Mi"},
            ),
            readiness_probe=client.V1Probe(
                http_get=client.V1HTTPGetAction(path="/health", port=8000),
                initial_delay_seconds=15,
                period_seconds=10,
            ),
        )
        return client.V1Deployment(
            metadata=client.V1ObjectMeta(name="app"),
            spec=client.V1DeploymentSpec(
                replicas=1,  # single replica for previews
                selector=client.V1LabelSelector(match_labels=pod_labels),
                template=client.V1PodTemplateSpec(
                    metadata=client.V1ObjectMeta(labels=dict(pod_labels)),
                    spec=client.V1PodSpec(containers=[container]),
                ),
            ),
        )

    def _build_service(self, namespace: str) -> client.V1Service:
        """Build the ``app`` Service mapping port 80 to container port 8000.

        ``namespace`` is accepted but not used in the manifest.
        """
        return client.V1Service(
            metadata=client.V1ObjectMeta(name="app"),
            spec=client.V1ServiceSpec(
                selector={"app": "preview"},
                ports=[client.V1ServicePort(port=80, target_port=8000)],
            ),
        )

    def _build_ingress(
        self, namespace: str, hostname: str
    ) -> client.V1Ingress:
        """Build the ``app`` Ingress routing ``hostname`` to the service.

        The ingress carries the ``cert-manager.io/cluster-issuer`` annotation
        set to ``letsencrypt-prod`` and references a TLS secret named
        ``tls-<namespace>``.
        """
        backend = client.V1IngressBackend(
            service=client.V1IngressServiceBackend(
                name="app",
                # An ingress backend port is a V1ServiceBackendPort;
                # V1ServicePort has no ``number`` field and rejects it.
                port=client.V1ServiceBackendPort(number=80),
            )
        )
        return client.V1Ingress(
            metadata=client.V1ObjectMeta(
                name="app",
                annotations={
                    "cert-manager.io/cluster-issuer": "letsencrypt-prod",
                },
            ),
            spec=client.V1IngressSpec(
                tls=[
                    client.V1IngressTLS(
                        hosts=[hostname],
                        secret_name=f"tls-{namespace}",
                    )
                ],
                rules=[
                    client.V1IngressRule(
                        host=hostname,
                        http=client.V1HTTPIngressRuleValue(
                            paths=[
                                client.V1HTTPIngressPath(
                                    path="/",
                                    path_type="Prefix",
                                    backend=backend,
                                )
                            ]
                        ),
                    )
                ],
            ),
        )

    def _now_iso(self) -> str:
        """Return the current UTC time as a timezone-aware ISO 8601 string."""
        from datetime import datetime, timezone

        return datetime.now(timezone.utc).isoformat()
Database provisioning
Each preview needs its own database. This approach creates a dedicated PostgreSQL database per pull request, cloned from a template database:
import psycopg2
from contextlib import contextmanager
import logging
logger = logging.getLogger(__name__)
class PreviewDatabaseManager:
    """Create, seed and drop one PostgreSQL database per preview."""

    def __init__(self, admin_dsn: str):
        """Remember the DSN used for administrative connections.

        Args:
            admin_dsn: Connection string for a role that may create and drop
                databases.
        """
        self.admin_dsn = admin_dsn

    @staticmethod
    def _database_name(pr_number: int) -> str:
        """Return the database name used for a pull request (``pr_<n>``)."""
        return f"pr_{pr_number}"

    @contextmanager
    def _conn(self):
        """Yield a cursor on an autocommit admin connection, then close both.

        Autocommit is enabled because PostgreSQL refuses CREATE/DROP DATABASE
        inside a transaction block.
        """
        conn = psycopg2.connect(self.admin_dsn)
        conn.autocommit = True
        try:
            with conn.cursor() as cur:
                yield cur
        finally:
            conn.close()

    def create_database(self, pr_number: int, template: str = "app_template") -> str:
        """Create an isolated database from a template for a preview environment.

        Args:
            pr_number: Pull request number; the database is named ``pr_<n>``.
            template: Name of the template database to clone.

        Returns:
            The name of the created database.
        """
        # Local import keeps this block self-contained; psycopg2 is already a
        # dependency of this module.
        from psycopg2 import sql

        db_name = self._database_name(pr_number)
        # Identifiers cannot be bound as query parameters, so they are quoted
        # with sql.Identifier instead of being pasted into the statement.
        statement = sql.SQL("CREATE DATABASE {} TEMPLATE {}").format(
            sql.Identifier(db_name), sql.Identifier(template)
        )
        with self._conn() as cur:
            # Terminate connections to template (required for CREATE DATABASE ... TEMPLATE)
            cur.execute(
                "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
                "WHERE datname = %s AND pid <> pg_backend_pid()",
                (template,),
            )
            cur.execute(statement)
        logger.info(f"Created preview database: {db_name}")
        return db_name

    def destroy_database(self, pr_number: int) -> None:
        """Drop the preview database for a PR, if it exists.

        Open sessions on the database are terminated first so the DROP is not
        blocked by them.
        """
        from psycopg2 import sql

        db_name = self._database_name(pr_number)
        statement = sql.SQL("DROP DATABASE IF EXISTS {}").format(
            sql.Identifier(db_name)
        )
        with self._conn() as cur:
            cur.execute(
                "SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
                "WHERE datname = %s AND pid <> pg_backend_pid()",
                (db_name,),
            )
            cur.execute(statement)
        logger.info(f"Destroyed preview database: {db_name}")

    def seed_data(self, pr_number: int, seed_file: str) -> None:
        """Apply seed data to a preview database.

        Args:
            pr_number: Pull request number identifying the target database.
            seed_file: Path to a SQL script, read as UTF-8 and executed in a
                single transaction.
        """
        db_name = self._database_name(pr_number)

        # Read the script before connecting so a missing file never opens a
        # connection at all.
        with open(seed_file, encoding="utf-8") as f:
            seed_sql = f.read()
        if not seed_sql.strip():
            # psycopg2 rejects an empty query; nothing to apply.
            logger.info(f"Seed file {seed_file} is empty; skipping {db_name}")
            return

        # Keyword arguments override the matching DSN field, so this targets
        # the preview database without string surgery on the DSN, which would
        # lose query parameters and break key/value-style DSNs.
        conn = psycopg2.connect(self.admin_dsn, dbname=db_name)
        try:
            with conn.cursor() as cur:
                cur.execute(seed_sql)
            conn.commit()
        finally:
            conn.close()
GitHub PR comment with preview URL
import httpx
async def post_preview_url(repo: str, pr_number: int, url: str) -> None:
    """Post or update a comment on the PR with the preview URL.

    A hidden HTML marker identifies the comment, so later calls edit the
    existing comment instead of adding a new one.

    Args:
        repo: Repository in ``owner/name`` form.
        pr_number: Pull request number.
        url: Preview URL to publish.

    Raises:
        httpx.HTTPStatusError: If GitHub answers any request with a 4xx or
            5xx status.
    """
    # NOTE(review): placeholder credential; load the real token from
    # configuration or the environment rather than source code.
    token = "ghp_..."
    headers = {
        "Authorization": f"token {token}",
        "Accept": "application/vnd.github.v3+json",
    }
    marker = "<!-- preview-environment -->"
    body = (
        f"{marker}\n"
        f"## 🚀 Preview Environment\n\n"
        f"**URL:** [{url}]({url})\n\n"
        f"This environment updates automatically when you push to this branch.\n"
        f"It will be destroyed when this PR is closed or merged."
    )
    comments_url = (
        f"https://api.github.com/repos/{repo}/issues/{pr_number}/comments"
    )

    async with httpx.AsyncClient() as client:
        # Ask for the largest page so the marker is found on busy PRs.
        # NOTE(review): PRs with more than 100 comments would still need
        # real pagination.
        listing = await client.get(
            comments_url,
            headers=headers,
            params={"per_page": 100},
        )
        # Without this check an error payload (a JSON object) would be
        # iterated as though it were the list of comments.
        listing.raise_for_status()

        existing = next(
            (
                comment["id"]
                for comment in listing.json()
                if marker in (comment.get("body") or "")
            ),
            None,
        )

        if existing is not None:
            result = await client.patch(
                f"https://api.github.com/repos/{repo}/issues/comments/{existing}",
                headers=headers,
                json={"body": body},
            )
        else:
            result = await client.post(
                comments_url,
                headers=headers,
                json={"body": body},
            )
        # Surface a rejected write instead of reporting success silently.
        result.raise_for_status()
Automatic cleanup for stale environments
from kubernetes import client, config
from datetime import datetime, timezone, timedelta
import logging
logger = logging.getLogger(__name__)
def cleanup_stale_previews(max_age_hours: int = 72) -> list[str]:
    """Find and destroy preview namespaces older than max_age_hours.

    Namespaces are selected with the ``preview=true`` label and aged by their
    ``preview/created-at`` annotation. Namespaces without that annotation, or
    with a value that is not an ISO 8601 timestamp, are skipped.

    Args:
        max_age_hours: Age threshold in hours.

    Returns:
        Names of the namespaces for which deletion was requested.
    """
    config.load_incluster_config()
    core = client.CoreV1Api()
    namespaces = core.list_namespace(label_selector="preview=true")

    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=max_age_hours)
    destroyed = []

    for ns in namespaces.items:
        name = ns.metadata.name
        # The client model leaves ``annotations`` as None when a namespace
        # has none, so fall back to an empty mapping.
        annotations = ns.metadata.annotations or {}
        created_str = annotations.get("preview/created-at", "")
        if not created_str:
            continue

        try:
            created = datetime.fromisoformat(created_str)
        except ValueError:
            # One bad annotation must not abort the whole sweep.
            logger.warning(
                f"Skipping {name}: unparseable preview/created-at {created_str!r}"
            )
            continue
        if created.tzinfo is None:
            # Naive and aware datetimes cannot be compared; treat a naive
            # timestamp as UTC.
            created = created.replace(tzinfo=timezone.utc)

        if created < cutoff:
            logger.info(
                f"Destroying stale preview: {name} "
                f"(age: {(now - created).days} days)"
            )
            core.delete_namespace(name)
            destroyed.append(name)

    return destroyed
Docker Compose alternative for smaller teams
Not every team needs Kubernetes. For smaller setups, Docker Compose per branch works:
import subprocess
import os
from pathlib import Path
import logging
logger = logging.getLogger(__name__)
class DockerComposePreview:
    """Run one Docker Compose project per pull request under a base directory."""

    def __init__(self, base_dir: str = "/opt/previews"):
        """Create the base directory if it does not exist yet.

        Args:
            base_dir: Directory that holds one ``pr-<n>`` folder per preview.
        """
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

    def create(self, pr_number: int, image: str, port_offset: int = 0) -> dict:
        """Write a compose file for the PR and start it in the background.

        Args:
            pr_number: Pull request number; selects the folder and host port.
            image: Application image to run.
            port_offset: Added to ``9000 + pr_number`` to pick the host port.

        Returns:
            ``{"url": <preview URL>, "port": <host port>}``.

        Raises:
            ValueError: If the computed host port is not a valid TCP port.
            subprocess.CalledProcessError: If ``docker compose up`` fails.
        """
        import textwrap

        port = 9000 + pr_number + port_offset
        # Reject an impossible port before anything is written to disk.
        if not 1 <= port <= 65535:
            raise ValueError(
                f"Computed host port {port} for PR #{pr_number} is outside 1-65535"
            )

        preview_dir = self.base_dir / f"pr-{pr_number}"
        preview_dir.mkdir(exist_ok=True)

        # The nesting below is significant: Compose reads it as YAML.
        compose = textwrap.dedent(
            """\
            version: '3.8'
            services:
              app:
                image: {image}
                ports:
                  - "{port}:8000"
                environment:
                  - DATABASE_URL=postgresql://app:app@db/app
                  - ENVIRONMENT=preview
                depends_on:
                  db:
                    condition: service_healthy
              db:
                image: postgres:16
                environment:
                  - POSTGRES_DB=app
                  - POSTGRES_USER=app
                  - POSTGRES_PASSWORD=app
                healthcheck:
                  test: pg_isready -U app
                  interval: 5s
                  retries: 5
                volumes:
                  - pgdata:/var/lib/postgresql/data
            volumes:
              pgdata:
            """
        ).format(image=image, port=port)
        (preview_dir / "docker-compose.yml").write_text(compose)

        subprocess.run(
            ["docker", "compose", "up", "-d"],
            cwd=preview_dir,
            check=True,
        )
        url = f"http://preview.example.com:{port}"
        logger.info(f"Preview PR #{pr_number} at {url}")
        return {"url": url, "port": port}

    def destroy(self, pr_number: int) -> None:
        """Stop the PR's compose project, remove its volumes and folder.

        Does nothing when no folder exists for the pull request.
        """
        preview_dir = self.base_dir / f"pr-{pr_number}"
        if not preview_dir.exists():
            return

        subprocess.run(
            ["docker", "compose", "down", "-v"],
            cwd=preview_dir,
            check=True,
        )
        import shutil

        shutil.rmtree(preview_dir)
Cost optimization strategies
| Strategy | Monthly savings | Implementation effort |
|---|---|---|
| Auto-shutdown after 2h inactivity | 40-60% | Medium — needs access log monitoring |
| Smaller instance sizes for previews | 30-50% | Low — just resource limits |
| Shared database with schema isolation | 20-30% | Medium — migration changes |
| Spot/preemptible instances | 60-80% | Low — tolerate occasional restarts |
| Scale to zero (Knative/Cloud Run) | 70-90% | High — needs serverless-compatible app |
The biggest cost savings come from not running environments nobody is looking at. A Python cron job that checks Nginx access logs and shuts down idle previews pays for itself quickly.
The one thing to remember: Feature branch deployments are most valuable when they’re cheap and automatic. Start with Docker Compose for small teams, graduate to Kubernetes namespaces at scale, and always implement automatic cleanup to avoid runaway costs.
See Also
- Python Blue Green Deployments: How Python helps teams switch between two identical server environments so updates never cause downtime
- Python Canary Releases: Why teams send new code to just a few users first — and how Python manages the gradual rollout
- Python Chaos Engineering: Why engineers deliberately break their own systems using Python — and how it prevents real disasters
- Python Compliance As Code: How Python turns security rules and regulations into automated checks that run every time code changes
- Python Gitops Patterns: How Git becomes the single source of truth for everything running in production — and Python makes it work