Python Process Management — Deep Dive

Building a process supervisor

A process supervisor ensures that managed processes stay running. If a process crashes, the supervisor restarts it. If it exceeds resource limits, the supervisor kills and replaces it.

import subprocess
import psutil
import time
import signal
import sys
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum

class ProcessState(Enum):
    STARTING = "starting"
    RUNNING = "running"
    STOPPING = "stopping"
    STOPPED = "stopped"
    FAILED = "failed"

@dataclass
class ManagedProcess:
    name: str
    command: list[str]
    max_memory_mb: float = 0  # 0 = no limit
    max_restarts: int = 5
    restart_delay: float = 2.0
    backoff_factor: float = 2.0
    max_backoff: float = 60.0

    # Runtime state
    process: Optional[subprocess.Popen] = field(default=None, repr=False)
    state: ProcessState = ProcessState.STOPPED
    restart_count: int = 0
    last_start: float = 0
    current_delay: float = 0

class ProcessSupervisor:
    def __init__(self):
        self.managed: dict[str, ManagedProcess] = {}
        self._running = True
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    def register(self, mp: ManagedProcess):
        self.managed[mp.name] = mp

    def _shutdown(self, signum, frame):
        print("Supervisor shutting down...")
        self._running = False
        for mp in self.managed.values():
            self._stop_process(mp)
        sys.exit(0)

    def start_process(self, mp: ManagedProcess):
        try:
            mp.process = subprocess.Popen(
                mp.command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            mp.state = ProcessState.RUNNING
            mp.last_start = time.time()
            print(f"[{mp.name}] Started (PID {mp.process.pid})")
        except Exception as e:
            mp.state = ProcessState.FAILED
            print(f"[{mp.name}] Failed to start: {e}")

    def _stop_process(self, mp: ManagedProcess):
        if mp.process and mp.process.poll() is None:
            mp.state = ProcessState.STOPPING
            mp.process.terminate()
            try:
                mp.process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                mp.process.kill()
                mp.process.wait()
            mp.state = ProcessState.STOPPED
            print(f"[{mp.name}] Stopped")

    def _check_resources(self, mp: ManagedProcess) -> bool:
        """Returns True if process is within resource limits."""
        if mp.max_memory_mb <= 0 or mp.process is None:
            return True

        try:
            proc = psutil.Process(mp.process.pid)
            mem_mb = proc.memory_info().rss / (1024 ** 2)
            if mem_mb > mp.max_memory_mb:
                print(f"[{mp.name}] Memory limit exceeded: "
                      f"{mem_mb:.1f} MB > {mp.max_memory_mb} MB")
                return False
        except psutil.NoSuchProcess:
            pass

        return True

    def _handle_restart(self, mp: ManagedProcess):
        if mp.restart_count >= mp.max_restarts:
            print(f"[{mp.name}] Max restarts reached ({mp.max_restarts}). Giving up.")
            mp.state = ProcessState.FAILED
            return

        mp.current_delay = min(
            mp.restart_delay * (mp.backoff_factor ** mp.restart_count),
            mp.max_backoff
        )
        print(f"[{mp.name}] Restarting in {mp.current_delay:.1f}s "
              f"(attempt {mp.restart_count + 1}/{mp.max_restarts})")
        time.sleep(mp.current_delay)

        mp.restart_count += 1
        self.start_process(mp)

        # Reset restart count if process has been running for a while
        if mp.state == ProcessState.RUNNING:
            uptime = time.time() - mp.last_start
            if uptime > 60:  # Running for > 1 minute resets backoff
                mp.restart_count = 0
                mp.current_delay = mp.restart_delay

    def run(self):
        """Main supervisor loop."""
        # Start all registered processes
        for mp in self.managed.values():
            self.start_process(mp)

        while self._running:
            for mp in self.managed.values():
                if mp.state == ProcessState.FAILED:
                    continue

                if mp.process is None:
                    continue

                # Check if process exited
                exit_code = mp.process.poll()
                if exit_code is not None:
                    if exit_code == 0:
                        print(f"[{mp.name}] Exited cleanly")
                        mp.state = ProcessState.STOPPED
                    else:
                        print(f"[{mp.name}] Crashed (exit code {exit_code})")
                        self._handle_restart(mp)
                    continue

                # Check resource limits
                if not self._check_resources(mp):
                    self._stop_process(mp)
                    self._handle_restart(mp)

            time.sleep(2)

Orphan and zombie process detection

Zombie processes

A zombie process has exited but its parent has not called wait() to collect its exit status. Zombies consume a PID and a small amount of kernel memory:

def find_zombies() -> list[dict]:
    zombies = []
    for proc in psutil.process_iter(['pid', 'name', 'status', 'ppid']):
        if proc.info['status'] == psutil.STATUS_ZOMBIE:
            parent_name = "unknown"
            try:
                parent = psutil.Process(proc.info['ppid'])
                parent_name = parent.name()
            except psutil.NoSuchProcess:
                pass

            zombies.append({
                'pid': proc.info['pid'],
                'name': proc.info['name'],
                'parent_pid': proc.info['ppid'],
                'parent_name': parent_name,
            })
    return zombies

Fixing zombies requires the parent process to call wait(). Killing the zombie itself does nothing — you need to fix or restart the parent.

Orphan processes

Orphan processes lost their parent (the parent exited). On Linux, orphans are re-parented to PID 1 (init/systemd):

def find_orphans() -> list[dict]:
    """Find processes whose parent is PID 1 but that are not system services."""
    orphans = []
    system_names = {'systemd', 'init', 'kthreadd', 'rcu_sched'}

    for proc in psutil.process_iter(['pid', 'name', 'ppid', 'username', 'create_time']):
        info = proc.info
        if info['ppid'] == 1 and info['name'] not in system_names:
            # Check if it looks like an application process, not a system daemon
            try:
                p = psutil.Process(info['pid'])
                cmdline = ' '.join(p.cmdline())
                if any(marker in cmdline for marker in ['python', 'node', 'java', 'ruby']):
                    orphans.append({
                        'pid': info['pid'],
                        'name': info['name'],
                        'cmdline': cmdline,
                        'username': info['username'],
                        'age_hours': (time.time() - info['create_time']) / 3600,
                    })
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

    return orphans

Resource-based process autoscaling

A pool of worker processes that scales based on system load:

class WorkerPool:
    def __init__(self, command: list[str], min_workers: int = 2,
                 max_workers: int = 8, cpu_target: float = 70.0):
        self.command = command
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.cpu_target = cpu_target
        self.workers: list[subprocess.Popen] = []

    def current_count(self) -> int:
        self.workers = [w for w in self.workers if w.poll() is None]
        return len(self.workers)

    def add_worker(self):
        if self.current_count() >= self.max_workers:
            return
        proc = subprocess.Popen(self.command)
        self.workers.append(proc)
        print(f"Scaled up to {self.current_count()} workers")

    def remove_worker(self):
        if self.current_count() <= self.min_workers:
            return
        worker = self.workers.pop()
        worker.terminate()
        try:
            worker.wait(timeout=10)
        except subprocess.TimeoutExpired:
            worker.kill()
        print(f"Scaled down to {self.current_count()} workers")

    def adjust(self):
        """Scale based on CPU usage."""
        cpu = psutil.cpu_percent(interval=1)
        current = self.current_count()

        if cpu > self.cpu_target and current < self.max_workers:
            self.add_worker()
        elif cpu < self.cpu_target * 0.5 and current > self.min_workers:
            self.remove_worker()

    def shutdown(self):
        for w in self.workers:
            w.terminate()
        for w in self.workers:
            try:
                w.wait(timeout=10)
            except subprocess.TimeoutExpired:
                w.kill()

Process namespace and cgroup isolation

For stronger isolation between managed processes, use Linux namespaces and cgroups:

import subprocess
import os

def run_isolated(command: list[str], memory_limit_mb: int = 512,
                 cpu_percent: int = 100) -> subprocess.Popen:
    """Run a process with cgroup resource limits using systemd-run."""
    systemd_cmd = [
        'systemd-run', '--scope',
        f'--property=MemoryMax={memory_limit_mb}M',
        f'--property=CPUQuota={cpu_percent}%',
        '--property=TasksMax=64',
        '--user',
        '--',
    ] + command

    return subprocess.Popen(systemd_cmd)

Process monitoring dashboard data

Collecting data for a process monitoring dashboard:

def system_process_summary() -> dict:
    """Aggregate process statistics for a dashboard."""
    total = 0
    running = 0
    sleeping = 0
    zombie = 0
    total_rss = 0
    top_cpu = []
    top_mem = []

    for proc in psutil.process_iter(['pid', 'name', 'status',
                                      'cpu_percent', 'memory_info']):
        total += 1
        info = proc.info

        if info['status'] == psutil.STATUS_RUNNING:
            running += 1
        elif info['status'] == psutil.STATUS_SLEEPING:
            sleeping += 1
        elif info['status'] == psutil.STATUS_ZOMBIE:
            zombie += 1

        if info['memory_info']:
            rss = info['memory_info'].rss
            total_rss += rss
            top_mem.append((info['name'], info['pid'], rss))

        if info['cpu_percent']:
            top_cpu.append((info['name'], info['pid'], info['cpu_percent']))

    top_mem.sort(key=lambda x: x[2], reverse=True)
    top_cpu.sort(key=lambda x: x[2], reverse=True)

    return {
        'total_processes': total,
        'running': running,
        'sleeping': sleeping,
        'zombie': zombie,
        'total_memory_gb': total_rss / (1024**3),
        'top_memory': [
            {'name': n, 'pid': p, 'mb': m / (1024**2)}
            for n, p, m in top_mem[:10]
        ],
        'top_cpu': [
            {'name': n, 'pid': p, 'percent': c}
            for n, p, c in top_cpu[:10]
        ],
    }

Graceful shutdown coordination

When shutting down a group of processes, order matters. A web server should stop accepting connections before workers stop processing:

def graceful_shutdown(process_groups: list[list[psutil.Process]],
                       timeout_per_group: int = 15):
    """Shut down process groups in order, waiting for each group to exit."""
    for i, group in enumerate(process_groups):
        print(f"Stopping group {i + 1}/{len(process_groups)} "
              f"({len(group)} processes)")

        # Send SIGTERM to all processes in the group
        for proc in group:
            try:
                proc.terminate()
            except psutil.NoSuchProcess:
                continue

        # Wait for all to exit
        gone, alive = psutil.wait_procs(group, timeout=timeout_per_group)

        # Force kill stragglers
        for proc in alive:
            print(f"Force killing PID {proc.pid} ({proc.name()})")
            try:
                proc.kill()
            except psutil.NoSuchProcess:
                continue

    print("All process groups stopped")

One thing to remember: Real process management goes beyond listing and killing — it involves supervision with exponential backoff, zombie and orphan detection, resource-based scaling, graceful ordered shutdown, and isolation through cgroups. These patterns form the backbone of every production process manager from supervisord to systemd.

pythonsystem-administrationautomationprocessesdevops

See Also