Schema Evolution — Deep Dive

Schema evolution is a cross-cutting concern that spans databases, APIs, serialization formats, caches, and configuration systems. In production Python applications, getting it wrong causes downtime, data loss, or silent corruption. This deep dive covers the technical patterns, tooling, and tradeoffs involved.

1) Database schema evolution with Alembic

Alembic is the standard migration tool for SQLAlchemy. Each migration is a Python file with upgrade() and downgrade() functions:

"""Add email_verified column to users"""
from alembic import op
import sqlalchemy as sa

revision = "a1b2c3d4"
down_revision = "z9y8x7w6"

def upgrade():
    op.add_column("users", sa.Column("email_verified", sa.Boolean(), server_default="false", nullable=False))

def downgrade():
    op.drop_column("users", "email_verified")

Critical practices for production:

Separate schema and data migrations. Schema migrations alter table structure. Data migrations backfill or transform values. Mixing them creates long-running transactions that lock tables.

# Schema migration: add column with default
def upgrade():
    op.add_column("orders", sa.Column("status_v2", sa.String(50), server_default="pending"))

# Separate data migration: backfill
def upgrade():
    conn = op.get_bind()
    conn.execute(sa.text("""
        UPDATE orders SET status_v2 = CASE
            WHEN status = 0 THEN 'pending'
            WHEN status = 1 THEN 'confirmed'
            WHEN status = 2 THEN 'shipped'
            ELSE 'unknown'
        END
        WHERE status_v2 = 'pending'
    """))

Use server_default not default. SQLAlchemy’s default only applies in Python; server_default applies at the database level, which matters for existing rows and for other applications writing to the same database.

Batch backfills for large tables. Updating 10 million rows in a single transaction locks the table. Instead, process in chunks:

def upgrade():
    conn = op.get_bind()
    while True:
        result = conn.execute(sa.text("""
            UPDATE orders SET status_v2 = 'pending'
            WHERE status_v2 IS NULL
            LIMIT 10000
        """))
        if result.rowcount == 0:
            break

2) Zero-downtime migration patterns

The expand-migrate-contract pattern in detail:

Phase 1 — Expand (deploy new code that writes both old and new):

# Application code during expand phase
class OrderService:
    def create_order(self, data):
        order = Order(
            status=0,                    # old column (integer)
            status_v2=STATUS_MAP[0],     # new column (string)
            **data,
        )
        db.session.add(order)

Phase 2 — Migrate (backfill old rows): Run the batch backfill migration. During this phase, both columns exist and new code writes to both. Reads can start using the new column.

Phase 3 — Contract (remove old column):

# After confirming all reads use status_v2
def upgrade():
    op.drop_column("orders", "status")
    op.alter_column("orders", "status_v2", new_column_name="status")

For rename operations specifically, PostgreSQL supports ALTER COLUMN ... RENAME without rewriting the table, making it fast even on large tables.

3) Django migration specifics

Django’s migration system auto-generates migrations from model changes. Key differences from Alembic:

  • Migrations are auto-generated but should always be reviewed.
  • RunPython operations handle data migrations.
  • SeparateDatabaseAndState allows you to tell Django a column exists before actually creating it (useful for gradual rollouts).
from django.db import migrations

def backfill_status(apps, schema_editor):
    Order = apps.get_model("orders", "Order")
    Order.objects.filter(status_v2="").update(
        status_v2=models.Case(
            models.When(status=0, then=models.Value("pending")),
            models.When(status=1, then=models.Value("confirmed")),
            default=models.Value("unknown"),
        )
    )

class Migration(migrations.Migration):
    operations = [
        migrations.RunPython(backfill_status, migrations.RunPython.noop),
    ]

4) API versioning strategies

URL-based versioning:

# FastAPI
from fastapi import APIRouter

v1_router = APIRouter(prefix="/api/v1")
v2_router = APIRouter(prefix="/api/v2")

@v1_router.get("/users/{user_id}")
def get_user_v1(user_id: int) -> UserResponseV1:
    user = get_user(user_id)
    return UserResponseV1.from_orm(user)

