Python Workflow Engines — Deep Dive

Airflow: Production DAG Patterns

Standard ETL DAG

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

# Defaults handed to the DAG below via default_args=...
default_args = dict(
    owner="data-team",
    # Failure alerting.
    email=["data-alerts@company.com"],
    email_on_failure=True,
    # Retry settings: three retries, 5-minute base delay, 1-hour ceiling.
    retries=3,
    retry_delay=timedelta(seconds=5 * 60),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(hours=1),
    # Upper bound on a single task attempt.
    execution_timeout=timedelta(minutes=120),
)

from datetime import datetime

# Daily sales ETL: extract -> validate -> transform -> load -> notify.
# NOTE(review): extract_from_source, run_quality_checks, apply_transformations
# and send_completion_email are not defined in this snippet — presumably they
# are imported from the project's own modules.
with DAG(
    dag_id="daily_sales_pipeline",
    default_args=default_args,
    schedule_interval="0 6 * * *",  # 6 AM daily
    # Fixed start date: days_ago() is deprecated and produces a value that
    # moves every time this file is parsed. With catchup=False the runs
    # between this date and today are not backfilled.
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["sales", "daily"],
) as dag:

    extract = PythonOperator(
        task_id="extract_sales_data",
        python_callable=extract_from_source,
        op_kwargs={"date": "{{ ds }}"},  # Jinja template for execution date
    )

    validate = PythonOperator(
        task_id="validate_data",
        python_callable=run_quality_checks,
    )

    transform = PythonOperator(
        task_id="transform_sales",
        python_callable=apply_transformations,
    )

    load_warehouse = PostgresOperator(
        task_id="load_to_warehouse",
        postgres_conn_id="warehouse",
        sql="sql/load_sales.sql",
        parameters={"date": "{{ ds }}"},
    )

    notify = PythonOperator(
        task_id="notify_stakeholders",
        python_callable=send_completion_email,
        trigger_rule="all_done",  # Run even if upstream failed
    )

    # Strictly linear dependency chain; notify is last.
    extract >> validate >> transform >> load_warehouse >> notify

Dynamic Task Generation

from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule_interval="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def dynamic_regional_pipeline():
    # Fan out one mapped process_region task per region, then fan back in
    # to a single aggregate_results task.

    @task
    def get_regions():
        # Hard-coded list of regions that drives the mapping below.
        return ["us-east", "us-west", "eu-west", "ap-southeast"]

    @task
    def process_region(region: str):
        # Placeholder per-region work: always reports 1000 records.
        print(f"Processing {region}")
        return dict(region=region, records=1000)

    @task
    def aggregate_results(results: list):
        # Add up the "records" field of every per-region result.
        total = 0
        for entry in results:
            total += entry["records"]
        print(f"Total records across all regions: {total}")

    # Dynamic task mapping: .expand() maps process_region over the region list.
    aggregate_results(process_region.expand(region=get_regions()))

pipeline = dynamic_regional_pipeline()

Sensor-Based Triggering

from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor

# Block until the day's sales file shows up on disk
wait_for_file = FileSensor(
    task_id="wait_for_data_file",
    filepath="/data/incoming/{{ ds }}/sales.csv",
    mode="reschedule",    # Free up worker slot while waiting
    poke_interval=5 * 60,  # Check every 5 minutes
    timeout=6 * 60 * 60,   # Give up after 6 hours
)

# Block until a task in a different DAG has finished
wait_for_upstream = ExternalTaskSensor(
    task_id="wait_for_ingestion",
    external_dag_id="raw_data_ingestion",
    external_task_id="final_task",
    mode="reschedule",
    execution_delta=timedelta(0),  # zero offset between the two DAGs' runs
)

Prefect: Modern Python-Native Workflows

Flow Definition

from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=60),
    retries=3,
    retry_delay_seconds=[60, 5 * 60, 30 * 60],  # 1min, 5min, 30min
)
def extract_data(source: str, date: str) -> dict:
    # Placeholder extract step: logs the request and returns a fixed payload
    # that echoes the requested source.
    run_logger = get_run_logger()
    run_logger.info(f"Extracting from {source} for {date}")
    # ... extraction logic
    payload = {"records": 5000, "source": source}
    return payload

