Python Batch vs Stream Processing — Deep Dive

Production systems rarely use pure batch or pure stream. Understanding how to implement both in Python—and how to combine them—lets you match the right approach to each pipeline’s requirements.

1) Batch processing patterns

Airflow-orchestrated batch pipeline

from airflow.decorators import dag, task
from datetime import datetime, timedelta

@dag(
    schedule="@hourly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
)
def hourly_orders_batch():
    """Hourly extract -> transform -> load pipeline for orders.

    Each task hands the S3 path of the file it wrote to the next task, so
    only small strings (not data frames) travel between tasks.
    """

    @task
    def extract(logical_date=None):
        """Stage the raw orders created within the run's hour.

        ``logical_date`` is expected to be supplied by Airflow at run time;
        the ``None`` default is only a placeholder for the signature.

        Returns:
            The staging path of the Parquet file written for this hour.
        """
        import polars as pl

        hour_start = logical_date.replace(minute=0, second=0, microsecond=0)
        hour_end = hour_start + timedelta(hours=1)

        # Scan lazily so the half-open [hour_start, hour_end) predicate can be
        # pushed down to the Parquet reader instead of materializing the whole
        # raw dataset in memory before filtering.
        batch = (
            pl.scan_parquet("s3://lake/raw/orders/")
            .filter(
                (pl.col("created_at") >= hour_start)
                & (pl.col("created_at") < hour_end)
            )
            .collect()
        )
        # Path is derived only from the hour, so a retry overwrites the same file.
        path = f"s3://lake/staging/orders/{hour_start.isoformat()}.parquet"
        batch.write_parquet(path)
        return path

    @task
    def transform(staging_path: str):
        """Clean one staged file and write it to the silver area.

        Drops rows without an ``order_id``, de-duplicates on ``order_id``,
        rounds ``amount`` to two decimals and upper-cases ``currency``.

        Returns:
            The silver path of the cleaned Parquet file.
        """
        import polars as pl

        df = pl.read_parquet(staging_path)
        cleaned = (
            df.drop_nulls("order_id")
            .unique("order_id")
            .with_columns(
                pl.col("amount").round(2),
                pl.col("currency").str.to_uppercase(),
            )
        )
        # Rewrite only the directory segment, and only once, so a file name
        # that happens to contain "staging" is left untouched.
        output_path = staging_path.replace("/staging/", "/silver/", 1)
        cleaned.write_parquet(output_path)
        return output_path

    @task
    def load(silver_path: str):
        """Upsert the cleaned file into the warehouse Delta table.

        Rows are matched on ``order_id``: existing rows are updated, new rows
        are inserted. If the target is not yet a Delta table it is created
        from this batch.
        """
        from deltalake import DeltaTable, write_deltalake
        import pyarrow.parquet as pq

        table = pq.read_table(silver_path)
        target = "s3://warehouse/orders/"

        if DeltaTable.is_deltatable(target):
            dt = DeltaTable(target)
            (
                dt.merge(
                    source=table,
                    predicate="target.order_id = source.order_id",
                    source_alias="source",
                    target_alias="target",
                )
                .when_matched_update_all()
                .when_not_matched_insert_all()
                .execute()
            )
        else:
            # First run: nothing to merge into yet.
            write_deltalake(target, table)

    staging = extract()
    silver = transform(staging)
    load(silver)

hourly_orders_batch()

Batch processing strengths at scale

import polars as pl

def heavy_batch_aggregation(date: str):
    """
    Summarize one day of orders by region, product category and customer tier.

    Orders for ``date`` are left-joined to the customer and product
    dimensions, then aggregated into revenue, order count, distinct
    customers, mean order value and 95th-percentile order value.
    Multi-join aggregations like this are where batch shines; the same
    workload would be extremely complex in a streaming context.

    Returns the collected result, sorted by total revenue (highest first).
    """
    orders_lf = pl.scan_parquet(f"s3://warehouse/orders/order_date={date}/**")
    customer_dim = pl.scan_parquet("s3://warehouse/dim_customer/")
    product_dim = pl.scan_parquet("s3://warehouse/dim_product/")

    # Everything stays lazy until the final collect().
    enriched = orders_lf.join(customer_dim, on="customer_id", how="left")
    enriched = enriched.join(product_dim, on="product_id", how="left")

    amount = pl.col("amount")
    metrics = [
        amount.sum().alias("total_revenue"),
        pl.col("order_id").count().alias("order_count"),
        pl.col("customer_id").n_unique().alias("unique_customers"),
        amount.mean().alias("avg_order_value"),
        amount.quantile(0.95).alias("p95_order_value"),
    ]

    grouped = enriched.group_by(["region", "product_category", "customer_tier"])
    summary = grouped.agg(metrics)
    return summary.sort("total_revenue", descending=True).collect()

