Python Batch vs Stream Processing — Deep Dive
Production systems rarely use pure batch or pure stream. Understanding how to implement both in Python—and how to combine them—lets you match the right approach to each pipeline’s requirements.
1) Batch processing patterns
Airflow-orchestrated batch pipeline
from airflow.decorators import dag, task
from datetime import datetime, timedelta
@dag(
    schedule="@hourly",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
)
def hourly_orders_batch():
    """Hourly orders pipeline: extract one hour of raw orders, clean them, upsert to Delta.

    The three tasks hand each other object-store paths rather than data:
    ``extract`` writes a staging Parquet file, ``transform`` writes the cleaned
    "silver" copy, and ``load`` merges that file into the warehouse table.
    Each task is retried up to twice, five minutes apart (see ``default_args``).
    """

    @task
    def extract(logical_date=None):
        """Write the orders created within the run's hour to staging.

        Args:
            logical_date: Timestamp of the run; expected to be supplied by
                Airflow at execution time (the ``None`` default only exists so
                the task can be declared without arguments).

        Returns:
            Path of the staging Parquet file that was written.
        """
        import polars as pl

        hour_start = logical_date.replace(minute=0, second=0, microsecond=0)
        hour_end = hour_start + timedelta(hours=1)
        # Scan lazily so the hour predicate is part of the query plan instead
        # of loading the whole raw dataset into memory and filtering afterwards.
        batch = (
            pl.scan_parquet("s3://lake/raw/orders/")
            .filter(
                (pl.col("created_at") >= hour_start)
                & (pl.col("created_at") < hour_end)
            )
            .collect()
        )
        path = f"s3://lake/staging/orders/{hour_start.isoformat()}.parquet"
        batch.write_parquet(path)
        return path

    @task
    def transform(staging_path: str):
        """Clean the staged batch and write it to the silver location.

        Drops rows without an ``order_id``, keeps one row per ``order_id``,
        rounds ``amount`` to two decimals and upper-cases ``currency``.

        Args:
            staging_path: Path returned by ``extract``.

        Returns:
            Path of the cleaned Parquet file.
        """
        import polars as pl

        df = pl.read_parquet(staging_path)
        cleaned = (
            df.drop_nulls("order_id")
            .unique("order_id")
            .with_columns(
                pl.col("amount").round(2),
                pl.col("currency").str.to_uppercase(),
            )
        )
        # Swap only the first "/staging/" path segment so a file name that
        # happens to contain the word "staging" is left untouched.
        output_path = staging_path.replace("/staging/", "/silver/", 1)
        cleaned.write_parquet(output_path)
        return output_path

    @task
    def load(silver_path: str):
        """Upsert the cleaned batch into the warehouse Delta table.

        Rows are matched on ``order_id``: existing orders are updated and new
        ones inserted, so re-running the same hour does not duplicate rows.
        If the target is not yet a Delta table it is created from this batch.

        Args:
            silver_path: Path returned by ``transform``.
        """
        from deltalake import DeltaTable, write_deltalake
        import pyarrow.parquet as pq

        table = pq.read_table(silver_path)
        target = "s3://warehouse/orders/"
        if DeltaTable.is_deltatable(target):
            dt = DeltaTable(target)
            (
                dt.merge(
                    source=table,
                    predicate="target.order_id = source.order_id",
                    source_alias="source",
                    target_alias="target",
                )
                .when_matched_update_all()
                .when_not_matched_insert_all()
                .execute()
            )
        else:
            write_deltalake(target, table)

    staging = extract()
    silver = transform(staging)
    load(silver)