@task(retries=2)
def validate(data: dict) -> dict:
    # Pass the payload through unchanged unless it reports zero records.
    if data["records"] != 0:
        return data
    raise ValueError("No records extracted")

@task
def transform(data: dict) -> dict:
    # Return a copy of the payload flagged as transformed; the input dict
    # itself is left untouched.
    flagged = dict(data)
    flagged["transformed"] = True
    return flagged

@task
def load(data: dict, destination: str):
    # Placeholder load step: only logs how many records would be written where.
    run_logger = get_run_logger()
    run_logger.info(f"Loading {data['records']} records to {destination}")

@flow(name="ETL Pipeline", log_prints=True)
def etl_pipeline(source: str = "postgres", date: str = "2026-03-28"):
    # Each step consumes the previous step's output:
    # extract -> validate -> transform -> load. Nothing is returned.
    checked = validate(extract_data(source, date))
    load(transform(checked), destination="warehouse")

# Execute the flow once, in-process, using the default parameters
etl_pipeline()

Dynamic Workflows with Prefect

from prefect import flow, task
from prefect.futures import wait

@task
def process_file(filepath: str) -> dict:
    # Placeholder per-file work: echoes the path with a fixed row count.
    return dict(file=filepath, rows=100)

@flow
def dynamic_file_processing():
    # Submit one process_file task per CSV found under /data/incoming,
    # then collect the results and print a summary.
    import glob

    csv_paths = glob.glob("/data/incoming/*.csv")

    # Hand every file to the task runner before waiting on any of them
    pending = list(map(process_file.submit, csv_paths))

    # Resolve the futures in submission order
    outcomes = [future.result() for future in pending]

    row_total = sum(outcome["rows"] for outcome in outcomes)
    print(f"Processed {len(csv_paths)} files, {row_total} total rows")

Prefect Deployment

from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

# Build a deployment of etl_pipeline on the same 6 AM cron used elsewhere in
# this document, then register it.
# NOTE(review): Deployment.build_from_flow looks like the Prefect 2 deployment
# API — confirm it exists in the Prefect version this project pins.
deployment = Deployment.build_from_flow(
    flow=etl_pipeline,
    name="daily-etl",
    tags=["production", "etl"],
    parameters={"source": "postgres"},
    work_queue_name="default",
    schedule=CronSchedule(cron="0 6 * * *"),
)

deployment.apply()

Dagster: Asset-Centric Workflows

Software-Defined Assets

from dagster import (
    asset,
    AssetIn,
    DailyPartitionsDefinition,
    FreshnessPolicy,
    AutoMaterializePolicy,
    Definitions,
    define_asset_job,
)
import pandas as pd

# One partition per calendar day, starting 2026-01-01; shared by the assets below.
daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(
    partitions_def=daily_partitions,
    group_name="raw",
    description="Raw sales transactions from the source database",
)
def raw_sales(context) -> pd.DataFrame:
    # Placeholder extract: logs the partition being materialized and returns
    # a small fixed frame in place of a real database read.
    partition_date = context.asset_partition_key_for_output()
    context.log.info(f"Extracting sales for {partition_date}")
    # ... extract from database
    # "region" must be present: the downstream daily_revenue asset groups by
    # it and would raise KeyError on a frame without that column.
    return pd.DataFrame(
        {
            "order_id": [1, 2],
            "region": ["us-east", "eu-west"],
            "amount": [50, 75],
        }
    )

@asset(
    ins={"raw_sales": AssetIn()},
    partitions_def=daily_partitions,
    group_name="staging",
    description="Validated and cleaned sales data",
)
def clean_sales(context, raw_sales: pd.DataFrame) -> pd.DataFrame:
    # Drop every row that contains a null and log the before/after row counts.
    # NOTE(review): no schema validation is implemented here yet, only dropna().
    cleaned = raw_sales.dropna()
    # The counts need a separator; printed back to back they fuse into one
    # number (2 and 2 would read as "22 records").
    context.log.info(f"Cleaned {len(raw_sales)} -> {len(cleaned)} records")
    return cleaned