2) Stream processing patterns

Faust streaming application

import faust
from datetime import timedelta

# Faust application object; the topics, table, agents and page in this
# section are all registered on it.
app = faust.App(
    "order-processor",  # first positional argument: the application id
    broker="kafka://kafka:9092",
    store="rocksdb://",  # NOTE(review): presumably backs table state with RocksDB — confirm
    topic_partitions=8,
)

class Order(faust.Record):
    """Record type used as the value type of the ``raw-orders`` topic."""
    order_id: int
    customer_id: int
    amount: float
    currency: str
    timestamp: float  # NOTE(review): presumably epoch seconds — confirm with the producer

# Input topic: raw order events, deserialized into Order records.
orders_topic = app.topic("raw-orders", value_type=Order)

# Output topic for validated orders. The process_orders agent forwards to
# this name, so it must exist at module level or the agent fails with a
# NameError on the first valid order.
# NOTE(review): the topic name "cleaned-orders" is an assumption — align it
# with whatever downstream consumers subscribe to.
cleaned_orders_topic = app.topic("cleaned-orders", value_type=Order)

# Real-time aggregation: revenue per currency per minute.
# (The table is keyed by order.currency; Order carries no region field.)
revenue_table = app.Table(
    "revenue_per_minute",
    default=float,
).tumbling(timedelta(minutes=1), expires=timedelta(hours=1))

@app.agent(orders_topic)
async def process_orders(orders):
    """Validate each incoming order, add it to the revenue table, forward it.

    Orders whose amount is zero or negative are reported and skipped; they
    are neither aggregated nor forwarded. Valid orders are added to
    ``revenue_table`` under their currency and then sent on unchanged.
    """
    async for order in orders:
        # Real-time validation: zero is rejected as well as negative values,
        # so the message says "non-positive" rather than "negative".
        if order.amount <= 0:
            print(f"Invalid order {order.order_id}: non-positive amount")
            continue

        # Update windowed aggregation
        revenue_table[order.currency] += order.amount

        # Forward to cleaned topic
        await cleaned_orders_topic.send(value=order)

@app.page("/revenue/")
async def revenue_view(web, request):
    """HTTP endpoint returning the revenue table's current values as JSON."""
    # NOTE(review): .current() is relative to the stream event being
    # processed; a web view runs outside stream iteration, so confirm
    # whether .now() is what is intended here.
    snapshot = {}
    for table_key, window_set in revenue_table.items():
        snapshot[table_key] = window_set.current()
    return web.json(snapshot)

Kafka consumer for event processing

from confluent_kafka import Consumer, KafkaError
import json
from datetime import datetime, timezone

