ML Pipeline Orchestration in Python — Deep Dive
Airflow: The Industry Standard
Apache Airflow defines pipelines as Python code using DAGs (Directed Acyclic Graphs). It remains the most widely deployed orchestrator in production ML.
Basic ML Pipeline DAG
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
# Task-level defaults handed to the DAG through `default_args=`:
# two retries spaced five minutes apart, with e-mail on failure.
default_args = dict(
    owner="ml-platform",
    retries=2,
    retry_delay=timedelta(minutes=5),
    email_on_failure=True,
)
with DAG(
    dag_id="credit_scoring_pipeline",
    default_args=default_args,
    # NOTE: newer Airflow releases (2.4+) prefer `schedule=`; the older
    # `schedule_interval` spelling is kept for 2.3-era deployments.
    schedule_interval="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["ml", "credit-scoring"],
) as dag:

    def ingest_data(**context):
        """Pull latest data from warehouse.

        Fetches the snapshot for the run's ``ds`` value and publishes the
        value returned by the ingester under the XCom key ``data_path``.
        """
        from data_pipeline import DataIngester

        ingester = DataIngester()
        path = ingester.fetch_daily_snapshot(context["ds"])
        context["ti"].xcom_push(key="data_path", value=path)

    def validate_data(**context):
        """Run data quality checks.

        Raises:
            ValueError: if the validation report's ``passed`` entry is falsy.
        """
        # Name the producing task explicitly so the lookup does not depend
        # on how a given Airflow version resolves an omitted `task_ids`.
        data_path = context["ti"].xcom_pull(task_ids="ingest_data", key="data_path")
        from data_pipeline import DataValidator

        validator = DataValidator()
        report = validator.validate(data_path)
        if not report["passed"]:
            raise ValueError(f"Data validation failed: {report['failures']}")

    def train_model(**context):
        """Train the model and publish the resulting run id under ``run_id``.

        Logging to MLflow is delegated to ``Trainer.train_and_log``.
        """
        data_path = context["ti"].xcom_pull(task_ids="ingest_data", key="data_path")
        from training import Trainer

        trainer = Trainer()
        run_id = trainer.train_and_log(data_path)
        context["ti"].xcom_push(key="run_id", value=run_id)

    def evaluate_and_branch(**context):
        """Compare new model to production. Branch based on result.

        Returns:
            The task_id to follow: ``"register_model"`` when the comparison
            is truthy, otherwise ``"skip_deployment"``.
        """
        run_id = context["ti"].xcom_pull(task_ids="train_model", key="run_id")
        from evaluation import compare_to_production

        should_deploy = compare_to_production(run_id, min_improvement=0.005)
        return "register_model" if should_deploy else "skip_deployment"

    def register_model(**context):
        """Register the run's model as ``credit-scorer`` and publish its version."""
        import mlflow

        run_id = context["ti"].xcom_pull(task_ids="train_model", key="run_id")
        result = mlflow.register_model(
            f"runs:/{run_id}/model", "credit-scorer"
        )
        context["ti"].xcom_push(key="model_version", value=result.version)

    def deploy_model(**context):
        """Promote the registered version to the ``Production`` stage."""
        from mlflow.tracking import MlflowClient

        version = context["ti"].xcom_pull(
            task_ids="register_model", key="model_version"
        )
        client = MlflowClient()
        client.transition_model_version_stage(
            name="credit-scorer",
            version=version,
            stage="Production",
            archive_existing_versions=True,
        )

    ingest = PythonOperator(task_id="ingest_data", python_callable=ingest_data)
    validate = PythonOperator(task_id="validate_data", python_callable=validate_data)
    train = PythonOperator(task_id="train_model", python_callable=train_model)
    evaluate = BranchPythonOperator(
        task_id="evaluate_model", python_callable=evaluate_and_branch
    )
    register = PythonOperator(task_id="register_model", python_callable=register_model)
    deploy = PythonOperator(task_id="deploy_model", python_callable=deploy_model)
    skip = EmptyOperator(task_id="skip_deployment")

    # Linear path up to the branch; only the "register" arm continues to deploy.
    ingest >> validate >> train >> evaluate >> [register, skip]
    register >> deploy
Dynamic Task Generation
When you need to train multiple model variants in parallel:
from airflow.utils.task_group import TaskGroup
# One entry per model variant; each becomes its own training task below.
model_configs = [
    dict(name="xgboost_v1", params={"max_depth": 6, "lr": 0.01}),
    dict(name="lightgbm_v1", params={"num_leaves": 31, "lr": 0.05}),
    dict(name="catboost_v1", params={"depth": 8, "lr": 0.03}),
]

# NOTE(review): `train_variant`, `validate` and `evaluate` are not defined in
# this snippet; it presumably runs inside the DAG context shown earlier.
with TaskGroup("train_variants") as train_group:
    for config in model_configs:
        # Operators created inside the group are attached to it on construction,
        # so the instance does not need to be kept.
        PythonOperator(
            python_callable=train_variant,
            op_kwargs={"config": config},
            task_id=f"train_{config['name']}",
        )

