Python Graceful Shutdown — Deep Dive

Signal Handling Fundamentals

Python’s signal module lets you intercept OS signals, with one important caveat: Python-level signal handlers always run in the main thread. In a multithreaded application, the main thread must therefore relay the shutdown request to the worker threads.

import signal
import sys
import threading

shutdown_event = threading.Event()

def handle_sigterm(signum, frame):
    print(f"Received signal {signum}, initiating shutdown...")
    shutdown_event.set()

signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGINT, handle_sigterm)

The shutdown_event acts as a coordination primitive. Worker threads check it periodically:

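import queue

def worker_loop():
    while not shutdown_event.is_set():
        try:
            # task_queue is a queue.Queue; get() raises queue.Empty on timeout
            task = task_queue.get(timeout=1.0)
        except queue.Empty:
            continue  # nothing queued; re-check the shutdown flag
        process(task)
    # Cleanup when shutdown is signaled
    flush_buffers()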
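
The main thread's half of this coordination can be sketched as follows. This is a minimal illustration, not a full implementation: `run_until_shutdown`, `tasks`, and `processed` are hypothetical names, and appending to a list stands in for `process(task)`. In a real service the signal handler sets the event; here any caller can set it, so the sketch runs without a real signal.

```python
import queue
import threading

shutdown_event = threading.Event()
tasks: "queue.Queue[int]" = queue.Queue()
processed: list[int] = []

def worker_loop() -> None:
    while not shutdown_event.is_set():
        try:
            task = tasks.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing queued; re-check the shutdown flag
        processed.append(task)  # stand-in for process(task)

def run_until_shutdown(num_workers: int = 2, join_timeout: float = 5.0) -> bool:
    """Start workers, block until shutdown is requested, then join them.

    Returns True if every worker exited within join_timeout.
    """
    # Non-daemon threads: the interpreter waits for them instead of killing them.
    workers = [threading.Thread(target=worker_loop) for _ in range(num_workers)]
    for w in workers:
        w.start()
    # Wait in short slices so the main thread stays responsive to signals.
    while not shutdown_event.wait(timeout=0.5):
        pass
    for w in workers:
        w.join(timeout=join_timeout)
    return not any(w.is_alive() for w in workers)
```

Joining with a timeout keeps a stuck worker from blocking shutdown past the time budget; the return value tells the caller whether a forced exit is needed.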

Asyncio Shutdown Pattern

For async applications, the approach differs. The event loop has its own signal handling mechanism that integrates with coroutines:

import asyncio
import signal

class GracefulServer:
    def __init__(self):
        self.active_tasks: set[asyncio.Task] = set()
        self._shutting_down = False
        # Set by _signal_shutdown(), awaited in start()
        self._shutdown_event = asyncio.Event()

    async def start(self):
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGTERM, self._signal_shutdown)
        loop.add_signal_handler(signal.SIGINT, self._signal_shutdown)
        
        server = await asyncio.start_server(
            self.handle_connection, '0.0.0.0', 8080
        )
        async with server:
            await self._shutdown_event.wait()
            server.close()
            await server.wait_closed()
            await self._drain_tasks(timeout=25.0)

    def _signal_shutdown(self):
        self._shutting_down = True
        self._shutdown_event.set()

    async def _drain_tasks(self, timeout: float):
        if not self.active_tasks:
            return
        print(f"Draining {len(self.active_tasks)} active tasks...")
        done, pending = await asyncio.wait(
            self.active_tasks, timeout=timeout
        )
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.wait(pending, timeout=5.0)
            print(f"Force-cancelled {len(pending)} tasks")
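
The class above drains `active_tasks` but does not show how tasks get into the set. One common approach, sketched here with a hypothetical `spawn_tracked` helper, is to add each task when it is created and let a done-callback remove it when it finishes:

```python
import asyncio

active_tasks: set[asyncio.Task] = set()

def spawn_tracked(coro) -> asyncio.Task:
    """Create a task and keep it in active_tasks until it completes."""
    task = asyncio.create_task(coro)
    active_tasks.add(task)
    task.add_done_callback(active_tasks.discard)
    return task

async def demo_tracking() -> tuple[int, int]:
    async def job(delay: float) -> None:
        await asyncio.sleep(delay)

    spawn_tracked(job(0.01))  # finishes quickly and removes itself
    spawn_tracked(job(10))    # simulates a slow connection handler
    await asyncio.sleep(0.1)
    in_flight = len(active_tasks)

    # Shutdown: cancel whatever is still running and wait for it to unwind.
    for task in list(active_tasks):
        task.cancel()
    await asyncio.gather(*active_tasks, return_exceptions=True)
    await asyncio.sleep(0)  # give done-callbacks a chance to run
    return in_flight, len(active_tasks)
```

Holding the tasks in a set also keeps a strong reference to them, which prevents a running task from being garbage-collected.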

Key detail: add_signal_handler is only available on Unix. On Windows, you need signal.signal() with a different coordination approach.
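
One possible shape for that fallback is sketched below. It assumes that `add_signal_handler` raises `NotImplementedError` where it is unsupported, and it hands the callback to the loop with `call_soon_threadsafe`. The helper name `install_shutdown_handlers` is hypothetical, and the demo raises SIGTERM against its own process to simulate an orchestrator.

```python
import asyncio
import signal

def install_shutdown_handlers(loop, on_shutdown) -> str:
    """Register on_shutdown for SIGTERM and SIGINT; return the mechanism used."""
    try:
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, on_shutdown)
        return "loop"
    except NotImplementedError:
        # Fallback path (e.g. Windows): a plain signal.signal() handler
        # that schedules the callback on the loop.
        def _handler(signum, frame):
            loop.call_soon_threadsafe(on_shutdown)

        for sig in (signal.SIGTERM, signal.SIGINT):
            signal.signal(sig, _handler)
        return "signal"

async def demo_handlers() -> str:
    stop = asyncio.Event()
    mechanism = install_shutdown_handlers(asyncio.get_running_loop(), stop.set)
    signal.raise_signal(signal.SIGTERM)  # simulate the orchestrator
    await asyncio.wait_for(stop.wait(), timeout=5.0)
    return mechanism
```

With the fallback, the handler only runs once the interpreter regains control in the main thread, so a loop that is idle in a long blocking wait may react with some delay.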

Framework-Specific Implementation

FastAPI with Uvicorn

FastAPI uses lifespan events for startup/shutdown:

import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    db_pool = await create_pool()
    app.state.db = db_pool
    app.state.background_tasks = set()
    yield
    # Shutdown — this runs on SIGTERM
    for task in app.state.background_tasks:
        task.cancel()
    await asyncio.gather(
        *app.state.background_tasks, 
        return_exceptions=True
    )
    await db_pool.close()
    print("All resources released")

app = FastAPI(lifespan=lifespan)

Uvicorn’s --timeout-graceful-shutdown flag caps how long the server waits for in-flight requests before it starts terminating them. The default is None, which means it waits indefinitely. Set it to match your Kubernetes terminationGracePeriodSeconds minus a buffer:

uvicorn app:app --timeout-graceful-shutdown 25

Celery Worker Shutdown

Celery workers need special attention. By default, SIGTERM triggers a “warm shutdown”: the worker stops accepting new tasks, finishes the ones it is currently executing, then exits. Long-running tasks can still exceed the grace period:

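from celery import Celery
from celery.contrib.abortable import AbortableTask
from celery.exceptions import SoftTimeLimitExceeded
from celery.signals import worker_shutting_down

app = Celery('tasks')

@worker_shutting_down.connect
def on_shutdown(sender, sig, how, exitcode, **kwargs):
    """Called when worker begins shutdown sequence."""
    # Flush metrics, close external connections
    metrics_client.flush()
    external_api.close()

# is_aborted() comes from AbortableTask and needs a result backend
@app.task(bind=True, base=AbortableTask, soft_time_limit=120, time_limit=150)
def long_task(self, data):
    for chunk in chunked(data, 100):
        try:
            if self.is_aborted():
                # Task was aborted via AbortableAsyncResult.abort()
                save_checkpoint(chunk.offset)
                return
            process(chunk)
        except SoftTimeLimitExceeded:
            # Soft limit hit: checkpoint before the hard limit kills the process
            save_checkpoint(chunk.offset)
            return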

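When soft_time_limit is exceeded, Celery raises SoftTimeLimitExceeded inside the task, giving it a chance to catch the exception and checkpoint. When the hard time_limit is exceeded, the process running the task is killed. Note that is_aborted() is only available on tasks whose base class is celery.contrib.abortable.AbortableTask, and it requires a result backend.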

Kubernetes Integration

Kubernetes sends SIGTERM, waits up to terminationGracePeriodSeconds (default 30), then sends SIGKILL. There is a subtlety: the pod is removed from the Service endpoints concurrently with SIGTERM, not before it, so traffic might still arrive for a few seconds after shutdown begins.

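The solution is a preStop hook that adds a small delay. Kubernetes runs the hook to completion before sending SIGTERM, and the hook's run time counts against the grace period: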

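# Pod spec fragment: terminationGracePeriodSeconds is a pod-level field,
# not a container-level one
terminationGracePeriodSeconds: 35
containers:
  - name: api
    lifecycle:
      preStop:
        exec:
          command: ["sleep", "5"]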

This gives kube-proxy and ingress controllers time to update their routing tables before your app stops accepting connections. Your app’s shutdown timeout should be: terminationGracePeriodSeconds - preStop delay - buffer = 35 - 5 - 5 = 25 seconds.
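
The same arithmetic can be kept next to the configuration so a change to one value does not silently break the others. A minimal sketch, using a hypothetical helper name and treating all three inputs as seconds:

```python
def app_shutdown_budget(
    grace_period: float, prestop_delay: float, buffer: float
) -> float:
    """Seconds the application may spend draining before SIGKILL is a risk."""
    budget = grace_period - prestop_delay - buffer
    if budget <= 0:
        raise ValueError(
            f"No time left for shutdown: {grace_period} - {prestop_delay} "
            f"- {buffer} = {budget}"
        )
    return budget

# Values from the example above: 35 - 5 - 5
print(app_shutdown_budget(35, 5, 5))
```

Failing loudly on a non-positive budget catches the case where someone raises the preStop delay without also raising terminationGracePeriodSeconds.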

Connection Pool Draining

Database and HTTP connection pools need explicit shutdown:

import asyncio

import asyncpg
import httpx

class ResourceManager:
    def __init__(self):
        self.db_pool: asyncpg.Pool | None = None
        self.http_client: httpx.AsyncClient | None = None

    async def startup(self):
        self.db_pool = await asyncpg.create_pool(
            dsn="postgresql://...",
            min_size=5, max_size=20
        )
        self.http_client = httpx.AsyncClient(timeout=10.0)

    async def shutdown(self, timeout: float = 10.0):
        errors = []
        if self.http_client:
            try:
                await asyncio.wait_for(
                    self.http_client.aclose(), timeout=timeout/2
                )
            except asyncio.TimeoutError:
                errors.append("HTTP client close timed out")
        
        if self.db_pool:
            try:
                await asyncio.wait_for(
                    self.db_pool.close(), timeout=timeout/2
                )
            except asyncio.TimeoutError:
                self.db_pool.terminate()
                errors.append("DB pool terminated forcefully")
        
        if errors:
            print(f"Shutdown warnings: {errors}")

The pattern: try graceful close with a timeout, fall back to forceful termination.
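
That pattern can be factored into a small reusable helper. The sketch below is one possible shape and is not part of asyncpg or httpx: `close_with_fallback` and `FakePool` are hypothetical names used only for illustration, with `FakePool` standing in for a resource that has both a graceful and a forceful close.

```python
import asyncio
from typing import Awaitable, Callable

async def close_with_fallback(
    graceful_close: Callable[[], Awaitable[None]],
    force_close: Callable[[], None],
    timeout: float,
) -> bool:
    """Return True if the graceful close finished in time, False if forced."""
    try:
        await asyncio.wait_for(graceful_close(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        force_close()
        return False

class FakePool:
    """Hypothetical resource whose close() takes close_delay seconds."""

    def __init__(self, close_delay: float) -> None:
        self.close_delay = close_delay
        self.terminated = False

    async def close(self) -> None:
        await asyncio.sleep(self.close_delay)

    def terminate(self) -> None:
        self.terminated = True

async def demo_close():
    fast, slow = FakePool(0.01), FakePool(10)
    results = [
        await close_with_fallback(fast.close, fast.terminate, timeout=0.5),
        await close_with_fallback(slow.close, slow.terminate, timeout=0.05),
    ]
    return results, fast.terminated, slow.terminated
```

Returning a boolean instead of printing lets the caller decide how to report forced terminations, for example by collecting them into the `errors` list used above.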

Testing Shutdown Behavior

You can’t just hope shutdown works — test it explicitly:

import signal
import subprocess
import sys
import time

def test_graceful_shutdown():
    proc = subprocess.Popen(
        # sys.executable runs the server with the same interpreter as the test
        [sys.executable, "server.py"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    time.sleep(2)  # Let it start
    
    # Send SIGTERM
    proc.send_signal(signal.SIGTERM)
    
    # Should exit cleanly within 10 seconds
    try:
        stdout, stderr = proc.communicate(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()
        raise AssertionError("Server did not shut down within 10 seconds")
    
    assert proc.returncode == 0, f"Non-zero exit: {proc.returncode}"
    assert b"Shutdown complete" in stderr
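
A fixed sleep before sending the signal can make this kind of test flaky. One alternative, sketched below under the assumption of a Unix-like system, is to wait for a readiness line from the child. `SERVER_SOURCE` is a hypothetical stand-in for server.py, and `run_and_terminate` is an illustrative helper name.

```python
import signal
import subprocess
import sys

# Hypothetical stand-in for server.py: reports readiness, exits 0 on SIGTERM.
SERVER_SOURCE = r'''
import signal, sys, threading

stop = threading.Event()
signal.signal(signal.SIGTERM, lambda signum, frame: stop.set())
print("ready", flush=True)
while not stop.wait(0.1):
    pass
print("Shutdown complete", file=sys.stderr, flush=True)
'''

def run_and_terminate(timeout: float = 10.0):
    """Start the child, wait until it is ready, send SIGTERM, collect results."""
    proc = subprocess.Popen(
        [sys.executable, "-c", SERVER_SOURCE],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # Block until the child reports readiness instead of sleeping a fixed time.
    assert proc.stdout.readline().strip() == b"ready"
    proc.send_signal(signal.SIGTERM)
    try:
        _, stderr = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.communicate()  # reap the killed child
        raise
    return proc.returncode, stderr
```

The child installs its handler before printing the readiness line, so the signal cannot arrive while the default SIGTERM action is still in place.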

Common Pitfalls

Daemon threads dying silently. Python daemon threads are stopped abruptly when the interpreter exits, which happens once the main thread and all non-daemon threads have finished: no cleanup, no warning. If a daemon thread is in the middle of writing to a file, that write can be lost or truncated. Use non-daemon threads with explicit shutdown coordination instead.

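atexit vs signal handlers. atexit handlers run on normal interpreter exit but not when the process is terminated by a signal that Python does not handle, which is the default for SIGTERM. They do run if your signal handler calls sys.exit(). For production shutdown, install explicit signal handlers rather than relying on atexit alone.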

Double-signal race. If SIGTERM arrives twice (impatient orchestrator), your handler runs twice. Make it idempotent or use a flag to skip re-entry:

_shutting_down = False

def handle_sigterm(signum, frame):
    global _shutting_down
    if _shutting_down:
        return
    _shutting_down = True
    initiate_shutdown()
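
The guard can be checked by delivering the signal twice to the current process. In this sketch, `initiate_shutdown` is a hypothetical stub that only counts its calls, and `deliver_twice` is an illustrative helper that restores the previous handler afterwards. It must run in the main thread, because that is the only place Python allows handlers to be installed.

```python
import signal

shutdown_calls = 0
_shutting_down = False

def initiate_shutdown() -> None:
    """Stub: a real service would start draining here."""
    global shutdown_calls
    shutdown_calls += 1

def handle_sigterm(signum, frame) -> None:
    global _shutting_down
    if _shutting_down:
        return  # a repeated signal is ignored
    _shutting_down = True
    initiate_shutdown()

def deliver_twice() -> int:
    """Send SIGTERM to this process twice; return how often shutdown began."""
    previous = signal.signal(signal.SIGTERM, handle_sigterm)
    try:
        signal.raise_signal(signal.SIGTERM)
        signal.raise_signal(signal.SIGTERM)
    finally:
        signal.signal(signal.SIGTERM, previous)
    return shutdown_calls
```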

One thing to remember: A production shutdown is signal handling + task draining + resource cleanup, all within a strict time budget dictated by your orchestrator. Test it like any other critical path.
