Python Data Lineage Tracking — Deep Dive
Production lineage tracking requires a combination of standards-based event emission, custom instrumentation for Python-specific transforms, and a metadata platform to store and query the lineage graph. This guide covers the full implementation stack.
1) OpenLineage standard
OpenLineage defines a JSON event spec for lineage events. A pipeline run emits a START event when it begins and then either a COMPLETE or a FAIL event when it finishes:
- START — run begins, declares input datasets.
- COMPLETE — run succeeds, declares output datasets.
- FAIL — run fails, includes error details.
Event structure
from datetime import datetime, timezone
import json
def create_lineage_event(
    event_type: str,
    job_namespace: str,
    job_name: str,
    run_id: str,
    inputs: list[dict],
    outputs: list[dict],
) -> dict:
    """Assemble an OpenLineage-style run event as a plain dict.

    Each dataset dict must provide ``namespace`` and ``name`` (a missing key
    raises ``KeyError``); an optional ``facets`` entry is passed through and
    defaults to an empty dict. ``eventTime`` is the current UTC time in
    ISO 8601 form.
    """

    def _dataset_ref(ds: dict) -> dict:
        # Only these three keys are forwarded; any other keys are dropped.
        return {
            "namespace": ds["namespace"],
            "name": ds["name"],
            "facets": ds.get("facets", {}),
        }

    event: dict = {"eventType": event_type}
    event["eventTime"] = datetime.now(timezone.utc).isoformat()
    event["run"] = {"runId": run_id}
    event["job"] = {"namespace": job_namespace, "name": job_name}
    event["inputs"] = list(map(_dataset_ref, inputs))
    event["outputs"] = list(map(_dataset_ref, outputs))
    event["producer"] = "https://github.com/our-org/data-pipelines"
    event["schemaURL"] = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"
    return event
Emitting events to Marquez
import requests
class LineageEmitter:
    """Sends lineage events to a Marquez lineage endpoint over HTTP."""

    def __init__(self, marquez_url: str = "http://marquez:5000"):
        """Store the lineage endpoint derived from the Marquez base URL.

        Args:
            marquez_url: Base URL of the Marquez server, with or without a
                trailing slash.
        """
        # Strip trailing slashes so "http://host:5000/" does not produce
        # "http://host:5000//api/v1/lineage".
        self.url = f"{marquez_url.rstrip('/')}/api/v1/lineage"

    def emit(self, event: dict) -> None:
        """POST ``event`` as JSON to the lineage endpoint.

        The request uses a 5-second timeout. Request errors propagate to the
        caller, and ``raise_for_status()`` raises on an HTTP error status.
        """
        response = requests.post(
            self.url,
            json=event,
            headers={"Content-Type": "application/json"},
            timeout=5,
        )
        response.raise_for_status()
2) Automatic lineage with Airflow + OpenLineage
Airflow’s OpenLineage integration emits lineage events automatically for supported operators:
# airflow.cfg or environment variable
# OPENLINEAGE_URL=http://marquez:5000
# OPENLINEAGE_NAMESPACE=production
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule="@daily", start_date=datetime(2026, 1, 1))
def orders_pipeline():
    """Daily DAG: copy raw orders to silver, then aggregate revenue into gold."""

    @task
    def extract_orders():
        # With the OpenLineage integration configured, run events are emitted
        # for this task without any code here.
        # NOTE(review): dataset inputs/outputs are auto-detected only for
        # supported operators; plain pandas I/O inside a @task presumably
        # needs explicit inlets/outlets -- confirm.
        import pandas as pd

        raw_orders = pd.read_parquet("s3://lake/raw/orders/")
        raw_orders.to_parquet("s3://lake/silver/orders/")

    @task
    def build_revenue():
        import pandas as pd

        orders = pd.read_parquet("s3://lake/silver/orders/")
        daily_totals = orders.groupby("date")["amount"].sum()
        daily_totals.reset_index().to_parquet("s3://lake/gold/daily_revenue/")

    # Revenue is built only after the silver copy has been written.
    extract_step = extract_orders()
    revenue_step = build_revenue()
    extract_step >> revenue_step


