Python for Pandemic Modeling — Deep Dive

Architecture of a pandemic modeling system

Production pandemic models operate in a continuous cycle: ingest surveillance data, estimate parameters, generate forecasts, evaluate interventions, and update as new data arrives.

Surveillance Data → Preprocessing → Parameter Estimation (Bayesian) →
  Forward Simulation → Scenario Analysis → Dashboard/Reports

Advanced compartmental models

SEIRHD with age structure

Real pandemic planning requires age-stratified models because disease severity, contact patterns, and vaccination priorities differ by age:

import numpy as np
from scipy.integrate import solve_ivp

class AgeStructuredSEIRHD:
    def __init__(self, age_groups: int, N: np.ndarray, contact_matrix: np.ndarray):
        self.age_groups = age_groups
        self.N = N  # Population per age group
        self.C = contact_matrix  # age_groups x age_groups contact rates
    
    def deriv(self, t, y, params):
        """
        State vector y: [S_0..S_n, E_0..E_n, I_0..I_n, R_0..R_n, H_0..H_n, D_0..D_n]
        """
        n = self.age_groups
        S, E, I, R, H, D = [y[i*n:(i+1)*n] for i in range(6)]
        
        beta = params["beta"]
        sigma = params["sigma"]       # 1/incubation period
        gamma = params["gamma"]       # 1/infectious period
        hosp_rate = params["hosp_rate"]   # age-specific hospitalization rate
        death_rate = params["death_rate"]  # age-specific hospital death rate
        hosp_duration = params["hosp_duration"]
        
        # Force of infection per age group
        # lambda_i = beta * sum_j(C_ij * I_j / N_j)
        force = beta * (self.C @ (I / self.N))
        
        dS = -force * S
        dE = force * S - sigma * E
        dI = sigma * E - gamma * I
        dR = gamma * (1 - hosp_rate) * I + (1 / hosp_duration) * (1 - death_rate) * H
        dH = gamma * hosp_rate * I - (1 / hosp_duration) * H
        dD = (1 / hosp_duration) * death_rate * H
        
        return np.concatenate([dS, dE, dI, dR, dH, dD])
    
    def simulate(self, y0: np.ndarray, t_span: tuple, params: dict) -> dict:
        sol = solve_ivp(
            self.deriv, t_span, y0, args=(params,),
            method="RK45", max_step=1.0, dense_output=True
        )
        
        n = self.age_groups
        t = sol.t
        return {
            "t": t,
            "S": sol.y[:n],
            "E": sol.y[n:2*n],
            "I": sol.y[2*n:3*n],
            "R": sol.y[3*n:4*n],
            "H": sol.y[4*n:5*n],
            "D": sol.y[5*n:6*n],
        }

# Example: 4 age groups (0-17, 18-44, 45-64, 65+)
N = np.array([74_000_000, 120_000_000, 83_000_000, 54_000_000])

# Contact matrix (Mossong et al. POLYMOD study, adapted)
contact_matrix = np.array([
    [7.0, 3.0, 1.5, 0.5],
    [3.0, 8.0, 3.0, 1.0],
    [1.5, 3.0, 5.0, 1.5],
    [0.5, 1.0, 1.5, 3.0],
])

model = AgeStructuredSEIRHD(
    age_groups=4, N=N, contact_matrix=contact_matrix
)

Time-varying transmission rate

Transmission rates change as behavior and policies shift. Model β(t) as a piecewise function or smooth spline:

from scipy.interpolate import CubicSpline

class TimeVaryingBeta:
    def __init__(self, change_points: list[float], beta_values: list[float]):
        self.spline = CubicSpline(change_points, beta_values, bc_type="clamped")
    
    def __call__(self, t: float) -> float:
        return max(0.01, float(self.spline(t)))

# Beta drops at lockdown (day 30), partially recovers (day 90)
beta_func = TimeVaryingBeta(
    change_points=[0, 30, 35, 90, 120, 365],
    beta_values=[0.35, 0.35, 0.12, 0.12, 0.20, 0.20]
)

