Python Load Shedding — Deep Dive
Concurrency-Based Load Shedder
The simplest approach: track concurrent requests and reject new ones once utilization (active requests / capacity) crosses a priority-dependent threshold:
import asyncio
import time
from dataclasses import dataclass, field
from enum import IntEnum
class RequestPriority(IntEnum):
    """Request importance tiers; a lower value means more important.

    Members are IntEnum values, so they compare as plain integers: for
    example ``priority >= RequestPriority.NORMAL`` matches NORMAL and LOW.
    """

    # Health checks, admin
    CRITICAL = 0
    # Payments, auth
    HIGH = 1
    # Authenticated users
    NORMAL = 2
    # Anonymous, bots
    LOW = 3
@dataclass
class LoadShedder:
    """Concurrency-based load shedder with priority-aware thresholds.

    Tracks how many requests are in flight. ``should_shed`` rejects
    lower-priority work first as utilization (active / max_concurrent)
    rises, and never rejects CRITICAL requests. The caller pairs
    ``acquire``/``release`` around each admitted request and calls
    ``record_shed`` for each rejected one.

    A non-positive ``max_concurrent`` is treated as zero capacity.
    Utilization is reported as +inf and every non-critical request is shed,
    instead of raising ZeroDivisionError.
    """

    max_concurrent: int
    _active: int = field(default=0, init=False)
    _total_shed: int = field(default=0, init=False)
    _total_served: int = field(default=0, init=False)

    # Utilization at or above which each priority is shed. These attributes
    # are deliberately unannotated so @dataclass treats them as plain class
    # attributes rather than fields; they are built once, not per call.
    _SHED_THRESHOLDS = {
        RequestPriority.HIGH: 0.95,
        RequestPriority.NORMAL: 0.80,
        RequestPriority.LOW: 0.60,
    }
    # Fallback for any priority missing from the table above.
    _DEFAULT_THRESHOLD = 0.80

    def _utilization(self) -> float:
        """Return active / max_concurrent, or +inf when there is no capacity."""
        if self.max_concurrent <= 0:
            return float("inf")
        return self._active / self.max_concurrent

    def should_shed(self, priority: RequestPriority = RequestPriority.NORMAL) -> bool:
        """Return True if a request of ``priority`` should be rejected now."""
        # Critical requests are never shed
        if priority == RequestPriority.CRITICAL:
            return False
        # Priority-aware: lower priority requests shed earlier
        threshold = self._SHED_THRESHOLDS.get(priority, self._DEFAULT_THRESHOLD)
        return self._utilization() >= threshold

    def acquire(self) -> bool:
        """Count an admitted request as active and served; always returns True."""
        self._active += 1
        self._total_served += 1
        return True

    def release(self) -> None:
        """Mark one active request as finished (never drops below zero)."""
        self._active = max(0, self._active - 1)

    def record_shed(self) -> None:
        """Count one rejected request."""
        self._total_shed += 1

    @property
    def stats(self) -> dict:
        """Snapshot of current load and lifetime shed/served counters."""
        handled = self._total_shed + self._total_served
        return {
            "active": self._active,
            "max": self.max_concurrent,
            "utilization": self._utilization(),
            "total_shed": self._total_shed,
            "total_served": self._total_served,
            "shed_rate": self._total_shed / handled if handled > 0 else 0.0,
        }
ASGI Middleware for FastAPI
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
class LoadSheddingMiddleware(BaseHTTPMiddleware):
    """Reject requests with HTTP 503 when the service is near capacity.

    Each request is classified into a ``RequestPriority`` by path and headers
    and checked against a ``LoadShedder``. Admitted requests count as active
    until ``call_next`` returns or raises. Shed requests get a 503 JSON body
    plus a ``Retry-After`` header.

    NOTE(review): presumably, with ``BaseHTTPMiddleware`` ``call_next``
    returns once response headers are ready. If so, streaming responses
    release their slot before the body finishes. Verify against the
    Starlette version in use.
    """

    # Paths starting with these prefixes are never shed, whoever sends them.
    _CRITICAL_PREFIXES = ("/health", "/admin")
    _HIGH_PREFIXES = ("/api/checkout", "/api/payment")

    def __init__(self, app, max_concurrent: int = 100, retry_after: int = 5):
        """
        Args:
            app: The wrapped ASGI application.
            max_concurrent: Concurrency capacity handed to ``LoadShedder``.
            retry_after: Seconds advertised to shed clients, in both the
                ``Retry-After`` header and the JSON body.
        """
        super().__init__(app)
        self.shedder = LoadShedder(max_concurrent=max_concurrent)
        self.retry_after = retry_after

    async def dispatch(self, request: Request, call_next):
        priority = self._classify_request(request)
        if self.shedder.should_shed(priority):
            self.shedder.record_shed()
            return JSONResponse(
                status_code=503,
                content={
                    "error": "Service at capacity",
                    "retry_after": self.retry_after,
                },
                headers={"Retry-After": str(self.retry_after)},
            )
        self.shedder.acquire()
        try:
            return await call_next(request)
        finally:
            # Release even when the downstream app raises.
            self.shedder.release()

    def _classify_request(self, request: Request) -> RequestPriority:
        """Map a request to a priority tier.

        Presence of an ``Authorization`` header is enough for NORMAL; the
        header's value is not validated here.
        """
        path = request.url.path
        if path.startswith(self._CRITICAL_PREFIXES):
            return RequestPriority.CRITICAL
        if path.startswith(self._HIGH_PREFIXES):
            return RequestPriority.HIGH
        if request.headers.get("Authorization"):
            return RequestPriority.NORMAL
        return RequestPriority.LOW