orders_pipeline()
For custom Python operators that OpenLineage cannot auto-detect, you can add manual lineage hints:
from airflow.lineage.entities import Table
# Manual lineage hints: inlets name the tables this task reads, outlets the
# tables it writes.
@task(
    inlets=[Table(cluster="lake", database="raw", name="orders")],
    outlets=[Table(cluster="lake", database="silver", name="orders")],
)
def extract_orders():
    """Placeholder task; the real extraction logic is omitted in this example."""
    ...
3) Custom lineage decorator for Python functions
For pipelines outside Airflow (standalone scripts, Prefect flows, custom schedulers), build a decorator:
import uuid
import functools
from dataclasses import dataclass, field
from datetime import datetime, timezone
@dataclass
class Dataset:
    """Reference to a dataset used as a lineage input or output."""

    # Namespace the dataset belongs to.
    namespace: str
    # Name of the dataset within that namespace.
    name: str
    # Optional list of field descriptors; each instance gets its own list.
    schema_fields: list[dict] = field(default_factory=list)
def track_lineage(
    inputs: list[Dataset],
    outputs: list[Dataset],
    job_namespace: str = "default",
    emitter: LineageEmitter | None = None,
):
    """Decorator that emits OpenLineage events around a function.

    A START event is emitted before the wrapped function runs, then either
    COMPLETE (the function returned) or FAIL (it raised an ``Exception``).
    The FAIL event carries an ``errorMessage`` run facet and the original
    exception is re-raised unchanged.

    Lineage emission is best-effort: an error raised by the emitter is logged
    as a warning and never prevents the wrapped function from running, never
    turns a successful run into a FAIL event, and never masks the function's
    own exception.

    Args:
        inputs: Datasets the job reads.
        outputs: Datasets the job writes.
        job_namespace: Namespace reported in the event's ``job`` section.
        emitter: Object with an ``emit(event)`` method. When ``None``, a
            default ``LineageEmitter()`` is created for each call.

    Returns:
        A decorator; the decorated function returns whatever the wrapped
        function returns.
    """
    import logging

    log = logging.getLogger(__name__)

    def _dataset_refs(datasets):
        # Minimal dataset description: namespace and name only.
        return [{"namespace": d.namespace, "name": d.name} for d in datasets]

    def _emit_safely(target, event):
        # Lineage is telemetry; an unreachable backend must not break the job.
        try:
            target.emit(event)
        except Exception:
            log.warning(
                "Failed to emit %s lineage event for job %s (run %s)",
                event.get("eventType"),
                event.get("job", {}).get("name"),
                event.get("run", {}).get("runId"),
                exc_info=True,
            )

    def decorator(func):
        job_name = func.__qualname__

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            run_id = str(uuid.uuid4())
            _emitter = emitter if emitter is not None else LineageEmitter()

            def _build(event_type):
                # Rebuilt per event so each event has its own eventTime.
                return create_lineage_event(
                    event_type, job_namespace, job_name, run_id,
                    _dataset_refs(inputs),
                    _dataset_refs(outputs),
                )

            _emit_safely(_emitter, _build("START"))
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                fail_event = _build("FAIL")
                fail_event["run"]["facets"] = {
                    "errorMessage": {
                        "message": str(e),
                        "programmingLanguage": "python",
                    }
                }
                _emit_safely(_emitter, fail_event)
                raise
            # COMPLETE is emitted outside the try block so a problem while
            # emitting it cannot be misreported as a failure of func.
            _emit_safely(_emitter, _build("COMPLETE"))
            return result

        return wrapper

    return decorator
Usage
@track_lineage(
    inputs=[Dataset("s3://lake", "raw/orders")],
    outputs=[Dataset("s3://lake", "silver/orders")],
    job_namespace="production",
)
def clean_orders():
    """Drop rows with a null order_id, de-duplicate on order_id, write to silver."""
    import polars as pl

    raw_orders = pl.read_parquet("s3://lake/raw/orders/")
    with_ids = raw_orders.drop_nulls("order_id")
    deduplicated = with_ids.unique("order_id")
    deduplicated.write_parquet("s3://lake/silver/orders/")
