Python Slowly Changing Dimensions — Deep Dive

Implementing Slowly Changing Dimensions in production Python pipelines requires careful attention to change detection, surrogate key generation, date-range integrity, and performance at scale. This guide covers a full Type 2 implementation with real code patterns.

1) Change detection with hashing

Comparing every tracked column row-by-row is slow for large dimensions. Hash the columns instead:

import hashlib
import polars as pl

TRACKED_COLUMNS = ["name", "city", "tier", "email"]

def compute_row_hash(df: pl.DataFrame, columns: list[str]) -> pl.DataFrame:
    """Add a _row_hash column based on tracked columns."""
    return df.with_columns(
        pl.concat_str([pl.col(c).cast(pl.Utf8).fill_null("__NULL__") for c in columns])
        .map_elements(
            lambda s: hashlib.sha256(s.encode()).hexdigest()[:16],
            return_dtype=pl.Utf8,
        )
        .alias("_row_hash")
    )

# Incoming records from source
incoming = pl.DataFrame({
    "customer_id": [101, 102, 103],
    "name": ["Alice", "Bob", "Carol"],
    "city": ["Chicago", "Boston", "Denver"],
    "tier": ["gold", "silver", "bronze"],
    "email": ["alice@co.com", "bob@co.com", "carol@co.com"],
})

incoming = compute_row_hash(incoming, TRACKED_COLUMNS)

Compare hashes between incoming and current dimension to classify records:

def classify_changes(
    incoming: pl.DataFrame,
    current: pl.DataFrame,
    business_key: str = "customer_id",
) -> dict[str, pl.DataFrame]:
    """Classify incoming records as new, changed, or unchanged."""
    
    merged = incoming.join(
        current.filter(pl.col("is_current") == True)
              .select([business_key, "_row_hash"])
              .rename({"_row_hash": "_current_hash"}),
        on=business_key,
        how="left",
    )

    new_records = merged.filter(pl.col("_current_hash").is_null())
    changed = merged.filter(
        pl.col("_current_hash").is_not_null()
        & (pl.col("_row_hash") != pl.col("_current_hash"))
    )
    unchanged = merged.filter(
        pl.col("_current_hash").is_not_null()
        & (pl.col("_row_hash") == pl.col("_current_hash"))
    )

    return {
        "new": new_records.drop("_current_hash"),
        "changed": changed.drop("_current_hash"),
        "unchanged": unchanged.drop("_current_hash"),
    }

2) Type 2 implementation with pure Python

Surrogate key generation

Use a deterministic surrogate based on business key + version or a UUID:

import uuid
from datetime import date

FAR_FUTURE = date(9999, 12, 31)

def apply_scd_type2(
    current_dim: pl.DataFrame,
    changes: dict[str, pl.DataFrame],
    effective_date: date,
) -> pl.DataFrame:
    """Apply SCD Type 2 logic and return the updated dimension table."""
    
    today = effective_date
    
    # 1. Close changed records in current dimension
    changed_keys = set(changes["changed"]["customer_id"].to_list())
    
    closed = current_dim.with_columns([
        pl.when(
            pl.col("customer_id").is_in(changed_keys) & pl.col("is_current")
        )
        .then(pl.lit(today))
        .otherwise(pl.col("valid_to"))
        .alias("valid_to"),
        
        pl.when(
            pl.col("customer_id").is_in(changed_keys) & pl.col("is_current")
        )
        .then(pl.lit(False))
        .otherwise(pl.col("is_current"))
        .alias("is_current"),
    ])
    
    # 2. Create new version rows for changed records
    new_versions = changes["changed"].select(TRACKED_COLUMNS + ["customer_id", "_row_hash"]).with_columns([
        pl.Series("surrogate_key", [str(uuid.uuid4()) for _ in range(len(changes["changed"]))]),
        pl.lit(today).alias("valid_from"),
        pl.lit(FAR_FUTURE).alias("valid_to"),
        pl.lit(True).alias("is_current"),
    ])
    
    # 3. Create rows for brand-new records
    new_inserts = changes["new"].select(TRACKED_COLUMNS + ["customer_id", "_row_hash"]).with_columns([
        pl.Series("surrogate_key", [str(uuid.uuid4()) for _ in range(len(changes["new"]))]),
        pl.lit(today).alias("valid_from"),
        pl.lit(FAR_FUTURE).alias("valid_to"),
        pl.lit(True).alias("is_current"),
    ])
    
    # 4. Combine
    result = pl.concat([closed, new_versions, new_inserts], how="diagonal")
    return result

3) Type 2 with Delta Lake MERGE

For larger datasets, Delta Lake’s MERGE operation is more efficient than full-table rewrites:

from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

