Python Load Shedding — Deep Dive

Concurrency-Based Load Shedder

The simplest approach: track concurrent requests and reject when above capacity:

import asyncio
import time
from dataclasses import dataclass, field
from enum import IntEnum

class RequestPriority(IntEnum):
    """Priority class of an incoming request; a lower value is more important.

    Being an ``IntEnum``, members compare as plain integers, so ordering
    checks such as ``priority >= RequestPriority.NORMAL`` are valid.
    """
    CRITICAL = 0    # Health checks, admin
    HIGH = 1        # Payments, auth
    NORMAL = 2      # Authenticated users
    LOW = 3         # Anonymous, bots

@dataclass
class LoadShedder:
    """Concurrency-based load shedder with priority-aware thresholds.

    Tracks how many requests are in flight and decides whether a new request
    should be rejected, based on utilization (``_active / max_concurrent``).
    Lower-priority requests are shed at lower utilization than higher ones.

    The counters are plain ints without locking.
    NOTE(review): presumably intended for use from a single asyncio event
    loop -- confirm before sharing one instance across threads.

    Attributes:
        max_concurrent: Number of in-flight requests that counts as 100%
            utilization. Must be positive.

    Raises:
        ValueError: If ``max_concurrent`` is not positive (it is used as a
            divisor, so zero would otherwise fail later with
            ``ZeroDivisionError``).
    """

    max_concurrent: int
    _active: int = field(default=0, init=False)
    _total_shed: int = field(default=0, init=False)
    _total_served: int = field(default=0, init=False)

    # Utilization at or above which each priority is shed. Deliberately left
    # unannotated so that @dataclass does not turn these into fields.
    _SHED_THRESHOLDS = {
        RequestPriority.HIGH: 0.95,
        RequestPriority.NORMAL: 0.80,
        RequestPriority.LOW: 0.60,
    }
    # Threshold applied to any priority missing from the table above.
    _DEFAULT_THRESHOLD = 0.80

    def __post_init__(self) -> None:
        """Reject a non-positive capacity up front instead of at first use."""
        if self.max_concurrent <= 0:
            raise ValueError(
                f"max_concurrent must be positive, got {self.max_concurrent!r}"
            )

    def should_shed(self, priority: RequestPriority = RequestPriority.NORMAL) -> bool:
        """Return True if a request of ``priority`` should be rejected now.

        Critical requests are never shed. Every other priority is shed once
        utilization reaches its threshold (HIGH 0.95, NORMAL 0.80, LOW 0.60).
        """
        if priority == RequestPriority.CRITICAL:
            return False

        utilization = self._active / self.max_concurrent
        threshold = self._SHED_THRESHOLDS.get(priority, self._DEFAULT_THRESHOLD)
        return utilization >= threshold

    def acquire(self) -> bool:
        """Record that a request has started being served.

        Does not check capacity itself; callers are expected to consult
        ``should_shed`` first. Always returns True.
        """
        self._active += 1
        self._total_served += 1
        return True

    def release(self) -> None:
        """Record that a request finished; the active count never drops below 0."""
        self._active = max(0, self._active - 1)

    def record_shed(self) -> None:
        """Count one rejected request."""
        self._total_shed += 1

    @property
    def stats(self) -> dict:
        """Snapshot of the counters plus derived utilization and shed rate.

        ``shed_rate`` is shed / (shed + served), or 0.0 before any request
        has been seen.
        """
        total_seen = self._total_shed + self._total_served
        return {
            "active": self._active,
            "max": self.max_concurrent,
            "utilization": self._active / self.max_concurrent,
            "total_shed": self._total_shed,
            "total_served": self._total_served,
            "shed_rate": self._total_shed / total_seen if total_seen > 0 else 0.0,
        }

ASGI Middleware for FastAPI

from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

