Python API Rate Limit Handling — Deep Dive

System-level framing

A production application that consumes external APIs must treat rate limits as a first-class architectural concern. The challenge goes beyond retrying 429 responses — it involves tracking quota across multiple endpoints (often with different limits), coordinating access across distributed workers, degrading gracefully when limits are tight, and avoiding cascading failures when an API becomes temporarily unavailable. This is client-side traffic shaping, and getting it wrong leads to data loss, stale caches, and API key revocation.

Token bucket implementation

The token bucket algorithm is the most versatile rate limiter for API clients:

import asyncio
import time

class TokenBucket:
    """Asyncio token-bucket limiter: steady average rate with bounded bursts.

    Tokens accrue continuously at ``rate`` per second, up to ``capacity``.
    :meth:`acquire` removes tokens, sleeping first when the bucket does not
    yet hold enough of them.
    """

    def __init__(self, rate: float, capacity: int):
        """Create a bucket that starts full.

        Args:
            rate: Tokens added per second; must be positive.
            capacity: Maximum number of stored tokens (the burst size);
                must be positive.

        Raises:
            ValueError: If ``rate`` or ``capacity`` is not positive.
        """
        # A non-positive rate would divide by zero (or sleep a negative
        # duration) the first time a caller has to wait.
        if rate <= 0:
            raise ValueError("rate must be positive")
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.rate = rate          # Tokens added per second
        self.capacity = capacity  # Maximum burst size
        self.tokens = float(capacity)  # Refills produce fractional values
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1):
        """Wait until ``tokens`` tokens are available, then consume them.

        Args:
            tokens: Number of tokens to take; must lie in ``[0, capacity]``.

        Raises:
            ValueError: If ``tokens`` is negative or exceeds ``capacity``.
                The bucket never holds more than ``capacity`` tokens, so
                such a request could never be satisfied.
        """
        if tokens < 0:
            raise ValueError("tokens must be non-negative")
        if tokens > self.capacity:
            raise ValueError(
                f"cannot acquire {tokens} tokens from a bucket "
                f"of capacity {self.capacity}"
            )

        # The lock stays held while sleeping on purpose: waiters are then
        # served one at a time in the order they queued on the lock.
        async with self._lock:
            self._refill()
            while self.tokens < tokens:
                shortfall = tokens - self.tokens
                await asyncio.sleep(shortfall / self.rate)
                self._refill()
            self.tokens -= tokens

    def _refill(self):
        """Credit the tokens earned since the last refill, capped at capacity."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

# Usage: 100 requests per minute = 1.667 tokens/sec, burst of 10
# `rate` is expressed in tokens per second, hence the division by 60.
# This module-level bucket is the one make_request() draws from, so every
# caller of make_request() shares the same quota.
bucket = TokenBucket(rate=100/60, capacity=10)

async def make_request(url: str):
    """GET ``url`` after taking one token from the shared ``bucket``.

    Returns the response object produced by the HTTP client's ``get`` call.
    """
    # Pace first: this may sleep until the bucket has a token to spare.
    await bucket.acquire()

    async with httpx.AsyncClient() as http:
        response = await http.get(url)
    return response

The token bucket allows short bursts (up to capacity) while maintaining a steady average rate. This matches how most APIs work — they tolerate bursts but enforce a rolling window average.

Adaptive rate limiter with header tracking

A smarter approach reads rate limit headers and adjusts dynamically:

import httpx
from dataclasses import dataclass

@dataclass
class RateLimitState:
    """Most recent rate-limit figures recorded for one endpoint.

    Every field starts at zero; AdaptiveRateLimiter.wait_if_needed treats
    ``limit == 0`` as "no data yet".
    """

    # Filled from the x-ratelimit-limit response header.
    limit: int = 0
    # Filled from the x-ratelimit-remaining response header.
    remaining: int = 0
    # Filled from the x-ratelimit-reset response header. It is later compared
    # against time.time(), i.e. it is treated as Unix epoch seconds.
    reset_at: float = 0
    # time.monotonic() reading taken when the state was last refreshed
    # (a different clock from the one reset_at is compared against).
    last_updated: float = 0

class AdaptiveRateLimiter:
    """Paces requests per endpoint from the server's ``x-ratelimit-*`` headers.

    Call :meth:`update_from_response` after each response, and
    :meth:`wait_if_needed` / :meth:`get_optimal_delay` before each request.

    ``reset_at`` is compared against ``time.time()``, so the
    ``x-ratelimit-reset`` header is assumed to carry Unix epoch seconds.
    NOTE(review): confirm this for APIs that send a delta in seconds instead.
    """

    def __init__(self, safety_margin: float = 0.1):
        """Create an empty limiter.

        Args:
            safety_margin: Fraction of the advertised limit kept in reserve;
                once ``remaining`` drops to ``limit * safety_margin`` or
                below, callers are held until the window resets.
        """
        self._states: dict[str, RateLimitState] = {}
        self.safety_margin = safety_margin
        self._lock = asyncio.Lock()

    @staticmethod
    def _parse_header(headers, name: str, cast):
        """Return ``cast(headers[name])``, or None if absent or malformed."""
        if name not in headers:
            return None
        try:
            return cast(headers[name])
        except (TypeError, ValueError):
            return None

    def update_from_response(self, endpoint: str, response: "httpx.Response"):
        """Record the rate-limit headers carried by ``response``.

        A header that is missing or cannot be parsed leaves the previously
        stored value untouched instead of raising.

        Args:
            endpoint: Key under which the state is tracked.
            response: Response whose ``headers`` mapping is inspected.
        """
        headers = response.headers
        state = self._states.get(endpoint)
        if state is None:
            # Build the state only for a new endpoint rather than on every call.
            state = self._states[endpoint] = RateLimitState()

        limit = self._parse_header(headers, "x-ratelimit-limit", int)
        if limit is not None:
            state.limit = limit
        remaining = self._parse_header(headers, "x-ratelimit-remaining", int)
        if remaining is not None:
            state.remaining = remaining
        reset_at = self._parse_header(headers, "x-ratelimit-reset", float)
        if reset_at is not None:
            state.reset_at = reset_at
        state.last_updated = time.monotonic()

    async def wait_if_needed(self, endpoint: str):
        """Sleep until the window resets when ``endpoint`` is nearly exhausted.

        Returns immediately when nothing has been recorded for the endpoint
        or when more than the safety margin of the quota remains.
        """
        async with self._lock:
            state = self._states.get(endpoint)
            if state is None or state.limit == 0:
                return  # No data yet, proceed normally

            threshold = state.limit * self.safety_margin
            if state.remaining > threshold:
                return
            wait_seconds = max(0, state.reset_at - time.time())

        # Sleep only after releasing the lock: it is shared by all endpoints,
        # so sleeping inside it would stall callers of unrelated endpoints.
        if wait_seconds > 0:
            await asyncio.sleep(wait_seconds + 0.5)  # Small buffer

    def get_optimal_delay(self, endpoint: str) -> float:
        """Return the pause, in seconds, that spreads the remaining quota evenly.

        Returns:
            ``0`` when nothing is known about the endpoint; the time until
            reset divided by the remaining request count while quota is left;
            the full time until reset once the quota is exhausted.
        """
        state = self._states.get(endpoint)
        if state is None:
            return 0

        time_until_reset = max(0, state.reset_at - time.time())
        if state.remaining > 0:
            return time_until_reset / state.remaining
        # Quota exhausted: nothing can be sent before the window resets.
        return time_until_reset

Retry middleware with exponential backoff

import random
from functools import wraps

class RetryConfig:
    def __init__(
        self,
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        retry_statuses: set[int] = None,
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
        self.retry_statuses = retry_statuses or {429, 500, 502, 503, 504}

    def get_delay(self, attempt: int, retry_after: float | None = None) -> float:
        if retry_after:
            return retry_after  # Always respect server-specified delay

        delay = self.base_delay * (self.exponential_base ** attempt)
        delay = min(delay, self.max_delay)

        if self.jitter:
            delay = delay * random.uniform(0.5, 1.5)
        return delay


class ResilientClient:
    def __init__(
        self,
        rate_limiter: AdaptiveRateLimiter,
        retry_config: RetryConfig = None,
    ):
        self.limiter = rate_limiter
        self.retry = retry_config or RetryConfig()

    async def request(
        self, method: str, url: str, endpoint_key: str = None, **kwargs
    ) -> httpx.Response:
        key = endpoint_key or url

        for attempt in range(self.retry.max_retries + 1):
            await self.limiter.wait_if_needed(key)

            optimal_delay = self.limiter.get_optimal_delay(key)
            if optimal_delay > 0:
                await asyncio.sleep(optimal_delay)

            async with httpx.AsyncClient() as client:
                try:
                    response = await client.request(method, url, **kwargs)
                except httpx.RequestError as exc:
                    if attempt == self.retry.max_retries:
                        raise
                    delay = self.retry.get_delay(attempt)
                    await asyncio.sleep(delay)
                    continue

            self.limiter.update_from_response(key, response)

            if response.status_code not in self.retry.retry_statuses:
                return response

            if attempt == self.retry.max_retries:
                return response  # Return last response even if rate limited

            retry_after = None
            if "retry-after" in response.headers:
                try:
                    retry_after = float(response.headers["retry-after"])
                except ValueError:
                    pass

            delay = self.retry.get_delay(attempt, retry_after)
            await asyncio.sleep(delay)

        return response

Multi-endpoint rate limit coordination

Many APIs have independent rate limits per endpoint:

class MultiEndpointLimiter:
    """Registry of independent token buckets, one per configured endpoint."""

    def __init__(self):
        """Start with no endpoints configured."""
        self._buckets: dict[str, TokenBucket] = dict()

    def configure(self, endpoint: str, rate: float, burst: int):
        """Install (or replace) the bucket that governs ``endpoint``.

        Args:
            endpoint: Key the bucket is stored under.
            rate: Passed to ``TokenBucket`` as its ``rate``.
            burst: Passed to ``TokenBucket`` as its ``capacity``.
        """
        endpoint_bucket = TokenBucket(rate=rate, capacity=burst)
        self._buckets[endpoint] = endpoint_bucket

    async def acquire(self, endpoint: str):
        """Take one token for ``endpoint``.

        Endpoints that were never configured are not limited: the call
        returns immediately.
        """
        endpoint_bucket = self._buckets.get(endpoint)
        if not endpoint_bucket:
            return
        await endpoint_bucket.acquire()

# GitHub example: different limits per endpoint
# `rate` is forwarded to TokenBucket, which counts tokens per second, so each
# quota is divided by the length of its window in seconds (3600 for an hour,
# 60 for a minute). `burst` becomes the bucket capacity.
limiter = MultiEndpointLimiter()
limiter.configure("/repos", rate=5000/3600, burst=50)     # 5000/hour
limiter.configure("/search", rate=30/60, burst=5)          # 30/minute
limiter.configure("/graphql", rate=5000/3600, burst=20)    # 5000/hour

Distributed rate limiting with Redis

When multiple workers share an API key, they must coordinate:

import redis.asyncio as redis

class DistributedRateLimiter:
    """Sliding-window limiter shared between workers via a Redis sorted set.

    Each granted request is stored as one sorted-set member scored with the
    worker's ``time.time()`` at the moment of the call.
    """

    def __init__(self, redis_client: "redis.Redis", key_prefix: str = "ratelimit"):
        """Store the Redis client and the prefix used for all keys.

        Args:
            redis_client: Async Redis client used for every command.
            key_prefix: Prefix joined to the resource name with ``:``.
        """
        self.redis = redis_client
        self.prefix = key_prefix

    async def acquire(self, resource: str, limit: int, window_seconds: int) -> bool:
        """Try to record one request for ``resource`` in the current window.

        Args:
            resource: Name of the limited resource (part of the Redis key).
            limit: Maximum number of requests allowed per window.
            window_seconds: Length of the sliding window in seconds.

        Returns:
            True if the request fits within the limit, False otherwise (the
            tentative entry is removed again in that case).
        """
        import uuid

        key = f"{self.prefix}:{resource}"
        now = time.time()
        window_start = now - window_seconds
        # The member must be unique per request. Using the bare timestamp
        # lets two requests with an identical timestamp collapse into one
        # entry, and lets a rejected request's cleanup delete an entry that
        # belongs to a different, accepted request.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)  # Drop expired entries
        pipe.zcard(key)                              # Count before adding ours
        pipe.zadd(key, {member: now})
        pipe.expire(key, window_seconds)

        results = await pipe.execute()
        current_count = results[1]

        if current_count >= limit:
            await self.redis.zrem(key, member)
            return False
        return True

    async def wait_and_acquire(self, resource: str, limit: int, window_seconds: int):
        """Retry :meth:`acquire` until it succeeds.

        Sleeps ``window_seconds / limit`` between attempts.

        Raises:
            ValueError: If ``limit`` is not positive; such a limit can never
                be satisfied and would otherwise divide by zero or loop
                forever.
        """
        if limit <= 0:
            raise ValueError("limit must be positive")
        while not await self.acquire(resource, limit, window_seconds):
            await asyncio.sleep(window_seconds / limit)

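This sliding-window approach uses a Redis sorted set as a shared log of request timestamps, so every worker counts against the same window. Its accuracy depends on the workers' clocks being reasonably synchronized, because each worker scores its entries with its own local time.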

Request batching and caching

The best rate limit strategy is needing fewer requests:

from functools import lru_cache
from datetime import datetime, timedelta

class BatchingClient:
    """Fetches users in batches and caches each user for five minutes."""

    def __init__(self, client: "ResilientClient", batch_size: int = 100):
        """Store the underlying client and set up an empty cache.

        Args:
            client: Client whose ``request`` coroutine performs the calls.
            batch_size: Maximum number of ids sent in one request.
        """
        self.client = client
        self.batch_size = batch_size
        # Maps "user:<id>" to (user payload, time the payload was fetched).
        self._cache: dict[str, tuple[dict, datetime]] = {}
        self._cache_ttl = timedelta(minutes=5)

    async def get_users(self, user_ids: list[str]) -> list[dict]:
        """Return the user records for ``user_ids``, using the cache where fresh.

        Ids that are not cached (or whose entry is older than the TTL) are
        requested in batches of at most ``batch_size``; every id is requested
        at most once per call even if it appears several times in
        ``user_ids``.

        Args:
            user_ids: Ids to look up.

        Returns:
            One record per entry of ``user_ids``, in the same order; ids the
            API did not return are omitted.

        Raises:
            Whatever ``response.raise_for_status()`` raises when a batch
            request ends with an error status (for example after retries are
            exhausted on a 429).
        """
        from datetime import timezone

        results: dict[str, dict] = {}
        uncached: list[str] = []
        now = datetime.now(timezone.utc)

        # dict.fromkeys removes duplicates while keeping first-seen order.
        for uid in dict.fromkeys(user_ids):
            cached = self._cache.get(f"user:{uid}")
            if cached is not None and now - cached[1] < self._cache_ttl:
                results[uid] = cached[0]
            else:
                uncached.append(uid)

        # Batch uncached requests
        for start in range(0, len(uncached), self.batch_size):
            batch = uncached[start:start + self.batch_size]
            ids_param = ",".join(batch)
            response = await self.client.request(
                "GET", f"/api/users?ids={ids_param}",
                endpoint_key="/api/users"
            )
            # The client may hand back an error response once retries run
            # out; fail here rather than on a confusing JSON/KeyError below.
            response.raise_for_status()
            fetched_at = datetime.now(timezone.utc)
            for user in response.json()["users"]:
                # Requested ids are strings; normalise in case the API
                # returns ids of another type (e.g. integers).
                uid = str(user["id"])
                results[uid] = user
                self._cache[f"user:{uid}"] = (user, fetched_at)

        return [results[uid] for uid in user_ids if uid in results]

Circuit breaker pattern

When an API is severely rate limiting or down, stop sending requests entirely:

class CircuitBreaker:
    """Three-state circuit breaker: ``closed``, ``open`` and ``half-open``.

    * ``closed``: every call may proceed.
    * ``open``: calls are refused until ``recovery_timeout`` seconds have
      passed since the last recorded failure.
    * ``half-open``: a single probe call is let through; its outcome,
      reported via :meth:`record_success` or :meth:`record_failure`, closes
      or re-opens the breaker.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60):
        """Create a closed breaker.

        Args:
            failure_threshold: Number of failures (since the last success)
                at which the breaker opens.
            recovery_timeout: Seconds to stay open before allowing a probe.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure: float = 0
        self.state = "closed"  # closed, open, half-open
        # time.monotonic() at which the current half-open probe was let
        # through; None when no probe is outstanding.
        self._probe_started: float | None = None

    def record_failure(self):
        """Count a failure; open the breaker at the threshold or on a failed probe."""
        self.failure_count += 1
        self.last_failure = time.monotonic()
        self._probe_started = None
        if self.state == "half-open" or self.failure_count >= self.failure_threshold:
            self.state = "open"

    def record_success(self):
        """Reset the failure count and close the breaker."""
        self.failure_count = 0
        self.state = "closed"
        self._probe_started = None

    def can_proceed(self) -> bool:
        """Report whether a call may be attempted now.

        May move the breaker from ``open`` to ``half-open`` as a side effect.
        """
        if self.state == "closed":
            return True

        now = time.monotonic()
        if self.state == "open":
            if now - self.last_failure > self.recovery_timeout:
                self.state = "half-open"
                self._probe_started = now
                return True
            return False

        # half-open: allow one attempt. Further callers are refused until the
        # probe reports back; if it never does, let another probe through
        # after recovery_timeout so the breaker cannot get stuck.
        if (
            self._probe_started is None
            or now - self._probe_started > self.recovery_timeout
        ):
            self._probe_started = now
            return True
        return False

Monitoring and observability

Track rate limit metrics to understand API consumption:

import logging

logger = logging.getLogger("api_rate_limits")


class RateLimitMetrics:
    """Running counters for API traffic, summarised in the log every 100 requests."""

    def __init__(self):
        """Start all counters at zero."""
        self.requests_total = self.retries_total = self.rate_limited_total = 0

    def record_request(self, endpoint: str, status: int, retries: int):
        """Fold one finished request into the counters.

        Args:
            endpoint: Endpoint the request targeted (not used in the counters).
            status: Final HTTP status code; 429 counts as rate limited.
            retries: Number of retries the request needed.
        """
        self.requests_total += 1
        self.retries_total += retries
        self.rate_limited_total += 1 if status == 429 else 0

        # Only every 100th request produces a summary line.
        if self.requests_total % 100 != 0:
            return

        logger.info(
            f"API metrics: requests={self.requests_total} "
            f"retries={self.retries_total} "
            f"rate_limited={self.rate_limited_total} "
            f"retry_rate={self.retries_total/self.requests_total:.2%}"
        )

One thing to remember: Production rate limit handling is a layered system — token buckets for pacing, adaptive headers for dynamic adjustment, exponential backoff for retries, Redis for distributed coordination, caching and batching to reduce request count, and circuit breakers to fail fast when an API is overwhelmed. Build each layer, and your application stays reliable regardless of what external APIs throw at it.

Tags: python, api, rate-limiting, networking

See Also

  • Python Proxy Rotation — Why Python programs disguise their internet address when collecting data, and how proxy rotation works — explained without any tech jargon.
  • Python SSE Client Consumption — How Python programs listen to live data streams from servers — like a radio that never stops playing — explained for complete beginners.
  • Python Web Scraping Ethics — When is it okay to collect data from websites with Python, and when does it cross the line? The rules explained for everyone.
  • Python Webhook Handlers — How Python programs receive instant notifications from other services when something happens — explained without technical jargon.
  • CI/CD — Why big apps can ship updates every day without turning your phone into a glitchy mess — CI/CD is the behind-the-scenes quality gate and delivery truck.