Python Change Data Capture — Deep Dive
Debezium Event Structure
Debezium emits events with a standardized envelope. Understanding this structure is essential for building reliable Python consumers:
```json
{
  "schema": { ... },
  "payload": {
    "before": {"id": 42, "status": "pending", "amount": 99.95},
    "after": {"id": 42, "status": "shipped", "amount": 99.95},
    "source": {
      "version": "2.5.0",
      "connector": "postgresql",
      "name": "dbserver",
      "ts_ms": 1711584000000,
      "db": "orders_db",
      "schema": "public",
      "table": "orders",
      "lsn": 12345678,
      "txId": 9876
    },
    "op": "u",
    "ts_ms": 1711584000050
  }
}
```
Key fields for Python processing:
- `op`: operation type, one of `c` (create), `u` (update), `d` (delete), `r` (snapshot read).
- `before` / `after`: row state before and after the change. Deletes have `after` as null; creates have `before` as null.
- `source.lsn`: the log sequence number in PostgreSQL's WAL. Use this for ordering and deduplication.
- `source.txId`: transaction ID. Events from the same transaction share this ID.
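To make the field list concrete, here is a minimal sketch of turning a raw envelope into a small typed object. The names `ChangeEvent`, `parse_envelope`, and `OP_NAMES` are illustrative and not part of Debezium. The sketch assumes the message carries the `payload` wrapper shown above and handles only the four operation codes listed.

```python
import json
from dataclasses import dataclass
from typing import Optional

# Operation codes described in the field list above
OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}


@dataclass(frozen=True)
class ChangeEvent:
    """Hypothetical container for the fields a consumer usually needs."""
    op: str
    table: str
    lsn: Optional[int]
    tx_id: Optional[int]
    before: Optional[dict]
    after: Optional[dict]

    @property
    def row(self) -> Optional[dict]:
        """The row image to act on: `before` for deletes, `after` otherwise."""
        return self.before if self.op == "d" else self.after


def parse_envelope(raw: str) -> ChangeEvent:
    """Parse one JSON-encoded Debezium message into a ChangeEvent."""
    payload = json.loads(raw)["payload"]
    op = payload["op"]
    if op not in OP_NAMES:
        raise ValueError(f"unsupported op {op!r}")
    source = payload.get("source") or {}
    return ChangeEvent(
        op=op,
        table=source.get("table", ""),
        lsn=source.get("lsn"),
        tx_id=source.get("txId"),
        before=payload.get("before"),
        after=payload.get("after"),
    )
```

Exposing `row` keeps delete handling in one place: downstream code can read the primary key from `event.row` without checking which image is null.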
Direct PostgreSQL Logical Replication in Python
For simpler setups without Kafka, Python can consume PostgreSQL’s logical replication directly using psycopg2:
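```python
import json

import psycopg2
from psycopg2.extras import LogicalReplicationConnection

conn = psycopg2.connect(
    "dbname=orders host=localhost",
    connection_factory=LogicalReplicationConnection,
)
cursor = conn.cursor()

# Create a replication slot (once); this raises if the slot already exists
cursor.create_replication_slot("python_cdc", output_plugin="wal2json")

# Start streaming; decode=True delivers msg.payload as str instead of bytes
cursor.start_replication(slot_name="python_cdc", decode=True)


def consume(msg):
    payload = json.loads(msg.payload)
    for change in payload.get("change", []):
        table = change["table"]
        kind = change["kind"]  # insert, update, delete
        # wal2json (format version 1) sends names and values as parallel
        # lists; deletes carry only the old key under "oldkeys"
        if kind == "delete":
            old = change.get("oldkeys", {})
            names, values = old.get("keynames", []), old.get("keyvalues", [])
        else:
            names = change.get("columnnames", [])
            values = change.get("columnvalues", [])
        columns = dict(zip(names, values))
        process_change(table, kind, columns)  # application-defined handler
    # Acknowledge the message so PostgreSQL can recycle WAL up to this point
    msg.cursor.send_feedback(flush_lsn=msg.data_start)


cursor.consume_stream(consume)
```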
The wal2json plugin formats WAL entries as JSON. Alternatives include pgoutput (built into PostgreSQL 10+) and test_decoding.
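Important: replication slots retain WAL segments until the consumer confirms them. If your Python consumer stops for hours, WAL accumulates on disk. Monitor pg_replication_slots and set max_slot_wal_keep_size (available in PostgreSQL 13 and later) to cap retained WAL and prevent disk exhaustion. A slot that falls behind that cap is invalidated, so the consumer then has to be re-initialized from a fresh snapshot.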
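One way to watch for this is to query pg_replication_slots periodically and flag slots whose retained WAL exceeds a threshold. The sketch below is an assumption about how such a check might look, not a prescribed tool: `SLOT_LAG_SQL`, `lagging_slots`, and `check_slots` are hypothetical names, the 1 GiB default is arbitrary, and `conn` is expected to be an ordinary (non-replication) DB-API connection such as one from psycopg2.

```python
# Lag per logical slot: distance between the current WAL position and the
# position the consumer has confirmed (PostgreSQL 10+ function names)
SLOT_LAG_SQL = """
SELECT slot_name,
       active,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots
WHERE slot_type = 'logical'
"""


def lagging_slots(rows, max_lag_bytes=1024**3):
    """Return (slot_name, lag_bytes) pairs whose lag exceeds the threshold."""
    return [
        (slot_name, int(lag))
        for slot_name, _active, lag in rows
        if lag is not None and lag > max_lag_bytes
    ]


def check_slots(conn, max_lag_bytes=1024**3):
    """Run the lag query on a regular connection and filter the result."""
    with conn.cursor() as cur:
        cur.execute(SLOT_LAG_SQL)
        return lagging_slots(cur.fetchall(), max_lag_bytes)
```

Keeping the filtering in `lagging_slots` separate from the database call makes the threshold logic easy to unit test without a running PostgreSQL instance.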
Outbox Pattern with SQLAlchemy
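The outbox pattern writes each event to an outbox table in the same database transaction as the business change, so the state change and its event are committed or rolled back together: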
```python
import uuid
from datetime import datetime, timezone

from sqlalchemy import JSON, Column, DateTime, String
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class OutboxEvent(Base):
    __tablename__ = "outbox_events"

    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    aggregate_type = Column(String, nullable=False)
    aggregate_id = Column(String, nullable=False)
    event_type = Column(String, nullable=False)
    payload = Column(JSON, nullable=False)
    created_at = Column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )


def place_order(session: Session, order_data: dict):
    # Order is the application's own mapped class, defined elsewhere on Base
    order = Order(**order_data)
    session.add(order)
    session.flush()  # Assigns order.id before the event references it

    event = OutboxEvent(
        aggregate_type="Order",
        aggregate_id=str(order.id),
        event_type="OrderPlaced",
        payload={
            "order_id": order.id,
            "user_id": order.user_id,
            "amount": order.amount,
        },
    )
    session.add(event)
    session.commit()  # Order and event are written in one transaction
```
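Debezium's outbox event router, a single message transform (SMT), turns captured inserts on the outbox table into properly structured domain events on Kafka. The router does not remove rows itself. Because Debezium reads the insert from the WAL, the application can delete each outbox row right after writing it, or purge old rows with a periodic cleanup job.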
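As a rough illustration, the router is enabled through connector properties. The sketch below builds such a configuration as a Python dict that could be submitted to the Kafka Connect REST API. It is deliberately incomplete (connection settings are omitted), the connector name and topic pattern are assumptions, and the column mappings simply point at the outbox_events columns defined earlier. Check the property names against the Debezium version in use.

```python
import json


def outbox_connector_config(name="orders-outbox"):
    """Build a partial Debezium connector config that enables the outbox router.

    Hypothetical helper: database connection properties are left out.
    """
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            # Capture only the outbox table
            "table.include.list": "public.outbox_events",
            "transforms": "outbox",
            "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
            # Map the router's expected fields onto this table's columns
            "transforms.outbox.table.field.event.id": "id",
            "transforms.outbox.table.field.event.key": "aggregate_id",
            "transforms.outbox.table.field.event.type": "event_type",
            "transforms.outbox.table.field.event.payload": "payload",
            # Route each event to a topic derived from aggregate_type
            "transforms.outbox.route.by.field": "aggregate_type",
            "transforms.outbox.route.topic.replacement": "events.${routedByValue}",
        },
    }


# The JSON body a client would POST to the Kafka Connect /connectors endpoint
connector_json = json.dumps(outbox_connector_config(), indent=2)
```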
Polling-Based Outbox (Without Debezium)
Teams not running Kafka Connect can poll the outbox table and publish the events themselves:
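```python
import asyncio
import json

from sqlalchemy import select


async def outbox_publisher(session_factory, broker):
    # session_factory yields AsyncSession objects; broker is any client
    # exposing an async publish(topic, key, value) method
    while True:
        async with session_factory() as session:
            events = await session.execute(
                select(OutboxEvent)
                .order_by(OutboxEvent.created_at)
                .limit(100)
            )
            for event in events.scalars():
                await broker.publish(
                    topic=f"events.{event.aggregate_type}",
                    key=event.aggregate_id,
                    value=json.dumps(event.payload),
                )
                await session.delete(event)
            # One commit removes every row published in this batch
            await session.commit()
        await asyncio.sleep(0.5)
```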
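This loop polls the outbox table, publishes each event, and deletes the published rows in a single transaction. Because publishing happens before the delete is committed, a clean run delivers each event once, but a crash between the publish and the commit leaves the rows in place and they are published again on restart. Delivery is therefore at-least-once, and consumers must be idempotent.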
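A consumer can only detect a repeated delivery if each message carries a stable identifier. The following sketch assumes the publisher wraps the payload together with the outbox row's id (the `event_id` and `data` keys, and the `to_message` and `DedupingHandler` names, are hypothetical) and shows an in-memory deduplicating wrapper. A production consumer would more likely keep the seen ids in durable storage.

```python
import json
from collections import OrderedDict


def to_message(event_id, payload):
    """Wrap a payload with its outbox event id so consumers can deduplicate."""
    return json.dumps({"event_id": event_id, "data": payload})


class DedupingHandler:
    """Call `handler` once per event id, remembering a bounded number of ids."""

    def __init__(self, handler, max_remembered=10_000):
        self._handler = handler
        self._seen = OrderedDict()
        self._max = max_remembered

    def handle(self, raw):
        """Return True if the event was processed, False if it was a repeat."""
        message = json.loads(raw)
        event_id = message["event_id"]
        if event_id in self._seen:
            return False
        self._handler(message["data"])
        # Record the id only after the handler succeeded
        self._seen[event_id] = None
        if len(self._seen) > self._max:
            self._seen.popitem(last=False)  # Forget the oldest id
        return True
```

Bounding the remembered ids trades memory for a deduplication window: a repeat that arrives after its id has been evicted is processed again, so the window should comfortably exceed the longest expected redelivery delay.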
Schema Evolution in CDC Pipelines
Database schemas change. CDC pipelines must handle:
- New columns: Debezium includes new columns automatically. Python consumers must handle unknown fields gracefully (ignore them or log warnings).
- Removed columns: the `before` image may contain columns that `after` does not. Use `.get()` with defaults.
- Type changes: a column changing from integer to string breaks consumers expecting integers. Schema Registry with compatibility checks prevents this.
A defensive consumer pattern:
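```python
import logging

logger = logging.getLogger(__name__)


def process_order_change(event):
    # "after" is present but null on deletes, so a .get() default is not enough
    after = event["payload"].get("after") or {}
    order_id = after.get("id")
    if order_id is None:
        logger.warning("Missing order_id in CDC event, skipping")
        return

    # Handle optional/new fields
    priority = after.get("priority", "normal")
    metadata = after.get("metadata") or {}
    # update_downstream is the application's own function
    update_downstream(order_id, priority=priority, metadata=metadata)
```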
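Type changes can be handled in a similar defensive spirit. The sketch below is one possible approach rather than an established recipe: `coerce_int` and `normalize_order` are hypothetical helpers that accept an integer column arriving as either an int or an integer-like string, drop unknown fields, and fail loudly on anything else.

```python
def coerce_int(value, field):
    """Return `value` as an int, accepting ints and integer-like strings."""
    if isinstance(value, bool):
        # bool is a subclass of int, but is never a valid id here
        raise TypeError(f"{field}: expected integer, got bool")
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        try:
            return int(value.strip())
        except ValueError:
            raise TypeError(f"{field}: {value!r} is not an integer") from None
    raise TypeError(f"{field}: expected integer, got {type(value).__name__}")


def normalize_order(after):
    """Pick the fields this consumer understands and ignore the rest."""
    return {
        "id": coerce_int(after.get("id"), "id"),
        "status": str(after.get("status", "unknown")),
        "priority": after.get("priority", "normal"),
    }
```

Raising `TypeError` instead of guessing lets the caller route the event to dead-letter handling, which is usually safer than writing a wrongly typed value downstream.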
Exactly-Once Processing
CDC events travel through a pipeline: database → Debezium → Kafka → Python consumer → target system. Each hop can introduce duplicates:
- Debezium: snapshots can re-emit events. The `lsn` field deduplicates.
- Kafka: consumer rebalances may replay messages. Track processed offsets.
- Target system: network retries may apply the same change twice.
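Tracking processed offsets, as suggested for the Kafka hop, can be sketched as a small per-partition high-water mark. `OffsetTracker` is a hypothetical in-memory helper. In practice the offsets would need to be stored durably, ideally in the target system alongside the applied change.

```python
class OffsetTracker:
    """Remember the highest processed offset for each (topic, partition)."""

    def __init__(self):
        self._last = {}

    def should_process(self, topic, partition, offset):
        """False for offsets at or below the recorded high-water mark."""
        return offset > self._last.get((topic, partition), -1)

    def mark_processed(self, topic, partition, offset):
        """Advance the high-water mark; never move it backwards."""
        key = (topic, partition)
        self._last[key] = max(offset, self._last.get(key, -1))
```

A consumer would call `should_process` before handling a record and `mark_processed` after the change has been applied, so messages replayed after a rebalance are skipped.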
The most robust approach: maintain a processed-event log in the target system and check before applying:
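```python
async def idempotent_apply(event, target_db):
    lsn = event["payload"]["source"]["lsn"]

    # Check and write inside one transaction so the change and its watermark
    # are committed together; a unique constraint on cdc_watermark.lsn guards
    # against two workers racing on the same event
    async with target_db.transaction():
        existing = await target_db.fetch_one(
            "SELECT 1 FROM cdc_watermark WHERE lsn = :lsn", {"lsn": lsn}
        )
        if existing:
            return  # Already processed

        await apply_change(event)  # application-defined write to the target
        await target_db.execute(
            "INSERT INTO cdc_watermark (lsn, processed_at) VALUES (:lsn, NOW())",
            {"lsn": lsn},
        )
```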
Monitoring CDC Pipelines
| Metric | Meaning | Alert When |
|---|---|---|
| Replication slot lag (bytes) | How far behind the consumer is | > 1 GB |
| Consumer group lag (messages) | Kafka consumer backlog | Growing for > 5 min |
| Event processing latency | Time from database write to target update | > 30 seconds |
| Schema registry compatibility failures | Producers sending incompatible schemas | Any occurrence |
| Outbox table row count | Unpublished events accumulating | > 1000 rows |
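PostgreSQL's pg_replication_slots view exposes each slot's confirmed_flush_lsn, which can be compared with pg_current_wal_lsn() to compute slot lag; pg_stat_replication reports progress only for consumers that are currently connected. Kafka's consumer lag is available via kafka-consumer-groups.sh or Prometheus JMX exporters.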
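The thresholds in the table can be expressed as a simple evaluation function. This is a sketch under assumptions: `PipelineMetrics`, `lag_growing`, and `evaluate_alerts` are hypothetical names, 1 GB is interpreted as 1024³ bytes, and "growing for more than 5 minutes" is approximated as lag that increased at every sample over a span longer than 300 seconds.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

GIB = 1024**3


@dataclass
class PipelineMetrics:
    slot_lag_bytes: int = 0
    processing_latency_seconds: float = 0.0
    schema_compat_failures: int = 0
    outbox_rows: int = 0
    # (unix_timestamp_seconds, lag_in_messages) samples, oldest first
    consumer_lag_samples: List[Tuple[float, int]] = field(default_factory=list)


def lag_growing(samples, window_seconds=300):
    """True if lag rose at every sample over a span longer than the window."""
    if len(samples) < 2 or samples[-1][0] - samples[0][0] <= window_seconds:
        return False
    return all(later[1] > earlier[1] for earlier, later in zip(samples, samples[1:]))


def evaluate_alerts(m):
    """Return the names of the alerts from the table that currently fire."""
    alerts = []
    if m.slot_lag_bytes > GIB:
        alerts.append("replication_slot_lag")
    if lag_growing(m.consumer_lag_samples):
        alerts.append("consumer_group_lag")
    if m.processing_latency_seconds > 30:
        alerts.append("event_processing_latency")
    if m.schema_compat_failures > 0:
        alerts.append("schema_compatibility_failure")
    if m.outbox_rows > 1000:
        alerts.append("outbox_backlog")
    return alerts
```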
Production Checklist
- Set `wal_level = logical` in PostgreSQL (requires restart).
- Create dedicated replication slots with meaningful names.
- Monitor WAL disk usage and set `max_slot_wal_keep_size` as a safety valve.
- Use Avro serialization with Schema Registry for CDC events on Kafka.
- Implement idempotent consumers for all target systems.
- Test schema evolution by adding a column in staging and verifying consumer behavior.
- Set up dead-letter handling for events that fail processing after retries.
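For the dead-letter item, a minimal sketch of retry-then-park logic might look like the following. The function name, the list used as a dead-letter store, and the backoff values are all assumptions for illustration. A real pipeline would typically publish failed events to a dedicated Kafka topic or table instead.

```python
import time


def process_with_dead_letter(event, handler, dead_letters,
                             max_attempts=3, base_delay=0.1, sleep=time.sleep):
    """Try `handler(event)` up to max_attempts times, then park the event.

    Returns True on success, False if the event went to `dead_letters`.
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            handler(event)
            return True
        except Exception as exc:
            last_error = exc
            if attempt < max_attempts:
                # Exponential backoff: base_delay, 2x, 4x, ...
                sleep(base_delay * 2 ** (attempt - 1))
    dead_letters.append(
        {"event": event, "error": repr(last_error), "attempts": max_attempts}
    )
    return False
```

Passing `sleep` as a parameter keeps the function testable without real delays, and storing the error text next to the event makes later triage of the dead-letter store easier.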
The one thing to remember: Production CDC in Python requires attention at every stage — WAL management at the source, schema evolution in transit, and idempotent processing at the target — because a broken link anywhere in the chain silently corrupts downstream data.