4) Column-level lineage
Table-level lineage shows which datasets connect. Column-level lineage shows how individual fields transform. This requires deeper instrumentation:
from dataclasses import dataclass
@dataclass
class ColumnLineage:
    """Describes how one output column is derived from input columns."""

    # Name of the derived column.
    output_column: str
    # Columns the output is computed from.
    input_columns: list[str]
    # Free-text, human-readable description of the transformation.
    transformation: str
def document_column_lineage(
    job_name: str,
    lineage: list[ColumnLineage],
    output_path: str = "lineage/column_lineage.jsonl",
) -> None:
    """Append column-level lineage records to a JSON Lines file.

    One JSON object is written per ``ColumnLineage`` entry, with the keys
    ``job``, ``output_column``, ``input_columns``, ``transformation`` and
    ``recorded_at``. All records written by one call share the same
    ``recorded_at`` UTC timestamp.

    Args:
        job_name: Name of the job that produces the output columns.
        lineage: Column lineage entries to record.
        output_path: File to append to. Missing parent directories are
            created.
    """
    import json
    from pathlib import Path

    target = Path(output_path)
    # Append mode creates the file but not its parent directory, so the
    # default "lineage/" folder would otherwise fail on first use.
    target.parent.mkdir(parents=True, exist_ok=True)

    # One timestamp per call lets records from the same batch be grouped.
    recorded_at = datetime.now(timezone.utc).isoformat()

    # Explicit encoding keeps the file identical across platforms.
    with target.open("a", encoding="utf-8") as f:
        for col in lineage:
            record = {
                "job": job_name,
                "output_column": col.output_column,
                "input_columns": col.input_columns,
                "transformation": col.transformation,
                "recorded_at": recorded_at,
            }
            f.write(json.dumps(record) + "\n")
# Record how each Gold-layer metric column is derived from Silver-layer columns.
document_column_lineage(
    job_name="build_daily_revenue",
    lineage=[
        ColumnLineage(
            "total_revenue",
            ["silver.orders.amount"],
            "SUM(amount) GROUP BY date, region",
        ),
        ColumnLineage(
            "order_count",
            ["silver.orders.order_id"],
            "COUNT(DISTINCT order_id) GROUP BY date, region",
        ),
        ColumnLineage(
            "avg_order_value",
            ["silver.orders.amount", "silver.orders.order_id"],
            "SUM(amount) / COUNT(DISTINCT order_id)",
        ),
    ],
)
SQL-based column lineage with sqllineage
from sqllineage.runner import LineageRunner
sql = """
INSERT INTO gold.daily_revenue (date, region, total_revenue)
SELECT
order_date AS date,
region,
SUM(amount) AS total_revenue
FROM silver.orders
GROUP BY order_date, region
"""
# Parse the statement; lineage is computed when it is first queried.
runner = LineageRunner(sql)
# Table-level lineage is exposed as the source_tables / target_tables
# properties (LineageRunner has no table_entities() method).
print("Source tables:", runner.source_tables)
print("Target tables:", runner.target_tables)
# Column-level lineage as (source column, ..., target column) paths.
print("Columns:", runner.get_column_lineage())
# Expected column lineage includes: total_revenue <- silver.orders.amount
5) Lineage visualization and querying
DataHub integration
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
DatasetLineageTypeClass,
UpstreamClass,
UpstreamLineageClass,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper

# REST emitter pointed at the DataHub metadata service (GMS).
emitter = DatahubRestEmitter("http://datahub-gms:8080")

# Declare silver/orders as an upstream of the gold dataset.
upstream = UpstreamLineageClass(
    upstreams=[
        UpstreamClass(
            dataset="urn:li:dataset:(urn:li:dataPlatform:s3,lake/silver/orders,PROD)",
            type=DatasetLineageTypeClass.TRANSFORMED,
        )
    ]
)

