Python A/B Testing Framework — Deep Dive

Architecture of an Experimentation Platform

A production A/B testing system has four layers: experiment configuration, user assignment, event tracking, and statistical analysis.

┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│  Experiment  │   │  Assignment  │   │   Analysis   │
│    Config    │──▶│    Engine    │──▶│   Pipeline   │
└──────────────┘   └──────┬───────┘   └──────────────┘
                          │
                   ┌──────▼───────┐
                   │    Event     │
                   │   Tracking   │
                   └──────────────┘

Experiment Configuration

from dataclasses import dataclass, field
from datetime import datetime, date
from enum import Enum

class ExperimentStatus(str, Enum):
    """Lifecycle states of an experiment.

    The ``str`` mixin makes every member compare equal to its plain string
    value (for example ``ExperimentStatus.RUNNING == "running"``).
    """
    DRAFT = "draft"  # Default status of a newly created Experiment
    RUNNING = "running"  # The only status in which the assignment engine hands out variants
    PAUSED = "paused"
    COMPLETED = "completed"

@dataclass
class Variant:
    """One arm of an experiment (for example "control" or "treatment")."""
    key: str  # Identifier returned to callers when a user lands in this arm
    weight: float = 0.5  # Traffic allocation: share of the [0, 1) bucket range given to this arm
    description: str = ""  # Free-form, human-readable note

@dataclass
class Experiment:
    """Configuration record for a single A/B experiment.

    A new experiment defaults to DRAFT status with an even split between a
    "control" and a "treatment" variant.
    """
    id: str  # Mixed into the assignment hash, so bucketing differs per experiment
    name: str  # Human-readable name used in notification messages
    hypothesis: str
    primary_metric: str  # Metric name whose result decides automatic completion
    # Metric names that are monitored for regressions while the experiment runs.
    guardrail_metrics: list[str] = field(default_factory=list)
    # default_factory builds a fresh list per instance, so experiments never
    # share (and accidentally mutate) the same Variant objects.
    variants: list[Variant] = field(default_factory=lambda: [
        Variant(key="control", weight=0.5),
        Variant(key="treatment", weight=0.5),
    ])
    status: ExperimentStatus = ExperimentStatus.DRAFT
    min_sample_size: int = 1000  # Compared against the control arm's user count before auto-completion
    max_duration_days: int = 30  # Experiments running longer than this are completed automatically
    # NOTE(review): compared against datetime.utcnow() elsewhere, so this is
    # presumably a naive UTC timestamp; confirm with whoever sets it.
    started_at: datetime | None = None
    owner: str = ""
    # NOTE(review): not evaluated by the assignment engine shown here; the
    # expected schema of this dict is not visible in this file.
    target_audience: dict = field(default_factory=dict)

Consistent Assignment Engine

Assignment must be deterministic (the same user always gets the same variant) and must spread users uniformly across buckets:

import hashlib
from typing import Optional

