Python Exponential Backoff with Jitter — Deep Dive
Implementing the Four Jitter Strategies
import random
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class JitterStrategy(Enum):
    """Randomization scheme layered on top of the exponential delay."""

    # No randomization: wait exactly the capped exponential delay.
    NONE = "none"
    # Wait a uniformly random time between zero and the capped delay.
    FULL = "full"
    # Wait half the capped delay plus a random share of the other half.
    EQUAL = "equal"
    # Derive each delay from the previous delay instead of the attempt count.
    DECORRELATED = "decorrelated"
@dataclass
class BackoffConfig:
    """Tunable parameters for exponential backoff with jitter.

    Raises:
        ValueError: If a delay or the retry count is negative, or if the
            multiplier is below 1 (the delays would shrink, not back off).
    """

    base_delay: float = 1.0  # seconds; scale of the first retry delay
    max_delay: float = 60.0  # seconds; upper bound on any single delay
    max_retries: int = 5  # retries allowed after the initial attempt
    jitter: JitterStrategy = JitterStrategy.FULL
    multiplier: float = 2.0  # growth factor applied per attempt

    def __post_init__(self) -> None:
        # Fail at construction time rather than producing negative sleeps or
        # a retry loop that silently never runs.
        if self.base_delay < 0:
            raise ValueError(
                f"base_delay must be non-negative, got {self.base_delay!r}"
            )
        if self.max_delay < 0:
            raise ValueError(
                f"max_delay must be non-negative, got {self.max_delay!r}"
            )
        if self.max_retries < 0:
            raise ValueError(
                f"max_retries must be non-negative, got {self.max_retries!r}"
            )
        if self.multiplier < 1:
            raise ValueError(
                f"multiplier must be at least 1, got {self.multiplier!r}"
            )
def calculate_delay(
    config: BackoffConfig,
    attempt: int,
    previous_delay: Optional[float] = None,
) -> float:
    """Calculate the next retry delay based on strategy.

    Args:
        config: Backoff parameters and the jitter strategy to apply.
        attempt: Index of the attempt that just failed; the uncapped delay
            is ``base_delay * multiplier ** attempt``.
        previous_delay: Delay returned by the previous call. Only the
            decorrelated strategy reads it; ``None`` (or ``0``) means
            "start from ``base_delay``".

    Returns:
        Seconds to wait before the next attempt, never above
        ``config.max_delay``.
    """
    try:
        exponential = config.base_delay * (config.multiplier ** attempt)
    except OverflowError:
        # Python raises instead of returning inf once the power no longer
        # fits in a float (e.g. 2.0 ** 1024). The true value is far past any
        # cap, unless the base delay is zero.
        exponential = config.max_delay if config.base_delay > 0 else 0.0
    capped = min(exponential, config.max_delay)

    if config.jitter == JitterStrategy.FULL:
        return random.uniform(0, capped)

    if config.jitter == JitterStrategy.EQUAL:
        half = capped / 2
        return half + random.uniform(0, half)

    if config.jitter == JitterStrategy.DECORRELATED:
        prev = previous_delay or config.base_delay
        # A previous delay that was clipped below base_delay / 3 would invert
        # the range and yield delays under base_delay; keep the upper bound
        # at or above the lower one.
        upper = max(config.base_delay, prev * 3)
        return min(
            config.max_delay,
            random.uniform(config.base_delay, upper),
        )

    # JitterStrategy.NONE, and the fallback for any unrecognized strategy.
    return capped
A Production Retry Executor
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Coroutine, Optional, Type
# Module-level logger named after this module; RetryExecutor reports each
# failed attempt through it.
logger = logging.getLogger(__name__)
@dataclass
class RetryResult:
    """Outcome of a retried call, whether or not it eventually succeeded."""

    value: Any  # what the call returned; None when no attempt succeeded
    attempts: int  # number of attempts reported by the executor
    total_delay: float  # seconds of waiting reported by the executor
    succeeded: bool  # True only when an attempt returned normally
    # Most recent failure; stays None when nothing went wrong.
    last_error: Optional[Exception] = field(default=None)
