Python for Electronic Health Records — Deep Dive
EHR data architecture
Production EHR analytics systems operate within strict technical and regulatory constraints. The architecture must handle large data volumes (millions of patients, billions of events), protect patient privacy (HIPAA, GDPR), and produce reproducible results for clinical validation.
EHR System (Epic/Cerner) → ETL to OMOP CDM → Feature Store →
Model Training (isolated environment) → Validation → Deployment (EHR integration)
Working with OMOP CDM in Python
Schema overview
The OMOP CDM organizes clinical data into standardized tables with vocabulary mappings:
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.orm import Session
# Key OMOP tables and their relationships
# person → condition_occurrence → concept (diagnosis)
# person → drug_exposure → concept (medication)
# person → measurement → concept (lab test)
# person → observation → concept (clinical observation)
# person → visit_occurrence → visit_detail
# SQLAlchemy engine for the PostgreSQL database `cdm_v5` on host `omop-db`.
# NOTE(review): the username and password are embedded in this URL; presumably
# a placeholder -- load real credentials from configuration kept outside
# source control.
engine = sa.create_engine("postgresql://analyst:pass@omop-db/cdm_v5")
Cohort definition
Clinical studies start with cohort identification. OMOP uses the cohort table pattern:
def build_diabetes_cohort(engine) -> pd.DataFrame:
    """Return one row per adult Type 2 diabetes patient with a confirming HbA1c.

    Inclusion: Type 2 diabetes diagnosis (concept 201826 or any descendant)
        plus an HbA1c measurement >= 6.5% within one year either side of the
        first T2D diagnosis date, which becomes the index date.
    Exclusion: Type 1 diabetes and pregnancy (each including descendant
        concepts), and patients younger than 18 at the index date, judged by
        year of birth only.

    Args:
        engine: Database connectable passed straight to ``pd.read_sql``. The
            query uses PostgreSQL syntax (``INTERVAL``, ``EXTRACT``).

    Returns:
        DataFrame with columns person_id, index_date, year_of_birth,
        gender_concept_id and race_concept_id, one row per patient.
    """
    query = """
    WITH t2d_patients AS (
        -- GROUP BY already yields one row per person; index_date is the
        -- earliest qualifying diagnosis.
        SELECT co.person_id,
               MIN(co.condition_start_date) AS index_date
        FROM condition_occurrence co
        JOIN concept_ancestor ca
          ON co.condition_concept_id = ca.descendant_concept_id
        WHERE ca.ancestor_concept_id = 201826  -- T2D SNOMED hierarchy
        GROUP BY co.person_id
    ),
    with_hba1c AS (
        -- DISTINCT: a patient with several qualifying HbA1c results must
        -- still appear only once in the cohort.
        SELECT DISTINCT t.person_id, t.index_date
        FROM t2d_patients t
        JOIN measurement m ON t.person_id = m.person_id
        WHERE m.measurement_concept_id = 3004410  -- HbA1c
          AND m.value_as_number >= 6.5
          AND m.measurement_date BETWEEN t.index_date - INTERVAL '1 year'
                                     AND t.index_date + INTERVAL '1 year'
    ),
    exclusions AS (
        -- Resolve through concept_ancestor so that more specific child
        -- concepts are excluded too, mirroring the T2D inclusion above.
        SELECT DISTINCT co.person_id
        FROM condition_occurrence co
        JOIN concept_ancestor ca
          ON co.condition_concept_id = ca.descendant_concept_id
        WHERE ca.ancestor_concept_id IN (
            201254,   -- Type 1 diabetes
            4299535   -- Pregnancy
        )
    )
    SELECT w.person_id, w.index_date,
           p.year_of_birth, p.gender_concept_id, p.race_concept_id
    FROM with_hba1c w
    JOIN person p ON w.person_id = p.person_id
    WHERE p.year_of_birth <= EXTRACT(YEAR FROM w.index_date) - 18
      AND w.person_id NOT IN (SELECT person_id FROM exclusions)
    """
    return pd.read_sql(query, engine)