hourly_orders_batch()
Batch processing strengths at scale
import polars as pl
def heavy_batch_aggregation(date: str):
    """Summarise one day's orders by region, product category and customer tier.

    Orders for ``date`` are left-joined to the customer and product dimensions,
    then aggregated into revenue, order count, distinct customers, mean order
    value and the 95th-percentile order value. The whole query is built lazily
    and executed once at the end; rows come back sorted by total revenue,
    highest first.

    This is the kind of multi-join, multi-metric workload that is simple in
    batch and considerably harder to express in a streaming context.
    """
    orders = pl.scan_parquet(f"s3://warehouse/orders/order_date={date}/**")
    customers = pl.scan_parquet("s3://warehouse/dim_customer/")
    products = pl.scan_parquet("s3://warehouse/dim_product/")

    # Attach dimension attributes; left joins keep orders with no match.
    enriched = orders.join(customers, on="customer_id", how="left")
    enriched = enriched.join(products, on="product_id", how="left")

    metrics = [
        pl.sum("amount").alias("total_revenue"),
        pl.count("order_id").alias("order_count"),
        pl.n_unique("customer_id").alias("unique_customers"),
        pl.mean("amount").alias("avg_order_value"),
        pl.col("amount").quantile(0.95).alias("p95_order_value"),
    ]
    grouping = ["region", "product_category", "customer_tier"]

    summary = enriched.group_by(grouping).agg(metrics)
    ranked = summary.sort("total_revenue", descending=True)
    return ranked.collect()
2) Stream processing patterns
Faust streaming application
import faust
from datetime import timedelta
# Faust application "order-processor": connects to the Kafka broker at
# kafka:9092, uses the "rocksdb://" store URL for table state, and sets
# topic_partitions to 8.
app = faust.App(
"order-processor",
broker="kafka://kafka:9092",
store="rocksdb://",
topic_partitions=8,
)
class Order(faust.Record):
    """Schema of a single order event carried on the orders topics."""

    order_id: int
    customer_id: int
    amount: float
    currency: str
    # NOTE(review): presumably epoch seconds - confirm against the producer.
    timestamp: float
# Source topic "raw-orders"; message values are deserialized as Order records.
orders_topic = app.topic("raw-orders", value_type=Order)
# Real-time aggregation: revenue per currency per minute. Values start at 0.0
# (default=float) and are kept in one-minute tumbling windows that expire after
# one hour. The process_orders agent keys entries by order.currency; Order has
# no region field.
revenue_table = app.Table(
"revenue_per_minute",
default=float,
).tumbling(timedelta(minutes=1), expires=timedelta(hours=1))
# Destination topic for orders that passed validation. The agent below sends
# to it, so it must exist before the agent runs.
# NOTE(review): the topic name "cleaned-orders" is chosen here; align it with
# whatever downstream consumers actually subscribe to.
cleaned_orders_topic = app.topic("cleaned-orders", value_type=Order)


@app.agent(orders_topic)
async def process_orders(orders):
    """Validate incoming orders, update windowed revenue, and forward valid ones.

    Orders whose amount is zero or negative are reported and skipped. Every
    other order adds its amount to ``revenue_table`` under its currency and is
    then sent on to ``cleaned_orders_topic``.
    """
    async for order in orders:
        # Real-time validation: the check rejects zero as well as negative
        # amounts, so the message says "non-positive".
        if order.amount <= 0:
            print(f"Invalid order {order.order_id}: non-positive amount")
            continue
        # Update windowed aggregation (keyed by currency).
        revenue_table[order.currency] += order.amount
        # Forward to cleaned topic.
        await cleaned_orders_topic.send(value=order)
@app.page("/revenue/")
async def revenue_view(web, request):
    """Serve the contents of ``revenue_table`` as a JSON object at /revenue/."""
    # NOTE(review): .current() resolves the window relative to the stream event
    # being processed, and a web request has no such event. Confirm this works
    # in a page handler; Faust also offers .now() for wall-clock lookups.
    snapshot = {}
    for key, value in revenue_table.items():
        snapshot[key] = value.current()
    return web.json(snapshot)
