Python Connection Draining — Deep Dive

Signal Handling for Graceful Shutdown

Python applications typically receive SIGTERM when the orchestrator wants them to stop. Here’s how to handle it properly:

import asyncio
import signal
import logging
from contextlib import asynccontextmanager

logger = logging.getLogger(__name__)

class GracefulShutdown:
    def __init__(self, grace_period: float = 30.0):
        self.grace_period = grace_period
        self._shutting_down = False
        self._active_requests = 0
        self._shutdown_event = asyncio.Event()

    @property
    def is_shutting_down(self) -> bool:
        return self._shutting_down

    def request_started(self) -> None:
        self._active_requests += 1

    def request_finished(self) -> None:
        self._active_requests -= 1
        if self._shutting_down and self._active_requests <= 0:
            self._shutdown_event.set()

    async def initiate(self, sig: signal.Signals) -> None:
        logger.info(
            "Received %s — starting graceful shutdown "
            "(grace period: %.1fs, active requests: %d)",
            sig.name, self.grace_period, self._active_requests,
        )
        self._shutting_down = True

        if self._active_requests <= 0:
            self._shutdown_event.set()
            return

        try:
            await asyncio.wait_for(
                self._shutdown_event.wait(),
                timeout=self.grace_period,
            )
            logger.info("All requests completed — shutting down cleanly")
        except asyncio.TimeoutError:
            logger.warning(
                "Grace period expired with %d active requests — forcing shutdown",
                self._active_requests,
            )
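
The counter only works if every request_started() call is paired with a request_finished(), including when a handler raises or is cancelled. One way to guarantee the pairing is a small async context manager. The sketch below is self-contained and uses a hypothetical InFlightCounter as a stand-in for the class above; track_request is likewise an illustrative name, not part of any library.

```python
import asyncio
from contextlib import asynccontextmanager


class InFlightCounter:
    """Hypothetical stand-in for the request counter kept by GracefulShutdown."""

    def __init__(self) -> None:
        self.active = 0

    def started(self) -> None:
        self.active += 1

    def finished(self) -> None:
        self.active -= 1


@asynccontextmanager
async def track_request(counter: InFlightCounter):
    """Count a request as in-flight for exactly the duration of the block."""
    counter.started()
    try:
        yield
    finally:
        # Runs on success, on exceptions, and on cancellation alike.
        counter.finished()


async def demo() -> int:
    counter = InFlightCounter()

    # A handler that succeeds
    async with track_request(counter):
        await asyncio.sleep(0)

    # A handler that fails: the counter must still return to zero
    try:
        async with track_request(counter):
            raise RuntimeError("handler failed")
    except RuntimeError:
        pass

    return counter.active
```

After both handlers finish, demo() reports no requests in flight, even though one of them raised.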

FastAPI Integration with Lifespan

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager

shutdown_manager = GracefulShutdown(grace_period=30.0)

@asynccontextmanager
async def lifespan(app: FastAPI):
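    # Startup. Caveats: add_signal_handler is Unix-only, and it replaces any
    # handler already registered for the same signal (possibly including the
    # ASGI server's own), so the server must still be told to exit afterwards.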
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            lambda s=sig: asyncio.create_task(shutdown_manager.initiate(s)),
        )
    logger.info("Application started — signal handlers registered")
    yield
    # Shutdown: runs when the server enters its lifespan shutdown phase
    logger.info("Application shutdown complete")

app = FastAPI(lifespan=lifespan)

@app.middleware("http")
async def drain_middleware(request: Request, call_next):
    if shutdown_manager.is_shutting_down:
        # Reject new requests during drain
        from starlette.responses import JSONResponse
        return JSONResponse(
            status_code=503,
            content={"error": "Server is shutting down"},
            headers={
                "Connection": "close",
                "Retry-After": "5",
            },
        )

    shutdown_manager.request_started()
    try:
        response = await call_next(request)
        # Tell the client not to reuse this connection
        if shutdown_manager.is_shutting_down:
            response.headers["Connection"] = "close"
        return response
    finally:
        shutdown_manager.request_finished()

Uvicorn Graceful Shutdown Configuration

Uvicorn supports graceful shutdown natively. Key settings:

# programmatic
import uvicorn

uvicorn.run(
    "app:app",
    host="0.0.0.0",
    port=8000,
    timeout_graceful_shutdown=30,  # seconds to wait for in-flight requests
)

# CLI equivalent (shell, not Python)
uvicorn app:app --timeout-graceful-shutdown 30

When Uvicorn receives SIGTERM:

  1. It stops accepting new connections
  2. It waits up to timeout_graceful_shutdown seconds for in-flight requests to finish
  3. It cancels any tasks still running after that
  4. It exits
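
The wait-then-cancel pattern in steps 2 and 3 can be sketched with plain asyncio. This illustrates the idea only and is not Uvicorn's actual implementation; drain_tasks and demo are hypothetical names.

```python
import asyncio


async def drain_tasks(tasks, grace_period: float):
    """Wait up to grace_period seconds, then cancel whatever is still running.

    Returns (finished, cancelled) as two sets of tasks.
    """
    if not tasks:
        return set(), set()

    done, pending = await asyncio.wait(tasks, timeout=grace_period)

    for task in pending:
        task.cancel()
    # Let the cancelled tasks unwind before reporting back.
    await asyncio.gather(*pending, return_exceptions=True)

    return done, pending


async def demo():
    fast = asyncio.create_task(asyncio.sleep(0.01, result="fast"))
    slow = asyncio.create_task(asyncio.sleep(10, result="slow"))

    await drain_tasks({fast, slow}, grace_period=0.2)

    return fast.result(), slow.cancelled()
```

In demo(), the fast task finishes inside the grace period and keeps its result, while the slow one is cancelled once the period expires.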

Gunicorn Worker Draining

For Gunicorn with sync or async workers:

# gunicorn.conf.py
bind = "0.0.0.0:8000"
workers = 4
graceful_timeout = 30  # seconds workers get to finish in-flight requests
timeout = 120          # workers silent for longer than this are killed (keep > graceful_timeout)

# The config file has no logger of its own, so use Gunicorn's via server.log

def on_exit(server):
    """Called just before the master process exits."""
    server.log.info("Gunicorn master shutting down")

def worker_exit(server, worker):
    """Called in the worker process just after it exits."""
    server.log.info("Worker %s exited", worker.pid)

Gunicorn’s shutdown flow:

  1. Master receives SIGTERM
  2. Sends SIGTERM to all workers
  3. Workers stop accepting new connections
  4. Workers finish in-flight requests (up to graceful_timeout)
  5. Workers still running after graceful_timeout get SIGKILL
  6. Master exits

Kubernetes Integration

Pod Lifecycle and preStop Hook

Kubernetes sends SIGTERM and, in parallel, removes the pod from the Service endpoints. This creates a race condition: the load balancer might still route requests to the pod after SIGTERM arrives, because endpoint removal takes time to propagate. A preStop hook delays SIGTERM to cover that window:

apiVersion: apps/v1
kind: Deployment
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 60
      containers:
        - name: api
          lifecycle:
            preStop:
              exec:
                command: ["sh", "-c", "sleep 10"]
          # App gets SIGTERM after preStop completes

The timeline:

  1. Kubernetes marks pod as Terminating
  2. preStop hook runs (10s sleep)
  3. During those 10s, the pod is removed from endpoints
  4. After preStop, SIGTERM is sent to the container
  5. Container drains for up to terminationGracePeriodSeconds - preStop duration

Readiness Probe Integration

When your app starts draining, fail the readiness probe so the load balancer stops sending traffic:

from fastapi.responses import JSONResponse

@app.get("/healthz/ready")
async def readiness():
    if shutdown_manager.is_shutting_down:
        return JSONResponse(
            status_code=503,
            content={"status": "draining"},
        )
    return {"status": "ready"}

WebSocket Connection Draining

WebSocket connections need special handling: they are long-lived, so you can't just wait for them to finish. Tell clients to reconnect elsewhere instead:

import asyncio
from fastapi import WebSocket, WebSocketDisconnect

active_websockets: set[WebSocket] = set()

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    active_websockets.add(ws)
    try:
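        while True:
            # Only checked between messages: a connection blocked in
            # receive_text() is notified by drain_websockets() below instead.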
            if shutdown_manager.is_shutting_down:
                # Tell client to reconnect to a different instance
                await ws.send_json({
                    "type": "server_drain",
                    "message": "Server is restarting. Reconnecting...",
                    "retry_after_ms": 1000,
                })
                await ws.close(code=1012)  # 1012 = Service Restart
                break
            data = await ws.receive_text()
            await ws.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        pass
    finally:
        active_websockets.discard(ws)

async def drain_websockets():
    """Tell every connected client to reconnect elsewhere, then wait briefly."""
    tasks = []
    for ws in list(active_websockets):
        tasks.append(
            asyncio.create_task(
                ws.send_json({
                    "type": "server_drain",
                    "retry_after_ms": 1000,
                })
            )
        )
    await asyncio.gather(*tasks, return_exceptions=True)
    # Give clients a moment to reconnect
    await asyncio.sleep(2)
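
A handler that is blocked waiting for the next message does not see the shutdown flag until a message arrives. One common way around this is to race the receive against a shutdown event. The sketch below uses an asyncio.Queue as a stand-in for the socket so that it runs on its own. receive_or_shutdown is a hypothetical helper, and whether cancelling a pending receive is safe on a real WebSocket depends on the framework, so treat that as an assumption to verify.

```python
import asyncio


async def receive_or_shutdown(receive, shutdown_event: asyncio.Event):
    """Return the next message, or None if shutdown is signalled first."""
    recv_task = asyncio.ensure_future(receive())
    stop_task = asyncio.ensure_future(shutdown_event.wait())

    done, pending = await asyncio.wait(
        {recv_task, stop_task},
        return_when=asyncio.FIRST_COMPLETED,
    )

    # Clean up whichever side lost the race.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    # If a message arrived, deliver it even when shutdown fired as well.
    if recv_task in done:
        return recv_task.result()
    return None


async def demo():
    inbox: asyncio.Queue = asyncio.Queue()  # stand-in for ws.receive_text
    shutdown = asyncio.Event()

    await inbox.put("hello")
    first = await receive_or_shutdown(inbox.get, shutdown)

    shutdown.set()  # no message pending, so shutdown wins
    second = await receive_or_shutdown(inbox.get, shutdown)

    return first, second
```

A None result is the handler's cue to send the drain notice and close the connection.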

Database Connection Cleanup

Draining isn’t just about HTTP — database connections need proper cleanup too:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

engine = create_async_engine(
    "postgresql+asyncpg://...",
    pool_size=20,
    pool_pre_ping=True,
)

async def drain_database():
    """Close the connection pool once in-flight work has finished."""
    logger.info("Draining database connections...")
    # dispose() closes the pool's idle (checked-in) connections. It does not
    # wait for checked-out connections, so call this only after in-flight
    # requests have completed.
    await engine.dispose()
    logger.info("Database connections drained")
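
Cleanup order matters: closing the pool while requests are still running would break them, and one stuck step should not block the rest. A minimal sketch of running drain steps in sequence with a per-step timeout follows. run_drain_steps and the step names are hypothetical; in a real application the steps would be functions such as drain_websockets and drain_database.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

DrainStep = Tuple[str, Callable[[], Awaitable[None]]]


async def run_drain_steps(
    steps: List[DrainStep], per_step_timeout: float
) -> Dict[str, str]:
    """Run drain steps in order; a slow or failing step does not stop the rest."""
    results: Dict[str, str] = {}
    for name, step in steps:
        try:
            await asyncio.wait_for(step(), timeout=per_step_timeout)
            results[name] = "ok"
        except asyncio.TimeoutError:
            results[name] = "timeout"
        except Exception as exc:  # keep draining the remaining subsystems
            results[name] = f"error: {exc!r}"
    return results


async def demo():
    order: List[str] = []

    async def finish_requests() -> None:
        order.append("requests")

    async def stuck_step() -> None:
        await asyncio.sleep(10)  # simulates a subsystem that never drains

    async def close_pool() -> None:
        order.append("database")

    results = await run_drain_steps(
        [
            ("requests", finish_requests),
            ("stuck", stuck_step),
            ("database", close_pool),
        ],
        per_step_timeout=0.1,
    )
    return order, results
```

The sum of the per-step timeouts should stay within the overall drain budget, otherwise the process may be killed before the last step runs.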

Testing Drain Behavior

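import asyncio
import signal

import pytest

# GracefulShutdown is the class defined earlier; import it from wherever
# it lives in your project.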

@pytest.mark.asyncio
async def test_rejects_new_requests_during_drain():
    manager = GracefulShutdown(grace_period=5.0)
    await manager.initiate(signal.SIGTERM)
    assert manager.is_shutting_down is True

@pytest.mark.asyncio
async def test_waits_for_active_requests():
    manager = GracefulShutdown(grace_period=5.0)

    # Simulate an active request
    manager.request_started()

    # Start drain in background
    drain_task = asyncio.create_task(
        manager.initiate(signal.SIGTERM)
    )

    # Drain should not complete yet
    await asyncio.sleep(0.1)
    assert not drain_task.done()

    # Finish the request
    manager.request_finished()
    await asyncio.sleep(0.1)
    assert drain_task.done()

@pytest.mark.asyncio
async def test_force_shutdown_after_grace_period():
    manager = GracefulShutdown(grace_period=0.5)
    manager.request_started()  # Never finished

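    # Should return after the grace period even with an active request
    await asyncio.wait_for(manager.initiate(signal.SIGTERM), timeout=2.0)
    assert manager.is_shutting_down
    assert manager._active_requests == 1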

Common Pitfalls

  1. SIGKILL arrives before drain completes. If terminationGracePeriodSeconds is shorter than your drain time, Kubernetes will SIGKILL your process. Always ensure: terminationGracePeriodSeconds > preStop delay + app drain time.

  2. Load balancer race condition. SIGTERM and endpoint removal happen simultaneously. Without a preStop delay, the load balancer might send requests to a draining instance. The 10-second preStop sleep is a standard workaround.

  3. Background tasks ignored. It is easy to drain HTTP requests and forget about Celery tasks, cron jobs, or Kafka consumers. Each subsystem needs its own drain logic.

  4. Keep-alive connections stay open. HTTP/1.1 keep-alive connections from the load balancer may stay open even after you stop accepting new requests. Send Connection: close headers during drain to signal clients to disconnect.

  5. Health checks still pass during drain. If your readiness probe returns 200 while draining, the load balancer keeps sending traffic. Fail the readiness probe as soon as drain starts.

One thing to remember: Connection draining is a coordination problem between your load balancer, your application, and your orchestrator. All three must agree on the timeline: stop routing, finish work, then shut down. Test this under realistic load, because a drain that works in dev with zero traffic might fail spectacularly in production.