Temporal feature engineering
EHR data is inherently temporal. Features must capture both point-in-time values and trends:
from typing import Optional
import numpy as np
class TemporalFeatureBuilder:
    """Build per-patient features from measurements on or before an index date."""

    def __init__(self, measurements: pd.DataFrame, index_dates: pd.DataFrame):
        # The methods read the columns person_id, measurement_concept_id,
        # measurement_date and value_as_number.
        self.measurements = measurements
        # The methods read person_id and index_date.
        # NOTE(review): assumed to hold one row per person_id -- confirm.
        self.index_dates = index_dates

    def _lookback(self, concept_id: int) -> pd.DataFrame:
        """Return measurements of *concept_id* dated on or before the index date.

        Adds a ``days_before`` column (index_date - measurement_date, in days)
        and orders rows oldest-first, so order-sensitive aggregations such as
        "last" see the most recent measurement last.
        """
        selected = self.measurements[
            self.measurements["measurement_concept_id"] == concept_id
        ].merge(self.index_dates, on="person_id")
        selected["days_before"] = (
            pd.to_datetime(selected["index_date"])
            - pd.to_datetime(selected["measurement_date"])
        ).dt.days
        selected = selected[selected["days_before"] >= 0]
        # Stable sort keeps the input order for measurements on the same day.
        return selected.sort_values("days_before", ascending=False, kind="stable")

    @staticmethod
    def _slope(group: pd.DataFrame) -> float:
        """Return the least-squares change in value per day, or NaN if undefined."""
        valid = group.dropna(subset=["value_as_number"])
        x = valid["days_before"].to_numpy(dtype=float)
        y = valid["value_as_number"].to_numpy(dtype=float)
        # A slope needs measurements on at least two different days.
        if np.unique(x).size < 2:
            return np.nan
        # days_before counts backwards in time, so negate the fitted slope:
        # positive = increasing over time.
        return float(-np.polyfit(x, y, 1)[0])

    def build_features(
        self, concept_id: int, windows: Optional[list[int]] = None
    ) -> pd.DataFrame:
        """Build temporal features for a measurement concept relative to index date.

        Args:
            concept_id: Value of measurement_concept_id to select.
            windows: Look-back lengths in days; defaults to [7, 30, 90, 365].
                Repeated values are used once.

        Returns:
            One row per row of ``index_dates`` with person_id and, for each
            window, the columns ``c{concept_id}_{window}d_{stat}`` for stat in
            last, mean, std, min, max, count. "last" is the most recent value
            in the window. Patients with no measurement in a window get NaN.
        """
        if windows is None:
            windows = [7, 30, 90, 365]

        history = self._lookback(concept_id)
        result = self.index_dates[["person_id"]].set_index("person_id")
        # dict.fromkeys de-duplicates while keeping first-seen order.
        for window in dict.fromkeys(windows):
            in_window = history[history["days_before"] <= window]
            stats = in_window.groupby("person_id")["value_as_number"].agg(
                ["last", "mean", "std", "min", "max", "count"]
            )
            stats.columns = [
                f"c{concept_id}_{window}d_{stat}" for stat in stats.columns
            ]
            result = result.join(stats, how="left")
        return result.reset_index()

    def build_trend(self, concept_id: int, window: int = 90) -> pd.DataFrame:
        """Compute slope of measurements over a window (trend detection).

        Args:
            concept_id: Value of measurement_concept_id to select.
            window: Look-back length in days.

        Returns:
            DataFrame with columns person_id and ``c{concept_id}_{window}d_trend``,
            one row per patient who has at least one measurement in the window
            (empty, with the same columns, when nobody has). The trend is NaN
            unless there are non-null values on two or more distinct days.
        """
        column = f"c{concept_id}_{window}d_trend"
        history = self._lookback(concept_id)
        in_window = history[history["days_before"] <= window]
        slopes = {
            person_id: self._slope(group)
            for person_id, group in in_window.groupby("person_id")
        }
        return (
            pd.Series(slopes, name=column, dtype=float)
            .rename_axis("person_id")
            .reset_index()
        )
