Python Timeout Patterns — Deep Dive

Timeouts in asyncio

The asyncio.wait_for() function is your primary tool for adding timeouts to any coroutine:

import asyncio

async def fetch_user_profile(user_id: str) -> dict:
    """Simulate a slow external call."""
    await asyncio.sleep(10)  # Slow service
    return {"id": user_id, "name": "Alice"}

async def get_profile_with_timeout(user_id: str) -> dict:
    try:
        return await asyncio.wait_for(
            fetch_user_profile(user_id),
            timeout=5.0,
        )
    except asyncio.TimeoutError:
        # Return cached or default data
        return {"id": user_id, "name": "Unknown", "cached": True}

Gotcha: asyncio.wait_for cancels the underlying task when the timeout expires. If the coroutine holds resources (database connections, file handles), ensure cleanup happens in try/finally blocks inside the coroutine itself.

TaskGroup Timeouts (Python 3.11+)

For multiple concurrent operations with a shared deadline:

import asyncio

async def fetch_with_deadline(deadline: float):
    remaining = deadline - asyncio.get_event_loop().time()
    if remaining <= 0:
        raise asyncio.TimeoutError("Deadline already passed")

    async with asyncio.timeout(remaining):
        async with asyncio.TaskGroup() as tg:
            user_task = tg.create_task(fetch_user(user_id))
            prefs_task = tg.create_task(fetch_preferences(user_id))
            history_task = tg.create_task(fetch_history(user_id))

    return {
        "user": user_task.result(),
        "preferences": prefs_task.result(),
        "history": history_task.result(),
    }

asyncio.timeout() (added in 3.11) is a context manager that cancels all tasks within its scope when the deadline expires. It’s cleaner than wrapping each call individually.

HTTP Client Timeouts with httpx

The httpx library provides granular timeout control:

import httpx

# Per-phase timeouts
timeout = httpx.Timeout(
    connect=5.0,     # TCP connection
    read=10.0,       # Waiting for response bytes
    write=5.0,       # Sending request body
    pool=3.0,        # Waiting for a connection from the pool
)

async with httpx.AsyncClient(timeout=timeout) as client:
    response = await client.get("https://api.example.com/data")

The pool timeout is often overlooked but critical — if all connections in the pool are busy, new requests wait. Without a pool timeout, this wait is unbounded.

Deadline Propagation Across Services

In microservice architectures, pass remaining time as a header so downstream services know when to give up:

import time
from fastapi import FastAPI, Request, HTTPException
import httpx

app = FastAPI()

DEADLINE_HEADER = "X-Request-Deadline"

@app.middleware("http")
async def deadline_middleware(request: Request, call_next):
    deadline_str = request.headers.get(DEADLINE_HEADER)
    if deadline_str:
        deadline = float(deadline_str)
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise HTTPException(
                status_code=504,
                detail="Request deadline exceeded before processing"
            )
        request.state.deadline = deadline
    else:
        # No upstream deadline — set our own default
        request.state.deadline = time.monotonic() + 30.0

    response = await call_next(request)
    return response


async def call_downstream(
    client: httpx.AsyncClient,
    url: str,
    deadline: float,
    **kwargs,
) -> httpx.Response:
    remaining = deadline - time.monotonic()
    if remaining <= 0:
        raise TimeoutError("Deadline already passed")

    return await client.get(
        url,
        headers={DEADLINE_HEADER: str(deadline)},
        timeout=httpx.Timeout(connect=2.0, read=remaining, write=2.0),
        **kwargs,
    )

This pattern ensures Service C doesn’t spend 10 seconds processing a request that Service A already gave up on 5 seconds ago.

Adaptive Timeout Tuning

Static timeouts are a starting point; production systems benefit from dynamic adjustment based on observed latency:

import time
from collections import deque
from statistics import mean, stdev