class LoadSheddingMiddleware(BaseHTTPMiddleware):
    """HTTP middleware that rejects requests with 503 when the service is full.

    Each request is classified into a ``RequestPriority`` from its path and
    headers, then either shed (503 with a ``Retry-After`` header) or passed on
    to the application while being counted as active.

    Args:
        app: The wrapped ASGI application.
        max_concurrent: Concurrent request count treated as 100% utilization.
        retry_after: Seconds advertised to shed clients, both in the
            ``Retry-After`` header and in the JSON body. Defaults to 5.
    """

    # Path prefixes that map to each priority class.
    CRITICAL_PREFIXES = ("/health", "/admin")
    HIGH_PREFIXES = ("/api/checkout", "/api/payment")

    def __init__(self, app, max_concurrent: int = 100, retry_after: int = 5):
        super().__init__(app)
        self.shedder = LoadShedder(max_concurrent=max_concurrent)
        self.retry_after = retry_after

    async def dispatch(self, request: Request, call_next):
        """Shed the request with a 503, or serve it and track it as active."""
        priority = self._classify_request(request)

        if self.shedder.should_shed(priority):
            self.shedder.record_shed()
            return JSONResponse(
                status_code=503,
                content={
                    "error": "Service at capacity",
                    "retry_after": self.retry_after,
                },
                headers={"Retry-After": str(self.retry_after)},
            )

        # There is no await between the check above and acquire(), so the
        # decision and the counter update happen in one uninterrupted step.
        self.shedder.acquire()
        try:
            return await call_next(request)
        finally:
            # Always give the slot back, even if the handler raised.
            self.shedder.release()

    def _classify_request(self, request: Request) -> RequestPriority:
        """Map a request to a priority using its path, then its auth header."""
        path = request.url.path

        # str.startswith accepts a tuple of prefixes.
        if path.startswith(self.CRITICAL_PREFIXES):
            return RequestPriority.CRITICAL
        if path.startswith(self.HIGH_PREFIXES):
            return RequestPriority.HIGH
        if request.headers.get("Authorization"):
            return RequestPriority.NORMAL
        return RequestPriority.LOW

# Application instance with load shedding installed; 200 concurrent requests
# is passed as the middleware's max_concurrent.
app = FastAPI()
app.add_middleware(LoadSheddingMiddleware, max_concurrent=200)

Event Loop Lag Detection

For asyncio applications, event loop lag is a strong signal of overload — it means coroutines are waiting longer than expected to be scheduled:

import asyncio
import time

class EventLoopLagMonitor:
    """Measures asyncio event loop lag.

    Repeatedly sleeps for ``check_interval`` seconds and records how much
    longer than requested the sleep actually took, in milliseconds.

    Args:
        check_interval: Seconds to sleep between measurements.
        overload_threshold_ms: Lag above which ``is_overloaded`` reports True.
            Defaults to 100 ms.
    """

    def __init__(
        self,
        check_interval: float = 1.0,
        overload_threshold_ms: float = 100.0,
    ):
        self.check_interval = check_interval
        self.overload_threshold_ms = overload_threshold_ms
        self._lag_ms: float = 0.0
        self._running = False

    async def start(self):
        """Run the measurement loop until ``stop()`` is called.

        This coroutine does not return while the monitor is running, so it
        has to be scheduled alongside the rest of the application.
        """
        self._running = True
        while self._running:
            before = time.monotonic()
            await asyncio.sleep(self.check_interval)
            elapsed = time.monotonic() - before
            # Clamp to 0.0 in a single assignment: the sleep may return
            # slightly early, and the stored value should stay a float and
            # never be observable as negative.
            self._lag_ms = max(0.0, (elapsed - self.check_interval) * 1000)

    def stop(self):
        """Ask the loop to exit; it ends once the sleep in progress finishes."""
        self._running = False

    @property
    def lag_ms(self) -> float:
        """Most recently measured lag in milliseconds (0.0 before any sample)."""
        return self._lag_ms

    @property
    def is_overloaded(self) -> bool:
        """True when the latest lag exceeds ``overload_threshold_ms``."""
        return self._lag_ms > self.overload_threshold_ms

Integrate it with the load shedder:

class AdaptiveLoadShedder:
    """Load shedder whose decisions tighten as event loop lag grows.

    Lag above 200 ms sheds everything except critical requests; lag above
    100 ms sheds normal and low priority; otherwise the decision is left to
    the wrapped concurrency-based ``LoadShedder``.
    """

    def __init__(self, base_max: int, lag_monitor: EventLoopLagMonitor):
        self.base_max = base_max
        self.lag_monitor = lag_monitor
        self.shedder = LoadShedder(max_concurrent=base_max)

    def should_shed(self, priority: RequestPriority) -> bool:
        """Return True if a request of ``priority`` should be rejected."""
        current_lag = self.lag_monitor.lag_ms

        # Severe lag: only critical traffic gets through.
        if current_lag > 200:
            return priority != RequestPriority.CRITICAL

        # Moderate lag: normal and low priority are rejected.
        if current_lag > 100:
            return priority >= RequestPriority.NORMAL

        # Event loop is healthy: fall back to concurrency-based shedding.
        return self.shedder.should_shed(priority)