Clinical NLP pipeline
Building a production clinical NLP system
import spacy
import medspacy
from medspacy.ner import TargetRule
from medspacy.context import ConTextRule
def build_clinical_nlp():
    """Build a medspaCy pipeline with custom target and context rules.

    Returns:
        The pipeline returned by ``medspacy.load``, with two extra target
        rules (MEASUREMENT, SYMPTOM) and four extra ConText rules added.
    """
    # Both components fetched with get_pipe() below must be enabled here;
    # without "medspacy_target_matcher" the lookup of that pipe fails.
    nlp = medspacy.load(
        enable=["medspacy_pyrush", "medspacy_target_matcher", "medspacy_context"]
    )

    # Add custom entity rules for specific use case
    target_matcher = nlp.get_pipe("medspacy_target_matcher")
    target_rules = [
        TargetRule("ejection fraction", "MEASUREMENT",
                   pattern=[{"LOWER": "ejection"}, {"LOWER": "fraction"}]),
        TargetRule("chest pain", "SYMPTOM",
                   pattern=[{"LOWER": "chest"}, {"LOWER": "pain"}]),
    ]
    target_matcher.add(target_rules)

    # Context rules for assertion detection
    context = nlp.get_pipe("medspacy_context")
    context_rules = [
        ConTextRule("patient denies", "NEGATED_EXISTENCE", direction="FORWARD"),
        ConTextRule("no evidence of", "NEGATED_EXISTENCE", direction="FORWARD"),
        ConTextRule("family history of", "FAMILY", direction="FORWARD"),
        ConTextRule("on admission", "HISTORICAL", direction="FORWARD"),
    ]
    context.add(context_rules)
    return nlp
def process_notes(nlp, notes_df: pd.DataFrame) -> pd.DataFrame:
    """Extract structured findings from clinical notes.

    Args:
        nlp: Callable that turns note text into a document exposing ``ents``;
            each entity's ``_`` namespace must provide is_negated,
            is_historical, is_family and section_category.
        notes_df: Notes with the columns note_id, person_id and note_text.

    Returns:
        One row per extracted entity. The columns are always present, even
        when no entity is found or ``notes_df`` is empty.
    """
    columns = [
        "note_id", "person_id", "entity_text", "entity_label",
        "is_negated", "is_historical", "is_family", "section",
        "start_char", "end_char",
    ]
    results = []
    for row in notes_df.itertuples():
        text = row.note_text
        # Missing (None/NaN) or blank notes have nothing to extract; skipping
        # them avoids handing a non-string to the pipeline.
        if not isinstance(text, str) or not text.strip():
            continue
        doc = nlp(text)
        for ent in doc.ents:
            results.append({
                "note_id": row.note_id,
                "person_id": row.person_id,
                "entity_text": ent.text,
                "entity_label": ent.label_,
                "is_negated": ent._.is_negated,
                "is_historical": ent._.is_historical,
                "is_family": ent._.is_family,
                "section": ent._.section_category,
                "start_char": ent.start_char,
                "end_char": ent.end_char,
            })
    # Explicit columns keep the schema stable when `results` is empty.
    return pd.DataFrame(results, columns=columns)
Extracting numerical values from text
Clinical notes contain critical measurements embedded in free text:
import re
# One pattern covers single values and ranges, so the left-most mention wins.
#   (?<![A-Za-z])  keeps "EF" from matching inside words such as "brief"
#   group 1        the value, or the lower bound of a range
#   group 2        optional upper bound after "-", an en dash, or "to"
_EF_PATTERN = re.compile(
    r"(?<![A-Za-z])(?:ejection\s+fraction|LVEF|EF)"
    r"\s*(?:of|is|was|:|=)?\s*"
    r"(\d{1,2}(?:\.\d+)?)"
    r"(?:\s*%?\s*(?:[-\u2013]|to)\s*(\d{1,2}(?:\.\d+)?))?"
    r"\s*%",
    re.IGNORECASE,
)


def extract_ejection_fraction(text: str) -> Optional[float]:
    """Extract ejection fraction percentage from clinical note.

    Recognises "ejection fraction", "EF" and "LVEF" followed by a percentage
    such as "EF 55%", "LVEF of 52.5 %", "EF: 55-60%" or "EF 40 to 45%".

    Args:
        text: Free-text clinical note.

    Returns:
        The first ejection fraction mentioned, as a percentage. For a range
        the midpoint is returned. ``None`` when no value is found.
    """
    match = _EF_PATTERN.search(text)
    if match is None:
        return None
    low, high = match.group(1), match.group(2)
    if high is None:
        return float(low)
    # Range: take midpoint
    return (float(low) + float(high)) / 2
