Python Graceful Shutdown — Deep Dive
Signal Handling Fundamentals
Python’s signal module lets you intercept OS signals, but with an important caveat: signal handlers run on the main thread only. In a multithreaded application, you must coordinate shutdown from the main thread to workers.
import signal
import sys
import threading

shutdown_event = threading.Event()

def handle_sigterm(signum, frame):
    print(f"Received signal {signum}, initiating shutdown...")
    shutdown_event.set()

signal.signal(signal.SIGTERM, handle_sigterm)
signal.signal(signal.SIGINT, handle_sigterm)
The shutdown_event acts as a coordination primitive. Worker threads check it periodically:
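import queue

task_queue = queue.Queue()

def worker_loop():
    while not shutdown_event.is_set():
        try:
            # get() raises queue.Empty on timeout; it does not return None
            task = task_queue.get(timeout=1.0)
        except queue.Empty:
            continue
        process(task)
    # Cleanup when shutdown is signaled
    flush_buffers()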
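To make the coordination concrete, here is a minimal end-to-end sketch of the same idea. The names `run_workers`, `task_queue`, and `results` are illustrative, and doubling each number stands in for real processing. The call to `shutdown_event.set()` is made directly here; in a real service the signal handler would make it.

```python
import queue
import threading

shutdown_event = threading.Event()
task_queue: queue.Queue = queue.Queue()
results: list[int] = []


def worker_loop() -> None:
    # Keep pulling tasks until shutdown is requested.
    while not shutdown_event.is_set():
        try:
            task = task_queue.get(timeout=0.1)
        except queue.Empty:
            continue  # nothing queued yet; re-check the shutdown flag
        results.append(task * 2)  # stand-in for real processing
        task_queue.task_done()


def run_workers(num_workers: int, tasks: list[int]) -> list[int]:
    # Non-daemon threads, so the interpreter waits for them to finish.
    workers = [threading.Thread(target=worker_loop) for _ in range(num_workers)]
    for worker in workers:
        worker.start()
    for task in tasks:
        task_queue.put(task)
    task_queue.join()      # block until every queued task is processed
    shutdown_event.set()   # what the signal handler would do
    for worker in workers:
        worker.join(timeout=5.0)
    return sorted(results)
```

Joining the threads with a timeout keeps the main thread in control of the overall time budget even if a worker is stuck.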
Asyncio Shutdown Pattern
For async applications, the approach differs. The event loop has its own signal handling mechanism that integrates with coroutines:
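import asyncio
import signal

class GracefulServer:
    def __init__(self):
        self.active_tasks: set[asyncio.Task] = set()
        self._shutting_down = False
        # Set by the signal handler; start() waits on it
        self._shutdown_event = asyncio.Event()

    async def start(self):
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGTERM, self._signal_shutdown)
        loop.add_signal_handler(signal.SIGINT, self._signal_shutdown)
        server = await asyncio.start_server(
            self.handle_connection, '0.0.0.0', 8080
        )
        async with server:
            await self._shutdown_event.wait()
            # Stop accepting new connections, then drain in-flight work.
            # On Python 3.12+ wait_closed() also waits for open connections,
            # so it comes after the drain.
            server.close()
            await self._drain_tasks(timeout=25.0)
            await server.wait_closed()

    async def handle_connection(self, reader, writer):
        # Placeholder handler: track the task so _drain_tasks can wait for it
        task = asyncio.current_task()
        self.active_tasks.add(task)
        try:
            ...  # application-specific request handling goes here
        finally:
            self.active_tasks.discard(task)
            writer.close()

    def _signal_shutdown(self):
        self._shutting_down = True
        self._shutdown_event.set()

    async def _drain_tasks(self, timeout: float):
        if not self.active_tasks:
            return
        print(f"Draining {len(self.active_tasks)} active tasks...")
        done, pending = await asyncio.wait(
            self.active_tasks, timeout=timeout
        )
        for task in pending:
            task.cancel()
        if pending:
            # Give cancelled tasks a moment to run their cleanup
            await asyncio.wait(pending, timeout=5.0)
            print(f"Force-cancelled {len(pending)} tasks")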
Key detail: add_signal_handler is only available on Unix. On Windows, you need signal.signal() with a different coordination approach.
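One possible shape for that cross-platform coordination is sketched below. The helper names `make_threadsafe_handler` and `install_shutdown_handlers` are hypothetical. The sketch relies on `loop.add_signal_handler` raising `NotImplementedError` where it is unsupported, and falls back to `signal.signal()` with `loop.call_soon_threadsafe` so the event is set from inside the loop.

```python
import asyncio
import signal


def make_threadsafe_handler(loop: asyncio.AbstractEventLoop,
                            shutdown_event: asyncio.Event):
    """Build a signal.signal()-style handler that wakes the event loop."""
    def handler(signum, frame):
        # A plain signal handler runs outside the loop's callbacks, so hand
        # the work over to the loop instead of touching the event directly.
        loop.call_soon_threadsafe(shutdown_event.set)
    return handler


def install_shutdown_handlers(loop: asyncio.AbstractEventLoop,
                              shutdown_event: asyncio.Event,
                              signals=(signal.SIGINT, signal.SIGTERM)) -> str:
    """Register handlers and report which mechanism was used."""
    try:
        for sig in signals:
            loop.add_signal_handler(sig, shutdown_event.set)
        return "loop"
    except NotImplementedError:
        # Event loops without add_signal_handler support (e.g. on Windows)
        fallback = make_threadsafe_handler(loop, shutdown_event)
        for sig in signals:
            signal.signal(sig, fallback)
        return "signal"
```

Call `install_shutdown_handlers(asyncio.get_running_loop(), event)` from the main thread at the start of your main coroutine, then `await event.wait()` as before.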
Framework-Specific Implementation
FastAPI with Uvicorn
FastAPI uses lifespan events for startup/shutdown:
import asyncio
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup (create_pool is your application's own pool factory)
    db_pool = await create_pool()
    app.state.db = db_pool
    app.state.background_tasks = set()
    yield
    # Shutdown: runs when the server stops, for example on SIGTERM
    for task in app.state.background_tasks:
        task.cancel()
    # Wait for cancelled tasks; return_exceptions keeps CancelledError
    # from propagating out of the lifespan handler
    await asyncio.gather(
        *app.state.background_tasks,
        return_exceptions=True
    )
    await db_pool.close()
    print("All resources released")

app = FastAPI(lifespan=lifespan)
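The lifespan handler only works if every background task actually ends up in the tracked set. A small framework-independent sketch of that bookkeeping follows. The helper names `spawn_tracked` and `cancel_all` are illustrative, and the module-level `background_tasks` set stands in for `app.state.background_tasks`.

```python
import asyncio

background_tasks: set[asyncio.Task] = set()


def spawn_tracked(coro) -> asyncio.Task:
    """Start a task and keep a reference until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Remove the task from the set automatically once it is done
    task.add_done_callback(background_tasks.discard)
    return task


async def cancel_all(tasks) -> list:
    """Cancel every task and wait until all of them have finished."""
    pending = list(tasks)  # snapshot, since done callbacks mutate the set
    for task in pending:
        task.cancel()
    # return_exceptions=True collects CancelledError instead of raising it
    return await asyncio.gather(*pending, return_exceptions=True)
```

Holding the tasks in a set also prevents them from being garbage-collected while they are still running.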
Uvicorn’s --timeout-graceful-shutdown flag (default: None) controls how long workers have to finish. Set it to match your Kubernetes terminationGracePeriodSeconds minus a buffer:
uvicorn app:app --timeout-graceful-shutdown 25
Celery Worker Shutdown
Celery workers need special attention. By default, SIGTERM triggers a “warm shutdown” — the worker finishes its current task, then exits. But long-running tasks can exceed the grace period:
from celery import Celery
from celery.contrib.abortable import AbortableTask
from celery.signals import worker_shutting_down

app = Celery('tasks')

@worker_shutting_down.connect
def on_shutdown(sender, sig, how, exitcode, **kwargs):
    """Called when worker begins shutdown sequence."""
    # Flush metrics, close external connections
    metrics_client.flush()
    external_api.close()

# is_aborted() comes from AbortableTask and needs a result backend;
# a plain bound task does not provide it.
@app.task(bind=True, base=AbortableTask, soft_time_limit=120, time_limit=150)
def long_task(self, data):
    for chunk in chunked(data, 100):
        if self.is_aborted():
            # Task was aborted, for example by your shutdown logic
            save_checkpoint(chunk.offset)
            return
        process(chunk)
When soft_time_limit is reached, Celery raises SoftTimeLimitExceeded inside the task; catch it to checkpoint before exiting. When the hard time_limit is reached, the process running the task is killed and no cleanup code runs.
Kubernetes Integration
Kubernetes sends SIGTERM, waits terminationGracePeriodSeconds (default 30), then sends SIGKILL. But there’s a subtlety: the pod is removed from the Service endpoints concurrently with SIGTERM — meaning traffic might still arrive for a few seconds after shutdown begins.
The solution is a preStop hook that adds a small delay:
containers:
- name: api
  lifecycle:
    preStop:
      exec:
        command: ["sleep", "5"]
terminationGracePeriodSeconds: 35
This gives kube-proxy and ingress controllers time to update their routing tables before your app stops accepting connections. Your app’s shutdown timeout should be: terminationGracePeriodSeconds - preStop delay - buffer = 35 - 5 - 5 = 25 seconds.
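The budget arithmetic is easy to get wrong when values change independently, so it can help to compute it in one place. A minimal sketch, where the function name `app_shutdown_timeout` is illustrative:

```python
def app_shutdown_timeout(grace_period_s: float,
                         pre_stop_s: float,
                         buffer_s: float) -> float:
    """Seconds the application may spend draining before SIGKILL is a risk."""
    remaining = grace_period_s - pre_stop_s - buffer_s
    if remaining <= 0:
        raise ValueError(
            "terminationGracePeriodSeconds is too small for the "
            "preStop delay plus buffer"
        )
    return remaining
```

With the values above, `app_shutdown_timeout(35, 5, 5)` returns 25, which matches the drain timeout and the Uvicorn flag used earlier.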
Connection Pool Draining
Database and HTTP connection pools need explicit shutdown:
import asyncio

import asyncpg
import httpx

class ResourceManager:
    def __init__(self):
        self.db_pool: asyncpg.Pool | None = None
        self.http_client: httpx.AsyncClient | None = None

    async def startup(self):
        self.db_pool = await asyncpg.create_pool(
            dsn="postgresql://...",
            min_size=5, max_size=20
        )
        self.http_client = httpx.AsyncClient(timeout=10.0)

    async def shutdown(self, timeout: float = 10.0):
        errors = []
        # Each resource gets half of the total budget
        if self.http_client:
            try:
                await asyncio.wait_for(
                    self.http_client.aclose(), timeout=timeout/2
                )
            except asyncio.TimeoutError:
                errors.append("HTTP client close timed out")
        if self.db_pool:
            try:
                await asyncio.wait_for(
                    self.db_pool.close(), timeout=timeout/2
                )
            except asyncio.TimeoutError:
                # Graceful close took too long: drop connections immediately
                self.db_pool.terminate()
                errors.append("DB pool terminated forcefully")
        if errors:
            print(f"Shutdown warnings: {errors}")
The pattern: try graceful close with a timeout, fall back to forceful termination.
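That pattern can be factored into a small reusable helper. This is a sketch under assumptions: `close_with_fallback` is a hypothetical name, `graceful_close` is any zero-argument callable returning an awaitable, and `force_close` is a synchronous callable.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def close_with_fallback(
    name: str,
    graceful_close: Callable[[], Awaitable[None]],
    force_close: Callable[[], None],
    timeout: float,
) -> Optional[str]:
    """Return None on a clean close, or a warning string if forced."""
    try:
        await asyncio.wait_for(graceful_close(), timeout=timeout)
        return None
    except asyncio.TimeoutError:
        # Graceful path exceeded its budget: release the resource forcefully
        force_close()
        return f"{name} terminated forcefully after {timeout}s"
```

For the asyncpg pool this would be called as `await close_with_fallback("DB pool", pool.close, pool.terminate, 5.0)`, collecting any returned warning for logging.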
Testing Shutdown Behavior
You can’t just hope shutdown works — test it explicitly:
import signal
import subprocess
import sys
import time

def test_graceful_shutdown():
    proc = subprocess.Popen(
        [sys.executable, "server.py"],  # same interpreter as the test run
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    time.sleep(2)  # Let it start
    # Send SIGTERM
    proc.send_signal(signal.SIGTERM)
    # Should exit cleanly within 10 seconds
    try:
        stdout, stderr = proc.communicate(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()
        raise AssertionError("Server did not shut down within 10 seconds")
    assert proc.returncode == 0, f"Non-zero exit: {proc.returncode}"
    # Assumes server.py logs "Shutdown complete" to stderr on exit
    assert b"Shutdown complete" in stderr
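A fixed startup sleep can make such a test flaky. One alternative, sketched here for POSIX systems, is to have the process announce readiness and wait for that line before sending the signal. The inline child script and the `run_shutdown_check` helper are stand-ins for a real server and test, not part of any library.

```python
import signal
import subprocess
import sys

# Stand-in for server.py: installs a SIGTERM handler, reports readiness,
# then waits until the handler fires.
CHILD_SOURCE = r"""
import signal, sys, threading
stop = threading.Event()
signal.signal(signal.SIGTERM, lambda signum, frame: stop.set())
print("ready", flush=True)
while not stop.wait(0.1):
    pass
print("Shutdown complete", file=sys.stderr, flush=True)
sys.exit(0)
"""


def run_shutdown_check(timeout: float = 10.0):
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_SOURCE],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    try:
        # Block until the child has installed its handler
        ready = proc.stdout.readline()
        proc.send_signal(signal.SIGTERM)
        _, stderr = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.communicate()
        raise AssertionError(f"Child did not exit within {timeout} seconds")
    return proc.returncode, ready.strip(), stderr
```

Waiting on the readiness line removes the race where SIGTERM arrives before the handler is registered, which would kill the process with the default action.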
Common Pitfalls
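Daemon threads dying silently. Python daemon threads are stopped abruptly when the interpreter exits, which happens once the main thread and all other non-daemon threads have finished: no cleanup, no warning. If a daemon thread is in the middle of writing to a file, that write can be lost or left incomplete. Use non-daemon threads with explicit shutdown coordination instead.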
atexit vs signal handlers. atexit handlers run on normal exit but not when the process is killed by a signal (unless your signal handler calls sys.exit()). Always use signal handlers for production shutdown.
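If you already rely on atexit hooks, the two mechanisms can be combined by having the signal handler call sys.exit(). A minimal sketch, where `release_resources`, `exit_on_signal`, `install_exit_handlers`, and `cleanup_log` are illustrative names:

```python
import atexit
import signal
import sys

cleanup_log: list[str] = []


def release_resources() -> None:
    # Stand-in for real cleanup (flush buffers, close connections)
    cleanup_log.append("released")


def exit_on_signal(signum, frame) -> None:
    # sys.exit() raises SystemExit in the main thread, so the interpreter
    # shuts down normally and registered atexit handlers run.
    sys.exit(0)


def install_exit_handlers() -> None:
    atexit.register(release_resources)
    signal.signal(signal.SIGTERM, exit_on_signal)
```

This only helps for signals Python can handle; SIGKILL still terminates the process without running any cleanup.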
Double-signal race. If SIGTERM arrives twice (impatient orchestrator), your handler runs twice. Make it idempotent or use a flag to skip re-entry:
_shutting_down = False

def handle_sigterm(signum, frame):
    global _shutting_down
    if _shutting_down:
        return
    _shutting_down = True
    initiate_shutdown()
One thing to remember: A production shutdown is signal handling + task draining + resource cleanup, all within a strict time budget dictated by your orchestrator. Test it like any other critical path.