class AssignmentEngine:
    """Deterministically map users to experiment variants via salted hashing.

    The same ``(salt, experiment.id, user_id)`` triple always yields the same
    bucket, so a user keeps their variant across requests and processes.
    """

    def __init__(self, salt: str = "ab-test-v1"):
        # Changing the salt reshuffles every user in every experiment.
        self.salt = salt

    def assign(
        self,
        experiment: Experiment,
        user_id: str,
        overrides: dict[str, str] | None = None,
    ) -> Optional[str]:
        """Return variant key for this user, or None if not eligible.

        Args:
            experiment: The experiment to assign within.
            user_id: Stable identifier of the user being bucketed.
            overrides: Optional ``{experiment_id: variant_key}`` mapping that
                forces a variant (for internal testing). The forced key is
                returned as-is; it is not checked against the experiment's
                variants.

        Returns:
            The chosen variant key, or ``None`` when the experiment is not
            running or has no variants configured.
        """
        if experiment.status != ExperimentStatus.RUNNING:
            return None

        # Check overrides (for internal testing)
        if overrides and experiment.id in overrides:
            return overrides[experiment.id]

        # Check audience targeting
        # (simplified — production would evaluate rules)

        # An experiment without variants has nothing to hand out; treat the
        # user as not eligible instead of failing on the fallback lookup.
        if not experiment.variants:
            return None

        bucket = self._bucket(experiment.id, user_id)

        # Walk the cumulative weight distribution until the bucket falls
        # inside a variant's slice.
        cumulative = 0.0
        for variant in experiment.variants:
            cumulative += variant.weight
            if bucket < cumulative:
                return variant.key

        # Reached when the weights sum to less than the bucket value (for
        # example through floating-point rounding); the last variant absorbs
        # the remainder.
        return experiment.variants[-1].key

    def _bucket(self, experiment_id: str, user_id: str) -> float:
        """Hash the user into one of 10,000 buckets, returned as 0.0000 to 0.9999."""
        hash_input = f"{self.salt}:{experiment_id}:{user_id}".encode()
        # The first 8 hex digits (32 bits) of the digest are used.
        hash_value = int(hashlib.sha256(hash_input).hexdigest()[:8], 16)
        return (hash_value % 10000) / 10000

Multi-Layer Hashing

When running many experiments simultaneously, you want independent assignment across experiments. Using the experiment ID in the hash input ensures that a user’s assignment in Experiment A doesn’t correlate with their assignment in Experiment B:

# User "alice" might be in:
# Experiment "checkout-v2": treatment (bucket 0.7234)
# Experiment "pricing-test": control (bucket 0.1892)
# These are independent because the experiment ID changes the hash

Event Tracking

Track both exposure (when a user sees a variant) and conversion (when they do the thing you’re measuring):

from dataclasses import dataclass
from datetime import datetime
import json

@dataclass
class ExperimentEvent:
    """A single tracked event: either an exposure or a metric observation."""
    experiment_id: str
    variant: str  # Variant key the user was assigned to
    user_id: str
    event_type: str  # "exposure" or metric name
    value: float = 1.0  # Metric value; the default of 1.0 suits simple count-style events
    # datetime.utcnow returns a naive datetime (no tzinfo) expressed in UTC,
    # evaluated once per event at construction time.
    timestamp: datetime = field(default_factory=datetime.utcnow)
    metadata: dict = field(default_factory=dict)  # Fresh dict per event

class EventTracker:
    """Record exposure and conversion events in an append-only storage backend.

    ``storage`` only needs an awaitable ``append(event)`` method.
    """

    def __init__(self, storage):
        self.storage = storage
        # (experiment_id, user_id) pairs already recorded by this instance.
        # NOTE: in-memory and unbounded; it deduplicates only within the
        # lifetime of this tracker object.
        self._exposure_cache: set[tuple[str, str]] = set()

    async def track_exposure(self, experiment_id: str, variant: str, user_id: str):
        """Track that a user was exposed to a variant. Deduplicate.

        Only the first exposure per ``(experiment_id, user_id)`` is stored.
        If the write fails, the pair is forgotten again so that a later call
        can retry instead of the exposure being silently lost.
        """
        key = (experiment_id, user_id)
        if key in self._exposure_cache:
            return
        # Reserve the key before awaiting so that concurrent calls for the
        # same user do not both write an exposure.
        self._exposure_cache.add(key)

        event = ExperimentEvent(
            experiment_id=experiment_id,
            variant=variant,
            user_id=user_id,
            event_type="exposure",
        )
        try:
            await self.storage.append(event)
        except BaseException:
            # Nothing was persisted (this includes task cancellation), so
            # release the reservation before propagating the error.
            self._exposure_cache.discard(key)
            raise

    async def track_conversion(
        self,
        experiment_id: str,
        variant: str,
        user_id: str,
        metric: str,
        value: float = 1.0,
    ):
        """Record one observation of ``metric`` for the user; not deduplicated."""
        event = ExperimentEvent(
            experiment_id=experiment_id,
            variant=variant,
            user_id=user_id,
            event_type=metric,
            value=value,
        )
        await self.storage.append(event)