De-identification at scale
Using Presidio for PHI removal
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
# Module-level Presidio engines, created once at import time and used by
# deidentify_note() on every call.
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
def deidentify_note(text: str) -> str:
    """Replace detected PHI in *text* with bracketed placeholder tokens.

    Args:
        text: Note text to scrub.

    Returns:
        The text with each detected entity replaced: names by ``[NAME]``,
        dates/times by ``[DATE]``, phone numbers by ``[PHONE]``, locations by
        ``[LOCATION]`` and every other requested entity by ``[REDACTED]``.
    """
    phi_entities = [
        "PERSON", "DATE_TIME", "PHONE_NUMBER", "EMAIL_ADDRESS",
        "LOCATION", "US_SSN", "MEDICAL_LICENSE",
    ]
    findings = analyzer.analyze(text=text, entities=phi_entities, language="en")

    # Entity type -> replacement token; "DEFAULT" covers the types that are
    # detected above but have no dedicated token.
    placeholders = {
        "PERSON": "[NAME]",
        "DATE_TIME": "[DATE]",
        "PHONE_NUMBER": "[PHONE]",
        "LOCATION": "[LOCATION]",
        "DEFAULT": "[REDACTED]",
    }
    replace_with = {
        entity: OperatorConfig("replace", {"new_value": token})
        for entity, token in placeholders.items()
    }

    scrubbed = anonymizer.anonymize(
        text=text, analyzer_results=findings, operators=replace_with
    )
    return scrubbed.text
Date shifting for longitudinal studies
import hashlib
from datetime import timedelta
def get_date_shift(person_id: int, secret_key: str, max_shift_days: int = 365) -> int:
    """Deterministic date shift per patient (same shift for all their dates).

    Args:
        person_id: Patient identifier; hashed together with the key.
        secret_key: Secret that makes the shift unpredictable without it.
        max_shift_days: Largest shift magnitude in days; must be >= 1.

    Returns:
        A non-zero number of days in ``[-max_shift_days, max_shift_days]``.

    Raises:
        ValueError: If ``max_shift_days`` is less than 1.
    """
    if max_shift_days < 1:
        raise ValueError("max_shift_days must be a positive integer")

    # NOTE(review): this is a plain SHA-256 over "id:key". Switching to HMAC
    # would change every existing shift, so it is deliberately left as is.
    hash_input = f"{person_id}:{secret_key}".encode()
    hash_value = int(hashlib.sha256(hash_input).hexdigest(), 16)
    shift = (hash_value % (2 * max_shift_days)) - max_shift_days

    # The modulo yields [-max, max - 1], which includes 0: a zero shift would
    # leave that patient's real dates untouched. Remap it to the otherwise
    # unused +max so every other patient's shift stays exactly as before.
    return shift if shift != 0 else max_shift_days
def shift_dates(df: pd.DataFrame, date_columns: list[str],
                secret_key: str, max_shift_days: int = 365) -> pd.DataFrame:
    """Shift every date column by a per-patient offset from ``get_date_shift``.

    All of a patient's dates move by the same number of days, so intervals
    within one patient are preserved.

    Args:
        df: Frame with a person_id column and the columns in *date_columns*.
        date_columns: Names of the columns to shift.
        secret_key: Secret passed to ``get_date_shift``.
        max_shift_days: Largest shift magnitude passed to ``get_date_shift``.

    Returns:
        A copy of *df* whose date columns are datetime-typed and shifted.
        Rows with a missing person_id get NaT in the date columns rather than
        keeping their original, unshifted dates.
    """
    shifted = df.copy()

    # One hash per distinct patient instead of one .loc assignment per
    # patient and column; this is also correct when the index is not unique.
    shift_by_person = {
        person_id: get_date_shift(person_id, secret_key, max_shift_days)
        for person_id in shifted["person_id"].dropna().unique()
    }
    offsets = pd.to_timedelta(shifted["person_id"].map(shift_by_person), unit="D")

    for col in date_columns:
        shifted[col] = pd.to_datetime(shifted[col]) + offsets
    return shifted
