FastAPI Background Tasks — Deep Dive
Under the hood: Starlette’s BackgroundTask
FastAPI’s BackgroundTasks is a thin subclass of Starlette’s BackgroundTasks, a container that holds one BackgroundTask per add_task() call. When the route handler returns, FastAPI attaches the accumulated tasks to the response object as its background attribute. After the response has been passed to the ASGI server’s send callable, the response object itself awaits each registered task in order.
The execution model:
# Simplified Starlette internals
class Response:
    async def __call__(self, scope, receive, send):
        await send({
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self.raw_headers,
        })
        await send({"type": "http.response.body", "body": self.body})
        # Response is sent. Now run background tasks:
        if self.background is not None:
            await self.background()
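Key insight: background tasks run after the response messages have been sent but before the application’s ASGI call returns. The response has already been handed to the server for delivery, so the client is not waiting on the tasks. The server is still holding the connection context until they finish.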
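The ordering can be checked without any framework. The following is a minimal sketch, not Starlette’s code: ToyResponse, run_demo, and the stub send/receive callables are hypothetical names that only mimic the shape of an ASGI response with a background callable.

```python
import asyncio


class ToyResponse:
    """Framework-free stand-in for an ASGI response with a background callable."""

    def __init__(self, body: bytes, background=None):
        self.body = body
        self.background = background

    async def __call__(self, scope, receive, send):
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": self.body})
        # Both response messages have been handed to the server; only now
        # does the background callable run.
        if self.background is not None:
            await self.background()


def run_demo() -> list[str]:
    events: list[str] = []

    async def send(message):
        events.append(message["type"])  # a real server would write to the socket

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def background():
        events.append("background")

    response = ToyResponse(b"ok", background=background)
    asyncio.run(response({"type": "http"}, receive, send))
    return events


print(run_demo())
# → ['http.response.start', 'http.response.body', 'background']
```

The background entry always comes last, and the call to response(...) does not return until it has been recorded.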
Sequential execution
Multiple tasks added via add_task() run sequentially, not concurrently:
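@app.post("/signup")
async def signup(payload: SignupIn, bg: BackgroundTasks):
    # SignupIn and create_user are hypothetical; they supply the `user` used below
    user = create_user(payload)
    bg.add_task(send_welcome_email, user.email)
    bg.add_task(update_analytics, "signup", user.id)
    bg.add_task(sync_to_crm, user)
    return {"status": "created"}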
These three tasks run one after another. If send_welcome_email takes 5 seconds, update_analytics waits. This is by design — Starlette keeps it simple to avoid concurrency bugs in task code.
If you need concurrent background work, you can wrap multiple operations in a single async task that uses asyncio.gather:
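import asyncio

async def post_signup_tasks(email: str, user_id: int, user: User):
    # All three coroutines run concurrently inside one background task
    await asyncio.gather(
        send_welcome_email(email),
        update_analytics("signup", user_id),
        sync_to_crm(user),
    )

@app.post("/signup")
async def signup(payload: SignupIn, bg: BackgroundTasks):
    # SignupIn and create_user are hypothetical; they supply the `user` used below
    user = create_user(payload)
    bg.add_task(post_signup_tasks, user.email, user.id, user)
    return {"status": "created"}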
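One caveat with asyncio.gather: by default the first exception propagates to the caller and the results of the other operations are discarded. A possible variation, sketched here with stand-in coroutines (their bodies and the simulated failure are assumptions for illustration), passes return_exceptions=True so every operation gets to finish and each failure is logged on its own.

```python
import asyncio
import logging

logger = logging.getLogger("background_tasks")


async def send_welcome_email(email: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for real I/O
    return f"emailed {email}"


async def update_analytics(event: str, user_id: int) -> str:
    await asyncio.sleep(0.01)
    raise RuntimeError("analytics service unavailable")  # simulated failure


async def sync_to_crm(user_id: int) -> str:
    await asyncio.sleep(0.01)
    return f"synced {user_id}"


async def post_signup_tasks(email: str, user_id: int) -> list:
    names = ["send_welcome_email", "update_analytics", "sync_to_crm"]
    # return_exceptions=True returns failures as values instead of raising
    results = await asyncio.gather(
        send_welcome_email(email),
        update_analytics("signup", user_id),
        sync_to_crm(user_id),
        return_exceptions=True,
    )
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            logger.error("post-signup step %s failed: %r", name, result)
    return results


results = asyncio.run(post_signup_tasks("test@example.com", 42))
```

Here the email and CRM steps still complete even though the analytics step raises, and results holds the exception object in the second position.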
Thread pool behavior for sync tasks
When you pass a synchronous function to add_task(), Starlette runs it in a worker thread via run_in_threadpool(), which wraps anyio.to_thread.run_sync(). This uses AnyIO’s default thread limiter, which allows 40 concurrent worker threads unless you change it.
Watch out for:
- Thread pool exhaustion: If many requests queue sync background tasks simultaneously, you can exhaust the thread pool, causing sync route handlers and other sync tasks to queue up
- Thread-unsafe code: Sync tasks run in different threads. If they share mutable state without locks, you get race conditions
- Database sessions: Most ORM sessions are not thread-safe. Create a new session inside each sync background task
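from datetime import datetime, timezone

def sync_background_task(user_id: int):
    # Create a fresh session; don't reuse the request's session
    db = SessionLocal()
    try:
        user = db.get(User, user_id)  # Session.get() assumes SQLAlchemy 1.4+
        if user is not None:  # the row may have been deleted in the meantime
            user.last_login = datetime.now(timezone.utc)
            db.commit()
    finally:
        db.close()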
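The thread pool exhaustion point above can be illustrated with a small simulation. This sketch uses the standard library’s ThreadPoolExecutor as a stand-in for AnyIO’s limiter, so it shows the queuing effect rather than Starlette’s actual pool; blocking_task and run_batch are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_task(seconds: float) -> None:
    time.sleep(seconds)  # stand-in for blocking I/O in a sync background task


def run_batch(pool_size: int, task_count: int, seconds: float) -> float:
    """Return the wall-clock time needed to finish task_count blocking tasks."""
    started = time.perf_counter()
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        for _ in range(task_count):
            pool.submit(blocking_task, seconds)
    # Leaving the with-block waits for every submitted task to finish.
    return time.perf_counter() - started


# Four tasks on two threads need at least two rounds of 0.1 s each.
small_pool = run_batch(pool_size=2, task_count=4, seconds=0.1)
# Four tasks on four threads can all run at once.
large_pool = run_batch(pool_size=4, task_count=4, seconds=0.1)
print(f"pool of 2: {small_pool:.2f}s, pool of 4: {large_pool:.2f}s")
```

Once the number of blocking tasks exceeds the pool size, the extra tasks wait for a free thread, which is the same effect sync route handlers would feel when background tasks occupy the shared pool.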
Error handling strategies
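Exceptions raised in a background task never reach the client, because the response has already been sent. The exception propagates to the ASGI server, which typically logs a traceback to stderr, and any tasks still queued behind the failing one are skipped. In production, you need explicit error handling: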
import asyncio
import inspect
import logging

from starlette.concurrency import run_in_threadpool

logger = logging.getLogger("background_tasks")

async def resilient_task(func, *args, max_retries: int = 3, **kwargs):
    for attempt in range(max_retries):
        try:
            if inspect.iscoroutinefunction(func):
                await func(*args, **kwargs)
            else:
                # Run sync callables in a worker thread so they don't block the event loop
                await run_in_threadpool(func, *args, **kwargs)
            return
        except Exception:
            # logger.exception appends the current traceback automatically
            logger.exception(
                "Background task %s failed (attempt %d/%d)",
                func.__name__, attempt + 1, max_retries,
            )
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, ...
    logger.critical("Background task %s exhausted retries", func.__name__)
For critical tasks, consider writing failures to a dead-letter table in your database, where a separate process can retry them.
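A dead-letter table can be quite small. The sketch below uses the standard library’s sqlite3 with an in-memory database; the table layout and the helper names (init_dead_letter, record_failure, run_with_dead_letter) are assumptions for illustration, and a real deployment would presumably use the application’s own database and session handling.

```python
import json
import sqlite3
from datetime import datetime, timezone


def init_dead_letter(conn: sqlite3.Connection) -> None:
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS dead_letter (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            task_name TEXT NOT NULL,
            payload TEXT NOT NULL,
            error TEXT NOT NULL,
            failed_at TEXT NOT NULL
        )
        """
    )


def record_failure(conn, task_name: str, payload: dict, error: Exception) -> None:
    # Store the arguments as JSON so a separate worker can rebuild the call.
    conn.execute(
        "INSERT INTO dead_letter (task_name, payload, error, failed_at) "
        "VALUES (?, ?, ?, ?)",
        (task_name, json.dumps(payload), repr(error),
         datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()


def run_with_dead_letter(conn, func, **kwargs) -> bool:
    """Run func(**kwargs); on failure, persist the call and return False."""
    try:
        func(**kwargs)
        return True
    except Exception as exc:
        record_failure(conn, func.__name__, kwargs, exc)
        return False


def flaky_sync_to_crm(user_id: int) -> None:
    raise ConnectionError("CRM unreachable")  # simulated outage


conn = sqlite3.connect(":memory:")
init_dead_letter(conn)
ok = run_with_dead_letter(conn, flaky_sync_to_crm, user_id=42)
rows = conn.execute("SELECT task_name, payload FROM dead_letter").fetchall()
print(ok, rows)
# → False [('flaky_sync_to_crm', '{"user_id": 42}')]
```

Storing only JSON-serializable keyword arguments keeps the rows replayable by a process that does not share memory with the web worker.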
Hybrid architecture: BackgroundTasks + Celery
Production systems often use both. The pattern:
@app.post("/orders")
async def create_order(order: OrderIn, bg: BackgroundTasks):
    db_order = save_order(order)

    # Lightweight, loss-tolerant → BackgroundTasks
    bg.add_task(update_order_count_cache, db_order.id)
    bg.add_task(log_order_event, db_order.id, "created")

    # Critical, must-complete → Celery
    process_payment.delay(db_order.id)
    send_confirmation_email.delay(db_order.id, order.email)

    return {"order_id": db_order.id}
Decision matrix:
| Criterion | BackgroundTasks | Task Queue (Celery/ARQ) |
|---|---|---|
| Delivery guarantee | None | At-least-once (in Celery, with acks_late enabled) |
| Retry support | Manual | Built-in |
| Monitoring | None | Flower/dashboard |
| Infrastructure | None | Redis/RabbitMQ |
| Latency to start | Microseconds | Milliseconds |
| Scalability | Single process | Distributed workers |
Memory and resource leaks
Background tasks hold references to their arguments until they have run. If you pass large objects (file contents, a DataFrame), they stay in memory while every preceding task in the queue runs and until their own task finishes:
# Bad: holds entire file in memory until task runs
@app.post("/upload")
async def upload(file: UploadFile, bg: BackgroundTasks):
    contents = await file.read()  # Could be 100MB
    bg.add_task(process_file, contents)  # Held in memory
    return {"status": "processing"}

# Better: pass a reference, let the task read from disk
@app.post("/upload")
async def upload(file: UploadFile, bg: BackgroundTasks):
    path = save_to_temp(file)
    bg.add_task(process_file_from_path, path)
    return {"status": "processing"}
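The save_to_temp helper is not defined in the example above. One way it might look, as a sketch under stated assumptions: this version takes a plain binary stream rather than an UploadFile (in a FastAPI handler you would presumably pass file.file), copies it in chunks, and leaves cleanup to the task. The chunk size and the .upload suffix are arbitrary choices.

```python
import io
import os
import shutil
import tempfile
from typing import BinaryIO


def save_to_temp(source: BinaryIO, chunk_size: int = 1024 * 1024) -> str:
    """Copy a binary stream to a temp file in chunks and return its path."""
    # delete=False: the file must outlive this function so the task can read it.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".upload") as tmp:
        shutil.copyfileobj(source, tmp, length=chunk_size)
        return tmp.name


def process_file_from_path(path: str) -> int:
    """Stand-in task: count the bytes, then delete the temp file."""
    try:
        size = 0
        with open(path, "rb") as fh:
            while chunk := fh.read(64 * 1024):
                size += len(chunk)
        return size
    finally:
        os.remove(path)  # the task owns cleanup


path = save_to_temp(io.BytesIO(b"x" * 300_000))
size = process_file_from_path(path)
print(size)
# → 300000
```

Only one chunk is held in memory at a time, and the background task receives a short string instead of the file contents. The copy itself is blocking, so in an async handler it may be worth running it in a worker thread.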
Testing background tasks
With TestClient, background tasks finish before the request call returns, because TestClient blocks until the ASGI application call completes, and that call includes the background tasks. This is convenient: your test can assert on the side effects immediately:
def test_signup_sends_email(client, mock_email_service):
    response = client.post("/signup", json={"email": "test@example.com"})
    assert response.status_code == 200
    # Background task already ran by this point
    assert mock_email_service.sent_to == ["test@example.com"]
With httpx.AsyncClient in async tests (running the app in-process through httpx’s ASGITransport), background tasks also complete before the response is returned to the test, because the transport awaits the full ASGI cycle.
Middleware interaction
Background tasks run after middleware’s response phase. The execution order:
- Request middleware (inbound)
- Route handler
- Response middleware (outbound)
- Response sent to client
- Background tasks execute
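This means middleware that works on the response messages, such as CORS or GZip, never sees background task execution time. Timing middleware is the case to check: a pure ASGI middleware that measures the whole wrapped application call does include background time, because that call returns only after the tasks finish. If middleware adds its own background tasks (possible but uncommon), those queue alongside the route’s tasks.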
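Whether a timing middleware observes background work depends on where it stops the clock. The sketch below is framework-free: the toy app function stands in for a route plus its background tasks, and TimingMiddleware is a hypothetical pure ASGI middleware that records both the time until the final body message and the time until the wrapped call returns.

```python
import asyncio
import time


async def app(scope, receive, send):
    """Toy ASGI app: sends a response, then does 0.1 s of 'background' work."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})
    await asyncio.sleep(0.1)  # stands in for background tasks


class TimingMiddleware:
    """Pure ASGI middleware recording two durations for a request."""

    def __init__(self, app):
        self.app = app
        self.time_to_body = None  # until the final body message is sent
        self.time_total = None    # until the wrapped app call returns

    async def __call__(self, scope, receive, send):
        started = time.perf_counter()

        async def timed_send(message):
            await send(message)
            is_last_body = (
                message["type"] == "http.response.body"
                and not message.get("more_body")
            )
            if is_last_body:
                self.time_to_body = time.perf_counter() - started

        await self.app(scope, receive, timed_send)
        self.time_total = time.perf_counter() - started


async def main() -> TimingMiddleware:
    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        pass  # a real server would write to the socket here

    middleware = TimingMiddleware(app)
    await middleware({"type": "http"}, receive, send)
    return middleware


mw = asyncio.run(main())
print(f"to body: {mw.time_to_body:.3f}s, total: {mw.time_total:.3f}s")
```

The gap between the two measurements is the simulated background work, so a middleware that reports time_total would attribute that work to the request.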
Graceful shutdown considerations
When your server receives SIGTERM (e.g., during deployment), in-flight background tasks may be interrupted. Uvicorn’s shutdown behavior:
- Active requests get a grace period to complete
- Background tasks attached to those requests may complete if the grace period is long enough
- Tasks from already-completed responses that haven’t started yet are lost
For production deployments, configure Uvicorn’s --timeout-graceful-shutdown and keep background tasks short. Long-running work belongs in a task queue.
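BackgroundTasks does not expose a registry of in-flight tasks, so an application that needs to wait for outstanding work at shutdown has to track that work itself. The following is a minimal sketch for work spawned with asyncio.create_task; TaskTracker and its methods are hypothetical names, and in a FastAPI app drain() would presumably be called from the shutdown part of a lifespan handler with a timeout shorter than the server’s grace period.

```python
import asyncio


class TaskTracker:
    """Tracks spawned asyncio tasks so shutdown can wait for them."""

    def __init__(self):
        self._tasks: set[asyncio.Task] = set()

    def spawn(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference once the task finishes.
        task.add_done_callback(self._tasks.discard)
        return task

    async def drain(self, timeout: float) -> int:
        """Wait up to `timeout` seconds; cancel stragglers and return their count."""
        if not self._tasks:
            return 0
        _, pending = await asyncio.wait(self._tasks, timeout=timeout)
        for task in pending:
            task.cancel()
        # Let the cancellations settle before returning.
        await asyncio.gather(*pending, return_exceptions=True)
        return len(pending)


async def main() -> tuple[list[str], int]:
    finished: list[str] = []

    async def work(name: str, seconds: float):
        await asyncio.sleep(seconds)
        finished.append(name)

    tracker = TaskTracker()
    tracker.spawn(work("fast", 0.01))
    tracker.spawn(work("slow", 5.0))
    cancelled = await tracker.drain(timeout=0.2)
    return finished, cancelled


finished, cancelled = asyncio.run(main())
print(finished, cancelled)
# → ['fast'] 1
```

The short task completes within the drain window while the long one is cancelled, which mirrors the advice to keep in-process work short and move long-running jobs to a queue.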
The one thing to remember: BackgroundTasks runs sequentially after the response, in the same process, with zero infrastructure overhead. Use it for lightweight fire-and-forget work, pair it with a proper queue for anything that must not be lost, and always handle errors explicitly, because failures never reach the client and by default surface only in server logs.