def scd2_delta_merge(
    table_path: str,
    staged_updates: pa.Table,
    business_key: str = "customer_id",
):
    """
    Perform SCD Type 2 using Delta Lake MERGE.
    
    staged_updates must contain the changed/new records with
    _row_hash, valid_from, valid_to, is_current columns.
    """
    if not DeltaTable.is_deltatable(table_path):
        write_deltalake(table_path, staged_updates)
        return

    dt = DeltaTable(table_path)

    (
        dt.merge(
            source=staged_updates,
            predicate=f"target.{business_key} = source.{business_key} AND target.is_current = true",
            source_alias="source",
            target_alias="target",
        )
        # Close the old current record
        .when_matched_update({
            "valid_to": "source.valid_from",
            "is_current": "false",
        })
        # Insert the new version
        .when_not_matched_insert_all()
        .execute()
    )

Note: Delta MERGE treats matched + not-matched in a single atomic transaction, preventing partial states.

4) Handling edge cases

Retroactive changes

Sometimes a source system corrects a historical record. For example, a customer’s city was wrong in February and gets corrected in March. Options:

  • Insert correction as new Type 2 row with today’s effective date. Simple but historically inaccurate—February reports still show the wrong city.
  • Backfill: Rewrite the historical row with corrected data and adjust date ranges. Accurate but operationally risky. Use Delta Lake time travel as a safety net.

Multiple changes in one batch

If a customer appears twice in a single batch with different values, deduplicate before applying SCD. Keep the record with the latest source timestamp:

def deduplicate_batch(df: pl.DataFrame, key: str, ts_col: str) -> pl.DataFrame:
    return (
        df.sort(ts_col, descending=True)
        .unique(subset=[key], keep="first")
    )

Reprocessing and idempotency

SCD Type 2 is naturally non-idempotent—running it twice creates duplicate versions. Guard against this by:

  1. Checking if the incoming hash already matches the current hash (skip if unchanged).
  2. Using the batch date as valid_from rather than wall-clock time.
  3. Running deduplication on the dimension table after reprocessing.

5) Performance considerations

ScaleApproachWhy
< 1M rowsPolars full-table compareFast, simple, in-memory
1M-100M rowsDelta Lake MERGEAtomic, file-level granularity
100M+ rowsSpark + Iceberg/DeltaDistributed, partition pruning

Partition the dimension table

For large Type 2 tables, partition by a date column or region to limit the scan surface during MERGE:

write_deltalake(
    "s3://warehouse/dim_customer/",
    table,
    partition_by=["region"],
    mode="overwrite",
)

Index on business key

If using a SQL warehouse (Snowflake, Redshift, Postgres), ensure a covering index on (business_key, is_current) to speed up lookups.

6) Validation

After every SCD run, validate invariants:

def validate_scd2(dim: pl.DataFrame, business_key: str):
    # Every business key has exactly one current record
    current_counts = (
        dim.filter(pl.col("is_current"))
        .group_by(business_key)
        .agg(pl.count().alias("n"))
    )
    assert current_counts["n"].max() == 1, "Duplicate current records found"

    # No gaps in date ranges for each business key
    for key, group in dim.group_by(business_key):
        sorted_group = group.sort("valid_from")
        for i in range(1, len(sorted_group)):
            prev_to = sorted_group[i - 1]["valid_to"].item()
            curr_from = sorted_group[i]["valid_from"].item()
            assert prev_to == curr_from, (
                f"Date gap for {key}: {prev_to}{curr_from}"
            )

    # valid_from < valid_to for all rows
    assert (dim["valid_from"] < dim["valid_to"]).all(), "Invalid date range found"

7) Testing strategy

import pytest

def test_new_record_gets_current_flag():
    result = apply_scd_type2(empty_dim, {"new": one_record, "changed": empty}, today)
    assert result.filter(pl.col("is_current")).height == 1

def test_changed_record_closes_old_version():
    result = apply_scd_type2(existing_dim, {"new": empty, "changed": updated_record}, today)
    old = result.filter(~pl.col("is_current") & (pl.col("customer_id") == 101))
    assert old["valid_to"].item() == today

def test_unchanged_record_not_duplicated():
    result = apply_scd_type2(existing_dim, {"new": empty, "changed": empty}, today)
    assert result.height == existing_dim.height

def test_idempotent_rerun():
    first = apply_scd_type2(existing_dim, changes, today)
    second = apply_scd_type2(first, changes, today)
    assert first.height == second.height  # no new rows on rerun

One thing to remember: SCD Type 2 is only as reliable as your change detection—hash your tracked columns, validate date-range integrity after every run, and always guard against duplicate current records.

pythonscddata-warehousing

See Also

  • Python Adaptive Learning Systems How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
  • Python Airflow Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
  • Python Altair Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
  • Python Automated Grading How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
  • Python Batch Vs Stream Processing Batch processing is like doing laundry once a week; stream processing is like a self-cleaning shirt that cleans itself constantly.