# The group as a whole sits between validation and evaluation.
validate >> train_group >> evaluate
Prefect: Modern Python-Native Orchestration
Prefect eliminates much of Airflow’s boilerplate with a decorator-based API:
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def ingest_data(date: str) -> str:
    """Fetch the daily snapshot for ``date`` and return the ingester's result.

    The task is keyed on its inputs and cached for one hour, so repeated
    calls with the same date avoid redundant pulls.
    """
    from data_pipeline import DataIngester

    ingester = DataIngester()
    return ingester.fetch_daily_snapshot(date)
@task(retries=2)
def validate_data(data_path: str) -> bool:
    """Validate the data at ``data_path``.

    Returns:
        ``True`` when the validation report's ``passed`` entry is truthy.

    Raises:
        ValueError: when the report did not pass; the message carries the
            report's ``failures`` entry.
    """
    from data_pipeline import DataValidator

    validator = DataValidator()
    outcome = validator.validate(data_path)
    if outcome["passed"]:
        return True
    raise ValueError(f"Validation failed: {outcome['failures']}")
@task
def train_model(data_path: str) -> str:
    """Train on the data at ``data_path`` and return the trainer's result.

    The return value is later used as an MLflow run id by the flow. MLflow
    itself is not used directly here; logging is delegated to
    ``Trainer.train_and_log``.
    """
    from training import Trainer

    return Trainer().train_and_log(data_path)
@task
def evaluate_model(run_id: str) -> bool:
    """Return the outcome of comparing run ``run_id`` against production.

    The comparison is delegated to ``compare_to_production`` with a minimum
    improvement of 0.005.
    """
    from evaluation import compare_to_production

    beats_production = compare_to_production(run_id, min_improvement=0.005)
    return beats_production
@task
def deploy_model(run_id: str):
    """Register the model from ``run_id`` and promote it to ``Production``.

    Existing versions in that stage are archived as part of the transition.
    """
    import mlflow
    from mlflow.tracking import MlflowClient

    model_uri = f"runs:/{run_id}/model"
    registered = mlflow.register_model(model_uri, "credit-scorer")

    # NOTE(review): recent MLflow releases mark registry stages as deprecated
    # in favour of aliases — confirm against the MLflow version in use.
    MlflowClient().transition_model_version_stage(
        name="credit-scorer",
        version=registered.version,
        stage="Production",
        archive_existing_versions=True,
    )
@flow(name="credit-scoring-pipeline", log_prints=True)
def ml_pipeline(date: str):
    """Run ingest → validate → train → evaluate, deploying only on success.

    Validation raises on failure, which stops the flow before training.
    """
    data_path = ingest_data(date)
    validate_data(data_path)
    run_id = train_model(data_path)

    # Guard clause: bail out early when the candidate does not beat production.
    if not evaluate_model(run_id):
        print(f"Model from run {run_id} did not meet threshold — skipping")
        return

    deploy_model(run_id)
    print(f"Model deployed from run {run_id}")
# Run — only when executed as a script, so importing this module
# (e.g. to register a deployment) does not trigger a pipeline run.
if __name__ == "__main__":
    ml_pipeline("2025-06-15")
Prefect’s key advantages: native Python control flow (if/else, loops), built-in caching, and a hybrid execution model where the orchestration server manages state while tasks run on your infrastructure.
Dagster: Asset-Oriented Pipelines
Dagster models pipelines as data assets rather than tasks. Instead of “run this function,” you define “produce this dataset”:
from dagster import AssetSelection, ScheduleDefinition, asset, define_asset_job
import pandas as pd
@asset(group_name="credit_scoring")
def raw_loan_data() -> pd.DataFrame:
    """Load today's rows from ``loan_applications`` as a DataFrame.

    NOTE(review): ``conn`` is not defined in this snippet; it must be a
    database connection provided elsewhere in the module.
    """
    query = "SELECT * FROM loan_applications WHERE date = CURRENT_DATE"
    return pd.read_sql(query, conn)