class AdaptiveTimeout:
    """Adjusts timeout based on recent response times."""

    def __init__(
        self,
        initial: float = 5.0,
        min_timeout: float = 1.0,
        max_timeout: float = 30.0,
        window_size: int = 100,
        multiplier: float = 3.0,  # timeout = mean + multiplier * stdev
    ):
        self.min_timeout = min_timeout
        self.max_timeout = max_timeout
        self.multiplier = multiplier
        self._history: deque[float] = deque(maxlen=window_size)
        self._current = initial

    def record(self, duration: float) -> None:
        self._history.append(duration)
        if len(self._history) >= 10:
            avg = mean(self._history)
            sd = stdev(self._history) if len(self._history) > 1 else 0
            calculated = avg + self.multiplier * sd
            self._current = max(
                self.min_timeout,
                min(self.max_timeout, calculated),
            )

    @property
    def value(self) -> float:
        return self._current

# Usage
timeout_tracker = AdaptiveTimeout(initial=5.0)

async def fetch_with_adaptive_timeout(client, url):
    start = time.monotonic()
    try:
        response = await client.get(url, timeout=timeout_tracker.value)
        timeout_tracker.record(time.monotonic() - start)
        return response
    except httpx.TimeoutException:
        timeout_tracker.record(timeout_tracker.value)  # Record at max
        raise

The multiplier of 3.0 means the timeout is set at roughly the P99.7 latency (assuming normal distribution). Adjust based on your tolerance for false timeouts.

Signal-Based Timeouts for Blocking Code

When you can’t use asyncio (synchronous code, C extensions), signal.alarm provides a last-resort timeout on Unix systems:

import signal

class TimeoutError(Exception):
    pass

def timeout_handler(signum, frame):
    raise TimeoutError("Operation timed out")

def run_with_timeout(func, timeout_seconds, *args, **kwargs):
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout_seconds)
    try:
        return func(*args, **kwargs)
    finally:
        signal.alarm(0)  # Cancel the alarm
        signal.signal(signal.SIGALRM, old_handler)

Limitations: Only works on Unix, only in the main thread, and signal.alarm has second-level granularity (no sub-second timeouts). For finer control, use threading.Timer or wrap in a subprocess.

Timeout Testing

Testing timeout behavior requires controlling time or using real delays:

import pytest
import asyncio

@pytest.mark.asyncio
async def test_timeout_returns_fallback():
    async def slow_service():
        await asyncio.sleep(100)
        return "real data"

    result = await asyncio.wait_for(
        get_with_fallback(slow_service),
        timeout=2.0,
    )
    assert result == {"cached": True}

@pytest.mark.asyncio
async def test_adaptive_timeout_adjusts():
    tracker = AdaptiveTimeout(initial=5.0)
    # Simulate fast responses
    for _ in range(20):
        tracker.record(0.1)
    # Timeout should shrink toward the fast response time
    assert tracker.value < 2.0

Common Pitfalls

  1. Timeout on connect but not on read. The connection succeeds instantly but the server never sends a response. Both phases need limits.

  2. Retrying without reducing the timeout. If your total budget is 30 seconds and the first attempt times out at 30 seconds, there’s no time left for retries. Use per_retry_timeout = total_timeout / (max_retries + 1).

  3. Ignoring pool exhaustion. Connection pool timeouts are invisible — your code sees a slow response when it’s actually waiting for a free connection, not for the server.

  4. Using time.time() for deadlines. System clock can jump (NTP adjustments, leap seconds). Use time.monotonic() for duration-based timeouts.

  5. Catching TimeoutError too broadly. In Python 3.11+, asyncio.TimeoutError is a subclass of TimeoutError. Make sure your except TimeoutError handlers distinguish between your timeouts and unrelated ones.

One thing to remember: The right timeout value isn’t fixed — it’s a function of observed latency, acceptable error rates, and downstream capacity. Use adaptive tuning in production and deadline propagation across service boundaries to avoid both false timeouts and zombie requests.

pythonreliabilityasync

See Also