Kafka consumer for event processing
from confluent_kafka import Consumer, KafkaError
import json
from datetime import datetime, timezone
def stream_processor(
    topic: str = "raw-orders",
    group_id: str = "order-enrichment",
):
    """Consume ``topic``, enrich each event, and write the results in micro-batches.

    Delivery is at-least-once: auto-commit is disabled and offsets are
    committed synchronously, only after ``write_batch_to_target`` has returned.
    A batch is flushed when it reaches 100 events or when 5 seconds have passed
    since the last flush, whichever comes first.

    On Ctrl-C the pending batch is flushed and committed before shutdown. On
    any other failure the pending batch is deliberately NOT flushed: committing
    at that point would move the offset past the message that failed, so it is
    left uncommitted and will be re-read on restart. The consumer is closed in
    every case.

    Args:
        topic: Kafka topic to subscribe to.
        group_id: Consumer group id.

    Raises:
        KafkaException: If the consumer reports a fatal error, or a
            synchronous commit fails.
    """
    import logging
    import time

    from confluent_kafka import KafkaException

    log = logging.getLogger(__name__)
    consumer = Consumer({
        "bootstrap.servers": "kafka:9092",
        "group.id": group_id,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
        "max.poll.interval.ms": 300000,
    })
    consumer.subscribe([topic])
    batch = []
    BATCH_SIZE = 100
    FLUSH_INTERVAL = 5.0  # seconds
    # Monotonic clock: the flush timer must not jump when wall-clock time is
    # adjusted.
    last_flush = time.monotonic()
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is not None:
                err = msg.error()
                if err is None:
                    event = json.loads(msg.value())
                    batch.append(enrich_event(event))
                elif err.code() != KafkaError._PARTITION_EOF:
                    # End-of-partition is informational; anything else is
                    # surfaced instead of being dropped silently.
                    if err.fatal():
                        raise KafkaException(err)
                    log.warning("Kafka consumer error on %s: %s", topic, err)
            # Micro-batch: flush when batch is full or interval elapsed
            elapsed = time.monotonic() - last_flush
            if len(batch) >= BATCH_SIZE or (batch and elapsed >= FLUSH_INTERVAL):
                write_batch_to_target(batch)
                # Synchronous commit: do not continue until the offsets for
                # the data just written are durably recorded.
                consumer.commit(asynchronous=False)
                batch = []
                last_flush = time.monotonic()
    except KeyboardInterrupt:
        # Graceful shutdown: persist what has already been processed.
        if batch:
            write_batch_to_target(batch)
            consumer.commit(asynchronous=False)
    finally:
        # Always leave the group cleanly, even if a flush or commit raised.
        consumer.close()
Bytewax dataflow
import bytewax.operators as op
from bytewax.connectors.kafka import KafkaSource, KafkaSink
from bytewax.dataflow import Dataflow
import json
# Dataflow named "order-processor"; the operators below attach steps to it.
flow = Dataflow("order-processor")
# Kafka input: reads the "raw-orders" topic from the broker at kafka:9092.
# NOTE(review): starting_offset is passed as the string "latest" - confirm the
# installed Bytewax version accepts a string here rather than an offset constant.
source = KafkaSource(
brokers=["kafka:9092"],
topics=["raw-orders"],
starting_offset="latest",
)
# Input step "kafka_in": the stream of items produced by the Kafka source.
stream = op.input("kafka_in", flow, source)
def parse_order(msg):
    """Decode the JSON payload of a ``(key, value)`` message; the key is ignored.

    NOTE(review): assumes the source yields two-item ``(key, value)`` pairs -
    confirm against the message type of the Bytewax version in use.
    """
    _key, payload = msg
    order = json.loads(payload)
    return order
def validate_order(order):
    """Return True only when the order carries a positive numeric ``amount``.

    A missing, null, or non-numeric amount counts as invalid, so one malformed
    event is filtered out instead of raising ``TypeError`` inside the dataflow.
    """
    amount = order.get("amount")
    return isinstance(amount, (int, float)) and amount > 0
