Python atexit — Deep Dive
How atexit works internally
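The atexit module has been implemented in C since Python 3.0 (Modules/atexitmodule.c in CPython) and keeps its list of registered callbacks in the interpreter state. During normal shutdown, Py_FinalizeEx() runs that list in reverse registration order; the same routine is exposed to Python code as atexit._run_exitfuncs(). The name of the internal C entry point has changed between CPython releases.
There is no pure-Python atexit.py, but the behavior can be modeled in a few lines of Python: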
# Simplified model of the atexit module (the real one is written in C)
_callbacks = []  # the real list lives in C interpreter state

def register(func, *args, **kwargs):
    _callbacks.append((func, args, kwargs))
    return func  # returning func lets register() work as a decorator

def unregister(func):
    # The real module compares with ==, not identity
    _callbacks[:] = [(f, a, k) for f, a, k in _callbacks if f != func]

def _run_exitfuncs():
    import traceback
    while _callbacks:
        func, args, kwargs = _callbacks.pop()  # pop() yields LIFO order
        try:
            func(*args, **kwargs)
        except Exception:
            traceback.print_exc()  # report the error and keep going
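The model can be checked against the real module. The following is a minimal sketch that runs a small script in a child interpreter, so the handlers fire at a genuine exit; the names run_child and say are illustrative and not part of atexit.

```python
import subprocess
import sys
import textwrap

# Script executed in a separate interpreter; its exit triggers the handlers.
SCRIPT = textwrap.dedent("""
    import atexit

    def say(msg):
        print(msg)

    atexit.register(say, "registered first")
    atexit.register(say, "registered second")
    atexit.register(print, "never printed")
    atexit.unregister(print)  # drops every entry whose callable == print
    print("main done")
""")

def run_child(script):
    """Run script in a fresh interpreter and return its stdout lines."""
    result = subprocess.run(
        [sys.executable, "-c", script],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.splitlines()

lines = run_child(SCRIPT)
print(lines)  # ['main done', 'registered second', 'registered first']
```

The handler registered last runs first, and the unregistered callable never runs.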
Shutdown sequence
Python’s shutdown follows a specific order:
- Interpreter calls atexit._run_exitfuncs()
- atexit callbacks execute in LIFO order
- Module globals start getting set to None (module cleanup)
- The garbage collector runs final collection
- C-level finalizers run
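This ordering has a critical implication: atexit callbacks can still use module-level objects, because module cleanup only starts after they have all run. What a callback cannot rely on is state that an earlier callback has already torn down. Callbacks run in LIFO order, so a handler registered late (for example by a library imported late) runs first and may close something that a handler registered earlier still needs.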
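As a small illustration, the sketch below runs a child interpreter whose atexit callback reads an ordinary module global. SETTINGS, report and run_script are made-up names for this example.

```python
import subprocess
import sys
import textwrap

SCRIPT = textwrap.dedent("""
    import atexit

    SETTINGS = {"name": "demo"}  # ordinary module-level global

    @atexit.register
    def report():
        # Module cleanup has not started yet, so the global is intact.
        print("SETTINGS is None:", SETTINGS is None)
        print("name:", SETTINGS["name"])
""")

def run_script(script):
    """Run script in a fresh interpreter and return its stdout lines."""
    result = subprocess.run(
        [sys.executable, "-c", script],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.splitlines()

output = run_script(SCRIPT)
print(output)  # ['SETTINGS is None: False', 'name: demo']
```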
Interaction with signals
SIGTERM and SIGINT
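SIGINT (Ctrl+C) raises KeyboardInterrupt in the main thread. If nothing catches it, the interpreter exits through the normal shutdown path, so atexit callbacks run. SIGTERM (the default signal sent by kill) does NOT trigger atexit by default: Python installs no handler for it, so the operating system's default action terminates the process immediately.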
To make atexit work with SIGTERM:
import atexit, signal, sys

def handle_sigterm(signo, frame):
    # sys.exit() raises SystemExit, which leads to a normal shutdown
    sys.exit(128 + signo)

signal.signal(signal.SIGTERM, handle_sigterm)

@atexit.register
def cleanup():
    print("Cleaning up before exit")
By converting SIGTERM to sys.exit(), you trigger the normal shutdown path including atexit.
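The difference can be observed from outside the process. This is a sketch for Unix-like systems only; child_script and terminate are hypothetical helpers that start a child, wait until it reports "ready", send SIGTERM, and collect the exit status and output.

```python
import signal
import subprocess
import sys
import textwrap

def child_script(install_handler):
    """Build a child program; optionally convert SIGTERM into sys.exit()."""
    handler = textwrap.dedent("""
        def handle_sigterm(signo, frame):
            sys.exit(128 + signo)
        signal.signal(signal.SIGTERM, handle_sigterm)
    """) if install_handler else ""
    return textwrap.dedent("""
        import atexit, signal, sys, time
        atexit.register(print, "cleanup ran", flush=True)
    """) + handler + textwrap.dedent("""
        print("ready", flush=True)
        time.sleep(30)
    """)

def terminate(script):
    """Start the child, send SIGTERM once it is ready, return (code, output)."""
    proc = subprocess.Popen(
        [sys.executable, "-c", script], stdout=subprocess.PIPE, text=True
    )
    proc.stdout.readline()            # blocks until the child prints "ready"
    proc.send_signal(signal.SIGTERM)
    out, _ = proc.communicate(timeout=10)
    return proc.returncode, out.strip()

with_handler = terminate(child_script(True))
without_handler = terminate(child_script(False))
print(with_handler, without_handler)
```

With the handler installed, the child exits with status 128 + SIGTERM and its atexit output appears. Without it, the return code is the negative signal number and nothing is printed.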
SIGKILL
SIGKILL (kill -9) cannot be caught or handled. The OS terminates the process immediately. No cleanup runs — not atexit, not signal handlers, not finally blocks. Design your system to tolerate this (use write-ahead logs, atomic file operations, etc.).
Multiprocessing and forking
fork()
After os.fork(), the child process inherits the parent’s atexit registry. This is usually wrong — you don’t want the child to run the parent’s cleanup. Reset atexit in child processes:
import atexit, os

@atexit.register
def parent_cleanup():
    print("Parent cleanup")

pid = os.fork()
if pid == 0:
    # Child process: drop the handler inherited from the parent
    atexit.unregister(parent_cleanup)
    # Register child-specific cleanup
    atexit.register(lambda: print("Child cleanup"))
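When fork() is called in several places, the unregistration can be attached to the fork itself. The sketch below assumes a Unix system and uses os.register_at_fork (available since Python 3.7) so that every forked child drops the parent's handler automatically; run_forking_script is an illustrative helper.

```python
import subprocess
import sys
import textwrap

SCRIPT = textwrap.dedent("""
    import atexit, os

    def parent_cleanup():
        print("parent cleanup", flush=True)

    atexit.register(parent_cleanup)
    # Runs in every forked child right after fork() returns there.
    os.register_at_fork(
        after_in_child=lambda: atexit.unregister(parent_cleanup)
    )

    pid = os.fork()
    if pid == 0:
        atexit.register(print, "child cleanup", flush=True)
    else:
        os.waitpid(pid, 0)  # let the child finish first for stable output
""")

def run_forking_script(script):
    """Run script in a fresh interpreter and return its stdout lines."""
    result = subprocess.run(
        [sys.executable, "-c", script],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.splitlines()

fork_output = run_forking_script(SCRIPT)
print(fork_output)  # ['child cleanup', 'parent cleanup']
```

The parent's handler runs exactly once, in the parent; without the fork hook it would also run in the child.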
multiprocessing module
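The multiprocessing module avoids the problem for Process objects started with the spawn or forkserver start methods: those children do not inherit the parent's atexit registry. With the fork start method the child starts from a copy of the parent's state, as with os.fork(), but whether atexit callbacks actually run when a Process child finishes has differed between Python versions. Cleanup that a worker depends on is safer inside the target function (for example in a try/finally block) than in an atexit handler.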
atexit in daemon threads
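atexit callbacks run in the main thread during shutdown. Non-daemon threads have already been joined by then, while daemon threads are still running and are stopped abruptly later in finalization. An atexit callback is therefore the last opportunity to ask a daemon thread to stop cleanly: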
import atexit, threading

worker_running = True

def worker():
    while worker_running:
        pass  # placeholder for real work

t = threading.Thread(target=worker, daemon=True)
t.start()

@atexit.register
def stop_worker():
    global worker_running
    worker_running = False
    # Always join with a timeout: a worker stuck in blocking work
    # would otherwise hang shutdown indefinitely.
    t.join(timeout=2)
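A busy loop on a global flag burns CPU, so a threading.Event is often a better fit. The following sketch shows the same cooperative stop in a child interpreter; stop, worker and run_worker_script are illustrative names.

```python
import subprocess
import sys
import textwrap

SCRIPT = textwrap.dedent("""
    import atexit, threading

    stop = threading.Event()

    def worker():
        while not stop.wait(0.05):  # wake up regularly to check the flag
            pass
        print("worker exiting", flush=True)

    t = threading.Thread(target=worker, daemon=True)
    t.start()

    @atexit.register
    def stop_worker():
        stop.set()
        t.join(timeout=2)  # bounded wait so shutdown cannot hang
        print("worker still alive:", t.is_alive(), flush=True)
""")

def run_worker_script(script):
    """Run script in a fresh interpreter and return its stdout lines."""
    result = subprocess.run(
        [sys.executable, "-c", script],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.splitlines()

worker_output = run_worker_script(SCRIPT)
print(worker_output)  # ['worker exiting', 'worker still alive: False']
```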
Thread safety
The C implementation synchronizes access to the callback list at the interpreter level (the GIL in standard builds), so registering and unregistering callbacks from multiple threads is safe. The callbacks themselves run sequentially in the main thread during shutdown; they are not parallelized.
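A quick way to see both properties is to register from several threads and have each callback report which thread it runs on. This is a sketch only; report and run_threads_script are made-up names, and the order of the output lines depends on thread scheduling.

```python
import subprocess
import sys
import textwrap

SCRIPT = textwrap.dedent("""
    import atexit, threading

    def report(n):
        on_main = threading.current_thread() is threading.main_thread()
        print(f"callback {n} on main thread: {on_main}")

    # Each thread registers one callback concurrently.
    threads = [
        threading.Thread(target=atexit.register, args=(report, n))
        for n in range(8)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
""")

def run_threads_script(script):
    """Run script in a fresh interpreter and return its stdout lines."""
    result = subprocess.run(
        [sys.executable, "-c", script],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.splitlines()

thread_output = run_threads_script(SCRIPT)
print(sorted(thread_output))
```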
Production shutdown patterns
Layered cleanup with ordering guarantees
import atexit

class ShutdownManager:
    """Register cleanup functions with explicit priority."""

    def __init__(self):
        self._handlers = []  # (priority, func, args, kwargs)
        atexit.register(self._run)

    def register(self, func, *args, priority=0, **kwargs):
        self._handlers.append((priority, func, args, kwargs))

    def _run(self):
        # Higher priority runs first; the lowest number runs last
        # (cleaned up last = most fundamental).
        for priority, func, args, kwargs in sorted(
            self._handlers, key=lambda x: -x[0]
        ):
            try:
                func(*args, **kwargs)
            except Exception as e:
                # partial objects and other callables may lack __name__
                name = getattr(func, "__name__", repr(func))
                print(f"Cleanup error ({name}): {e}")

shutdown = ShutdownManager()
# close_db, flush_logs and save_metrics stand in for the application's own functions
shutdown.register(close_db, priority=10)     # runs first
shutdown.register(flush_logs, priority=5)    # runs second
shutdown.register(save_metrics, priority=1)  # runs last
Idempotent cleanup
Cleanup functions should be safe to call multiple times — a signal handler might trigger cleanup before normal shutdown does:
import atexit, signal, sys

_cleaned_up = False

def cleanup():
    global _cleaned_up
    if _cleaned_up:
        return
    _cleaned_up = True
    # actual cleanup work
    print("Cleaning up (once)")

atexit.register(cleanup)

def signal_handler(signo, frame):
    cleanup()  # may run before atexit
    sys.exit(128 + signo)

signal.signal(signal.SIGTERM, signal_handler)
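When several cleanup functions need the same guard, the flag can be factored into a decorator. The sketch below uses a hypothetical run_once helper and assumes all calls come from the main thread, which holds for both Python signal handlers and atexit callbacks.

```python
import functools

def run_once(func):
    """Wrap func so that only the first call does any work."""
    has_run = False

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        nonlocal has_run
        if has_run:
            return None
        has_run = True  # set before calling so a re-entrant call is a no-op
        return func(*args, **kwargs)

    return wrapper

calls = []

@run_once
def cleanup():
    calls.append("cleaned")

cleanup()
cleanup()  # ignored: the work has already been done
print(calls)  # ['cleaned']
```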
Timeout-protected cleanup
Cleanup that takes too long can prevent shutdown. Add timeouts:
import atexit, signal

class CleanupTimeout(Exception):
    """Named so it does not shadow the built-in TimeoutError."""

def cleanup_with_timeout(func, timeout=5):
    def handler(signo, frame):
        raise CleanupTimeout(f"Cleanup timed out after {timeout}s")

    old_handler = signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout)  # whole seconds only
    try:
        func()
    except CleanupTimeout:
        print(f"Warning: {func.__name__} timed out")
    finally:
        signal.alarm(0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)

@atexit.register
def safe_cleanup():
    # flush_data and close_connections stand in for the application's own functions
    cleanup_with_timeout(flush_data, timeout=10)
    cleanup_with_timeout(close_connections, timeout=5)
Note: this approach is limited to Unix (signal.alarm and SIGALRM are not available on Windows) and to the main thread (signal.signal can only be called there).
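The same idea can be packaged as a context manager. This sketch swaps signal.alarm for signal.setitimer, which accepts fractional seconds; it carries the same Unix and main-thread restrictions, and time_limit and run_cleanup are illustrative names.

```python
import contextlib
import signal
import time

class CleanupTimeout(Exception):
    """Raised inside the block when the time limit expires."""

@contextlib.contextmanager
def time_limit(seconds):
    def on_alarm(signo, frame):
        raise CleanupTimeout(f"timed out after {seconds}s")

    old_handler = signal.signal(signal.SIGALRM, on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)  # accepts fractions
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)     # cancel the timer
        signal.signal(signal.SIGALRM, old_handler)  # restore previous handler

def run_cleanup(func, seconds):
    """Return True if func finished, False if it was cut off."""
    try:
        with time_limit(seconds):
            func()
        return True
    except CleanupTimeout:
        return False

finished_fast = run_cleanup(lambda: None, 0.5)
finished_slow = run_cleanup(lambda: time.sleep(5), 0.2)
print(finished_fast, finished_slow)  # True False
```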
Coordination with context managers
For resources that have both a with-based lifecycle and need program-level cleanup:
import atexit

class ConnectionPool:
    _instances = []  # every pool ever created, for the exit hook

    def __init__(self, dsn):
        self.dsn = dsn
        self.connections = []
        ConnectionPool._instances.append(self)

    def close(self):
        # Safe to call twice: the list is empty after the first call
        for conn in self.connections:
            conn.close()
        self.connections.clear()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()

    @classmethod
    def _atexit_cleanup(cls):
        for pool in cls._instances:
            pool.close()

atexit.register(ConnectionPool._atexit_cleanup)
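The standard library offers a related building block: weakref.finalize runs its callback at most once, and by default (its atexit attribute is true) it also fires at interpreter exit if the object is still alive. The sketch below is one possible way to combine it with a context manager; ManagedResource and the log list are stand-ins for a real resource and its teardown.

```python
import weakref

class ManagedResource:
    """Toy resource; the log list stands in for real teardown work."""

    def __init__(self, name, log):
        self.name = name
        # The callback must not reference self, or the object never dies.
        self._finalizer = weakref.finalize(self, log.append, f"closed {name}")

    def close(self):
        self._finalizer()  # runs the callback at most once

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()

log = []
with ManagedResource("a", log):
    pass

pool = ManagedResource("b", log)
pool.close()
pool.close()  # second call is a no-op
print(log)  # ['closed a', 'closed b']
```

Unlike the class-level list above, this keeps no strong reference to the instances, so resources that go out of scope can still be garbage collected.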
atexit vs. alternatives
| Mechanism | Scope | Guaranteed? | Use case |
|---|---|---|---|
| atexit | Process lifetime | On normal exit | Program-level cleanup |
| try/finally | Block scope | Always (except kill) | Local resource cleanup |
| with statement | Block scope | Always (except kill) | Resource management |
| __del__ | Object lifetime | Not guaranteed | Last-resort cleanup |
| Signal handlers | Signal receipt | Depends on signal | Graceful shutdown |
| systemd ExecStop | Service lifetime | On service stop | System service cleanup |
atexit fills the gap between block-scoped cleanup (with/finally) and external process management (systemd). It’s the right tool when cleanup must happen at program end, not at block exit.
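What "on normal exit" covers can be checked directly. The sketch below ends a child interpreter in four different ways and records whether its atexit handler printed anything; PRELUDE, EXIT_PATHS and cleanup_ran are names made up for this example.

```python
import subprocess
import sys

# Every child registers the same handler, then ends in a different way.
PRELUDE = "import atexit, os, sys\natexit.register(print, 'cleanup ran')\n"

EXIT_PATHS = {
    "normal end": "pass",
    "sys.exit": "sys.exit(3)",
    "unhandled exception": "raise RuntimeError('boom')",
    "os._exit": "os._exit(0)",
}

def cleanup_ran(ending):
    """Run PRELUDE + ending in a child and report whether atexit fired."""
    result = subprocess.run(
        [sys.executable, "-c", PRELUDE + ending],
        capture_output=True, text=True,
    )
    return "cleanup ran" in result.stdout

results = {name: cleanup_ran(code) for name, code in EXIT_PATHS.items()}
for name, ran in results.items():
    print(f"{name}: {ran}")
```

Only os._exit() skips the handler here; falling off the end of the script, sys.exit(), and an unhandled exception all go through normal shutdown.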
One thing to remember
atexit is reliable for normal shutdown but fragile against violent termination. Combine it with signal handlers for SIGTERM, make cleanup idempotent, add timeouts for blocking operations, and never rely on it as your only data-safety mechanism — use write-ahead patterns for truly critical data.