Network-based transmission models

Compartmental models assume homogeneous mixing. Network models capture realistic contact structures:

import networkx as nx
import numpy as np

def build_population_network(n: int, household_size: int = 4,
                              workplace_size: int = 20,
                              random_contacts: int = 5) -> nx.Graph:
    G = nx.Graph()
    G.add_nodes_from(range(n))
    
    # Household layer (complete graphs)
    for i in range(0, n, household_size):
        household = list(range(i, min(i + household_size, n)))
        for a in household:
            for b in household:
                if a < b:
                    G.add_edge(a, b, layer="household", weight=0.3)
    
    # Workplace layer
    workers = np.random.permutation(n)
    for i in range(0, n, workplace_size):
        workplace = workers[i:i + workplace_size]
        for j, a in enumerate(workplace):
            for b in workplace[j+1:j+4]:  # Not fully connected
                G.add_edge(a, b, layer="workplace", weight=0.05)
    
    # Random community contacts
    for node in range(n):
        contacts = np.random.choice(n, random_contacts, replace=False)
        for contact in contacts:
            if contact != node:
                G.add_edge(node, contact, layer="community", weight=0.02)
    
    return G

def simulate_network_sir(G: nx.Graph, initial_infected: int = 10,
                          days: int = 200, gamma: float = 0.1) -> dict:
    states = {node: "S" for node in G.nodes()}
    initial = np.random.choice(list(G.nodes()), initial_infected, replace=False)
    for node in initial:
        states[node] = "I"
    
    history = {"S": [], "I": [], "R": []}
    infected_days = {node: 0 for node in initial}
    
    for day in range(days):
        new_infections = []
        new_recoveries = []
        
        for node in G.nodes():
            if states[node] == "I":
                infected_days[node] = infected_days.get(node, 0) + 1
                
                # Recovery
                if np.random.random() < gamma:
                    new_recoveries.append(node)
                    continue
                
                # Transmission to neighbors
                for neighbor in G.neighbors(node):
                    if states[neighbor] == "S":
                        weight = G[node][neighbor].get("weight", 0.05)
                        if np.random.random() < weight:
                            new_infections.append(neighbor)
        
        for node in new_infections:
            states[node] = "I"
            infected_days[node] = 0
        for node in new_recoveries:
            states[node] = "R"
        
        counts = {"S": 0, "I": 0, "R": 0}
        for s in states.values():
            counts[s] += 1
        for key in history:
            history[key].append(counts[key])
    
    return history

Bayesian parameter estimation with PyMC

Point estimates of epidemic parameters are misleading. Bayesian inference provides posterior distributions that quantify uncertainty:

import pymc as pm
import pytensor.tensor as pt

def estimate_parameters(daily_cases: np.ndarray, population: int):
    with pm.Model() as model:
        # Priors
        beta = pm.Lognormal("beta", mu=np.log(0.3), sigma=0.5)
        gamma = pm.Lognormal("gamma", mu=np.log(0.1), sigma=0.3)
        
        # Reporting rate (not all cases are detected)
        rho = pm.Beta("rho", alpha=5, beta=5)
        
        # Overdispersion
        phi = pm.Exponential("phi", lam=1)
        
        # Simulate SIR forward
        def sir_step(S_prev, I_prev, beta, gamma, N):
            new_infections = beta * S_prev * I_prev / N
            new_recoveries = gamma * I_prev
            S_new = S_prev - new_infections
            I_new = I_prev + new_infections - new_recoveries
            return S_new, I_new, new_infections
        
        S = population - 1
        I = 1.0
        expected_cases = []
        
        for t in range(len(daily_cases)):
            S, I, new_inf = sir_step(S, I, beta, gamma, population)
            expected_cases.append(new_inf * rho)
        
        expected = pt.stack(expected_cases)
        
        # Negative binomial likelihood (handles overdispersion)
        pm.NegativeBinomial(
            "cases", mu=expected, alpha=phi,
            observed=daily_cases
        )
        
        trace = pm.sample(2000, tune=1000, cores=4, target_accept=0.9)
    
    return trace

