Python Slowly Changing Dimensions — Deep Dive
Implementing Slowly Changing Dimensions in production Python pipelines requires careful attention to change detection, surrogate key generation, date-range integrity, and performance at scale. This guide covers a full Type 2 implementation with real code patterns.
1) Change detection with hashing
Comparing every tracked column row-by-row is slow for large dimensions. Hash the columns instead:
import hashlib
import polars as pl
TRACKED_COLUMNS = ["name", "city", "tier", "email"]


def compute_row_hash(
    df: pl.DataFrame,
    columns: list[str],
    separator: str = "\x1f",
) -> pl.DataFrame:
    """Add a ``_row_hash`` column fingerprinting the tracked columns.

    Each tracked value is cast to a string (nulls become ``"__NULL__"``),
    the values are joined with ``separator``, and the joined string is hashed
    with SHA-256, truncated to 16 hex characters (64 bits).

    The separator matters: with plain concatenation ``("ab", "c")`` and
    ``("a", "bc")`` both become ``"abc"``, so a real change would be
    classified as unchanged. The default ASCII unit separator (0x1F) is very
    unlikely to appear in business data. Changing ``separator`` changes every
    hash, so stored ``_row_hash`` values must be recomputed after doing so.

    Args:
        df: Frame containing every column listed in ``columns``.
        columns: Tracked columns. Their order is part of the hash input.
        separator: String placed between values before hashing.

    Returns:
        ``df`` with a ``_row_hash`` Utf8 column added (or replaced).
    """
    return df.with_columns(
        pl.concat_str(
            [pl.col(c).cast(pl.Utf8).fill_null("__NULL__") for c in columns],
            separator=separator,
        )
        .map_elements(
            lambda s: hashlib.sha256(s.encode("utf-8")).hexdigest()[:16],
            return_dtype=pl.Utf8,
        )
        .alias("_row_hash")
    )
# Sample source batch: one row per customer_id, hashed over TRACKED_COLUMNS
# so it can be compared against the current dimension.
incoming = compute_row_hash(
    pl.DataFrame(
        {
            "customer_id": [101, 102, 103],
            "name": ["Alice", "Bob", "Carol"],
            "city": ["Chicago", "Boston", "Denver"],
            "tier": ["gold", "silver", "bronze"],
            "email": ["alice@co.com", "bob@co.com", "carol@co.com"],
        }
    ),
    TRACKED_COLUMNS,
)
Compare hashes between incoming and current dimension to classify records:
def classify_changes(
    incoming: pl.DataFrame,
    current: pl.DataFrame,
    business_key: str = "customer_id",
) -> dict[str, pl.DataFrame]:
    """Classify incoming records as new, changed, or unchanged.

    Each incoming row is left-joined to the current version (``is_current``
    true) of the same business key, and the ``_row_hash`` values are compared:

    - ``"new"``: no current version exists for the key.
    - ``"changed"``: a current version exists with a different hash.
    - ``"unchanged"``: a current version exists with the same hash.

    If ``current`` holds more than one current row for a key, the join
    repeats the incoming row once per match. Keep that invariant validated.

    Args:
        incoming: Source rows that already carry ``_row_hash``.
        current: Dimension with ``business_key``, ``_row_hash`` and a boolean
            ``is_current`` column.
        business_key: Natural key column shared by both frames.

    Returns:
        Dict with keys ``"new"``, ``"changed"`` and ``"unchanged"``. Each frame
        has the columns of ``incoming``.
    """
    # Filter on the boolean column directly instead of comparing to True.
    current_hashes = (
        current.filter(pl.col("is_current"))
        .select([business_key, "_row_hash"])
        .rename({"_row_hash": "_current_hash"})
    )
    merged = incoming.join(current_hashes, on=business_key, how="left")

    has_current = pl.col("_current_hash").is_not_null()
    same_hash = pl.col("_row_hash") == pl.col("_current_hash")

    return {
        "new": merged.filter(~has_current).drop("_current_hash"),
        "changed": merged.filter(has_current & ~same_hash).drop("_current_hash"),
        "unchanged": merged.filter(has_current & same_hash).drop("_current_hash"),
    }
2) Type 2 implementation with pure Python
Surrogate key generation
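Use either a deterministic surrogate (for example, a hash of business key + version) or a random UUID. The code below uses `uuid.uuid4()`, so surrogate keys differ between reruns: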
import uuid
from datetime import date
FAR_FUTURE = date(9999, 12, 31)


def _version_rows(
    records: pl.DataFrame,
    business_key: str,
    effective_date: date,
) -> pl.DataFrame:
    """Build open (current) version rows for ``records`` starting at ``effective_date``."""
    return records.select(TRACKED_COLUMNS + [business_key, "_row_hash"]).with_columns([
        # An explicit dtype keeps the column Utf8 even when `records` is empty.
        # Otherwise an empty list yields a Null-typed Series that may not
        # concatenate with the dimension's string surrogate_key.
        pl.Series(
            "surrogate_key",
            [str(uuid.uuid4()) for _ in range(records.height)],
            dtype=pl.Utf8,
        ),
        pl.lit(effective_date).alias("valid_from"),
        pl.lit(FAR_FUTURE).alias("valid_to"),
        pl.lit(True).alias("is_current"),
    ])


def _drop_already_current(
    records: pl.DataFrame,
    current_dim: pl.DataFrame,
    business_key: str,
) -> pl.DataFrame:
    """Remove records whose (key, hash) pair already exists as a current dimension row."""
    if "_row_hash" not in current_dim.columns:
        return records
    current = current_dim.filter(pl.col("is_current")).select([business_key, "_row_hash"])
    return records.join(current, on=[business_key, "_row_hash"], how="anti")


def apply_scd_type2(
    current_dim: pl.DataFrame,
    changes: dict[str, pl.DataFrame],
    effective_date: date,
    business_key: str = "customer_id",
) -> pl.DataFrame:
    """Apply SCD Type 2 logic and return the updated dimension table.

    Args:
        current_dim: Existing dimension with ``business_key``, ``valid_to`` and
            ``is_current`` columns. An optional ``_row_hash`` column enables
            the rerun guard.
        changes: Dict such as the one returned by ``classify_changes``. Only
            ``"new"`` and ``"changed"`` are read. Each frame needs
            ``TRACKED_COLUMNS``, ``business_key`` and ``_row_hash``.
        effective_date: Date the batch takes effect. It becomes ``valid_to``
            of closed versions and ``valid_from`` of new ones, so consecutive
            ranges stay contiguous.
        business_key: Natural key column (defaults to ``"customer_id"``).

    Returns:
        All rows of ``current_dim`` (with the current version of each changed
        key closed), followed by the new version rows for changed keys and
        then the rows for brand-new keys.

    Rerun guard: records whose ``(business_key, _row_hash)`` already exists as
    a current row are skipped. Re-applying the same ``changes`` to the result
    therefore neither closes the version it just opened nor adds a duplicate.
    """
    changed = _drop_already_current(changes["changed"], current_dim, business_key)
    new = _drop_already_current(changes["new"], current_dim, business_key)

    # 1. Close the current version of every changed key. Both expressions are
    #    evaluated against the input frame, so `is_current` below is the
    #    pre-update flag. A typed Series keeps `is_in` well-typed when empty.
    changed_keys = changed[business_key].unique()
    closing = pl.col(business_key).is_in(changed_keys) & pl.col("is_current")
    closed = current_dim.with_columns([
        pl.when(closing)
        .then(pl.lit(effective_date))
        .otherwise(pl.col("valid_to"))
        .alias("valid_to"),
        pl.when(closing)
        .then(pl.lit(False))
        .otherwise(pl.col("is_current"))
        .alias("is_current"),
    ])

    # 2./3. Open a new current version for each changed key and each new key.
    new_versions = _version_rows(changed, business_key, effective_date)
    new_inserts = _version_rows(new, business_key, effective_date)

    # 4. Combine. "diagonal" fills columns missing from a frame with nulls.
    return pl.concat([closed, new_versions, new_inserts], how="diagonal")
3) Type 2 with Delta Lake MERGE
For larger datasets, Delta Lake’s MERGE operation is more efficient than full-table rewrites:
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
def scd2_delta_merge(
    table_path: str,
    staged_updates: pa.Table,
    business_key: str = "customer_id",
):
    """
    Perform SCD Type 2 using a single Delta Lake MERGE.

    staged_updates must contain only the changed/new records, at most one row
    per business key, with _row_hash, valid_from, valid_to and is_current
    columns laid out like the target table. If the table does not exist yet,
    staged_updates is written as-is.

    A plain ``target.key = source.key`` MERGE can either close the old version
    or insert the new one for a given source row, never both. That would leave
    a changed key with no current row. Every staged row is therefore sent
    twice:

    - with ``_merge_key`` = business key: it matches the target's current row,
      which is closed (valid_to = source.valid_from, is_current = false);
    - with ``_merge_key`` = NULL: it can never match, and it is the only copy
      allowed to insert, so each record becomes exactly one new current row.

    Every matched current row is closed. Staging an unchanged record therefore
    produces a redundant but still consistent new version.
    """
    if not DeltaTable.is_deltatable(table_path):
        write_deltalake(table_path, staged_updates)
        return

    key_type = staged_updates.schema.field(business_key).type
    source = pa.concat_tables([
        staged_updates.append_column("_merge_key", staged_updates.column(business_key)),
        staged_updates.append_column(
            "_merge_key", pa.nulls(staged_updates.num_rows, type=key_type)
        ),
    ])
    # Insert the staged columns explicitly so the helper _merge_key column
    # never reaches the target table.
    insert_columns = {name: f"source.{name}" for name in staged_updates.column_names}

    dt = DeltaTable(table_path)
    (
        dt.merge(
            source=source,
            predicate=(
                f"target.{business_key} = source._merge_key "
                "AND target.is_current = true"
            ),
            source_alias="source",
            target_alias="target",
        )
        # Close the old current record (matched only via the keyed copy).
        .when_matched_update({
            "valid_to": "source.valid_from",
            "is_current": "false",
        })
        # Insert the new version from the NULL-keyed copy only.
        .when_not_matched_insert(
            updates=insert_columns,
            predicate="source._merge_key IS NULL",
        )
        .execute()
    )
Note: Delta Lake applies all clauses of a MERGE (matched and not-matched) in a single atomic commit, so readers never observe a partially applied merge.
4) Handling edge cases
Retroactive changes
Sometimes a source system corrects a historical record. For example, a customer’s city was wrong in February and gets corrected in March. Options:
- Insert correction as new Type 2 row with today’s effective date. Simple but historically inaccurate—February reports still show the wrong city.
- Backfill: Rewrite the historical row with corrected data and adjust date ranges. Accurate but operationally risky. Use Delta Lake time travel as a safety net.
Multiple changes in one batch
If a customer appears twice in a single batch with different values, deduplicate before applying SCD. Keep the record with the latest source timestamp:
def deduplicate_batch(df: pl.DataFrame, key: str, ts_col: str) -> pl.DataFrame:
    """Keep only the most recent row per ``key`` within a single batch.

    Rows are ordered newest-first by ``ts_col`` and the first row per key is
    kept. polars places nulls first by default, even in a descending sort.
    ``nulls_last=True`` ensures a row with a missing timestamp never beats one
    with a real timestamp. ``maintain_order=True`` makes the sort stable, so
    ties on ``ts_col`` deterministically keep the row that appears first in
    ``df``.
    """
    return (
        df.sort(ts_col, descending=True, nulls_last=True, maintain_order=True)
        .unique(subset=[key], keep="first", maintain_order=True)
    )
Reprocessing and idempotency
SCD Type 2 is naturally non-idempotent—running it twice creates duplicate versions. Guard against this by:
- Checking if the incoming hash already matches the current hash (skip if unchanged).
- Using the batch date as `valid_from` rather than wall-clock time, so reruns produce the same date ranges.
- Running deduplication on the dimension table after reprocessing.
5) Performance considerations
| Scale | Approach | Why |
|---|---|---|
| < 1M rows | Polars full-table compare | Fast, simple, in-memory |
| 1M-100M rows | Delta Lake MERGE | Atomic, file-level granularity |
| 100M+ rows | Spark + Iceberg/Delta | Distributed, partition pruning |
Partition the dimension table
For large Type 2 tables, partition by a date column or region to limit the scan surface during MERGE. Pruning only applies when the MERGE predicate references the partition column:
# Write the dimension partitioned by region.
# NOTE(review): mode="overwrite" replaces the existing table contents, so
# `table` presumably must hold the complete dimension history. Confirm before
# running. Partition pruning only helps a MERGE whose predicate uses `region`.
write_deltalake(
    "s3://warehouse/dim_customer/",
    table,
    mode="overwrite",
    partition_by=["region"],
)
Index on business key
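If using a SQL warehouse, make lookups on the business key cheap. In Postgres, create an index on (business_key, is_current), or a partial index on business_key WHERE is_current. Standard Snowflake and Redshift tables have no conventional indexes; cluster (Snowflake) or sort (Redshift) the table on the business key instead.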
6) Validation
After every SCD run, validate invariants:
def validate_scd2(dim: pl.DataFrame, business_key: str):
    """Check the structural invariants of an SCD Type 2 dimension.

    Raises ``AssertionError`` explicitly (rather than via ``assert``
    statements) so the checks still run under ``python -O``. It fails when:

    - a business key does not have exactly one row with ``is_current`` true;
    - consecutive versions of a key (ordered by ``valid_from``) are not
      contiguous, i.e. the previous ``valid_to`` differs from the next
      ``valid_from``. This covers both gaps and overlaps;
    - any row has ``valid_from`` not strictly before ``valid_to``.

    An empty dimension passes.
    """
    # Every business key has exactly one current record.
    # Summing a boolean column counts its True values.
    current_per_key = dim.group_by(business_key).agg(
        pl.col("is_current").sum().alias("n_current")
    )
    duplicated = current_per_key.filter(pl.col("n_current") > 1)
    if duplicated.height:
        raise AssertionError(
            "Duplicate current records found for "
            f"{duplicated[business_key].head(10).to_list()}"
        )
    missing = current_per_key.filter(pl.col("n_current") == 0)
    if missing.height:
        raise AssertionError(
            f"No current record found for {missing[business_key].head(10).to_list()}"
        )

    # No gaps or overlaps between consecutive versions of each business key.
    # Sorting by (key, valid_from) puts each key's versions next to each
    # other, so a whole-frame shift gives the previous version wherever the
    # previous row has the same key.
    ordered = dim.sort([business_key, "valid_from"]).with_columns([
        pl.col(business_key).shift(1).alias("_prev_key"),
        pl.col("valid_to").shift(1).alias("_prev_to"),
    ])
    gaps = ordered.filter(
        (pl.col("_prev_key") == pl.col(business_key))
        & pl.col("_prev_to").ne_missing(pl.col("valid_from"))
    )
    if gaps.height:
        first = gaps.row(0, named=True)
        raise AssertionError(
            f"Date gap for {first[business_key]}: "
            f"{first['_prev_to']} → {first['valid_from']}"
        )

    # valid_from < valid_to for all rows.
    if not (dim["valid_from"] < dim["valid_to"]).all():
        raise AssertionError("Invalid date range found")
7) Testing strategy
import pytest
# NOTE(review): empty_dim, existing_dim, one_record, updated_record, empty,
# changes and today are not defined in this snippet. They are presumably
# module-level objects or pytest fixtures provided elsewhere; confirm.
def test_new_record_gets_current_flag():
    """Inserting one brand-new record yields exactly one current row."""
    batch = {"new": one_record, "changed": empty}
    result = apply_scd_type2(empty_dim, batch, today)
    current_rows = result.filter(pl.col("is_current"))
    assert current_rows.height == 1


def test_changed_record_closes_old_version():
    """A changed record closes its previous version on the batch date."""
    batch = {"new": empty, "changed": updated_record}
    result = apply_scd_type2(existing_dim, batch, today)
    closed_101 = (pl.col("customer_id") == 101) & ~pl.col("is_current")
    old = result.filter(closed_101)
    assert old["valid_to"].item() == today


def test_unchanged_record_not_duplicated():
    """An empty batch leaves the dimension row count untouched."""
    batch = {"new": empty, "changed": empty}
    result = apply_scd_type2(existing_dim, batch, today)
    assert result.height == existing_dim.height


def test_idempotent_rerun():
    """Re-applying the same batch must not add rows."""
    first = apply_scd_type2(existing_dim, changes, today)
    second = apply_scd_type2(first, changes, today)
    assert second.height == first.height  # no new rows on rerun
One thing to remember: SCD Type 2 is only as reliable as your change detection—hash your tracked columns, validate date-range integrity after every run, and always guard against duplicate current records.
See Also
- Python Adaptive Learning Systems How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
- Python Airflow Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
- Python Altair Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
- Python Automated Grading How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
- Python Batch Vs Stream Processing Batch processing is like doing laundry once a week; stream processing is like a self-cleaning shirt that cleans itself constantly.