def stream_processor(
    topic: str = "raw-orders",
    group_id: str = "order-enrichment",
):
    """Consume events, enrich them, and write them out in micro-batches.

    Messages are JSON-decoded, passed through ``enrich_event`` and buffered.
    The buffer is written with ``write_batch_to_target`` once it holds
    ``BATCH_SIZE`` events, or once ``FLUSH_INTERVAL`` seconds have passed
    since the last flush and it is non-empty. Offsets are committed only
    after a successful write, giving at-least-once delivery: a crash between
    write and commit replays the batch, so the target should tolerate
    duplicates.

    Args:
        topic: Kafka topic to subscribe to.
        group_id: Consumer group id.

    Runs until interrupted with Ctrl-C; any other exception propagates after
    the consumer has been closed.
    """
    import logging
    import time

    log = logging.getLogger(__name__)

    consumer = Consumer({
        "bootstrap.servers": "kafka:9092",
        "group.id": group_id,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,  # offsets are committed manually after each flush
        "max.poll.interval.ms": 300000,
    })
    consumer.subscribe([topic])

    batch = []
    BATCH_SIZE = 100
    FLUSH_INTERVAL = 5.0  # seconds
    # Monotonic clock: interval measurement must not be affected by
    # wall-clock adjustments.
    last_flush = time.monotonic()

    try:
        while True:
            msg = consumer.poll(1.0)

            if msg is not None:
                err = msg.error()
                if not err:
                    event = json.loads(msg.value())
                    batch.append(enrich_event(event))
                elif err.code() != KafkaError._PARTITION_EOF:
                    # Surface broker/consumer errors instead of dropping them
                    # silently; end-of-partition is informational only.
                    log.error("Kafka consumer error: %s", err)

            # Micro-batch: flush when batch is full or interval elapsed.
            # This check also runs on poll timeouts so a partial batch is
            # not held back indefinitely on a quiet topic.
            elapsed = time.monotonic() - last_flush
            if len(batch) >= BATCH_SIZE or (batch and elapsed >= FLUSH_INTERVAL):
                write_batch_to_target(batch)
                # Synchronous commit: do not move on until the broker has
                # acknowledged the offsets for the batch just written.
                consumer.commit(asynchronous=False)
                batch = []
                last_flush = time.monotonic()

    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; fall through to the final flush.
        pass
    finally:
        try:
            if batch:
                write_batch_to_target(batch)
                consumer.commit(asynchronous=False)
        finally:
            # Always leave the group cleanly, even if the final flush fails.
            consumer.close()

Bytewax dataflow

import bytewax.operators as op
from bytewax.connectors.kafka import KafkaSource, KafkaSink
from bytewax.dataflow import Dataflow
import json

# Dataflow object that the input, transformation and output steps attach to.
flow = Dataflow("order-processor")

# Kafka input reading the "raw-orders" topic.
# NOTE(review): confirm that the installed bytewax version accepts the string
# "latest" for starting_offset rather than a numeric offset constant.
source = KafkaSource(
    brokers=["kafka:9092"],
    topics=["raw-orders"],
    starting_offset="latest",
)
stream = op.input("kafka_in", flow, source)

def parse_order(msg):
    """Decode the JSON value of a two-element ``(key, value)`` message.

    The key is ignored; only the value is parsed and returned.
    """
    _, payload = msg
    return json.loads(payload)

def validate_order(order):
    """Return True when the order's ``amount`` is strictly positive.

    A missing ``amount`` is treated as 0 and therefore fails validation.
    """
    amount = order.get("amount", 0)
    return amount > 0

def enrich_order(order):
    """Add ``processed_at`` and ``amount_usd`` to the order and return it.

    The order is modified in place. ``processed_at`` is the current UTC time
    in ISO 8601 form; ``amount_usd`` comes from ``convert_to_usd`` applied to
    the order's amount and currency.
    """
    stamped_at = datetime.now(timezone.utc)
    order["processed_at"] = stamped_at.isoformat()

    amount, currency = order["amount"], order["currency"]
    order["amount_usd"] = convert_to_usd(amount, currency)
    return order

# Pipeline wiring: parse -> validate -> enrich, applied to the Kafka input.
# Orders that fail validate_order are filtered out before enrichment.
parsed = op.map("parse", stream, parse_order)
valid = op.filter("validate", parsed, validate_order)
enriched = op.map("enrich", valid, enrich_order)

# Output: send the enriched stream to the "enriched-orders" topic.
# NOTE(review): `enriched` carries the objects returned by enrich_order;
# confirm the sink accepts them as-is or whether they must first be
# serialized into Kafka key/value messages.
sink = KafkaSink(
    brokers=["kafka:9092"],
    topic="enriched-orders",
)
op.output("kafka_out", enriched, sink)

3) Hybrid architecture: Lambda pattern

