Performance Testing Patterns — Deep Dive

Benchmark regression detection in CI

pytest-benchmark can save results and compare across runs to catch regressions automatically:

# Generate baseline on main branch
pytest tests/benchmarks/ --benchmark-save=baseline

# On PR branch, compare against baseline
pytest tests/benchmarks/ --benchmark-compare=0001_baseline --benchmark-compare-fail=mean:10%

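The --benchmark-compare-fail=mean:10% flag fails the run if any benchmark's mean time is more than 10% slower than the baseline. The 0001_ prefix is the run counter pytest-benchmark adds to saved files, so it is 0001 only for the first save in a fresh storage directory. The CI workflow uses a looser 15% threshold, since shared runners tend to be noisier than a developer machine. Integrate this into CI: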

# .github/workflows/benchmarks.yml
jobs:
  benchmark:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      # Assumes an earlier step installed Python and the test dependencies (including pytest-benchmark)
      - name: Generate baseline on main
        run: |
          git checkout origin/main
          pytest tests/benchmarks/ --benchmark-save=baseline --benchmark-json=baseline.json
          git checkout -

      - name: Run current benchmarks
        run: |
          pytest tests/benchmarks/ \
            --benchmark-compare=0001_baseline \
            --benchmark-compare-fail=mean:15% \
            --benchmark-json=current.json

      - name: Upload benchmark results
        uses: actions/upload-artifact@v4
        with:
          name: benchmark-results
          path: "*.json"
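
The uploaded baseline.json and current.json files can also be compared offline, for example to build a report. The following is a minimal sketch, assuming the layout pytest-benchmark writes (a top-level "benchmarks" list whose entries carry "name" and "stats" with a "mean" field). The helper names load_means and find_regressions are hypothetical and not part of pytest-benchmark.

```python
import json
from pathlib import Path


def load_means(path):
    """Map benchmark name -> mean runtime in seconds from a saved JSON file.

    Assumes the pytest-benchmark JSON layout described in the lead-in.
    """
    data = json.loads(Path(path).read_text())
    return {b["name"]: b["stats"]["mean"] for b in data["benchmarks"]}


def find_regressions(baseline, current, threshold=0.15):
    """Return {name: fractional_slowdown} for benchmarks slower than threshold."""
    regressions = {}
    for name, base_mean in baseline.items():
        if name not in current or base_mean <= 0:
            continue  # removed or degenerate benchmark: nothing to compare
        slowdown = (current[name] - base_mean) / base_mean
        if slowdown > threshold:
            regressions[name] = slowdown
    return regressions


# In-memory sample data standing in for load_means("baseline.json") etc.
sample_baseline = {"test_parse": 1.0, "test_render": 2.0}
sample_current = {"test_parse": 1.1, "test_render": 2.5}

for name, slowdown in find_regressions(sample_baseline, sample_current).items():
    print(f"{name}: {slowdown:.0%} slower than baseline")
```

With real files, pass load_means("baseline.json") and load_means("current.json") in place of the sample dictionaries.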

Advanced Locust patterns

Sequential task flows

Real users follow a sequence: log in, browse, add to cart, check out. Model this with SequentialTaskSet, which runs its tasks in the order they are declared:

from locust import HttpUser, SequentialTaskSet, task, between


class CheckoutFlow(SequentialTaskSet):
    @task
    def login(self):
        self.client.post("/api/auth/login", json={
            "email": "loadtest@example.com",
            "password": "testpass123",
        })

    @task
    def browse_products(self):
        response = self.client.get("/api/products?page=1&limit=20")
        products = response.json()["items"]
        if products:
            self.product_id = products[0]["id"]

    @task
    def view_product(self):
        if hasattr(self, "product_id"):
            self.client.get(f"/api/products/{self.product_id}")

    @task
    def add_to_cart(self):
        if hasattr(self, "product_id"):
            self.client.post("/api/cart/items", json={
                "product_id": self.product_id,
                "quantity": 1,
            })

    @task
    def checkout(self):
        self.client.post("/api/checkout", json={
            "payment_token": "tok_test_visa",
        })
        self.interrupt()  # Return to parent, start over


class EcommerceUser(HttpUser):
    wait_time = between(2, 5)
    tasks = [CheckoutFlow]

Custom load shapes

Instead of constant load, simulate realistic traffic patterns:

from locust import LoadTestShape


class PeakHourShape(LoadTestShape):
    """Ramp up to peak, hold, ramp down — simulating business hours."""

    stages = [
        {"duration": 120, "users": 50, "spawn_rate": 5},    # Warm up
        {"duration": 300, "users": 200, "spawn_rate": 10},   # Morning ramp
        {"duration": 600, "users": 500, "spawn_rate": 20},   # Peak hours
        {"duration": 300, "users": 200, "spawn_rate": 10},   # Afternoon decline
        {"duration": 120, "users": 50, "spawn_rate": 5},     # Wind down
    ]

    def tick(self):
        run_time = self.get_run_time()
        elapsed = 0
        for stage in self.stages:
            elapsed += stage["duration"]
            if run_time < elapsed:
                return (stage["users"], stage["spawn_rate"])
        return None  # Stop the test
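
Because each stage's duration is added to a running total, the stage boundaries fall at 120, 420, 1020, 1320, and 1440 seconds. Before committing to a 24-minute run, it can help to exercise the same lookup outside Locust. This is a sketch only: stage_at is a hypothetical stand-alone copy of the tick() logic, not a Locust API.

```python
STAGES = [
    {"duration": 120, "users": 50, "spawn_rate": 5},
    {"duration": 300, "users": 200, "spawn_rate": 10},
    {"duration": 600, "users": 500, "spawn_rate": 20},
    {"duration": 300, "users": 200, "spawn_rate": 10},
    {"duration": 120, "users": 50, "spawn_rate": 5},
]


def stage_at(run_time, stages):
    """Return (users, spawn_rate) for the stage active at run_time, or None."""
    elapsed = 0
    for stage in stages:
        elapsed += stage["duration"]  # cumulative end time of this stage
        if run_time < elapsed:
            return (stage["users"], stage["spawn_rate"])
    return None  # past the last stage: the test would stop


def total_duration(stages):
    """Total planned run time in seconds."""
    return sum(stage["duration"] for stage in stages)


# Print the target at each stage boundary to check the table by eye.
for t in (0, 119, 120, 420, 1020, 1320, 1440):
    print(f"t={t:>4}s -> {stage_at(t, STAGES)}")
```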

Distributed load generation

When a single machine cannot generate enough load, run Locust across multiple workers:

# Master node
locust -f locustfile.py --master --expect-workers=4

# Worker nodes (on separate machines)
locust -f locustfile.py --worker --master-host=10.0.0.1

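The master generates no load itself; it splits the requested user count across the connected workers. Requesting 1,000 users with four workers puts about 250 on each. Workers report metrics back to the master, which aggregates them for the dashboard. Note that --expect-workers only delays the start when the master runs in headless or autostart mode.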
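
The per-worker arithmetic is useful when sizing worker machines. The sketch below shows an even split only. It is an illustration, not Locust's dispatcher, which also has to handle user-class weights and workers joining or leaving; split_users is a hypothetical helper name.

```python
def split_users(total_users, worker_count):
    """Divide total_users across workers as evenly as possible."""
    if worker_count < 1:
        raise ValueError("worker_count must be at least 1")
    base, extra = divmod(total_users, worker_count)
    # The first `extra` workers take one additional user each.
    return [base + 1 if i < extra else base for i in range(worker_count)]


print(split_users(1000, 4))
print(split_users(10, 3))
```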

Profiling under load

Combine profiling with load testing to identify bottlenecks under realistic conditions:

# profile_under_load.py
import cProfile
import pstats
from contextlib import asynccontextmanager
from io import StringIO

from fastapi import FastAPI

profiler = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Profile from startup to shutdown, then print and save the results."""
    global profiler
    profiler = cProfile.Profile()
    profiler.enable()
    yield  # the app serves requests here while the load test runs
    profiler.disable()
    stream = StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.sort_stats("cumulative")
    stats.print_stats(30)
    print(stream.getvalue())
    stats.dump_stats("load_profile.prof")


# Register the lifespan handler; without this the profiler never starts
app = FastAPI(lifespan=lifespan)

Then visualize with snakeviz:

pip install snakeviz
snakeviz load_profile.prof
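
The same .prof file can be read programmatically, for instance to log the slowest functions after a CI load test. This is a minimal sketch using the standard library's pstats.Stats.get_stats_profile() (Python 3.9+). The slow_sum workload and the top_functions helper are made up for the example, and the file is written to a temporary directory rather than load_profile.prof.

```python
import cProfile
import pstats
import tempfile
from pathlib import Path


def slow_sum(n):
    """Stand-in workload so the example has something to profile."""
    return sum(i * i for i in range(n))


def top_functions(prof_path, limit=5):
    """Return (function_name, cumulative_seconds) pairs, slowest first."""
    profile = pstats.Stats(str(prof_path)).get_stats_profile()
    rows = [(name, fp.cumtime) for name, fp in profile.func_profiles.items()]
    rows.sort(key=lambda row: row[1], reverse=True)
    return rows[:limit]


# Produce a small .prof file the same way dump_stats() does in the app.
profiler = cProfile.Profile()
profiler.runcall(slow_sum, 200_000)
prof_file = Path(tempfile.mkdtemp()) / "example.prof"
profiler.dump_stats(str(prof_file))

for name, seconds in top_functions(prof_file):
    print(f"{seconds:8.3f}s  {name}")
```

Function names are not unique across modules, so two functions with the same name share one entry here; snakeviz remains the better tool for detailed inspection.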

For production profiling with minimal overhead, use py-spy, a sampling profiler that attaches to a running process:

pip install py-spy
# Attach to running process
py-spy record -o profile.svg --pid 12345 --duration 60

This generates a flame graph SVG showing where CPU time is spent — no code changes required.

Database performance testing

Database queries are among the most common performance bottlenecks in web applications. Test them explicitly against realistic data volumes:

import time

import pytest
from sqlalchemy import text  # assumes db_session is a SQLAlchemy session


class TestQueryPerformance:
    @pytest.fixture(autouse=True)
    def setup_large_dataset(self, db_session, product_factory):
        """Insert realistic data volumes."""
        product_factory.create_batch(50_000)
        db_session.commit()

    def test_product_search_scales(self, db_session):
        """Substring search should complete in under 100ms with 50k products."""
        start = time.perf_counter()
        # SQLAlchemy 2.0 requires raw SQL strings to be wrapped in text()
        db_session.execute(
            text("SELECT * FROM products WHERE name ILIKE '%widget%' LIMIT 20")
        ).fetchall()
        elapsed_ms = (time.perf_counter() - start) * 1000
        assert elapsed_ms < 100, f"Search took {elapsed_ms:.1f}ms"

    def test_order_aggregation_scales(self, db_session):
        """Monthly revenue aggregation should complete in under 200ms."""
        start = time.perf_counter()
        db_session.execute(
            text(
                """
                SELECT date_trunc('month', created_at) AS month,
                       SUM(total) AS revenue
                FROM orders
                GROUP BY month
                ORDER BY month DESC
                LIMIT 12
                """
            )
        ).fetchall()
        elapsed_ms = (time.perf_counter() - start) * 1000
        assert elapsed_ms < 200, f"Aggregation took {elapsed_ms:.1f}ms"
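
A single timed run is sensitive to cold caches and scheduler noise, which can make threshold assertions like these flaky. One common mitigation is to discard a warm-up run and assert on the median of several samples. The sketch below shows that idea with the standard library only; median_ms is a hypothetical helper, and the lambda stands in for a call such as db_session.execute(...).fetchall().

```python
import statistics
import time


def median_ms(fn, repeats=5, warmup=1):
    """Call fn repeatedly and return the median wall-clock time in milliseconds."""
    for _ in range(warmup):
        fn()  # warm caches and connections; result is not timed
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        samples.append((time.perf_counter() - start) * 1000)
    return statistics.median(samples)


# Stand-in workload; in a real test this would run the query.
elapsed = median_ms(lambda: sorted(range(10_000), reverse=True))
print(f"median: {elapsed:.2f}ms")
```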

Memory performance testing

Memory leaks are performance bugs that only show up over time:

import tracemalloc
import pytest


@pytest.fixture
def memory_tracker():
    """Fail the test if peak traced memory exceeds 50MB."""
    tracemalloc.start()
    yield
    # Read the peak: by teardown the test's locals have been freed,
    # so a snapshot taken here would miss what the test allocated.
    _current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    peak_mb = peak / 1024 / 1024
    assert peak_mb < 50, f"Peak memory: {peak_mb:.1f}MB exceeds 50MB limit"


def test_batch_processing_memory(memory_tracker):
    """Processing 100k records should not exceed 50MB."""
    results = []
    for i in range(100_000):
        # process_record is the application function under test
        record = process_record({"id": i, "data": f"value_{i}"})
        results.append(record)
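
A peak-memory limit catches large allocations, but a leak is better described as memory that stays allocated after repeated calls. The sketch below measures retained growth with tracemalloc.get_traced_memory(). The leaky_handler function and its module-level cache are contrived for the illustration, and measure_growth is a hypothetical helper.

```python
import gc
import tracemalloc

_cache = []  # deliberately leaky: grows on every call


def leaky_handler(payload):
    """Contrived handler that retains a new 100x copy of payload each call."""
    _cache.append(payload * 100)
    return len(payload)


def measure_growth(fn, iterations=500):
    """Return bytes of traced memory still allocated after calling fn repeatedly."""
    tracemalloc.start()
    try:
        fn()  # warm-up so one-time allocations are not counted as growth
        gc.collect()
        before, _peak = tracemalloc.get_traced_memory()
        for _ in range(iterations):
            fn()
        gc.collect()
        after, _peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return after - before


growth = measure_growth(lambda: leaky_handler("x" * 100))
print(f"retained after 500 calls: {growth / 1024:.0f} KiB")
```

A handler that retains nothing should show growth close to zero, so a test can assert on a small tolerance rather than an absolute memory limit.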

For long-running soak tests, track memory over time:

import psutil
import time
import csv


def soak_test(duration_hours: float = 2.0):
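    # With no PID, psutil.Process() inspects this process, so the workload
    # must run in-process; pass the server's PID to watch a separate service.
    process = psutil.Process()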
    start = time.time()
    end = start + duration_hours * 3600

    with open("soak_results.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["elapsed_min", "rss_mb", "vms_mb", "connections"])

        while time.time() < end:
            mem = process.memory_info()
            conns = len(process.connections())
            elapsed = (time.time() - start) / 60
            writer.writerow([
                f"{elapsed:.1f}",
                f"{mem.rss / 1024 / 1024:.1f}",
                f"{mem.vms / 1024 / 1024:.1f}",
                conns,
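            ])
            f.flush()  # keep the CSV usable if the run is interrupted
            # Simulate normal workload (run_typical_requests is your own driver)
            run_typical_requests(count=100)
            time.sleep(60)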
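
Once the soak run finishes, the CSV needs interpreting: steady RSS growth suggests a leak, while a flat line does not. One simple way to quantify this is a least-squares slope of rss_mb against elapsed time. The sketch below assumes the column names written by soak_test; rss_slope_mb_per_hour is a hypothetical helper, and the sample rows are invented.

```python
import csv
import io


def rss_slope_mb_per_hour(rows):
    """Least-squares slope of rss_mb over elapsed_min, converted to MB per hour."""
    xs = [float(row["elapsed_min"]) for row in rows]
    ys = [float(row["rss_mb"]) for row in rows]
    n = len(xs)
    if n < 2:
        raise ValueError("need at least two samples")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    denom = sum((x - mean_x) ** 2 for x in xs)
    if denom == 0:
        raise ValueError("all samples share the same timestamp")
    slope_per_min = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / denom
    return slope_per_min * 60


# Invented sample in the same format as soak_results.csv.
SAMPLE = """elapsed_min,rss_mb,vms_mb,connections
0.0,100.0,400.0,3
1.0,100.5,400.0,3
2.0,101.0,400.0,3
3.0,101.5,400.0,3
"""

rows = list(csv.DictReader(io.StringIO(SAMPLE)))
print(f"RSS growth: {rss_slope_mb_per_hour(rows):.1f} MB/hour")
```

For a real run, replace io.StringIO(SAMPLE) with open("soak_results.csv", newline="") and pick a growth threshold that suits the service.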

Performance budgets

Define explicit budgets and enforce them:

# performance_budgets.py
BUDGETS = {
    "api/products/list": {"p50_ms": 50, "p99_ms": 200, "max_mb": 10},
    "api/orders/create": {"p50_ms": 100, "p99_ms": 500, "max_mb": 5},
    "api/search": {"p50_ms": 80, "p99_ms": 300, "max_mb": 20},
    "batch/daily_report": {"p50_ms": 30000, "p99_ms": 60000, "max_mb": 512},
}


def check_budget(endpoint: str, p50_ms: float, p99_ms: float, peak_mb: float) -> bool:
    """Return True if within budget (or no budget is defined); raise otherwise."""
    budget = BUDGETS.get(endpoint)
    if not budget:
        return True

    violations = []
    if p50_ms > budget["p50_ms"]:
        violations.append(f"p50 {p50_ms:.0f}ms > {budget['p50_ms']}ms")
    if p99_ms > budget["p99_ms"]:
        violations.append(f"p99 {p99_ms:.0f}ms > {budget['p99_ms']}ms")
    if peak_mb > budget["max_mb"]:
        violations.append(f"memory {peak_mb:.0f}MB > {budget['max_mb']}MB")

    if violations:
        raise AssertionError(
            f"Performance budget violated for {endpoint}: "
            + ", ".join(violations)
        )
    return True
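
The p50 and p99 values passed to check_budget have to be computed from raw latency samples. The sketch below uses the nearest-rank definition of a percentile, which is one of several common definitions; the percentile helper and the sample latencies are illustrative only.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile: the smallest sample with at least pct% at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in the range (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


# Invented latencies in milliseconds, with one slow outlier.
latencies_ms = [12, 15, 11, 240, 14, 13, 16, 18, 17, 19]
p50 = percentile(latencies_ms, 50)
p99 = percentile(latencies_ms, 99)
print(f"p50={p50}ms p99={p99}ms")
```

With only ten samples the p99 is simply the maximum, which shows why tail budgets need a large sample count to be meaningful.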

The one thing to remember: A complete performance testing strategy layers micro-benchmarks in CI for regression detection, load tests with realistic traffic shapes for capacity validation, profiling for bottleneck diagnosis, and explicit budgets to prevent gradual degradation.