# Analyze posterior
# trace.posterior["beta"].mean()  → point estimate
# az.hdi(trace, hdi_prob=0.95)   → 95% credible interval

Intervention optimization

Given a model, optimize intervention timing and intensity:

from scipy.optimize import differential_evolution

def evaluate_intervention(params, model, population_params, observed_data):
    lockdown_start, lockdown_duration, lockdown_intensity = params
    
    # Modify beta during lockdown period
    beta_schedule = np.full(365, population_params["base_beta"])
    start = int(lockdown_start)
    end = min(start + int(lockdown_duration), 365)
    beta_schedule[start:end] *= (1 - lockdown_intensity)
    
    # Simulate with time-varying beta
    total_deaths = simulate_with_schedule(model, beta_schedule)
    economic_cost = lockdown_duration * lockdown_intensity * 1e9  # simplified
    
    # Multi-objective: minimize deaths + economic cost
    return total_deaths * 1e6 + economic_cost

result = differential_evolution(
    evaluate_intervention,
    bounds=[(1, 100), (14, 180), (0.3, 0.9)],  # start, duration, intensity
    args=(model, pop_params, data),
    maxiter=500, seed=42
)

Ensemble forecasting

No single model is correct. Production forecasting systems combine multiple models:

def ensemble_forecast(models: list, weights: list[float], 
                       n_days: int, n_samples: int = 1000) -> dict:
    all_predictions = []
    
    for model, weight in zip(models, weights):
        # Each model generates probabilistic forecasts
        samples = model.sample_forecast(n_days, n_samples=int(n_samples * weight))
        all_predictions.append(samples)
    
    combined = np.concatenate(all_predictions, axis=0)
    
    return {
        "median": np.median(combined, axis=0),
        "ci_95_lower": np.percentile(combined, 2.5, axis=0),
        "ci_95_upper": np.percentile(combined, 97.5, axis=0),
        "ci_50_lower": np.percentile(combined, 25, axis=0),
        "ci_50_upper": np.percentile(combined, 75, axis=0),
    }

The US CDC COVID-19 Forecast Hub aggregated 50+ models (most Python-based) into ensemble forecasts that consistently outperformed any individual model.

Vaccine allocation optimization

With limited vaccine supply, allocation strategy matters:

def optimize_vaccine_allocation(model, total_doses: int, age_groups: int):
    """Find optimal age-group allocation to minimize deaths."""
    
    def objective(allocation_fractions):
        doses = (np.array(allocation_fractions) * total_doses).astype(int)
        result = model.simulate_with_vaccination(doses)
        return result["total_deaths"]
    
    from scipy.optimize import minimize
    
    # Constraint: fractions sum to 1
    constraints = {"type": "eq", "fun": lambda x: sum(x) - 1.0}
    bounds = [(0, 1)] * age_groups
    x0 = np.ones(age_groups) / age_groups
    
    result = minimize(objective, x0, bounds=bounds, constraints=constraints)
    return result.x

During COVID-19, such optimization consistently showed that vaccinating the elderly first minimized deaths, while vaccinating working-age adults first minimized total infections.

Tradeoffs

Model typeStrengthWeakness
Compartmental (SIR/SEIR)Fast, analytically tractableAssumes homogeneous mixing
Age-structured compartmentalCaptures demographic riskRequires contact matrices
Network-basedRealistic contact patternsComputationally expensive
Agent-based (Covasim, Mesa)Maximum flexibilitySlow, many parameters
Statistical (curve fitting)Fast, data-drivenNo mechanism, poor extrapolation
Bayesian ensembleQuantifies uncertaintyComplex, computationally heavy

The one thing to remember: Production pandemic models in Python combine age-structured compartmental dynamics with Bayesian parameter estimation and ensemble forecasting — and their real value is not predicting exact case counts, but quantifying the uncertainty around intervention scenarios so decision-makers can plan for plausible ranges rather than false certainty.

pythonepidemiologypublic-health

See Also