# Batch layer: runs hourly, produces "correct" view
def batch_revenue_pipeline(hour: str):
    """Recompute revenue totals per (region, currency) for one hour.

    Reads the silver orders for ``hour``, sums ``amount`` within each
    region/currency pair, and writes the totals to the matching
    ``batch_revenue`` location in the warehouse.
    """
    source_uri = f"s3://lake/silver/orders/hour={hour}/"
    sink_uri = f"s3://warehouse/batch_revenue/hour={hour}/"

    hourly_orders = pl.read_parquet(source_uri)
    totals = hourly_orders.group_by(["region", "currency"]).agg(
        pl.sum("amount").alias("total")
    )
    totals.write_parquet(sink_uri)

# Stream layer: runs continuously, produces "fast" approximate view
def _stream_revenue_key(region: str, currency: str) -> str:
    """Build the Redis key for the current UTC hour's stream counter.

    The hour suffix (e.g. ``2026-01-01T13``) makes each counter cover only
    one hour. Without it the counter grows forever and the serving layer
    would add all-time stream revenue on top of the batch totals.
    """
    from datetime import datetime, timezone

    hour_bucket = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H")
    return f"revenue:{region}:{currency}:{hour_bucket}"


@app.agent(orders_topic)
async def speed_layer(orders):
    """Stream layer: keep a fast, approximate revenue counter per hour."""
    async for order in orders:
        # Update the Redis counter for the hour in which the order is processed.
        # NOTE(review): this reads order.region — confirm the record type on
        # orders_topic actually carries a region field.
        # TODO: counters for past hours are never removed; expire or clean
        # them up once the batch layer has covered that hour.
        await redis.incrbyfloat(
            _stream_revenue_key(order.region, order.currency),
            order.amount,
        )

# Serving layer: merges batch + stream at query time
def get_revenue(region: str, currency: str) -> dict:
    """Combine the batch total with the current hour's stream counter.

    Args:
        region: Region to report on.
        currency: Currency to report on.

    Returns:
        A dict with the batch total, the stream delta for the current
        (incomplete) hour, their sum, and freshness labels for each layer.
    """
    # Batch: last complete hour
    batch_total = read_batch_revenue(region, currency)
    # Stream: current incomplete hour only (key is bucketed by hour).
    # NOTE(review): this call is synchronous while speed_layer awaits the
    # client — confirm which kind of Redis client `redis` is in each context.
    stream_delta = float(redis.get(_stream_revenue_key(region, currency)) or 0)

    return {
        "batch_total": batch_total,
        "stream_delta": stream_delta,
        "combined": batch_total + stream_delta,
        "batch_freshness": "hourly",
        "stream_freshness": "seconds",
    }

4) Micro-batch: the practical middle ground

For teams that want near-real-time but find stream processing too complex:

from datetime import datetime, timedelta, timezone
import time

def micro_batch_loop(
    interval_seconds: int = 30,
    source_path: str = "s3://lake/raw/orders/",
    target_path: str = "s3://warehouse/orders/",
):
    """Run an incremental batch every ``interval_seconds``, forever.

    Each cycle reads rows whose ``created_at`` is later than the stored
    watermark (all rows when no watermark exists yet), transforms them,
    upserts them into the target, and then advances the watermark to the
    largest ``created_at`` seen. The watermark moves only after the upsert
    succeeds, so a failed cycle is retried from the same point.

    Args:
        interval_seconds: Target time between cycle starts. A cycle that
            takes longer than this is followed immediately by the next one.
        source_path: Parquet location to read new rows from.
        target_path: Destination passed to ``upsert_to_target``.

    ``WatermarkStore``, ``WatermarkState``, ``transform`` and
    ``upsert_to_target`` are expected to be provided elsewhere.
    """
    store = WatermarkStore()

    while True:
        start = time.monotonic()

        watermark = store.get_value_as_datetime("micro_batch_orders")
        now = datetime.now(timezone.utc)

        # Read new data since watermark. Scanning lazily lets the watermark
        # predicate be pushed down to the Parquet reader, so each cycle does
        # not materialize the entire source dataset before filtering.
        pending = pl.scan_parquet(source_path)
        if watermark is not None:
            pending = pending.filter(pl.col("created_at") > watermark)
        df = pending.collect()

        if not df.is_empty():
            cleaned = transform(df)
            upsert_to_target(cleaned, target_path)
            # Taken from the rows that were read, not from the cleaned rows,
            # so rows dropped by transform are not re-read next cycle.
            new_watermark = df["created_at"].max()
            store.set(WatermarkState(
                pipeline_name="micro_batch_orders",
                last_value=new_watermark.isoformat(),
                value_type="timestamp",
                updated_at=now.isoformat(),
                row_count=len(cleaned),
            ))

        # Sleep remaining interval
        elapsed = time.monotonic() - start
        sleep_time = max(0, interval_seconds - elapsed)
        time.sleep(sleep_time)