Deadline-Based Shedding

Drop requests that have already waited too long to be useful:

from starlette.responses import JSONResponse

class DeadlineSheddingMiddleware(BaseHTTPMiddleware):
    """Rejects requests whose deadline has already passed.

    Two request headers are consulted:

    * ``X-Request-Deadline``: if the current clock reading is past this
      value, the request is answered with 504.
    * ``X-Queue-Time-Seconds``: if this exceeds ``MAX_QUEUE_TIME``, the
      request is answered with 503.

    Headers that are absent or not parseable as a number are ignored, so a
    malformed client header cannot turn into an unhandled ``ValueError``.
    """

    DEADLINE_HEADER = "X-Request-Deadline"
    QUEUE_TIME_HEADER = "X-Queue-Time-Seconds"
    MAX_QUEUE_TIME = 10.0  # seconds

    @staticmethod
    def _parse_seconds(raw):
        """Return header text ``raw`` as a float, or None if missing/invalid."""
        if not raw:
            return None
        try:
            return float(raw)
        except ValueError:
            # Header values come from the network; treat garbage as absent.
            return None

    async def dispatch(self, request: Request, call_next):
        """Short-circuit with 504/503 for expired requests, else pass through."""
        # Check upstream deadline.
        # NOTE(review): time.monotonic() has an undefined reference point, so
        # this comparison is only meaningful if the upstream stamps the header
        # from the same monotonic clock -- confirm. A deadline expressed as
        # epoch seconds would need time.time() here instead.
        deadline = self._parse_seconds(request.headers.get(self.DEADLINE_HEADER))
        if deadline is not None and time.monotonic() > deadline:
            return JSONResponse(
                status_code=504,
                content={"error": "Request deadline exceeded"},
            )

        # Check local queue time (set by load balancer or ingress)
        queue_time = self._parse_seconds(
            request.headers.get(self.QUEUE_TIME_HEADER)
        )
        if queue_time is not None and queue_time > self.MAX_QUEUE_TIME:
            return JSONResponse(
                status_code=503,
                content={"error": "Request waited too long in queue"},
            )

        return await call_next(request)

Probabilistic Shedding with CoDel

Controlled Delay (CoDel) is an algorithm from networking that sheds load when queue latency stays high. It avoids shedding during brief spikes while catching sustained overload:

from collections import deque
from dataclasses import dataclass

@dataclass
class CoDelShedder:
    """CoDel-inspired load shedding based on queue sojourn time.

    A request is dropped only after its queueing delay has stayed at or above
    ``target_ms`` for a full ``interval_ms``. While that condition persists,
    further drops are spaced ``interval_ms / sqrt(drop_count)`` apart, so the
    drop rate rises gradually. Any sample below the target resets the state.

    The underscore-prefixed fields hold internal state; ``_first_above`` uses
    0.0 as its "not set" marker.
    """
    target_ms: float = 50.0       # Target queue latency
    interval_ms: float = 1000.0   # Measurement interval
    _first_above: float = 0.0     # Time (ms) at which sustained delay counts as overload
    _drop_next: float = 0.0       # Earliest time (ms) of the next drop
    _dropping: bool = False       # True while in the dropping state
    _drop_count: int = 0          # Drops made in the current dropping episode

    def _next_drop_time(self, now_ms: float) -> float:
        """Control law: schedule the next drop interval / sqrt(count) from now."""
        return now_ms + self.interval_ms / (self._drop_count ** 0.5)

    def should_drop(self, sojourn_time_ms: float) -> bool:
        """Return True if the request that waited ``sojourn_time_ms`` should be dropped.

        Args:
            sojourn_time_ms: Time the request spent queued, in milliseconds.
        """
        now_ms = time.monotonic() * 1000

        # Delay is back under target: leave the dropping state entirely.
        if sojourn_time_ms < self.target_ms:
            self._first_above = 0.0
            self._dropping = False
            return False

        # First sample above target: start the grace interval, do not drop.
        if self._first_above == 0.0:
            self._first_above = now_ms + self.interval_ms
            return False

        if not self._dropping:
            if now_ms < self._first_above:
                return False
            # Delay stayed high for a whole interval: begin dropping.
            self._dropping = True
            self._drop_count = 1
            # Schedule the second drop with the control law too; setting this
            # to now_ms would make the very next request drop immediately and
            # defeat the inverse-sqrt spacing.
            self._drop_next = self._next_drop_time(now_ms)
            return True

        if now_ms >= self._drop_next:
            self._drop_count += 1
            # CoDel uses inverse-sqrt spacing for drops
            self._drop_next = self._next_drop_time(now_ms)
            return True

        return False

