Python Workflow Engines — Deep Dive
Airflow: Production DAG Patterns
Standard ETL DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
from datetime import datetime

# Defaults applied to every task in the DAG unless the task overrides them.
default_args = {
    "owner": "data-team",
    # Retry policy: 3 retries, 5-minute base delay with exponential backoff,
    # never waiting more than 60 minutes between attempts.
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=60),
    "execution_timeout": timedelta(hours=2),
    "email_on_failure": True,
    "email": ["data-alerts@company.com"],
}

with DAG(
    dag_id="daily_sales_pipeline",
    default_args=default_args,
    schedule_interval="0 6 * * *",  # 6 AM daily
    # Use a fixed start date: days_ago(1) is re-evaluated on every parse, so
    # the start date keeps moving, and the helper itself is deprecated.
    start_date=datetime(2026, 1, 1),
    catchup=False,  # do not backfill the runs between start_date and today
    max_active_runs=1,
    tags=["sales", "daily"],
) as dag:
    extract = PythonOperator(
        task_id="extract_sales_data",
        python_callable=extract_from_source,
        op_kwargs={"date": "{{ ds }}"},  # Jinja template for execution date
    )

    validate = PythonOperator(
        task_id="validate_data",
        python_callable=run_quality_checks,
    )

    transform = PythonOperator(
        task_id="transform_sales",
        python_callable=apply_transformations,
    )

    load_warehouse = PostgresOperator(
        task_id="load_to_warehouse",
        postgres_conn_id="warehouse",
        sql="sql/load_sales.sql",
        # NOTE(review): confirm that "parameters" is a templated field in the
        # installed Postgres provider version; if it is not, the literal text
        # "{{ ds }}" is sent to the database instead of the run date.
        parameters={"date": "{{ ds }}"},
    )

    notify = PythonOperator(
        task_id="notify_stakeholders",
        python_callable=send_completion_email,
        trigger_rule="all_done",  # Run even if upstream failed
    )

    # Strictly linear pipeline: each step waits for the previous one.
    extract >> validate >> transform >> load_warehouse >> notify
Dynamic Task Generation
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule_interval="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def dynamic_regional_pipeline():
    # Fan out one mapped task per region, then fan in to a single summary task.

    @task
    def get_regions():
        # The returned list decides how many process_region runs are created.
        region_names = ["us-east", "us-west", "eu-west", "ap-southeast"]
        return region_names

    @task
    def process_region(region: str):
        print(f"Processing {region}")
        summary = {"region": region, "records": 1000}
        return summary

    @task
    def aggregate_results(results: list):
        total = 0
        for entry in results:
            total += entry["records"]
        print(f"Total records across all regions: {total}")

    # Dynamic task mapping: expand() yields one process_region run per region.
    per_region = process_region.expand(region=get_regions())
    aggregate_results(per_region)


pipeline = dynamic_regional_pipeline()
Sensor-Based Triggering
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
# Hold the pipeline until the day's sales file exists on disk.
wait_for_file = FileSensor(
    task_id="wait_for_data_file",
    filepath="/data/incoming/{{ ds }}/sales.csv",
    mode="reschedule",  # Free up worker slot while waiting
    poke_interval=5 * 60,  # Check every 5 minutes
    timeout=6 * 60 * 60,  # Give up after 6 hours
)

# Hold the pipeline until a task in a different DAG has completed.
wait_for_upstream = ExternalTaskSensor(
    task_id="wait_for_ingestion",
    external_dag_id="raw_data_ingestion",
    external_task_id="final_task",
    mode="reschedule",
    execution_delta=timedelta(hours=0),
)
Prefect: Modern Python-Native Workflows
Flow Definition
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    retries=3,
    retry_delay_seconds=[60, 300, 1800],  # waits of 1, 5, and 30 minutes
)
def extract_data(source: str, date: str) -> dict:
    # Log the request, then hand back a summary of what was extracted.
    get_run_logger().info(f"Extracting from {source} for {date}")
    # ... extraction logic
    extracted = {"records": 5000, "source": source}
    return extracted
