Data Contracts — Deep Dive

Data contracts are the formalization of data interfaces between producing and consuming systems. In Python data engineering, implementing contracts effectively requires combining schema validation, quality assertions, versioning, and CI/CD enforcement into a cohesive system.

1) Contract specification formats

Several specification formats compete in the data contracts space:

Pydantic models (code-first):

from pydantic import BaseModel, Field, field_validator
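from datetime import datetime, timezone
from typing import Literal

class OrderEvent(BaseModel):
    """Contract: Order service → analytics pipeline"""
    # Strict mode disables type coercion: Python input must already have the
    # declared types. JSON input may still carry ISO 8601 strings for datetimes.
    model_config = {"strict": True}

    event_id: str = Field(min_length=36, max_length=36)
    event_type: Literal["order.created", "order.updated", "order.cancelled"]
    timestamp: datetime
    order_id: str = Field(min_length=1)
    customer_id: str = Field(min_length=1)
    total_cents: int = Field(ge=0)
    currency: str = Field(pattern=r"^[A-Z]{3}$")
    items: list[dict] = Field(min_length=1)

    @field_validator("timestamp")
    @classmethod
    def not_in_future(cls, v: datetime) -> datetime:
        # datetime.utcnow() is deprecated and returns a naive value, which
        # cannot be compared with aware timestamps. Naive input is assumed UTC.
        aware = v if v.tzinfo is not None else v.replace(tzinfo=timezone.utc)
        if aware > datetime.now(timezone.utc):
            raise ValueError("Timestamp cannot be in the future")
        return v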

YAML-based (spec-first):

# contracts/order-events.yaml
apiVersion: v2
kind: DataContract
metadata:
  name: order-events
  owner: order-service-team
  slack: "#order-data"
schema:
  type: object
  properties:
    event_id:
      type: string
      format: uuid
    event_type:
      type: string
      enum: [order.created, order.updated, order.cancelled]
    total_cents:
      type: integer
      minimum: 0
quality:
  freshness:
    max_delay_minutes: 30
  completeness:
    customer_id:
      max_null_percent: 0
sla:
  resolution_time: 4h
  escalation: page

JSON Schema: Portable across languages, supported by tools like jsonschema in Python. Good for cross-team contracts where not everyone uses Python.
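
Whichever format carries the contract, something still has to evaluate its quality rules against real data. The following is a minimal sketch of that step for the freshness and completeness rules in the YAML example. It assumes the rules have already been loaded into a plain dict and that each record carries a timezone-aware `timestamp`; `QUALITY_RULES` and `check_quality` are hypothetical names, not part of any contract tool.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical in-memory form of the `quality` block from the YAML contract.
QUALITY_RULES = {
    "freshness": {"max_delay_minutes": 30},
    "completeness": {"customer_id": {"max_null_percent": 0}},
}


def check_quality(records: list[dict], rules: dict, now: datetime) -> list[str]:
    """Return human-readable violations; an empty list means the batch passes."""
    violations = []

    # Freshness: the newest record must be recent enough.
    max_delay = timedelta(minutes=rules["freshness"]["max_delay_minutes"])
    newest = max((r["timestamp"] for r in records), default=None)
    if newest is None or now - newest > max_delay:
        violations.append("freshness: newest record is older than the allowed delay")

    # Completeness: the share of nulls per column must stay within the limit.
    for column, rule in rules["completeness"].items():
        nulls = sum(1 for r in records if r.get(column) is None)
        null_percent = 100 * nulls / len(records) if records else 0.0
        if null_percent > rule["max_null_percent"]:
            violations.append(
                f"completeness: {column} is {null_percent:.1f}% null "
                f"(limit {rule['max_null_percent']}%)"
            )
    return violations


now = datetime(2025, 1, 15, 12, 0, tzinfo=timezone.utc)
batch = [
    {"customer_id": "CUST-001", "timestamp": now - timedelta(minutes=5)},
    {"customer_id": None, "timestamp": now - timedelta(minutes=10)},
]
print(check_quality(batch, QUALITY_RULES, now))
```

The batch is fresh enough, but half of its `customer_id` values are null, so only the completeness rule is reported.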

2) Enforcement at pipeline boundaries

The critical enforcement point is where data enters a pipeline stage:

from pydantic import ValidationError
from typing import Generator
import logging

logger = logging.getLogger(__name__)

# dead_letter_queue, metrics, and alerts are placeholders for your own
# queue, metrics, and alerting clients.

def validated_events(raw_messages: list[bytes]) -> Generator[OrderEvent, None, None]:
    """Pipeline gate: only valid events pass through."""
    valid_count = 0
    error_count = 0

    for msg in raw_messages:
        try:
            # Parse and validate in one step. Because OrderEvent is strict,
            # json.loads() followed by model_validate() would reject the ISO
            # string timestamp; JSON-mode validation accepts it.
            event = OrderEvent.model_validate_json(msg)
        except ValidationError as e:  # also raised for malformed JSON
            error_count += 1
            logger.warning("Contract violation", extra={
                "error": str(e),
                "raw_message": msg[:500],
            })
            # Send to dead-letter queue
            dead_letter_queue.send(msg, error=str(e))
            continue
        valid_count += 1
        yield event

    # Emit metrics
    metrics.gauge("pipeline.events.valid", valid_count)
    metrics.gauge("pipeline.events.rejected", error_count)

    rejection_rate = error_count / max(valid_count + error_count, 1)
    if rejection_rate > 0.05:
        alerts.fire("High contract violation rate", rate=rejection_rate)

Key decisions at the enforcement point:

  • Reject or quarantine? Dead-letter queues let you investigate violations without blocking the pipeline.
  • Threshold alerting: A single bad record might be noise. A 5% violation rate indicates a systemic issue.
  • Metrics: Track violation rates by contract, by field, and by producing service.
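
The threshold decision can be sketched as a small rolling-window monitor. This is one possible shape under stated assumptions, not a prescribed design: `RejectionRateMonitor`, its window size, and the minimum sample count are all hypothetical, and only the 5% threshold comes from the text above.

```python
from collections import deque


class RejectionRateMonitor:
    """Track the rejection rate over the most recent `window` records."""

    def __init__(self, window: int = 1000, threshold: float = 0.05, min_samples: int = 100):
        self.outcomes = deque(maxlen=window)  # True means the record was rejected
        self.threshold = threshold
        self.min_samples = min_samples

    def record(self, rejected: bool) -> None:
        self.outcomes.append(rejected)

    @property
    def rate(self) -> float:
        return sum(self.outcomes) / len(self.outcomes) if self.outcomes else 0.0

    def should_alert(self) -> bool:
        # Require a minimum sample size so a single bad record early in a
        # run does not count as a systemic issue.
        return len(self.outcomes) >= self.min_samples and self.rate > self.threshold


monitor = RejectionRateMonitor(window=200, threshold=0.05, min_samples=100)
for i in range(200):
    monitor.record(rejected=(i % 10 == 0))  # every tenth record is rejected
print(monitor.rate, monitor.should_alert())
```

A rolling window keeps the alert responsive in long-running consumers, where a cumulative rate would be diluted by hours of healthy traffic.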

3) Great Expectations for quality contracts

While Pydantic handles structural validation, Great Expectations handles statistical quality assertions on datasets:

import great_expectations as gx

context = gx.get_context()

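# Define expectations (the quality portion of the contract).
# Uses the Great Expectations 1.x API, which matches the gx.expectations.* classes below.
suite = context.suites.add(gx.ExpectationSuite(name="order_events_quality"))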

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total_cents", min_value=0, max_value=100_000_00
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnDistinctValuesToBeInSet(
        column="currency", value_set=["USD", "EUR", "GBP"]
    )
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(min_value=1000, max_value=1_000_000)
)

Great Expectations generates data documentation (“Data Docs”) automatically, which serves as living contract documentation that updates with each validation run.

4) Contract versioning

Contracts evolve. The versioning strategy determines how changes propagate:

Semantic versioning (a breaking change, such as a renamed or retyped field, gets a new major version):

class OrderEventV1(BaseModel):
    """DEPRECATED: Use V2. Sunset date: 2025-06-01"""
    order_id: str
    total: float  # dollars as float

class OrderEventV2(BaseModel):
    """Current version"""
    order_id: str
    total_cents: int  # integer cents (breaking change from V1)
    currency: str = "USD"  # new field

Compatibility rules:

  • Backward compatible: New consumers can read old data. Achieved by making new fields optional with defaults.
  • Forward compatible: Old consumers can read new data. Achieved by ignoring unknown fields.
  • Fully compatible: Both directions work. This is the safest option but also the most constraining.
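
The two directions can be illustrated with a short sketch that uses plain dataclasses in place of contract models. `OrderV1`, `OrderV2`, and `read` are hypothetical names chosen for this example; the point is only that a default covers the missing field in one direction and dropping unknown keys covers the extra field in the other.

```python
from dataclasses import dataclass, fields


@dataclass
class OrderV1:
    order_id: str
    total_cents: int


@dataclass
class OrderV2:
    order_id: str
    total_cents: int
    currency: str = "USD"  # new field, optional with a default


def read(cls, payload: dict):
    """Build `cls` from a payload, ignoring keys the reader does not know."""
    known = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in payload.items() if k in known})


old_payload = {"order_id": "ORD-001", "total_cents": 2999}
new_payload = {"order_id": "ORD-002", "total_cents": 500, "currency": "EUR"}

# Backward compatible: the new reader fills the missing field from its default.
new_reader_old_data = read(OrderV2, old_payload)
# Forward compatible: the old reader drops the field it does not know.
old_reader_new_data = read(OrderV1, new_payload)

print(new_reader_old_data)
print(old_reader_new_data)
```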

Migration during transition:

def normalize_order_event(raw: dict) -> OrderEventV2:
    """Accept V1 or V2 format, always return V2."""
    data = dict(raw)  # work on a copy so the caller's dict is not mutated
    if "total" in data and "total_cents" not in data:
        # V1 format: convert float dollars to integer cents
        data["total_cents"] = round(data.pop("total") * 100)
        data.setdefault("currency", "USD")
    return OrderEventV2.model_validate(data)
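
Multiplying a float by 100 can land just below a half-cent boundary because of binary rounding, so a stricter migration may prefer decimal arithmetic. The sketch below is one way to do that with the standard library; `dollars_to_cents` is a hypothetical helper, and rounding half up is an assumption about the business rule, not something the V1 contract specifies.

```python
from decimal import Decimal, ROUND_HALF_UP


def dollars_to_cents(amount: float) -> int:
    """Convert a dollar amount to integer cents, rounding half up."""
    # str() yields the float's shortest decimal representation (for example
    # "1.005"), so the multiplication happens in exact decimal arithmetic.
    cents = Decimal(str(amount)) * 100
    return int(cents.quantize(Decimal("1"), rounding=ROUND_HALF_UP))


print(dollars_to_cents(19.99))
print(dollars_to_cents(1.005))
```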

5) CI/CD integration

Contracts should be tested in CI:

# tests/test_contracts.py
from datetime import datetime, timedelta, timezone

import pytest
from pydantic import ValidationError

from contracts.order_events import OrderEvent

def valid_payload(**overrides) -> dict:
    """A payload that satisfies the contract; each test overrides one field."""
    payload = {
        "event_id": "a" * 36,
        "event_type": "order.created",
        # A datetime object, not a string: the model is strict, so
        # model_validate() does not coerce ISO strings from Python input.
        "timestamp": datetime(2025, 1, 15, 10, 30),
        "order_id": "ORD-001",
        "customer_id": "CUST-001",
        "total_cents": 2999,
        "currency": "USD",
        "items": [{"sku": "ITEM-1", "qty": 1}],
    }
    payload.update(overrides)
    return payload

def test_valid_order_event():
    event = OrderEvent.model_validate(valid_payload())
    assert event.total_cents == 2999

def test_rejects_empty_items():
    # ValidationError rather than Exception, so unrelated failures do not pass the test.
    with pytest.raises(ValidationError):
        OrderEvent.model_validate(valid_payload(items=[]))

def test_rejects_future_timestamp():
    # Computed relative to now (as naive UTC) so the test never expires.
    future = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(days=1)
    with pytest.raises(ValidationError):
        OrderEvent.model_validate(valid_payload(timestamp=future))

Contract-breaking detection in CI:

# scripts/check_contract_compat.py

def schemas_are_backward_compatible(old_schema: dict, new_schema: dict) -> bool:
    """Check that all required fields in old are present in new with same types."""
    old_required = set(old_schema.get("required", []))
    old_props = old_schema.get("properties", {})
    new_props = new_schema.get("properties", {})

    for field in old_required:
        if field not in new_props:
            return False
        # Check type compatibility. Use .get() because fields defined via
        # anyOf or $ref have no top-level "type" key.
        if old_props.get(field, {}).get("type") != new_props[field].get("type"):
            return False
    return True
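
A boolean result tells CI to fail but not why. The sketch below is a hypothetical variant, `find_breaking_changes`, that returns the reasons instead. It also flags fields that become required, on the assumption that records written before the change would then be rejected. The two schemas mirror the V1 to V2 change described earlier and are written by hand for illustration.

```python
def find_breaking_changes(old_schema: dict, new_schema: dict) -> list[str]:
    """List the reasons a schema change may break existing producers or consumers."""
    old_props = old_schema.get("properties", {})
    new_props = new_schema.get("properties", {})
    old_required = old_schema.get("required", [])
    problems = []

    for name in old_required:
        if name not in new_props:
            problems.append(f"removed required field: {name}")
        elif old_props.get(name, {}).get("type") != new_props[name].get("type"):
            problems.append(f"type changed: {name}")

    # A field that becomes required rejects records written before the change.
    newly_required = set(new_schema.get("required", [])) - set(old_required)
    for name in sorted(newly_required):
        problems.append(f"newly required field: {name}")
    return problems


old = {
    "properties": {"order_id": {"type": "string"}, "total": {"type": "number"}},
    "required": ["order_id", "total"],
}
new = {
    "properties": {"order_id": {"type": "string"}, "total_cents": {"type": "integer"}},
    "required": ["order_id", "total_cents"],
}
for problem in find_breaking_changes(old, new):
    print(problem)
```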

6) Monitoring and observability

Contracts in production need continuous monitoring:

from prometheus_client import Counter
from pydantic import ValidationError

contract_validations = Counter(
    "data_contract_validations_total",
    "Total contract validation attempts",
    ["contract", "result"],  # result: pass, fail
)

contract_violation_fields = Counter(
    "data_contract_field_violations_total",
    "Field-level contract violations",
    ["contract", "field", "violation_type"],
)

def monitor_contract(contract_name: str, data: dict, model_class):
    try:
        result = model_class.model_validate(data)
        contract_validations.labels(contract=contract_name, result="pass").inc()
        return result
    except ValidationError as e:
        contract_validations.labels(contract=contract_name, result="fail").inc()
        for error in e.errors():
            field = ".".join(str(loc) for loc in error["loc"])
            contract_violation_fields.labels(
                contract=contract_name,
                field=field,
                violation_type=error["type"],
            ).inc()
        raise

Dashboards built from these metrics show: violation rates over time, most frequently violated fields, and which producing services cause the most issues.
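
The per-service view is normally a query in the metrics backend, but the aggregation itself is simple enough to sketch in plain Python. The example assumes each validation outcome is tagged with a `service` key, which the counters above do not define; that label and the function name `violation_rates` are hypothetical.

```python
from collections import Counter


def violation_rates(events: list[dict]) -> dict[str, float]:
    """Share of failed validations per producing service."""
    total = Counter(e["service"] for e in events)
    failed = Counter(e["service"] for e in events if e["result"] == "fail")
    return {service: failed[service] / count for service, count in total.items()}


events = [
    {"service": "order-service", "result": "pass"},
    {"service": "order-service", "result": "pass"},
    {"service": "order-service", "result": "pass"},
    {"service": "order-service", "result": "fail"},
    {"service": "billing-service", "result": "pass"},
    {"service": "billing-service", "result": "fail"},
]
# Print the services with the highest violation rate first.
for service, rate in sorted(violation_rates(events).items(), key=lambda kv: -kv[1]):
    print(f"{service}: {rate:.0%}")
```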

7) Organizational workflow

A mature data contract workflow:

  1. Propose: Producer team drafts a contract change as a pull request.
  2. Review: Consumer teams review the change, checking compatibility.
  3. Test: CI runs compatibility checks and sample data validation.
  4. Deploy: Contract update ships with the producing service.
  5. Monitor: Violation metrics verify the change works in production.
  6. Deprecate: Old contract versions get sunset dates and migration guides.

This mirrors API governance but applied to data interfaces.
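
The deprecation step is easy to forget unless something checks it. One option is a CI check that reports contract versions whose sunset date has been reached. The sketch below assumes a hand-maintained registry; `CONTRACT_VERSIONS` and `expired_contracts` are hypothetical, and the 2025-06-01 date is the sunset date from the V1 docstring earlier in this article.

```python
from datetime import date

# Hypothetical registry of contract versions and their sunset dates.
CONTRACT_VERSIONS = {
    "order-events/v1": {"status": "deprecated", "sunset": date(2025, 6, 1)},
    "order-events/v2": {"status": "current", "sunset": None},
}


def expired_contracts(registry: dict, today: date) -> list[str]:
    """Names of contract versions whose sunset date has been reached."""
    return sorted(
        name
        for name, meta in registry.items()
        if meta["sunset"] is not None and meta["sunset"] <= today
    )


print(expired_contracts(CONTRACT_VERSIONS, today=date(2025, 7, 1)))
```

In CI, a non-empty result could fail the build or open a ticket for the owning team, depending on how strictly sunsets are enforced.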

One thing to remember: Data contracts combine structural schemas, quality assertions, ownership metadata, and CI/CD enforcement into a single system. They do not make data reliable by themselves, but they turn informal assumptions between teams into testable, monitored agreements.