@asset(group_name="credit_scoring")
def processed_features(raw_loan_data: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of the raw data with two ratio features added.

    Adds ``debt_to_income`` and ``credit_utilization``. Both denominators are
    clipped to a minimum of 1 so a zero income or limit cannot divide by zero.
    The input frame is left untouched.
    """
    income_floor = raw_loan_data["annual_income"].clip(lower=1)
    limit_floor = raw_loan_data["credit_limit"].clip(lower=1)
    return raw_loan_data.assign(
        debt_to_income=raw_loan_data["total_debt"] / income_floor,
        credit_utilization=raw_loan_data["credit_used"] / limit_floor,
    )
@asset(group_name="credit_scoring")
def trained_model(processed_features: pd.DataFrame) -> dict:
    """Train an XGBoost classifier, log it to MLflow, and return run metadata.

    The ``default`` column is the target; every other column is a feature.
    20% of the rows are held out to compute accuracy.

    Returns:
        A dict with the MLflow ``run_id`` and the hold-out ``accuracy``.
    """
    from sklearn.model_selection import train_test_split
    from xgboost import XGBClassifier
    import mlflow

    X = processed_features.drop(columns=["default"])
    y = processed_features["default"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # Bind the run object: `mlflow.active_run()` is None once the `with`
    # block has exited, so the id is read from `run` instead.
    with mlflow.start_run() as run:
        model = XGBClassifier(n_estimators=200)
        model.fit(X_train, y_train)
        accuracy = model.score(X_test, y_test)
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")

    return {"run_id": run.info.run_id, "accuracy": accuracy}
# Schedule: daily at 6 AM
# The assets above are tagged with group_name="credit_scoring" (not a key
# prefix), so they are selected by group rather than by a "prefix/*" string.
credit_scoring_job = define_asset_job(
    "credit_scoring_daily",
    selection=AssetSelection.groups("credit_scoring"),
)
credit_scoring_schedule = ScheduleDefinition(
    job=credit_scoring_job,
    cron_schedule="0 6 * * *",
)
Dagster automatically infers the dependency graph from function signatures. If trained_model takes processed_features as input, Dagster knows to run processing before training.
Event-Triggered Retraining
Instead of fixed schedules, trigger pipelines when conditions are met:
from prefect import flow
from prefect.events import DeploymentEventTrigger
@flow
def retraining_pipeline():
    """Retrain on recent data and shadow-deploy the result if it passes evaluation.

    Intended to be triggered by a drift detection alert. The helper functions
    called here are not defined in this snippet.
    """
    fresh_data = ingest_latest_data()
    validate_data(fresh_data)
    candidate = train_with_recent_data(fresh_data, lookback_days=90)

    # Guard clause: nothing to deploy when the candidate fails evaluation.
    if not passes_evaluation(candidate):
        return
    deploy_via_shadow(candidate)
# Deploy with an event trigger (deployment code not shown here):
# attach a DeploymentEventTrigger that fires when drift-monitor emits a "drift.detected" event
Common triggers:
- Data drift detected — monitoring system flags distribution shift
- Performance degradation — estimated or measured metrics drop below threshold
- New data volume threshold — enough new labeled data accumulated to warrant retraining
- Calendar-based — monthly regulatory model refresh
Orchestrator Comparison for ML
| Feature | Airflow | Prefect | Dagster | Kubeflow |
|---|---|---|---|---|
| DAG definition | Python (explicit) | Python (decorators) | Python (assets) | YAML/SDK |
| Control flow | BranchPythonOperator | Native Python | Graph-based | Conditions |
| Caching | Custom | Built-in | Built-in | Artifacts |
| Data lineage | Limited | Basic | Strong | Basic |
| Kubernetes native | Via KubernetesExecutor | Via agents | Via Helm | Yes |
| Learning curve | Steep | Moderate | Moderate | Steep |
| Community size | Largest | Growing | Growing | Medium |
Pipeline Anti-Patterns
- Monolithic tasks — one giant task that does ingestion, training, and evaluation. This breaks retry granularity and makes debugging far harder.
- XCom abuse (Airflow) — passing large datasets through XCom, which is stored in the metadata database. Pass file paths or URIs instead of the actual data.
- No idempotency — tasks that append to tables instead of writing to dated partitions. Re-running them creates duplicates.
- Missing data validation — training on corrupt data produces a corrupt model. Always validate between ingestion and training.
- Tight coupling to orchestrator — business logic embedded in Airflow operators. Keep ML code in separate packages; orchestrator code should only call functions.
Resource Management
ML pipelines have heterogeneous resource needs — data processing is CPU-heavy, training often needs GPUs, and evaluation can usually run on modest CPU-only workers:
# Airflow: resource-aware task assignment
# Requests 16Gi memory / 4 CPUs, caps the task at one GPU, and pins it to
# nodes labelled gpu-type=a100.
# NOTE(review): the "KubernetesExecutor" dict is the legacy executor_config
# format; Airflow 2.x documents `pod_override` instead — confirm against
# the deployed Airflow version.
train = PythonOperator(
    task_id="train_model",
    python_callable=train_model,
    executor_config={
        "KubernetesExecutor": dict(
            request_memory="16Gi",
            request_cpu="4",
            limit_gpu="1",
            node_selector={"gpu-type": "a100"},
        ),
    },
)
This ensures training tasks land on GPU nodes while cheaper tasks use standard compute — keeping infrastructure costs proportional to actual needs.
One thing to remember: ML pipeline orchestration is the backbone of reliable MLOps — it transforms manual, error-prone model building into automated workflows with built-in validation gates, retry logic, and full auditability of every run.
See Also
- Python A/B Testing ML Models — Why taste-testing two cookie recipes with different friends is the fairest way to pick a winner.
- Python Feature Store Design — Why a shared ingredient pantry saves every cook in the kitchen from buying the same spices over and over.
- Python MLflow Experiment Tracking — Find out why writing down every cooking experiment helps you recreate the perfect recipe every time.
- Python Model Explainability SHAP — How asking 'why did you pick that answer?' turns a mysterious black box into something you can actually trust.
- Python Model Monitoring Drift — Why a weather forecast that was perfect last summer might completely fail this winter — and how to catch it early.