@dataclass
class RetryExecutor:
    """Run an async callable, retrying failures with jittered backoff."""

    config: BackoffConfig = field(default_factory=BackoffConfig)
    # Only these exception types trigger a retry; anything else propagates.
    retryable_exceptions: tuple[Type[Exception], ...] = (Exception,)
    # Called as on_retry(attempt, exc, delay) before each sleep, where
    # ``attempt`` is the zero-based index of the failed attempt. May be a
    # plain function or a coroutine function.
    on_retry: Optional[Callable] = None

    async def execute(
        self,
        func: Callable[..., Coroutine],
        *args: Any,
        **kwargs: Any,
    ) -> RetryResult:
        """Await ``func(*args, **kwargs)`` until it succeeds or retries run out.

        Args:
            func: Coroutine function to call.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            A ``RetryResult``. On success it carries the return value; once
            ``config.max_retries`` retries are used up it carries the last
            retryable exception instead of raising it.

        Raises:
            Exception: Anything ``func`` raises that is not an instance of
                ``retryable_exceptions`` propagates unchanged.
        """
        import inspect

        last_error: Optional[Exception] = None
        total_delay = 0.0
        previous_delay: Optional[float] = None
        attempts_made = 0

        for attempt in range(self.config.max_retries + 1):
            attempts_made = attempt + 1
            try:
                result = await func(*args, **kwargs)
            except self.retryable_exceptions as exc:
                last_error = exc
                if attempt >= self.config.max_retries:
                    break

                delay = calculate_delay(
                    self.config, attempt, previous_delay
                )
                previous_delay = delay
                total_delay += delay
                logger.warning(
                    "Attempt %d/%d failed (%s: %s). "
                    "Retrying in %.2fs...",
                    attempt + 1,
                    self.config.max_retries + 1,
                    type(exc).__name__,
                    exc,
                    delay,
                )
                if self.on_retry is not None:
                    outcome = self.on_retry(attempt, exc, delay)
                    # An async callback only returns a coroutine when called;
                    # without awaiting it the callback body would never run.
                    if inspect.isawaitable(outcome):
                        await outcome
                await asyncio.sleep(delay)
            else:
                return RetryResult(
                    value=result,
                    attempts=attempts_made,
                    total_delay=total_delay,
                    succeeded=True,
                )

        return RetryResult(
            value=None,
            # Report the attempts that actually ran, not the configured limit.
            attempts=attempts_made,
            total_delay=total_delay,
            succeeded=False,
            last_error=last_error,
        )
Usage Example
import httpx
# Up to four retries after the first attempt, with a 0.5s base delay and no
# single wait longer than 30s.
config = BackoffConfig(
    base_delay=0.5,
    max_delay=30.0,
    max_retries=4,
    jitter=JitterStrategy.FULL,
)

# Only timeouts and HTTP status errors are retried; other exceptions propagate.
retry = RetryExecutor(
    config=config,
    retryable_exceptions=(
        httpx.TimeoutException,
        httpx.HTTPStatusError,
    ),
)
async def fetch_data(url: str) -> dict:
    """GET ``url`` with a 5-second timeout and return the decoded JSON body.

    ``raise_for_status()`` is called before decoding, so an error status
    surfaces as an exception instead of a payload.
    """
    async with httpx.AsyncClient() as http:
        reply = await http.get(url, timeout=5.0)
        reply.raise_for_status()
        payload = reply.json()
    return payload
async def main():
    """Fetch the example endpoint through ``retry`` and print the outcome."""
    result = await retry.execute(
        fetch_data, "https://api.example.com/data"
    )

    # Handle the failure first so the success report is the straight-line path.
    if not result.succeeded:
        print(f"Failed after {result.attempts} attempts: {result.last_error}")
        return

    print(f"Got data after {result.attempts} attempts "
          f"(waited {result.total_delay:.1f}s total)")
Using Tenacity (Production Library)
For production code, the tenacity library provides a battle-tested implementation:
import tenacity
import httpx
@tenacity.retry(
    # Exponential wait bounded to 0.5s-60s, plus up to 2s of random jitter.
    wait=(
        tenacity.wait_exponential(multiplier=1, min=0.5, max=60)
        + tenacity.wait_random(0, 2)
    ),
    # Give up after five attempts in total.
    stop=tenacity.stop_after_attempt(5),
    # NOTE(review): HTTPStatusError presumably covers 4xx as well as 5xx
    # responses; confirm that retrying client errors is intended.
    retry=tenacity.retry_if_exception_type(
        (httpx.TimeoutException, httpx.HTTPStatusError)
    ),
    before_sleep=tenacity.before_sleep_log(logger, logging.WARNING),
    # Surface the original exception once attempts are exhausted.
    reraise=True,
)
async def fetch_with_retry(url: str) -> dict:
    """GET ``url`` and return its JSON body, retried by tenacity on failure."""
    async with httpx.AsyncClient() as http:
        reply = await http.get(url, timeout=5.0)
        reply.raise_for_status()
        payload = reply.json()
    return payload