app = FastAPI()
# Allow up to 200 concurrent requests in this process before the
# priority-based shedding thresholds are measured against that capacity.
app.add_middleware(
    LoadSheddingMiddleware,
    max_concurrent=200,
)
Event Loop Lag Detection
For asyncio applications, event loop lag is a strong signal of overload — it means coroutines are waiting longer than expected to be scheduled:
import asyncio
import time
class EventLoopLagMonitor:
    """Measures asyncio event loop lag.

    ``start()`` repeatedly sleeps for ``check_interval`` seconds and records
    how much longer than requested each sleep took, in milliseconds. When the
    loop is busy, the wake-up is delayed, so this overshoot approximates how
    long ready coroutines wait to be scheduled.
    """

    def __init__(
        self,
        check_interval: float = 1.0,
        overload_threshold_ms: float = 100.0,
    ):
        """
        Args:
            check_interval: Seconds between lag samples.
            overload_threshold_ms: Lag above which ``is_overloaded`` is True.
        """
        self.check_interval = check_interval
        self.overload_threshold_ms = overload_threshold_ms
        self._lag_ms: float = 0.0
        self._running = False

    async def start(self) -> None:
        """Sample lag until ``stop()`` is called; run as a background task."""
        self._running = True
        while self._running:
            before = time.monotonic()
            await asyncio.sleep(self.check_interval)
            overshoot_s = time.monotonic() - before - self.check_interval
            # Clamp to 0.0 (a float, unlike the int 0) if sleep returned early
            self._lag_ms = max(0.0, overshoot_s * 1000)

    def stop(self) -> None:
        """Ask ``start()`` to exit after the sleep currently in progress.

        NOTE: ``start()`` sets the running flag on entry, so calling ``stop()``
        before the ``start()`` task has begun executing does not stop it.
        """
        self._running = False

    @property
    def lag_ms(self) -> float:
        """Most recent lag sample in milliseconds (0.0 before the first one)."""
        return self._lag_ms

    @property
    def is_overloaded(self) -> bool:
        """True when the latest lag exceeds ``overload_threshold_ms``."""
        return self._lag_ms > self.overload_threshold_ms
Integrate it with the load shedder:
class AdaptiveLoadShedder:
    """Adjusts shedding threshold based on event loop lag.

    Decision order, using ``lag_monitor.lag_ms``:

    * lag > ``severe_lag_ms`` (default 200): shed everything except CRITICAL.
    * lag > ``moderate_lag_ms`` (default 100): shed NORMAL and LOW.
    * otherwise: defer to a concurrency-based ``LoadShedder`` with
      ``base_max`` slots.

    ``acquire``/``release``/``record_shed``/``stats`` delegate to the inner
    ``LoadShedder``, so this class exposes the same methods the middleware
    calls. Without them, the inner shedder's active count would only move if
    callers reached into ``.shedder`` directly.
    """

    def __init__(
        self,
        base_max: int,
        lag_monitor: EventLoopLagMonitor,
        moderate_lag_ms: float = 100.0,
        severe_lag_ms: float = 200.0,
    ):
        self.base_max = base_max
        self.lag_monitor = lag_monitor
        self.moderate_lag_ms = moderate_lag_ms
        self.severe_lag_ms = severe_lag_ms
        self.shedder = LoadShedder(max_concurrent=base_max)

    def should_shed(self, priority: RequestPriority = RequestPriority.NORMAL) -> bool:
        """Return True if a request of ``priority`` should be rejected now."""
        # Critical requests are never shed, whatever the lag.
        if priority == RequestPriority.CRITICAL:
            return False
        lag = self.lag_monitor.lag_ms
        if lag > self.severe_lag_ms:
            # Severe lag: shed everything except critical
            return True
        if lag > self.moderate_lag_ms:
            # Moderate lag: shed normal and low priority
            return priority >= RequestPriority.NORMAL
        return self.shedder.should_shed(priority)

    def acquire(self) -> bool:
        """Count an admitted request on the inner shedder."""
        return self.shedder.acquire()

    def release(self) -> None:
        """Mark one admitted request as finished on the inner shedder."""
        self.shedder.release()

    def record_shed(self) -> None:
        """Count one rejected request on the inner shedder."""
        self.shedder.record_shed()

    @property
    def stats(self) -> dict:
        """Inner shedder stats plus the current event loop lag."""
        return {**self.shedder.stats, "lag_ms": self.lag_monitor.lag_ms}