Federated learning across institutions
When data cannot leave the hospital, federated learning trains models across institutions without sharing patient records:
import torch
import torch.nn as nn
from typing import OrderedDict
class FederatedEHRModel(nn.Module):
    """Feed-forward classifier: n_features -> 256 -> 128 -> n_classes.

    Each hidden layer is followed by ReLU and dropout with p=0.3; the output
    layer is a plain linear layer.
    """

    def __init__(self, n_features: int, n_classes: int):
        super().__init__()
        layers = []
        width_in = n_features
        # Hidden blocks are created in order, so submodule indices (and thus
        # state_dict keys) are network.0, network.3, network.6.
        for width_out in (256, 128):
            layers.append(nn.Linear(width_in, width_out))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(0.3))
            width_in = width_out
        layers.append(nn.Linear(width_in, n_classes))
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        """Return the network output for input batch *x*."""
        output = self.network(x)
        return output
def federated_averaging(global_model: nn.Module,
                        local_updates: list[OrderedDict],
                        weights: list[float]) -> OrderedDict:
    """Weighted average of local model updates (FedAvg algorithm).

    Args:
        global_model: Model whose ``state_dict()`` keys define which entries
            are averaged.
        local_updates: One state dict per participant, each containing every
            key of the global model.
        weights: One weight per entry of *local_updates*.

    Returns:
        Ordered mapping from each state-dict key to the weighted mean of that
        entry across *local_updates*.

    Raises:
        ValueError: If the two lists differ in length, are empty, or the
            weights do not sum to a positive value.
    """
    # zip() would silently drop the surplus entries and skew the average.
    if len(local_updates) != len(weights):
        raise ValueError(
            f"got {len(local_updates)} local updates but {len(weights)} weights"
        )
    if not local_updates:
        raise ValueError("at least one local update is required")
    total_weight = sum(weights)
    if total_weight <= 0:
        raise ValueError("weights must sum to a positive value")

    averaged = OrderedDict()
    for key in global_model.state_dict():
        averaged[key] = sum(
            w * update[key] for update, w in zip(local_updates, weights)
        ) / total_weight
    return averaged
# Coordination loop: 50 rounds of broadcast -> local training -> aggregation.
# NOTE(review): `hospitals` is not defined in this snippet; each element is
# expected to offer train_locally(model, epochs=...) and n_patients.
global_model = FederatedEHRModel(n_features=200, n_classes=2)

for round_num in range(50):
    # Snapshot of the current global parameters handed to every site.
    global_state = global_model.state_dict()
    local_updates, local_weights = [], []

    for hospital in hospitals:
        # Every site starts the round from the same global parameters and
        # trains a fresh copy for a few epochs.
        local_model = FederatedEHRModel(n_features=200, n_classes=2)
        local_model.load_state_dict(global_state)
        local_state = hospital.train_locally(local_model, epochs=5)
        local_updates.append(local_state)
        # Sites are weighted by their patient count.
        local_weights.append(hospital.n_patients)

    # Fold the site updates back into the global model.
    new_state = federated_averaging(global_model, local_updates, local_weights)
    global_model.load_state_dict(new_state)
Projects like PySyft and NVIDIA FLARE provide production-grade federated learning frameworks for healthcare.
Tradeoffs
| Approach | Strength | Weakness |
|---|---|---|
| OMOP CDM | Cross-institutional compatibility | ETL is complex, lossy |
| Raw EHR extract | Complete data access | Schema varies per institution |
| FHIR API | Real-time, standardized | Bulk historical queries slow |
| Federated learning | Privacy-preserving | Communication overhead, convergence issues |
| Clinical NLP | Unlocks information held only in free-text notes (often estimated at ~80% of EHR data) | Error-prone, needs validation |
The one thing to remember: Production EHR analytics in Python requires mastering the full stack — from OMOP CDM queries and temporal feature engineering through clinical NLP and de-identification — because the gap between raw hospital data and research-ready features is where most projects fail.
See Also
- Python FHIR Health Data — How Python speaks the universal language that lets hospitals, apps, and doctors share your health information safely.