Exposure deduplication matters — counting the same user multiple times inflates your sample size and gives false confidence.

Statistical Analysis

Frequentist Approach (Z-Test for Proportions)

import math
from dataclasses import dataclass

@dataclass
class VariantStats:
    """Aggregated counts for one arm of an experiment."""
    variant: str
    users: int
    conversions: int

    @property
    def rate(self) -> float:
        """Conversions per user, or 0.0 when the arm has no users."""
        if self.users > 0:
            return self.conversions / self.users
        return 0.0

@dataclass
class TestResult:
    """Outcome of comparing a treatment arm against a control arm."""
    control: VariantStats
    treatment: VariantStats
    z_score: float  # Test statistic; positive when treatment converts better
    p_value: float  # Two-tailed p-value
    lift: float  # Relative change of the treatment rate versus the control rate
    confidence_interval: tuple[float, float]  # (low, high) for treatment rate minus control rate
    significant: bool  # True when p_value is below the chosen alpha

def analyze_proportions(control: VariantStats, treatment: VariantStats, alpha: float = 0.05) -> TestResult:
    """Two-proportion z-test.

    Args:
        control: Counts for the control arm.
        treatment: Counts for the treatment arm.
        alpha: Significance level. It drives both the ``significant`` flag
            and the width of the reported ``(1 - alpha)`` confidence interval.

    Returns:
        A TestResult. When either arm has no users, or the pooled standard
        error is zero (both arms converted 0% or 100%), an inconclusive
        result is returned: z=0, p=1.0, lift=0, CI=(0, 0), not significant.

    Raises:
        ValueError: If ``alpha`` is not strictly between 0 and 1.
    """
    # Local import keeps this block self-contained.
    from statistics import NormalDist

    if not 0 < alpha < 1:
        raise ValueError(f"alpha must be between 0 and 1 (exclusive), got {alpha!r}")

    p_c = control.rate
    p_t = treatment.rate
    n_c = control.users
    n_t = treatment.users

    if n_c == 0 or n_t == 0:
        return _inconclusive_result(control, treatment)

    # Pooled proportion under the null hypothesis of equal rates
    p_pool = (control.conversions + treatment.conversions) / (n_c + n_t)

    # Standard error of the difference under the null hypothesis
    se = math.sqrt(p_pool * (1 - p_pool) * (1 / n_c + 1 / n_t))

    if se == 0:
        return _inconclusive_result(control, treatment)

    z = (p_t - p_c) / se

    # Two-tailed p-value. erfc(|z| / sqrt(2)) equals 2 * (1 - Phi(|z|)) but
    # does not cancel to exactly 0.0 for large |z|.
    p_value = math.erfc(abs(z) / math.sqrt(2))

    # Relative lift; reported as 0 when the control rate is 0 because the
    # ratio is undefined there.
    lift = (p_t - p_c) / p_c if p_c > 0 else 0

    # (1 - alpha) confidence interval for the difference, using the unpooled
    # standard error. The critical value follows alpha so the interval and the
    # significance flag describe the same confidence level.
    se_diff = math.sqrt(p_c * (1 - p_c) / n_c + p_t * (1 - p_t) / n_t)
    z_crit = NormalDist().inv_cdf(1 - alpha / 2)
    diff = p_t - p_c
    ci = (diff - z_crit * se_diff, diff + z_crit * se_diff)

    return TestResult(
        control=control, treatment=treatment,
        z_score=z, p_value=p_value, lift=lift,
        confidence_interval=ci, significant=p_value < alpha,
    )


def _inconclusive_result(control: VariantStats, treatment: VariantStats) -> TestResult:
    """Build the 'no evidence either way' result used when the test is undefined."""
    return TestResult(
        control=control, treatment=treatment,
        z_score=0, p_value=1.0, lift=0,
        confidence_interval=(0, 0), significant=False,
    )