@task(retries=2)
def validate(data: dict) -> dict:
    # Pass the payload through unchanged unless it reports zero records.
    if data["records"] != 0:
        return data
    raise ValueError("No records extracted")
@task
def transform(data: dict) -> dict:
    # Work on a shallow copy so the caller's dict is left untouched.
    enriched = dict(data)
    enriched["transformed"] = True
    return enriched
@task
def load(data: dict, destination: str):
    # Only logs what would be loaded; nothing is written in this block.
    run_log = get_run_logger()
    message = f"Loading {data['records']} records to {destination}"
    run_log.info(message)
@flow(name="ETL Pipeline", log_prints=True)
def etl_pipeline(source: str = "postgres", date: str = "2026-03-28"):
    # Each stage consumes the previous one's output:
    # extract -> validate -> transform -> load.
    checked = validate(extract_data(source, date))
    load(transform(checked), destination="warehouse")
# Run locally. The guard keeps the flow from executing when this module is
# only imported (for example by a test or a deployment script).
if __name__ == "__main__":
    etl_pipeline()
Dynamic Workflows with Prefect
from prefect import flow, task
from prefect.futures import wait
@task
def process_file(filepath: str) -> dict:
    # Placeholder result: echoes the path together with a fixed row count.
    return dict(file=filepath, rows=100)
@flow
def dynamic_file_processing():
    # Fan out one task run per incoming CSV file, then total the row counts.
    from glob import glob

    files = glob("/data/incoming/*.csv")

    # submit() hands each file to the task runner without waiting on the others.
    pending = list(map(process_file.submit, files))

    # result() blocks until the corresponding run has finished.
    outcomes = [future.result() for future in pending]
    total_rows = sum(outcome["rows"] for outcome in outcomes)

    print(f"Processed {len(files)} files, {total_rows} total rows")
Prefect Deployment
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule
# Describe a scheduled deployment of the ETL flow (cron "0 6 * * *") on the
# "default" work queue, then register it.
deployment = Deployment.build_from_flow(
    flow=etl_pipeline,
    name="daily-etl",
    tags=["production", "etl"],
    parameters={"source": "postgres"},
    work_queue_name="default",
    schedule=CronSchedule(cron="0 6 * * *"),
)
deployment.apply()
Dagster: Asset-Centric Workflows
Software-Defined Assets
from dagster import (
asset,
AssetIn,
DailyPartitionsDefinition,
FreshnessPolicy,
AutoMaterializePolicy,
Definitions,
define_asset_job,
)
import pandas as pd
# One partition per calendar day, starting 2026-01-01.
daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")


@asset(
    partitions_def=daily_partitions,
    group_name="raw",
    description="Raw sales transactions from the source database",
)
def raw_sales(context) -> pd.DataFrame:
    """Return the raw sales rows for the partition being materialized.

    The real extraction is elided; a fixed two-row sample is returned. The
    sample carries a ``region`` column because the downstream
    ``daily_revenue`` asset groups by it and would otherwise raise KeyError.
    """
    partition_date = context.asset_partition_key_for_output()
    context.log.info(f"Extracting sales for {partition_date}")
    # ... extract from database
    return pd.DataFrame(
        {
            "order_id": [1, 2],
            "region": ["us-east", "us-west"],
            "amount": [50, 75],
        }
    )
@asset(
    description="Validated and cleaned sales data",
    group_name="staging",
    partitions_def=daily_partitions,
    ins={"raw_sales": AssetIn()},
)
def clean_sales(context, raw_sales: pd.DataFrame) -> pd.DataFrame:
    """Drop every row containing a missing value and log before/after counts."""
    cleaned = raw_sales.dropna(axis=0, how="any")
    context.log.info(f"Cleaned {len(raw_sales)} → {len(cleaned)} records")
    return cleaned