Monitoring Load Shedding

from prometheus_client import Counter, Gauge, Histogram

# Count of shed requests, labelled by request priority and shedding reason.
requests_shed = Counter(
    "http_requests_shed_total",
    "Total requests shed due to overload",
    ["priority", "reason"],
)

# Number of requests currently being processed.
active_requests = Gauge(
    "http_requests_active",
    "Currently active requests",
)

# Fraction of requests being shed, from 0.0 to 1.0.
shed_rate = Gauge(
    "http_shed_rate",
    "Fraction of requests being shed (0.0-1.0)",
)

# Distribution of queueing delay in seconds; buckets span 10 ms to 10 s.
queue_latency = Histogram(
    "http_queue_latency_seconds",
    "Time requests spent in queue before processing",
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)

Key alerts to set:

  • Shed rate > 1% sustained for 5 minutes → investigate
  • Shed rate > 10% → likely capacity issue, page the on-call
  • Critical requests shed > 0 → configuration error (critical should never shed)
  • Event loop lag > 500ms → severe overload

Load Testing Your Shedding Logic

import pytest
import asyncio

@pytest.mark.asyncio
async def test_sheds_low_priority_first():
    """At 70% utilization only LOW priority traffic is shed."""
    shedder = LoadShedder(max_concurrent=10)

    # Occupy 7 of the 10 slots.
    occupied = 0
    while occupied < 7:
        shedder.acquire()
        occupied += 1

    # LOW sheds from 60%, NORMAL only from 80%, CRITICAL never.
    expected_decisions = {
        RequestPriority.LOW: True,
        RequestPriority.NORMAL: False,
        RequestPriority.CRITICAL: False,
    }
    for priority, expected in expected_decisions.items():
        assert shedder.should_shed(priority) is expected

@pytest.mark.asyncio
async def test_503_includes_retry_after():
    """A shed request is answered with 503 and a Retry-After header."""
    app = FastAPI()
    middleware = LoadSheddingMiddleware(app, max_concurrent=1)

    # Simulate overload: occupy the only slot so utilization is 100%.
    middleware.shedder.acquire()

    # Minimal ASGI scope for an anonymous request to a non-critical path,
    # which the middleware classifies as LOW priority.
    request = Request(
        {
            "type": "http",
            "method": "GET",
            "path": "/test",
            "query_string": b"",
            "headers": [],
        }
    )

    async def call_next(_request):
        # A shed request must be answered by the middleware itself.
        raise AssertionError("shed request reached the application")

    response = await middleware.dispatch(request, call_next)

    assert response.status_code == 503
    assert response.headers["Retry-After"] == "5"
    assert middleware.shedder.stats["total_shed"] == 1

Production Tuning

  1. Start conservative. Set max_concurrent higher than you think you need and tighten based on observed performance. It’s easier to lower the threshold than to explain why you rejected legitimate traffic.

  2. Differentiate by endpoint. A /health endpoint costs almost nothing; a /api/generate-report might use 100x more resources. Consider per-endpoint concurrency limits.

  3. Coordinate with your load balancer. If your load balancer retries 503 responses on a different backend, load shedding works beautifully — the request gets served by a less-loaded instance. Without retry, the user sees an error.

  4. Account for warm-up. After a deploy or restart, your application might have cold caches and slower response times. Start with a lower concurrency limit and ramp up as caches warm.

One thing to remember: Load shedding protects the system’s ability to function, not individual requests. Measure your actual capacity under realistic load, set thresholds with margins, and treat sustained shedding as a signal to add capacity — not just a feature working as intended.

Tags: python, reliability, performance

See Also