Python Schema Registry — Deep Dive
A schema registry in production involves registration workflows, compatibility enforcement, serialization integration, and CI/CD hooks. This guide covers implementation patterns with Confluent Schema Registry and lightweight alternatives.
1) Confluent Schema Registry with Python
Client setup
from confluent_kafka.schema_registry import SchemaRegistryClient
# Shared client instance; the snippets below refer to it as `registry`.
registry = SchemaRegistryClient({
"url": "http://schema-registry:8081",
# For Confluent Cloud:
# "basic.auth.user.info": f"{api_key}:{api_secret}",
})
Registering an Avro schema
from confluent_kafka.schema_registry import Schema
# Version 1 of the Order record: five fields, none of which declares a default.
order_schema_v1 = Schema(
schema_str="""{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{"name": "order_id", "type": "long"},
{"name": "customer_id", "type": "long"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string"},
{"name": "order_date", "type": "string"}
]
}""",
schema_type="AVRO",
)
# Register under subject "orders-value"
# The value returned by register_schema is kept as the schema ID and printed.
schema_id = registry.register_schema(
subject_name="orders-value",
schema=order_schema_v1,
)
print(f"Registered schema ID: {schema_id}")
Evolving the schema
Add an optional field with a default value (backward compatible):
# Version 2 of the Order record: the same five fields as before plus a trailing
# "region" field typed as a ["null", "string"] union with a null default.
order_schema_v2 = Schema(
schema_str="""{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{"name": "order_id", "type": "long"},
{"name": "customer_id", "type": "long"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string"},
{"name": "order_date", "type": "string"},
{"name": "region", "type": ["null", "string"], "default": null}
]
}""",
schema_type="AVRO",
)
# This succeeds under BACKWARD compatibility
# Registered under the same "orders-value" subject as the first version.
schema_id_v2 = registry.register_schema("orders-value", order_schema_v2)
Compatibility checking before registration
class IncompatibleSchemaError(Exception):
    """Raised by safe_register when the registry reports a schema as incompatible."""


def safe_register(subject: str, schema: Schema) -> int:
    """Check compatibility before registering.

    Args:
        subject: Subject name the schema is tested against and registered under.
        schema: Candidate schema to test and register.

    Returns:
        The value returned by ``registry.register_schema`` for the new schema.

    Raises:
        IncompatibleSchemaError: If ``registry.test_compatibility`` returns a
            falsy result for ``schema``.
    """
    # Ask the registry first so an incompatible schema is never submitted.
    if not registry.test_compatibility(subject_name=subject, schema=schema):
        raise IncompatibleSchemaError(
            f"Schema for {subject} is not compatible with the latest version"
        )
    return registry.register_schema(subject, schema)
2) Producing and consuming with schema-aware serialization
Avro producer
from confluent_kafka import Producer
from confluent_kafka.serialization import (
SerializationContext,
MessageField,
)
from confluent_kafka.schema_registry.avro import AvroSerializer
# Serializer built from the v2 Order schema string and the shared registry client.
serializer = AvroSerializer(
    schema_registry_client=registry,
    schema_str=order_schema_v2.schema_str,
)
producer = Producer({"bootstrap.servers": "kafka:9092"})


def _report_delivery(err, msg):
    """Print the delivery error if there is one, otherwise the message offset."""
    print(err or f"Delivered to {msg.offset()}")


def produce_order(order: dict):
    """Serialize ``order`` as the value of an "orders" message and send it.

    The call flushes the producer before returning.
    """
    value_context = SerializationContext("orders", MessageField.VALUE)
    payload = serializer(order, value_context)
    producer.produce(
        topic="orders",
        value=payload,
        on_delivery=_report_delivery,
    )
    producer.flush()


produce_order({
    "order_id": 42,
    "customer_id": 101,
    "amount": 99.50,
    "currency": "USD",
    "order_date": "2026-03-28",
    "region": "us-east",
})
Avro consumer
from confluent_kafka import Consumer
from confluent_kafka.schema_registry.avro import AvroDeserializer
# No schema string is passed here; only the registry client is configured.
deserializer = AvroDeserializer(
    schema_registry_client=registry,
)
consumer = Consumer({
    "bootstrap.servers": "kafka:9092",
    "group.id": "order-processor",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["orders"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the one-second poll timeout.
            continue
        if msg.error():
            print(f"Error: {msg.error()}")
            continue
        order = deserializer(
            msg.value(),
            SerializationContext("orders", MessageField.VALUE),
        )
        print(f"Received order: {order}")
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this endless loop.
    pass
finally:
    # Always close the consumer, including when the loop exits on an error.
    consumer.close()
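By default, the deserializer reads the schema ID from the Confluent wire-format prefix at the start of the message payload (a magic byte followed by a 4-byte schema ID), not from a Kafka message header, and fetches the writer schema from the registry. When a reader schema is supplied via schema_str, differences between the two are resolved using Avro’s resolution rules; with no reader schema, as in the snippet above, records are decoded with the writer schema.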
3) JSON Schema registry pattern
For REST APIs and file-based pipelines, JSON Schema is often more practical than Avro:
import json
from pathlib import Path
import hashlib
class SimpleJsonSchemaRegistry:
    """Lightweight file-based schema registry for non-Kafka use cases.

    Each subject is a directory under the base path, and each registered
    version is stored inside it as ``v<N>.json``.
    """

    def __init__(self, base_path: str):
        """Create the registry root directory at ``base_path`` if it is missing."""
        self.base = Path(base_path)
        self.base.mkdir(parents=True, exist_ok=True)

    def _versions(self, subject: str) -> list[tuple[int, Path]]:
        """Return ``(version, path)`` pairs for ``subject``, lowest version first.

        Ordering is by the integer in the file name. Sorting the paths as
        strings would place ``v10.json`` before ``v2.json`` and make the
        "latest" lookup wrong once a subject reaches ten versions.
        """
        found = []
        for path in (self.base / subject).glob("v*.json"):
            number = path.stem[1:]
            # Skip files that match the glob but are not numbered versions.
            if number.isascii() and number.isdigit():
                found.append((int(number), path))
        found.sort(key=lambda pair: pair[0])
        return found

    def register(self, subject: str, schema: dict) -> int:
        """Store ``schema`` as the next version of ``subject``.

        Returns:
            The version number of the stored schema, or the latest existing
            version number when ``schema`` equals the latest stored schema
            (nothing is written in that case).

        Raises:
            ValueError: If ``schema`` fails the backward-compatibility check
                against the latest stored version.
        """
        subject_dir = self.base / subject
        subject_dir.mkdir(exist_ok=True)

        versions = self._versions(subject)
        if versions:
            latest_version, latest_path = versions[-1]
            latest = json.loads(latest_path.read_text())
            if latest == schema:
                return latest_version  # no change
            self._check_backward_compatible(latest, schema, subject)
            next_version = latest_version + 1
        else:
            next_version = 1

        # Write new version
        path = subject_dir / f"v{next_version}.json"
        path.write_text(json.dumps(schema, indent=2))
        return next_version

    def get_latest(self, subject: str) -> tuple[int, dict]:
        """Return ``(version, schema)`` for the highest stored version of ``subject``.

        Raises:
            KeyError: If no version files exist for ``subject``.
        """
        versions = self._versions(subject)
        if not versions:
            raise KeyError(f"No schemas for subject {subject}")
        version, path = versions[-1]
        return version, json.loads(path.read_text())

    def _check_backward_compatible(
        self, old: dict, new: dict, subject: str
    ):
        """Basic backward compatibility: new schema must accept all old required fields.

        Only the top-level ``required`` and ``properties`` keys are inspected.

        Raises:
            ValueError: If ``new`` requires a field that ``old`` did not, or if
                a field required by ``old`` is absent from ``new``'s properties.
        """
        old_required = set(old.get("required", []))
        new_required = set(new.get("required", []))
        new_fields = set(new.get("properties", {}).keys())

        # New schema cannot require fields that old schema didn't have
        added_required = new_required - old_required
        if added_required:
            raise ValueError(
                f"Backward incompatible: new required fields {added_required} "
                f"for subject {subject}"
            )

        # Old required fields must still exist in new schema
        removed = old_required - new_fields
        if removed:
            raise ValueError(
                f"Backward incompatible: removed fields {removed} "
                f"that were required in subject {subject}"
            )
Usage
# Schemas are stored as files under /opt/schemas, one directory per subject.
reg = SimpleJsonSchemaRegistry("/opt/schemas")
# register() returns the version number assigned to the schema.
v1 = reg.register("orders", {
"type": "object",
"required": ["order_id", "amount"],
"properties": {
"order_id": {"type": "integer"},
"amount": {"type": "number"},
},
})
# Adding an optional field is backward compatible
v2 = reg.register("orders", {
"type": "object",
"required": ["order_id", "amount"],
"properties": {
"order_id": {"type": "integer"},
"amount": {"type": "number"},
"region": {"type": "string"},
},
})
# Adding a new REQUIRED field fails backward compatibility
# ("region" is in "required" here but not in the previous version, so the
# registry's check raises ValueError and nothing is written.)
try:
reg.register("orders", {
"type": "object",
"required": ["order_id", "amount", "region"],
"properties": {
"order_id": {"type": "integer"},
"amount": {"type": "number"},
"region": {"type": "string"},
},
})
except ValueError as e:
print(f"Rejected: {e}")
4) CI/CD integration
Schema validation in pull requests
Add a CI step that validates schema changes against the registry before merging:
# ci/check_schema_compat.py
import sys
import json
from confluent_kafka.schema_registry import SchemaRegistryClient, Schema
# Usage: check_schema_compat.py <registry-url> <subject> <schema-file>
# Exit status: 0 when compatible, 1 when incompatible, 2 on missing arguments.
if len(sys.argv) < 4:
    # Fail with a readable message instead of an IndexError traceback.
    print(
        f"usage: {sys.argv[0]} <registry-url> <subject> <schema-file>",
        file=sys.stderr,
    )
    sys.exit(2)

registry = SchemaRegistryClient({"url": sys.argv[1]})
subject = sys.argv[2]
schema_file = sys.argv[3]

# Read with an explicit encoding so the result does not depend on the CI
# runner's locale.
with open(schema_file, encoding="utf-8") as f:
    new_schema = Schema(f.read(), "AVRO")

compatible = registry.test_compatibility(subject, new_schema)
if not compatible:
    print(f"FAIL: Schema in {schema_file} is not compatible with {subject}")
    sys.exit(1)
print(f"OK: Schema is compatible with {subject}")
# .github/workflows/schema-check.yml
# Arguments passed to the script, in order: registry URL, subject, schema file.
- name: Check schema compatibility
run: |
python ci/check_schema_compat.py \
$SCHEMA_REGISTRY_URL \
orders-value \
schemas/orders.avsc
Automated registration on merge
After a PR merges, register the new schema version automatically:
# The `if` guard limits this step to runs whose ref is the main branch.
- name: Register schema
if: github.ref == 'refs/heads/main'
run: |
python ci/register_schema.py \
$SCHEMA_REGISTRY_URL \
orders-value \
schemas/orders.avsc
5) Schema evolution strategies
| Change | Backward | Forward | Full |
|---|---|---|---|
| Add optional field with default | ✅ | ✅ | ✅ |
| Remove optional field with default | ✅ | ✅ | ✅ |
| Add required field (no default) | ❌ | ✅ | ❌ |
| Remove required field | ✅ | ❌ | ❌ |
| Rename field | ❌ | ❌ | ❌ |
| Change field type | ❌ | ❌ | ❌ |
Renaming fields is always a breaking change. The safe path: add the new field, backfill it, migrate consumers, then deprecate the old field.
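Changing types generally requires creating a new subject or using a union type in Avro (e.g., ["int", "long"]). The exception is Avro’s built-in type promotion: widening a field (for example int → long, or float → double) is backward compatible on its own, because a reader using the wider type can still decode data written with the narrower one, but it is not forward compatible.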
6) Monitoring and governance
Track registry health with:
- Schema count per subject — unexpectedly high counts suggest frequent breaking changes.
- Compatibility failures — track how often registrations are rejected.
- Consumer lag after schema changes — a spike in consumer lag after a new version may indicate deserialization issues.
- Schema usage audit — which services use which schema versions. Helps decide when to deprecate old versions.
def audit_schema_usage(registry: SchemaRegistryClient):
    """Print one summary line per subject known to ``registry``.

    Each line shows the subject name, how many versions it has, and the
    version number and schema ID of its latest version.
    """
    for name in registry.get_subjects():
        version_list = registry.get_versions(name)
        newest = registry.get_latest_version(name)
        summary = (
            f"{name}: {len(version_list)} versions, "
            f"latest={newest.version}, id={newest.schema_id}"
        )
        print(summary)
One thing to remember: a schema registry’s value is not just storing schemas—it is the compatibility enforcement that prevents a producer’s “harmless” change from silently breaking every consumer downstream.
See Also
- Python Adaptive Learning Systems How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
- Python Airflow Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
- Python Altair Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
- Python Automated Grading How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
- Python Batch Vs Stream Processing Batch processing is like doing laundry once a week; stream processing is like a self-cleaning shirt that cleans itself constantly.