Tenacity also supports:
- Retry callbacks for logging and metrics
- Custom retry conditions based on return values (not just exceptions)
- Retry budgets via `stop_after_delay()` (total time limit)
- Combining strategies with the `+` and `|` operators
Retry Budgets
Instead of capping retries by count, cap by total time. This prevents retries from exceeding the caller’s timeout:
@dataclass
class RetryBudget:
    """Limits retries by total elapsed time, not count."""

    total_budget: float  # Maximum seconds for all retries
    config: BackoffConfig = field(default_factory=BackoffConfig)

    async def execute(
        self,
        func: Callable[..., Coroutine],
        *args: Any,
        **kwargs: Any,
    ) -> RetryResult:
        """Await ``func(*args, **kwargs)`` until it succeeds or time runs out.

        Args:
            func: Coroutine function to call.
            *args: Positional arguments forwarded to ``func``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            A ``RetryResult``. On success ``total_delay`` is the time that
            had elapsed when the successful attempt started. On exhaustion
            it is the total elapsed time and ``last_error`` is a
            ``TimeoutError`` whose ``__cause__`` is the last failure, if any.
        """
        start = time.monotonic()
        failures = 0
        previous_delay: Optional[float] = None
        last_error: Optional[Exception] = None

        while True:
            elapsed = time.monotonic() - start
            remaining = self.total_budget - elapsed
            if remaining <= 0:
                break

            try:
                result = await asyncio.wait_for(
                    func(*args, **kwargs),
                    # NOTE(review): max_delay doubles as the per-attempt
                    # timeout here; confirm that is intended.
                    timeout=min(remaining, self.config.max_delay),
                )
            except Exception as exc:
                last_error = exc
                # Zero-based index, so the first retry starts from base_delay.
                delay = calculate_delay(
                    self.config, failures, previous_delay
                )
                failures += 1

                # The failed call used up time; re-read the clock so the sleep
                # is clamped to what is actually left.
                remaining = self.total_budget - (time.monotonic() - start)
                if remaining <= 0:
                    break

                # A zero delay (possible with jitter) retries immediately
                # rather than giving up while budget remains.
                delay = min(delay, remaining)
                previous_delay = delay
                await asyncio.sleep(delay)
            else:
                return RetryResult(
                    value=result,
                    attempts=failures + 1,
                    total_delay=elapsed,
                    succeeded=True,
                )

        exhausted = TimeoutError("Retry budget exhausted")
        # Keep the real failure reachable instead of discarding it.
        exhausted.__cause__ = last_error
        return RetryResult(
            value=None,
            attempts=failures,
            total_delay=time.monotonic() - start,
            succeeded=False,
            last_error=exhausted,
        )
Respecting Retry-After Headers
When a server sends a Retry-After header, use that value (capped at your configured maximum delay) instead of your calculated backoff:
def _parse_retry_after(value: str) -> Optional[float]:
    """Return the wait in seconds encoded by a Retry-After value, or None."""
    import math
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    try:
        # Retry-After can be seconds or an HTTP date
        seconds = float(value)
    except ValueError:
        try:
            target = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            return None  # neither a number nor a parseable date
        if target.tzinfo is None:
            # A date without a usable offset parses as naive; treat it as UTC
            # so the subtraction below cannot raise.
            target = target.replace(tzinfo=timezone.utc)
        seconds = (target - datetime.now(timezone.utc)).total_seconds()

    if math.isnan(seconds):
        return None
    return max(0.0, seconds)


async def fetch_respecting_retry_after(
    client: httpx.AsyncClient,
    url: str,
    config: BackoffConfig,
) -> httpx.Response:
    """GET ``url``, honoring Retry-After on 429 and 503 responses.

    Args:
        client: Client used to issue the requests.
        url: Address to fetch.
        config: Supplies the retry count, the delay cap, and the fallback
            backoff used when the header is missing or unparseable.

    Returns:
        The first response whose status is neither 429 nor 503 and that
        passes ``raise_for_status()``.

    Raises:
        httpx.HTTPStatusError: If every attempt returned 429 or 503, or if
            another response failed ``raise_for_status()``.
        ValueError: If ``config.max_retries`` is negative, so no request
            was made.
    """
    previous_delay: Optional[float] = None
    response: Optional[httpx.Response] = None

    for attempt in range(config.max_retries + 1):
        response = await client.get(url, timeout=5.0)
        if response.status_code not in (429, 503):
            response.raise_for_status()
            return response

        if attempt >= config.max_retries:
            break  # no attempt follows, so sleeping would only delay the error

        retry_after = response.headers.get("Retry-After")
        delay = _parse_retry_after(retry_after) if retry_after else None
        if delay is None:
            delay = calculate_delay(config, attempt, previous_delay)

        # Cap the wait even when the server asked for longer.
        delay = min(delay, config.max_delay)
        previous_delay = delay
        await asyncio.sleep(delay)

    if response is None:
        raise ValueError("config.max_retries must be non-negative")
    raise httpx.HTTPStatusError(
        "Max retries exceeded",
        request=response.request,
        response=response,
    )
