Python for Electronic Health Records — Deep Dive

EHR data architecture

Production EHR analytics systems operate within strict technical and regulatory constraints. The architecture must handle large data volumes (millions of patients, billions of events), protect privacy (HIPAA, GDPR), and produce reproducible results for clinical validation.

EHR System (Epic/Cerner) → ETL to OMOP CDM → Feature Store → 
  Model Training (isolated environment) → Validation → Deployment (EHR integration)

Working with OMOP CDM in Python

Schema overview

The OMOP CDM organizes clinical data into standardized tables with vocabulary mappings:

import pandas as pd  # used by every DataFrame-returning function in this article
import sqlalchemy as sa
from sqlalchemy.orm import Session

# Key OMOP tables and their relationships
# person → condition_occurrence → concept (diagnosis)
# person → drug_exposure → concept (medication)
# person → measurement → concept (lab test)
# person → observation → concept (clinical observation)
# person → visit_occurrence → visit_detail

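# Placeholder connection string; in practice, load credentials from the environment or a secrets manager
engine = sa.create_engine("postgresql://analyst:pass@omop-db/cdm_v5")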

Cohort definition

Clinical studies start with cohort identification. OMOP defines a cohort table for storing the result; the query here derives the cohort directly from the clinical tables:

def build_diabetes_cohort(engine) -> pd.DataFrame:
    """
    Inclusion: Type 2 diabetes diagnosis + HbA1c >= 6.5%
    Exclusion: Type 1 diabetes, age < 18, pregnancy
    """
    query = """
    WITH t2d_patients AS (
        SELECT DISTINCT co.person_id,
               MIN(co.condition_start_date) AS index_date
        FROM condition_occurrence co
        JOIN concept_ancestor ca 
            ON co.condition_concept_id = ca.descendant_concept_id
        WHERE ca.ancestor_concept_id = 201826  -- T2D SNOMED hierarchy
        GROUP BY co.person_id
    ),
    with_hba1c AS (
        SELECT DISTINCT t.person_id, t.index_date  -- one row per patient, even with several qualifying HbA1c results
        FROM t2d_patients t
        JOIN measurement m ON t.person_id = m.person_id
        WHERE m.measurement_concept_id = 3004410  -- HbA1c
          AND m.value_as_number >= 6.5
          AND m.measurement_date BETWEEN t.index_date - INTERVAL '1 year' 
                                      AND t.index_date + INTERVAL '1 year'
    ),
    exclusions AS (
        SELECT DISTINCT person_id FROM condition_occurrence
        WHERE condition_concept_id IN (
            201254,  -- Type 1 diabetes
            4299535  -- Pregnancy
        )
    )
    SELECT w.person_id, w.index_date,
           p.year_of_birth, p.gender_concept_id, p.race_concept_id
    FROM with_hba1c w
    JOIN person p ON w.person_id = p.person_id
    WHERE p.year_of_birth <= EXTRACT(YEAR FROM w.index_date) - 18
      AND w.person_id NOT IN (SELECT person_id FROM exclusions)
    """
    return pd.read_sql(query, engine)
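
The join against concept_ancestor is what lets one ancestor concept ID match every more specific diagnosis code beneath it. The sketch below illustrates the mechanism on an in-memory SQLite database with a toy hierarchy. Only the table and column names follow the OMOP CDM; the descendant concept IDs (900001, 900002), the unrelated concept 555555, and all rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE concept_ancestor (ancestor_concept_id INTEGER, descendant_concept_id INTEGER);
CREATE TABLE condition_occurrence (person_id INTEGER, condition_concept_id INTEGER,
                                   condition_start_date TEXT);
-- Toy hierarchy: 201826 is the ancestor; 900001 and 900002 are hypothetical descendants.
-- Each concept is also listed as its own descendant, as in the real table.
INSERT INTO concept_ancestor VALUES (201826, 201826), (201826, 900001), (201826, 900002);
INSERT INTO condition_occurrence VALUES
    (1, 201826, '2020-03-01'),
    (2, 900001, '2019-07-15'),
    (2, 900002, '2021-01-10'),
    (3, 555555, '2020-05-05');  -- unrelated condition, should not match
""")

# Same shape as the t2d_patients CTE: expand the hierarchy, keep the earliest date
rows = conn.execute("""
    SELECT co.person_id, MIN(co.condition_start_date) AS index_date
    FROM condition_occurrence co
    JOIN concept_ancestor ca ON co.condition_concept_id = ca.descendant_concept_id
    WHERE ca.ancestor_concept_id = 201826
    GROUP BY co.person_id
    ORDER BY co.person_id
""").fetchall()
print(rows)  # [(1, '2020-03-01'), (2, '2019-07-15')]
```

The same expansion is probably worth applying to exclusion criteria as well: matching only the exact concept IDs for Type 1 diabetes or pregnancy would miss patients coded with a more specific descendant concept.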

Temporal feature engineering

EHR data is inherently temporal. Features must capture both point-in-time values and trends:

from typing import Optional
import numpy as np

class TemporalFeatureBuilder:
    def __init__(self, measurements: pd.DataFrame, index_dates: pd.DataFrame):
        self.measurements = measurements
        self.index_dates = index_dates
    
    def build_features(self, concept_id: int, windows: tuple[int, ...] = (7, 30, 90, 365)) -> pd.DataFrame:
        """Build temporal features for a measurement concept relative to index date."""
        merged = self.measurements[
            self.measurements["measurement_concept_id"] == concept_id
        ].merge(self.index_dates, on="person_id")
        
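        merged["days_before"] = (
            pd.to_datetime(merged["index_date"]) - 
            pd.to_datetime(merged["measurement_date"])
        ).dt.days
        # Order oldest to newest so the "last" aggregate is the most recent value
        merged = merged.sort_values("days_before", ascending=False)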
        
        features = {}
        for window in windows:
            window_data = merged[
                (merged["days_before"] >= 0) & (merged["days_before"] <= window)
            ]
            
            agg = window_data.groupby("person_id")["value_as_number"].agg([
                "last", "mean", "std", "min", "max", "count"
            ])
            agg.columns = [f"c{concept_id}_{window}d_{stat}" for stat in agg.columns]
            features[window] = agg
        
        result = self.index_dates[["person_id"]].set_index("person_id")
        for window_features in features.values():
            result = result.join(window_features, how="left")
        
        return result.reset_index()
    
    def build_trend(self, concept_id: int, window: int = 90) -> pd.DataFrame:
        """Compute slope of measurements over a window (trend detection)."""
        merged = self.measurements[
            self.measurements["measurement_concept_id"] == concept_id
        ].merge(self.index_dates, on="person_id")
        
        merged["days_before"] = (
            pd.to_datetime(merged["index_date"]) - 
            pd.to_datetime(merged["measurement_date"])
        ).dt.days
        
        window_data = merged[
            (merged["days_before"] >= 0) & (merged["days_before"] <= window)
        ]
        
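        def compute_slope(group):
            # Drop missing values; a slope needs at least two distinct time points
            group = group.dropna(subset=["value_as_number"])
            if group["days_before"].nunique() < 2:
                return np.nan
            x = group["days_before"].values.astype(float)
            y = group["value_as_number"].values.astype(float)
            slope = np.polyfit(x, y, 1)[0]
            # x counts days back from the index date, so negate: positive = increasing over time
            return -slope
        
        # Select the needed columns explicitly so apply() does not operate on the grouping key
        slopes = window_data.groupby("person_id")[["days_before", "value_as_number"]].apply(compute_slope)
        slopes.name = f"c{concept_id}_{window}d_trend"
        return slopes.reset_index()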

Clinical NLP pipeline

Building a production clinical NLP system

import spacy
import medspacy
from medspacy.ner import TargetRule
from medspacy.context import ConTextRule

def build_clinical_nlp():
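    # The target matcher must be enabled before rules can be added to it;
    # the sectionizer populates the section_category attribute read in process_notes
    nlp = medspacy.load(enable=["medspacy_pyrush", "medspacy_target_matcher",
                                "medspacy_context", "medspacy_sectionizer"])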
    
    # Add custom entity rules for specific use case
    target_matcher = nlp.get_pipe("medspacy_target_matcher")
    target_rules = [
        TargetRule("ejection fraction", "MEASUREMENT",
                   pattern=[{"LOWER": "ejection"}, {"LOWER": "fraction"}]),
        TargetRule("chest pain", "SYMPTOM",
                   pattern=[{"LOWER": "chest"}, {"LOWER": "pain"}]),
    ]
    target_matcher.add(target_rules)
    
    # Context rules for assertion detection
    context = nlp.get_pipe("medspacy_context")
    context_rules = [
        ConTextRule("patient denies", "NEGATED_EXISTENCE", direction="FORWARD"),
        ConTextRule("no evidence of", "NEGATED_EXISTENCE", direction="FORWARD"),
        ConTextRule("family history of", "FAMILY", direction="FORWARD"),
        ConTextRule("on admission", "HISTORICAL", direction="FORWARD"),
    ]
    context.add(context_rules)
    
    return nlp

def process_notes(nlp, notes_df: pd.DataFrame) -> pd.DataFrame:
    """Extract structured findings from clinical notes."""
    results = []
    for row in notes_df.itertuples():
        doc = nlp(row.note_text)
        for ent in doc.ents:
            results.append({
                "note_id": row.note_id,
                "person_id": row.person_id,
                "entity_text": ent.text,
                "entity_label": ent.label_,
                "is_negated": ent._.is_negated,
                "is_historical": ent._.is_historical,
                "is_family": ent._.is_family,
                "section": ent._.section_category,
                "start_char": ent.start_char,
                "end_char": ent.end_char,
            })
    return pd.DataFrame(results)

Extracting numerical values from text

Clinical notes contain critical measurements embedded in free text:

import re

def extract_ejection_fraction(text: str) -> Optional[float]:
    """Extract ejection fraction percentage from clinical note."""
    # \b stops "EF" from matching inside words such as "relief" under IGNORECASE
    prefix = r"\b(?:ejection fraction|LVEF|EF)\s*(?:of|is|was|:)?\s*"
    patterns = [
        prefix + r"(\d{1,2})\s*(?:-|to)\s*(\d{1,2})\s*%",  # range: "EF 55-60%", "EF of 55 to 60%"
        prefix + r"(\d{1,2})\s*%",                          # single value: "LVEF is 45%"
    ]
    
    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            groups = match.groups()
            if len(groups) == 2:  # Range: take midpoint
                return (float(groups[0]) + float(groups[1])) / 2
            return float(groups[0])
    return None
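
One pitfall worth seeing concretely: with re.IGNORECASE, a bare "EF" also matches the tail of ordinary words. The sketch below contrasts an unanchored pattern with a word-boundary version that also treats "55-60%" as a range. The note text is invented for illustration.

```python
import re

note = "Pain relief 50% after nitroglycerin. Echo today: LVEF 55-60%."

# Without a word boundary, IGNORECASE lets "EF" match the end of "relief"
loose = re.search(r"EF\s*(\d{1,2})\s*%", note, re.IGNORECASE)
print(loose.group(1))  # 50

# Anchoring on \b and handling ranges explicitly
strict = re.search(
    r"\b(?:ejection fraction|LVEF|EF)\s*(?:of|is|was|:)?\s*"
    r"(\d{1,2})\s*(?:-|to)\s*(\d{1,2})\s*%",
    note,
    re.IGNORECASE,
)
low, high = map(float, strict.groups())
print((low + high) / 2)  # 57.5
```

Regex extraction of this kind still benefits from a plausibility check on the result and from manual review of a sample of notes before it feeds a model.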

De-identification at scale

Using Presidio for PHI removal

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def deidentify_note(text: str) -> str:
    results = analyzer.analyze(
        text=text,
        entities=["PERSON", "DATE_TIME", "PHONE_NUMBER", "EMAIL_ADDRESS",
                  "LOCATION", "US_SSN", "MEDICAL_LICENSE"],
        language="en"
    )
    
    anonymized = anonymizer.anonymize(
        text=text,
        analyzer_results=results,
        operators={
            "PERSON": OperatorConfig("replace", {"new_value": "[NAME]"}),
            "DATE_TIME": OperatorConfig("replace", {"new_value": "[DATE]"}),
            "PHONE_NUMBER": OperatorConfig("replace", {"new_value": "[PHONE]"}),
            "LOCATION": OperatorConfig("replace", {"new_value": "[LOCATION]"}),
            "DEFAULT": OperatorConfig("replace", {"new_value": "[REDACTED]"}),
        }
    )
    return anonymized.text

Date shifting for longitudinal studies

import hashlib
from datetime import timedelta

def get_date_shift(person_id: int, secret_key: str, max_shift_days: int = 365) -> int:
    """Deterministic date shift per patient (same shift for all their dates)."""
    hash_input = f"{person_id}:{secret_key}".encode()
    hash_value = int(hashlib.sha256(hash_input).hexdigest(), 16)
    return (hash_value % (2 * max_shift_days)) - max_shift_days

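def shift_dates(df: pd.DataFrame, date_columns: list[str], 
                secret_key: str) -> pd.DataFrame:
    shifted = df.copy()
    # Compute one shift per patient, then broadcast it to all of that patient's rows
    shift_by_person = {
        pid: get_date_shift(pid, secret_key) for pid in shifted["person_id"].unique()
    }
    offsets = pd.to_timedelta(shifted["person_id"].map(shift_by_person), unit="D")
    for col in date_columns:
        # Assign whole columns so the dtype becomes datetime64 consistently
        shifted[col] = pd.to_datetime(shifted[col]) + offsets
    return shifted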
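
The point of a per-patient shift is that intervals between a patient's events survive de-identification. The sketch below checks that property with the standard library only. It mirrors the hashing scheme of get_date_shift under a different name (date_shift_days); the key "demo-key" and the dates are placeholders.

```python
import hashlib
from datetime import date, timedelta

def date_shift_days(person_id: int, secret_key: str, max_shift_days: int = 365) -> int:
    """Deterministic per-patient offset in [-max_shift_days, max_shift_days)."""
    digest = hashlib.sha256(f"{person_id}:{secret_key}".encode()).hexdigest()
    return (int(digest, 16) % (2 * max_shift_days)) - max_shift_days

admit, discharge = date(2021, 3, 1), date(2021, 3, 9)
shift = timedelta(days=date_shift_days(person_id=42, secret_key="demo-key"))

shifted_admit, shifted_discharge = admit + shift, discharge + shift

# Length of stay is unchanged by the shift
print(shifted_discharge - shifted_admit == discharge - admit)  # True
# The same patient and key always produce the same offset
print(date_shift_days(42, "demo-key") == date_shift_days(42, "demo-key"))  # True
```

Two caveats with this scheme: the offset range includes 0, so a small fraction of patients end up unshifted, and the standard library's hmac module is the more conventional way to combine a secret key with an identifier.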

Federated learning across institutions

When data cannot leave the hospital, federated learning trains models across institutions without sharing patient records:

import torch
import torch.nn as nn
from collections import OrderedDict  # concrete class: used both as annotation and constructor

class FederatedEHRModel(nn.Module):
    def __init__(self, n_features: int, n_classes: int):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(n_features, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, n_classes),
        )
    
    def forward(self, x):
        return self.network(x)

def federated_averaging(global_model: nn.Module, 
                         local_updates: list[OrderedDict],
                         weights: list[float]) -> OrderedDict:
    """Weighted average of local model updates (FedAvg algorithm)."""
    total_weight = sum(weights)
    averaged = OrderedDict()
    
    for key in global_model.state_dict():
        averaged[key] = sum(
            w * update[key] for update, w in zip(local_updates, weights)
        ) / total_weight
    
    return averaged

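# Coordination loop. `hospitals` is assumed to be a list of site clients, each exposing
# n_patients and train_locally(model, epochs), which returns the trained state_dict.
global_model = FederatedEHRModel(n_features=200, n_classes=2)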

for round_num in range(50):
    # Send global model to each hospital
    global_state = global_model.state_dict()
    
    local_updates = []
    local_weights = []
    
    for hospital in hospitals:
        # Each hospital trains locally for a few epochs
        local_model = FederatedEHRModel(200, 2)
        local_model.load_state_dict(global_state)
        local_state = hospital.train_locally(local_model, epochs=5)
        
        local_updates.append(local_state)
        local_weights.append(hospital.n_patients)
    
    # Aggregate updates
    new_state = federated_averaging(global_model, local_updates, local_weights)
    global_model.load_state_dict(new_state)
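
To make the aggregation step concrete, here is a small worked example of the weighted average in plain Python, with parameter vectors as lists instead of tensors. The two sites, their patient counts, and the parameter values are hypothetical; fedavg is an illustrative stand-in for federated_averaging, not a replacement.

```python
def fedavg(updates: list[dict[str, list[float]]],
           weights: list[float]) -> dict[str, list[float]]:
    """Weighted element-wise mean of parameter vectors, one weight per site."""
    total = sum(weights)
    return {
        name: [
            sum(w * u[name][i] for u, w in zip(updates, weights)) / total
            for i in range(len(updates[0][name]))
        ]
        for name in updates[0]
    }

# Two hypothetical hospitals: site A has 300 patients, site B has 100
site_a = {"layer.weight": [1.0, 2.0], "layer.bias": [0.0]}
site_b = {"layer.weight": [5.0, 6.0], "layer.bias": [4.0]}

averaged = fedavg([site_a, site_b], weights=[300, 100])
print(averaged)
# {'layer.weight': [2.0, 3.0], 'layer.bias': [1.0]}
```

Site A contributes three times the weight of site B, so each averaged parameter sits a quarter of the way from A's value to B's. Weighting by patient count keeps small sites from pulling the global model as hard as large ones.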

Projects like PySyft and NVIDIA FLARE provide production-grade federated learning frameworks for healthcare.

Tradeoffs

Approach           | Strength                                                               | Weakness
OMOP CDM           | Cross-institutional compatibility                                      | ETL is complex and lossy
Raw EHR extract    | Complete data access                                                   | Schema varies per institution
FHIR API           | Real-time, standardized                                                | Bulk historical queries are slow
Federated learning | Privacy-preserving                                                     | Communication overhead, convergence issues
Clinical NLP       | Unlocks unstructured data in notes (often estimated at ~80% of EHR data) | Error-prone, needs validation

The one thing to remember: Production EHR analytics in Python requires mastering the full stack — from OMOP CDM queries and temporal feature engineering through clinical NLP and de-identification — because the gap between raw hospital data and research-ready features is where most projects fail.