@asset(
    ins={"clean_sales": AssetIn()},
    partitions_def=daily_partitions,
    group_name="analytics",
    # NOTE(review): confirm the installed Dagster version accepts a
    # freshness_policy on a partitioned asset.
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=8 * 60),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
    description="Daily revenue metrics aggregated by region",
)
def daily_revenue(context, clean_sales: pd.DataFrame) -> pd.DataFrame:
    # Sum "amount" per "region"; the input frame must carry both columns.
    per_region = clean_sales.groupby("region")["amount"].sum()
    # reset_index() turns the region index back into an ordinary column.
    return per_region.reset_index()

# Job that selects every asset and uses the same daily partitioning
daily_pipeline = define_asset_job(
    name="daily_sales_pipeline",
    partitions_def=daily_partitions,
    selection="*",  # all assets
)

# Code-location entry point: registers the three assets and the job.
defs = Definitions(
    jobs=[daily_pipeline],
    assets=[raw_sales, clean_sales, daily_revenue],
)

Dagster Resources and Configuration

from dagster import resource, configured, ConfigurableResource

class DatabaseResource(ConfigurableResource):
    # Connection settings for a PostgreSQL database; only the port has a default.
    host: str
    port: int = 5432
    database: str
    user: str
    password: str

    def query(self, sql: str) -> pd.DataFrame:
        """Run ``sql`` and return the result set as a DataFrame.

        A new connection is opened for each call and closed before returning,
        including when the query raises.
        """
        # Imported lazily so psycopg2 is only needed when a query actually runs.
        import psycopg2

        conn = psycopg2.connect(
            host=self.host, port=self.port,
            database=self.database,
            user=self.user, password=self.password,
        )
        try:
            return pd.read_sql(sql, conn)
        finally:
            # Close explicitly so the connection is not leaked on every call.
            conn.close()

@asset(required_resource_keys={"database"})
def raw_sales(context) -> pd.DataFrame:
    # Read the sales rows through the resource registered under "database".
    # The trailing "..." in the SQL is a placeholder from the original example.
    db = context.resources.database
    return db.query("SELECT * FROM sales WHERE date = ...")

from dagster import EnvVar

# Registers raw_sales together with the "database" resource it requires.
defs = Definitions(
    assets=[raw_sales],
    resources={
        "database": DatabaseResource(
            host="warehouse.internal",
            database="analytics",
            user="reader",
            # password is declared as str, so a {"env": ...} dict does not fit
            # the field. EnvVar defers reading DB_PASSWORD until the resource
            # is used, keeping the secret out of the code.
            password=EnvVar("DB_PASSWORD"),
        ),
    },
)

Testing Workflows

Airflow DAG Tests

import pytest
from airflow.models import DagBag

def test_dag_integrity():
    """Fail if any DAG file in the DagBag could not be imported."""
    errors = DagBag(include_examples=False).import_errors
    assert len(errors) == 0, f"DAG import errors: {errors}"

def test_dag_structure():
    """Check the schedule, task count and first dependency of the sales DAG."""
    dag = DagBag(include_examples=False).get_dag("daily_sales_pipeline")

    assert dag is not None
    assert dag.schedule_interval == "0 6 * * *"
    assert len(dag.tasks) == 5

    # validate_data must sit directly downstream of extract_sales_data
    upstream_task = dag.get_task("extract_sales_data")
    downstream_task = dag.get_task("validate_data")
    assert downstream_task in upstream_task.downstream_list

def test_task_callable():
    """Call the extract function directly, with no Airflow machinery involved."""
    extracted = extract_from_source(date="2026-03-28")
    assert extracted is not None
    assert len(extracted) > 0

Prefect Flow Tests

from prefect.testing.utilities import prefect_test_harness

def test_etl_pipeline():
    """Smoke-test the whole flow inside Prefect's test harness."""
    with prefect_test_harness():
        # etl_pipeline has no return statement, so the call yields None and
        # there is no result to inspect. The test passes if the run completes.
        # NOTE(review): this relies on a failing task raising out of the flow
        # call — confirm for the Prefect version in use.
        etl_pipeline(source="test_db", date="2026-03-28")

