Python API Load Testing — Deep Dive

Technical foundation

Load testing is the empirical arm of performance engineering. While profiling examines individual request behavior, load testing reveals system behavior under concurrency — contention, resource exhaustion, queuing effects, and cascading failures that only appear when many requests compete for shared resources.

Advanced Locust patterns

Sequential task flows

Real users follow sequences: browse → search → view → add to cart → checkout. Model this with SequentialTaskSet:

from locust import HttpUser, SequentialTaskSet, task, between

class ShoppingFlow(SequentialTaskSet):
    @task
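    def browse_catalog(self):
        response = self.client.get("/products?category=electronics&limit=20")
        # Only parse the body on success; a failed request may not return JSON
        if response.status_code == 200:
            products = response.json().get("data", [])
            if products:
                self.product_id = products[0]["id"]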
    
    @task
    def view_product(self):
        if hasattr(self, "product_id"):
            self.client.get(f"/products/{self.product_id}")
    
    @task
    def add_to_cart(self):
        if hasattr(self, "product_id"):
            self.client.post("/cart/items", json={
                "product_id": self.product_id,
                "quantity": 1,
            })
    
    @task
    def checkout(self):
        response = self.client.post("/orders", json={
            "payment_method": "card",
            "card_token": "tok_test_visa",
        })
        if response.status_code == 201:
            self.interrupt()  # End this flow, start a new one

class EcommerceUser(HttpUser):
    wait_time = between(2, 5)
    tasks = [ShoppingFlow]
    
    def on_start(self):
        # user_count is local to each worker, so emails can repeat in distributed runs
        self.client.post("/auth/login", json={
            "email": f"user{self.environment.runner.user_count}@test.com",
            "password": "testpass",
        })

Data-driven tests with CSV

Identical requests tend to hit warm caches and hide slow paths, so load realistic, varied test data from CSV files:

import csv
import random

from locust import HttpUser, task, between

def load_csv(path):
    with open(path, newline="") as f:
        return list(csv.DictReader(f))

# Load once per process instead of once per simulated user
USERS = load_csv("test_data/users.csv")
PRODUCTS = load_csv("test_data/products.csv")

class RealisticUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        user = random.choice(USERS)
        response = self.client.post("/auth/login", json={
            "email": user["email"],
            "password": user["password"],
        })
        self.token = response.json()["access_token"]
        self.client.headers["Authorization"] = f"Bearer {self.token}"

    @task
    def search_product(self):
        product = random.choice(PRODUCTS)
        # params= URL-encodes the query; name= groups all searches into one stats entry
        self.client.get("/products/search", params={"q": product["name"]},
                        name="/products/search?q=[name]")

    @task
    def view_random_product(self):
        product = random.choice(PRODUCTS)
        # name= keeps the report from growing one row per product id
        self.client.get(f"/products/{product['id']}", name="/products/[id]")
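Picking rows at random can log the same account in many times while other accounts go unused. One alternative, sketched here under the assumption that the rows are already loaded as a list of dicts, is to deal them out in order. `RoundRobin` is a hypothetical helper name, not part of Locust, and the example rows are placeholders.

```python
import itertools


class RoundRobin:
    """Hand out rows in order, wrapping around when the list is exhausted."""

    def __init__(self, rows):
        if not rows:
            raise ValueError("rows must not be empty")
        self._cycle = itertools.cycle(rows)

    def next(self):
        # Each call returns the next row, so consecutive callers get different rows
        return next(self._cycle)


# Placeholder rows standing in for the contents of users.csv
accounts = RoundRobin([
    {"email": "a@test.com", "password": "x"},
    {"email": "b@test.com", "password": "y"},
])
first = accounts.next()
second = accounts.next()
third = accounts.next()  # wraps back to the first row
```

In a locustfile, a single module-level instance could be shared by all users in one process, with `on_start` calling `next()` instead of `random.choice`. Each worker process would still have its own copy, so rows can repeat across workers.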

Custom metrics and event tracking

Track business-specific metrics alongside HTTP metrics:

import time

from locust import HttpUser, events, task

class OrderUser(HttpUser):
    @task
    def create_order(self):
        start = time.perf_counter()
        
        response = self.client.post("/orders", json={"product_id": 42, "quantity": 1})
        
        if response.status_code == 201:
            order = response.json()
            # Track custom metric: time to order confirmation
            events.request.fire(
                request_type="BUSINESS",
                name="order_e2e_time",
                response_time=(time.perf_counter() - start) * 1000,
                response_length=0,
                exception=None,
                context={},
            )
        elif response.status_code == 409:
            # Locust already logs the 409 as a failed POST; this event counts out-of-stock separately
            events.request.fire(
                request_type="BUSINESS",
                name="order_out_of_stock",
                response_time=0,
                response_length=0,
                exception=None,
                context={},
            )

Distributed load testing

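A single Locust process runs on one CPU core, and a single machine typically sustains roughly 1,000–5,000 concurrent users, depending on how heavy each task is. For larger tests, distribute the load across multiple worker processes or machines: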

# Master node
locust -f locustfile.py --master --host http://api.staging.example.com

# Worker nodes (run on multiple machines)
locust -f locustfile.py --worker --master-host 10.0.0.1

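Each worker runs its share of the simulated users independently, and the master only coordinates the run and aggregates metrics. At the per-machine figures above, 10 worker machines put roughly 10,000–50,000 concurrent users within reach; the real ceiling depends on the test script and the hardware.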
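A rough way to size a distributed run is to divide the target user count by what one worker handled in a trial run, plus some headroom. The sketch below does only that arithmetic. `workers_needed` and the 20% default headroom are assumptions for illustration, and the per-worker capacity has to come from your own measurements.

```python
def workers_needed(target_users, users_per_worker, headroom_pct=20):
    """Estimate how many workers to start for a target number of users.

    users_per_worker is the capacity measured for one worker in a trial run;
    headroom_pct pads the target so workers are not run at their limit.
    """
    if target_users <= 0 or users_per_worker <= 0:
        raise ValueError("target_users and users_per_worker must be positive")
    padded = target_users * (100 + headroom_pct)
    capacity = users_per_worker * 100
    # Integer ceiling division avoids floating-point rounding surprises
    return -(-padded // capacity)


# 50,000 users at 5,000 users per worker, with the default 20% headroom
estimate = workers_needed(50_000, 5_000)
```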

For containerized environments:

# docker-compose.yml for distributed load testing
services:
  master:
    image: locustio/locust
    command: -f /mnt/locustfile.py --master
    ports:
      - "8089:8089"
    volumes:
      - ./:/mnt

  worker:
    image: locustio/locust
    command: -f /mnt/locustfile.py --worker --master-host master
    volumes:
      - ./:/mnt
    deploy:
      replicas: 8

Load test profiles

Ramp-up pattern

from locust import LoadTestShape

class StepLoadShape(LoadTestShape):
    """Increase users in steps to find the breaking point."""
    step_time = 60       # seconds per step
    step_load = 50       # users added per step
    spawn_rate = 10      # users per second during ramp
    time_limit = 600     # total test duration

    def tick(self):
        run_time = self.get_run_time()
        if run_time > self.time_limit:
            return None
        # +1 so the first step starts at step_load users instead of idling at zero
        current_step = int(run_time // self.step_time) + 1
        return (current_step * self.step_load, self.spawn_rate)

Spike pattern

class SpikeLoadShape(LoadTestShape):
    """Simulate a traffic spike."""
    stages = [
        {"duration": 60, "users": 100, "spawn_rate": 10},    # Normal
        {"duration": 10, "users": 1000, "spawn_rate": 200},   # Spike
        {"duration": 120, "users": 1000, "spawn_rate": 200},  # Sustained peak
        {"duration": 30, "users": 100, "spawn_rate": 50},     # Recovery
        {"duration": 60, "users": 100, "spawn_rate": 10},     # Back to normal
    ]

    def tick(self):
        run_time = self.get_run_time()
        elapsed = 0
        for stage in self.stages:
            elapsed += stage["duration"]
            if run_time < elapsed:
                return (stage["users"], stage["spawn_rate"])
        return None
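Before spending several minutes on a real run, it can help to check which stage is active at a given second. The sketch below repeats the stage lookup from `tick` as a plain function so it runs without Locust. `stage_at` is a hypothetical helper name, and the stage list is copied from the shape above.

```python
STAGES = [
    {"duration": 60, "users": 100, "spawn_rate": 10},    # Normal
    {"duration": 10, "users": 1000, "spawn_rate": 200},   # Spike
    {"duration": 120, "users": 1000, "spawn_rate": 200},  # Sustained peak
    {"duration": 30, "users": 100, "spawn_rate": 50},     # Recovery
    {"duration": 60, "users": 100, "spawn_rate": 10},     # Back to normal
]


def stage_at(run_time, stages):
    """Return (users, spawn_rate) for the stage active at run_time, or None when done."""
    elapsed = 0
    for stage in stages:
        elapsed += stage["duration"]
        if run_time < elapsed:
            return (stage["users"], stage["spawn_rate"])
    return None


# Print the target at each stage boundary (seconds since test start)
for second in (0, 60, 70, 190, 220, 280):
    print(second, stage_at(second, STAGES))
```

The durations are cumulative, so the stage boundaries fall at 60, 70, 190, 220, and 280 seconds, and the test stops once `tick` returns `None`.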

CI/CD integration

Run load tests automatically and fail the pipeline if performance regresses:

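# run_load_test.py
import csv
import os
import subprocess
import sys

# Make sure the output directory for the CSV files exists
os.makedirs("results", exist_ok=True)

# Run Locust in headless mode
result = subprocess.run([
    "locust", "-f", "locustfile.py",
    "--headless",
    "--host", "http://staging.example.com",
    "--users", "200",
    "--spawn-rate", "20",
    "--run-time", "5m",
    "--csv", "results/load_test",
    "--only-summary",
], capture_output=True, text=True)

# Parse results from the aggregated row of the stats CSV
p95 = error_rate = rps = None
with open("results/load_test_stats.csv", newline="") as f:
    for row in csv.DictReader(f):
        if row["Name"] == "Aggregated":
            request_count = float(row["Request Count"])
            p95 = float(row["95%"])
            # Treat a run with zero requests as a total failure
            error_rate = float(row["Failure Count"]) / request_count * 100 if request_count else 100.0
            rps = float(row["Requests/s"])

if p95 is None:
    print(result.stderr)
    sys.exit("No aggregated stats found; check that Locust ran")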

# Assert against baselines
THRESHOLDS = {
    "p95_ms": 500,
    "error_rate_pct": 1.0,
    "min_rps": 150,
}

failures = []
if p95 > THRESHOLDS["p95_ms"]:
    failures.append(f"P95 latency {p95}ms exceeds {THRESHOLDS['p95_ms']}ms")
if error_rate > THRESHOLDS["error_rate_pct"]:
    failures.append(f"Error rate {error_rate}% exceeds {THRESHOLDS['error_rate_pct']}%")
if rps < THRESHOLDS["min_rps"]:
    failures.append(f"RPS {rps} below minimum {THRESHOLDS['min_rps']}")

if failures:
    print("❌ Load test FAILED:")
    for f in failures:
        print(f"  - {f}")
    sys.exit(1)
else:
    print(f"✅ Load test PASSED: P95={p95}ms, errors={error_rate}%, RPS={rps}")
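Fixed thresholds catch outright failures, but a regression can stay under them for a long time. One complementary check, sketched below, compares the current run against numbers saved from a previous good run. `find_regressions`, the metric keys, and the 10% tolerance are assumptions for illustration, and the figures in the example are placeholders rather than measurements.

```python
def find_regressions(baseline, current, tolerance_pct=10.0):
    """List metrics that got worse than the baseline by more than tolerance_pct."""
    problems = []
    # Latency and error rate regress when they go up
    for metric in ("p95_ms", "error_rate_pct"):
        limit = baseline[metric] * (1 + tolerance_pct / 100)
        if current[metric] > limit:
            problems.append(f"{metric} rose from {baseline[metric]} to {current[metric]}")
    # Throughput regresses when it goes down
    limit = baseline["rps"] * (1 - tolerance_pct / 100)
    if current["rps"] < limit:
        problems.append(f"rps fell from {baseline['rps']} to {current['rps']}")
    return problems


# Placeholder numbers, not real measurements
baseline = {"p95_ms": 400.0, "error_rate_pct": 0.5, "rps": 200.0}
current = {"p95_ms": 450.0, "error_rate_pct": 0.5, "rps": 170.0}
for problem in find_regressions(baseline, current):
    print(problem)
```

The baseline could be stored as a build artifact and refreshed whenever a run is accepted. Note that a baseline error rate of zero makes any error count as a regression, which may or may not be the behavior you want.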

Identifying bottlenecks from load test data

When load tests reveal problems, here is the diagnostic decision tree:

CPU maxed out (>90%)?

  • Profile with py-spy during the load test: py-spy top --pid <uvicorn_pid>
  • Look for CPU-bound code: serialization, compression, computation
  • Solution: optimize hot paths, add workers, or move CPU work to background tasks

Database connections exhausted?

  • Check pool size vs concurrent requests
  • Look for long-running transactions holding connections
  • Solution: increase pool size, add connection timeouts, optimize slow queries

Memory growing continuously?

  • Indicates a memory leak (objects not being garbage collected)
  • Profile with memray or tracemalloc
  • Common culprits: growing caches without TTL, accumulated event handlers, circular references

Response times increase linearly with users?

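  • Indicates that requests are being processed one at a time (a serialized, single-threaded section), so each additional user mostly adds queueing time
  • Solution: more Uvicorn workers, make blocking calls async or move them off the event loop, and, if JSON encoding is the hot spot, switch to a faster encoder such as orjson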

Errors appear at specific user count?

  • Check file descriptor limits (ulimit -n), connection limits, or rate limiting
  • Solution: increase OS limits, tune connection pools, adjust rate limiters
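Two of the checks above reduce to simple arithmetic. Little's law estimates how many database connections are busy on average (request rate times the time each request holds a connection). The interactive response time law, R = N / X - Z, gives a lower bound on mean response time once throughput has plateaued, which is why latency grows linearly with users at that point. The function names below are hypothetical, and the 3.5 s think time is simply the midpoint of the `between(2, 5)` wait used earlier.

```python
def connections_in_use(requests_per_second, hold_time_s):
    """Little's law: average busy connections = arrival rate x time each is held."""
    return requests_per_second * hold_time_s


def response_time_at_saturation(users, max_rps, think_time_s):
    """Lower bound on mean response time (seconds) once throughput is capped at max_rps.

    Uses R = N / X - Z, clamped at zero for loads below the saturation point.
    """
    return max(users / max_rps - think_time_s, 0.0)


# 200 requests/s, each holding a connection for 50 ms
busy = connections_in_use(200, 0.05)

# With throughput capped at 100 requests/s, every extra 1,000 users adds 10 s
at_1000 = response_time_at_saturation(1000, 100, 3.5)
at_2000 = response_time_at_saturation(2000, 100, 3.5)
print(busy, at_1000, at_2000)
```

If the estimated number of busy connections is close to the pool size, pool exhaustion under bursts is plausible. If measured latency tracks the second formula, the system has hit a throughput ceiling rather than a per-request slowdown.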

Realistic test environment

The test environment must mirror production or results are meaningless:

  • Same database engine and schema (not SQLite for a PostgreSQL production system)
  • Similar data volume (an empty database benchmarks differently than one with 10 million rows)
  • Same network topology (test internal latency if services communicate over the network)
  • Same resource limits (CPU, memory, connection pools)

The most common load testing mistake is testing against an environment that does not match production and drawing conclusions that do not apply.
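A parity check like the list above can be partly automated by comparing the settings of both environments before a run. The sketch below assumes the settings are already available as flat dictionaries. `config_drift` and the keys shown are hypothetical examples, not a standard format.

```python
def config_drift(production, test):
    """Return {key: (production_value, test_value)} for every setting that differs.

    A setting missing from one side is reported with None for that side.
    """
    drift = {}
    for key in sorted(set(production) | set(test)):
        prod_value = production.get(key)
        test_value = test.get(key)
        if prod_value != test_value:
            drift[key] = (prod_value, test_value)
    return drift


# Hypothetical settings for illustration
production = {"db_engine": "postgresql", "db_pool_size": 20, "cpu_limit": "2"}
test = {"db_engine": "sqlite", "db_pool_size": 20}
for key, (prod_value, test_value) in config_drift(production, test).items():
    print(f"{key}: production={prod_value!r} test={test_value!r}")
```

This only covers settings that can be expressed as values. Data volume and network topology still need to be checked separately.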

Benchmarking Python ASGI servers

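Server configuration strongly affects throughput. The RPS figures below are rough orders of magnitude for simple endpoints; actual numbers depend on the hardware and the application: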

# Single worker (development)
uvicorn app:app --host 0.0.0.0 --port 8000
# ~500-1000 RPS for simple endpoints

# Multiple workers (production)
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
# ~2000-4000 RPS

# Gunicorn with Uvicorn workers
gunicorn app:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
# ~2500-5000 RPS

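# With uvloop (faster event loop; requires the uvloop package)
# Uvicorn's default --loop auto already selects uvloop when it is installed
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4 --loop uvloop
# ~3000-6000 RPS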

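A common starting point for I/O-bound workloads is a worker count of about 2x the CPU cores (Gunicorn's documentation suggests (2 x cores) + 1). Treat it as a starting point rather than a rule: async workers often need fewer, so test different configurations during load tests to find your optimal setup.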
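When comparing configurations, the highest RPS is not always the best choice, because extra workers cost memory. One way to pick, sketched below, is to take the smallest worker count whose throughput is within a few percent of the best result. `smallest_sufficient_workers` and the 5% margin are assumptions, and the numbers in the example are placeholders, not benchmark results.

```python
def smallest_sufficient_workers(rps_by_workers, within_pct=5.0):
    """Pick the smallest worker count whose RPS is within within_pct of the best.

    rps_by_workers maps a worker count to the RPS measured with that many workers.
    """
    if not rps_by_workers:
        raise ValueError("rps_by_workers must not be empty")
    best = max(rps_by_workers.values())
    floor = best * (1 - within_pct / 100)
    return min(workers for workers, rps in rps_by_workers.items() if rps >= floor)


# Placeholder measurements for illustration only
measured = {1: 900, 2: 1700, 4: 3100, 8: 3200, 16: 3150}
choice = smallest_sufficient_workers(measured)
print(choice)
```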

The one thing to remember: Load test with realistic user flows and data, automate tests in CI/CD with regression thresholds, use distributed Locust for large-scale tests, and diagnose bottlenecks systematically — CPU, memory, connections, and serialization each have distinct signatures and solutions.
