Python Connection Draining — Deep Dive
Signal Handling for Graceful Shutdown
Python applications typically receive SIGTERM when the orchestrator wants them to stop. Here’s how to handle it properly:
import asyncio
import signal
import logging
from contextlib import asynccontextmanager

logger = logging.getLogger(__name__)


class GracefulShutdown:
    def __init__(self, grace_period: float = 30.0):
        self.grace_period = grace_period
        self._shutting_down = False
        self._active_requests = 0
        self._shutdown_event = asyncio.Event()

    @property
    def is_shutting_down(self) -> bool:
        return self._shutting_down

    def request_started(self) -> None:
        self._active_requests += 1

    def request_finished(self) -> None:
        self._active_requests -= 1
        if self._shutting_down and self._active_requests <= 0:
            self._shutdown_event.set()

    async def initiate(self, sig: signal.Signals) -> None:
        logger.info(
            "Received %s, starting graceful shutdown "
            "(grace period: %.1fs, active requests: %d)",
            sig.name, self.grace_period, self._active_requests,
        )
        self._shutting_down = True
        if self._active_requests <= 0:
            self._shutdown_event.set()
            return
        try:
            await asyncio.wait_for(
                self._shutdown_event.wait(),
                timeout=self.grace_period,
            )
            logger.info("All requests completed, shutting down cleanly")
        except asyncio.TimeoutError:
            logger.warning(
                "Grace period expired with %d active requests, forcing shutdown",
                self._active_requests,
            )
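The start/finish counting above only works if `request_finished()` runs on every exit path, including exceptions. One way to make that hard to forget is a small context manager. The sketch below is illustrative only: `RequestCounter` and `track_request` are hypothetical names, and `RequestCounter` is a stripped-down stand-in for the counting half of `GracefulShutdown` so the example runs on its own.

```python
import asyncio
from contextlib import contextmanager
from typing import Tuple


class RequestCounter:
    """Hypothetical stand-in for the request-counting part of GracefulShutdown."""

    def __init__(self) -> None:
        self.active = 0
        self.shutting_down = False
        self.drained = asyncio.Event()

    def request_started(self) -> None:
        self.active += 1

    def request_finished(self) -> None:
        self.active -= 1
        if self.shutting_down and self.active <= 0:
            self.drained.set()


@contextmanager
def track_request(counter: RequestCounter):
    """Count a request as active for the duration of the with-block."""
    counter.request_started()
    try:
        yield
    finally:
        # Runs even if the handler raises, so the counter cannot leak
        counter.request_finished()


async def demo() -> Tuple[int, int, bool]:
    counter = RequestCounter()
    counter.shutting_down = True  # pretend a drain is already in progress
    with track_request(counter):
        during = counter.active
        await asyncio.sleep(0)  # placeholder for real request handling
    # (active count inside the block, active count after, drained flag)
    return during, counter.active, counter.drained.is_set()
```

Wrapping a handler body in `with track_request(counter):` replaces a hand-written `try`/`finally` pair, at the cost of one more helper to maintain.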
FastAPI Integration with Lifespan
from fastapi import FastAPI, Request
from starlette.responses import JSONResponse
from contextlib import asynccontextmanager

shutdown_manager = GracefulShutdown(grace_period=30.0)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        # Note: add_signal_handler replaces any handler already installed
        # for this signal, so verify that your server still exits after the drain.
        loop.add_signal_handler(
            sig,
            lambda s=sig: asyncio.create_task(shutdown_manager.initiate(s)),
        )
    logger.info("Application started, signal handlers registered")
    yield
    # Shutdown (after signal received and drain completed)
    logger.info("Application shutdown complete")


app = FastAPI(lifespan=lifespan)


@app.middleware("http")
async def drain_middleware(request: Request, call_next):
    if shutdown_manager.is_shutting_down:
        # Reject new requests during drain
        return JSONResponse(
            status_code=503,
            content={"error": "Server is shutting down"},
            headers={
                "Connection": "close",
                "Retry-After": "5",
            },
        )
    shutdown_manager.request_started()
    try:
        response = await call_next(request)
        # Tell the client not to reuse this connection
        if shutdown_manager.is_shutting_down:
            response.headers["Connection"] = "close"
        return response
    finally:
        shutdown_manager.request_finished()
Uvicorn Graceful Shutdown Configuration
Uvicorn supports graceful shutdown natively. Key settings:
# programmatic
import uvicorn

uvicorn.run(
    "app:app",
    host="0.0.0.0",
    port=8000,
    timeout_graceful_shutdown=30,  # seconds to wait for in-flight requests
)

# CLI
uvicorn app:app --timeout-graceful-shutdown 30
When Uvicorn receives SIGTERM:
- It stops accepting new connections
- Waits up to timeout_graceful_shutdown seconds
- Cancels remaining tasks
- Exits
Gunicorn Worker Draining
For Gunicorn with sync or async workers:
# gunicorn.conf.py
bind = "0.0.0.0:8000"
workers = 4
graceful_timeout = 30  # seconds to wait for workers to finish
timeout = 120  # max request time (should be > graceful_timeout)


def on_exit(server):
    """Called just before the master process exits."""
    # The config file defines no logger of its own, so use Gunicorn's
    server.log.info("Gunicorn master shutting down")


def worker_exit(server, worker):
    """Called when a worker process exits."""
    server.log.info("Worker %s exited", worker.pid)
Gunicorn’s shutdown flow:
- Master receives SIGTERM
- Sends SIGTERM to all workers
- Workers stop accepting new connections
- Workers finish in-flight requests (up to graceful_timeout)
- Workers that exceed timeout get SIGKILL
- Master exits
Kubernetes Integration
Pod Lifecycle and preStop Hook
Kubernetes sends SIGTERM but simultaneously removes the pod from the Service endpoints. There’s a race condition: the load balancer might still send requests after SIGTERM arrives. The preStop hook adds a delay:
apiVersion: apps/v1
kind: Deployment
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 60
      containers:
        - name: api
          lifecycle:
            preStop:
              exec:
                command: ["sh", "-c", "sleep 10"]
                # App gets SIGTERM after preStop completes
The timeline:
- Kubernetes marks pod as Terminating
- preStop hook runs (10s sleep)
- During those 10s, the pod is removed from endpoints
- After preStop, SIGTERM is sent to the container
- Container drains for up to terminationGracePeriodSeconds - preStop duration
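The last step of the timeline is simple arithmetic, and it is worth checking against the application's own grace period. The sketch below uses the values from this article (60s termination grace period, 10s preStop sleep, 30s app grace period); `drain_budget` and `check_drain_config` are hypothetical helper names, not part of Kubernetes or any library.

```python
from typing import List


def drain_budget(termination_grace_period: float, pre_stop_delay: float) -> float:
    """Seconds the container has left to drain once SIGTERM arrives."""
    return max(float(termination_grace_period) - float(pre_stop_delay), 0.0)


def check_drain_config(
    termination_grace_period: float,
    pre_stop_delay: float,
    app_grace_period: float,
) -> List[str]:
    """Return a list of human-readable problems; empty means the timings fit."""
    problems = []
    budget = drain_budget(termination_grace_period, pre_stop_delay)
    if budget == 0:
        problems.append("preStop delay consumes the whole termination grace period")
    elif app_grace_period >= budget:
        # The app would still be draining when Kubernetes sends SIGKILL
        problems.append(
            f"app grace period {app_grace_period}s does not fit in the "
            f"{budget}s left after preStop"
        )
    return problems


# Values from the Deployment above and GracefulShutdown(grace_period=30.0)
issues = check_drain_config(
    termination_grace_period=60, pre_stop_delay=10, app_grace_period=30
)
```

A check like this could run in CI against the rendered manifest, so a change to either number cannot silently shrink the drain window.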
Readiness Probe Integration
When your app starts draining, fail the readiness probe so the load balancer stops sending traffic:
from starlette.responses import JSONResponse


@app.get("/healthz/ready")
async def readiness():
    if shutdown_manager.is_shutting_down:
        return JSONResponse(
            status_code=503,
            content={"status": "draining"},
        )
    return {"status": "ready"}
WebSocket Connection Draining
WebSocket connections need special handling — you can’t just wait for them to finish:
import asyncio
from fastapi import WebSocket, WebSocketDisconnect

active_websockets: set[WebSocket] = set()


@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    active_websockets.add(ws)
    try:
        while True:
            # Note: this check only runs between messages, because
            # receive_text() below blocks until the client sends something.
            if shutdown_manager.is_shutting_down:
                # Tell client to reconnect to a different instance
                await ws.send_json({
                    "type": "server_drain",
                    "message": "Server is restarting. Reconnecting...",
                    "retry_after_ms": 1000,
                })
                await ws.close(code=1012)  # 1012 = Service Restart
                break
            data = await ws.receive_text()
            await ws.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        pass
    finally:
        active_websockets.discard(ws)


async def drain_websockets():
    """Notify all connected WebSocket clients that the server is draining."""
    tasks = []
    for ws in list(active_websockets):
        tasks.append(
            asyncio.create_task(
                ws.send_json({
                    "type": "server_drain",
                    "retry_after_ms": 1000,
                })
            )
        )
    # return_exceptions=True keeps one failed send from aborting the rest
    await asyncio.gather(*tasks, return_exceptions=True)
    # Give clients a moment to reconnect
    await asyncio.sleep(2)
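The drain notice is only useful if clients act on it. As a rough sketch of the client side, the function below decides how long to wait before reconnecting when it sees a `server_drain` message shaped like the one sent above. `reconnect_delay` is a hypothetical helper, and the 1000 ms fallback is an assumption chosen to match the server code.

```python
import json
from typing import Optional


def reconnect_delay(raw_message: str, default_ms: int = 1000) -> Optional[float]:
    """Return seconds to wait before reconnecting, or None if this is not a drain notice."""
    try:
        message = json.loads(raw_message)
    except json.JSONDecodeError:
        # Plain text frames such as "Echo: ..." are ordinary traffic
        return None
    if not isinstance(message, dict) or message.get("type") != "server_drain":
        return None
    retry_ms = message.get("retry_after_ms", default_ms)
    if not isinstance(retry_ms, (int, float)) or retry_ms < 0:
        # Fall back to the default on a malformed hint
        retry_ms = default_ms
    return retry_ms / 1000
```

A client would call this on every incoming frame and, on a non-`None` result, sleep for that long before opening a new connection. Adding random jitter to the delay is a common refinement so that many clients do not reconnect at the same instant.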
Database Connection Cleanup
Draining isn’t just about HTTP — database connections need proper cleanup too:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

engine = create_async_engine(
    "postgresql+asyncpg://...",
    pool_size=20,
    pool_pre_ping=True,
)


async def drain_database():
    """Close the connection pool once in-flight requests have finished."""
    logger.info("Draining database connections...")
    # dispose() closes the connections currently checked in to the pool.
    # It does not wait for checked-out connections to be returned, so call
    # this only after the request drain has completed.
    await engine.dispose()
    logger.info("Database connections drained")
Testing Drain Behavior
import asyncio
import signal

import pytest

# GracefulShutdown is the class defined at the start of this article;
# import it from wherever it lives in your project.


@pytest.mark.asyncio
async def test_rejects_new_requests_during_drain():
    manager = GracefulShutdown(grace_period=5.0)
    await manager.initiate(signal.SIGTERM)
    assert manager.is_shutting_down is True


@pytest.mark.asyncio
async def test_waits_for_active_requests():
    manager = GracefulShutdown(grace_period=5.0)
    # Simulate an active request
    manager.request_started()
    # Start drain in background
    drain_task = asyncio.create_task(
        manager.initiate(signal.SIGTERM)
    )
    # Drain should not complete yet
    await asyncio.sleep(0.1)
    assert not drain_task.done()
    # Finish the request
    manager.request_finished()
    await asyncio.sleep(0.1)
    assert drain_task.done()


@pytest.mark.asyncio
async def test_force_shutdown_after_grace_period():
    manager = GracefulShutdown(grace_period=0.5)
    manager.request_started()  # Never finished
    await manager.initiate(signal.SIGTERM)
    # Should complete after grace period even with active request
    assert manager.is_shutting_down is True
Common Pitfalls
- SIGKILL arrives before drain completes. If terminationGracePeriodSeconds is shorter than your drain time, Kubernetes will SIGKILL your process. Always ensure: terminationGracePeriodSeconds > preStop delay + app drain time.
- Load balancer race condition. SIGTERM and endpoint removal happen simultaneously. Without a preStop delay, the load balancer might send requests to a draining instance. The 10-second preStop sleep is a standard workaround.
- Background tasks ignored. Draining HTTP requests but forgetting about Celery tasks, cron jobs, or Kafka consumers. Each subsystem needs its own drain logic.
- Keep-alive connections stay open. HTTP/1.1 keep-alive connections from the load balancer may stay open even after you stop accepting new requests. Send Connection: close headers during drain to signal clients to disconnect.
- Health checks still pass during drain. If your readiness probe returns 200 while draining, the load balancer keeps sending traffic. Fail the readiness probe as soon as drain starts.
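For the background-task pitfall, in-process asyncio tasks are the simplest case to handle. The sketch below keeps a registry of spawned tasks, waits for them during drain, and cancels whatever is still running when the timeout expires. `BackgroundTasks` here is a hypothetical class written for this example (not FastAPI's class of the same name), and it does not cover external workers such as Celery or Kafka consumers, which need their own shutdown hooks.

```python
import asyncio
from typing import Coroutine, Set


class BackgroundTasks:
    """Hypothetical registry that lets shutdown wait for in-process tasks."""

    def __init__(self) -> None:
        self._tasks: Set[asyncio.Task] = set()

    def spawn(self, coro: Coroutine) -> asyncio.Task:
        task = asyncio.create_task(coro)
        # Hold a strong reference until the task is done
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def drain(self, timeout: float) -> int:
        """Wait up to `timeout` seconds, cancel stragglers, return how many were cancelled."""
        if not self._tasks:
            return 0
        _done, pending = await asyncio.wait(set(self._tasks), timeout=timeout)
        for task in pending:
            task.cancel()
        # Let cancelled tasks unwind before reporting
        await asyncio.gather(*pending, return_exceptions=True)
        return len(pending)


async def demo() -> int:
    background = BackgroundTasks()
    background.spawn(asyncio.sleep(0.01))  # finishes within the drain window
    background.spawn(asyncio.sleep(10))    # still running at the deadline
    return await background.drain(timeout=0.2)
```

Logging the returned count during shutdown gives a quick signal of how much work was cut off and whether the drain window needs to grow.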
One thing to remember: Connection draining is a coordination problem between your load balancer, your application, and your orchestrator. All three must agree on the timeline: stop routing, finish work, then shut down. Test this under realistic load, because a drain that works in dev with zero traffic might fail spectacularly under production load.