Python psutil System Monitoring — Deep Dive

Architecture of a psutil-based monitoring agent

A production monitoring agent built on psutil typically follows a collect-aggregate-report pattern:

  1. Collectors gather raw metrics at regular intervals
  2. Aggregators compute rates, averages, and percentiles
  3. Reporters push data to a time-series database, log file, or alerting system
import psutil
import time
import json
from dataclasses import dataclass, asdict
from typing import Optional

@dataclass
class SystemSnapshot:
    timestamp: float
    cpu_percent: float
    cpu_per_core: list[float]
    memory_total: int
    memory_available: int
    memory_percent: float
    swap_used: int
    swap_percent: float
    disk_usage_percent: float
    disk_read_bytes: int
    disk_write_bytes: int
    net_bytes_sent: int
    net_bytes_recv: int
    load_avg: Optional[tuple] = None

def collect_snapshot() -> SystemSnapshot:
    cpu = psutil.cpu_percent(interval=None, percpu=False)
    cpu_cores = psutil.cpu_percent(interval=None, percpu=True)
    mem = psutil.virtual_memory()
    swap = psutil.swap_memory()
    disk = psutil.disk_usage('/')
    disk_io = psutil.disk_io_counters()
    net_io = psutil.net_io_counters()

    load = None
    if hasattr(psutil, 'getloadavg'):
        load = psutil.getloadavg()

    return SystemSnapshot(
        timestamp=time.time(),
        cpu_percent=cpu,
        cpu_per_core=cpu_cores,
        memory_total=mem.total,
        memory_available=mem.available,
        memory_percent=mem.percent,
        swap_used=swap.used,
        swap_percent=swap.percent,
        disk_usage_percent=disk.percent,
        disk_read_bytes=disk_io.read_bytes,
        disk_write_bytes=disk_io.write_bytes,
        net_bytes_sent=net_io.bytes_sent,
        net_bytes_recv=net_io.bytes_recv,
        load_avg=load,
    )

Computing rates from counters

Disk I/O and network counters are cumulative since boot. To get meaningful rates, you need two snapshots:

class RateCalculator:
    def __init__(self):
        self._prev: Optional[SystemSnapshot] = None

    def compute_rates(self, current: SystemSnapshot) -> dict:
        if self._prev is None:
            self._prev = current
            return {}

        elapsed = current.timestamp - self._prev.timestamp
        if elapsed <= 0:
            return {}

        rates = {
            'disk_read_bytes_per_sec': (current.disk_read_bytes - self._prev.disk_read_bytes) / elapsed,
            'disk_write_bytes_per_sec': (current.disk_write_bytes - self._prev.disk_write_bytes) / elapsed,
            'net_sent_bytes_per_sec': (current.net_bytes_sent - self._prev.net_bytes_sent) / elapsed,
            'net_recv_bytes_per_sec': (current.net_bytes_recv - self._prev.net_bytes_recv) / elapsed,
        }
        self._prev = current
        return rates

Handling counter wraps

On 32-bit systems, counters can wrap around (overflow back to zero). psutil uses 64-bit integers on most platforms, so wraps are rare, but defensive code handles them:

def safe_rate(current: int, previous: int, elapsed: float) -> float:
    diff = current - previous
    if diff < 0:
        # Counter wrapped — skip this interval
        return 0.0
    return diff / elapsed

Process monitoring at scale

Efficient iteration

psutil.process_iter() with the attrs parameter is the fastest way to scan all processes. It batches system calls and handles NoSuchProcess and AccessDenied exceptions internally:

def find_heavy_processes(cpu_threshold=80, mem_threshold_mb=500):
    heavy = []
    for proc in psutil.process_iter(['pid', 'name', 'cpu_percent',
                                      'memory_info', 'username', 'create_time']):
        info = proc.info
        mem_mb = info['memory_info'].rss / (1024 * 1024) if info['memory_info'] else 0

        if info['cpu_percent'] and info['cpu_percent'] > cpu_threshold:
            heavy.append(('cpu', info))
        elif mem_mb > mem_threshold_mb:
            heavy.append(('memory', info))

    return heavy

Tracking process resource usage over time

For processes that spike intermittently, sample CPU over a window:

class ProcessTracker:
    def __init__(self, pid: int, window_size: int = 10):
        self.process = psutil.Process(pid)
        self.window_size = window_size
        self.cpu_samples: list[float] = []

    def sample(self):
        try:
            cpu = self.process.cpu_percent()
            self.cpu_samples.append(cpu)
            if len(self.cpu_samples) > self.window_size:
                self.cpu_samples.pop(0)
        except psutil.NoSuchProcess:
            return None

    @property
    def avg_cpu(self) -> float:
        if not self.cpu_samples:
            return 0.0
        return sum(self.cpu_samples) / len(self.cpu_samples)

    def memory_info(self) -> dict:
        try:
            mem = self.process.memory_info()
            return {'rss': mem.rss, 'vms': mem.vms}
        except psutil.NoSuchProcess:
            return {}

Platform-specific gotchas

Linux: /proc permissions

Running as a non-root user limits what you can see. psutil.process_iter() will silently skip processes you cannot access, and net_connections() requires root for full connection details. Use AccessDenied handling or run your monitoring agent with appropriate capabilities (CAP_SYS_PTRACE, CAP_NET_ADMIN).

macOS: No disk I/O per-process

macOS does not expose per-process disk I/O counters through its kernel APIs. psutil.Process(pid).io_counters() raises AccessDenied or returns None. System-wide disk_io_counters() works fine.

Windows: WMI fallbacks

