Python API Rate Limit Handling — Deep Dive
System-level framing
A production application that consumes external APIs must treat rate limits as a first-class architectural concern. The challenge goes beyond retrying 429 responses — it involves tracking quota across multiple endpoints (often with different limits), coordinating access across distributed workers, degrading gracefully when limits are tight, and avoiding cascading failures when an API becomes temporarily unavailable. This is client-side traffic shaping, and getting it wrong leads to data loss, stale caches, and API key revocation.
Token bucket implementation
The token bucket algorithm is the most versatile rate limiter for API clients:
import asyncio
import time
class TokenBucket:
def __init__(self, rate: float, capacity: int):
self.rate = rate # Tokens added per second
self.capacity = capacity # Maximum burst size
self.tokens = capacity
self.last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self, tokens: int = 1):
async with self._lock:
self._refill()
while self.tokens < tokens:
wait_time = (tokens - self.tokens) / self.rate
await asyncio.sleep(wait_time)
self._refill()
self.tokens -= tokens
def _refill(self):
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self.last_refill = now
# Usage: 100 requests per minute = 1.667 tokens/sec, burst of 10
bucket = TokenBucket(rate=100/60, capacity=10)


async def make_request(url: str):
    """Send a GET to ``url`` after taking one token from the shared ``bucket``."""
    await bucket.acquire()
    # The client exists only for this call and is closed when the block exits.
    async with httpx.AsyncClient() as http:
        response = await http.get(url)
    return response
The token bucket allows short bursts (up to capacity) while maintaining a steady average rate. This matches how most APIs work — they tolerate bursts but enforce a rolling window average.
Adaptive rate limiter with header tracking
A smarter approach reads rate limit headers and adjusts dynamically:
import httpx
from dataclasses import dataclass
@dataclass
class RateLimitState:
    """Latest rate-limit figures recorded for a single endpoint."""

    # Quota size taken from the x-ratelimit-limit header; 0 means no limit
    # has been observed yet.
    limit: int = 0
    # Value of the x-ratelimit-remaining header from the latest response.
    remaining: int = 0
    # Value of the x-ratelimit-reset header. The limiter compares it with
    # time.time(), i.e. it is treated as a wall-clock timestamp.
    reset_at: float = 0
    # time.monotonic() reading taken when this state was last refreshed.
    last_updated: float = 0
class AdaptiveRateLimiter:
    """Paces requests using the rate-limit headers returned by the server.

    State is tracked per endpoint key. ``safety_margin`` is the fraction of the
    advertised limit below which :meth:`wait_if_needed` pauses until the
    advertised reset time.
    """

    def __init__(self, safety_margin: float = 0.1):
        """Create a limiter with no recorded state.

        Args:
            safety_margin: Fraction of the limit kept in reserve (0.1 = 10%).
        """
        self._states: dict[str, RateLimitState] = {}
        self.safety_margin = safety_margin
        self._lock = asyncio.Lock()

    def update_from_response(self, endpoint: str, response: "httpx.Response"):
        """Record the ``x-ratelimit-*`` headers of ``response`` for ``endpoint``.

        Headers that are absent, or whose value cannot be parsed as a number,
        leave the previously stored value untouched, so an oddly formatted
        header cannot turn an otherwise successful request into a ValueError.
        """
        headers = response.headers
        state = self._states.setdefault(endpoint, RateLimitState())
        parsers = (
            ("x-ratelimit-limit", "limit", int),
            ("x-ratelimit-remaining", "remaining", int),
            ("x-ratelimit-reset", "reset_at", float),
        )
        for header_name, attribute, convert in parsers:
            if header_name not in headers:
                continue
            try:
                value = convert(headers[header_name])
            except ValueError:
                # Unparseable value: keep what we already know.
                continue
            setattr(state, attribute, value)
        state.last_updated = time.monotonic()

    async def wait_if_needed(self, endpoint: str):
        """Sleep until the reset time when ``endpoint`` is close to its limit.

        Returns immediately when nothing has been recorded for the endpoint or
        no limit has been observed yet. The lock is held for the duration of
        the sleep, so concurrent callers (for any endpoint) wait their turn.
        """
        async with self._lock:
            state = self._states.get(endpoint)
            if not state or state.limit == 0:
                return  # No data yet, proceed normally
            threshold = state.limit * self.safety_margin
            if state.remaining <= threshold:
                # reset_at is compared against wall-clock time.
                wait_seconds = max(0, state.reset_at - time.time())
                if wait_seconds > 0:
                    await asyncio.sleep(wait_seconds + 0.5)  # Small buffer

    def get_optimal_delay(self, endpoint: str) -> float:
        """Return the pause that spreads the remaining quota up to the reset.

        Returns:
            ``0.0`` when nothing is recorded for ``endpoint`` or the reset time
            has passed; the time until reset divided by the remaining requests
            when quota is left; the full time until reset when the quota is
            exhausted.
        """
        state = self._states.get(endpoint)
        if state is None:
            return 0.0
        time_until_reset = max(0.0, state.reset_at - time.time())
        if state.remaining > 0:
            return time_until_reset / state.remaining
        # Quota exhausted: nothing can be sent before the window resets.
        return time_until_reset
Retry middleware with exponential backoff
import random
from functools import wraps
class RetryConfig:
def __init__(
self,
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
retry_statuses: set[int] = None,
):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
self.jitter = jitter
self.retry_statuses = retry_statuses or {429, 500, 502, 503, 504}
def get_delay(self, attempt: int, retry_after: float | None = None) -> float:
if retry_after:
return retry_after # Always respect server-specified delay
delay = self.base_delay * (self.exponential_base ** attempt)
delay = min(delay, self.max_delay)
if self.jitter:
delay = delay * random.uniform(0.5, 1.5)
return delay
class ResilientClient:
    """HTTP client wrapper that combines adaptive pacing with retries."""

    def __init__(
        self,
        rate_limiter: AdaptiveRateLimiter,
        retry_config: RetryConfig = None,
    ):
        """Store the limiter and retry policy.

        Args:
            rate_limiter: Limiter consulted before, and updated after, each
                attempt.
            retry_config: Retry policy; a default ``RetryConfig()`` is used
                when omitted.
        """
        self.limiter = rate_limiter
        self.retry = retry_config if retry_config is not None else RetryConfig()

    @staticmethod
    def _retry_after_seconds(value):
        """Convert a ``Retry-After`` header value to seconds, or ``None``.

        Accepts both forms the header may take: a number of seconds or an
        HTTP-date. Missing, unparseable, negative or non-finite values give
        ``None`` so the caller falls back to its own backoff.
        """
        import math
        from datetime import datetime, timezone
        from email.utils import parsedate_to_datetime

        if value is None:
            return None
        try:
            seconds = float(value)
        except ValueError:
            try:
                when = parsedate_to_datetime(value)
            except (TypeError, ValueError):
                return None
            if when.tzinfo is None:
                # A date without zone information is interpreted as UTC.
                when = when.replace(tzinfo=timezone.utc)
            seconds = (when - datetime.now(timezone.utc)).total_seconds()
            return max(0.0, seconds)
        if not math.isfinite(seconds) or seconds < 0:
            return None
        return seconds

    async def request(
        self, method: str, url: str, endpoint_key: str = None, **kwargs
    ) -> httpx.Response:
        """Send a request, pacing and retrying as configured.

        Args:
            method: HTTP method.
            url: Target URL.
            endpoint_key: Key under which rate-limit state is tracked;
                defaults to ``url``.
            **kwargs: Passed through to ``httpx.AsyncClient.request``.

        Returns:
            The first response whose status is not in the retry set, or the
            last response received once the retries are used up.

        Raises:
            httpx.RequestError: If the final attempt fails at transport level.
        """
        key = endpoint_key or url
        # A negative max_retries still gets one attempt.
        last_attempt = max(0, self.retry.max_retries)
        # One client for all attempts so retries can reuse its connections.
        async with httpx.AsyncClient() as client:
            for attempt in range(last_attempt + 1):
                await self.limiter.wait_if_needed(key)
                optimal_delay = self.limiter.get_optimal_delay(key)
                if optimal_delay > 0:
                    await asyncio.sleep(optimal_delay)
                try:
                    response = await client.request(method, url, **kwargs)
                except httpx.RequestError:
                    if attempt == last_attempt:
                        raise
                    await asyncio.sleep(self.retry.get_delay(attempt))
                    continue
                self.limiter.update_from_response(key, response)
                if response.status_code not in self.retry.retry_statuses:
                    return response
                if attempt == last_attempt:
                    return response  # Return last response even if rate limited
                retry_after = self._retry_after_seconds(
                    response.headers.get("retry-after")
                )
                await asyncio.sleep(self.retry.get_delay(attempt, retry_after))
Multi-endpoint rate limit coordination
Many APIs have independent rate limits per endpoint:
class MultiEndpointLimiter:
    """Holds one token bucket per endpoint so each is paced independently."""

    def __init__(self):
        """Start with no endpoints configured."""
        self._buckets: dict[str, TokenBucket] = {}

    def configure(self, endpoint: str, rate: float, burst: int):
        """Create (or replace) the bucket for ``endpoint``.

        ``rate`` is passed on as the bucket's refill rate and ``burst`` as its
        capacity.
        """
        fresh_bucket = TokenBucket(rate=rate, capacity=burst)
        self._buckets[endpoint] = fresh_bucket

    async def acquire(self, endpoint: str):
        """Take one token for ``endpoint``; do nothing if it is not configured."""
        configured = self._buckets.get(endpoint)
        if not configured:
            return
        await configured.acquire()
# GitHub example: different limits per endpoint.
# `rate` is forwarded to TokenBucket as tokens per second, so each hourly or
# per-minute quota is divided by the number of seconds in its period.
# `burst` becomes the bucket capacity (the maximum burst size).
limiter = MultiEndpointLimiter()
limiter.configure("/repos", rate=5000/3600, burst=50) # 5000/hour, burst of 50
limiter.configure("/search", rate=30/60, burst=5) # 30/minute, burst of 5
limiter.configure("/graphql", rate=5000/3600, burst=20) # 5000/hour, burst of 20
Distributed rate limiting with Redis
When multiple workers share an API key, they must coordinate:
import redis.asyncio as redis
class DistributedRateLimiter:
    """Sliding-window rate limiter backed by a Redis sorted set.

    Every acquisition is stored as one sorted-set member scored with the
    caller's ``time.time()``; members older than the window are trimmed on each
    call.
    """

    def __init__(self, redis_client: redis.Redis, key_prefix: str = "ratelimit"):
        """Store the Redis client and the prefix used to build keys."""
        self.redis = redis_client
        self.prefix = key_prefix

    async def acquire(self, resource: str, limit: int, window_seconds: int) -> bool:
        """Try to record one request for ``resource``.

        Args:
            resource: Name of the limited resource; combined with the prefix
                to form the Redis key.
            limit: Maximum number of requests allowed per window.
            window_seconds: Length of the sliding window in seconds.

        Returns:
            ``True`` if the request fits in the window, ``False`` otherwise
            (in which case the tentative entry is removed again).
        """
        import uuid

        key = f"{self.prefix}:{resource}"
        now = time.time()
        # The member must be unique per acquisition. Using the timestamp alone
        # lets two acquisitions with the same clock reading share one member,
        # which would count them as a single request.
        member = f"{now}:{uuid.uuid4().hex}"
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, now - window_seconds)
        pipe.zcard(key)
        pipe.zadd(key, {member: now})
        pipe.expire(key, window_seconds)
        results = await pipe.execute()
        # results[1] is the count taken before this request was added.
        current_count = results[1]
        if current_count >= limit:
            await self.redis.zrem(key, member)
            return False
        return True

    async def wait_and_acquire(self, resource: str, limit: int, window_seconds: int):
        """Retry :meth:`acquire` until it succeeds.

        Sleeps ``window_seconds / limit`` between attempts.

        Raises:
            ValueError: If ``limit`` is not positive; such a limit can never
                be satisfied.
        """
        if limit <= 0:
            raise ValueError("limit must be positive")
        retry_interval = window_seconds / limit
        while not await self.acquire(resource, limit, window_seconds):
            await asyncio.sleep(retry_interval)
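This sliding-window approach keeps one shared request log per resource in a Redis sorted set, so every worker counts against the same window. Its accuracy depends on the workers' clocks being reasonably synchronized, because each worker scores its entries with its own `time.time()`.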
Request batching and caching
The best rate limit strategy is needing fewer requests:
from functools import lru_cache
from datetime import datetime, timedelta
class BatchingClient:
    """Fetches users in batches and caches each user for a fixed TTL."""

    def __init__(self, client: "ResilientClient", batch_size: int = 100):
        """Wrap ``client``.

        Args:
            client: Object whose ``request`` coroutine performs the HTTP call.
            batch_size: Maximum number of ids sent per request.

        Raises:
            ValueError: If ``batch_size`` is less than 1. A negative size
                would otherwise silently skip every fetch.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.client = client
        self.batch_size = batch_size
        # Maps "user:<id>" to (user record, time.monotonic() at fetch time).
        self._cache: dict[str, tuple[dict, float]] = {}
        self._cache_ttl = timedelta(minutes=5)

    async def get_users(self, user_ids: list[str]) -> list[dict]:
        """Return the user records for ``user_ids``.

        Fresh cache entries are used where available; the remaining ids are
        requested in batches of at most ``batch_size``, each id at most once.

        Returns:
            Records in the order of ``user_ids`` (duplicates included). Ids
            the API did not return are omitted.

        Raises:
            Whatever ``response.raise_for_status()`` raises when a batch
            request ends with an error status.
        """
        ttl_seconds = self._cache_ttl.total_seconds()
        now = time.monotonic()
        results = {}
        uncached = []
        # dict.fromkeys drops duplicate ids while keeping first-seen order.
        for uid in dict.fromkeys(user_ids):
            cached = self._cache.get(f"user:{uid}")
            if cached is not None and now - cached[1] < ttl_seconds:
                results[uid] = cached[0]
            else:
                uncached.append(uid)

        # Batch uncached requests
        for start in range(0, len(uncached), self.batch_size):
            batch = uncached[start:start + self.batch_size]
            ids_param = ",".join(batch)
            response = await self.client.request(
                "GET", f"/api/users?ids={ids_param}",
                endpoint_key="/api/users"
            )
            # Fail clearly instead of trying to parse an error body.
            response.raise_for_status()
            fetched_at = time.monotonic()
            for user in response.json()["users"]:
                # Key by the string form of the id so the result lookup agrees
                # with the cache key and with the string ids callers pass in.
                uid = str(user["id"])
                results[uid] = user
                self._cache[f"user:{uid}"] = (user, fetched_at)

        return [results[uid] for uid in user_ids if uid in results]
Circuit breaker pattern
When an API is severely rate limiting or down, stop sending requests entirely:
class CircuitBreaker:
    """Stops calls to a failing dependency until it has had time to recover.

    States: ``"closed"`` (calls allowed), ``"open"`` (calls refused) and
    ``"half-open"`` (a single trial call allowed after ``recovery_timeout``).
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60):
        """Create a closed breaker.

        Args:
            failure_threshold: Consecutive failures that open the breaker.
            recovery_timeout: Seconds to stay open before allowing a trial.
        """
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure: float = 0
        self.state = "closed"  # closed, open, half-open
        # time.monotonic() at which the current half-open trial was granted,
        # or None when no trial is outstanding.
        self._probe_started: float | None = None

    def record_failure(self):
        """Count a failure; open the breaker at the threshold or after a failed trial."""
        self.failure_count += 1
        self.last_failure = time.monotonic()
        self._probe_started = None
        if self.state == "half-open" or self.failure_count >= self.failure_threshold:
            self.state = "open"

    def record_success(self):
        """Reset the failure count and close the breaker."""
        self.failure_count = 0
        self.state = "closed"
        self._probe_started = None

    def can_proceed(self) -> bool:
        """Return whether a call may be attempted now.

        In the half-open state only one trial is allowed at a time: further
        calls are refused until the trial's outcome is recorded. If no outcome
        arrives within ``recovery_timeout`` seconds, the trial is considered
        lost and a new one is allowed, so the breaker cannot get stuck.
        """
        if self.state == "closed":
            return True
        now = time.monotonic()
        if self.state == "open":
            if now - self.last_failure > self.recovery_timeout:
                self.state = "half-open"
                self._probe_started = now
                return True
            return False
        # half-open: allow one attempt
        if self._probe_started is None or now - self._probe_started > self.recovery_timeout:
            self._probe_started = now
            return True
        return False
Monitoring and observability
Track rate limit metrics to understand API consumption:
import logging
logger = logging.getLogger("api_rate_limits")
class RateLimitMetrics:
    """Running counters for requests, retries and 429 responses."""

    def __init__(self):
        """Start every counter at zero."""
        self.requests_total = self.retries_total = self.rate_limited_total = 0

    def record_request(self, endpoint: str, status: int, retries: int):
        """Account for one finished request.

        Args:
            endpoint: Endpoint the request targeted (not used in the counters).
            status: Final HTTP status of the request.
            retries: Number of retries the request needed.

        A summary line is logged each time the request count reaches a
        multiple of 100.
        """
        self.requests_total += 1
        self.retries_total += retries
        if status == 429:
            self.rate_limited_total += 1

        if self.requests_total % 100 != 0:
            return
        summary = (
            f"API metrics: requests={self.requests_total} "
            f"retries={self.retries_total} "
            f"rate_limited={self.rate_limited_total} "
            f"retry_rate={self.retries_total/self.requests_total:.2%}"
        )
        logger.info(summary)
One thing to remember: Production rate limit handling is a layered system — token buckets for pacing, adaptive headers for dynamic adjustment, exponential backoff for retries, Redis for distributed coordination, caching and batching to reduce request count, and circuit breakers to fail fast when an API is overwhelmed. Build each layer, and your application stays reliable regardless of what external APIs throw at it.
See Also
- Python Proxy Rotation Why Python programs disguise their internet address when collecting data, and how proxy rotation works — explained without any tech jargon.
- Python SSE Client Consumption How Python programs listen to live data streams from servers — like a radio that never stops playing — explained for complete beginners.
- Python Web Scraping Ethics When is it okay to collect data from websites with Python, and when does it cross the line? The rules explained for everyone.
- Python Webhook Handlers How Python programs receive instant notifications from other services when something happens — explained without technical jargon.
- CI/CD Why big apps can ship updates every day without turning your phone into a glitchy mess — CI/CD is the behind-the-scenes quality gate and delivery truck.