Python Retry with Backoff — Deep Dive

Building a Retry Decorator from Scratch

Understanding the mechanics before reaching for a library:

import asyncio
import random
import functools
from typing import Type

def retry(
    max_attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    retryable_exceptions: tuple[Type[Exception], ...] = (Exception,),
):
    def decorator(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except retryable_exceptions as e:
                    last_exception = e
                    if attempt == max_attempts - 1:
                        raise

                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay,
                    )
                    if jitter:
                        delay = delay * (0.5 + random.random())

                    await asyncio.sleep(delay)

            raise last_exception

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            import time
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except retryable_exceptions as e:
                    last_exception = e
                    if attempt == max_attempts - 1:
                        raise

                    delay = min(
                        base_delay * (exponential_base ** attempt),
                        max_delay,
                    )
                    if jitter:
                        delay = delay * (0.5 + random.random())

                    time.sleep(delay)

            raise last_exception

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator

Usage:

@retry(max_attempts=3, retryable_exceptions=(ConnectionError, TimeoutError))
async def fetch_user(user_id: int):
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.example.com/users/{user_id}")
        resp.raise_for_status()
        return resp.json()

Tenacity: The Standard Library for Retries

tenacity is the most popular retry library in Python. It handles edge cases that a simple decorator misses:

from tenacity import (
    retry,
    stop_after_attempt,
    stop_after_delay,
    wait_exponential_jitter,
    retry_if_exception_type,
    before_sleep_log,
    after_log,
)
import logging

logger = logging.getLogger(__name__)

@retry(
    stop=(stop_after_attempt(5) | stop_after_delay(30)),
    wait=wait_exponential_jitter(initial=1, max=60, jitter=5),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    after=after_log(logger, logging.INFO),
    reraise=True,
)
async def call_payment_api(amount: int, token: str):
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.post(
            "https://payments.example.com/charge",
            json={"amount": amount, "token": token},
        )
        resp.raise_for_status()
        return resp.json()

Key tenacity features:

  • Combined stop conditions — Stop after 5 attempts OR 30 seconds total, whichever comes first
  • Jitter strategieswait_exponential_jitter adds bounded randomness
  • Logging hooksbefore_sleep logs each retry attempt with the delay
  • Custom retry predicates — Retry based on exception type, return value, or custom logic

Retry on Specific HTTP Status Codes

from tenacity import retry_if_result

def is_retryable_status(response):
    return response.status_code in (429, 500, 502, 503, 504)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=30),
    retry=retry_if_result(is_retryable_status),
)
async def api_call(url: str):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

This retries based on the response rather than exceptions — useful when the HTTP client doesn’t raise exceptions for server errors.

Respecting Retry-After Headers

Well-behaved APIs send Retry-After headers with 429 responses:

async def call_with_retry_after(url: str, max_retries: int = 3):
    async with httpx.AsyncClient() as client:
        for attempt in range(max_retries):
            resp = await client.get(url)

            if resp.status_code == 429:
                retry_after = resp.headers.get("Retry-After")
                if retry_after:
                    wait_time = float(retry_after)
                else:
                    wait_time = 2 ** attempt
                await asyncio.sleep(min(wait_time, 120))
                continue

            resp.raise_for_status()
            return resp.json()

    raise RuntimeError(f"Failed after {max_retries} retries")

Always respect Retry-After over your calculated backoff — the server knows its load better than you do.

httpx Built-In Transport Retries

httpx supports transport-level retries for connection failures:

import httpx

transport = httpx.AsyncHTTPTransport(retries=3)
async with httpx.AsyncClient(transport=transport) as client:
    resp = await client.get("https://api.example.com/data")

Transport retries only cover connection-level failures (TCP connect, TLS handshake). They don’t retry on HTTP 500 or timeout responses. Use them alongside application-level retry logic.

Retry with Context: Request-Scoped Budgets

In a microservice that makes multiple downstream calls per request, a per-request retry budget prevents cascading timeouts:

class RetryBudget:
    def __init__(self, max_total_retries: int = 10, max_total_wait: float = 15.0):
        self.remaining_retries = max_total_retries
        self.remaining_wait = max_total_wait

    def can_retry(self, delay: float) -> bool:
        return self.remaining_retries > 0 and self.remaining_wait >= delay

    def consume(self, delay: float):
        self.remaining_retries -= 1
        self.remaining_wait -= delay

# Usage in request handler
async def handle_request(request):
    budget = RetryBudget(max_total_retries=10, max_total_wait=15.0)

    user = await fetch_with_budget(budget, get_user, request.user_id)
    orders = await fetch_with_budget(budget, get_orders, request.user_id)
    recommendations = await fetch_with_budget(budget, get_recommendations, user)

    return {"user": user, "orders": orders, "recommendations": recommendations}

If fetching the user takes 3 retries, the remaining calls have fewer retries available. This prevents a single slow dependency from consuming the entire request timeout.

Async Retry Patterns

Retry with Semaphore (Concurrency Limit)

sem = asyncio.Semaphore(10)  # Max 10 concurrent retrying calls

@retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter(initial=1))
async def rate_limited_call(url: str):
    async with sem:
        async with httpx.AsyncClient() as client:
            resp = await client.get(url, timeout=5)
            resp.raise_for_status()
            return resp.json()

Batch Retry with Partial Success

async def fetch_batch_with_retry(ids: list[int], max_retries: int = 3):
    results = {}
    remaining = list(ids)

    for attempt in range(max_retries):
        tasks = [fetch_item(id) for id in remaining]
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)

        still_failing = []
        for id, outcome in zip(remaining, outcomes):
            if isinstance(outcome, Exception):
                still_failing.append(id)
            else:
                results[id] = outcome

        remaining = still_failing
        if not remaining:
            break

        await asyncio.sleep(2 ** attempt)

    # Report partial results and failures
    return results, remaining

Testing Retry Logic

import pytest
from unittest.mock import AsyncMock

@pytest.mark.asyncio
async def test_retry_succeeds_on_second_attempt():
    mock_api = AsyncMock(side_effect=[ConnectionError("timeout"), {"id": 1}])

    @retry(max_attempts=3, retryable_exceptions=(ConnectionError,))
    async def call_api():
        return await mock_api()

    result = await call_api()
    assert result == {"id": 1}
    assert mock_api.call_count == 2

@pytest.mark.asyncio
async def test_retry_exhausted():
    mock_api = AsyncMock(side_effect=ConnectionError("always fails"))

    @retry(max_attempts=3, base_delay=0.01, retryable_exceptions=(ConnectionError,))
    async def call_api():
        return await mock_api()

    with pytest.raises(ConnectionError):
        await call_api()
    assert mock_api.call_count == 3

Use tiny delays in tests (base_delay=0.01) to avoid slow test suites.

Anti-Patterns

Retrying non-idempotent operations without idempotency keys. Retrying a POST that creates a record might create duplicates. Always combine retry logic with idempotency protection.

Retrying without logging. Silent retries hide problems. Log every retry with the attempt number, delay, and error. This makes debugging production issues dramatically easier.

Infinite retry loops. Always set both max_attempts and max_elapsed_time. A retry loop without bounds can keep a request alive for hours.

Retrying on the wrong layer. If your HTTP client, ORM, and application code all have retry logic, a single transient failure might trigger dozens of actual retries. Pick one layer for retry logic and disable it elsewhere.

The one thing to remember: Production retry logic needs exponential backoff with jitter, respect for Retry-After headers, per-request budgets to prevent cascading timeouts, and logging on every retry attempt — use tenacity for the heavy lifting.

pythonreliabilitypatterns

See Also