Python Process Management — Deep Dive
Building a process supervisor
A process supervisor ensures that managed processes stay running. If a process crashes, the supervisor restarts it. If it exceeds resource limits, the supervisor kills and replaces it.
import subprocess
import psutil
import time
import signal
import sys
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
class ProcessState(Enum):
    """Lifecycle phases of a supervised process."""

    # Launch has been requested but not yet confirmed.
    STARTING = "starting"
    # The child was spawned successfully.
    RUNNING = "running"
    # A terminate/kill sequence is in progress.
    STOPPING = "stopping"
    # Not running: never started, stopped on request, or exited cleanly.
    STOPPED = "stopped"
    # Could not be started, or abandoned after too many restarts.
    FAILED = "failed"
@dataclass
class ManagedProcess:
    """Settings and live bookkeeping for one supervised command.

    The first group of fields is configuration supplied by the caller; the
    second group is runtime state that the supervisor updates while the
    process is being managed.
    """

    # --- Configuration ---------------------------------------------------
    # Unique label; used as the registry key and as the log prefix.
    name: str
    # Argument vector handed to subprocess.Popen.
    command: list[str]
    # Resident-memory ceiling in MiB.
    max_memory_mb: float = 0  # 0 = no limit
    # Number of restart attempts before the process is given up on.
    max_restarts: int = 5
    # Base delay in seconds before the first restart attempt.
    restart_delay: float = 2.0
    # Multiplier applied to the delay for every further attempt.
    backoff_factor: float = 2.0
    # Upper bound in seconds for the computed restart delay.
    max_backoff: float = 60.0

    # --- Runtime state ---------------------------------------------------
    # Handle of the current child, or None while nothing has been spawned.
    process: Optional[subprocess.Popen] = field(default=None, repr=False)
    state: ProcessState = ProcessState.STOPPED
    # Restart attempts made so far.
    restart_count: int = 0
    # time.time() timestamp of the most recent successful start.
    last_start: float = 0
    # Delay in seconds that was used for the latest restart.
    current_delay: float = 0
class ProcessSupervisor:
    """Start registered commands and keep them running.

    The supervisor polls every managed process at a fixed interval. A
    process that exits with a non-zero code, or that exceeds its memory
    limit, is restarted with exponential backoff until ``max_restarts``
    attempts have been used up. A process that exits with code 0 is left
    stopped.

    NOTE: the backoff delay is implemented with a blocking ``time.sleep``
    inside the monitoring loop, so other processes are not checked while
    one of them is waiting to be restarted.
    """

    # Seconds a process must have stayed up for its restart counter and
    # backoff delay to be reset the next time it goes down.
    STABLE_UPTIME = 60.0
    # Seconds to pause between two monitoring passes.
    POLL_INTERVAL = 2.0

    def __init__(self):
        """Create an empty registry and install SIGTERM/SIGINT handlers."""
        self.managed: dict[str, ManagedProcess] = {}
        self._running = True
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    def register(self, mp: ManagedProcess):
        """Add ``mp`` to the registry, replacing any entry with the same name."""
        self.managed[mp.name] = mp

    def _shutdown(self, signum, frame):
        """Signal handler: stop every managed process, then exit with 0."""
        print("Supervisor shutting down...")
        self._running = False
        for mp in self.managed.values():
            self._stop_process(mp)
        sys.exit(0)

    def start_process(self, mp: ManagedProcess):
        """Spawn ``mp.command`` and record the outcome on ``mp``.

        On success the state becomes RUNNING and ``last_start`` is updated;
        if spawning raises, the state becomes FAILED and the error is
        printed instead of propagated.
        """
        try:
            # Nothing in the supervisor reads the child's output. With
            # PIPE the child would block as soon as the OS pipe buffer
            # fills up, so the output is discarded instead.
            mp.process = subprocess.Popen(
                mp.command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except Exception as e:
            # Deliberately broad: one bad command must not take down the
            # supervisor and the other processes it manages.
            mp.state = ProcessState.FAILED
            print(f"[{mp.name}] Failed to start: {e}")
            return
        mp.state = ProcessState.RUNNING
        mp.last_start = time.time()
        print(f"[{mp.name}] Started (PID {mp.process.pid})")

    def _stop_process(self, mp: ManagedProcess):
        """Terminate a live child, escalating to kill after 10 seconds.

        Does nothing when there is no child or it has already exited.
        """
        if mp.process is None or mp.process.poll() is not None:
            return
        mp.state = ProcessState.STOPPING
        mp.process.terminate()
        try:
            mp.process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            mp.process.kill()
            # Reap the killed child so it does not linger as a zombie.
            mp.process.wait()
        mp.state = ProcessState.STOPPED
        print(f"[{mp.name}] Stopped")

    def _check_resources(self, mp: ManagedProcess) -> bool:
        """Return True if the process is within its resource limits.

        Only resident memory is checked. A non-positive ``max_memory_mb``
        disables the check, and a process that has already disappeared is
        reported as within limits.
        """
        if mp.max_memory_mb <= 0 or mp.process is None:
            return True
        try:
            rss_bytes = psutil.Process(mp.process.pid).memory_info().rss
        except psutil.NoSuchProcess:
            return True
        mem_mb = rss_bytes / (1024 ** 2)
        if mem_mb > mp.max_memory_mb:
            print(f"[{mp.name}] Memory limit exceeded: "
                  f"{mem_mb:.1f} MB > {mp.max_memory_mb} MB")
            return False
        return True

    def _handle_restart(self, mp: ManagedProcess):
        """Restart ``mp`` after an exponentially growing delay.

        The delay is ``restart_delay * backoff_factor ** restart_count``,
        capped at ``max_backoff``. When ``max_restarts`` attempts have been
        used the process is marked FAILED and left alone.
        """
        # Judge stability by the run that just ended. Checking the uptime
        # right after the new start (as was done before) always sees an
        # uptime of roughly zero, so the counter would never be reset.
        if mp.last_start and time.time() - mp.last_start > self.STABLE_UPTIME:
            mp.restart_count = 0
            mp.current_delay = mp.restart_delay

        if mp.restart_count >= mp.max_restarts:
            print(f"[{mp.name}] Max restarts reached ({mp.max_restarts}). Giving up.")
            mp.state = ProcessState.FAILED
            return

        mp.current_delay = min(
            mp.restart_delay * (mp.backoff_factor ** mp.restart_count),
            mp.max_backoff,
        )
        print(f"[{mp.name}] Restarting in {mp.current_delay:.1f}s "
              f"(attempt {mp.restart_count + 1}/{mp.max_restarts})")
        time.sleep(mp.current_delay)
        mp.restart_count += 1
        self.start_process(mp)

    def run(self):
        """Main supervisor loop.

        Starts every registered process, then polls them until the
        supervisor is asked to stop.
        """
        for mp in self.managed.values():
            self.start_process(mp)

        # States in which there is nothing left to monitor. STOPPED must be
        # skipped as well, otherwise a cleanly exited process would be
        # polled and reported as "Exited cleanly" on every single pass.
        idle_states = (ProcessState.FAILED, ProcessState.STOPPED)

        while self._running:
            for mp in self.managed.values():
                if mp.state in idle_states or mp.process is None:
                    continue

                # Check if process exited
                exit_code = mp.process.poll()
                if exit_code is not None:
                    if exit_code == 0:
                        print(f"[{mp.name}] Exited cleanly")
                        mp.state = ProcessState.STOPPED
                    else:
                        print(f"[{mp.name}] Crashed (exit code {exit_code})")
                        self._handle_restart(mp)
                    continue

                # Check resource limits
                if not self._check_resources(mp):
                    self._stop_process(mp)
                    self._handle_restart(mp)

            time.sleep(self.POLL_INTERVAL)
Orphan and zombie process detection
Zombie processes
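A zombie process has exited but its parent has not called wait() to collect its exit status. Zombies consume a PID and a small amount of kernel memory. The function below lists every zombie together with its parent, because the parent is the process that has to be fixed: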
def find_zombies() -> list[dict]:
    """List all zombie processes together with their parent.

    Returns:
        One dict per zombie with the keys ``pid``, ``name``,
        ``parent_pid`` and ``parent_name``. ``parent_name`` is
        ``"unknown"`` when the parent cannot be looked up.
    """
    zombies = []
    for proc in psutil.process_iter(['pid', 'name', 'status', 'ppid']):
        info = proc.info
        if info['status'] != psutil.STATUS_ZOMBIE:
            continue

        parent_pid = info['ppid']
        parent_name = "unknown"
        # psutil.Process(None) resolves to the *current* process, which
        # would attribute the zombie to the wrong parent, so only real
        # PIDs are looked up.
        if parent_pid is not None:
            try:
                parent_name = psutil.Process(parent_pid).name()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Parent vanished or is not readable: keep "unknown".
                pass

        zombies.append({
            'pid': info['pid'],
            'name': info['name'],
            'parent_pid': parent_pid,
            'parent_name': parent_name,
        })
    return zombies
Fixing zombies requires the parent process to call wait(). Killing the zombie itself does nothing — you need to fix or restart the parent.
Orphan processes
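Orphan processes have lost their parent (the parent exited). On Linux, orphans are re-parented to PID 1 (init/systemd) or, if one exists, to the nearest ancestor that registered itself as a child subreaper. The heuristic below only looks at processes that were adopted by PID 1: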
def find_orphans(
    markers: tuple[str, ...] = ('python', 'node', 'java', 'ruby'),
) -> list[dict]:
    """Find processes whose parent is PID 1 but that are not system services.

    This is a heuristic: a process counts as an orphan when its parent is
    PID 1, its name is not one of a few well-known system names, and its
    command line contains one of ``markers`` as a substring.

    Args:
        markers: Substrings that identify an application process by its
            command line. The default matches common language runtimes.

    Returns:
        One dict per match with the keys ``pid``, ``name``, ``cmdline``,
        ``username`` and ``age_hours``.
    """
    orphans = []
    system_names = {'systemd', 'init', 'kthreadd', 'rcu_sched'}
    now = time.time()
    for proc in psutil.process_iter(['pid', 'name', 'ppid', 'username', 'create_time']):
        info = proc.info
        if info['ppid'] != 1 or info['name'] in system_names:
            continue

        # Query the handle we already hold instead of creating a new
        # Process from the PID, which could refer to a different process
        # if the PID was reused in the meantime.
        try:
            cmdline = ' '.join(proc.cmdline())
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue

        # Check if it looks like an application process, not a system daemon
        if not any(marker in cmdline for marker in markers):
            continue

        orphans.append({
            'pid': info['pid'],
            'name': info['name'],
            'cmdline': cmdline,
            'username': info['username'],
            'age_hours': (now - info['create_time']) / 3600,
        })
    return orphans
Resource-based process autoscaling
A pool of worker processes that scales based on system load:
class WorkerPool:
    """Pool of identical worker processes sized by system CPU usage.

    ``adjust`` is meant to be called periodically. It first makes sure that
    at least ``min_workers`` workers are alive, then adds a worker when CPU
    usage is above ``cpu_target`` and removes one when usage is below half
    of ``cpu_target``, always staying within ``min_workers``..``max_workers``.
    """

    def __init__(self, command: list[str], min_workers: int = 2,
                 max_workers: int = 8, cpu_target: float = 70.0):
        """Store the configuration; no worker is started here.

        Args:
            command: Argument vector used to spawn each worker.
            min_workers: Lower bound kept by ``adjust``/``remove_worker``.
            max_workers: Upper bound enforced by ``add_worker``.
            cpu_target: System-wide CPU percentage that triggers scale-up.
        """
        self.command = command
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.cpu_target = cpu_target
        self.workers: list[subprocess.Popen] = []

    def current_count(self) -> int:
        """Drop workers that have exited and return how many are alive."""
        self.workers = [w for w in self.workers if w.poll() is None]
        return len(self.workers)

    def add_worker(self):
        """Spawn one more worker unless ``max_workers`` is already reached."""
        if self.current_count() >= self.max_workers:
            return
        self.workers.append(subprocess.Popen(self.command))
        print(f"Scaled up to {self.current_count()} workers")

    def remove_worker(self):
        """Stop the newest worker unless the pool is at ``min_workers``."""
        if self.current_count() <= self.min_workers:
            return
        self._stop(self.workers.pop())
        print(f"Scaled down to {self.current_count()} workers")

    def adjust(self):
        """Scale based on CPU usage.

        Workers that died are replaced up to ``min_workers`` first; in that
        case no CPU-based scaling happens during this call.
        """
        current = self.current_count()
        if current < self.min_workers:
            # Without this the pool would start empty and stay below its
            # minimum whenever CPU usage is low.
            for _ in range(self.min_workers - current):
                self.add_worker()
            return

        cpu = psutil.cpu_percent(interval=1)
        if cpu > self.cpu_target and current < self.max_workers:
            self.add_worker()
        elif cpu < self.cpu_target * 0.5 and current > self.min_workers:
            self.remove_worker()

    def shutdown(self):
        """Stop every worker and empty the pool."""
        # Signal all workers first so that they shut down in parallel.
        for w in self.workers:
            w.terminate()
        for w in self.workers:
            self._stop(w)
        self.workers.clear()

    @staticmethod
    def _stop(worker: subprocess.Popen):
        """Terminate ``worker``, kill it after 10 seconds, and reap it."""
        worker.terminate()
        try:
            worker.wait(timeout=10)
        except subprocess.TimeoutExpired:
            worker.kill()
            # Collect the exit status; otherwise the killed worker would
            # remain a zombie for the lifetime of this process.
            worker.wait()
Process namespace and cgroup isolation
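For stronger isolation between managed processes, use Linux namespaces and cgroups. The example below applies cgroup limits (memory, CPU and task count) through systemd-run; it does not set up namespaces: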
import subprocess
import os
def run_isolated(command: list[str], memory_limit_mb: int = 512,
                 cpu_percent: int = 100,
                 tasks_max: int = 64) -> subprocess.Popen:
    """Run a process with cgroup resource limits using systemd-run.

    The command is started in a transient scope of the calling user's
    systemd instance (``systemd-run --scope --user``).

    Args:
        command: Argument vector of the program to run.
        memory_limit_mb: Value for the ``MemoryMax`` property, in megabytes.
        cpu_percent: Value for the ``CPUQuota`` property, in percent.
        tasks_max: Value for the ``TasksMax`` property.

    Returns:
        The ``Popen`` handle of the spawned ``systemd-run`` invocation.
    """
    properties = {
        'MemoryMax': f'{memory_limit_mb}M',
        'CPUQuota': f'{cpu_percent}%',
        'TasksMax': str(tasks_max),
    }
    systemd_cmd = ['systemd-run', '--scope']
    systemd_cmd += [f'--property={key}={value}' for key, value in properties.items()]
    # '--' ends option parsing so that the command's own flags are not
    # interpreted by systemd-run.
    systemd_cmd += ['--user', '--', *command]
    return subprocess.Popen(systemd_cmd)
Process monitoring dashboard data
Collecting data for a process monitoring dashboard:
def system_process_summary(top_n: int = 10) -> dict:
    """Aggregate process statistics for a dashboard.

    Args:
        top_n: Maximum number of entries in the ``top_memory`` and
            ``top_cpu`` lists. Negative values are treated as 0.

    Returns:
        A dict with the keys ``total_processes``, ``running``,
        ``sleeping``, ``zombie``, ``total_memory_gb`` (sum of per-process
        RSS), ``top_memory`` and ``top_cpu``.

    NOTE(review): per the psutil documentation a process reports 0.0 the
    first time its CPU usage is sampled, so ``top_cpu`` is expected to be
    empty on the first call and meaningful on later ones - confirm against
    the psutil version in use.
    """
    limit = max(top_n, 0)
    status_counts = {
        psutil.STATUS_RUNNING: 0,
        psutil.STATUS_SLEEPING: 0,
        psutil.STATUS_ZOMBIE: 0,
    }
    total = 0
    total_rss = 0
    top_cpu = []
    top_mem = []

    for proc in psutil.process_iter(['pid', 'name', 'status',
                                     'cpu_percent', 'memory_info']):
        total += 1
        info = proc.info

        status = info['status']
        if status in status_counts:
            status_counts[status] += 1

        # Fields that could not be read are None and are simply skipped.
        mem = info['memory_info']
        if mem:
            total_rss += mem.rss
            top_mem.append((info['name'], info['pid'], mem.rss))

        cpu = info['cpu_percent']
        if cpu:
            top_cpu.append((info['name'], info['pid'], cpu))

    top_mem.sort(key=lambda row: row[2], reverse=True)
    top_cpu.sort(key=lambda row: row[2], reverse=True)

    return {
        'total_processes': total,
        'running': status_counts[psutil.STATUS_RUNNING],
        'sleeping': status_counts[psutil.STATUS_SLEEPING],
        'zombie': status_counts[psutil.STATUS_ZOMBIE],
        'total_memory_gb': total_rss / (1024**3),
        'top_memory': [
            {'name': n, 'pid': p, 'mb': m / (1024**2)}
            for n, p, m in top_mem[:limit]
        ],
        'top_cpu': [
            {'name': n, 'pid': p, 'percent': c}
            for n, p, c in top_cpu[:limit]
        ],
    }
Graceful shutdown coordination
When shutting down a group of processes, order matters. A web server should stop accepting connections before workers stop processing:
def graceful_shutdown(process_groups: list[list[psutil.Process]],
                      timeout_per_group: int = 15):
    """Shut down process groups in order, waiting for each group to exit.

    Every process of a group is asked to terminate; processes still alive
    after ``timeout_per_group`` seconds are killed. The next group is only
    handled once the current one has been dealt with.

    Args:
        process_groups: Groups in the order in which they must be stopped.
        timeout_per_group: Seconds to wait for a group after terminate,
            and again after kill.
    """
    group_count = len(process_groups)
    for i, group in enumerate(process_groups):
        print(f"Stopping group {i + 1}/{group_count} "
              f"({len(group)} processes)")

        # Send SIGTERM to all processes in the group
        for proc in group:
            try:
                proc.terminate()
            except psutil.NoSuchProcess:
                continue

        # Wait for all to exit
        _, alive = psutil.wait_procs(group, timeout=timeout_per_group)

        # Force kill stragglers
        for proc in alive:
            try:
                # name() is inside the try as well: the process may exit
                # between wait_procs() returning and this lookup.
                print(f"Force killing PID {proc.pid} ({proc.name()})")
                proc.kill()
            except psutil.NoSuchProcess:
                continue

        # Wait for the killed processes too, so that the next group is
        # not stopped while members of this one are still going down.
        if alive:
            psutil.wait_procs(alive, timeout=timeout_per_group)

    print("All process groups stopped")
One thing to remember: Real process management goes beyond listing and killing — it involves supervision with exponential backoff, zombie and orphan detection, resource-based scaling, graceful ordered shutdown, and isolation through cgroups. These patterns form the backbone of every production process manager from supervisord to systemd.
See Also
- Python Crontab Management — How Python can set up automatic timers on your computer — like programming an alarm clock that runs tasks instead of waking you up.
- Python Disk Usage Monitoring — How Python helps you keep an eye on your computer's storage — like a fuel gauge that warns you before you run out of space.
- Python Log Rotation Management — Why your program's diary needs page limits — and how Python keeps log files from eating all your disk space.
- Python Network Interface Monitoring — How Python watches your computer's network connections — like having a traffic counter on every road leading to your house.
- Python Psutil System Monitoring — How Python's psutil library lets your program check on your computer's health — like a doctor with a stethoscope for your machine.