@asset(
    description="Daily revenue metrics aggregated by region",
    group_name="analytics",
    partitions_def=daily_partitions,
    ins={"clean_sales": AssetIn()},
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=8 * 60),
    auto_materialize_policy=AutoMaterializePolicy.eager(),
)
def daily_revenue(context, clean_sales: pd.DataFrame) -> pd.DataFrame:
    """Sum ``amount`` per ``region`` and return one row per region."""
    amounts_by_region = clean_sales.groupby("region")["amount"]
    totals = amounts_by_region.sum()
    return totals.reset_index()
# Job whose selection ("*") targets every asset, over the daily partitions.
daily_pipeline = define_asset_job(
    name="daily_sales_pipeline",
    partitions_def=daily_partitions,
    selection="*",
)

# Bundle the assets and the job into one Definitions object.
defs = Definitions(
    jobs=[daily_pipeline],
    assets=[raw_sales, clean_sales, daily_revenue],
)
Dagster Resources and Configuration
from dagster import ConfigurableResource, EnvVar, configured, resource
class DatabaseResource(ConfigurableResource):
    """Database connection settings plus a helper that runs a query."""

    # Connection parameters passed straight to psycopg2.connect().
    host: str
    port: int = 5432
    database: str
    user: str
    password: str

    def query(self, sql: str) -> pd.DataFrame:
        """Run ``sql`` on a fresh connection and return the rows as a DataFrame.

        The connection is closed before returning, whether or not the query
        succeeds, so repeated calls do not leak database connections.
        """
        import psycopg2

        conn = psycopg2.connect(
            host=self.host,
            port=self.port,
            database=self.database,
            user=self.user,
            password=self.password,
        )
        try:
            return pd.read_sql(sql, conn)
        finally:
            # Close explicitly: nothing else releases the connection.
            conn.close()
@asset(required_resource_keys={"database"})
def raw_sales(context) -> pd.DataFrame:
    # Fetch the sales rows through the "database" resource on the context.
    database = context.resources.database
    statement = "SELECT * FROM sales WHERE date = ..."
    return database.query(statement)
# Bind the "database" resource key required by raw_sales to a concrete resource.
defs = Definitions(
    assets=[raw_sales],
    resources={
        "database": DatabaseResource(
            host="warehouse.internal",
            database="analytics",
            user="reader",
            # Read from the DB_PASSWORD environment variable. The field is
            # typed ``str``, so a {"env": ...} dict is not a valid value here.
            password=EnvVar("DB_PASSWORD"),
        ),
    },
)
Testing Workflows
Airflow DAG Tests
import pytest
from airflow.models import DagBag
def test_dag_integrity():
    """Every DAG file must import without errors."""
    dag_bag = DagBag(include_examples=False)
    failure_message = f"DAG import errors: {dag_bag.import_errors}"
    assert not dag_bag.import_errors, failure_message
def test_dag_structure():
    """Check the schedule, task count, and first dependency of the sales DAG."""
    sales_dag = DagBag(include_examples=False).get_dag("daily_sales_pipeline")
    assert sales_dag is not None
    assert sales_dag.schedule_interval == "0 6 * * *"
    assert len(sales_dag.tasks) == 5

    # validate_data must be downstream of extract_sales_data.
    upstream = sales_dag.get_task("extract_sales_data")
    downstream = sales_dag.get_task("validate_data")
    assert downstream in upstream.downstream_list
def test_task_callable():
    """Call the extract function directly, outside of any DAG run."""
    extracted = extract_from_source(date="2026-03-28")
    assert extracted is not None
    assert len(extracted) > 0
Prefect Flow Tests
from prefect.testing.utilities import prefect_test_harness
def test_etl_pipeline():
    """Run the whole ETL flow inside Prefect's test harness."""
    with prefect_test_harness():
        # etl_pipeline has no return statement, so there is no value to assert
        # on; the test passes as long as the call does not raise.
        etl_pipeline(source="test_db", date="2026-03-28")