def _normal_cdf(x: float) -> float:
    """Approximation of the standard normal CDF."""
    return 0.5 * (1 + math.erf(x / math.sqrt(2)))

Bayesian Approach

Bayesian analysis gives you direct probability statements (“there’s a 94% chance treatment is better”) instead of p-values:

import numpy as np
from scipy import stats

@dataclass
class BayesianResult:
    """Summary of a Bayesian comparison between treatment and control."""
    prob_treatment_better: float  # Share of posterior draws where treatment beats control
    expected_lift: float  # Mean relative lift of treatment over control across draws
    credible_interval: tuple[float, float]  # (low, high) bounds on the relative lift
    risk: float  # Expected loss if you choose treatment

def bayesian_ab_test(
    control_conversions: int,
    control_total: int,
    treatment_conversions: int,
    treatment_total: int,
    simulations: int = 100_000,
    seed: int | None = None,
) -> BayesianResult:
    """Beta-Binomial model with Monte Carlo simulation.

    Args:
        control_conversions: Converted users in the control arm.
        control_total: All users in the control arm.
        treatment_conversions: Converted users in the treatment arm.
        treatment_total: All users in the treatment arm.
        simulations: Number of posterior draws per arm.
        seed: Optional seed for reproducible results. When ``None`` the
            global ``np.random`` state is used, exactly as before.

    Returns:
        A BayesianResult whose fields are plain Python floats.

    Raises:
        ValueError: If a conversion count is negative or exceeds its total,
            or if ``simulations`` is smaller than 1.
    """
    # Validate up front so the caller gets a readable error instead of a
    # low-level complaint about non-positive Beta parameters.
    for label, conversions, total in (
        ("control", control_conversions, control_total),
        ("treatment", treatment_conversions, treatment_total),
    ):
        if not 0 <= conversions <= total:
            raise ValueError(
                f"{label}: conversions ({conversions}) must be between 0 and total ({total})"
            )
    if simulations < 1:
        raise ValueError(f"simulations must be at least 1, got {simulations}")

    # Both the legacy module-level API and a seeded Generator expose
    # ``beta(a, b, size)``.
    rng = np.random if seed is None else np.random.default_rng(seed)

    # Prior: Beta(1, 1) = uniform
    alpha_prior, beta_prior = 1, 1

    # Posterior distributions (control is drawn first, then treatment)
    control_samples = rng.beta(
        alpha_prior + control_conversions,
        beta_prior + control_total - control_conversions,
        simulations,
    )
    treatment_samples = rng.beta(
        alpha_prior + treatment_conversions,
        beta_prior + treatment_total - treatment_conversions,
        simulations,
    )

    # Probability treatment > control
    prob_better = (treatment_samples > control_samples).mean()

    # Relative lift per draw
    lift_samples = (treatment_samples - control_samples) / control_samples
    expected_lift = lift_samples.mean()

    # 95% credible interval for the lift
    ci_low, ci_high = np.percentile(lift_samples, [2.5, 97.5])

    # Expected loss (risk of choosing treatment if it's actually worse)
    loss = np.maximum(control_samples - treatment_samples, 0).mean()

    # Convert NumPy scalars so the result holds plain floats as annotated.
    return BayesianResult(
        prob_treatment_better=float(prob_better),
        expected_lift=float(expected_lift),
        credible_interval=(float(ci_low), float(ci_high)),
        risk=float(loss),
    )

The Bayesian approach has practical advantages: it answers “what’s the probability this is better?” directly, and it lets you stop early without the peeking problem of frequentist tests.

Sequential Testing (Safe Peeking)

If you need to monitor results during the experiment without inflating false positives:

def sequential_test(
    control: VariantStats,
    treatment: VariantStats,
    alpha: float = 0.05,
    num_peeks: int = 10,
    expected_total_users: int = 20_000,
) -> dict:
    """O'Brien-Fleming-like boundary for sequential monitoring.

    The stopping boundary is the two-sided critical value for ``alpha``
    divided by ``sqrt(information fraction)``: very strict early on, relaxing
    to the fixed-sample critical value once all planned users are observed.

    Args:
        control: Counts for the control arm.
        treatment: Counts for the treatment arm.
        alpha: Overall two-sided significance level.
        num_peeks: Currently unused by the boundary calculation; kept so
            existing callers keep working.
        expected_total_users: Planned total sample size across both arms,
            used as the denominator of the information fraction.

    Returns:
        A dict with keys ``"result"`` (the TestResult at this look),
        ``"boundary"`` (the |z| needed to stop now), ``"can_stop"`` and
        ``"info_fraction"``.

    Raises:
        ValueError: If ``alpha`` is not strictly between 0 and 1, or
            ``expected_total_users`` is not positive.
    """
    # Local import keeps this block self-contained.
    from statistics import NormalDist

    if not 0 < alpha < 1:
        raise ValueError(f"alpha must be between 0 and 1 (exclusive), got {alpha!r}")
    if expected_total_users <= 0:
        raise ValueError(
            f"expected_total_users must be positive, got {expected_total_users!r}"
        )

    # Share of the planned sample observed so far, capped at 100%.
    observed = control.users + treatment.users
    info_fraction = min(observed / expected_total_users, 1.0)

    # With no data the boundary is infinite, so stopping is impossible.
    if info_fraction > 0:
        z_crit = NormalDist().inv_cdf(1 - alpha / 2)
        adjusted_z = z_crit / math.sqrt(info_fraction)
    else:
        adjusted_z = float('inf')

    result = analyze_proportions(control, treatment, alpha)

    return {
        "result": result,
        "boundary": adjusted_z,
        "can_stop": abs(result.z_score) > adjusted_z,
        "info_fraction": info_fraction,
    }

The boundary starts very high (hard to cross early) and decreases as more data arrives. This maintains the overall false positive rate while allowing early stopping.

FastAPI Integration

from fastapi import FastAPI, Request, Depends

# Module-level objects created once at import time; the middleware below
# passes `engine` and `tracker` into every per-request ExperimentContext.
app = FastAPI()
engine = AssignmentEngine()  # Uses the default salt
# NOTE(review): `storage` is not defined anywhere in the visible source; it
# must exist (with an awaitable `append` method) before this line runs.
tracker = EventTracker(storage)

@app.middleware("http")
async def experiment_middleware(request: Request, call_next):
    """Attach a fresh ExperimentContext to ``request.state.experiments``.

    The user id comes from the ``x-user-id`` header; requests without that
    header all share the id ``"anonymous"``.
    """
    caller_id = request.headers.get("x-user-id", "anonymous")
    context = ExperimentContext(engine, tracker, caller_id)
    request.state.experiments = context
    return await call_next(request)

class ExperimentContext:
    """Per-user view of the experiment system with memoized assignments."""

    def __init__(self, engine, tracker, user_id):
        self.engine, self.tracker, self.user_id = engine, tracker, user_id
        # experiment id -> variant key already resolved through this context
        self._assignments: dict[str, str] = {}

    async def get_variant(self, experiment: Experiment) -> str:
        """Return the user's variant key for ``experiment``.

        The first successful assignment is remembered and reported as an
        exposure. When the engine assigns nothing, ``"control"`` is returned
        and nothing is remembered or tracked.
        """
        exp_id = experiment.id
        if exp_id in self._assignments:
            return self._assignments[exp_id]

        chosen = self.engine.assign(experiment, self.user_id)
        if not chosen:
            return "control"

        # Remember the assignment before awaiting the exposure write.
        self._assignments[exp_id] = chosen
        await self.tracker.track_exposure(exp_id, chosen, self.user_id)
        return self._assignments.get(exp_id, "control")

