Python Timeout Patterns — Deep Dive
Timeouts in asyncio
The asyncio.wait_for() function is your primary tool for adding timeouts to any coroutine:
import asyncio

async def fetch_user_profile(user_id: str) -> dict:
    """Simulate a slow external call."""
    await asyncio.sleep(10)  # Slow service
    return {"id": user_id, "name": "Alice"}

async def get_profile_with_timeout(user_id: str) -> dict:
    try:
        return await asyncio.wait_for(
            fetch_user_profile(user_id),
            timeout=5.0,
        )
    except asyncio.TimeoutError:
        # Return cached or default data
        return {"id": user_id, "name": "Unknown", "cached": True}
Gotcha: asyncio.wait_for cancels the underlying task when the timeout expires. If the coroutine holds resources (database connections, file handles), ensure cleanup happens in try/finally blocks inside the coroutine itself.
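A minimal sketch of that cleanup rule, using only the standard library. The coroutine name query_with_cleanup and the events list are hypothetical and exist only to make the order of operations visible; a real coroutine would release a connection or file handle in the finally block instead.

```python
import asyncio

events: list[str] = []  # records what happened, for illustration only


async def query_with_cleanup() -> str:
    """Hypothetical coroutine that holds a resource while it waits."""
    events.append("acquired")
    try:
        await asyncio.sleep(10)  # stands in for a slow query
        return "rows"
    finally:
        # Runs on a normal return and also when wait_for cancels the task.
        events.append("released")


async def main() -> str:
    try:
        return await asyncio.wait_for(query_with_cleanup(), timeout=0.05)
    except asyncio.TimeoutError:
        return "fallback"


result = asyncio.run(main())
print(result, events)
```

Because wait_for waits for the cancelled task to finish unwinding before it raises, the "released" entry is recorded before the caller sees the timeout.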
TaskGroup Timeouts (Python 3.11+)
For multiple concurrent operations with a shared deadline:
import asyncio

async def fetch_with_deadline(user_id: str, deadline: float) -> dict:
    # deadline is an absolute time on the event loop's clock (loop.time())
    remaining = deadline - asyncio.get_running_loop().time()
    if remaining <= 0:
        raise asyncio.TimeoutError("Deadline already passed")
    async with asyncio.timeout(remaining):
        # fetch_user, fetch_preferences and fetch_history are assumed to be
        # coroutines defined elsewhere
        async with asyncio.TaskGroup() as tg:
            user_task = tg.create_task(fetch_user(user_id))
            prefs_task = tg.create_task(fetch_preferences(user_id))
            history_task = tg.create_task(fetch_history(user_id))
        return {
            "user": user_task.result(),
            "preferences": prefs_task.result(),
            "history": history_task.result(),
        }
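asyncio.timeout() (added in 3.11) is a context manager that cancels the enclosing task when the deadline expires. The TaskGroup inside it reacts by cancelling every child task it started, and the cancellation surfaces to the caller as TimeoutError. It’s cleaner than wrapping each call individually.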
HTTP Client Timeouts with httpx
The httpx library provides granular timeout control:
import httpx

# Per-phase timeouts
timeout = httpx.Timeout(
    connect=5.0,  # Establishing the connection
    read=10.0,    # Waiting for each chunk of response bytes
    write=5.0,    # Sending each chunk of the request body
    pool=3.0,     # Waiting for a connection from the pool
)

async def fetch_data() -> httpx.Response:
    async with httpx.AsyncClient(timeout=timeout) as client:
        return await client.get("https://api.example.com/data")
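The pool timeout is often overlooked but critical: if all connections in the pool are busy, new requests wait for one to free up. httpx applies its 5-second default to this phase too, so the wait becomes unbounded only if you disable it (pool=None or timeout=None).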
Deadline Propagation Across Services
In microservice architectures, pass remaining time as a header so downstream services know when to give up:
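import time
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import httpx

app = FastAPI()
# Carries the remaining budget in seconds. time.monotonic() values are only
# meaningful inside one process, so an absolute deadline cannot cross hosts.
DEADLINE_HEADER = "X-Request-Deadline"

@app.middleware("http")
async def deadline_middleware(request: Request, call_next):
    budget_str = request.headers.get(DEADLINE_HEADER)
    if budget_str:
        try:
            remaining = float(budget_str)
        except ValueError:
            return JSONResponse(
                status_code=400,
                content={"detail": "Invalid deadline header"},
            )
        if remaining <= 0:
            # Return the response directly: an HTTPException raised inside
            # middleware bypasses FastAPI's exception handlers.
            return JSONResponse(
                status_code=504,
                content={"detail": "Request deadline exceeded before processing"},
            )
        # Convert the budget into a deadline on this process's clock
        request.state.deadline = time.monotonic() + remaining
    else:
        # No upstream deadline, so set our own default
        request.state.deadline = time.monotonic() + 30.0
    response = await call_next(request)
    return response

async def call_downstream(
    client: httpx.AsyncClient,
    url: str,
    deadline: float,
    **kwargs,
) -> httpx.Response:
    remaining = deadline - time.monotonic()
    if remaining <= 0:
        raise TimeoutError("Deadline already passed")
    return await client.get(
        url,
        headers={DEADLINE_HEADER: f"{remaining:.3f}"},
        # httpx.Timeout requires all four phases when no default is given
        timeout=httpx.Timeout(
            connect=min(2.0, remaining),
            read=remaining,
            write=min(2.0, remaining),
            pool=remaining,
        ),
        **kwargs,
    )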
This pattern ensures Service C doesn’t spend 10 seconds processing a request that Service A already gave up on 5 seconds ago.
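To make the budget arithmetic concrete, here is a small standard-library sketch of how a remaining-time header could shrink from hop to hop. It assumes the header carries seconds remaining rather than an absolute timestamp, since monotonic clocks are not comparable between hosts. The helper names local_deadline and outgoing_headers and the 30-second default are illustrative, not part of any framework.

```python
DEADLINE_HEADER = "X-Request-Deadline"  # header name used in this article
DEFAULT_BUDGET = 30.0                   # assumed default when no header arrives


def local_deadline(headers: dict[str, str], now: float) -> float:
    """Turn an incoming remaining-seconds header into a local deadline."""
    raw = headers.get(DEADLINE_HEADER)
    try:
        budget = float(raw) if raw is not None else DEFAULT_BUDGET
    except ValueError:
        budget = DEFAULT_BUDGET  # malformed header: fall back to the default
    return now + budget


def outgoing_headers(deadline: float, now: float) -> dict[str, str]:
    """Build the header for the next hop from the time still left."""
    remaining = deadline - now
    if remaining <= 0:
        raise TimeoutError("Deadline already passed")
    return {DEADLINE_HEADER: f"{remaining:.3f}"}


# In a real service, `now` would be time.monotonic(); fixed values are used
# here so the arithmetic is easy to follow.
deadline_a = local_deadline({DEADLINE_HEADER: "5.0"}, now=50.0)  # A gets 5 s
to_b = outgoing_headers(deadline_a, now=51.5)                    # A spent 1.5 s
deadline_b = local_deadline(to_b, now=100.0)                     # B's own clock
to_c = outgoing_headers(deadline_b, now=102.0)                   # B spent 2.0 s
print(to_b, to_c)
```

Each service only ever compares timestamps taken from its own clock; the only value that crosses the wire is a duration. Network transit time is not subtracted in this sketch, so the downstream budget is slightly optimistic.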
Adaptive Timeout Tuning
Static timeouts are a starting point; production systems benefit from dynamic adjustment based on observed latency:
import time
from collections import deque
from statistics import mean, stdev

import httpx

class AdaptiveTimeout:
    """Adjusts timeout based on recent response times."""

    def __init__(
        self,
        initial: float = 5.0,
        min_timeout: float = 1.0,
        max_timeout: float = 30.0,
        window_size: int = 100,
        multiplier: float = 3.0,  # timeout = mean + multiplier * stdev
    ):
        self.min_timeout = min_timeout
        self.max_timeout = max_timeout
        self.multiplier = multiplier
        self._history: deque[float] = deque(maxlen=window_size)
        self._current = initial

    def record(self, duration: float) -> None:
        self._history.append(duration)
        if len(self._history) >= 10:  # wait for enough samples
            avg = mean(self._history)
            sd = stdev(self._history)
            calculated = avg + self.multiplier * sd
            # Clamp to the configured bounds
            self._current = max(
                self.min_timeout,
                min(self.max_timeout, calculated),
            )

    @property
    def value(self) -> float:
        return self._current

# Usage
timeout_tracker = AdaptiveTimeout(initial=5.0)

async def fetch_with_adaptive_timeout(client: httpx.AsyncClient, url: str):
    start = time.monotonic()
    try:
        response = await client.get(url, timeout=timeout_tracker.value)
        timeout_tracker.record(time.monotonic() - start)
        return response
    except httpx.TimeoutException:
        # Count the timeout as a sample at the current limit
        timeout_tracker.record(timeout_tracker.value)
        raise
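With a multiplier of 3.0, the timeout sits at roughly the 99.9th percentile of latency: mean plus three standard deviations covers about 99.87% of a normal distribution on the upper side (the familiar 99.7% figure is two-sided). Real latency is usually right-skewed, so treat this as an approximation and adjust based on your tolerance for false timeouts.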
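The one-sided coverage implied by a given multiplier can be checked with the standard library's NormalDist. This is a rough sketch that holds only under the normal assumption; the "false timeouts per 100k" figure is simply the upper-tail probability scaled up for readability.

```python
from statistics import NormalDist

standard = NormalDist()  # mean 0, standard deviation 1

for multiplier in (2.0, 3.0, 4.0):
    covered = standard.cdf(multiplier)          # P(latency <= mean + k * stdev)
    false_timeouts = (1.0 - covered) * 100_000  # expected per 100k requests
    print(
        f"k={multiplier}: covers {covered * 100:.2f}% of requests, "
        f"about {false_timeouts:.0f} false timeouts per 100k"
    )

coverage_at_3 = standard.cdf(3.0)
```

If your measured latencies have a long tail, computing an empirical percentile from the recorded window is likely to be a better guide than any multiplier.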
Signal-Based Timeouts for Blocking Code
When you can’t use asyncio (synchronous code, C extensions), signal.alarm provides a last-resort timeout on Unix systems:
import signal

def timeout_handler(signum, frame):
    # Raise the built-in TimeoutError; defining a class with the same name
    # would shadow it for the rest of the module.
    raise TimeoutError("Operation timed out")

def run_with_timeout(func, timeout_seconds: int, *args, **kwargs):
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout_seconds)  # whole seconds only
    try:
        return func(*args, **kwargs)
    finally:
        signal.alarm(0)  # Cancel the alarm
        signal.signal(signal.SIGALRM, old_handler)
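Limitations: Only works on Unix, only in the main thread, and signal.alarm takes whole seconds. For sub-second timeouts, signal.setitimer(signal.ITIMER_REAL, seconds) accepts a float. The handler runs only once the interpreter regains control, so a C extension that never returns to Python will not be interrupted; for that case, run the work in a subprocess that you can kill.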
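For sub-second limits, signal.setitimer is a standard-library alternative to signal.alarm that takes a float. The sketch below mirrors the wrapper above under the same restrictions (Unix, main thread); the function name run_with_subsecond_timeout is an illustrative choice.

```python
import signal
import time


def _raise_timeout(signum, frame):
    raise TimeoutError("Operation timed out")


def run_with_subsecond_timeout(func, timeout_seconds: float, *args, **kwargs):
    """Run func with a fractional-second limit (Unix, main thread only)."""
    old_handler = signal.signal(signal.SIGALRM, _raise_timeout)
    # ITIMER_REAL counts wall-clock time and delivers SIGALRM on expiry.
    signal.setitimer(signal.ITIMER_REAL, timeout_seconds)
    try:
        return func(*args, **kwargs)
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel the timer
        signal.signal(signal.SIGALRM, old_handler)


try:
    run_with_subsecond_timeout(time.sleep, 0.2, 5)  # sleep(5), 0.2 s limit
    outcome = "finished"
except TimeoutError:
    outcome = "timed out"
print(outcome)
```

time.sleep is interrupted here because the handler raises; a call that stays inside C code without returning to the interpreter would not be.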
Timeout Testing
Testing timeout behavior requires controlling time or using real delays:
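import pytest
import asyncio

async def get_with_fallback(service, timeout: float = 0.05) -> dict:
    # Assumed shape of the helper under test: await service() with a
    # timeout and fall back to cached data.
    try:
        return await asyncio.wait_for(service(), timeout=timeout)
    except asyncio.TimeoutError:
        return {"cached": True}

@pytest.mark.asyncio
async def test_timeout_returns_fallback():
    async def slow_service():
        await asyncio.sleep(100)
        return "real data"
    # Outer guard: fail the test instead of hanging if the fallback never fires
    result = await asyncio.wait_for(
        get_with_fallback(slow_service),
        timeout=2.0,
    )
    assert result == {"cached": True}

def test_adaptive_timeout_adjusts():
    tracker = AdaptiveTimeout(initial=5.0)
    # Simulate fast responses
    for _ in range(20):
        tracker.record(0.1)
    # Timeout should shrink to the min_timeout floor (1.0 by default)
    assert tracker.value < 2.0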
Common Pitfalls
- Timeout on connect but not on read. The connection succeeds instantly but the server never sends a response. Both phases need limits.
- Retrying without reducing the timeout. If your total budget is 30 seconds and the first attempt times out at 30 seconds, there’s no time left for retries. Use per_retry_timeout = total_timeout / (max_retries + 1).
- Ignoring pool exhaustion. Time spent queueing for a pooled connection is easy to misread: your code sees a slow response when it’s actually waiting for a free connection, not for the server.
- Using time.time() for deadlines. The system clock can jump (NTP adjustments, leap seconds). Use time.monotonic() for duration-based timeouts.
- Catching TimeoutError too broadly. In Python 3.11+, asyncio.TimeoutError is an alias of the built-in TimeoutError, so except TimeoutError also catches asyncio timeouts and socket timeouts. Make sure your handlers distinguish between your timeouts and unrelated ones.
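The retry pitfall above can be sketched as a loop that splits a total budget into equal slices and also respects the overall deadline. This is a minimal illustration, not a full retry policy (no backoff or jitter); retry_within_budget and flaky_call are hypothetical names.

```python
import asyncio
import time


async def retry_within_budget(make_call, total_timeout: float, max_retries: int):
    """Retry make_call() so that all attempts together fit in total_timeout."""
    deadline = time.monotonic() + total_timeout
    per_retry_timeout = total_timeout / (max_retries + 1)
    last_error = None
    for _ in range(max_retries + 1):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            # Never wait longer than one slice, nor past the overall deadline.
            return await asyncio.wait_for(
                make_call(), timeout=min(per_retry_timeout, remaining)
            )
        except asyncio.TimeoutError as exc:
            last_error = exc
    raise TimeoutError(f"No response within {total_timeout}s") from last_error


attempts = 0


async def flaky_call() -> str:
    """Hypothetical service: hangs on the first two calls, then answers."""
    global attempts
    attempts += 1
    if attempts < 3:
        await asyncio.sleep(10)
    return "ok"


result = asyncio.run(
    retry_within_budget(flaky_call, total_timeout=0.3, max_retries=2)
)
print(result, attempts)
```

Equal slices are the simplest split. Giving the first attempt a larger share is a reasonable variation when most requests succeed on the first try.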
One thing to remember: The right timeout value isn’t fixed — it’s a function of observed latency, acceptable error rates, and downstream capacity. Use adaptive tuning in production and deadline propagation across service boundaries to avoid both false timeouts and zombie requests.
See Also
- Python Aggregate Pattern: Why grouping related objects under a single gatekeeper prevents data chaos in your Python application.
- Python Bounded Contexts: Why the same word means different things in different parts of your code, and why that is perfectly fine.
- Python Bulkhead Pattern: Why smart Python apps put walls between their parts, like a ship that stays afloat even with a hole in the hull.
- Python Circuit Breaker Pattern: How a circuit breaker saves your app from crashing, explained with a home electrical fuse analogy.
- Python Clean Architecture: Why your Python app should look like an onion, and how that saves you from painful rewrites.