Python Data Quality Checks — Deep Dive
Production data quality requires more than assertions. You need composable check definitions, historical baselines, anomaly detection, dead-letter handling, and integration with orchestrators. This guide covers the full implementation stack in Python.
1) Great Expectations in production
Setting up a data context
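```python
import great_expectations as gx

# File-backed context: configuration is persisted under this project directory
context = gx.get_context(project_root_dir="/opt/pipelines/gx")

# Register a pandas data source and a DataFrame asset (GX 1.x API)
datasource = context.data_sources.add_or_update_pandas(name="orders_source")
data_asset = datasource.add_dataframe_asset(name="silver_orders")
# The DataFrame itself is supplied at validation time through batch parameters
batch_definition = data_asset.add_batch_definition_whole_dataframe("silver_orders_batch")
```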
Defining expectation suites
```python
# Create (or replace) the suite in the context
suite = context.suites.add_or_update(gx.ExpectationSuite(name="silver_orders_suite"))

# Schema checks
suite.add_expectation(gx.expectations.ExpectColumnToExist(column="order_id"))
suite.add_expectation(gx.expectations.ExpectColumnToExist(column="amount"))

# Value checks
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id"))
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0.01, max_value=1_000_000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="currency", value_set=["USD", "EUR", "GBP", "JPY", "CAD"]
    )
)

# Aggregate checks
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000, max_value=10_000_000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnMeanToBeBetween(
        column="amount", min_value=10, max_value=500
    )
)
```
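The suite groups expectations by what they examine. Schema checks inspect structure, value checks judge each row on its own, and aggregate checks judge one statistic computed over the whole batch. The following dependency-free sketch illustrates that difference over a list of row dicts. It is not how GX is implemented, and the helper names `schema_check`, `value_check`, and `aggregate_check` are hypothetical. The range check skips nulls, leaving them to a separate not-null check.

```python
from typing import Any

Row = dict[str, Any]


def schema_check(rows: list[Row], required: list[str]) -> list[str]:
    """Schema tier: every required column is present on every row."""
    return [
        f"missing column {col!r}"
        for col in required
        if any(col not in row for row in rows)
    ]


def value_check(rows: list[Row], column: str, lo: float, hi: float) -> list[str]:
    """Value tier: each non-null value is tested on its own; offenders are listed."""
    return [
        f"row {i}: {column}={row[column]!r} outside [{lo}, {hi}]"
        for i, row in enumerate(rows)
        if row.get(column) is not None and not lo <= row[column] <= hi
    ]


def aggregate_check(rows: list[Row], column: str, lo: float, hi: float) -> list[str]:
    """Aggregate tier: one statistic (here the mean) over the whole batch."""
    values = [row[column] for row in rows if row.get(column) is not None]
    if not values:
        return [f"no non-null values in {column!r}"]
    mean = sum(values) / len(values)
    if lo <= mean <= hi:
        return []
    return [f"mean({column})={mean:.2f} outside [{lo}, {hi}]"]
```

Consider a batch whose amounts are 25.0 and 0.0. Its mean of 12.5 passes the 10 to 500 mean check, while only the per-row range check catches the 0.0. This is why the two kinds of checks complement each other.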
Running validations in a pipeline
```python
import pandas as pd


class DataQualityError(Exception):
    """Raised when a batch fails one or more expectations."""


def validate_silver_orders(df: pd.DataFrame) -> bool:
    # Bind this DataFrame to the whole-dataframe batch definition and validate it
    batch = batch_definition.get_batch(batch_parameters={"dataframe": df})
    result = batch.validate(suite)
    if not result.success:
        # Collect the type names of the expectations that failed
        failed = [
            r.expectation_config.type
            for r in result.results
            if not r.success
        ]
        raise DataQualityError(f"Failed checks: {failed}")
    return True
```
2) Pandera for schema-centric validation
Pandera shines when you want type-checked, declarative schemas that integrate with IDE autocompletion:
```python
import pandera.polars as ppa
import polars as pl


class OrderSchema(ppa.DataFrameModel):
    order_id: int = ppa.Field(unique=True, nullable=False)
    customer_id: int = ppa.Field(ge=1, nullable=False)
    amount: float = ppa.Field(ge=0.01, le=1_000_000, nullable=False)
    currency: str = ppa.Field(isin=["USD", "EUR", "GBP", "JPY", "CAD"])
    order_date: str = ppa.Field(str_matches=r"^\d{4}-\d{2}-\d{2}$")
    region: str = ppa.Field(nullable=False)

    class Config:
        strict = True  # reject extra columns
        coerce = True  # attempt type coercion before failing


def validate_with_pandera(df: pl.DataFrame) -> pl.DataFrame:
    """Validate and return the DataFrame, or raise SchemaError on the first failure."""
    # Pass lazy=True to collect every failure into a single SchemaErrors instead
    return OrderSchema.validate(df)
```
Pandera vs Great Expectations
| Feature | Pandera | Great Expectations |
|---|---|---|
| Schema-as-code | Native (class-based) | Python API, suites stored as JSON |
| Statistical checks | Basic | Rich (distribution, profiling) |
| HTML reports | No | Yes (Data Docs) |
| Polars support | First-class | No native engine (convert to pandas) |
| Learning curve | Low | Medium-high |
| Best for | Pipeline schemas | Full observability platform |
3) Anomaly detection on metrics
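Static thresholds catch obvious problems but miss shifts that stay inside the allowed range, such as a daily row count that halves yet remains above its minimum. Track key metrics over time and flag values that deviate from their recent baseline: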
```python
import json
import statistics
from collections import deque
from dataclasses import dataclass
from datetime import date
from pathlib import Path


@dataclass
class MetricPoint:
    date: date
    metric: str
    value: float


def load_metric_history(path: Path, metric: str, lookback: int = 30) -> list[float]:
    """Load the most recent `lookback` values of `metric` from a JSONL file.

    Assumes one JSON object per line, appended in chronological order.
    """
    if not path.exists():
        return []  # first run: no baseline yet
    recent: deque[float] = deque(maxlen=lookback)  # drops older values automatically
    with open(path) as f:
        for line in f:
            if not line.strip():
                continue
            p = json.loads(line)
            if p["metric"] == metric:
                recent.append(p["value"])
    return list(recent)


def check_anomaly(
    current: float,
    history: list[float],
    z_threshold: float = 3.0,
    min_history: int = 7,
) -> tuple[bool, str]:
    """Flag `current` if it is more than z_threshold standard deviations from the mean."""
    if len(history) < min_history:
        return False, "insufficient history"
    mean = statistics.mean(history)
    stdev = statistics.stdev(history)
    if stdev == 0:
        # A constant baseline has no spread: any change at all is anomalous
        if current != mean:
            return True, f"constant metric changed: {mean} → {current}"
        return False, "within normal range"
    z_score = abs(current - mean) / stdev
    if z_score > z_threshold:
        return True, (
            f"anomaly: value={current:.2f}, mean={mean:.2f}, "
            f"stdev={stdev:.2f}, z={z_score:.2f}"
        )
    return False, "within normal range"
```
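The `MetricPoint` dataclass describes one observation, but nothing yet writes the JSONL history that `load_metric_history` reads. The following minimal writer is one possibility. It assumes one JSON object per line with `date`, `metric`, and `value` keys in an append-only local file, and `append_metric` is a hypothetical name. The block repeats the dataclass so it runs on its own.

```python
import json
from dataclasses import asdict, dataclass
from datetime import date
from pathlib import Path


@dataclass
class MetricPoint:
    date: date
    metric: str
    value: float


def append_metric(path: Path, point: MetricPoint) -> None:
    """Append one observation as a JSON line with date, metric, and value keys."""
    record = asdict(point)
    record["date"] = point.date.isoformat()  # date objects are not JSON-serializable
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "a") as f:
        f.write(json.dumps(record) + "\n")
```

A typical run loads the history, calls `check_anomaly` with today's value, and only then appends that value. This ordering keeps the number being judged out of its own baseline.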
Metrics to track
| Metric | What it catches |
|---|---|
| Row count | Source outage, partial loads |
| Null rate per column | Schema drift, extraction bugs |
| Distinct count of key columns | Deduplication failures |
| Sum/mean of numeric columns | Data corruption, unit changes |
| Max timestamp | Stale data, timezone bugs |
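Each metric in the table reduces a batch to a single value that can be stored and compared with `check_anomaly`. The following dependency-free sketch computes them over a list of row dicts. The function name `batch_metrics` and the dotted metric names (such as `null_rate.amount`) are illustrative choices, not an established convention. The max timestamp is returned unchanged, so convert it to a number (for example, a lag in seconds) before comparing it against a float history.

```python
from typing import Any


def batch_metrics(
    rows: list[dict[str, Any]],
    key_cols: list[str],
    numeric_cols: list[str],
    ts_col: str,
) -> dict[str, Any]:
    """Compute the per-batch metrics from the table above over a list of row dicts."""
    n = len(rows)
    metrics: dict[str, Any] = {"row_count": n}

    # Null rate per column; a key missing from a row counts as null
    columns = sorted({col for row in rows for col in row})
    for col in columns:
        nulls = sum(1 for row in rows if row.get(col) is None)
        metrics[f"null_rate.{col}"] = nulls / n if n else 0.0

    # Distinct count of key columns (nulls excluded)
    for col in key_cols:
        metrics[f"distinct.{col}"] = len({row[col] for row in rows if row.get(col) is not None})

    # Sum and mean of numeric columns (nulls excluded)
    for col in numeric_cols:
        values = [row[col] for row in rows if row.get(col) is not None]
        metrics[f"sum.{col}"] = sum(values)
        metrics[f"mean.{col}"] = sum(values) / len(values) if values else None

    # Latest timestamp, for staleness checks
    stamps = [row[ts_col] for row in rows if row.get(ts_col) is not None]
    metrics[f"max.{ts_col}"] = max(stamps) if stamps else None
    return metrics
```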
4) Dead letter pattern
Records that fail validation should not be silently dropped. Route them to a dead letter table:
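```python
from datetime import datetime, timezone

import polars as pl
from pydantic import BaseModel, ValidationError


def split_valid_invalid(
    df: pl.DataFrame,
    schema_class: type[BaseModel],
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Validate each row against a row-level model; return (valid_df, dead_letter_df).

    schema_class must accept a row as keyword arguments, as a pydantic model
    does (a pandera DataFrameModel validates whole frames, not single rows).
    Row-by-row Python validation is slow on large frames.
    """
    valid_rows = []
    dead_rows = []
    for row in df.iter_rows(named=True):
        try:
            schema_class(**row)
            valid_rows.append(row)
        except ValidationError as e:  # catch validation failures only, not bugs
            dead_rows.append({
                **row,
                "_error": str(e),
                "_rejected_at": datetime.now(timezone.utc).isoformat(),
            })
    # Keep the input schema so valid rows retain their original dtypes
    valid_df = pl.DataFrame(valid_rows, schema=df.schema) if valid_rows else df.clear()
    dead_df = pl.DataFrame(dead_rows) if dead_rows else pl.DataFrame()
    return valid_df, dead_df
```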
Dead letter tables enable:
- Root cause analysis: inspect exactly which records failed and why.
- Reprocessing: fix the validation rule or the source data, then replay.
- Monitoring: alert when the dead letter rate exceeds a threshold.
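The monitoring and reprocessing items above can be sketched with two small helpers. Both are assumptions for illustration. `check_dead_letter_rate` follows the `(passed, detail)` convention used by the other checks in this guide, with a hypothetical 1% default limit. `replay_dead_letters` takes any `validate_row` callable that raises `ValueError` for a bad row. It strips the `_error` and `_rejected_at` columns that `split_valid_invalid` adds before re-validating.

```python
from typing import Any, Callable

Row = dict[str, Any]
META_COLS = ("_error", "_rejected_at")  # columns added by split_valid_invalid


def check_dead_letter_rate(
    valid_count: int, dead_count: int, max_rate: float = 0.01
) -> tuple[bool, str]:
    """Monitoring: fail when the share of rejected rows exceeds max_rate."""
    total = valid_count + dead_count
    if total == 0:
        return False, "empty batch: no rows validated"
    rate = dead_count / total
    detail = f"dead letter rate {rate:.2%} ({dead_count}/{total} rows)"
    if rate > max_rate:
        return False, f"{detail} exceeds {max_rate:.2%}"
    return True, detail


def replay_dead_letters(
    dead_rows: list[Row], validate_row: Callable[[Row], Any]
) -> tuple[list[Row], list[Row]]:
    """Reprocessing: re-validate rejected rows after a fix.

    Returns (recovered, still_failing). validate_row should raise ValueError
    for a bad row; the rejection metadata is stripped before re-validation.
    """
    recovered: list[Row] = []
    still_failing: list[Row] = []
    for dead in dead_rows:
        row = {k: v for k, v in dead.items() if k not in META_COLS}
        try:
            validate_row(row)
            recovered.append(row)
        except ValueError as e:
            still_failing.append({**dead, "_error": str(e)})
    return recovered, still_failing
```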
5) Orchestrator integration
Airflow quality gate task
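```python
from airflow.decorators import task
from airflow.exceptions import AirflowFailException


@task
def quality_gate(dataset_path: str, suite_name: str):
    # Import inside the task so the DAG file stays cheap to parse
    import great_expectations as gx
    import pandas as pd

    context = gx.get_context(project_root_dir="/opt/pipelines/gx")
    df = pd.read_parquet(dataset_path)
    # Assumes a checkpoint named "<suite_name>_checkpoint" already exists and
    # validates a whole-dataframe batch definition (GX 1.x API)
    checkpoint = context.checkpoints.get(f"{suite_name}_checkpoint")
    result = checkpoint.run(batch_parameters={"dataframe": df})
    if not result.success:
        # Statistics live on each validation result; sum them across the run
        stats = [vr.statistics for vr in result.run_results.values()]
        failed = sum(s["unsuccessful_expectations"] for s in stats)
        evaluated = sum(s["evaluated_expectations"] for s in stats)
        # AirflowFailException fails the task immediately, without retries
        raise AirflowFailException(
            f"Quality gate failed: {failed} of {evaluated} checks failed"
        )
```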
Prefect flow with quality checks
```python
from prefect import flow, task


@task
def validate_data(df, checks):
    # Run every check before failing, so one error lists all problems
    failures = []
    for name, check_fn in checks.items():
        passed, detail = check_fn(df)
        if not passed:
            failures.append(f"{name}: {detail}")
    if failures:
        # Raising fails the task, and with it the flow, before load() runs
        raise ValueError(f"Quality failures: {failures}")


@flow
def etl_with_quality():
    # extract, transform and load are placeholder tasks defined elsewhere;
    # the checks below assume they return polars DataFrames
    raw = extract()
    validate_data(raw, {
        "row_count": lambda df: (len(df) > 0, f"got {len(df)} rows"),
        "no_null_ids": lambda df: (df["id"].null_count() == 0, "null IDs found"),
    })

    cleaned = transform(raw)
    validate_data(cleaned, {
        "unique_ids": lambda df: (df["id"].is_unique().all(), "duplicate IDs"),
        "amount_range": lambda df: (
            (df["amount"] > 0).all(), "non-positive amounts"
        ),
    })
    load(cleaned)
```
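The inline lambdas repeat the same patterns for every dataset. One way to make checks composable is to use small factory functions that return callables with the `(passed, detail)` contract `validate_data` expects. This is a sketch, not a Prefect or polars API. The names `not_null`, `unique`, `in_range`, and `collect_failures` are hypothetical, and `collect_failures` mirrors the loop in `validate_data` without the Prefect decorator so it can run anywhere. The example exercises it on a plain dict of column lists. Because the checks only iterate over `df[col]`, they should also accept a polars DataFrame, though vectorized expressions will be faster on large batches.

```python
from collections import Counter
from typing import Any, Callable, Mapping, Sequence

Columns = Mapping[str, Sequence[Any]]
Check = Callable[[Columns], tuple[bool, str]]


def not_null(col: str) -> Check:
    """Passes when the column contains no None values."""
    def check(df: Columns) -> tuple[bool, str]:
        nulls = sum(1 for v in df[col] if v is None)
        return nulls == 0, f"{col!r} has {nulls} null(s)"
    return check


def unique(col: str) -> Check:
    """Passes when no value appears more than once (detail lists duplicates)."""
    def check(df: Columns) -> tuple[bool, str]:
        dupes = [v for v, n in Counter(df[col]).items() if n > 1]
        return not dupes, f"duplicate values in {col!r}: {dupes}"
    return check


def in_range(col: str, lo: float, hi: float) -> Check:
    """Passes when every non-null value lies in [lo, hi]."""
    def check(df: Columns) -> tuple[bool, str]:
        bad = [v for v in df[col] if v is not None and not lo <= v <= hi]
        return not bad, f"{len(bad)} value(s) of {col!r} outside [{lo}, {hi}]"
    return check


def collect_failures(df: Columns, checks: dict[str, Check]) -> list[str]:
    """Same loop as validate_data, but returns failures instead of raising."""
    failures = []
    for name, check_fn in checks.items():
        passed, detail = check_fn(df)
        if not passed:
            failures.append(f"{name}: {detail}")
    return failures
```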
6) Freshness and completeness monitoring
Quality is not just about values. Stale data is bad data:
```python
from datetime import date, datetime, timedelta, timezone

import polars as pl


def check_freshness(
    df: pl.DataFrame,
    timestamp_col: str,
    max_lag: timedelta,
) -> tuple[bool, str]:
    """Check that the most recent record is within max_lag of now.

    Expects a Datetime column; naive timestamps are assumed to be UTC.
    """
    latest = df[timestamp_col].max()
    if latest is None:
        return False, "no timestamps found"
    if latest.tzinfo is None:
        # A naive datetime cannot be subtracted from an aware "now"
        latest = latest.replace(tzinfo=timezone.utc)
    lag = datetime.now(timezone.utc) - latest
    if lag > max_lag:
        return False, f"data is {lag} old, max allowed is {max_lag}"
    return True, f"freshness OK: lag={lag}"


def check_completeness(
    df: pl.DataFrame,
    date_col: str,
    expected_dates: list[date],
) -> tuple[bool, str]:
    """Check that all expected dates are present in the dataset.

    Expects a Date column, whose values compare equal to datetime.date objects.
    """
    actual = set(df[date_col].unique().to_list())
    missing = set(expected_dates) - actual
    if missing:
        return False, f"missing dates: {sorted(missing)}"
    return True, "all expected dates present"
```
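`check_completeness` needs a list of expected dates. On a partitioned lake, the same question can also be asked of storage paths before any data is read. The following sketch assumes Hive-style `dt=YYYY-MM-DD` partition directories and a list of object paths obtained from a storage listing (not shown). The helpers `date_window`, `partition_dates`, and `missing_partitions` are hypothetical, and the `s3://lake/orders` paths in the example are made up.

```python
import re
from datetime import date, timedelta

# Assumes Hive-style partition paths such as ".../dt=2024-03-01/part-0.parquet"
PARTITION_RE = re.compile(r"dt=(\d{4}-\d{2}-\d{2})")


def date_window(end: date, days: int) -> list[date]:
    """The `days` consecutive calendar dates ending at `end`, oldest first."""
    return [end - timedelta(days=offset) for offset in range(days - 1, -1, -1)]


def partition_dates(paths: list[str]) -> set[date]:
    """Dates of every dt=YYYY-MM-DD partition found in a list of object paths."""
    found: set[date] = set()
    for path in paths:
        match = PARTITION_RE.search(path)
        if match:
            found.add(date.fromisoformat(match.group(1)))
    return found


def missing_partitions(paths: list[str], end: date, days: int) -> list[date]:
    """Expected dates in the window that have no partition, oldest first."""
    present = partition_dates(paths)
    return [d for d in date_window(end, days) if d not in present]
```

`date_window` can also supply the `expected_dates` argument of `check_completeness`. In a daily pipeline, `end` would usually be yesterday, since today's partition may still be loading.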
7) Building a quality dashboard
Track check results over time by appending to a metrics table:
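```python
import json
from datetime import datetime, timezone

import fsspec  # s3:// URLs additionally require the s3fs package


def record_check_result(
    check_name: str,
    dataset: str,
    passed: bool,
    detail: str,
    metrics_path: str = "s3://lake/quality_metrics/results.ndjson",
) -> None:
    result = {
        "check_name": check_name,
        "dataset": dataset,
        "passed": passed,
        "detail": detail,
        "checked_at": datetime.now(timezone.utc).isoformat(),
    }
    # Append one NDJSON line. S3 has no native append: s3fs emulates it by
    # rewriting the whole object, which is slow for large files and can lose
    # lines when several pipelines write at once. For shared paths, prefer
    # one object per result or a table format.
    with fsspec.open(metrics_path, "a") as f:
        f.write(json.dumps(result) + "\n")
```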
Query this table to build dashboards showing:
- Check pass/fail rates over time.
- Most frequently failing checks (indicates systemic source issues).
- Mean time to resolution after a failure.
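The three dashboard views above can be prototyped in plain Python over the NDJSON lines that `record_check_result` writes, before moving them into whatever query engine backs the dashboard. In this sketch, resolution time is measured per `(dataset, check_name)` from the first failing run to the next passing run. That definition and the helper names are assumptions.

```python
import json
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Any, Optional

Key = tuple[str, str]  # (dataset, check_name)


def load_results(lines: list[str]) -> list[dict[str, Any]]:
    """Parse NDJSON lines as written by record_check_result, skipping blanks."""
    return [json.loads(line) for line in lines if line.strip()]


def pass_rates(results: list[dict[str, Any]]) -> dict[Key, float]:
    """Share of runs that passed, per (dataset, check_name)."""
    counts: dict[Key, list[int]] = defaultdict(lambda: [0, 0])  # [passed, total]
    for r in results:
        c = counts[(r["dataset"], r["check_name"])]
        c[0] += int(r["passed"])
        c[1] += 1
    return {key: passed / total for key, (passed, total) in counts.items()}


def top_failing(results: list[dict[str, Any]], n: int = 5) -> list[tuple[Key, int]]:
    """The n (dataset, check_name) pairs with the most failures."""
    return Counter(
        (r["dataset"], r["check_name"]) for r in results if not r["passed"]
    ).most_common(n)


def mean_time_to_resolution(results: list[dict[str, Any]]) -> Optional[timedelta]:
    """Average time from the first failure of a check to its next passing run."""
    runs: dict[Key, list[tuple[datetime, bool]]] = defaultdict(list)
    for r in results:
        runs[(r["dataset"], r["check_name"])].append(
            (datetime.fromisoformat(r["checked_at"]), bool(r["passed"]))
        )
    durations: list[timedelta] = []
    for history in runs.values():
        failed_since: Optional[datetime] = None
        for ts, passed in sorted(history):
            if not passed and failed_since is None:
                failed_since = ts  # start of an incident
            elif passed and failed_since is not None:
                durations.append(ts - failed_since)  # incident resolved
                failed_since = None
    if not durations:
        return None
    return sum(durations, timedelta()) / len(durations)
```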
Production checklist
- Schema checks validate column existence and types at ingestion
- Value checks enforce ranges, patterns, and allowed values
- Aggregate checks verify row counts, null rates, and statistical distributions
- Anomaly detection compares current metrics against rolling baselines
- Dead letter table captures rejected records with error details
- Quality gates halt pipeline progression on failure
- Freshness checks ensure data is not stale
- Completeness checks verify expected partitions/dates exist
- Check results are logged for historical trending
- Alerts route to the right team within minutes of failure
One thing to remember: data quality is not a one-time audit—it is a continuous pipeline stage that runs on every batch, compares against baselines, and blocks bad data before it reaches anyone downstream.