def enrich_order(order):
    """Add a UTC processing timestamp and a USD amount to the order.

    The dict is modified in place and also returned. ``processed_at`` is set
    before the currency conversion is attempted.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    order["processed_at"] = stamp
    usd_amount = convert_to_usd(order["amount"], order["currency"])
    order["amount_usd"] = usd_amount
    return order
# Pipeline wiring: parse each message, drop orders that fail validation, then
# enrich the remaining ones.
parsed = op.map("parse", stream, parse_order)
valid = op.filter("validate", parsed, validate_order)
enriched = op.map("enrich", valid, enrich_order)
# Kafka output: the "enriched-orders" topic on the broker at kafka:9092.
# NOTE(review): enrich_order returns plain dicts - confirm the sink accepts
# them as-is or whether they must be serialized into sink messages first.
sink = KafkaSink(
brokers=["kafka:9092"],
topic="enriched-orders",
)
op.output("kafka_out", enriched, sink)
3) Hybrid architecture: Lambda pattern
# Batch layer: runs hourly, produces "correct" view
def batch_revenue_pipeline(hour: str):
    """Recompute revenue per region and currency for one hour partition.

    Reads the silver orders for ``hour``, sums ``amount`` per
    (region, currency) into a ``total`` column, and writes the result to the
    matching batch-revenue partition.
    """
    source = f"s3://lake/silver/orders/hour={hour}/"
    destination = f"s3://warehouse/batch_revenue/hour={hour}/"

    hourly_orders = pl.read_parquet(source)
    total_amount = pl.sum("amount").alias("total")
    totals = hourly_orders.group_by(["region", "currency"]).agg(total_amount)
    totals.write_parquet(destination)
# Stream layer: runs continuously, produces "fast" approximate view
def _revenue_key(region, currency, moment):
    """Build the Redis key for a region/currency pair in the hour containing ``moment``."""
    return f"revenue:{region}:{currency}:{moment:%Y-%m-%dT%H}"


@app.agent(orders_topic)
async def speed_layer(orders):
    """Maintain per-hour running revenue counters in Redis.

    Each order increments the counter for its region, currency and the UTC
    hour of its event time. Bucketing by hour keeps the speed layer limited to
    data the batch layer has not covered yet; a single ever-growing counter
    would be added on top of the batch totals and counted twice.

    NOTE(review): assumes ``order.timestamp`` is epoch seconds and that the
    event carries a ``region`` attribute - confirm against the record schema.
    Old hour keys are not expired here.
    """
    from datetime import datetime, timezone

    async for order in orders:
        event_time = datetime.fromtimestamp(order.timestamp, tz=timezone.utc)
        # Update in-memory/Redis counters
        await redis.incrbyfloat(
            _revenue_key(order.region, order.currency, event_time),
            order.amount,
        )


# Serving layer: merges batch + stream at query time
def get_revenue(region: str, currency: str) -> dict:
    """Combine the batch total with the running total of the current hour.

    Args:
        region: Region to report on.
        currency: Currency to report on.

    Returns:
        Dict with the batch figure, the stream figure for the current
        (incomplete) UTC hour, their sum, and the freshness of each side.
    """
    from datetime import datetime, timezone

    # Batch: last complete hour
    batch_total = read_batch_revenue(region, currency)
    # Stream: current incomplete hour only, read from the same hour-bucketed
    # key that speed_layer writes.
    current_key = _revenue_key(region, currency, datetime.now(timezone.utc))
    stream_delta = float(redis.get(current_key) or 0)
    return {
        "batch_total": batch_total,
        "stream_delta": stream_delta,
        "combined": batch_total + stream_delta,
        "batch_freshness": "hourly",
        "stream_freshness": "seconds",
    }
4) Micro-batch: the practical middle ground
For teams that want near-real-time but find stream processing too complex:
from datetime import datetime, timedelta, timezone
import time
def micro_batch_loop(
    interval_seconds: int = 30,
    source_path: str = "s3://lake/raw/orders/",
    target_path: str = "s3://warehouse/orders/",
):
    """Run an incremental batch over new orders every ``interval_seconds``.

    Each cycle loads the rows whose ``created_at`` is later than the stored
    watermark (all rows when no watermark exists yet), transforms and upserts
    them, then advances the watermark to the newest ``created_at`` seen. The
    loop never returns; the time a cycle takes is subtracted from the sleep so
    cycles start roughly ``interval_seconds`` apart.

    Args:
        interval_seconds: Target spacing between cycle starts.
        source_path: Parquet location to read new orders from.
        target_path: Destination passed to ``upsert_to_target``.
    """
    store = WatermarkStore()
    while True:
        start = time.monotonic()
        watermark = store.get_value_as_datetime("micro_batch_orders")
        now = datetime.now(timezone.utc)
        # Read new data since watermark. The scan is lazy so the watermark
        # filter is part of the query plan instead of being applied after the
        # whole dataset has been loaded on every cycle.
        pending = pl.scan_parquet(source_path)
        if watermark:
            pending = pending.filter(pl.col("created_at") > watermark)
        df = pending.collect()
        if not df.is_empty():
            cleaned = transform(df)
            upsert_to_target(cleaned, target_path)
            # The watermark comes from the rows that were read, not the
            # cleaned output, so rows dropped by cleaning are not re-read.
            new_watermark = df["created_at"].max()
            store.set(WatermarkState(
                pipeline_name="micro_batch_orders",
                last_value=new_watermark.isoformat(),
                value_type="timestamp",
                updated_at=now.isoformat(),
                row_count=len(cleaned),
            ))
        # Sleep remaining interval
        elapsed = time.monotonic() - start
        sleep_time = max(0, interval_seconds - elapsed)
        time.sleep(sleep_time)
5) Stream processing challenges
Exactly-once semantics
Stream processors must handle the case where a message is processed but the acknowledgment fails. Options:
| Strategy | How it works | Trade-off |
|---|---|---|
| At-least-once + idempotent writes | Process may duplicate, writes deduplicate | Simple, requires idempotent target |
| Kafka transactions | Producer + consumer in a single transaction | Complex, Kafka-specific |
| Checkpointing | Save processing state periodically, replay from checkpoint | Reprocesses on recovery |
Windowing
Streaming aggregations need windows to define “how much data to aggregate”:
# Tumbling window: fixed, non-overlapping
# [0:00-0:05] [0:05-0:10] [0:10-0:15]
# Sliding (hopping) window: fixed size, overlapping, advancing by a fixed step (1 minute here)
# [0:00-0:05] [0:01-0:06] [0:02-0:07]
# Session window: grouped by activity gaps
# User A: [click, click, click] → 5 min gap → [click, click]
# Two sessions for user A
Out-of-order events
Events may arrive out of order due to network delays. Watermarks in the streaming engine define how long to wait for late events before closing a window.
6) Decision framework
def recommend_architecture(
    latency_requirement: str,
    data_volume_gb_per_day: float,
    team_streaming_experience: bool,
    budget: str,
) -> str:
    """Suggest a processing architecture from latency, volume, skills and budget.

    Rules are checked in order and the first match wins:
    sub-second latency -> streaming; minute-level latency above 100 GB/day ->
    streaming or micro-batch depending on team experience; hour-level latency
    -> batch; minimal budget -> cron-driven batch; anything else -> hybrid.
    """
    minutes_at_volume = (
        latency_requirement == "minutes" and data_volume_gb_per_day > 100
    )
    if latency_requirement == "sub-second":
        choice = "stream (Faust/Bytewax/Kafka consumers)"
    elif minutes_at_volume:
        choice = (
            "stream or micro-batch (Spark Structured Streaming)"
            if team_streaming_experience
            else "micro-batch (frequent Airflow runs)"
        )
    elif latency_requirement == "hours":
        choice = "batch (Airflow + polars/PySpark)"
    elif budget == "minimal":
        choice = "batch with cron (simplest possible)"
    else:
        choice = "hybrid: batch for historical, stream for real-time"
    return choice
Production checklist
- Latency requirement documented per pipeline (drives batch vs stream choice)
- Batch pipelines are idempotent and retriable
- Stream processors handle backpressure (slow consumers do not crash)
- Exactly-once or at-least-once + idempotent semantics implemented
- Windowing strategy defined for streaming aggregations
- Late-arriving events handled (watermarks or lookback)
- Monitoring covers lag, throughput, and error rate for both batch and stream
- Reprocessing path exists (batch: rerun; stream: replay from offset)
- Cost model understood (batch: pay-per-run; stream: always-on)
- Hybrid serving layer merges batch and stream correctly if using Lambda
One thing to remember: the architecture choice is not batch versus stream—it is about matching processing latency to business value, and most production systems use both approaches for different parts of the same data platform.
See Also
- Python Adaptive Learning Systems — How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
- Python Airflow — Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
- Python Altair — Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
- Python Automated Grading — How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
- Python Bentoml Model Serving — See BentoML as a packaging-and-delivery system that turns your Python model into a dependable service others can call.