# In route handlers:
@app.get("/pricing")
async def pricing_page(request: Request):
    """Serve the annual pricing page to the treatment arm, monthly otherwise."""
    experiments = request.state.experiments
    variant = await experiments.get_variant(pricing_experiment)
    render = annual_pricing_page if variant == "treatment" else monthly_pricing_page
    return render()

Automated Experiment Lifecycle

class ExperimentAutomation:
    """Periodically pause or complete running experiments.

    ``experiments`` must provide ``get_running()`` plus awaitable
    ``pause(id)`` and ``complete(id, ...)``; ``analyzer`` must provide an
    awaitable ``get_stats(exp)`` returning a mapping from metric name to a
    result object.
    """

    def __init__(self, experiments, analyzer):
        self.experiments = experiments
        self.analyzer = analyzer

    async def notify(self, message: str) -> None:
        """Default notification sink: write the message to the module logger.

        Override this (or assign a replacement coroutine function) to route
        messages to chat, e-mail, and so on.
        """
        import logging

        logging.getLogger(__name__).info(message)

    async def check_experiments(self):
        """Apply the lifecycle rules to every running experiment.

        At most one action is taken per experiment per call, in this order:
        pause on a guardrail violation, complete on a significant primary
        metric, complete because the maximum duration was exceeded.
        """
        for exp in self.experiments.get_running():
            stats = await self.analyzer.get_stats(exp)

            # A paused experiment must not also be completed in this pass.
            if await self._pause_on_guardrail(exp, stats):
                continue
            # A completed experiment must not be completed a second time by
            # the duration rule.
            if await self._complete_on_significance(exp, stats):
                continue
            await self._complete_on_max_duration(exp)

    async def _pause_on_guardrail(self, exp, stats) -> bool:
        """Pause ``exp`` at the first significantly degraded guardrail metric."""
        for metric in exp.guardrail_metrics:
            result = stats.get(metric)
            # A drop of more than 2% counts as a violation.
            if result and result.significant and result.lift < -0.02:
                await self.experiments.pause(exp.id)
                await self.notify(
                    f"Experiment {exp.name} paused: "
                    f"guardrail {metric} degraded by {result.lift:.1%}"
                )
                return True
        return False

    async def _complete_on_significance(self, exp, stats) -> bool:
        """Complete ``exp`` when the primary metric is significant with enough data."""
        primary = stats.get(exp.primary_metric)
        # NOTE(review): only the control arm's user count is checked against
        # min_sample_size; confirm whether the treatment arm should be too.
        if not (primary and primary.control.users >= exp.min_sample_size):
            return False
        if not primary.significant:
            return False
        await self.experiments.complete(exp.id, winner=primary)
        await self.notify(
            f"Experiment {exp.name} completed: "
            f"treatment {'won' if primary.lift > 0 else 'lost'} "
            f"with {primary.lift:.1%} lift (p={primary.p_value:.4f})"
        )
        return True

    async def _complete_on_max_duration(self, exp) -> bool:
        """Complete ``exp`` once it has run longer than ``max_duration_days``."""
        if not exp.started_at:
            return False
        if self._days_running(exp.started_at) <= exp.max_duration_days:
            return False
        await self.experiments.complete(exp.id, reason="max_duration")
        return True

    @staticmethod
    def _days_running(started_at) -> int:
        """Whole days elapsed since ``started_at`` (naive values are read as UTC)."""
        from datetime import datetime, timezone

        now = datetime.now(timezone.utc)
        if started_at.tzinfo is None:
            # Match a naive start time with a naive "now" in UTC.
            now = now.replace(tzinfo=None)
        return (now - started_at).days

One thing to remember: A production A/B testing framework combines consistent user assignment (hashed, deterministic), proper statistical analysis (frequentist or Bayesian with peeking corrections), guardrail metrics for safety, and automated lifecycle management. The math matters — cutting corners on statistics means your “data-driven” decisions are driven by noise.

Tags: python, experimentation, data

See Also