Python Data Quality Checks — Deep Dive

Production data quality requires more than assertions. You need composable check definitions, historical baselines, anomaly detection, dead-letter handling, and integration with orchestrators. This guide covers the full implementation stack in Python.

1) Great Expectations in production

Setting up a data context

import great_expectations as gx

# Written against the GX 1.x API (data_sources, suites,
# validation_definitions, checkpoints); 0.x used different calls.
context = gx.get_context(project_root_dir="/opt/pipelines/gx")

# Register a pandas data source and an in-memory DataFrame asset
datasource = context.data_sources.add_or_update_pandas(name="orders_source")
data_asset = datasource.add_dataframe_asset(name="silver_orders")

# Treat each DataFrame handed in at run time as one whole batch
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    "silver_orders_batch"
)

Defining expectation suites

# Suites, validation definitions, and checkpoints are added once at setup
suite = context.suites.add(gx.ExpectationSuite(name="silver_orders_suite"))

# Schema checks
suite.add_expectation(
    gx.expectations.ExpectColumnToExist(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnToExist(column="amount")
)

# Value checks
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0.01, max_value=1_000_000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="currency", value_set=["USD", "EUR", "GBP", "JPY", "CAD"]
    )
)

# Aggregate checks
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000, max_value=10_000_000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnMeanToBeBetween(
        column="amount", min_value=10, max_value=500
    )
)

Running validations in a pipeline

import pandas as pd


class DataQualityError(Exception):
    """Raised when a batch fails its expectation suite."""


# Bind the suite to the batch definition, then wrap it in a checkpoint
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="silver_orders_validation",
        data=batch_definition,
        suite=suite,
    )
)
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="silver_orders_checkpoint",
        validation_definitions=[validation_definition],
    )
)


def validate_silver_orders(df: pd.DataFrame) -> bool:
    # The DataFrame is passed as a batch parameter on each run
    result = checkpoint.run(batch_parameters={"dataframe": df})

    if not result.success:
        # Collect the type of every expectation that failed
        failed = [
            expectation_result.expectation_config.type
            for suite_result in result.run_results.values()
            for expectation_result in suite_result.results
            if not expectation_result.success
        ]
        raise DataQualityError(f"Failed checks: {failed}")

    return True

2) Pandera for schema-centric validation

Pandera shines when you want type-checked, declarative schemas that integrate with IDE autocompletion:

import pandera as pa
import pandera.polars as ppa
import polars as pl

class OrderSchema(ppa.DataFrameModel):
    order_id: int = ppa.Field(unique=True, nullable=False)
    customer_id: int = ppa.Field(ge=1, nullable=False)
    amount: float = ppa.Field(ge=0.01, le=1_000_000, nullable=False)
    currency: str = ppa.Field(isin=["USD", "EUR", "GBP", "JPY", "CAD"])
    order_date: str = ppa.Field(str_matches=r"^\d{4}-\d{2}-\d{2}$")
    region: str = ppa.Field(nullable=False)

    class Config:
        strict = True  # reject extra columns
        coerce = True  # attempt type coercion before failing

def validate_with_pandera(df: pl.DataFrame) -> pl.DataFrame:
    """Validate and return the DataFrame, or raise SchemaError."""
    return OrderSchema.validate(df)

Pandera vs Great Expectations

| Feature | Pandera | Great Expectations |
|---|---|---|
| Schema-as-code | Native (class-based) | YAML/JSON suites |
| Statistical checks | Basic | Rich (distribution, profiling) |
| HTML reports | No | Yes (Data Docs) |
| Polars support | First-class | Via plugins |
| Learning curve | Low | Medium-high |
| Best for | Pipeline schemas | Full observability platform |

3) Anomaly detection on metrics

Static thresholds catch obvious problems but miss gradual drift. Track key metrics over time and flag anomalies:

from dataclasses import dataclass
from datetime import date
import json
from pathlib import Path
import statistics

@dataclass
class MetricPoint:
    date: date
    metric: str
    value: float

def load_metric_history(path: Path, metric: str, lookback: int = 30) -> list[float]:
    """Load up to `lookback` recent values of one metric from a JSONL file.

    Assumes lines were appended in chronological order.
    """
    points = []
    with open(path) as f:
        for line in f:
            p = json.loads(line)
            if p["metric"] == metric:
                points.append(p["value"])
    return points[-lookback:]

def check_anomaly(
    current: float,
    history: list[float],
    z_threshold: float = 3.0,
) -> tuple[bool, str]:
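    """Return (is_anomaly, detail).

    Flags `current` when it lies more than z_threshold sample standard
    deviations from the mean of `history`. Note that True here signals a
    problem, the opposite of the (passed, detail) checks used elsewhere.
    """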
    if len(history) < 7:
        return False, "insufficient history"
    
    mean = statistics.mean(history)
    stdev = statistics.stdev(history)
    
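    if stdev == 0:
        # A constant baseline: any change at all counts as an anomaly
        if current != mean:
            return True, f"constant metric changed: {mean} -> {current}"
        return False, "constant metric unchanged"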
    
    z_score = abs(current - mean) / stdev
    if z_score > z_threshold:
        return True, (
            f"anomaly: value={current:.2f}, mean={mean:.2f}, "
            f"stdev={stdev:.2f}, z={z_score:.2f}"
        )
    return False, "within normal range"
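load_metric_history reads a file that something else must write. The sketch below is a minimal writer. It assumes one JSON object per line carrying the MetricPoint fields, and append_metric is a hypothetical helper, not part of any library. Call check_anomaly before appending the current value, so that today's point does not become part of its own baseline.

```python
import json
from dataclasses import asdict, dataclass
from datetime import date
from pathlib import Path


@dataclass
class MetricPoint:
    date: date
    metric: str
    value: float


def append_metric(path: Path, point: MetricPoint) -> None:
    """Append one metric point as a JSON line (hypothetical helper)."""
    record = asdict(point)
    # JSON has no date type, so store the date as an ISO-8601 string
    record["date"] = point.date.isoformat()
    with open(path, "a") as f:
        f.write(json.dumps(record) + "\n")
```

As an illustration of the default threshold, take a history of [100, 102, 98, 101, 99, 100, 100]. Its mean is 100 and its sample standard deviation is about 1.29. A new value of 110 therefore scores z ≈ 7.7 and is flagged.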

Metrics to track

| Metric | What it catches |
|---|---|
| Row count | Source outage, partial loads |
| Null rate per column | Schema drift, extraction bugs |
| Distinct count of key columns | Deduplication failures |
| Sum/mean of numeric columns | Data corruption, unit changes |
| Max timestamp | Stale data, timezone bugs |
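Each metric in this table is cheap to compute per batch, and each value can then be passed to check_anomaly against its own history. The sketch below is dependency-free and works over a list of row dicts. It assumes a hypothetical compute_batch_metrics helper and an illustrative "kind.column" key scheme. In Polars, close equivalents are df.height, Series.null_count(), Series.n_unique(), Series.sum(), Series.mean(), and Series.max().

```python
from typing import Any


def compute_batch_metrics(
    rows: list[dict[str, Any]],
    key_cols: list[str],
    numeric_cols: list[str],
    timestamp_col: str,
) -> dict[str, Any]:
    """Compute the table's per-batch metrics (hypothetical helper).

    Keys follow an illustrative "kind.column" scheme, e.g. "null_rate.amount".
    """
    n = len(rows)
    metrics: dict[str, Any] = {"row_count": float(n)}

    # Null rate for every column involved; 0.0 for an empty batch
    for col in sorted(set(key_cols) | set(numeric_cols) | {timestamp_col}):
        nulls = sum(1 for r in rows if r.get(col) is None)
        metrics[f"null_rate.{col}"] = nulls / n if n else 0.0

    # Distinct non-null values per key column; compare with row_count
    # to spot duplicates
    for col in key_cols:
        distinct = {r[col] for r in rows if r.get(col) is not None}
        metrics[f"distinct.{col}"] = float(len(distinct))

    # Sum and mean of the non-null numeric values
    for col in numeric_cols:
        values = [r[col] for r in rows if r.get(col) is not None]
        metrics[f"sum.{col}"] = float(sum(values))
        metrics[f"mean.{col}"] = sum(values) / len(values) if values else None

    # Latest timestamp, for staleness and timezone checks
    stamps = [r[timestamp_col] for r in rows if r.get(timestamp_col) is not None]
    metrics[f"max.{timestamp_col}"] = max(stamps) if stamps else None
    return metrics
```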

4) Dead letter pattern

Records that fail validation should not be silently dropped. Route them to a dead letter table:

import polars as pl
from datetime import datetime, timezone

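def split_valid_invalid(
    df: pl.DataFrame,
    schema_class: type,
) -> tuple[pl.DataFrame, pl.DataFrame]:
    """Validate each row; return (valid_df, dead_letter_df).

    schema_class is a row-level model (for example a Pydantic model) whose
    constructor raises when given an invalid row as keyword arguments.
    """
    valid_rows = []
    dead_rows = []

    # Row-by-row validation is simple but slow on very large frames
    for row in df.iter_rows(named=True):
        try:
            schema_class(**row)
            valid_rows.append(row)
        except Exception as e:  # narrow to your model's validation error type
            dead_rows.append({
                **row,
                "_error": str(e),
                "_rejected_at": datetime.now(timezone.utc).isoformat(),
            })

    # Reuse the input schema so valid_df keeps df's column types
    valid_df = pl.DataFrame(valid_rows, schema=df.schema) if valid_rows else df.clear()
    dead_df = pl.DataFrame(dead_rows) if dead_rows else pl.DataFrame()

    return valid_df, dead_df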
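split_valid_invalid only needs a class whose constructor raises on bad input. Pydantic models behave that way. A dependency-free alternative is a dataclass that validates in `__post_init__`. OrderRow below is a hypothetical sketch that mirrors OrderSchema's per-row rules. Frame-level rules such as a unique order_id cannot be checked one row at a time, so they still need a frame-level check.

```python
import re
from dataclasses import dataclass

ALLOWED_CURRENCIES = {"USD", "EUR", "GBP", "JPY", "CAD"}
DATE_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2}$")


@dataclass
class OrderRow:
    """Hypothetical row model: construction raises ValueError on bad data."""

    order_id: int
    customer_id: int
    amount: float
    currency: str
    order_date: str
    region: str

    def __post_init__(self) -> None:
        errors = []
        if self.order_id is None:
            errors.append("order_id is null")
        if self.customer_id is None or self.customer_id < 1:
            errors.append(f"customer_id must be >= 1, got {self.customer_id}")
        if self.amount is None or not (0.01 <= self.amount <= 1_000_000):
            errors.append(f"amount out of range: {self.amount}")
        if self.currency not in ALLOWED_CURRENCIES:
            errors.append(f"unknown currency: {self.currency}")
        if self.order_date is None or not DATE_PATTERN.match(self.order_date):
            errors.append(f"bad order_date: {self.order_date}")
        if self.region is None:
            errors.append("region is null")
        if errors:
            # Report every problem at once so the dead-letter row explains all of them
            raise ValueError("; ".join(errors))
```

Pass the class itself, as in `split_valid_invalid(df, OrderRow)`. A row with extra or missing columns raises TypeError in the generated `__init__`. That row is dead-lettered too, because split_valid_invalid catches all exceptions.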

Dead letter tables enable:

  • Root cause analysis: inspect exactly which records failed and why.
  • Reprocessing: fix the validation rule or the source data, then replay.
  • Monitoring: alert when the dead letter rate exceeds a threshold.
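The monitoring point can be made concrete by comparing each batch's rejected fraction against a limit. The sketch below is minimal, and check_dead_letter_rate is hypothetical. Its 1% default is an arbitrary placeholder, not a recommendation. It returns (passed, detail) like the other checks in this guide.

```python
def check_dead_letter_rate(
    valid_count: int,
    dead_count: int,
    max_rate: float = 0.01,  # placeholder threshold; tune per dataset
) -> tuple[bool, str]:
    """Fail when the share of rejected rows exceeds max_rate (hypothetical helper)."""
    total = valid_count + dead_count
    if total == 0:
        # Treat an empty batch as a failure rather than a 0% rejection rate
        return False, "empty batch: nothing was processed"
    rate = dead_count / total
    detail = f"dead letter rate {rate:.2%} ({dead_count}/{total}), max {max_rate:.2%}"
    return rate <= max_rate, detail
```

With the Polars split above, this would be called as `check_dead_letter_rate(valid_df.height, dead_df.height)`.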

5) Orchestrator integration

Airflow quality gate task

from airflow.decorators import task
from airflow.exceptions import AirflowFailException

@task
def quality_gate(dataset_path: str, suite_name: str):
    import great_expectations as gx
    import pandas as pd

    context = gx.get_context()
    df = pd.read_parquet(dataset_path)

    # GX 1.x: load a stored checkpoint and pass the DataFrame at run time
    checkpoint = context.checkpoints.get(f"{suite_name}_checkpoint")
    result = checkpoint.run(batch_parameters={"dataframe": df})

    if not result.success:
        # Expectation counts are reported per suite-level validation result
        stats = [r.statistics for r in result.run_results.values()]
        failed = sum(s["unsuccessful_expectations"] for s in stats)
        evaluated = sum(s["evaluated_expectations"] for s in stats)
        raise AirflowFailException(
            f"Quality gate failed: {failed} of {evaluated} checks failed"
        )

Prefect flow with quality checks

from prefect import flow, task

@task
def validate_data(df, checks):
    failures = []
    for name, check_fn in checks.items():
        passed, detail = check_fn(df)
        if not passed:
            failures.append(f"{name}: {detail}")
    if failures:
        raise ValueError(f"Quality failures: {failures}")

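@flow
def etl_with_quality():
    # extract, transform, and load are placeholder tasks defined elsewhere;
    # the checks below assume they pass Polars DataFrames
    raw = extract()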
    validate_data(raw, {
        "row_count": lambda df: (len(df) > 0, f"got {len(df)} rows"),
        "no_null_ids": lambda df: (df["id"].null_count() == 0, "null IDs found"),
    })
    cleaned = transform(raw)
    validate_data(cleaned, {
        "unique_ids": lambda df: (df["id"].is_unique().all(), "duplicate IDs"),
        "amount_range": lambda df: (
            (df["amount"] > 0).all(), "non-positive amounts"
        ),
    })
    load(cleaned)

6) Freshness and completeness monitoring

Quality is not just about values. Stale data is bad data:

from datetime import date, datetime, timezone, timedelta

def check_freshness(
    df: pl.DataFrame,
    timestamp_col: str,
    max_lag: timedelta,
) -> tuple[bool, str]:
    """Check that the most recent record is within max_lag of now."""
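    latest = df[timestamp_col].max()
    if latest is None:
        return False, "no timestamps found"

    # Polars returns a naive datetime for a column without a time zone;
    # assume such timestamps are UTC so the subtraction below does not raise
    if latest.tzinfo is None:
        latest = latest.replace(tzinfo=timezone.utc)

    now = datetime.now(timezone.utc)
    lag = now - latest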
    
    if lag > max_lag:
        return False, f"data is {lag} old, max allowed is {max_lag}"
    return True, f"freshness OK: lag={lag}"

def check_completeness(
    df: pl.DataFrame,
    date_col: str,
    expected_dates: list[date],
) -> tuple[bool, str]:
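    """Check that all expected dates are present in the dataset.

    date_col must hold pl.Date values (not strings) so that they compare
    equal to the date objects in expected_dates.
    """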
    actual = set(df[date_col].unique().to_list())
    missing = set(expected_dates) - actual
    
    if missing:
        return False, f"missing dates: {sorted(missing)}"
    return True, "all expected dates present"
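check_completeness needs the list of dates the dataset should contain. For a daily-partitioned table, a minimal sketch builds an inclusive calendar range. expected_daily_dates is a hypothetical helper. Business-day or hourly schedules would need their own generator.

```python
from datetime import date, timedelta


def expected_daily_dates(start: date, end: date) -> list[date]:
    """Return every calendar date from start to end, inclusive (hypothetical helper)."""
    if end < start:
        raise ValueError(f"end {end} is before start {start}")
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]
```

Here is an example for the last seven days ending yesterday:

```python
today = date.today()
check_completeness(
    df,
    "order_date",
    expected_daily_dates(today - timedelta(days=7), today - timedelta(days=1)),
)
```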

7) Building a quality dashboard

Track check results over time by appending to a metrics table:

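import json
from datetime import datetime, timezone

import fsspec


def record_check_result(
    check_name: str,
    dataset: str,
    passed: bool,
    detail: str,
    metrics_path: str = "s3://lake/quality_metrics/results.ndjson",
) -> None:
    result = {
        "check_name": check_name,
        "dataset": dataset,
        "passed": passed,
        "detail": detail,
        "checked_at": datetime.now(timezone.utc).isoformat(),
    }
    # Append one NDJSON line; fsspec handles s3:// paths via s3fs.
    # Standard S3 objects cannot be appended in place, so append mode may
    # rewrite the object and concurrent writers can lose lines. For shared
    # paths, prefer one object per result or a table format.
    with fsspec.open(metrics_path, "a") as f:
        f.write(json.dumps(result) + "\n")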

Query this table to build dashboards showing:

  • Check pass/fail rates over time.
  • Most frequently failing checks (indicates systemic source issues).
  • Mean time to resolution after a failure.
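All three dashboard views can be computed from the records that record_check_result writes. The sketch below uses only the standard library and is hypothetical (summarize_checks). It assumes the NDJSON lines are already loaded as dicts with that function's keys. Time to resolution is measured here as the gap between the first failure in a streak and the next passing run of the same check.

```python
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Any


def summarize_checks(records: list[dict[str, Any]]) -> dict[str, Any]:
    """Pass rates, most frequent failures, and MTTR (hypothetical helper)."""
    totals: Counter[str] = Counter()
    passes: Counter[str] = Counter()
    failures: Counter[str] = Counter()
    history: dict[str, list[tuple[datetime, bool]]] = defaultdict(list)

    for r in records:
        key = f"{r['dataset']}:{r['check_name']}"
        totals[key] += 1
        if r["passed"]:
            passes[key] += 1
        else:
            failures[key] += 1
        history[key].append((datetime.fromisoformat(r["checked_at"]), r["passed"]))

    # Walk each check's runs in time order and time every failure streak
    resolution_times: list[timedelta] = []
    for runs in history.values():
        failing_since = None
        for checked_at, passed in sorted(runs):
            if not passed and failing_since is None:
                failing_since = checked_at  # a failure streak starts
            elif passed and failing_since is not None:
                resolution_times.append(checked_at - failing_since)
                failing_since = None  # the streak is resolved

    mttr = (
        sum(resolution_times, timedelta()) / len(resolution_times)
        if resolution_times
        else None
    )
    return {
        "pass_rate": {k: passes[k] / totals[k] for k in totals},
        "most_failing": failures.most_common(5),
        "mean_time_to_resolution": mttr,
    }
```

Unresolved streaks, meaning checks that are still failing, are left out of the mean time to resolution in this sketch.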

Production checklist

  • Schema checks validate column existence and types at ingestion
  • Value checks enforce ranges, patterns, and allowed values
  • Aggregate checks verify row counts, null rates, and statistical distributions
  • Anomaly detection compares current metrics against rolling baselines
  • Dead letter table captures rejected records with error details
  • Quality gates halt pipeline progression on failure
  • Freshness checks ensure data is not stale
  • Completeness checks verify expected partitions/dates exist
  • Check results are logged for historical trending
  • Alerts route to the right team within minutes of failure

One thing to remember: data quality is not a one-time audit—it is a continuous pipeline stage that runs on every batch, compares against baselines, and blocks bad data before it reaches anyone downstream.
