Data Contracts — Deep Dive
Data contracts formalize the data interfaces between producing and consuming systems. In Python data engineering, implementing them effectively means combining schema validation, quality assertions, versioning, and CI/CD enforcement into one cohesive system.
1) Contract specification formats
Several specification formats compete in the data contracts space:
Pydantic models (code-first):
from datetime import datetime, timezone
from typing import Literal

from pydantic import BaseModel, Field, field_validator


class OrderEvent(BaseModel):
    """Contract: Order service → analytics pipeline"""

    # Strict mode disables type coercion (e.g. "42" is not accepted for an int).
    model_config = {"strict": True}

    event_id: str = Field(min_length=36, max_length=36)
    event_type: Literal["order.created", "order.updated", "order.cancelled"]
    timestamp: datetime
    order_id: str = Field(min_length=1)
    customer_id: str = Field(min_length=1)
    total_cents: int = Field(ge=0)
    currency: str = Field(pattern=r"^[A-Z]{3}$")
    items: list[dict] = Field(min_length=1)

    @field_validator("timestamp")
    @classmethod
    def not_in_future(cls, v: datetime) -> datetime:
        # Treat naive timestamps as UTC so the comparison never mixes naive
        # and aware datetimes (which would raise TypeError, not ValueError).
        aware = v if v.tzinfo else v.replace(tzinfo=timezone.utc)
        if aware > datetime.now(timezone.utc):
            raise ValueError("Timestamp cannot be in the future")
        return v
YAML-based (spec-first):
# contracts/order-events.yaml
apiVersion: v2
kind: DataContract
metadata:
  name: order-events
  owner: order-service-team
  slack: "#order-data"
schema:
  type: object
  properties:
    event_id:
      type: string
      format: uuid
    event_type:
      type: string
      enum: [order.created, order.updated, order.cancelled]
    total_cents:
      type: integer
      minimum: 0
quality:
  freshness:
    max_delay_minutes: 30
  completeness:
    customer_id:
      max_null_percent: 0
sla:
  resolution_time: 4h
  escalation: page
JSON Schema: Portable across languages, supported by tools like jsonschema in Python. Good for cross-team contracts where not everyone uses Python.
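The `quality` section of the YAML contract above is declarative, so something still has to evaluate it against real data. The following is a minimal sketch of that step, not a definitive implementation. It assumes the rules have already been parsed into a dict (the YAML loading step is left out) and that each record carries a timezone-aware `timestamp`. The function name `check_quality` and the message format are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Mirrors the `quality` section of contracts/order-events.yaml, assumed to be
# already parsed into a dict (loading the YAML file is omitted here).
QUALITY_RULES = {
    "freshness": {"max_delay_minutes": 30},
    "completeness": {"customer_id": {"max_null_percent": 0}},
}


def check_quality(records: list[dict], rules: dict, now: datetime) -> list[str]:
    """Return human-readable violations; an empty list means the batch passes."""
    if not records:
        return ["batch is empty"]
    violations = []

    # Freshness: the newest record must be recent enough.
    max_delay = rules.get("freshness", {}).get("max_delay_minutes")
    if max_delay is not None:
        newest = max(r["timestamp"] for r in records)
        if now - newest > timedelta(minutes=max_delay):
            violations.append(
                f"freshness: newest record is older than {max_delay} minutes"
            )

    # Completeness: the share of nulls per column must not exceed the limit.
    for column, limits in rules.get("completeness", {}).items():
        nulls = sum(1 for r in records if r.get(column) is None)
        null_percent = 100 * nulls / len(records)
        if null_percent > limits["max_null_percent"]:
            violations.append(
                f"completeness: {column} is {null_percent:.1f}% null "
                f"(limit {limits['max_null_percent']}%)"
            )
    return violations


now = datetime(2025, 1, 15, 12, 0, tzinfo=timezone.utc)
batch = [
    {"timestamp": now - timedelta(minutes=5), "customer_id": "CUST-001"},
    {"timestamp": now - timedelta(minutes=10), "customer_id": None},
]
print(check_quality(batch, QUALITY_RULES, now))
# ['completeness: customer_id is 50.0% null (limit 0%)']
```

Passing `now` in as an argument, rather than reading the clock inside the function, keeps the check deterministic and easy to test.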
2) Enforcement at pipeline boundaries
The critical enforcement point is where data enters a pipeline stage:
import logging
from collections.abc import Iterator

from pydantic import ValidationError

logger = logging.getLogger(__name__)

# dead_letter_queue, metrics, and alerts stand in for your own DLQ, metrics,
# and alerting clients; OrderEvent is the contract model defined earlier.


def validated_events(raw_messages: list[bytes]) -> Iterator[OrderEvent]:
    """Pipeline gate: only valid events pass through."""
    valid_count = 0
    error_count = 0
    for msg in raw_messages:
        try:
            # Validate straight from JSON: a strict model accepts ISO 8601
            # strings for datetime fields only when parsing JSON input.
            event = OrderEvent.model_validate_json(msg)
        except ValidationError as e:  # also raised for malformed JSON
            error_count += 1
            logger.warning("Contract violation", extra={
                "error": str(e),
                "raw_message": msg[:500],
            })
            # Send to dead-letter queue
            dead_letter_queue.send(msg, error=str(e))
            continue
        valid_count += 1
        yield event

    # Emit metrics (this runs only once the generator is fully consumed)
    metrics.gauge("pipeline.events.valid", valid_count)
    metrics.gauge("pipeline.events.rejected", error_count)
    rejection_rate = error_count / max(valid_count + error_count, 1)
    if rejection_rate > 0.05:
        alerts.fire("High contract violation rate", rate=rejection_rate)
Key decisions at the enforcement point:
- Reject or quarantine? Dead-letter queues let you investigate violations without blocking the pipeline.
- Threshold alerting: A single bad record might be noise. A 5% violation rate indicates a systemic issue.
- Metrics: Track violation rates by contract, by field, and by producing service.
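The threshold and per-dimension tracking described above can be sketched with the standard library alone. This is an illustrative sketch under stated assumptions: the class name `ViolationTracker`, its method names, and the `min_samples` guard (which keeps one bad record in a tiny sample from paging anyone) are hypothetical and not part of any library.

```python
from collections import Counter


class ViolationTracker:
    """Counts contract checks and decides when the violation rate warrants an alert."""

    def __init__(self, threshold: float = 0.05, min_samples: int = 100):
        self.threshold = threshold      # 5% rate, matching the text above
        self.min_samples = min_samples  # assumption: ignore very small samples
        self.total = 0
        self.rejected = 0
        self.by_key = Counter()         # (contract, field, producer) -> count

    def record_valid(self) -> None:
        self.total += 1

    def record_rejected(self, contract: str, producer: str, fields: list[str]) -> None:
        # One rejected record may violate several fields at once.
        self.total += 1
        self.rejected += 1
        for field in fields:
            self.by_key[(contract, field, producer)] += 1

    @property
    def rejection_rate(self) -> float:
        return self.rejected / self.total if self.total else 0.0

    def should_alert(self) -> bool:
        return self.total >= self.min_samples and self.rejection_rate > self.threshold

    def top_offenders(self, n: int = 3) -> list:
        return self.by_key.most_common(n)


tracker = ViolationTracker()
for _ in range(94):
    tracker.record_valid()
for _ in range(6):
    tracker.record_rejected("order-events", "order-service", ["currency"])

print(tracker.should_alert())   # True: 6 of 100 records were rejected
print(tracker.top_offenders())
```

Keying the counter by contract, field, and producer gives the three breakdowns the list above asks for without keeping three separate structures.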
3) Great Expectations for quality contracts
While Pydantic handles structural validation, Great Expectations handles statistical quality assertions on datasets:
import great_expectations as gx

context = gx.get_context()

# Define expectations (the quality portion of the contract).
# This uses the GX 1.x API, where suites are registered via context.suites.
suite = context.suites.add(gx.ExpectationSuite(name="order_events_quality"))
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total_cents", min_value=0, max_value=10_000_000  # $100,000 in cents
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnDistinctValuesToBeInSet(
        column="currency", value_set=["USD", "EUR", "GBP"]
    )
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(min_value=1000, max_value=1_000_000)
)
Great Expectations generates data documentation (“Data Docs”) automatically, which serves as living contract documentation that updates with each validation run.
4) Contract versioning
Contracts evolve. The versioning strategy determines how changes propagate:
Semantic versioning:
from pydantic import BaseModel


class OrderEventV1(BaseModel):
    """DEPRECATED: Use V2. Sunset date: 2025-06-01"""

    order_id: str
    total: float  # dollars as float


class OrderEventV2(BaseModel):
    """Current version"""

    order_id: str
    total_cents: int  # integer cents (breaking change from V1)
    currency: str = "USD"  # new field
Compatibility rules:
- Backward compatible: New consumers can read old data. Achieved by making new fields optional with defaults.
- Forward compatible: Old consumers can read new data. Achieved by ignoring unknown fields.
- Fully compatible: Both directions work. The safest option, but also the most constraining.
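The two directions are easier to see with a concrete pair of readers. The sketch below uses plain dataclasses as stand-ins for consumer-side models. `OrderOld`, `OrderNew`, and the `read` helper are hypothetical names for illustration; they are a simplified additive change, not the V1/V2 classes above, whose switch from `total` to `total_cents` is breaking.

```python
from dataclasses import dataclass, fields


@dataclass
class OrderOld:
    order_id: str
    total_cents: int


@dataclass
class OrderNew:
    order_id: str
    total_cents: int
    currency: str = "USD"  # new field, optional with a default


def read(cls, payload: dict):
    """Build `cls` from a payload, ignoring keys the reader does not know."""
    known = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in payload.items() if k in known})


old_data = {"order_id": "ORD-001", "total_cents": 2999}
new_data = {"order_id": "ORD-002", "total_cents": 500, "currency": "EUR"}

# Backward compatible: the new reader accepts old data thanks to the default.
print(read(OrderNew, old_data))
# Forward compatible: the old reader accepts new data by dropping `currency`.
print(read(OrderOld, new_data))
```

If `currency` had no default, the first call would raise a `TypeError`, which is exactly the backward-compatibility break the rules above are meant to prevent.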
Migration during transition:
def normalize_order_event(raw: dict) -> OrderEventV2:
    """Accept V1 or V2 format, always return V2."""
    data = dict(raw)  # shallow copy so the caller's dict is not mutated
    if "total" in data and "total_cents" not in data:
        # V1 format: convert float dollars to integer cents
        data["total_cents"] = int(round(data.pop("total") * 100))
        data.setdefault("currency", "USD")
    return OrderEventV2.model_validate(data)
5) CI/CD integration
Contracts should be tested in CI:
# tests/test_contracts.py
from datetime import datetime, timedelta

import pytest
from pydantic import ValidationError

from contracts.order_events import OrderEvent


def valid_payload(**overrides) -> dict:
    """A payload that satisfies the contract; each test overrides one field."""
    payload = {
        "event_id": "a" * 36,
        "event_type": "order.created",
        # The model is strict, so pass a datetime rather than an ISO string.
        "timestamp": datetime(2025, 1, 15, 10, 30),
        "order_id": "ORD-001",
        "customer_id": "CUST-001",
        "total_cents": 2999,
        "currency": "USD",
        "items": [{"sku": "ITEM-1", "qty": 1}],
    }
    payload.update(overrides)
    return payload


def test_valid_order_event():
    event = OrderEvent.model_validate(valid_payload())
    assert event.total_cents == 2999


def test_rejects_empty_items():
    with pytest.raises(ValidationError):
        OrderEvent.model_validate(valid_payload(items=[]))


def test_rejects_future_timestamp():
    # A relative date keeps the test valid after any fixed year has passed.
    future = datetime.now() + timedelta(days=365)
    with pytest.raises(ValidationError):
        OrderEvent.model_validate(valid_payload(timestamp=future))
Contract-breaking detection in CI:
# scripts/check_contract_compat.py
def schemas_are_backward_compatible(old_schema: dict, new_schema: dict) -> bool:
    """Check that all required fields in old are present in new with same types."""
    old_props = old_schema.get("properties", {})
    new_props = new_schema.get("properties", {})
    for field in old_schema.get("required", []):
        if field not in new_props:
            return False
        # Check type compatibility. Use .get() because some properties
        # (e.g. anyOf or $ref entries) carry no top-level "type" key.
        if old_props.get(field, {}).get("type") != new_props[field].get("type"):
            return False
    return True
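A bare True/False is hard to act on in a pull request, so a CI check is more useful when it says what broke. The variant below is a sketch under the same simplifying assumptions as the boolean check: flat JSON Schema dicts with top-level `properties` and `required`, and no handling of nesting, `$ref`, or `anyOf`. The function name `find_breaking_changes` and the sample schemas are hypothetical.

```python
def find_breaking_changes(old_schema: dict, new_schema: dict) -> list[str]:
    """Describe changes that could break consumers relying on old_schema."""
    problems = []
    old_props = old_schema.get("properties", {})
    new_props = new_schema.get("properties", {})
    new_required = set(new_schema.get("required", []))

    for name in sorted(old_schema.get("required", [])):
        if name not in new_props:
            problems.append(f"{name}: required field was removed")
            continue
        old_type = old_props.get(name, {}).get("type")
        new_type = new_props[name].get("type")
        if old_type != new_type:
            problems.append(f"{name}: type changed from {old_type} to {new_type}")
        if name not in new_required:
            problems.append(f"{name}: field is no longer required")
    return problems


# Sample schemas modelled on the V1 -> V2 change from the versioning section.
old = {
    "properties": {"order_id": {"type": "string"}, "total": {"type": "number"}},
    "required": ["order_id", "total"],
}
new = {
    "properties": {
        "order_id": {"type": "string"},
        "total_cents": {"type": "integer"},
        "currency": {"type": "string", "default": "USD"},
    },
    "required": ["order_id", "total_cents"],
}

for problem in find_breaking_changes(old, new):
    print(problem)
# total: required field was removed
```

In a CI script, a non-empty result would typically be printed and followed by a non-zero exit code so the pipeline fails.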
6) Monitoring and observability
Contracts in production need continuous monitoring:
from prometheus_client import Counter
from pydantic import BaseModel, ValidationError

contract_validations = Counter(
    "data_contract_validations_total",
    "Total contract validation attempts",
    ["contract", "result"],  # result: pass, fail
)
contract_violation_fields = Counter(
    "data_contract_field_violations_total",
    "Field-level contract violations",
    ["contract", "field", "violation_type"],
)


def monitor_contract(contract_name: str, data: dict, model_class: type[BaseModel]):
    """Validate data against the contract and record the outcome as metrics."""
    try:
        result = model_class.model_validate(data)
        contract_validations.labels(contract=contract_name, result="pass").inc()
        return result
    except ValidationError as e:
        contract_validations.labels(contract=contract_name, result="fail").inc()
        for error in e.errors():
            # "loc" can include list indexes (e.g. items.0.sku), which raises
            # label cardinality; consider dropping integer parts if that matters.
            field = ".".join(str(loc) for loc in error["loc"])
            contract_violation_fields.labels(
                contract=contract_name,
                field=field,
                violation_type=error["type"],
            ).inc()
        raise
Dashboards built from these metrics show: violation rates over time, most frequently violated fields, and which producing services cause the most issues.
7) Organizational workflow
A mature data contract workflow:
- Propose: Producer team drafts a contract change as a pull request.
- Review: Consumer teams review the change, checking compatibility.
- Test: CI runs compatibility checks and sample data validation.
- Deploy: Contract update ships with the producing service.
- Monitor: Violation metrics verify the change works in production.
- Deprecate: Old contract versions get sunset dates and migration guides.
This mirrors API governance but applied to data interfaces.
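The Deprecate step tends to slip unless something actually checks the sunset dates. One way to make it enforceable is sketched below, assuming a small registry keyed by contract version. The names `SUNSET_DATES`, `ContractSunsetError`, and `check_version` are hypothetical; the 2025-06-01 date is the one from the `OrderEventV1` docstring.

```python
import warnings
from datetime import date

# Sunset date per contract version; None means current, with no sunset planned.
SUNSET_DATES = {
    "order-events/v1": date(2025, 6, 1),  # from the OrderEventV1 docstring
    "order-events/v2": None,
}


class ContractSunsetError(RuntimeError):
    """Raised when a contract version is used on or after its sunset date."""


def check_version(version: str, today: date) -> None:
    """Warn while a version is deprecated; fail once it has been sunset."""
    if version not in SUNSET_DATES:
        raise KeyError(f"unknown contract version: {version}")
    sunset = SUNSET_DATES[version]
    if sunset is None:
        return
    if today >= sunset:
        raise ContractSunsetError(f"{version} was sunset on {sunset.isoformat()}")
    warnings.warn(
        f"{version} is deprecated and will be sunset on {sunset.isoformat()}",
        DeprecationWarning,
        stacklevel=2,
    )


check_version("order-events/v2", date(2025, 1, 15))  # current version: no warning
```

Calling `check_version` at consumer start-up or in CI turns a sunset date from a docstring note into something that fails loudly once the date passes.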
One thing to remember: Data contracts combine structural schemas, quality assertions, ownership metadata, and CI/CD enforcement into a single system that keeps data exchanged between teams reliable, turning informal assumptions into testable, monitored agreements.