Python Medallion Architecture — Deep Dive
Medallion architecture is more than a naming convention. Implementing it well requires deliberate choices about file formats, schema enforcement, idempotency, and orchestration. This guide walks through a production-grade Python implementation.
1) Bronze layer implementation
Ingestion with metadata enrichment
Every Bronze record should carry provenance metadata so you can trace any downstream issue back to its source:
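```python
import gzip
import json
import uuid
from datetime import datetime, timezone
from typing import Optional

import boto3


def enrich_bronze(
    source_name: str,
    raw_payload: list[dict],
    batch_id: Optional[str] = None,
    ingestion_ts: Optional[str] = None,
) -> list[dict]:
    """Attach provenance metadata to every raw record (pure function, no I/O)."""
    batch_id = batch_id or str(uuid.uuid4())
    ingestion_ts = ingestion_ts or datetime.now(timezone.utc).isoformat()
    # Metadata is applied last so a source field that happens to be named
    # "_source" (or "_batch_id", ...) cannot overwrite the provenance columns
    return [
        {**record, "_batch_id": batch_id, "_ingested_at": ingestion_ts, "_source": source_name}
        for record in raw_payload
    ]


def ingest_to_bronze(
    source_name: str,
    raw_payload: list[dict],
    bucket: str = "data-lake",
) -> str:
    batch_id = str(uuid.uuid4())
    ingestion_ts = datetime.now(timezone.utc).isoformat()
    enriched = enrich_bronze(source_name, raw_payload, batch_id, ingestion_ts)
    # Newline-delimited JSON, gzip-compressed, partitioned by ingestion date
    file_key = f"bronze/{source_name}/{ingestion_ts[:10]}/{batch_id}.ndjson.gz"
    payload = gzip.compress(
        "".join(json.dumps(r) + "\n" for r in enriched).encode("utf-8")
    )
    boto3.client("s3").put_object(Bucket=bucket, Key=file_key, Body=payload)
    return batch_id
```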
Key decisions:
- Format: NDJSON (newline-delimited JSON) preserves every field regardless of schema variations across batches.
- Compression: gzip reduces storage 5-10x for text-heavy payloads.
- Immutability: Bronze files are never updated. New data means new files.
Handling schema drift at Bronze
Bronze must absorb schema drift gracefully. If a source adds a new field or changes a type, Bronze stores it as-is. Do not validate or reject at this layer.
```python
# Bronze accepts anything: these records have different shapes
batch_a = [{"user_id": 1, "name": "Alice"}]
batch_b = [{"user_id": "U-002", "name": "Bob", "tier": "premium"}]
# Both are valid Bronze records
```
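A quick local check (no S3 involved) shows why NDJSON suits Bronze: every line is an independent JSON object, so both drifted batches survive a gzip round trip exactly as they arrived. The helpers to_ndjson_gz and from_ndjson_gz are hypothetical names used only for this sketch.

```python
import gzip
import json


def to_ndjson_gz(records: list[dict]) -> bytes:
    # Hypothetical helper: one JSON object per line, no shared schema imposed
    return gzip.compress("".join(json.dumps(r) + "\n" for r in records).encode("utf-8"))


def from_ndjson_gz(payload: bytes) -> list[dict]:
    # Hypothetical helper: decode line by line, skipping blank lines
    text = gzip.decompress(payload).decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line]


batch_a = [{"user_id": 1, "name": "Alice"}]
batch_b = [{"user_id": "U-002", "name": "Bob", "tier": "premium"}]
round_trip = from_ndjson_gz(to_ndjson_gz(batch_a + batch_b))
```

The int user_id, the string user_id, and the extra tier field all come back unchanged; reconciling them is deferred to Silver.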
2) Silver layer implementation
Schema enforcement with Pydantic
Silver is where you impose structure. Pydantic models define the expected schema and reject non-conforming records:
```python
from datetime import date

from pydantic import BaseModel, field_validator


class OrderSilver(BaseModel):
    order_id: int
    customer_id: int
    amount: float
    currency: str
    order_date: date
    region: str
    source: str

    @field_validator("amount")
    @classmethod
    def amount_must_be_positive(cls, v: float) -> float:
        if v <= 0:
            raise ValueError(f"amount must be positive, got {v}")
        return round(v, 2)

    @field_validator("currency")
    @classmethod
    def currency_must_be_iso(cls, v: str) -> str:
        valid = {"USD", "EUR", "GBP", "JPY", "CAD"}
        if v.upper() not in valid:
            raise ValueError(f"unknown currency {v}")
        return v.upper()
```
Bronze-to-Silver pipeline
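```python
import json
from datetime import datetime, timezone

import fsspec  # s3:// URLs additionally need s3fs installed
import polars as pl
from pydantic import ValidationError


def bronze_to_silver(bronze_path: str, silver_path: str, error_path: str) -> tuple[int, int]:
    """bronze_path: directory of *.ndjson.gz files; silver_path, error_path: output Parquet files."""
    valid_records, error_records = [], []
    # Parse line by line instead of pl.read_ndjson: drifted batches (user_id as int
    # in one file, str in another) would otherwise break schema inference
    pattern = f"{bronze_path.rstrip('/')}/*.ndjson.gz"
    for bronze_file in fsspec.open_files(pattern, mode="rt", compression="gzip"):
        with bronze_file as f:
            for line in f:
                if not line.strip():
                    continue
                row = json.loads(line)
                try:
                    # Bronze stores provenance as "_source"; Silver exposes it as "source"
                    validated = OrderSilver.model_validate({**row, "source": row.get("_source")})
                    valid_records.append(validated.model_dump())
                except ValidationError as e:
                    error_records.append({
                        # Raw row kept as a JSON string so the dead letter table
                        # has a fixed schema even when rejected rows have drifted
                        "raw": json.dumps(row),
                        "_validation_errors": str(e),
                        "_failed_at": datetime.now(timezone.utc).isoformat(),
                    })
    # Write valid records as Parquet
    if valid_records:
        with fsspec.open(silver_path, "wb") as f:
            pl.DataFrame(valid_records).write_parquet(f)
    # Write errors for investigation (dead letter pattern)
    if error_records:
        with fsspec.open(error_path, "wb") as f:
            pl.DataFrame(error_records).write_parquet(f)
    return len(valid_records), len(error_records)
```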
Deduplication strategies
Common approaches for Silver deduplication:
- Exact dedup: Group by primary key, keep the row with the latest _ingested_at.
- Window dedup: For event streams, deduplicate within a time window using the event ID.
- Merge/upsert: For Delta Lake tables, use MERGE to update existing rows and insert new ones.
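```python
import polars as pl
from deltalake import DeltaTable, write_deltalake


def upsert_to_silver_delta(new_data: pl.DataFrame, table_path: str) -> None:
    # MERGE is only well-defined when each target row matches at most one
    # source row, so collapse duplicate order_ids in the batch first
    source = new_data.unique(subset="order_id", keep="last", maintain_order=True)
    if DeltaTable.is_deltatable(table_path):
        (
            DeltaTable(table_path)
            .merge(
                source=source.to_arrow(),
                predicate="target.order_id = source.order_id",
                source_alias="source",
                target_alias="target",
            )
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .execute()
        )
    else:
        # First load: create the table
        write_deltalake(table_path, source.to_arrow(), mode="overwrite")
```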
3) Gold layer implementation
Aggregation patterns
Gold tables are purpose-built for specific consumers. Each Gold table has a clear owner and documented refresh cadence:
```python
import duckdb


def build_daily_revenue_gold(silver_path: str, gold_path: str) -> None:
    # s3:// paths rely on DuckDB's httpfs extension and configured credentials
    pattern = f"{silver_path.rstrip('/')}/**/*.parquet"
    with duckdb.connect() as conn:
        # COPY writes the aggregate straight to Parquet, no pyarrow round trip
        conn.execute(f"""
            COPY (
                SELECT
                    order_date,
                    region,
                    currency,
                    COUNT(*) AS order_count,
                    SUM(amount) AS total_revenue,
                    AVG(amount) AS avg_order_value,
                    MIN(amount) AS min_order,
                    MAX(amount) AS max_order
                FROM read_parquet('{pattern}')
                GROUP BY order_date, region, currency
                ORDER BY order_date DESC, total_revenue DESC
            ) TO '{gold_path}' (FORMAT PARQUET)
        """)
```
Slowly changing dimensions in Gold
When Gold tables track entities that change over time (customer tier, product price), implement SCD Type 2 to preserve history. This enables time-travel queries without relying on Delta Lake versioning.
4) Orchestration
Airflow DAG structure
```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["medallion", "orders"],
)
def orders_medallion_pipeline():
    @task
    def ingest_bronze(**context):
        logical_date = context["logical_date"].strftime("%Y-%m-%d")
        # Placeholder: fetch from the source API, write Bronze under this
        # logical date's prefix, and return the real batch_id
        return {"batch_id": "abc-123", "date": logical_date}

    @task
    def transform_silver(bronze_meta: dict):
        # Read Bronze for the given date, validate, write Silver and dead letter files
        day = bronze_meta["date"]
        valid, errors = bronze_to_silver(
            f"s3://data-lake/bronze/orders/{day}/",
            f"s3://data-lake/silver/orders/{day}/orders.parquet",
            f"s3://data-lake/errors/orders/{day}/errors.parquet",
        )
        return {"valid": valid, "errors": errors}

    @task
    def build_gold(silver_meta: dict):
        # silver_meta is not used, but passing it makes Gold wait for Silver
        build_daily_revenue_gold(
            "s3://data-lake/silver/orders/",
            "s3://data-lake/gold/daily_revenue/latest.parquet",
        )

    bronze = ingest_bronze()
    silver = transform_silver(bronze)
    build_gold(silver)


orders_medallion_pipeline()
```
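If ingest_bronze called ingest_to_bronze as written, every retry would mint a fresh uuid4 and partition by wall-clock date, leaving a second Bronze file for the same day and duplicate rows in Silver. One way to make reruns idempotent is to derive the batch ID and object key from the source name and the run's logical date. The helpers below (deterministic_batch_id, bronze_key) are hypothetical; uuid.uuid5 is the standard-library name-based UUID.

```python
import uuid


def deterministic_batch_id(source_name: str, logical_date: str) -> str:
    # uuid5 is name-based (SHA-1): the same inputs always give the same ID
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"bronze/{source_name}/{logical_date}"))


def bronze_key(source_name: str, logical_date: str) -> str:
    # Partition by the run's logical date, not wall-clock time, so a retry
    # or backfill of the same run targets exactly the same object key
    batch_id = deterministic_batch_id(source_name, logical_date)
    return f"bronze/{source_name}/{logical_date}/{batch_id}.ndjson.gz"


first_try = bronze_key("orders", "2026-03-28")
retry = bronze_key("orders", "2026-03-28")
```

With a stable key, the writer can skip the upload when the object already exists, which keeps Bronze write-once. The key also lines up with the logical-date prefix that transform_silver reads.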
Quality gates between layers
Insert quality checks as separate tasks between layer transitions:
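```python
from airflow.decorators import task


@task
def quality_gate_silver(silver_meta: dict) -> dict:
    import great_expectations as gx  # Great Expectations 1.x API

    context = gx.get_context()
    result = context.checkpoints.get("silver_orders_checkpoint").run()
    if not result.success:
        raise ValueError(f"Silver quality gate failed: {result.run_results}")
    # Return the metadata so the gate can sit between the layers in the DAG:
    # build_gold(quality_gate_silver(transform_silver(bronze)))
    return silver_meta
```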
If the quality gate fails, the DAG stops before Gold is built—protecting downstream consumers from bad data.
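A lighter gate that needs no extra dependency can be built from the counts transform_silver already returns. This is a sketch that assumes a 1% maximum rejection rate, an arbitrary threshold to tune per source. error_rate_gate is a hypothetical helper, not an Airflow or Great Expectations API.

```python
def error_rate_gate(silver_meta: dict, max_error_rate: float = 0.01) -> dict:
    """Fail when too many Bronze records were rejected during validation.

    silver_meta is the {"valid": ..., "errors": ...} dict from transform_silver.
    """
    total = silver_meta["valid"] + silver_meta["errors"]
    if total == 0:
        # An empty day is usually an upstream problem, not a clean run
        raise ValueError("Silver quality gate failed: no records processed")
    rate = silver_meta["errors"] / total
    if rate > max_error_rate:
        raise ValueError(
            f"Silver quality gate failed: error rate {rate:.2%} exceeds {max_error_rate:.2%}"
        )
    # Pass the metadata through so the next task can depend on the gate
    return silver_meta
```

Decorated with @task, it slots in as build_gold(error_rate_gate(silver)). A raised ValueError fails the task, and with Airflow's default all_success trigger rule the Gold task does not run.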
5) Testing the pipeline
Unit tests per layer
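```python
import polars as pl
import pytest
from pydantic import ValidationError
# enrich_bronze, OrderSilver and build_daily_revenue_gold come from the pipeline module

def test_bronze_metadata_enrichment():
    records = [{"order_id": 1, "amount": 50.0}]
    result = enrich_bronze("test_source", records)
    assert "_batch_id" in result[0]
    assert "_ingested_at" in result[0]
    assert result[0]["_source"] == "test_source"

def test_silver_rejects_negative_amount():
    with pytest.raises(ValidationError):
        OrderSilver(
            order_id=1, customer_id=1, amount=-10.0,
            currency="USD", order_date="2026-03-28",
            region="us-east", source="test",
        )

def test_gold_aggregation(tmp_path):
    # Temporary Silver Parquet: two orders in one (date, region, currency) group
    silver_dir = tmp_path / "silver"
    silver_dir.mkdir()
    pl.DataFrame({"order_date": ["2026-03-28"] * 2, "region": ["us-east"] * 2,
                  "currency": ["USD"] * 2, "amount": [10.0, 30.0]}).write_parquet(silver_dir / "part-0.parquet")
    build_daily_revenue_gold(str(silver_dir), str(tmp_path / "gold.parquet"))
    gold = pl.read_parquet(tmp_path / "gold.parquet")
    assert gold["order_count"].to_list() == [2]
    assert gold["total_revenue"].to_list() == [40.0]
```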
Integration tests
Run the full pipeline on synthetic data in a test bucket. Verify:
- Bronze record count matches the source record count.
- Silver row count equals the Bronze record count minus the rows captured in the dead letter table.
- Gold aggregates are mathematically consistent with Silver.
Tradeoffs
| Approach | Pros | Cons |
|---|---|---|
| Delta Lake for all layers | ACID, time travel, MERGE | Heavier setup, Spark dependency for some ops |
| Plain Parquet + overwrite | Simple, any engine reads it | No transactions, no concurrent writers |
| Iceberg for Silver/Gold | Hidden partitioning, schema evolution | Catalog dependency, smaller Python ecosystem |
| Streaming Bronze, batch Silver/Gold | Low latency ingest | Complexity of two processing models |
Production checklist
- Bronze stores raw data with _batch_id, _ingested_at, and _source metadata
- Silver validates every record against a typed schema (Pydantic, Pandera)
- Dead letter table captures Silver validation failures with error details
- Gold tables have documented owners, refresh cadence, and SLAs
- Quality gates block layer promotion on failure
- Orchestrator retries are idempotent (rerunning does not create duplicates)
- Each layer uses appropriate file format (NDJSON → Parquet → optimized Parquet)
- Compaction runs periodically on Delta/Iceberg tables
- Monitoring tracks row counts, error rates, and latency per layer transition
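For the monitoring item, each layer transition can emit one small record with its counts and timing. The sketch below assumes a hypothetical TransitionMetrics dataclass and leaves open where it is shipped (logs, a metrics system, or a table). Its balanced check only makes sense for row-preserving transitions such as Bronze to Silver, not for Gold aggregations.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TransitionMetrics:
    """Row counts, error rate, and latency for one layer transition."""

    transition: str  # e.g. "bronze->silver"
    rows_in: int = 0
    rows_out: int = 0
    rows_rejected: int = 0
    # Monotonic clock: unaffected by wall-clock adjustments, suitable for durations
    started_at: float = field(default_factory=time.monotonic)
    finished_at: Optional[float] = None

    def finish(self) -> None:
        self.finished_at = time.monotonic()

    @property
    def error_rate(self) -> float:
        return self.rows_rejected / self.rows_in if self.rows_in else 0.0

    @property
    def balanced(self) -> bool:
        # No silent drops: every input row was either written or rejected
        return self.rows_in == self.rows_out + self.rows_rejected

    @property
    def latency_s(self) -> Optional[float]:
        return None if self.finished_at is None else self.finished_at - self.started_at


m = TransitionMetrics("bronze->silver", rows_in=100, rows_out=95, rows_rejected=5)
m.finish()
```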
One thing to remember: the medallion pattern’s real value is not the three layers themselves but the explicit contracts between them—each transition is a testable, retriable, observable unit of work.