Python Incremental Processing — Deep Dive
Production incremental processing in Python requires careful watermark management, idempotent write patterns, late-arriving data handling, and crash recovery. This guide covers implementation patterns for each challenge.
1) Watermark management
Persistent watermark store
import json
from pathlib import Path
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
@dataclass
class WatermarkState:
pipeline_name: str
last_value: str # stored as string for flexibility
value_type: str # "timestamp" or "integer"
updated_at: str
row_count: int = 0
class WatermarkStore:
"""File-backed watermark storage with atomic updates."""
def __init__(self, base_path: str = "/opt/pipeline-state"):
self.base = Path(base_path)
self.base.mkdir(parents=True, exist_ok=True)
def get(self, pipeline_name: str) -> WatermarkState | None:
path = self.base / f"{pipeline_name}.json"
if not path.exists():
return None
return WatermarkState(**json.loads(path.read_text()))
def set(self, state: WatermarkState):
path = self.base / f"{state.pipeline_name}.json"
tmp = path.with_suffix(".tmp")
tmp.write_text(json.dumps(asdict(state), indent=2))
tmp.rename(path) # atomic on POSIX filesystems
def get_value_as_datetime(self, pipeline_name: str) -> datetime | None:
state = self.get(pipeline_name)
if state is None:
return None
return datetime.fromisoformat(state.last_value)
Using the watermark in a pipeline
import polars as pl
from datetime import datetime, timezone
def incremental_extract(
    source_path: str,
    pipeline_name: str,
    timestamp_col: str = "updated_at",
    store: WatermarkStore | None = None,
) -> tuple[pl.DataFrame, datetime | None]:
    """Read the records in *source_path* that are newer than the stored watermark.

    Args:
        source_path: Parquet file (or glob) to read.
        pipeline_name: Key under which the watermark is stored.
        timestamp_col: Column compared against the watermark.
        store: Watermark store; a default ``WatermarkStore()`` is used if None.

    Returns:
        A 2-tuple ``(df, new_watermark)``. ``new_watermark`` is the maximum of
        *timestamp_col* in ``df``, or ``None`` when there are no new rows.
        The result is always a tuple, so callers can unpack it unconditionally.
    """
    store = store if store is not None else WatermarkStore()
    watermark = store.get_value_as_datetime(pipeline_name)

    # A lazy scan lets Polars push the filter into the Parquet reader instead of
    # loading the whole file and then discarding most of it.
    lazy = pl.scan_parquet(source_path)
    if watermark is not None:
        # Strict ">" skips rows whose timestamp equals the watermark. Rows that
        # share that timestamp but land after the previous run are missed; a
        # lookback window plus idempotent writes covers that case.
        # NOTE(review): the stored watermark and the column must agree on
        # timezone-awareness -- confirm for your source.
        lazy = lazy.filter(pl.col(timestamp_col) > watermark)
    df = lazy.collect()

    if df.is_empty():
        return df, None
    # The new watermark is the max timestamp in this batch.
    return df, df[timestamp_col].max()
def incremental_pipeline(
    source_path: str,
    output_path: str,
    pipeline_name: str = "orders_incremental",
):
    """Run one incremental extract -> transform -> upsert cycle.

    The watermark is advanced only after the write has completed. If the run
    dies before that point, the next run re-reads the same records, which the
    upsert absorbs.
    """
    store = WatermarkStore()
    batch, high_water = incremental_extract(source_path, pipeline_name, store=store)

    if len(batch) == 0:
        print("No new records to process")
        return

    cleaned = transform(batch)

    # Idempotent write first...
    upsert_to_target(cleaned, output_path)

    # ...and only then record how far we got.
    checkpoint = WatermarkState(
        pipeline_name=pipeline_name,
        last_value=high_water.isoformat(),
        value_type="timestamp",
        updated_at=datetime.now(timezone.utc).isoformat(),
        row_count=len(cleaned),
    )
    store.set(checkpoint)
2) Idempotent write patterns
Upsert with Delta Lake
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa
def upsert_incremental(
    new_data: pa.Table,
    table_path: str,
    merge_key: str = "order_id",
):
    """Upsert *new_data* into the Delta table at *table_path*, keyed on *merge_key*.

    If no Delta table exists yet, it is created from *new_data*. Otherwise the
    batch is MERGEd: rows whose key matches are updated with all source columns,
    and rows with new keys are inserted. Re-running the same batch therefore
    leaves the table unchanged.

    NOTE(review): if *new_data* contains duplicate merge keys, the MERGE may
    fail or be ambiguous. Consider deduplicating upstream.
    """
    if DeltaTable.is_deltatable(table_path):
        merger = DeltaTable(table_path).merge(
            source=new_data,
            predicate=f"target.{merge_key} = source.{merge_key}",
            source_alias="source",
            target_alias="target",
        )
        merger.when_matched_update_all().when_not_matched_insert_all().execute()
    else:
        # First batch: there is nothing to merge into, so just create the table.
        write_deltalake(table_path, new_data)
Partition overwrite
Replace only the partitions affected by the incremental batch:
import pyarrow.parquet as pq
def partition_overwrite(
    new_data: pa.Table,
    base_path: str,
    partition_col: str = "order_date",
):
    """Overwrite only the partitions present in new_data.

    The dataset under *base_path* is Hive-partitioned by *partition_col*.
    Partition directories that this batch does not touch are left as they are.
    """
    dataset_options = {
        "root_path": base_path,
        "partition_cols": [partition_col],
        # Clear existing files in each partition directory being written,
        # so re-running a batch replaces its partitions instead of appending.
        "existing_data_behavior": "delete_matching",
    }
    pq.write_to_dataset(new_data, **dataset_options)
Delete-then-insert with SQL
from sqlalchemy import create_engine, text
def delete_insert_incremental(
    engine,
    table_name: str,
    new_data: pl.DataFrame,
    range_col: str,
    range_start,
    range_end,
):
    """Delete the target range, then insert fresh data.

    Rows with ``range_start <= range_col < range_end`` are removed, and
    *new_data* is appended. Both steps run inside one ``engine.begin()``
    transaction, so an error in the insert also rolls back the delete.

    NOTE(review): *table_name* and *range_col* are interpolated into the SQL
    text. They must come from trusted configuration, never from user input.
    Only the range bounds are bound parameters.
    """
    purge = text(
        f"\nDELETE FROM {table_name}\n"
        f"WHERE {range_col} >= :start AND {range_col} < :end\n"
    )
    bounds = {"start": range_start, "end": range_end}

    with engine.begin() as conn:
        conn.execute(purge, bounds)
        frame = new_data.to_pandas()
        frame.to_sql(table_name, conn, if_exists="append", index=False)
3) Handling late-arriving data
Records that arrive after their logical time window has been processed are called “late arrivals.” They are common in event-driven systems where network delays or offline devices cause lag.
Lookback window
Process a wider window than strictly necessary to catch late arrivals:
from datetime import timedelta
def extract_with_lookback(
    source_path: str,
    watermark: datetime,
    lookback: timedelta = timedelta(hours=6),
    timestamp_col: str = "event_time",
) -> pl.DataFrame:
    """Return rows with *timestamp_col* strictly after ``watermark - lookback``.

    Deliberately re-reads the last *lookback* worth of data so that late
    arrivals are picked up. Pair this with an idempotent (upsert) write so the
    overlap does not create duplicates.
    """
    lower_bound = watermark - lookback
    everything = pl.read_parquet(source_path)
    return everything.filter(pl.col(timestamp_col) > lower_bound)
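Combined with an upsert write pattern, the lookback window reprocesses recent data to incorporate late arrivals without duplicating records. Choose a lookback at least as long as the maximum lateness you expect; records that arrive later than the lookback are still missed.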
Separate late-arrival pipeline
For systems where late arrivals are rare but must be handled:
def detect_late_arrivals(
    new_data: pl.DataFrame,
    watermark: datetime,
    event_time_col: str = "event_time",
    ingest_time_col: str = "ingested_at",
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Split data into on-time and late arrivals.

    A row is on time when its *event_time_col* is at or after *watermark*.
    Every other row goes to ``late``, including rows whose event time is null.
    The split is therefore a true partition:
    ``len(on_time) + len(late) == len(new_data)``.

    Args:
        new_data: Batch to split.
        watermark: Boundary between on-time and late events.
        event_time_col: Column holding the event time.
        ingest_time_col: Accepted for interface compatibility; currently unused.

    Returns:
        ``(on_time, late)``.
    """
    is_on_time = pl.col(event_time_col) >= watermark
    on_time = new_data.filter(is_on_time)
    # Comparing a null event time yields null, which filter() treats as False.
    # Negating the raw mask would drop such rows from BOTH outputs.
    # fill_null(False) sends them to `late`, where they can be inspected.
    late = new_data.filter(~is_on_time.fill_null(False))
    if len(late) > 0:
        print(f"Detected {len(late)} late arrivals")
    return on_time, late
4) CDC integration with Python
Reading Debezium CDC events from Kafka
from confluent_kafka import Consumer
import json
def consume_cdc_events(
    topic: str = "dbserver1.public.orders",
    group_id: str = "orders-cdc-consumer",
    max_records: int = 10000,
) -> list[dict]:
    """Poll up to *max_records* Debezium change events from *topic*.

    Polling stops as soon as a 1-second poll returns nothing. Each message is
    reduced to a dict with ``operation`` / ``before`` / ``after`` /
    ``source_ts``. Null-valued (tombstone) messages are skipped.

    Offsets are committed synchronously before this function returns, i.e.
    before the caller has applied the events. A crash after return therefore
    loses the batch (at-most-once delivery). If you need at-least-once
    delivery, move the commit after the events have been applied.

    Raises:
        KafkaException: on a fatal consumer error or a failed commit.
    """
    from confluent_kafka import KafkaException

    consumer = Consumer({
        "bootstrap.servers": "kafka:9092",
        "group.id": group_id,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    })
    events: list[dict] = []
    consumed_any = False
    try:
        consumer.subscribe([topic])
        while len(events) < max_records:
            msg = consumer.poll(1.0)
            if msg is None:
                # NOTE(review): on a freshly joined group, partition assignment
                # can take longer than one poll timeout, so this may return an
                # empty batch.
                break
            err = msg.error()
            if err is not None:
                if err.fatal():
                    raise KafkaException(err)
                continue  # non-fatal / informational error: keep polling
            consumed_any = True
            raw = msg.value()
            if raw is None:
                # Tombstone (null value), e.g. Debezium's follow-up to a delete.
                # It carries no change event, and json.loads(None) would raise.
                continue
            value = json.loads(raw)
            payload = value.get("payload", value)
            events.append({
                "operation": payload.get("op"),  # c=create, u=update, d=delete
                "before": payload.get("before"),
                "after": payload.get("after"),
                "source_ts": payload.get("ts_ms"),
            })
        # Committing with no consumed offsets raises ("No offset stored"),
        # so only commit when at least one message was actually read.
        if consumed_any:
            consumer.commit(asynchronous=False)
    finally:
        # Always leave the group cleanly, even if polling or decoding failed.
        consumer.close()
    return events
Applying CDC events to a target table
def apply_cdc_events(
    events: list[dict],
    target: pl.DataFrame,
    key_col: str = "order_id",
) -> pl.DataFrame:
    """Apply CDC events (create, update, delete) to a target DataFrame.

    *events* are dicts with ``operation``, ``before`` and ``after`` keys.
    Operation handling:
    - ``c`` (create), ``r`` (snapshot read) and ``u`` (update) upsert the
      ``after`` row.
    - ``d`` (delete) removes the row keyed by ``before[key_col]``.
    - Any other operation is ignored.

    Creates are treated as upserts so that replaying a batch (at-least-once
    delivery, crash recovery) does not duplicate rows. Touched keys end up at
    the end of the result, ordered by their last event.

    Returns:
        A new DataFrame; *target* itself is not modified.
    """
    # Collapse the batch to the final state per key (last write wins), so the
    # target is filtered and concatenated once instead of once per event.
    # None marks a key whose final state is "deleted".
    final_state: dict = {}
    for event in events:
        op = event["operation"]
        if op in ("c", "r", "u"):
            row = event["after"]
            key = row[key_col]
        elif op == "d":
            row = None
            key = event["before"][key_col]
        else:
            continue
        # Pop and re-insert so dict order reflects each key's LAST event.
        final_state.pop(key, None)
        final_state[key] = row

    if not final_state:
        return target.clone()

    untouched = target.filter(~pl.col(key_col).is_in(list(final_state)))
    upserts = [row for row in final_state.values() if row is not None]
    if not upserts:
        return untouched
    # infer_schema_length=None scans every row, so columns that appear only in
    # later events are not dropped.
    new_rows = pl.DataFrame(upserts, infer_schema_length=None)
    return pl.concat([untouched, new_rows], how="diagonal")
5) Crash recovery
The commit protocol
Incremental processing has a critical ordering requirement:
1. Process data → 2. Write output → 3. Update watermark
If the pipeline crashes between steps 2 and 3, the watermark is stale and the next run reprocesses the same data. This is safe only if writes are idempotent (upsert or partition overwrite). With append-only writes, you get duplicates.
Two-phase commit for non-idempotent targets
def safe_incremental_run(
    pipeline_name: str,
    store: WatermarkStore,
    production_path: str | None = None,
):
    """Stage a batch, promote it, then advance the watermark.

    Steps:
    1. Extract and transform from the last watermark.
    2. Write the batch to a per-pipeline staging path.
    3. Promote staging into *production_path*.
    4. Record the new watermark.

    If any of steps 2-4 raises, staging is cleaned up and the exception is
    re-raised, leaving the watermark unchanged.

    NOTE(review): if promotion succeeds but ``store.set`` fails, production
    already holds the batch while the watermark is stale. The next run will
    re-promote it, so the promotion step should itself be idempotent.

    Raises:
        ValueError: if *production_path* is not given.
    """
    if production_path is None:
        raise ValueError("production_path is required to promote staged output")

    state = store.get(pipeline_name)
    data, new_watermark = extract_and_transform(state)
    if len(data) == 0:
        # Nothing new: leave production and the watermark untouched.
        return

    staging_path = f"/tmp/staging/{pipeline_name}/"
    try:
        # Phase 1: write to staging. This is inside the try so that a partial
        # staging write is also cleaned up.
        write_to_staging(data, staging_path)
        # Phase 2: promotion, then watermark update.
        promote_staging_to_production(staging_path, production_path)
        # WatermarkState.last_value is a string; a raw datetime would also fail
        # json.dumps in the store.
        if isinstance(new_watermark, datetime):
            last_value = new_watermark.isoformat()
        else:
            last_value = str(new_watermark)
        store.set(WatermarkState(
            pipeline_name=pipeline_name,
            last_value=last_value,
            value_type="timestamp",
            updated_at=datetime.now(timezone.utc).isoformat(),
            row_count=len(data),
        ))
    except Exception:
        # Rollback: delete staging; the watermark has not been advanced.
        cleanup_staging(staging_path)
        raise
6) Monitoring incremental pipelines
Track these metrics to detect problems early:
def log_incremental_metrics(
    pipeline_name: str,
    batch_size: int,
    watermark_lag: timedelta,
    processing_time: float,
    max_lag: timedelta = timedelta(hours=2),
):
    """Build, log and return a metrics dict for one incremental run.

    Args:
        pipeline_name: Pipeline identifier, included in the metrics and alerts.
        batch_size: Number of records processed in this run.
        watermark_lag: How far the watermark trails "now".
        processing_time: Wall-clock seconds spent on the run.
        max_lag: Lag above which ``alert`` is called (default 2 hours).

    Calls ``alert`` when the lag exceeds *max_lag*, and ``warn`` on an empty
    batch. NOTE(review): this warns on every empty batch, whereas the alerting
    table suggests 3 consecutive empty runs; that needs state across runs.

    Returns:
        The metrics dict.
    """
    import logging

    metrics = {
        "pipeline": pipeline_name,
        "batch_size": batch_size,
        "watermark_lag_seconds": watermark_lag.total_seconds(),
        "processing_seconds": processing_time,
        # Guard against division by zero for instantaneous or empty runs.
        "records_per_second": batch_size / processing_time if processing_time > 0 else 0,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    logging.getLogger(__name__).info("incremental metrics: %s", metrics)

    # Alert conditions
    if watermark_lag > max_lag:
        alert(f"{pipeline_name}: watermark lag is {watermark_lag}")
    if batch_size == 0:
        warn(f"{pipeline_name}: empty batch, check source")
    return metrics
| Metric | Alert threshold | What it means |
|---|---|---|
| Watermark lag | > 2 hours | Pipeline is falling behind |
| Batch size = 0 | 3 consecutive runs | Source may be down |
| Batch size spike | > 10x average | Backfill or source dump |
| Processing time | > SLA | Scaling or optimization needed |
Production checklist
- Watermark persisted atomically after successful write (not before)
- Write pattern is idempotent (upsert, partition overwrite, or delete-insert)
- Late-arriving data handled via lookback window or separate pipeline
- Crash recovery tested: kill pipeline mid-run, verify correct behavior on restart
- Monitoring tracks watermark lag, batch size, and processing time
- Full reprocessing path exists as fallback (reset watermark to epoch)
- CDC events include all three operations (create, update, delete) when applicable
- Pipeline handles empty batches gracefully (no errors, no unnecessary writes)
One thing to remember: the watermark update must always be the last step after a successful write—updating it prematurely is the single most common cause of silently lost data in incremental pipelines.
See Also
- Python Adaptive Learning Systems — How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
- Python Airflow — Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
- Python Altair — Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
- Python Automated Grading — How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
- Python Batch Vs Stream Processing — Batch processing is like doing laundry once a week; stream processing is like a self-cleaning shirt that cleans itself constantly.