def test_extract_task():
    """Check the extract logic by calling the function wrapped by ``@task``."""
    from prefect.logging import disable_run_logger

    # extract_data calls get_run_logger(), which needs an active run context;
    # disable_run_logger() lets the undecorated function (.fn) run without one.
    with disable_run_logger():
        result = extract_data.fn(source="test_db", date="2026-03-28")
    assert result["records"] > 0
Dagster Asset Tests
from dagster import materialize, build_asset_context
def test_raw_sales():
    """Invoke the raw_sales asset directly for a single partition."""
    asset_context = build_asset_context(partition_key="2026-03-28")
    frame = raw_sales(asset_context)
    assert len(frame) > 0
    assert "order_id" in frame.columns
def test_full_pipeline():
    """Materialize the three sales assets together for one partition."""
    pipeline_assets = [raw_sales, clean_sales, daily_revenue]
    outcome = materialize(pipeline_assets, partition_key="2026-03-28")
    assert outcome.success
Migration Strategies
Airflow → Prefect
The main shift: from DAG files with operators to decorated Python functions.
# Airflow style: wrap each callable in an operator, then declare the order.
extract = PythonOperator(
    python_callable=extract_fn,
    task_id="extract",
)
transform = PythonOperator(
    python_callable=transform_fn,
    task_id="transform",
)
extract >> transform
# Prefect equivalent: ordering comes from passing one call's result to the next.
@flow
def pipeline():
    # extract_fn and transform_fn are now @task-decorated functions.
    extracted = extract_fn()
    transform_fn(extracted)
Migration approach:
- Extract business logic from Airflow operators into standalone functions
- Wrap functions with `@task` decorators
- Create `@flow` functions that call tasks with data dependencies
- Run both systems in parallel during transition
- Cut over DAG by DAG
Airflow → Dagster
The bigger shift: from task-centric to asset-centric thinking.
# Airflow: "run extract, then transform, then load"
# Dagster: "I want these data assets to exist and be fresh"
# Step 1: Map Airflow tasks to Dagster assets
# extract_task → raw_data asset
# transform_task → clean_data asset
# load_task → warehouse_table asset
# Step 2: Define dependencies as asset inputs
# Asset standing in for the former extract task; the body is elided.
@asset
def raw_data(): ...
# Declares raw_data as an input, replacing the explicit task ordering; the
# body is elided.
@asset(ins={"raw_data": AssetIn()})
def clean_data(raw_data): ...
Choosing the Right Engine
What's your primary use case?
├── Scheduled data pipelines (ETL/ELT)
│ ├── Large team, many pipelines → Airflow (ecosystem, community)
│ ├── Small team, quick iteration → Prefect (simpler setup)
│ └── Asset-focused analytics → Dagster (data quality built in)
├── ML pipeline orchestration
│ ├── Training + serving → Dagster or Prefect
│ └── Experiment tracking focus → Consider MLflow + lighter engine
├── General task orchestration
│ ├── Need distributed execution → Airflow (Celery/K8s executor)
│ └── Moderate scale → Prefect (work pools)
└── Event-driven workflows
└── Prefect (native event triggers)
Performance and Scaling
| Dimension | Airflow | Prefect | Dagster |
|---|---|---|---|
| Max concurrent tasks | Executor-dependent (100s with Celery) | Work pool-dependent | Run coordinator-dependent |
| Scheduler overhead | ~5-10s per DAG parse cycle | Minimal (agent-based) | Minimal (daemon-based) |
| Metadata DB load | High at scale (needs tuning) | Moderate (cloud-managed option) | Moderate |
| K8s native | KubernetesExecutor | K8s work pool | K8s run launcher |
| Horizontal scaling | Add workers | Add agents | Add run coordinators |
One thing to remember: The three major Python workflow engines serve different philosophies — Airflow orchestrates tasks with the largest ecosystem, Prefect modernizes the pattern with Python-native decorators, and Dagster rethinks the problem by focusing on data assets rather than execution steps. Choose based on how your team thinks about the work, not just feature checklists.
See Also
- Python Aggregate Pattern: Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts: Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern: Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern: How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture: Why your Python app should look like an onion — and how that saves you from painful rewrites.