Python API Load Testing — Deep Dive
Technical foundation
Load testing is the empirical arm of performance engineering. While profiling examines individual request behavior, load testing reveals system behavior under concurrency — contention, resource exhaustion, queuing effects, and cascading failures that only appear when many requests compete for shared resources.
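One queuing relationship is useful when sizing a test. In a closed loop, where each simulated user waits for a response and then pauses before the next request, throughput is roughly the user count divided by the sum of response time and wait time (Little's Law). The sketch below is a back-of-the-envelope helper, not part of any load testing tool; the function names and sample numbers are illustrative assumptions.

```python
import math


def expected_rps(users: int, avg_response_s: float, avg_wait_s: float) -> float:
    """Approximate steady-state requests per second for a closed-loop test."""
    cycle_s = avg_response_s + avg_wait_s  # time one user spends per request
    if cycle_s <= 0:
        raise ValueError("response time plus wait time must be positive")
    return users / cycle_s


def users_needed(target_rps: float, avg_response_s: float, avg_wait_s: float) -> int:
    """Users required to sustain target_rps, rounded up."""
    cycle_s = avg_response_s + avg_wait_s
    # round() guards against float noise such as 300.00000000000006
    return math.ceil(round(target_rps * cycle_s, 9))


if __name__ == "__main__":
    # Hypothetical numbers: 200 users, 0.5 s responses, 3.5 s average wait
    print(expected_rps(200, 0.5, 3.5))  # 50.0
    print(users_needed(150, 0.5, 1.5))  # 300
```

When response times rise under load, the same user count produces fewer requests per second, which is why throughput plateaus as a system saturates.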
Advanced Locust patterns
Sequential task flows
Real users follow sequences: browse → search → view → add to cart → checkout. Model this with SequentialTaskSet:
from locust import HttpUser, SequentialTaskSet, task, between


class ShoppingFlow(SequentialTaskSet):
    @task
    def browse_catalog(self):
        response = self.client.get("/products?category=electronics&limit=20")
        products = response.json()["data"]
        if products:
            self.product_id = products[0]["id"]

    @task
    def view_product(self):
        if hasattr(self, "product_id"):
            self.client.get(f"/products/{self.product_id}")

    @task
    def add_to_cart(self):
        if hasattr(self, "product_id"):
            self.client.post("/cart/items", json={
                "product_id": self.product_id,
                "quantity": 1,
            })

    @task
    def checkout(self):
        response = self.client.post("/orders", json={
            "payment_method": "card",
            "card_token": "tok_test_visa",
        })
        if response.status_code == 201:
            self.interrupt()  # End this flow, start a new one


class EcommerceUser(HttpUser):
    wait_time = between(2, 5)
    tasks = [ShoppingFlow]

    def on_start(self):
        self.client.post("/auth/login", json={
            "email": f"user{self.environment.runner.user_count}@test.com",
            "password": "testpass",
        })
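The flow above assumes the catalog response always contains a data list. A small defensive parser keeps one malformed response from raising inside the task. This is only a sketch: the helper name first_product_id is hypothetical, and the payload shape is inferred from the example above.

```python
from typing import Any, Optional


def first_product_id(payload: Any) -> Optional[Any]:
    """Return the id of the first product in payload["data"], or None."""
    if not isinstance(payload, dict):
        return None
    products = payload.get("data")
    if not isinstance(products, list) or not products:
        return None
    first = products[0]
    if not isinstance(first, dict):
        return None
    return first.get("id")


if __name__ == "__main__":
    print(first_product_id({"data": [{"id": 17}, {"id": 18}]}))  # 17
    print(first_product_id({"data": []}))                        # None
    print(first_product_id({"error": "rate limited"}))           # None
```

Inside browse_catalog, the result could be stored on self only when it is not None, which leaves the later hasattr checks unchanged.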
Data-driven tests with CSV
Avoid testing with identical data by loading realistic test data:
import csv
import random

from locust import HttpUser, task, between


def load_rows(path):
    """Read a CSV file into a list of dicts, one per row."""
    with open(path, newline="") as f:
        return list(csv.DictReader(f))


# Load once per process instead of once per simulated user
USERS = load_rows("test_data/users.csv")
PRODUCTS = load_rows("test_data/products.csv")


class RealisticUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        user = random.choice(USERS)
        response = self.client.post("/auth/login", json={
            "email": user["email"],
            "password": user["password"],
        })
        self.token = response.json()["access_token"]
        self.client.headers["Authorization"] = f"Bearer {self.token}"

    @task
    def search_product(self):
        product = random.choice(PRODUCTS)
        # params= URL-encodes the query; name= groups all searches into one stats row
        self.client.get("/products/search", params={"q": product["name"]},
                        name="/products/search?q=[name]")

    @task
    def view_random_product(self):
        product = random.choice(PRODUCTS)
        self.client.get(f"/products/{product['id']}", name="/products/[id]")
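Picking credentials with random.choice can log the same account in from several simulated users at once. If each simulated user needs its own account, a simple pool can hand out each row once. A minimal sketch, assuming a hypothetical CredentialPool class and inline sample data standing in for test_data/users.csv:

```python
import csv
import io
from collections import deque


class CredentialPool:
    """Hands out each credential row to at most one simulated user at a time."""

    def __init__(self, rows):
        self._free = deque(rows)

    def acquire(self):
        if not self._free:
            raise RuntimeError("more simulated users than credential rows")
        return self._free.popleft()

    def release(self, row):
        # Return a row to the pool, for example from on_stop
        self._free.append(row)


# Inline sample data; a real test would read the CSV file instead
SAMPLE_CSV = "email,password\na@test.com,pw1\nb@test.com,pw2\n"
pool = CredentialPool(csv.DictReader(io.StringIO(SAMPLE_CSV)))

first = pool.acquire()
second = pool.acquire()
print(first["email"], second["email"])  # a@test.com b@test.com
```

Locust runs simulated users as greenlets inside one process, so a plain deque is enough within a single worker. In distributed mode each worker process would hold its own pool, so the rows would need to be partitioned across workers to stay unique.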
Custom metrics and event tracking
Track business-specific metrics alongside HTTP metrics:
import time

from locust import HttpUser, task, events


class OrderUser(HttpUser):
    @task
    def create_order(self):
        start = time.perf_counter()
        response = self.client.post("/orders", json={"product_id": 42, "quantity": 1})
        if response.status_code == 201:
            # Track custom metric: time to order confirmation
            events.request.fire(
                request_type="BUSINESS",
                name="order_e2e_time",
                response_time=(time.perf_counter() - start) * 1000,
                response_length=0,
                exception=None,
                context={},
            )
        elif response.status_code == 409:
            # Count out-of-stock outcomes. Locust still records the 409 itself
            # as a failed POST unless the request uses catch_response=True.
            events.request.fire(
                request_type="BUSINESS",
                name="order_out_of_stock",
                response_time=0,
                response_length=0,
                exception=None,
                context={},
            )
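The two fire calls above differ only in name and timing, so the shared arguments can be built in one place. A minimal sketch, assuming a hypothetical helper named business_event; it only builds a dict of the same keyword arguments used above and does not import Locust itself.

```python
import time
from typing import Any, Dict, Optional


def business_event(name: str, started_at: Optional[float] = None,
                   now: Optional[float] = None) -> Dict[str, Any]:
    """Build keyword arguments for a custom BUSINESS stats entry.

    started_at and now are time.perf_counter() readings. When started_at
    is omitted, the entry is recorded with a response time of 0 ms.
    """
    if started_at is None:
        elapsed_ms = 0.0
    else:
        end = time.perf_counter() if now is None else now
        elapsed_ms = (end - started_at) * 1000
    return {
        "request_type": "BUSINESS",
        "name": name,
        "response_time": elapsed_ms,
        "response_length": 0,
        "exception": None,
        "context": {},
    }


if __name__ == "__main__":
    print(business_event("order_e2e_time", started_at=10.0, now=10.25)["response_time"])  # 250.0
```

In the task, each call could then be written as events.request.fire(**business_event("order_e2e_time", start)).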
Distributed load testing
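A single Locust process runs on one CPU core, and a single machine typically tops out at roughly 1,000–5,000 concurrent users, depending on how much work each simulated user does. For larger tests, distribute the load across multiple worker processes and machines: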
# Master node
locust -f locustfile.py --master --host http://api.staging.example.com
# Worker nodes (run on multiple machines)
locust -f locustfile.py --worker --master-host 10.0.0.1
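Each worker runs user simulations independently, and the master aggregates their metrics without generating load itself. At the per-machine figures above, 10 workers on separate machines can simulate up to roughly 50,000 concurrent users; measure what one worker sustains for your own scenario before relying on that number.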
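The sizing arithmetic is simple enough to script. The sketch below assumes you have already measured how many users one worker can sustain; the function names and the sample figures are illustrative, not Locust APIs.

```python
import math


def workers_needed(target_users: int, users_per_worker: int) -> int:
    """Number of workers required to host target_users, rounded up."""
    if users_per_worker <= 0:
        raise ValueError("users_per_worker must be positive")
    return max(1, math.ceil(target_users / users_per_worker))


def ramp_seconds(target_users: int, spawn_rate: float) -> float:
    """Time needed to reach target_users at a given spawn rate (users/s)."""
    if spawn_rate <= 0:
        raise ValueError("spawn_rate must be positive")
    return target_users / spawn_rate


if __name__ == "__main__":
    print(workers_needed(50_000, 5_000))  # 10
    print(workers_needed(12_000, 5_000))  # 3
    print(ramp_seconds(50_000, 100))      # 500.0
```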
For containerized environments:
# docker-compose.yml for distributed load testing
services:
  master:
    image: locustio/locust
    command: -f /mnt/locustfile.py --master
    ports:
      - "8089:8089"
    volumes:
      - ./:/mnt
  worker:
    image: locustio/locust
    command: -f /mnt/locustfile.py --worker --master-host master
    volumes:
      - ./:/mnt
    deploy:
      replicas: 8
Load test profiles
Ramp-up pattern
from locust import LoadTestShape


class StepLoadShape(LoadTestShape):
    """Increase users in steps to find the breaking point."""

    step_time = 60    # seconds per step
    step_load = 50    # users added per step
    spawn_rate = 10   # users per second during ramp
    time_limit = 600  # total test duration

    def tick(self):
        run_time = self.get_run_time()
        if run_time > self.time_limit:
            return None
        # +1 so the first step starts at step_load users instead of zero
        current_step = run_time // self.step_time + 1
        return (int(current_step * self.step_load), self.spawn_rate)
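Once a stepped run finishes, the breaking point is the first step whose results violate your limits. A minimal sketch, assuming you have already extracted one summary row per step (users, p95 latency, error rate); the function name and the numbers are made up for illustration.

```python
from typing import List, Optional, Tuple

StepResult = Tuple[int, float, float]  # (users, p95_ms, error_rate_pct)


def find_breaking_point(steps: List[StepResult], max_p95_ms: float,
                        max_error_pct: float) -> Optional[int]:
    """Return the user count of the first step that violates a limit."""
    for users, p95_ms, error_pct in sorted(steps):
        if p95_ms > max_p95_ms or error_pct > max_error_pct:
            return users
    return None  # no step exceeded the limits


# Made-up per-step results, not real measurements
results = [
    (50, 120.0, 0.0),
    (100, 140.0, 0.0),
    (150, 210.0, 0.1),
    (200, 640.0, 0.4),
    (250, 1900.0, 6.2),
]
print(find_breaking_point(results, max_p95_ms=500, max_error_pct=1.0))  # 200
```

The last step that still passes (150 users in this made-up data) is a reasonable upper bound for sustained load in later tests.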
Spike pattern
from locust import LoadTestShape


class SpikeLoadShape(LoadTestShape):
    """Simulate a traffic spike."""

    stages = [
        {"duration": 60, "users": 100, "spawn_rate": 10},     # Normal
        {"duration": 10, "users": 1000, "spawn_rate": 200},   # Spike
        {"duration": 120, "users": 1000, "spawn_rate": 200},  # Sustained peak
        {"duration": 30, "users": 100, "spawn_rate": 50},     # Recovery
        {"duration": 60, "users": 100, "spawn_rate": 10},     # Back to normal
    ]

    def tick(self):
        run_time = self.get_run_time()
        elapsed = 0
        # Find the stage whose time window contains the current run time
        for stage in self.stages:
            elapsed += stage["duration"]
            if run_time < elapsed:
                return (stage["users"], stage["spawn_rate"])
        return None  # all stages finished: stop the test
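A stage only reaches its target if the spawn rate can cover the change in user count within the stage's duration. The sketch below checks that for a stage list in the format used above; the function name unreachable_stages is hypothetical, and it assumes every earlier stage reached its target.

```python
from typing import Dict, List


def unreachable_stages(stages: List[Dict[str, float]], start_users: int = 0) -> List[int]:
    """Return indexes of stages whose user target cannot be reached in time."""
    problems = []
    current = start_users
    for index, stage in enumerate(stages):
        # Seconds needed to start or stop enough users at this spawn rate
        seconds_needed = abs(stage["users"] - current) / stage["spawn_rate"]
        if seconds_needed > stage["duration"]:
            problems.append(index)
        current = stage["users"]  # assumes the target was reached
    return problems


stages = [
    {"duration": 60, "users": 100, "spawn_rate": 10},
    {"duration": 10, "users": 1000, "spawn_rate": 200},
    {"duration": 120, "users": 1000, "spawn_rate": 200},
    {"duration": 30, "users": 100, "spawn_rate": 50},
    {"duration": 60, "users": 100, "spawn_rate": 10},
]
print(unreachable_stages(stages))  # []
```

For the spike stage, 900 extra users at 200 per second take 4.5 seconds, which fits in the 10-second window, so the list comes back empty.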
CI/CD integration
Run load tests automatically and fail the pipeline if performance regresses:
# run_load_test.py
import csv
import subprocess
import sys

# Run Locust in headless mode. The exit code is not checked here;
# the thresholds below decide whether the pipeline passes.
result = subprocess.run([
    "locust", "-f", "locustfile.py",
    "--headless",
    "--host", "http://staging.example.com",
    "--users", "200",
    "--spawn-rate", "20",
    "--run-time", "5m",
    "--csv", "results/load_test",
    "--only-summary",
], capture_output=True, text=True)

# Parse the aggregated row of the stats CSV
p95 = error_rate = rps = None
with open("results/load_test_stats.csv", newline="") as f:
    for row in csv.DictReader(f):
        if row["Name"] == "Aggregated":
            request_count = float(row["Request Count"])
            p95 = float(row["95%"])
            # Guard against division by zero when no requests were made
            error_rate = (
                float(row["Failure Count"]) / request_count * 100
                if request_count else 100.0
            )
            rps = float(row["Requests/s"])

if p95 is None:
    print("❌ No aggregated stats found. Locust output:")
    print(result.stderr)
    sys.exit(1)

# Assert against baselines
THRESHOLDS = {
    "p95_ms": 500,
    "error_rate_pct": 1.0,
    "min_rps": 150,
}

failures = []
if p95 > THRESHOLDS["p95_ms"]:
    failures.append(f"P95 latency {p95}ms exceeds {THRESHOLDS['p95_ms']}ms")
if error_rate > THRESHOLDS["error_rate_pct"]:
    failures.append(f"Error rate {error_rate}% exceeds {THRESHOLDS['error_rate_pct']}%")
if rps < THRESHOLDS["min_rps"]:
    failures.append(f"RPS {rps} below minimum {THRESHOLDS['min_rps']}")

if failures:
    print("❌ Load test FAILED:")
    for failure in failures:
        print(f"  - {failure}")
    sys.exit(1)
else:
    print(f"✅ Load test PASSED: P95={p95}ms, errors={error_rate}%, RPS={rps}")
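Fixed thresholds catch outright failures but not gradual drift. One option is to also compare each run against a stored baseline with a relative tolerance. A minimal sketch, assuming a hypothetical baseline document with the three metrics used above; the function name, the 10% default, and the sample values are illustrative assumptions.

```python
import json
from typing import Dict, List

# Direction of "worse" per metric: +1 means higher is worse, -1 means lower is worse
WORSE_DIRECTION = {"p95_ms": 1, "error_rate_pct": 1, "rps": -1}


def find_regressions(current: Dict[str, float], baseline: Dict[str, float],
                     tolerance_pct: float = 10.0) -> List[str]:
    """List metrics that got worse than the baseline by more than tolerance_pct."""
    regressions = []
    for metric, direction in WORSE_DIRECTION.items():
        base, now = baseline[metric], current[metric]
        allowed = abs(base) * tolerance_pct / 100
        if direction * (now - base) > allowed:
            regressions.append(f"{metric}: {base} -> {now}")
    return regressions


# Made-up baseline, as it might be stored in a JSON file from an earlier run
baseline = json.loads('{"p95_ms": 400, "error_rate_pct": 0.5, "rps": 180}')
current = {"p95_ms": 470, "error_rate_pct": 0.5, "rps": 175}
print(find_regressions(current, baseline))  # ['p95_ms: 400 -> 470']
```

A baseline value of zero leaves no tolerance, so any increase in that metric is reported; whether that is desirable for error rates is a judgment call.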
Identifying bottlenecks from load test data
When load tests reveal problems, here is the diagnostic decision tree:
CPU maxed out (>90%)?
- Profile with py-spy during the load test: py-spy top --pid <uvicorn_pid>
- Look for CPU-bound code: serialization, compression, computation
- Solution: optimize hot paths, add workers, or move CPU work to background tasks
Database connections exhausted?
- Check pool size vs concurrent requests
- Look for long-running transactions holding connections
- Solution: increase pool size, add connection timeouts, optimize slow queries
Memory growing continuously?
- Indicates a memory leak (objects not being garbage collected)
- Profile with memray or tracemalloc
- Common culprits: growing caches without TTL, accumulated event handlers, circular references
Response times increase linearly with users?
- Indicates serialized work: requests queue behind a single thread, a lock, or a blocking call
- Solution: more Uvicorn workers, async I/O in place of blocking calls, or faster JSON encoding with orjson
Errors appear at specific user count?
- Check file descriptor limits (ulimit -n), connection limits, or rate limiting
- Solution: increase OS limits, tune connection pools, adjust rate limiters
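For the memory case, the standard library's tracemalloc can compare two snapshots and show which source lines grew. The sketch below simulates a leak with a list that only grows; the helper name top_growth and the fake cache are illustrative, and in a real service the two snapshots would be taken some minutes apart during the load test.

```python
import tracemalloc


def top_growth(before, after, limit=3):
    """Return the source lines whose allocated size grew the most."""
    diffs = after.compare_to(before, "lineno")
    return [d for d in diffs if d.size_diff > 0][:limit]


tracemalloc.start()
before = tracemalloc.take_snapshot()

# Stand-in for a leak: a cache that only ever grows (about 2 MB here)
leaky_cache = [bytes(1024) for _ in range(2000)]

after = tracemalloc.take_snapshot()
growth = top_growth(before, after)
tracemalloc.stop()

for stat in growth:
    print(stat)  # file, line number, and size difference for each entry
```

The first entry should point at the line that builds leaky_cache, since it accounts for nearly all of the growth between the two snapshots.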
Realistic test environment
The test environment must mirror production or results are meaningless:
- Same database engine and schema (not SQLite for a PostgreSQL production system)
- Similar data volume (an empty database benchmarks differently than one with 10 million rows)
- Same network topology (test internal latency if services communicate over the network)
- Same resource limits (CPU, memory, connection pools)
The most common load testing mistake is testing against an environment that does not match production and drawing conclusions that do not apply.
Benchmarking Python ASGI servers
Different server configurations dramatically affect throughput:
# Single worker (development)
uvicorn app:app --host 0.0.0.0 --port 8000
# ~500-1000 RPS for simple endpoints
# Multiple workers (production)
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4
# ~2000-4000 RPS
# Gunicorn with Uvicorn workers
gunicorn app:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
# ~2500-5000 RPS
# With uvloop (faster event loop)
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4 --loop uvloop
# ~3000-6000 RPS
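A common starting point for I/O-bound workloads is a worker count of 2x CPU cores. The RPS figures above are rough and depend heavily on hardware and endpoint complexity, so test different configurations during load tests to find your optimal setup.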
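The 2x rule of thumb can be turned into a starting value for the --workers flag. A minimal sketch; the function name suggested_workers is hypothetical, and the result is an initial guess to refine with load test results, not a recommendation from Uvicorn or Gunicorn.

```python
import os
from typing import Optional


def suggested_workers(cores: Optional[int] = None, per_core: int = 2) -> int:
    """Starting worker count: per_core workers for each CPU core."""
    if cores is None:
        # os.cpu_count() can return None on some platforms
        cores = os.cpu_count() or 1
    return max(1, cores * per_core)


if __name__ == "__main__":
    print(suggested_workers(4))  # 8
    print(suggested_workers())   # depends on the machine
```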
The one thing to remember: Load test with realistic user flows and data, automate tests in CI/CD with regression thresholds, use distributed Locust for large-scale tests, and diagnose bottlenecks systematically — CPU, memory, connections, and serialization each have distinct signatures and solutions.