Deadline-Based Shedding
Drop requests that have already waited too long to be useful:
from starlette.responses import JSONResponse
class DeadlineSheddingMiddleware(BaseHTTPMiddleware):
    """Rejects requests whose deadline has already passed.

    Two optional upstream headers are honored:

    * ``X-Request-Deadline``: an absolute deadline in Unix epoch seconds
      (wall clock). Requests arriving after it get a 504.
    * ``X-Queue-Time-Seconds``: how long the request already waited upstream
      (set by a load balancer or ingress). Requests that waited longer than
      ``MAX_QUEUE_TIME`` get a 503.

    Missing, empty, or non-numeric header values are ignored rather than
    failing the request.
    """

    DEADLINE_HEADER = "X-Request-Deadline"
    QUEUE_TIME_HEADER = "X-Queue-Time-Seconds"
    MAX_QUEUE_TIME = 10.0  # seconds

    async def dispatch(self, request: Request, call_next):
        # Check upstream deadline. It is produced by another process (often
        # another host), so compare it with the wall clock: time.monotonic()
        # has an undefined reference point and cannot be shared. This assumes
        # reasonably synchronized clocks (e.g. NTP) between hosts.
        deadline = self._parse_seconds(request.headers.get(self.DEADLINE_HEADER))
        if deadline is not None and time.time() > deadline:
            return JSONResponse(
                status_code=504,
                content={"error": "Request deadline exceeded"},
            )
        # Check local queue time (set by load balancer or ingress)
        queue_time = self._parse_seconds(request.headers.get(self.QUEUE_TIME_HEADER))
        if queue_time is not None and queue_time > self.MAX_QUEUE_TIME:
            return JSONResponse(
                status_code=503,
                content={"error": "Request waited too long in queue"},
            )
        return await call_next(request)

    @staticmethod
    def _parse_seconds(value: "str | None") -> "float | None":
        """Parse a header value as float seconds; None if missing or malformed.

        Header values are client-controlled, so a bad value must not turn
        into an unhandled ValueError (and thus a 500).
        """
        if not value:
            return None
        try:
            return float(value)
        except ValueError:
            return None
Probabilistic Shedding with CoDel
Controlled Delay (CoDel) is an algorithm from networking that sheds load when queue latency stays high. It avoids shedding during brief spikes while catching sustained overload:
from collections import deque
from dataclasses import dataclass
@dataclass
class CoDelShedder:
    """CoDel-inspired load shedding based on queue sojourn time.

    Call ``should_drop`` once per request with the time it spent queued.
    Nothing is dropped while sojourn time is below ``target_ms``. Once it has
    stayed at or above the target for a full ``interval_ms``, the shedder
    enters a dropping state. It drops one request immediately, then schedules
    each further drop ``interval_ms / sqrt(n)`` after the previous one, where
    ``n`` is the number of drops so far, so the drop rate ramps up while
    overload persists. Any request below target ends the dropping state.
    """

    target_ms: float = 50.0  # Target queue latency
    interval_ms: float = 1000.0  # Measurement interval
    # Monotonic ms time at which "above target" becomes sustained; 0.0 = unset
    _first_above: float = 0.0
    # Monotonic ms time of the next scheduled drop while dropping
    _drop_next: float = 0.0
    _dropping: bool = False
    _drop_count: int = 0

    def should_drop(self, sojourn_time_ms: float) -> bool:
        """Return True if the request that queued for ``sojourn_time_ms`` should be dropped."""
        now_ms = time.monotonic() * 1000

        if sojourn_time_ms < self.target_ms:
            self._first_above = 0.0
            self._dropping = False
            return False

        if self._first_above == 0.0:
            # First sample above target: start the grace interval.
            self._first_above = now_ms + self.interval_ms
            return False

        if not self._dropping:
            if now_ms < self._first_above:
                return False
            self._dropping = True
            self._drop_count = 1
            # Schedule the next drop one control-law step ahead. Setting it to
            # now_ms would make the very next request drop as well.
            self._schedule_next_drop(now_ms)
            return True

        if now_ms >= self._drop_next:
            self._drop_count += 1
            self._schedule_next_drop(now_ms)
            return True
        return False

    def _schedule_next_drop(self, now_ms: float) -> None:
        """Set the next drop time using CoDel's inverse-sqrt spacing."""
        self._drop_next = now_ms + self.interval_ms / (self._drop_count ** 0.5)