def test_extract_task():
    """Unit-test extract_data's underlying function without a Prefect run."""
    # Imported here so this test does not depend on the module's import block.
    from prefect.logging import disable_run_logger

    # .fn is the plain function behind the @task decorator, so no task run is
    # created. extract_data calls get_run_logger(), which needs an active run
    # context, so run-logging is disabled for the duration of the call.
    with disable_run_logger():
        result = extract_data.fn(source="test_db", date="2026-03-28")
    assert result["records"] > 0

Dagster Asset Tests

from dagster import materialize, build_asset_context

def test_raw_sales():
    """Invoke the raw_sales asset directly for a single partition."""
    ctx = build_asset_context(partition_key="2026-03-28")
    frame = raw_sales(ctx)
    assert len(frame) > 0
    assert "order_id" in frame.columns

def test_full_pipeline():
    """Materialize all three assets together for one partition."""
    assets_under_test = [raw_sales, clean_sales, daily_revenue]
    outcome = materialize(assets_under_test, partition_key="2026-03-28")
    assert outcome.success

Migration Strategies

Airflow → Prefect

The main shift: from DAG files with operators to decorated Python functions.

# Airflow style: each step is an operator object, and ordering is declared
# separately with >>.
extract = PythonOperator(task_id="extract", python_callable=extract_fn)
transform = PythonOperator(task_id="transform", python_callable=transform_fn)
extract >> transform

# Prefect equivalent: ordering follows from passing one call's return value
# into the next call.
@flow
def pipeline():
    data = extract_fn()  # These are now @task-decorated functions
    # result is not used further in this illustrative snippet.
    result = transform_fn(data)

Migration approach:

  1. Extract business logic from Airflow operators into standalone functions
  2. Wrap functions with @task decorators
  3. Create @flow functions that call tasks with data dependencies
  4. Run both systems in parallel during transition
  5. Cut over DAG by DAG

Airflow → Dagster

The bigger shift: from task-centric to asset-centric thinking.

# Airflow: "run extract, then transform, then load"
# Dagster: "I want these data assets to exist and be fresh"

# Step 1: Map Airflow tasks to Dagster assets
# extract_task → raw_data asset
# transform_task → clean_data asset
# load_task → warehouse_table asset

# Step 2: Define dependencies as asset inputs
# Stub asset standing in for the former extract task; body intentionally elided.
@asset
def raw_data(): ...

# Stub asset standing in for the former transform task. The dependency on
# raw_data is declared through `ins` and the matching parameter name.
@asset(ins={"raw_data": AssetIn()})
def clean_data(raw_data): ...

Choosing the Right Engine

What's your primary use case?
├── Scheduled data pipelines (ETL/ELT)
│   ├── Large team, many pipelines → Airflow (ecosystem, community)
│   ├── Small team, quick iteration → Prefect (simpler setup)
│   └── Asset-focused analytics → Dagster (data quality built in)
├── ML pipeline orchestration
│   ├── Training + serving → Dagster or Prefect
│   └── Experiment tracking focus → Consider MLflow + lighter engine
├── General task orchestration
│   ├── Need distributed execution → Airflow (Celery/K8s executor)
│   └── Moderate scale → Prefect (work pools)
└── Event-driven workflows
    └── Prefect (native event triggers)

Performance and Scaling

| Dimension | Airflow | Prefect | Dagster |
| --- | --- | --- | --- |
| Max concurrent tasks | Executor-dependent (100s with Celery) | Work pool-dependent | Run coordinator-dependent |
| Scheduler overhead | ~5-10s per DAG parse cycle | Minimal (agent-based) | Minimal (daemon-based) |
| Metadata DB load | High at scale (needs tuning) | Moderate (cloud-managed option) | Moderate |
| K8s native | KubernetesExecutor | K8s work pool | K8s run launcher |
| Horizontal scaling | Add workers | Add agents | Add run coordinators |

One thing to remember: The three major Python workflow engines serve different philosophies — Airflow orchestrates tasks with the largest ecosystem, Prefect modernizes the pattern with Python-native decorators, and Dagster rethinks the problem by focusing on data assets rather than execution steps. Choose based on how your team thinks about the work, not just feature checklists.

Tags: python, workflows, automation

See Also