5) Stream processing challenges

Exactly-once semantics

Stream processors must handle the case where a message is processed but the acknowledgment fails. Options:

| Strategy | How it works | Trade-off |
| --- | --- | --- |
| At-least-once + idempotent writes | Processing may duplicate; writes deduplicate | Simple, requires idempotent target |
| Kafka transactions | Producer + consumer in a single transaction | Complex, Kafka-specific |
| Checkpointing | Save processing state periodically, replay from checkpoint | Reprocesses on recovery |

Windowing

Streaming aggregations need windows to define “how much data to aggregate”:

# Tumbling window: fixed, non-overlapping
# [0:00-0:05] [0:05-0:10] [0:10-0:15]

# Sliding window: overlapping, triggered on each event
# [0:00-0:05] [0:01-0:06] [0:02-0:07]

# Session window: grouped by activity gaps
# User A: [click, click, click] → 5 min gap → [click, click]
# Two sessions for user A

Out-of-order events

Events may arrive out of order due to network delays. Watermarks in the streaming engine define how long to wait for late events before closing a window.

6) Decision framework

def recommend_architecture(
    latency_requirement: str,
    data_volume_gb_per_day: float,
    team_streaming_experience: bool,
    budget: str,
) -> str:
    """Suggest a processing architecture from a few coarse inputs.

    Rules are checked in order and the first match wins:

    1. ``"sub-second"`` latency -> streaming.
    2. ``"minutes"`` latency with more than 100 GB/day -> stream or
       micro-batch when the team has streaming experience, otherwise
       micro-batch via frequent Airflow runs.
    3. ``"hours"`` latency -> batch.
    4. ``"minimal"`` budget -> cron-driven batch.
    5. Anything else -> hybrid.
    """
    if latency_requirement == "sub-second":
        return "stream (Faust/Bytewax/Kafka consumers)"

    high_volume_minutes = (
        latency_requirement == "minutes" and data_volume_gb_per_day > 100
    )
    if high_volume_minutes:
        return (
            "stream or micro-batch (Spark Structured Streaming)"
            if team_streaming_experience
            else "micro-batch (frequent Airflow runs)"
        )

    if latency_requirement == "hours":
        return "batch (Airflow + polars/PySpark)"

    return (
        "batch with cron (simplest possible)"
        if budget == "minimal"
        else "hybrid: batch for historical, stream for real-time"
    )

Production checklist

  • Latency requirement documented per pipeline (drives batch vs stream choice)
  • Batch pipelines are idempotent and retriable
  • Stream processors handle backpressure (slow consumers do not crash)
  • Exactly-once or at-least-once + idempotent semantics implemented
  • Windowing strategy defined for streaming aggregations
  • Late-arriving events handled (watermarks or lookback)
  • Monitoring covers lag, throughput, and error rate for both batch and stream
  • Reprocessing path exists (batch: rerun; stream: replay from offset)
  • Cost model understood (batch: pay-per-run; stream: always-on)
  • Hybrid serving layer merges batch and stream correctly if using Lambda

One thing to remember: the architecture choice is not batch versus stream—it is about matching processing latency to business value, and most production systems use both approaches for different parts of the same data platform.

Tags: python, batch-processing, stream-processing, data-engineering

See Also

  • Python Adaptive Learning Systems — How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
  • Python Airflow — Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
  • Python Altair — Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
  • Python Automated Grading — How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
  • Python Bentoml Model Serving — See BentoML as a packaging-and-delivery system that turns your Python model into a dependable service others can call.