Shadow Deployment for ML Models in Python — Deep Dive
Architecture Patterns
Pattern 1: Application-Level Mirroring
The serving application calls both models and discards the shadow response:
import asyncio
import hashlib
import json
import logging
import time

from fastapi import FastAPI

app = FastAPI()
logger = logging.getLogger("shadow")


class ModelRouter:
    def __init__(self, primary_model, shadow_model):
        self.primary = primary_model
        self.shadow = shadow_model
        # Hold strong references so pending shadow tasks are not garbage-collected
        self._shadow_tasks: set[asyncio.Task] = set()

    async def predict(self, features: dict) -> dict:
        # Primary prediction: awaited and returned to the user
        start = time.perf_counter()
        primary_result = await self.primary.predict(features)
        primary_latency = time.perf_counter() - start

        # Shadow prediction: fire-and-forget, never returned
        task = asyncio.create_task(
            self._shadow_predict(features, primary_result, primary_latency)
        )
        self._shadow_tasks.add(task)
        task.add_done_callback(self._shadow_tasks.discard)
        return primary_result

    async def _shadow_predict(
        self, features: dict, primary_result: dict, primary_latency: float
    ):
        """Run shadow model and log comparison. Never affects the user."""
        try:
            start = time.perf_counter()
            shadow_result = await self.shadow.predict(features)
            shadow_latency = time.perf_counter() - start

            # Stable across processes and safe for list/dict values,
            # unlike the built-in hash()
            features_hash = hashlib.sha256(
                json.dumps(features, sort_keys=True, default=str).encode("utf-8")
            ).hexdigest()

            # log_comparison is assumed to be an async sink defined elsewhere
            await log_comparison({
                "timestamp": time.time(),
                "features_hash": features_hash,
                "primary_prediction": primary_result["prediction"],
                "shadow_prediction": shadow_result["prediction"],
                "primary_latency_ms": primary_latency * 1000,
                "shadow_latency_ms": shadow_latency * 1000,
                "agreement": primary_result["prediction"] == shadow_result["prediction"],
            })
        except Exception as e:
            # Shadow failures must never propagate
            logger.warning("Shadow prediction failed: %s", e)
The critical rule: shadow failures are logged but never raise exceptions or delay the primary response.
Pattern 2: Infrastructure-Level Mirroring
Instead of modifying the application, mirror traffic at the load balancer or service mesh layer:
# Istio VirtualService configuration
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: ml-serving
spec:
  hosts:
    - ml-serving.default.svc.cluster.local
  http:
    - route:
        - destination:
            host: model-primary
            port:
              number: 8080
      mirror:
        host: model-shadow
        port:
          number: 8080
      mirrorPercentage:
        value: 100.0
Istio’s traffic mirroring sends a copy of every request to the shadow service. The shadow’s response is discarded by the mesh. This requires zero application code changes.
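Although the serving application stays untouched, the shadow service may still want to know that a request is a mirror copy, for example to skip writes or other side effects. Istio's documentation states that mirrored requests arrive with `-shadow` appended to the Host/Authority header. The helper below is a sketch built on that behavior. `is_mirrored_request` is a hypothetical name, and the port handling is an assumption, since where the suffix lands relative to a port may vary by proxy version.

```python
def is_mirrored_request(host_header: str) -> bool:
    """Guess whether a request is an Istio mirror copy from its Host header.

    Checks for the "-shadow" suffix both on the raw header value and on the
    value with a trailing ":<port>" removed. IPv6 literals are not handled.
    """
    host = host_header.strip().lower()
    name, sep, port = host.rpartition(":")
    host_without_port = name if sep and port.isdigit() else host
    return host.endswith("-shadow") or host_without_port.endswith("-shadow")


# Example: decide whether to skip side effects in the shadow service.
if is_mirrored_request("ml-serving.default.svc.cluster.local-shadow"):
    pass  # e.g. do not write predictions to the production store
```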
Pattern 3: Queue-Based Replay
For batch or near-real-time systems, log production requests to a queue and replay them through the shadow model asynchronously:
import json

from kafka import KafkaConsumer, KafkaProducer


def shadow_replay_worker():
    """Consume production requests from Kafka and replay through shadow model."""
    consumer = KafkaConsumer(
        "production-requests",
        bootstrap_servers="kafka:9092",
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )
    producer = KafkaProducer(
        bootstrap_servers="kafka:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    # load_shadow_model() is assumed to be defined elsewhere
    shadow_model = load_shadow_model()

    for message in consumer:
        request = message.value
        try:
            # The prediction must be JSON-serializable to be published below
            shadow_prediction = shadow_model.predict(request["features"])
            producer.send("shadow-comparisons", {
                "request_id": request["request_id"],
                "timestamp": request["timestamp"],
                "primary_prediction": request["primary_prediction"],
                "shadow_prediction": shadow_prediction,
            })
        except Exception as e:
            # Route failures to a separate topic so the worker keeps consuming
            producer.send("shadow-errors", {
                "request_id": request["request_id"],
                "error": str(e),
            })
Queue-based replay decouples shadow processing from the serving path entirely, eliminating any latency impact. The tradeoff is that time-sensitive features (like “current time” or “requests in the last second”) may differ between the original and replayed request.
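One way to reduce that drift is to resolve time-dependent features once, on the serving path, and publish the resolved values so the replay worker never recomputes them. The sketch below assumes the record fields the worker above reads (`request_id`, `timestamp`, `features`, `primary_prediction`). `materialize_features`, `build_replay_record`, and the `hour_of_day` feature are hypothetical and only illustrate the idea.

```python
import json
import time
import uuid
from datetime import datetime, timezone
from typing import Optional


def materialize_features(raw: dict, now: float) -> dict:
    """Resolve time-dependent features against an explicit clock value."""
    hour = datetime.fromtimestamp(now, tz=timezone.utc).hour
    return {**raw, "hour_of_day": hour}


def build_replay_record(
    raw_features: dict, primary_prediction, *, now: Optional[float] = None
) -> dict:
    """Build the message the serving path would publish for later replay."""
    ts = time.time() if now is None else now
    return {
        "request_id": str(uuid.uuid4()),
        "timestamp": ts,
        # Frozen at serving time: the replay worker uses these values as-is.
        "features": materialize_features(raw_features, ts),
        "primary_prediction": primary_prediction,
    }


record = build_replay_record({"user_id": 7}, 0.42)
payload = json.dumps(record).encode("utf-8")  # bytes ready for a producer
```

Features that depend on live state, such as "requests in the last second", would need the same treatment: capture the value the primary model actually saw rather than looking it up again at replay time.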
Comparison Pipeline
from dataclasses import dataclass

import numpy as np
import pandas as pd
from scipy.spatial.distance import jensenshannon


@dataclass
class ShadowReport:
    total_requests: int
    agreement_rate: float
    primary_latency_p50: float
    primary_latency_p99: float
    shadow_latency_p50: float
    shadow_latency_p99: float
    shadow_error_rate: float
    prediction_distribution_divergence: float  # Jensen-Shannon distance


def generate_shadow_report(comparisons_df: pd.DataFrame) -> ShadowReport:
    """Analyze shadow deployment results."""
    total = len(comparisons_df)
    if total == 0:
        raise ValueError("comparisons_df is empty; nothing to report")
    agreement = comparisons_df["agreement"].mean()

    # Latency percentiles
    p_lat = comparisons_df["primary_latency_ms"]
    s_lat = comparisons_df["shadow_latency_ms"]

    # Prediction distribution comparison (Jensen-Shannon).
    # Bin edges span both models so that shadow predictions outside the
    # primary range are counted instead of silently dropped.
    p_pred = comparisons_df["primary_prediction"]
    s_pred = comparisons_df["shadow_prediction"]
    bins = np.histogram_bin_edges(np.concatenate([p_pred, s_pred]), bins=50)
    p_hist, _ = np.histogram(p_pred, bins=bins, density=True)
    s_hist, _ = np.histogram(s_pred, bins=bins, density=True)
    # SciPy returns the JS distance, i.e. the square root of the divergence
    js_div = jensenshannon(p_hist + 1e-10, s_hist + 1e-10)

    shadow_errors = comparisons_df["shadow_error"].sum() if "shadow_error" in comparisons_df else 0

    return ShadowReport(
        total_requests=total,
        agreement_rate=agreement,
        primary_latency_p50=p_lat.quantile(0.5),
        primary_latency_p99=p_lat.quantile(0.99),
        shadow_latency_p50=s_lat.quantile(0.5),
        shadow_latency_p99=s_lat.quantile(0.99),
        shadow_error_rate=shadow_errors / total,
        prediction_distribution_divergence=js_div,
    )
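The agreement rate in this report is only as meaningful as the per-request `agreement` flag it averages. For class labels, exact equality is reasonable. For continuous scores, two models almost never produce identical floats, so exact equality would report near-zero agreement. The sketch below shows one possible alternative: exact match for labels and an absolute tolerance for numeric scores. `predictions_agree` and the default tolerance of 0.01 are assumptions, not values from the pipeline above.

```python
import math


def predictions_agree(primary, shadow, *, abs_tol: float = 0.01) -> bool:
    """Exact match for labels, tolerance match for numeric scores."""
    numeric = (int, float)
    # bool is a subclass of int; treat booleans as labels, not scores
    if isinstance(primary, bool) or isinstance(shadow, bool):
        return primary == shadow
    if isinstance(primary, numeric) and isinstance(shadow, numeric):
        return math.isclose(primary, shadow, rel_tol=0.0, abs_tol=abs_tol)
    return primary == shadow


# Scores that differ by less than the tolerance count as agreement.
close_scores = predictions_agree(0.801, 0.805)
different_labels = predictions_agree("cat", "dog")
```

The right tolerance depends on how downstream consumers use the score, for example whether a decision threshold sits between the two values.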
Automated Promotion Gates
Define objective criteria for promoting a shadow model to the next stage (typically A/B testing or direct production):
@dataclass
class PromotionCriteria:
    min_requests: int = 10_000
    min_agreement_rate: float = 0.90
    max_shadow_latency_p99_ms: float = 100.0
    max_shadow_error_rate: float = 0.001
    max_distribution_divergence: float = 0.15


def evaluate_promotion(
    report: ShadowReport,
    criteria: PromotionCriteria
) -> dict:
    """Check if shadow model meets promotion criteria."""
    checks = {
        "sufficient_traffic": report.total_requests >= criteria.min_requests,
        "agreement_ok": report.agreement_rate >= criteria.min_agreement_rate,
        "latency_ok": report.shadow_latency_p99 <= criteria.max_shadow_latency_p99_ms,
        "errors_ok": report.shadow_error_rate <= criteria.max_shadow_error_rate,
        "distribution_ok": (
            report.prediction_distribution_divergence <= criteria.max_distribution_divergence
        ),
    }
    return {
        "promote": all(checks.values()),
        "checks": checks,
        "report_summary": {
            "requests": report.total_requests,
            "agreement": f"{report.agreement_rate:.2%}",
            "shadow_p99_ms": f"{report.shadow_latency_p99:.1f}",
            "error_rate": f"{report.shadow_error_rate:.4%}",
            "js_divergence": f"{report.prediction_distribution_divergence:.4f}",
        }
    }
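A point estimate of the agreement rate says little about how much traffic stands behind it. One possible refinement, sketched below under the assumption that requests are roughly independent, is to gate on the lower bound of a Wilson score interval instead of the raw rate. `wilson_lower_bound` is a hypothetical helper and is not part of the gate code above.

```python
import math


def wilson_lower_bound(successes: int, total: int, z: float = 1.96) -> float:
    """Lower end of the Wilson score interval for a binomial proportion.

    z = 1.96 corresponds to a two-sided 95% interval.
    """
    if total <= 0:
        return 0.0
    p = successes / total
    denom = 1 + z**2 / total
    centre = p + z**2 / (2 * total)
    margin = z * math.sqrt(p * (1 - p) / total + z**2 / (4 * total**2))
    return (centre - margin) / denom


# The same 92% observed agreement, backed by very different sample sizes.
small_sample = wilson_lower_bound(92, 100)
large_sample = wilson_lower_bound(9_200, 10_000)
```

With a 0.90 threshold, the small sample would fail the gate and the large one would pass, even though both show 92% observed agreement.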
Handling Disagreements
When the shadow model disagrees with production, the disagreement itself is valuable data:
def analyze_disagreements(comparisons_df: pd.DataFrame) -> dict:
    """Deep-dive into where shadow and primary disagree."""
    disagreements = comparisons_df[~comparisons_df["agreement"]]
    if len(disagreements) == 0:
        return {"disagreement_count": 0}

    # Summarize the direction and magnitude of disagreements
    analysis = {
        "disagreement_count": len(disagreements),
        "disagreement_rate": len(disagreements) / len(comparisons_df),
        "shadow_higher_rate": (
            disagreements["shadow_prediction"] > disagreements["primary_prediction"]
        ).mean(),
        "mean_absolute_difference": abs(
            disagreements["shadow_prediction"] - disagreements["primary_prediction"]
        ).mean(),
    }

    # If ground truth is available (delayed labels)
    if "ground_truth" in disagreements.columns:
        labeled = disagreements.dropna(subset=["ground_truth"])
        if len(labeled) > 0:
            # Rounding assumes scores in [0, 1] and binary labels
            shadow_correct = (
                labeled["shadow_prediction"].round() == labeled["ground_truth"]
            ).mean()
            primary_correct = (
                labeled["primary_prediction"].round() == labeled["ground_truth"]
            ).mean()
            analysis["shadow_correct_on_disagreements"] = shadow_correct
            analysis["primary_correct_on_disagreements"] = primary_correct
    return analysis
If the shadow model is correct more often than the primary on labeled disagreements, that is strong evidence for promotion, provided the labeled sample is large enough to rule out chance.
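Whether "more often" is more than noise depends on how many labeled disagreements there are. A simple check, sketched here as an assumption rather than part of the analysis above, is an exact two-sided sign test: among the cases where exactly one model was right, test whether the wins are consistent with a fair coin. `sign_test_p_value` is a hypothetical helper that uses only the standard library.

```python
import math


def sign_test_p_value(shadow_wins: int, primary_wins: int) -> float:
    """Two-sided exact binomial test of H0: each model is equally likely to win.

    Only cases where exactly one model was correct should be counted.
    """
    n = shadow_wins + primary_wins
    if n == 0:
        return 1.0
    k = max(shadow_wins, primary_wins)
    # P(X >= k) for X ~ Binomial(n, 0.5), computed with exact integers
    tail = sum(math.comb(n, i) for i in range(k, n + 1)) / 2**n
    return min(1.0, 2 * tail)


# 9 shadow wins against 1 primary win out of 10 decisive cases
p_value = sign_test_p_value(9, 1)
```

A small p-value suggests the difference is unlikely to be chance. It says nothing about whether the labeled disagreements are representative of all traffic.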
Cost and Duration Planning
With a 100% mirror and a shadow model of similar size, shadow deployment roughly doubles inference compute for the duration of the test. Planning checklist:
| Factor | Typical Value |
|---|---|
| Duration | 1-2 weeks (enough to cover weekly traffic patterns) |
| Traffic percentage | 100% mirror (sample at 10-50% for cost savings) |
| Total compute cost | 1.5-2x normal serving cost |
| Storage for logs | ~1 KB per comparison × daily request volume |
| Minimum sample | 10,000+ requests for reliable statistics |
For high-traffic services (millions of requests per day), sampling 10% of traffic for shadow processing often provides sufficient data while keeping costs manageable.
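A sketch of how such sampling might be done: hashing a request identifier gives a deterministic decision, so the same request is always in or out of the sample regardless of which replica handles it. The second helper turns the storage row of the table into arithmetic. `in_shadow_sample` and `daily_log_storage_gb` are hypothetical names, and the 1,000-byte record size is the table's rough estimate, not a measurement.

```python
import hashlib


def in_shadow_sample(request_id: str, sample_rate: float) -> bool:
    """Deterministically select roughly `sample_rate` of requests."""
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    # Map the first 8 bytes of the digest to a number in [0, 1)
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < sample_rate


def daily_log_storage_gb(
    requests_per_day: int, sample_rate: float, bytes_per_record: int = 1000
) -> float:
    """Estimate comparison-log storage per day in gigabytes."""
    return requests_per_day * sample_rate * bytes_per_record / 1e9


# 5 million requests per day sampled at 10% is about half a gigabyte per day.
estimate_gb = daily_log_storage_gb(5_000_000, 0.10)
```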
The Shadow-to-Production Pipeline
A mature deployment workflow chains shadow deployment with other strategies:
- Shadow deployment — validate model works correctly on real traffic (1-2 weeks)
- Canary deployment — serve to 1-5% of real users (3-7 days)
- A/B test — full experiment with statistical rigor (2-4 weeks)
- Gradual rollout — ramp from 5% → 25% → 50% → 100%
Each stage has automated gates. If any stage fails its criteria, the pipeline halts and alerts the team.
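The halt-on-failure behavior can be sketched as a loop over stages, each with its own gate. This is a minimal illustration under the assumption that every gate can be expressed as a function returning True or False. `Stage`, `run_pipeline`, and the `alert` callback are hypothetical names and do not belong to any particular orchestration tool.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Stage:
    name: str
    gate: Callable[[], bool]  # returns True when the stage's criteria are met


def run_pipeline(stages: List[Stage], alert: Callable[[str], None]) -> Optional[str]:
    """Run stages in order and halt at the first failed gate.

    Returns the name of the stage that halted the pipeline, or None if
    every gate passed.
    """
    for stage in stages:
        try:
            passed = stage.gate()
        except Exception as exc:
            # A gate that cannot be evaluated is treated as a failure
            alert(f"{stage.name}: gate raised {exc!r}")
            return stage.name
        if not passed:
            alert(f"{stage.name}: gate criteria not met")
            return stage.name
    return None


stages = [
    Stage("shadow", lambda: True),
    Stage("canary", lambda: False),  # simulated failure
    Stage("ab_test", lambda: True),
]
halted_at = run_pipeline(stages, alert=print)
```

Treating a gate that raises as a failure keeps the pipeline conservative: a model is never promoted because its checks could not run.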
One thing to remember: Shadow deployment is the first line of defense in production ML — it catches infrastructure failures, latency issues, and prediction anomalies before any user is exposed to a new model.