Python Data Lineage Tracking — Deep Dive

Production lineage tracking requires a combination of standards-based event emission, custom instrumentation for Python-specific transforms, and a metadata platform to store and query the lineage graph. This guide covers the full implementation stack.

1) OpenLineage standard

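OpenLineage defines a JSON event spec for lineage events. Every run emits a START event followed by a terminal event. The spec also defines RUNNING, ABORT, and OTHER, but most pipelines rely on three types:

  • START: the run begins.
  • COMPLETE: the run succeeds.
  • FAIL: the run fails; the event includes error details.

Input and output datasets can be attached to any event. The code in this guide declares both on every event, so the backend sees the full picture even when a run fails.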

Event structure

from datetime import datetime, timezone
import json

def create_lineage_event(
    event_type: str,
    job_namespace: str,
    job_name: str,
    run_id: str,
    inputs: list[dict],
    outputs: list[dict],
) -> dict:
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {
            "runId": run_id,
        },
        "job": {
            "namespace": job_namespace,
            "name": job_name,
        },
        "inputs": [
            {
                "namespace": ds["namespace"],
                "name": ds["name"],
                "facets": ds.get("facets", {}),
            }
            for ds in inputs
        ],
        "outputs": [
            {
                "namespace": ds["namespace"],
                "name": ds["name"],
                "facets": ds.get("facets", {}),
            }
            for ds in outputs
        ],
        "producer": "https://github.com/our-org/data-pipelines",
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
    }
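Before wiring up a backend, it can help to sanity-check events locally. The sketch below uses a hypothetical helper, check_run_event. Its required-field list reflects our reading of the OpenLineage 2-0-2 RunEvent schema (eventTime, producer, schemaURL, run, job) and is not a substitute for validating against the published JSON Schema.

```python
import uuid
from datetime import datetime

# eventType values defined by the OpenLineage RunEvent spec
VALID_EVENT_TYPES = {"START", "RUNNING", "COMPLETE", "ABORT", "FAIL", "OTHER"}


def check_run_event(event: dict) -> list[str]:
    """Return human-readable problems found in a RunEvent dict (empty list = looks OK)."""
    problems = []

    # Top-level keys every event in this guide carries
    for key in ("eventTime", "producer", "schemaURL", "run", "job"):
        if key not in event:
            problems.append(f"missing {key}")

    # If eventType is present, it must be one of the spec's values
    if "eventType" in event and event["eventType"] not in VALID_EVENT_TYPES:
        problems.append(f"unexpected eventType {event['eventType']!r}")

    # eventTime must parse as an ISO 8601 timestamp
    if "eventTime" in event:
        try:
            datetime.fromisoformat(event["eventTime"])
        except (TypeError, ValueError):
            problems.append("eventTime is not ISO 8601")

    # runId must be a UUID
    run_id = event.get("run", {}).get("runId")
    try:
        uuid.UUID(str(run_id))
    except ValueError:
        problems.append("run.runId is not a UUID")

    # Jobs are identified by namespace + name
    job = event.get("job", {})
    for key in ("namespace", "name"):
        if not job.get(key):
            problems.append(f"missing job.{key}")

    return problems
```

A unit test can then assert that check_run_event(create_lineage_event(...)) returns an empty list.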

Emitting events to Marquez

import requests

class LineageEmitter:
    def __init__(self, marquez_url: str = "http://marquez:5000"):
        self.url = f"{marquez_url}/api/v1/lineage"

    def emit(self, event: dict):
        response = requests.post(
            self.url,
            json=event,
            headers={"Content-Type": "application/json"},
            timeout=5,
        )
        response.raise_for_status()
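LineageEmitter.emit raises on network errors and non-2xx responses, so a Marquez outage would also fail the pipeline doing the emitting. If lineage should never block data delivery, one option is a best-effort wrapper. This is a sketch: BestEffortEmitter is a hypothetical name, and it accepts any emit callable (such as LineageEmitter().emit).

```python
import logging
from typing import Callable

logger = logging.getLogger("lineage")


class BestEffortEmitter:
    """Forward events to an emit callable, logging (not raising) any failure."""

    def __init__(self, emit_fn: Callable[[dict], None]) -> None:
        self._emit_fn = emit_fn

    def emit(self, event: dict) -> bool:
        """Return True if the event was sent, False if sending failed."""
        try:
            self._emit_fn(event)
            return True
        except Exception:
            # Log with the traceback and carry on so the pipeline keeps running
            logger.warning(
                "Failed to emit %s lineage event", event.get("eventType"), exc_info=True
            )
            return False
```

Usage: emitter = BestEffortEmitter(LineageEmitter().emit). Because it exposes the same emit(event) method, it can be passed anywhere an emitter object is expected. The trade-off is that lineage gaps become silent unless someone alerts on these warnings.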

2) Automatic lineage with Airflow + OpenLineage

Airflow’s OpenLineage integration (the apache-airflow-providers-openlineage package) emits lineage events automatically for the operators it supports, such as many SQL and transfer operators:

# Environment variables read by the OpenLineage integration
# (alternatively, configure the [openlineage] section of airflow.cfg)
# OPENLINEAGE_URL=http://marquez:5000
# OPENLINEAGE_NAMESPACE=production

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule="@daily", start_date=datetime(2026, 1, 1))
def orders_pipeline():

    @task
    def extract_orders():
        # A plain @task runs an arbitrary Python callable, so OpenLineage
        # records the task run but cannot see the pandas reads/writes inside it.
        # Declare inlets/outlets (next example) to capture these datasets.
        import pandas as pd
        df = pd.read_parquet("s3://lake/raw/orders/")
        df.to_parquet("s3://lake/silver/orders/")

    @task
    def build_revenue():
        import pandas as pd
        df = pd.read_parquet("s3://lake/silver/orders/")
        revenue = df.groupby("date")["amount"].sum().reset_index()
        revenue.to_parquet("s3://lake/gold/daily_revenue/")

    extract_orders() >> build_revenue()

orders_pipeline()

For Python tasks like these, whose inputs and outputs OpenLineage cannot detect on its own, declare the datasets manually with inlets and outlets:

from airflow.lineage.entities import Table

@task(
    inlets=[Table(cluster="lake", database="raw", name="orders")],
    outlets=[Table(cluster="lake", database="silver", name="orders")],
)
def extract_orders():
    ...

3) Custom lineage decorator for Python functions

For pipelines outside Airflow (standalone scripts, Prefect flows, custom schedulers), build a decorator:

import uuid
import functools
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class Dataset:
    namespace: str
    name: str
    schema_fields: list[dict] = field(default_factory=list)

def track_lineage(
    inputs: list[Dataset],
    outputs: list[Dataset],
    job_namespace: str = "default",
    emitter: LineageEmitter | None = None,
):
    """Decorator that emits OpenLineage START/COMPLETE/FAIL events around a function."""
    # Datasets are fixed at decoration time, so build their event dicts once
    input_dicts = [{"namespace": d.namespace, "name": d.name} for d in inputs]
    output_dicts = [{"namespace": d.namespace, "name": d.name} for d in outputs]

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            run_id = str(uuid.uuid4())  # one runId shared by every event of this run
            job_name = func.__qualname__
            _emitter = emitter or LineageEmitter()

            def event(event_type: str) -> dict:
                return create_lineage_event(
                    event_type, job_namespace, job_name, run_id,
                    input_dicts, output_dicts,
                )

            _emitter.emit(event("START"))

            try:
                result = func(*args, **kwargs)
            except Exception as e:
                # Emit FAIL with the error message, then re-raise the original exception
                fail_event = event("FAIL")
                fail_event["run"]["facets"] = {
                    "errorMessage": {
                        "message": str(e),
                        "programmingLanguage": "python",
                    }
                }
                _emitter.emit(fail_event)
                raise

            # COMPLETE is emitted outside the try block, so a failure to send it
            # is not misreported as a FAIL of the job itself
            _emitter.emit(event("COMPLETE"))
            return result

        return wrapper
    return decorator

Usage

@track_lineage(
    inputs=[Dataset("s3://lake", "raw/orders")],
    outputs=[Dataset("s3://lake", "silver/orders")],
    job_namespace="production",
)
def clean_orders():
    import polars as pl
    raw = pl.read_parquet("s3://lake/raw/orders/")
    cleaned = raw.drop_nulls("order_id").unique("order_id")
    cleaned.write_parquet("s3://lake/silver/orders/")
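The decorator fits when a whole function is one job. For a block of code inside a larger script, the same START / COMPLETE / FAIL pattern can be written as a context manager. A minimal sketch: lineage_run is a hypothetical helper, it takes any emit callable (for example LineageEmitter().emit), and its event dicts mirror create_lineage_event without dataset facets.

```python
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Callable, Iterator

PRODUCER = "https://github.com/our-org/data-pipelines"  # same placeholder producer as above
SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"


def _event(event_type: str, namespace: str, name: str, run_id: str,
           inputs: list[dict], outputs: list[dict]) -> dict:
    # Same RunEvent shape as create_lineage_event, minus dataset facets
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": namespace, "name": name},
        "inputs": inputs,
        "outputs": outputs,
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


@contextmanager
def lineage_run(emit: Callable[[dict], None], namespace: str, name: str,
                inputs: list[dict], outputs: list[dict]) -> Iterator[str]:
    """Emit START on entry, then COMPLETE or FAIL on exit; yields the runId."""
    run_id = str(uuid.uuid4())
    emit(_event("START", namespace, name, run_id, inputs, outputs))
    try:
        yield run_id
    except Exception as e:
        # The with-block raised: report FAIL, then let the exception propagate
        fail = _event("FAIL", namespace, name, run_id, inputs, outputs)
        fail["run"]["facets"] = {
            "errorMessage": {"message": str(e), "programmingLanguage": "python"}
        }
        emit(fail)
        raise
    # Only reached when the with-block finished without raising
    emit(_event("COMPLETE", namespace, name, run_id, inputs, outputs))
```

Usage: with lineage_run(emitter.emit, "production", "clean_orders", [{"namespace": "s3://lake", "name": "raw/orders"}], [{"namespace": "s3://lake", "name": "silver/orders"}]): followed by the transformation code in the indented block.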

4) Column-level lineage

Table-level lineage shows which datasets feed which. Column-level lineage shows how each output field is derived from input fields, which requires deeper instrumentation. The simplest starting point is to document it next to the transform:

import json
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

@dataclass
class ColumnLineage:
    output_column: str
    input_columns: list[str]
    transformation: str  # human-readable description

def document_column_lineage(
    job_name: str,
    lineage: list[ColumnLineage],
    output_path: str = "lineage/column_lineage.jsonl",
):
    """Append one JSON record per output column to a JSONL file."""
    path = Path(output_path)
    path.parent.mkdir(parents=True, exist_ok=True)  # create lineage/ on first run
    recorded_at = datetime.now(timezone.utc).isoformat()
    with path.open("a", encoding="utf-8") as f:
        for col in lineage:
            record = {
                "job": job_name,
                "output_column": col.output_column,
                "input_columns": col.input_columns,
                "transformation": col.transformation,
                "recorded_at": recorded_at,
            }
            f.write(json.dumps(record) + "\n")

# Document how Gold columns derive from Silver columns
document_column_lineage("build_daily_revenue", [
    ColumnLineage(
        output_column="total_revenue",
        input_columns=["silver.orders.amount"],
        transformation="SUM(amount) GROUP BY date, region",
    ),
    ColumnLineage(
        output_column="order_count",
        input_columns=["silver.orders.order_id"],
        transformation="COUNT(DISTINCT order_id) GROUP BY date, region",
    ),
    ColumnLineage(
        output_column="avg_order_value",
        input_columns=["silver.orders.amount", "silver.orders.order_id"],
        transformation="SUM(amount) / COUNT(DISTINCT order_id)",
    ),
])
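A JSONL file is easy to write but lives outside the lineage platform. OpenLineage also defines a columnLineage dataset facet that carries the same information on an output dataset, so a backend can display it next to table-level lineage. A sketch under assumptions: the layout (fields mapping each output column to inputFields entries with namespace, name, field) follows our reading of the spec; the _producer/_schemaURL keys every facet carries are omitted; transformationDescription appears in early versions of the facet, while newer versions describe transformations per input field; and the "schema.table.column" form of input_columns is this guide's own convention. Check the facet version your backend expects.

```python
from dataclasses import dataclass


@dataclass
class ColumnLineage:
    # Same fields as the ColumnLineage dataclass above, repeated so this sketch runs on its own
    output_column: str
    input_columns: list[str]  # "schema.table.column" strings
    transformation: str


def to_column_lineage_facet(lineage: list[ColumnLineage], input_namespace: str) -> dict:
    """Build a columnLineage-style facet dict for one output dataset."""
    fields = {}
    for col in lineage:
        input_fields = []
        for qualified in col.input_columns:
            # "silver.orders.amount" -> dataset "silver.orders", field "amount"
            dataset_name, field_name = qualified.rsplit(".", 1)
            input_fields.append(
                {"namespace": input_namespace, "name": dataset_name, "field": field_name}
            )
        fields[col.output_column] = {
            "inputFields": input_fields,
            # Free-text description (early facet versions)
            "transformationDescription": col.transformation,
        }
    return {"fields": fields}
```

The result would be attached to an output dataset in an event, e.g. event["outputs"][0]["facets"] = {"columnLineage": facet}, where input_namespace (for example "warehouse") is whatever namespace your inputs are registered under.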

SQL-based column lineage with sqllineage

from sqllineage.runner import LineageRunner

sql = """
INSERT INTO gold.daily_revenue (date, region, total_revenue)
SELECT
    order_date AS date,
    region,
    SUM(amount) AS total_revenue
FROM silver.orders
GROUP BY order_date, region
"""

runner = LineageRunner(sql)
print("Source tables:", runner.source_tables())
print("Target tables:", runner.target_tables())
runner.print_column_lineage()
# Prints one line per column path, e.g.
# gold.daily_revenue.total_revenue <- silver.orders.amount

5) Lineage visualization and querying

DataHub integration

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

emitter = DatahubRestEmitter("http://datahub-gms:8080")

# gold/daily_revenue is derived from silver/orders
upstream = UpstreamLineageClass(
    upstreams=[
        UpstreamClass(
            dataset=make_dataset_urn("s3", "lake/silver/orders", "PROD"),
            type=DatasetLineageTypeClass.TRANSFORMED,
        )
    ]
)

# Attach the upstreamLineage aspect to the downstream dataset and send it
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn=make_dataset_urn("s3", "lake/gold/daily_revenue", "PROD"),
        aspect=upstream,
    )
)

Graph queries

Once lineage is stored, two traversals answer most day-to-day questions: what feeds this dataset (upstream), and what breaks if it changes (downstream):

from collections import deque

def _walk(start: str, neighbors) -> list[str]:
    """Breadth-first traversal that visits each dataset once.

    Tracking visited nodes avoids duplicates when two paths share an ancestor
    and prevents infinite recursion if the graph contains a cycle.
    """
    seen = {start}
    found: list[str] = []
    queue = deque([start])
    while queue:
        for nxt in neighbors(queue.popleft()):
            if nxt not in seen:
                seen.add(nxt)
                found.append(nxt)
                queue.append(nxt)
    return found

def get_upstream_chain(dataset: str, graph) -> list[str]:
    """Find all upstream datasets, nearest first."""
    return _walk(dataset, graph.get_upstreams)

def get_downstream_impact(dataset: str, graph) -> list[str]:
    """Find all datasets and dashboards affected by a change, nearest first."""
    return _walk(dataset, graph.get_downstreams)
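Both functions assume a graph object with get_upstreams and get_downstreams methods. Metadata platforms expose this through their own APIs; for tests or small setups, a toy in-memory version can be built directly from the OpenLineage events emitted earlier. A sketch, with InMemoryLineageGraph as a hypothetical class: it treats every input of a successful run as feeding every output, which is the table-level approximation described in section 4.

```python
from collections import defaultdict


class InMemoryLineageGraph:
    """Toy lineage graph built from OpenLineage-style RunEvent dicts."""

    def __init__(self) -> None:
        self._up: dict[str, set[str]] = defaultdict(set)
        self._down: dict[str, set[str]] = defaultdict(set)

    @staticmethod
    def _key(ds: dict) -> str:
        # Identify a dataset by namespace + name, e.g. "s3://lake/silver/orders"
        return f"{ds['namespace']}/{ds['name']}"

    def add_event(self, event: dict) -> None:
        # Only successful runs contribute edges: each input feeds each output
        if event.get("eventType") != "COMPLETE":
            return
        for src in event.get("inputs", []):
            for dst in event.get("outputs", []):
                s, d = self._key(src), self._key(dst)
                self._up[d].add(s)
                self._down[s].add(d)

    def get_upstreams(self, dataset: str) -> list[str]:
        # .get() avoids inserting empty entries into the defaultdict
        return sorted(self._up.get(dataset, ()))

    def get_downstreams(self, dataset: str) -> list[str]:
        return sorted(self._down.get(dataset, ()))
```

Feeding it the COMPLETE events from each pipeline run gives an object that the traversal functions can walk.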

6) Lineage for debugging

When a metric looks wrong, use lineage to narrow the search:

def diagnose_metric(
    metric_dataset: str,
    metric_column: str,
    lineage_store,
) -> dict:
    """Trace a suspicious metric back to its sources.

    lineage_store is any object exposing get_column_lineage(dataset, column),
    returning ColumnLineage records, and get_last_updated(column).
    """
    column_lineage = lineage_store.get_column_lineage(metric_dataset, metric_column)

    diagnosis = {
        "metric": f"{metric_dataset}.{metric_column}",
        "direct_sources": [],
        "transformations": [],
        "source_freshness": {},
    }

    for cl in column_lineage:
        for source in cl.input_columns:
            if source not in diagnosis["direct_sources"]:  # skip duplicates
                diagnosis["direct_sources"].append(source)
        diagnosis["transformations"].append(cl.transformation)

    # Record when each source was last updated; a stale source is a common culprit
    for source in diagnosis["direct_sources"]:
        diagnosis["source_freshness"][source] = lineage_store.get_last_updated(source)

    return diagnosis

Production checklist

  • OpenLineage events emitted for every pipeline run (START, COMPLETE, FAIL)
  • Table-level lineage captured for all Bronze → Silver → Gold transitions
  • Column-level lineage documented for critical Gold metrics
  • Lineage events stored in a metadata platform (DataHub, Marquez, Atlan)
  • Impact analysis available before making source schema changes
  • Lineage graph accessible to analysts and data scientists (not just engineers)
  • Failed runs include error context in lineage events
  • Lineage is tested: verify that known dependencies appear in the graph
  • Stale lineage records are pruned when pipelines are decommissioned
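The "lineage is tested" item can be as small as a list of dependencies the team knows must exist, checked against the graph in CI. A minimal sketch; missing_dependencies is a hypothetical helper, and get_upstreams is any callable returning a dataset's direct upstreams (for example a graph object's get_upstreams method).

```python
from typing import Callable, Iterable


def missing_dependencies(
    expected: dict[str, Iterable[str]],
    get_upstreams: Callable[[str], Iterable[str]],
) -> dict[str, set[str]]:
    """Map each dataset to the expected upstreams the lineage graph does not report."""
    missing: dict[str, set[str]] = {}
    for dataset, upstreams in expected.items():
        gap = set(upstreams) - set(get_upstreams(dataset))
        if gap:
            missing[dataset] = gap
    return missing
```

In a pytest suite this becomes a one-line check: assert missing_dependencies(EXPECTED_EDGES, graph.get_upstreams) == {}, where EXPECTED_EDGES is maintained by the team.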

One thing to remember: lineage’s ultimate purpose is speed (of debugging, of impact analysis, and of trust-building), so invest in it in proportion to how much time your team spends asking “where did this data come from?”


See Also

  • Python Adaptive Learning Systems: How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
  • Python Airflow: Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
  • Python Altair: Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
  • Python Automated Grading: How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
  • Python Batch Vs Stream Processing: Batch processing is like doing laundry once a week; stream processing is like a self-cleaning shirt that cleans itself constantly.