Monitoring Retries
Track retry behavior to detect systemic issues:
from prometheus_client import Counter, Histogram
# Counts retry attempts per operation and final outcome.
retry_attempts = Counter(
    "retry_attempts_total",
    "Total retry attempts",
    labelnames=["operation", "outcome"],  # outcome: success, exhausted
)

# Distribution of individual waits between attempts.
retry_delay = Histogram(
    "retry_delay_seconds",
    "Delay between retry attempts",
    labelnames=["operation"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60],
)

# Distribution of the whole retry sequence's duration, waits included.
retry_total_duration = Histogram(
    "retry_total_duration_seconds",
    "Total time spent retrying (including delays)",
    labelnames=["operation"],
    buckets=[1, 5, 10, 30, 60, 120, 300],
)
Alerts to configure:
- Retry rate > 10% → the upstream service is degraded
- Average retry count > 2 → sustained failures, not transient
- Retry budget exhaustion rate > 1% → users are seeing errors
Comparing Jitter Strategies Under Load
AWS published a simulation comparing strategies when 100 clients fail simultaneously against a server that can handle 10 requests per second:
| Strategy | Time to Complete All Retries | Peak Server Load |
|---|---|---|
| No Jitter | 90 seconds | 100 req/s spikes |
| Full Jitter | 45 seconds | ~12 req/s sustained |
| Equal Jitter | 50 seconds | ~15 req/s sustained |
| Decorrelated | 55 seconds | ~13 req/s sustained |
Full jitter wins on both completion time and server load. The uniform distribution spreads retries most effectively across the backoff window.
Testing Retry Logic
import pytest
@pytest.mark.asyncio
async def test_succeeds_on_third_attempt():
    """Two transient failures followed by a success are retried through."""
    calls = []

    async def flaky_func():
        # Fail on the first two invocations, succeed from the third on.
        calls.append(None)
        if len(calls) < 3:
            raise ConnectionError("Temporary failure")
        return "success"

    backoff = BackoffConfig(base_delay=0.01, max_retries=5)
    executor = RetryExecutor(
        config=backoff,
        retryable_exceptions=(ConnectionError,),
    )

    outcome = await executor.execute(flaky_func)

    assert outcome.succeeded
    assert outcome.attempts == 3
    assert outcome.value == "success"
@pytest.mark.asyncio
async def test_exhausts_retries():
    """A call that never succeeds is reported as failed, not raised."""

    async def always_fail():
        raise ConnectionError("Down")

    backoff = BackoffConfig(base_delay=0.01, max_retries=2)
    executor = RetryExecutor(
        config=backoff,
        retryable_exceptions=(ConnectionError,),
    )

    outcome = await executor.execute(always_fail)

    assert not outcome.succeeded
    assert outcome.attempts == 3  # 1 initial + 2 retries
def test_jitter_strategies_produce_valid_delays():
    """Every strategy keeps the first ten delays within [0, max_delay]."""
    for strategy in JitterStrategy:
        config = BackoffConfig(
            base_delay=1.0, max_delay=60.0, jitter=strategy
        )
        delays = [calculate_delay(config, attempt) for attempt in range(10)]
        assert all(0 <= delay <= config.max_delay for delay in delays)
One thing to remember: The combination of exponential backoff (progressively longer waits) and full jitter (random spread within each interval) is the proven default for distributed retry logic. Use tenacity for production code, respect Retry-After headers when present, and monitor retry rates to catch upstream degradation before it causes user-visible failures.
See Also
- Python Aggregate Pattern Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts Why the same word means different things in different parts of your code — and why that is perfectly fine.
- Python Bulkhead Pattern Why smart Python apps put walls between their parts — like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern How a circuit breaker saves your app from crashing — explained with a home electrical fuse analogy.
- Python Clean Architecture Why your Python app should look like an onion — and how that saves you from painful rewrites.