Python Medallion Architecture — Deep Dive

Medallion architecture is more than a naming convention. Implementing it well requires deliberate choices about file formats, schema enforcement, idempotency, and orchestration. This guide walks through a production-grade Python implementation.

1) Bronze layer implementation

Ingestion with metadata enrichment

Every Bronze record should carry provenance metadata so you can trace any downstream issue back to its source:

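import gzip
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

import boto3

BRONZE_BUCKET = "data-lake"


def enrich_bronze(
    source_name: str,
    raw_payload: list[dict],
    batch_id: Optional[str] = None,
    ingested_at: Optional[str] = None,
) -> list[dict]:
    """Attach provenance metadata to every record in a batch."""
    batch_id = batch_id or str(uuid.uuid4())
    ingested_at = ingested_at or datetime.now(timezone.utc).isoformat()
    return [
        {
            **record,
            # Metadata goes last so a source field with the same name cannot overwrite it
            "_batch_id": batch_id,
            "_ingested_at": ingested_at,
            "_source": source_name,
        }
        for record in raw_payload
    ]


def ingest_to_bronze(
    source_name: str,
    raw_payload: list[dict],
    output_path: str,
) -> str:
    """Write one batch to S3 and return its batch ID.

    output_path is treated here as the S3 key prefix, for example "bronze".
    """
    batch_id = str(uuid.uuid4())
    ingestion_ts = datetime.now(timezone.utc).isoformat()
    enriched = enrich_bronze(source_name, raw_payload, batch_id, ingestion_ts)

    # Write as newline-delimited JSON for maximum compatibility
    prefix = output_path.strip("/")
    file_key = f"{prefix}/{source_name}/{ingestion_ts[:10]}/{batch_id}.ndjson.gz"
    payload = gzip.compress(
        "\n".join(json.dumps(r) for r in enriched).encode()
    )

    s3 = boto3.client("s3")
    s3.put_object(Bucket=BRONZE_BUCKET, Key=file_key, Body=payload)
    return batch_id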

Key decisions:

  • Format: NDJSON (newline-delimited JSON) preserves every field regardless of schema variations across batches.
  • Compression: gzip usually shrinks text-heavy JSON payloads several-fold; 5-10x is common when records repeat the same keys and values.
  • Immutability: Bronze files are never updated. New data means new files.
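
To make the format and compression decisions concrete, here is a minimal sketch of the NDJSON plus gzip round trip using only the standard library. The helper names encode_ndjson_gz and decode_ndjson_gz are illustrative assumptions, not part of the pipeline above.

```python
import gzip
import json


def encode_ndjson_gz(records: list[dict]) -> bytes:
    """Serialize records as gzip-compressed NDJSON (one JSON object per line)."""
    lines = "\n".join(json.dumps(r) for r in records)
    return gzip.compress(lines.encode("utf-8"))


def decode_ndjson_gz(payload: bytes) -> list[dict]:
    """Inverse of encode_ndjson_gz: decompress, then parse each non-empty line."""
    text = gzip.decompress(payload).decode("utf-8")
    return [json.loads(line) for line in text.split("\n") if line]


# Records with different shapes survive the round trip unchanged,
# because each line is parsed independently of the others.
records = [
    {"user_id": 1, "name": "Alice"},
    {"user_id": "U-002", "name": "Bob", "tier": "premium"},
]
payload = encode_ndjson_gz(records)
restored = decode_ndjson_gz(payload)
```

Because every line is a complete JSON document, a reader can process a Bronze file line by line without knowing the schema in advance.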

Handling schema drift at Bronze

Bronze must absorb schema drift gracefully. If a source adds a new field or changes a type, Bronze stores it as-is. Do not validate or reject at this layer.

# Bronze accepts anything—these records have different shapes
batch_a = [{"user_id": 1, "name": "Alice"}]
batch_b = [{"user_id": "U-002", "name": "Bob", "tier": "premium"}]
# Both are valid Bronze records
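
Accepting drift does not mean ignoring it. One possible approach, sketched below, is to record the shape of each batch and log the difference from the previous one, without rejecting anything. The functions describe_shape and diff_shapes are hypothetical helpers introduced for illustration.

```python
def describe_shape(records: list[dict]) -> dict[str, set[str]]:
    """Map each field name to the set of Python type names observed for it."""
    shape: dict[str, set[str]] = {}
    for record in records:
        for key, value in record.items():
            shape.setdefault(key, set()).add(type(value).__name__)
    return shape


def diff_shapes(
    old: dict[str, set[str]], new: dict[str, set[str]]
) -> dict[str, list[str]]:
    """Report fields that were added, removed, or observed with a new type."""
    return {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        "retyped": sorted(
            k for k in old.keys() & new.keys() if not new[k] <= old[k]
        ),
    }


batch_a = [{"user_id": 1, "name": "Alice"}]
batch_b = [{"user_id": "U-002", "name": "Bob", "tier": "premium"}]

# Observe the drift; Bronze still stores both batches as-is.
drift = diff_shapes(describe_shape(batch_a), describe_shape(batch_b))
# drift == {"added": ["tier"], "removed": [], "retyped": ["user_id"]}
```

A report like this can feed monitoring so that Silver owners learn about a changed field before validation failures start piling up.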

2) Silver layer implementation

Schema enforcement with Pydantic

Silver is where you impose structure. Pydantic models define the expected schema and reject non-conforming records:

from datetime import date

from pydantic import BaseModel, field_validator

class OrderSilver(BaseModel):
    order_id: int
    customer_id: int
    amount: float
    currency: str
    order_date: date
    region: str
    source: str

    @field_validator("amount")
    @classmethod
    def amount_must_be_positive(cls, v: float) -> float:
        if v <= 0:
            raise ValueError(f"amount must be positive, got {v}")
        return round(v, 2)

    @field_validator("currency")
    @classmethod
    def currency_must_be_iso(cls, v: str) -> str:
        valid = {"USD", "EUR", "GBP", "JPY", "CAD"}
        if v.upper() not in valid:
            raise ValueError(f"unknown currency {v}")
        return v.upper()

Bronze-to-Silver pipeline

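from datetime import datetime, timezone

import polars as pl
from pydantic import ValidationError

def bronze_to_silver(bronze_path: str, silver_path: str, error_path: str) -> tuple[int, int]:
    """Validate Bronze rows against OrderSilver; return (valid_count, error_count)."""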
    # Read raw NDJSON files
    raw = pl.read_ndjson(bronze_path)

    valid_records = []
    error_records = []

    for row in raw.iter_rows(named=True):
        try:
            validated = OrderSilver(**row)
            valid_records.append(validated.model_dump())
        except ValidationError as e:
            error_records.append({
                **row,
                "_validation_errors": str(e),
                "_failed_at": datetime.now(timezone.utc).isoformat(),
            })

    # Write valid records as Parquet
    if valid_records:
        silver_df = pl.DataFrame(valid_records)
        silver_df.write_parquet(silver_path)

    # Write errors for investigation (dead letter pattern)
    if error_records:
        error_df = pl.DataFrame(error_records)
        error_df.write_parquet(error_path)

    return len(valid_records), len(error_records)

Deduplication strategies

Common approaches for Silver deduplication:

  1. Exact dedup: Group by primary key, keep the latest _ingested_at.
  2. Window dedup: For event streams, deduplicate within a time window using event ID.
  3. Merge/upsert: For Delta Lake tables, use MERGE to update existing rows and insert new ones.

The merge/upsert approach, using the deltalake package:

import polars as pl
from deltalake import DeltaTable, write_deltalake

def upsert_to_silver_delta(new_data: pl.DataFrame, table_path: str):
    if DeltaTable.is_deltatable(table_path):
        dt = DeltaTable(table_path)
        (
            dt.merge(
                source=new_data.to_arrow(),
                predicate="target.order_id = source.order_id",
                source_alias="source",
                target_alias="target",
            )
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .execute()
        )
    else:
        write_deltalake(table_path, new_data.to_arrow(), mode="overwrite")
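
The first two strategies in the list can be sketched in plain Python. This is an illustrative version under stated assumptions: records are dicts, _ingested_at is an ISO-8601 UTC string in one consistent format, and events carry hypothetical event_id and event_time fields.

```python
from datetime import datetime, timedelta


def dedup_keep_latest(records: list[dict], key: str) -> list[dict]:
    """Exact dedup: one row per key, keeping the most recently ingested one."""
    latest: dict = {}
    for record in records:
        current = latest.get(record[key])
        # Same-format ISO-8601 UTC timestamps sort correctly as strings.
        if current is None or record["_ingested_at"] > current["_ingested_at"]:
            latest[record[key]] = record
    return list(latest.values())


def dedup_within_window(events: list[dict], window: timedelta) -> list[dict]:
    """Window dedup: drop an event if the same event_id was seen within `window`."""
    last_seen: dict[str, datetime] = {}
    kept = []
    for event in sorted(events, key=lambda e: e["event_time"]):
        previous = last_seen.get(event["event_id"])
        if previous is None or event["event_time"] - previous > window:
            kept.append(event)
        # Track the latest sighting, so a chain of rapid retries is dropped entirely.
        last_seen[event["event_id"]] = event["event_time"]
    return kept


rows = [
    {"order_id": 1, "amount": 10.0, "_ingested_at": "2026-03-28T01:00:00+00:00"},
    {"order_id": 1, "amount": 12.0, "_ingested_at": "2026-03-28T02:00:00+00:00"},
    {"order_id": 2, "amount": 5.0, "_ingested_at": "2026-03-28T01:00:00+00:00"},
]
deduped = dedup_keep_latest(rows, key="order_id")

t0 = datetime(2026, 3, 28, 12, 0, 0)
events = [
    {"event_id": "e1", "event_time": t0},
    {"event_id": "e1", "event_time": t0 + timedelta(seconds=30)},  # retry
    {"event_id": "e1", "event_time": t0 + timedelta(hours=2)},     # later reuse
]
kept = dedup_within_window(events, window=timedelta(minutes=5))
```

Here deduped keeps the 12.0 version of order 1 alongside order 2, and kept retains the first and last events while dropping the 30-second retry.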

3) Gold layer implementation

Aggregation patterns

Gold tables are purpose-built for specific consumers. Each Gold table has a clear owner and documented refresh cadence:

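import duckdb
import pyarrow.parquet as pq

def build_daily_revenue_gold(silver_path: str, gold_path: str) -> None:
    # Normalize the prefix so a trailing slash does not produce "//" in the glob
    silver_glob = f"{silver_path.rstrip('/')}/**/*.parquet"

    # Note: s3:// paths require DuckDB's httpfs extension and configured credentials
    conn = duckdb.connect()
    result = conn.execute(f"""
        SELECT
            order_date,
            region,
            currency,
            COUNT(*) AS order_count,
            SUM(amount) AS total_revenue,
            AVG(amount) AS avg_order_value,
            MIN(amount) AS min_order,
            MAX(amount) AS max_order
        FROM read_parquet('{silver_glob}')
        GROUP BY order_date, region, currency
        ORDER BY order_date DESC, total_revenue DESC
    """).fetch_arrow_table()  # materialize the result as a pyarrow.Table

    pq.write_table(result, gold_path)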

Slowly changing dimensions in Gold

When Gold tables track entities that change over time (customer tier, product price), implement SCD Type 2 to preserve history. This enables time-travel queries without relying on Delta Lake versioning.
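
A minimal in-memory sketch of SCD Type 2 follows, assuming each history row carries valid_from, valid_to (None while the row is current), and is_current. The function names apply_scd2 and tier_as_of and the customer fields are illustrative assumptions; a production version would run as a merge against the Gold table rather than over Python lists.

```python
from datetime import date
from typing import Optional


def apply_scd2(history: list[dict], key: str, incoming: dict, as_of: date) -> list[dict]:
    """Return a new history list with `incoming` applied as a Type 2 change."""
    tracked = {k: v for k, v in incoming.items() if k != key}
    result = []
    needs_new_version = True
    for row in history:
        if row[key] == incoming[key] and row["is_current"]:
            if all(row.get(k) == v for k, v in tracked.items()):
                needs_new_version = False  # nothing changed: keep the current row
                result.append(row)
            else:
                # Close the old version instead of overwriting it.
                result.append({**row, "valid_to": as_of, "is_current": False})
        else:
            result.append(row)
    if needs_new_version:
        result.append(
            {**incoming, "valid_from": as_of, "valid_to": None, "is_current": True}
        )
    return result


def tier_as_of(history: list[dict], customer_id: int, when: date) -> Optional[str]:
    """Point-in-time lookup: valid_from is inclusive, valid_to is exclusive."""
    for row in history:
        if (
            row["customer_id"] == customer_id
            and row["valid_from"] <= when
            and (row["valid_to"] is None or when < row["valid_to"])
        ):
            return row["tier"]
    return None


history: list[dict] = []
history = apply_scd2(history, "customer_id", {"customer_id": 42, "tier": "basic"}, date(2026, 1, 1))
history = apply_scd2(history, "customer_id", {"customer_id": 42, "tier": "premium"}, date(2026, 3, 1))

february_tier = tier_as_of(history, 42, date(2026, 2, 15))  # "basic"
march_tier = tier_as_of(history, 42, date(2026, 3, 1))      # "premium"
```

Reapplying an unchanged record leaves the history untouched, which keeps the load safe to rerun.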

4) Orchestration

Airflow DAG structure

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["medallion", "orders"],
)
def orders_medallion_pipeline():

    @task
    def ingest_bronze(**context):
        logical_date = context["logical_date"].strftime("%Y-%m-%d")
        # Fetch from source API, write to Bronze
        return {"batch_id": "abc-123", "date": logical_date}

    @task
    def transform_silver(bronze_meta: dict):
        # Read Bronze for the given date, validate, write Silver
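        # Silver and error outputs are single Parquet files, so a rerun
        # for the same date overwrites them instead of adding duplicates
        valid, errors = bronze_to_silver(
            f"s3://data-lake/bronze/orders/{bronze_meta['date']}/",
            f"s3://data-lake/silver/orders/{bronze_meta['date']}/orders.parquet",
            f"s3://data-lake/errors/orders/{bronze_meta['date']}/errors.parquet",
        )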
        return {"valid": valid, "errors": errors}

    @task
    def build_gold(silver_meta: dict):
        # Aggregate Silver into Gold
        build_daily_revenue_gold(
            "s3://data-lake/silver/orders/",
            "s3://data-lake/gold/daily_revenue/latest.parquet",
        )

    bronze = ingest_bronze()
    silver = transform_silver(bronze)
    build_gold(silver)

orders_medallion_pipeline()

Quality gates between layers

Insert quality checks as separate tasks between layer transitions:

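from airflow.decorators import task

@task
def quality_gate_silver(silver_meta: dict):
    # This follows the older Great Expectations checkpoint API;
    # the call may differ in your installed version.
    import great_expectations as gx

    context = gx.get_context()
    result = context.run_checkpoint(checkpoint_name="silver_orders_checkpoint")

    if not result.success:
        raise ValueError(
            f"Silver quality gate failed: {result.statistics}"
        )
    return silver_meta

# Inside the DAG function, wire the gate between Silver and Gold:
#     silver = transform_silver(bronze)
#     build_gold(quality_gate_silver(silver))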

If the quality gate fails, the DAG stops before Gold is built—protecting downstream consumers from bad data.
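
A gate does not have to depend on a validation framework. As a lightweight sketch, the counts that transform_silver already returns can be checked against thresholds. The function name check_silver_counts, the exception class, and the default thresholds below are assumptions chosen for illustration.

```python
class QualityGateError(Exception):
    """Raised when a layer transition fails its quality thresholds."""


def check_silver_counts(
    valid: int,
    errors: int,
    max_error_rate: float = 0.01,
    min_rows: int = 1,
) -> float:
    """Return the observed error rate, or raise if a threshold is violated."""
    total = valid + errors
    if valid < min_rows:
        raise QualityGateError(
            f"only {valid} valid rows, expected at least {min_rows}"
        )
    error_rate = errors / total if total else 0.0
    if error_rate > max_error_rate:
        raise QualityGateError(
            f"error rate {error_rate:.2%} exceeds limit {max_error_rate:.2%}"
        )
    return error_rate


# A batch with 0.5% rejections passes the default 1% limit.
observed_rate = check_silver_counts(valid=995, errors=5)
```

Raising an exception is what makes this a gate: inside an Airflow task, the failure marks the task as failed and downstream tasks do not run.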

5) Testing the pipeline

Unit tests per layer

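import pytest

# enrich_bronze is the metadata-enrichment step of ingest_to_bronze,
# factored into a pure function so it can be tested without S3.
def test_bronze_metadata_enrichment():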
    records = [{"order_id": 1, "amount": 50.0}]
    result = enrich_bronze("test_source", records)
    assert "_batch_id" in result[0]
    assert "_ingested_at" in result[0]
    assert result[0]["_source"] == "test_source"

def test_silver_rejects_negative_amount():
    from pydantic import ValidationError
    with pytest.raises(ValidationError):
        OrderSilver(
            order_id=1, customer_id=1, amount=-10.0,
            currency="USD", order_date="2026-03-28",
            region="us-east", source="test",
        )

def test_gold_aggregation():
    # Create temporary Silver Parquet, run Gold builder, verify sums
    ...

Integration tests

Run the full pipeline on synthetic data in a test bucket. Verify:

  • Bronze record count matches source record count.
  • Silver row count equals Bronze minus expected rejections.
  • Gold aggregates are mathematically consistent with Silver.
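
These three checks can be expressed as one reconciliation function. The sketch below assumes Silver applies validation only (no deduplication) and that Gold rows expose the order_count and total_revenue columns from the daily revenue table; the name reconcile_layers is hypothetical.

```python
import math


def reconcile_layers(
    source_count: int,
    bronze_count: int,
    silver_count: int,
    rejected_count: int,
    silver_amounts: list[float],
    gold_rows: list[dict],
) -> list[str]:
    """Return human-readable failures; an empty list means every check passed."""
    failures = []
    if bronze_count != source_count:
        failures.append(
            f"bronze has {bronze_count} records, source sent {source_count}"
        )
    if silver_count != bronze_count - rejected_count:
        failures.append(
            f"silver has {silver_count} rows, expected {bronze_count - rejected_count}"
        )
    gold_orders = sum(row["order_count"] for row in gold_rows)
    if gold_orders != silver_count:
        failures.append(
            f"gold counts {gold_orders} orders, silver has {silver_count}"
        )
    gold_revenue = sum(row["total_revenue"] for row in gold_rows)
    # Compare floats with a tolerance; summation order differs between engines.
    if not math.isclose(gold_revenue, sum(silver_amounts), rel_tol=1e-9, abs_tol=1e-9):
        failures.append(
            f"gold revenue {gold_revenue} != silver total {sum(silver_amounts)}"
        )
    return failures


consistent = reconcile_layers(
    source_count=4,
    bronze_count=4,
    silver_count=3,
    rejected_count=1,
    silver_amounts=[10.0, 20.0, 30.0],
    gold_rows=[
        {"order_count": 2, "total_revenue": 30.0},
        {"order_count": 1, "total_revenue": 30.0},
    ],
)
```

Returning a list of failures rather than raising on the first one makes a failed integration run easier to diagnose.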

Tradeoffs

Approach                            | Pros                                  | Cons
------------------------------------+---------------------------------------+---------------------------------------------
Delta Lake for all layers           | ACID, time travel, MERGE              | Heavier setup, Spark dependency for some ops
Plain Parquet + overwrite           | Simple, any engine reads it           | No transactions, no concurrent writers
Iceberg for Silver/Gold             | Hidden partitioning, schema evolution | Catalog dependency, smaller Python ecosystem
Streaming Bronze, batch Silver/Gold | Low latency ingest                    | Complexity of two processing models

Production checklist

  • Bronze stores raw data with _batch_id, _ingested_at, _source metadata
  • Silver validates every record against a typed schema (Pydantic, Pandera)
  • Dead letter table captures Silver validation failures with error details
  • Gold tables have documented owners, refresh cadence, and SLAs
  • Quality gates block layer promotion on failure
  • Orchestrator retries are idempotent (rerunning does not create duplicates)
  • Each layer uses an appropriate file format (NDJSON → Parquet → optimized Parquet)
  • Compaction runs periodically on Delta/Iceberg tables
  • Monitoring tracks row counts, error rates, and latency per layer transition
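
On the idempotency item: a random uuid4 batch ID means a retried ingest writes a second file for the same data. One possible remedy, sketched here, is to derive the batch ID from the source name and the run's logical date so that a retry targets the same object key. This assumes one batch per source per logical date; the namespace string and function names are hypothetical.

```python
import uuid

# Hypothetical namespace for this pipeline; any fixed UUID works.
PIPELINE_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "orders-medallion.example")


def deterministic_batch_id(source_name: str, logical_date: str) -> str:
    """Same inputs always produce the same ID, unlike uuid4."""
    return str(uuid.uuid5(PIPELINE_NAMESPACE, f"{source_name}/{logical_date}"))


def bronze_key(source_name: str, logical_date: str) -> str:
    """Object key that a retry of the same run will overwrite, not duplicate."""
    batch_id = deterministic_batch_id(source_name, logical_date)
    return f"bronze/{source_name}/{logical_date}/{batch_id}.ndjson.gz"


first_attempt = bronze_key("orders", "2026-03-28")
retry_attempt = bronze_key("orders", "2026-03-28")
next_day = bronze_key("orders", "2026-03-29")
```

The tradeoff is that a retry replaces the earlier object, so this fits sources where refetching a given date returns the same data; otherwise keep random IDs and deduplicate in Silver.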

One thing to remember: the medallion pattern’s real value is not the three layers themselves but the explicit contracts between them—each transition is a testable, retriable, observable unit of work.