@v2_router.get("/users/{user_id}")
def get_user_v2(user_id: int) -> UserResponseV2:
    user = get_user(user_id)
    return UserResponseV2.from_orm(user)

Header-based versioning:

from fastapi import Header

@app.get("/users/{user_id}")
def get_user(user_id: int, api_version: str = Header("2024-01-01")):
    user = fetch_user(user_id)
    if api_version < "2024-06-01":
        return serialize_v1(user)
    return serialize_v2(user)

Pydantic model evolution:

from pydantic import BaseModel

class UserBase(BaseModel):
    id: int
    email: str

class UserResponseV1(UserBase):
    name: str  # single name field

class UserResponseV2(UserBase):
    first_name: str
    last_name: str
    name: str  # kept for backward compat, computed from first+last

5) Event and message schema evolution

In event-driven architectures, schema evolution is harder because messages are immutable once published and consumers deploy on different schedules.

Avro with schema registry:

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})

# Schema registry enforces BACKWARD compatibility by default:
# - New schemas can add optional fields
# - New schemas can remove fields with defaults
# - New schemas cannot change field types

Protobuf evolution rules:

  • Never reuse field numbers.
  • New fields must be optional or have defaults.
  • Deprecated fields should be reserved, not removed.

Application-level versioned events:

import attrs
from cattrs import Converter

@attrs.define
class OrderCreatedV1:
    order_id: str
    total: float

@attrs.define
class OrderCreatedV2:
    order_id: str
    total_cents: int  # changed to integer cents
    currency: str = "USD"

def evolve_v1_to_v2(event: OrderCreatedV1) -> OrderCreatedV2:
    return OrderCreatedV2(
        order_id=event.order_id,
        total_cents=int(event.total * 100),
        currency="USD",
    )

6) Cache schema evolution

Caches are often overlooked in schema evolution. When you change the structure of cached objects, stale cache entries contain the old format. Strategies:

Version-prefixed keys:

CACHE_SCHEMA_VERSION = "v3"

def cache_key(entity_type: str, entity_id: str) -> str:
    return f"{entity_type}:{CACHE_SCHEMA_VERSION}:{entity_id}"

Incrementing the version effectively invalidates all old entries. Simple but wasteful if the cache is large.

Tolerant deserialization:

def load_from_cache(key: str) -> User | None:
    raw = cache.get(key)
    if raw is None:
        return None
    try:
        return UserSchema.load(raw)
    except ValidationError:
        cache.delete(key)  # evict stale format
        return None

7) Configuration file evolution

When applications read TOML, YAML, or JSON config files, the schema of those files evolves too:

from pydantic import BaseModel, Field

class DatabaseConfigV1(BaseModel):
    url: str

class DatabaseConfigV2(BaseModel):
    host: str
    port: int = 5432
    name: str
    url: str | None = None  # deprecated, kept for backward compat

    @classmethod
    def from_v1(cls, v1: DatabaseConfigV1):
        from urllib.parse import urlparse
        parsed = urlparse(v1.url)
        return cls(host=parsed.hostname, port=parsed.port or 5432, name=parsed.path.lstrip("/"))

8) Testing schema evolution

Every migration should be tested:

def test_migration_a1b2c3d4(alembic_runner):
    # Apply all migrations up to the one before ours
    alembic_runner.migrate_up_to("z9y8x7w6")

    # Insert test data in old format
    alembic_runner.insert("users", {"id": 1, "name": "Alice"})

    # Apply our migration
    alembic_runner.migrate_up_one()

    # Verify new column exists with correct default
    row = alembic_runner.select_one("users", id=1)
    assert row["email_verified"] is False

    # Verify downgrade
    alembic_runner.migrate_down_one()

One thing to remember: Schema evolution is a multi-phase discipline — expand (add new alongside old), migrate (transform existing data), contract (remove old) — applied to every layer where data outlives the code that created it: databases, APIs, caches, events, and configuration.

pythonschema-evolutiondata-modelingmigration

See Also