Monitoring Load Shedding
from prometheus_client import Counter, Gauge, Histogram
# Every request rejected by a shedder, labelled by its priority tier and the
# reason it was shed.
requests_shed = Counter(
    name="http_requests_shed_total",
    documentation="Total requests shed due to overload",
    labelnames=["priority", "reason"],
)

# Requests currently in flight.
active_requests = Gauge(
    name="http_requests_active",
    documentation="Currently active requests",
)

# Fraction of recent requests being shed, in [0.0, 1.0].
shed_rate = Gauge(
    name="http_shed_rate",
    documentation="Fraction of requests being shed (0.0-1.0)",
)

# Queueing delay before processing starts; buckets span 10 ms to 10 s.
queue_latency = Histogram(
    name="http_queue_latency_seconds",
    documentation="Time requests spent in queue before processing",
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
Key alerts to set:
- Shed rate > 1% sustained for 5 minutes → investigate
- Shed rate > 10% → likely capacity issue, page the on-call
- Critical requests shed > 0 → configuration error (critical should never shed)
- Event loop lag > 500ms → severe overload
Load Testing Your Shedding Logic
import pytest
import asyncio
def test_sheds_low_priority_first():
    """Lower priorities shed at lower utilization; CRITICAL never sheds.

    Plain (sync) test: nothing here awaits, so it needs no asyncio plugin
    to be collected and run.
    """
    shedder = LoadShedder(max_concurrent=10)
    # Fill to 70% capacity
    for _ in range(7):
        shedder.acquire()
    # Low priority should be shed at 60% threshold
    assert shedder.should_shed(RequestPriority.LOW) is True
    # Normal (80%) and high (95%) should not be shed yet
    assert shedder.should_shed(RequestPriority.NORMAL) is False
    assert shedder.should_shed(RequestPriority.HIGH) is False
    # Critical is never shed
    assert shedder.should_shed(RequestPriority.CRITICAL) is False
    # At 100% even high priority is shed, but critical still is not
    for _ in range(3):
        shedder.acquire()
    assert shedder.should_shed(RequestPriority.HIGH) is True
    assert shedder.should_shed(RequestPriority.CRITICAL) is False
@pytest.mark.asyncio
async def test_503_includes_retry_after():
    """A shed request gets a 503 with Retry-After and is counted as shed.

    Drives ``dispatch`` directly with a saturated shedder, so no server,
    HTTP client, or slow endpoint is needed to simulate overload.
    """
    app = FastAPI()
    middleware = LoadSheddingMiddleware(app, max_concurrent=1)
    # Simulate overload: one request already in flight saturates capacity.
    middleware.shedder.acquire()

    # Anonymous request to a non-critical path -> LOW priority.
    scope = {
        "type": "http",
        "method": "GET",
        "path": "/test",
        "raw_path": b"/test",
        "root_path": "",
        "query_string": b"",
        "headers": [],
        "scheme": "http",
        "server": ("testserver", 80),
        "client": ("testclient", 50000),
        "http_version": "1.1",
    }
    request = Request(scope)

    async def call_next(_request):
        raise AssertionError("a shed request must not reach the app")

    response = await middleware.dispatch(request, call_next)
    assert response.status_code == 503
    assert response.headers["Retry-After"] == "5"
    assert middleware.shedder.stats["total_shed"] == 1
Production Tuning
- Start conservative (shed too little rather than too much). Set `max_concurrent` higher than you think you need and tighten it based on observed performance. It's easier to lower the threshold than to explain why you rejected legitimate traffic.
- Differentiate by endpoint. A `/health` endpoint costs almost nothing; an `/api/generate-report` endpoint might use 100x more resources. Consider per-endpoint concurrency limits.
- Coordinate with your load balancer. If your load balancer retries 503 responses on a different backend, load shedding works beautifully — the request gets served by a less-loaded instance. Without retries, the user sees an error.
- Account for warm-up. After a deploy or restart, your application might have cold caches and slower response times. Start with a lower concurrency limit and ramp up as caches warm.
One thing to remember: Load shedding protects the system’s ability to function, not individual requests. Measure your actual capacity under realistic load, set thresholds with margins, and treat sustained shedding as a signal to add capacity — not just a feature working as intended.
See Also
- Python Aggregate Pattern Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture Why your Python app should look like an onion — and how that saves you from painful rewrites.