Some psutil functions on Windows fall back to WMI queries, which are slower. If you notice performance issues when iterating processes on Windows, reduce the frequency of calls that trigger WMI (like cmdline() or exe() on system processes).

Building an alerting pipeline

from enum import Enum
from dataclasses import dataclass

class Severity(Enum):
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    metric: str
    value: float
    threshold: float
    severity: Severity
    message: str

class AlertEvaluator:
    def __init__(self):
        self.rules = [
            ('cpu_percent', 80, Severity.WARNING, "CPU usage above 80%"),
            ('cpu_percent', 95, Severity.CRITICAL, "CPU usage above 95%"),
            ('memory_percent', 85, Severity.WARNING, "Memory usage above 85%"),
            ('memory_percent', 95, Severity.CRITICAL, "Memory usage above 95%"),
            ('disk_usage_percent', 85, Severity.WARNING, "Disk usage above 85%"),
            ('disk_usage_percent', 95, Severity.CRITICAL, "Disk usage above 95%"),
            ('swap_percent', 50, Severity.WARNING, "Swap usage above 50%"),
        ]

    def evaluate(self, snapshot: SystemSnapshot) -> list[Alert]:
        alerts = []
        snapshot_dict = asdict(snapshot)

        for metric, threshold, severity, message in self.rules:
            value = snapshot_dict.get(metric, 0)
            if value and value > threshold:
                alerts.append(Alert(
                    metric=metric,
                    value=value,
                    threshold=threshold,
                    severity=severity,
                    message=f"{message} (current: {value:.1f}%)",
                ))

        return alerts

Alert deduplication

Without deduplication, you get a flood of identical alerts every collection interval. Track active alerts and only fire on state transitions:

class AlertDeduplicator:
    def __init__(self):
        self.active_alerts: dict[str, Severity] = {}

    def filter_new(self, alerts: list[Alert]) -> list[Alert]:
        new_alerts = []
        current_keys = set()

        for alert in alerts:
            key = f"{alert.metric}:{alert.severity.value}"
            current_keys.add(key)
            if key not in self.active_alerts:
                new_alerts.append(alert)
                self.active_alerts[key] = alert.severity

        # Clear resolved alerts
        resolved = set(self.active_alerts.keys()) - current_keys
        for key in resolved:
            del self.active_alerts[key]

        return new_alerts

Integration with time-series databases

Pushing to Prometheus

Using the prometheus_client library, expose psutil metrics as a Prometheus endpoint:

from prometheus_client import Gauge, start_http_server

cpu_gauge = Gauge('system_cpu_percent', 'CPU usage percentage')
memory_gauge = Gauge('system_memory_percent', 'Memory usage percentage')
disk_gauge = Gauge('system_disk_percent', 'Disk usage percentage')

def update_metrics():
    cpu_gauge.set(psutil.cpu_percent(interval=None))
    memory_gauge.set(psutil.virtual_memory().percent)
    disk_gauge.set(psutil.disk_usage('/').percent)

start_http_server(8000)
while True:
    update_metrics()
    time.sleep(15)

Writing to InfluxDB

from influxdb_client import InfluxDBClient, Point

def write_snapshot(client, bucket, snapshot: SystemSnapshot):
    point = (
        Point("system_metrics")
        .field("cpu_percent", snapshot.cpu_percent)
        .field("memory_percent", snapshot.memory_percent)
        .field("disk_percent", snapshot.disk_usage_percent)
        .field("swap_percent", snapshot.swap_percent)
        .time(int(snapshot.timestamp * 1e9))
    )
    write_api = client.write_api()
    write_api.write(bucket=bucket, record=point)

Performance considerations

Collection interval tuning

  • 1-second intervals: Useful for real-time dashboards but generates significant data volume
  • 15-second intervals: Good balance for most monitoring use cases
  • 60-second intervals: Suitable for capacity planning and trend analysis

Memory footprint

psutil itself is lightweight (under 5 MB RSS typically), but storing history in-process can grow. Use a fixed-size deque or write directly to external storage:

from collections import deque
history = deque(maxlen=3600)  # Keep 1 hour at 1-second intervals

Thread safety

psutil functions are thread-safe for reading system-wide metrics. However, psutil.Process objects are not inherently thread-safe — avoid sharing a Process instance across threads. Create new instances or use process_iter() per thread.

Real-world deployment pattern

A production-tested monitoring agent combines all of these pieces:

  1. A collection loop running every 15 seconds
  2. Rate calculation for cumulative counters
  3. Alert evaluation with deduplication
  4. Metric export to Prometheus or InfluxDB
  5. Heavy process scanning every 60 seconds (less frequent due to cost)
  6. Graceful shutdown via signal handling

This approach gives you Datadog-like visibility without a third-party agent — useful for air-gapped environments, cost-sensitive deployments, or custom monitoring requirements that commercial tools do not cover.

One thing to remember: psutil gives you the raw building blocks. The engineering challenge is not reading the metrics — it is deciding what to do with them: choosing the right collection interval, avoiding alert storms, computing meaningful rates, and handling the quirks of each operating system.

pythonmonitoringsystem-administrationdevopsperformance

See Also

  • Python Crontab Management How Python can set up automatic timers on your computer — like programming an alarm clock that runs tasks instead of waking you up.
  • Python Disk Usage Monitoring How Python helps you keep an eye on your computer's storage — like a fuel gauge that warns you before you run out of space.
  • Python Log Rotation Management Why your program's diary needs page limits — and how Python keeps log files from eating all your disk space.
  • Python Network Interface Monitoring How Python watches your computer's network connections — like having a traffic counter on every road leading to your house.
  • Python Process Management How Python lets you see and control all the programs running on your computer — like being the manager of a busy office.