ML Pipeline Orchestration in Python — Deep Dive

Airflow: The Industry Standard

Apache Airflow defines pipelines as Python code using DAGs (Directed Acyclic Graphs). It remains the most widely deployed orchestrator in production ML.

Basic ML Pipeline DAG

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta

# Defaults applied to every task in the DAG: owner label, two retries spaced
# five minutes apart, and e-mail notification on failure.
default_args = dict(
    owner="ml-platform",
    retries=2,
    retry_delay=timedelta(seconds=300),
    email_on_failure=True,
)

with DAG(
    dag_id="credit_scoring_pipeline",
    default_args=default_args,
    # NOTE(review): newer Airflow releases spell this argument `schedule`;
    # confirm against the deployed Airflow version.
    schedule_interval="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=["ml", "credit-scoring"],
) as dag:
    # Pipeline shape:
    #   ingest_data -> validate_data -> train_model -> evaluate_model
    #       -> register_model -> deploy_model   (branch: candidate accepted)
    #       -> skip_deployment                  (branch: candidate rejected)
    #
    # Every xcom_pull names the task that pushed the value. A key-only pull
    # is ambiguous as soon as more than one task pushes the same key (for
    # example several parallel training tasks each pushing "run_id").

    def ingest_data(**context):
        """Fetch the snapshot for the run's date stamp and publish its location.

        Pushes the value returned by ``fetch_daily_snapshot`` to XCom under
        the key ``data_path``.
        """
        from data_pipeline import DataIngester

        path = DataIngester().fetch_daily_snapshot(context["ds"])
        context["ti"].xcom_push(key="data_path", value=path)

    def validate_data(**context):
        """Run data quality checks on the ingested snapshot.

        Raises:
            ValueError: if the validation report is not marked as passed.
        """
        from data_pipeline import DataValidator

        data_path = context["ti"].xcom_pull(task_ids="ingest_data", key="data_path")
        report = DataValidator().validate(data_path)
        if not report["passed"]:
            raise ValueError(f"Data validation failed: {report['failures']}")

    def train_model(**context):
        """Train on the ingested snapshot and publish the resulting run id.

        Pushes the value returned by ``Trainer.train_and_log`` to XCom under
        the key ``run_id``.
        """
        from training import Trainer

        data_path = context["ti"].xcom_pull(task_ids="ingest_data", key="data_path")
        run_id = Trainer().train_and_log(data_path)
        context["ti"].xcom_push(key="run_id", value=run_id)

    def evaluate_and_branch(**context):
        """Compare the new model to production and pick the next task.

        Returns:
            ``"register_model"`` when ``compare_to_production`` is truthy,
            otherwise ``"skip_deployment"``. Both strings must match the
            task ids declared below.
        """
        from evaluation import compare_to_production

        run_id = context["ti"].xcom_pull(task_ids="train_model", key="run_id")
        should_deploy = compare_to_production(run_id, min_improvement=0.005)
        return "register_model" if should_deploy else "skip_deployment"

    def register_model(**context):
        """Register the trained model as ``credit-scorer`` in the MLflow registry.

        Pushes the registered version to XCom under the key ``model_version``.
        """
        import mlflow

        run_id = context["ti"].xcom_pull(task_ids="train_model", key="run_id")
        result = mlflow.register_model(f"runs:/{run_id}/model", "credit-scorer")
        context["ti"].xcom_push(key="model_version", value=result.version)

    def deploy_model(**context):
        """Move the registered version to the Production stage.

        Versions already in that stage are archived
        (``archive_existing_versions=True``).
        """
        from mlflow.tracking import MlflowClient

        version = context["ti"].xcom_pull(
            task_ids="register_model", key="model_version"
        )
        client = MlflowClient()
        client.transition_model_version_stage(
            name="credit-scorer", version=version,
            stage="Production", archive_existing_versions=True
        )

    ingest = PythonOperator(task_id="ingest_data", python_callable=ingest_data)
    validate = PythonOperator(task_id="validate_data", python_callable=validate_data)
    train = PythonOperator(task_id="train_model", python_callable=train_model)
    evaluate = BranchPythonOperator(
        task_id="evaluate_model", python_callable=evaluate_and_branch
    )
    register = PythonOperator(task_id="register_model", python_callable=register_model)
    deploy = PythonOperator(task_id="deploy_model", python_callable=deploy_model)
    skip = EmptyOperator(task_id="skip_deployment")

    ingest >> validate >> train >> evaluate >> [register, skip]
    register >> deploy

Dynamic Task Generation

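When you need to train multiple model variants in parallel, generate one task per variant inside the DAG block (the snippet below reuses the validate and evaluate tasks defined above and assumes a train_variant callable):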

from airflow.utils.task_group import TaskGroup

# One entry per model variant to train; "params" carries that variant's
# hyperparameters.
model_configs = [
    dict(name=variant_name, params=hyperparams)
    for variant_name, hyperparams in (
        ("xgboost_v1", {"max_depth": 6, "lr": 0.01}),
        ("lightgbm_v1", {"num_leaves": 31, "lr": 0.05}),
        ("catboost_v1", {"depth": 8, "lr": 0.03}),
    )
]

with TaskGroup("train_variants") as train_group:
    # Fan out: one training task per entry in model_configs. Each task gets
    # its own entry through the "config" keyword argument.
    for config in model_configs:
        PythonOperator(
            python_callable=train_variant,
            op_kwargs=dict(config=config),
            task_id=f"train_{config['name']}",
        )

# The whole group sits between validation and evaluation.
validate >> train_group
train_group >> evaluate

Prefect: Modern Python-Native Orchestration

Prefect eliminates much of Airflow’s boilerplate with a decorator-based API:

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    retries=3,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def ingest_data(date: str) -> str:
    """Fetch the daily snapshot for ``date`` and return the ingester's result.

    The result is cached per input for one hour, so repeated calls with the
    same date inside that window skip the pull.
    """
    from data_pipeline import DataIngester

    ingester = DataIngester()
    snapshot_path = ingester.fetch_daily_snapshot(date)
    return snapshot_path

@task(retries=2)
def validate_data(data_path: str) -> bool:
    """Run the data-quality checks on ``data_path``.

    Returns ``True`` when the report is marked as passed; otherwise raises
    ``ValueError`` carrying the report's failures.
    """
    from data_pipeline import DataValidator

    validator = DataValidator()
    report = validator.validate(data_path)
    if report["passed"]:
        return True
    raise ValueError(f"Validation failed: {report['failures']}")

@task
def train_model(data_path: str) -> str:
    """Train a model on ``data_path`` and return the trainer's run id.

    All work is delegated to ``Trainer.train_and_log``; this task uses no
    MLflow API itself, so it does not import mlflow.
    """
    from training import Trainer

    return Trainer().train_and_log(data_path)

@task
def evaluate_model(run_id: str) -> bool:
    """Return the outcome of comparing the model from ``run_id`` to production.

    Delegates to ``compare_to_production`` with ``min_improvement=0.005``.
    """
    from evaluation import compare_to_production

    beats_production = compare_to_production(run_id, min_improvement=0.005)
    return beats_production

@task
def deploy_model(run_id: str):
    """Register the model logged under ``run_id`` and move it to Production.

    The new ``credit-scorer`` version is transitioned to the ``Production``
    stage with ``archive_existing_versions=True``.
    """
    import mlflow
    from mlflow.tracking import MlflowClient

    model_uri = f"runs:/{run_id}/model"
    registered = mlflow.register_model(model_uri, "credit-scorer")
    MlflowClient().transition_model_version_stage(
        name="credit-scorer",
        version=registered.version,
        stage="Production",
        archive_existing_versions=True,
    )

@flow(name="credit-scoring-pipeline", log_prints=True)
def ml_pipeline(date: str):
    """Ingest, validate, train and evaluate; deploy only if evaluation passes.

    Prints which of the two outcomes (deployed or skipped) was taken.
    """
    snapshot = ingest_data(date)
    validate_data(snapshot)
    candidate_run = train_model(snapshot)

    # Guard clause: stop here when the candidate does not clear evaluation.
    if not evaluate_model(candidate_run):
        print(f"Model from run {candidate_run} did not meet threshold — skipping")
        return

    deploy_model(candidate_run)
    print(f"Model deployed from run {candidate_run}")

# Kick off a single run of the flow for a fixed example date.
ml_pipeline(date="2025-06-15")

Prefect’s key advantages: native Python control flow (if/else, loops), built-in caching, and a hybrid execution model where the orchestration server manages state while tasks run on your infrastructure.

Dagster: Asset-Oriented Pipelines

Dagster models pipelines as data assets rather than tasks. Instead of “run this function,” you define “produce this dataset”:

from dagster import asset, define_asset_job, ScheduleDefinition
import pandas as pd

@asset(group_name="credit_scoring")
def raw_loan_data() -> pd.DataFrame:
    """Load loan applications whose date equals the database's CURRENT_DATE.

    Reads through ``conn``, which this function does not define; it has to
    be provided elsewhere in the module.
    """
    query = "SELECT * FROM loan_applications WHERE date = CURRENT_DATE"
    return pd.read_sql(query, conn)

@asset(group_name="credit_scoring")
def processed_features(raw_loan_data: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of the raw data with two ratio features added.

    Adds ``debt_to_income`` and ``credit_utilization``. Each denominator is
    floored at 1 before dividing, so a zero income or limit cannot cause a
    division by zero.
    """
    income_floor = raw_loan_data["annual_income"].clip(lower=1)
    limit_floor = raw_loan_data["credit_limit"].clip(lower=1)
    return raw_loan_data.assign(
        debt_to_income=raw_loan_data["total_debt"] / income_floor,
        credit_utilization=raw_loan_data["credit_used"] / limit_floor,
    )

@asset(group_name="credit_scoring")
def trained_model(processed_features: pd.DataFrame) -> dict:
    """Train an XGBoost classifier on the features and log it to MLflow.

    The ``default`` column is the target; every other column is used as a
    feature. 20% of the rows are held out to measure accuracy. The split and
    the model share a fixed seed so that re-materializing the asset on the
    same data is repeatable rather than depending on a fresh random split.

    Returns:
        A dict with the MLflow ``run_id`` and the held-out ``accuracy``.
    """
    from sklearn.model_selection import train_test_split
    from xgboost import XGBClassifier
    import mlflow

    seed = 42  # fixed so the split and the model are reproducible

    X = processed_features.drop(columns=["default"])
    y = processed_features["default"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=seed
    )

    # Take the run handle from the context manager instead of looking it up
    # again through mlflow.active_run().
    with mlflow.start_run() as run:
        model = XGBClassifier(n_estimators=200, random_state=seed)
        model.fit(X_train, y_train)
        # Plain float keeps the returned dict free of NumPy scalar types.
        accuracy = float(model.score(X_test, y_test))
        mlflow.log_metric("accuracy", accuracy)
        mlflow.sklearn.log_model(model, "model")
        return {"run_id": run.info.run_id, "accuracy": accuracy}

from dagster import AssetSelection

# The assets above are tagged with group_name="credit_scoring", so they are
# selected by group. A "credit_scoring/*" string addresses asset-key paths
# instead and does not match assets that are only grouped.
credit_scoring_job = define_asset_job(
    "credit_scoring_daily",
    selection=AssetSelection.groups("credit_scoring"),
)

# Schedule: daily at 6 AM (cron "0 6 * * *").
credit_scoring_schedule = ScheduleDefinition(
    job=credit_scoring_job, cron_schedule="0 6 * * *"
)

Dagster automatically infers the dependency graph from function signatures. If trained_model takes processed_features as input, Dagster knows to run processing before training.

Event-Triggered Retraining

Instead of fixed schedules, trigger pipelines when conditions are met:

from prefect import flow
from prefect.events import DeploymentEventTrigger

@flow
def retraining_pipeline():
    """Retrain on recent data and shadow-deploy the result if it passes evaluation.

    Meant to be started by a drift-detection alert rather than a schedule.
    """
    fresh_data = ingest_latest_data()
    validate_data(fresh_data)
    candidate = train_with_recent_data(fresh_data, lookback_days=90)
    # Guard clause: nothing is deployed unless the candidate passes evaluation.
    if not passes_evaluation(candidate):
        return
    deploy_via_shadow(candidate)

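# Deploy with an event trigger so the flow starts whenever the drift monitor
# emits a "drift.detected" event, e.g. by passing
# triggers=[DeploymentEventTrigger(expect=["drift.detected"])] when serving
# or deploying the flow.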

Common triggers:

  • Data drift detected — monitoring system flags distribution shift
  • Performance degradation — estimated or measured metrics drop below threshold
  • New data volume threshold — enough new labeled data accumulated to warrant retraining
  • Calendar-based — monthly regulatory model refresh

Orchestrator Comparison for ML

| Feature | Airflow | Prefect | Dagster | Kubeflow |
|---|---|---|---|---|
| DAG definition | Python (explicit) | Python (decorators) | Python (assets) | YAML/SDK |
| Control flow | BranchOperator | Native Python | Graph-based | Conditions |
| Caching | Custom | Built-in | Built-in | Artifacts |
| Data lineage | Limited | Basic | Strong | Basic |
| Kubernetes native | Via KubeExecutor | Via agents | Via Helm | Yes |
| Learning curve | Steep | Moderate | Moderate | Steep |
| Community size | Largest | Growing | Growing | Medium |

Pipeline Anti-Patterns

  1. Monolithic tasks — one giant task that does ingestion, training, and evaluation. Breaks retry granularity and makes debugging impossible.

  2. XCom abuse (Airflow) — passing large datasets through XCom (metadata store). Pass file paths or URIs instead of actual data.

  3. No idempotency — tasks that append to tables instead of writing to dated partitions. Re-running creates duplicates.

  4. Missing data validation — training on corrupt data produces a corrupt model. Always validate between ingestion and training.

  5. Tight coupling to orchestrator — business logic embedded in Airflow operators. Keep ML code in separate packages; orchestrator code should only call functions.

Resource Management

ML pipelines have heterogeneous resource needs — data processing is CPU- and memory-bound, training needs GPUs, and evaluation usually needs only modest general-purpose compute:

# Airflow: resource-aware task assignment
# Resources requested for the training task: 16Gi of memory, 4 CPUs, one GPU,
# and a node labelled gpu-type=a100.
# NOTE(review): the nested "KubernetesExecutor" dict looks like the legacy
# executor_config layout; newer Airflow releases expect a "pod_override"
# V1Pod instead — confirm against the deployed Airflow version.
training_resources = dict(
    request_memory="16Gi",
    request_cpu="4",
    limit_gpu="1",
    node_selector={"gpu-type": "a100"},
)
train = PythonOperator(
    task_id="train_model",
    python_callable=train_model,
    executor_config={"KubernetesExecutor": training_resources},
)

This ensures training tasks land on GPU nodes while cheaper tasks use standard compute — keeping infrastructure costs proportional to actual needs.

One thing to remember: ML pipeline orchestration is the backbone of reliable MLOps — it transforms manual, error-prone model building into automated workflows with built-in validation gates, retry logic, and full auditability of every run.

Tags: python, ml-pipeline, orchestration, mlops

See Also