# The emitter expects a typed metadata object rather than a raw dict, so the
# upstream-lineage aspect is attached to the downstream dataset's URN through
# a change proposal.
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:s3,lake/gold/daily_revenue,PROD)",
        aspect=upstream,
    )
)
Graph queries
Once lineage is stored, you can answer critical questions:
def get_upstream_chain(dataset: str, graph) -> list[str]:
    """Find all direct and transitive upstream datasets of ``dataset``.

    Each upstream appears once, in discovery order: the direct upstreams
    first, then the ancestors of each of them in turn. Datasets that were
    already seen are not visited again, so shared ancestors are not repeated
    and a cyclic graph terminates. The starting dataset itself is never
    included.

    Args:
        dataset: Dataset to start from.
        graph: Object whose ``get_upstreams(name)`` returns an iterable of
            upstream dataset names.

    Returns:
        De-duplicated list of upstream dataset names.
    """
    seen = {dataset}
    chain: list[str] = []

    def _visit(node: str) -> None:
        # Collect unseen parents before descending, so direct upstreams are
        # listed ahead of their own ancestors.
        fresh = []
        for upstream in graph.get_upstreams(node):
            if upstream not in seen:
                seen.add(upstream)
                fresh.append(upstream)
        chain.extend(fresh)
        for upstream in fresh:
            _visit(upstream)

    _visit(dataset)
    return chain
def get_downstream_impact(dataset: str, graph) -> list[str]:
    """Find everything directly or transitively downstream of ``dataset``.

    Each downstream node appears once, in discovery order: the direct
    consumers first, then the consumers of each of them in turn. Nodes that
    were already seen are not visited again, so shared descendants are not
    repeated and a cyclic graph terminates. The starting dataset itself is
    never included.

    Args:
        dataset: Dataset whose change is being assessed.
        graph: Object whose ``get_downstreams(name)`` returns an iterable of
            downstream node names.

    Returns:
        De-duplicated list of affected downstream names.
    """
    seen = {dataset}
    impact: list[str] = []

    def _visit(node: str) -> None:
        # Collect unseen consumers before descending, so direct consumers are
        # listed ahead of their own dependants.
        fresh = []
        for downstream in graph.get_downstreams(node):
            if downstream not in seen:
                seen.add(downstream)
                fresh.append(downstream)
        impact.extend(fresh)
        for downstream in fresh:
            _visit(downstream)

    _visit(dataset)
    return impact
6) Lineage for debugging
When a metric looks wrong, use lineage to narrow the search:
def diagnose_metric(
    metric_dataset: str,
    metric_column: str,
    lineage_store,
) -> dict:
    """Summarise where a metric column comes from and how fresh its sources are.

    The result holds the qualified metric name under ``metric``, the input
    columns of every lineage entry under ``direct_sources``, the matching
    transformation descriptions under ``transformations``, and one
    ``freshness_<source>`` key per source with the value returned by
    ``lineage_store.get_last_updated``.
    """
    lineage_entries = lineage_store.get_column_lineage(metric_dataset, metric_column)

    sources = []
    steps = []
    for entry in lineage_entries:
        sources.extend(entry.input_columns)
        steps.append(entry.transformation)

    report = {
        "metric": f"{metric_dataset}.{metric_column}",
        "direct_sources": sources,
        "transformations": steps,
    }
    # One freshness lookup per listed source, repeats included.
    report.update(
        (f"freshness_{src}", lineage_store.get_last_updated(src))
        for src in sources
    )
    return report
Production checklist
- OpenLineage events emitted for every pipeline run (START, COMPLETE, FAIL)
- Table-level lineage captured for all Bronze → Silver → Gold transitions
- Column-level lineage documented for critical Gold metrics
- Lineage events stored in a metadata platform (DataHub, Marquez, Atlan)
- Impact analysis available before making source schema changes
- Lineage graph accessible to analysts and data scientists (not just engineers)
- Failed runs include error context in lineage events
- Lineage is tested: verify that known dependencies appear in the graph
- Stale lineage records are pruned when pipelines are decommissioned
One thing to remember: lineage’s ultimate purpose is speed—speed of debugging, speed of impact analysis, and speed of trust-building—invest in it proportionally to how much time you spend asking “where did this data come from?”
See Also
- Python Adaptive Learning Systems — How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
- Python Airflow — Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
- Python Altair — Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
- Python Automated Grading — How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
- Python Batch Vs Stream Processing — Batch processing is like doing laundry once a week; stream processing is like a self-cleaning shirt that cleans itself constantly.