Python Disk Usage Monitoring — Deep Dive

Building a production disk monitoring agent

A complete disk monitoring solution needs more than periodic percentage checks. It needs filesystem enumeration, growth tracking, anomaly detection, and automated remediation.

Filesystem discovery and filtering

import psutil
from dataclasses import dataclass

# Filesystem types that discovery ignores (virtual, network, snapshot, etc.).
# Membership is tested against the ``fstype`` reported for each partition.
SKIP_FSTYPES = {
    # memory-backed mounts
    'tmpfs',
    'devtmpfs',
    # kernel pseudo-filesystems
    'proc',
    'sysfs',
    'devpts',
    'cgroup',
    'cgroup2',
    'nsfs',
    'tracefs',
    'debugfs',
    # image / layered / snap mounts
    'squashfs',
    'overlay',
    'fuse.snapfuse',
    # automounter placeholders
    'autofs',
}

@dataclass
class FilesystemInfo:
    """Snapshot of space and inode usage for one mounted filesystem."""

    # Identity of the mount.
    device: str
    mountpoint: str
    fstype: str

    # Block usage, in bytes, plus the used percentage.
    total_bytes: int
    used_bytes: int
    free_bytes: int
    percent_used: float

    # Inode usage (counts) plus the used percentage.
    inode_total: int
    inode_used: int
    inode_percent: float

def discover_filesystems() -> list[FilesystemInfo]:
    """Discover all real filesystems, skipping virtual ones.

    Partitions whose ``fstype`` is listed in ``SKIP_FSTYPES`` are ignored, and
    each device is reported at most once (the first mountpoint that can be
    inspected wins). Mountpoints that cannot be inspected are skipped
    silently because discovery is best-effort.

    Returns:
        One ``FilesystemInfo`` per usable device, in the order the partitions
        were enumerated.
    """
    # Function-scope import keeps this snippet self-contained without
    # re-running the import statement on every loop iteration.
    import os

    filesystems = []
    seen_devices = set()

    for part in psutil.disk_partitions(all=False):
        if part.fstype in SKIP_FSTYPES or part.device in seen_devices:
            continue

        try:
            usage = psutil.disk_usage(part.mountpoint)
            vfs = os.statvfs(part.mountpoint)
        except OSError:  # includes PermissionError
            # Do not mark the device as seen: another mountpoint of the same
            # device may still be readable.
            continue
        seen_devices.add(part.device)

        # Used inodes are total minus *free* (f_ffree). The percentage is
        # taken against used + available-to-unprivileged (f_favail), the same
        # way the byte percentage treats reserved space, so reserved inodes
        # are not reported as "used".
        inode_total = vfs.f_files
        inode_used = inode_total - vfs.f_ffree
        inode_capacity = inode_used + vfs.f_favail
        # Some filesystems report zero inodes; avoid dividing by zero.
        inode_pct = (inode_used / inode_capacity * 100) if inode_capacity > 0 else 0.0

        filesystems.append(FilesystemInfo(
            device=part.device,
            mountpoint=part.mountpoint,
            fstype=part.fstype,
            total_bytes=usage.total,
            used_bytes=usage.used,
            free_bytes=usage.free,
            percent_used=usage.percent,
            inode_total=inode_total,
            inode_used=inode_used,
            inode_percent=inode_pct,
        ))

    return filesystems

Fast directory size computation

Walking a filesystem to compute directory sizes is expensive. For large filesystems (millions of files), a naive rglob('*') can take minutes. Optimized approaches:

Using os.scandir for speed

import os
from collections import defaultdict

def fast_dir_sizes(root: str, max_depth: int = 2) -> dict[str, int]:
    """Compute directory sizes using os.scandir (faster than pathlib).

    Every regular file found is attributed to the directory that contains it
    and to each ancestor of that directory up to and including ``root``.
    Symlinks are never followed.

    Args:
        root: Directory to start from. It is used verbatim as the key for the
            root total, so a trailing separator is preserved in that key.
        max_depth: How many directory levels below ``root`` are descended
            into. Files that live deeper than this are not visited, so the
            reported sizes are lower bounds when the tree is deeper.

    Returns:
        Mapping of directory path to the total size in bytes of the files
        counted beneath it. Directories under which no file was seen are
        omitted. Unreadable directories and entries are skipped.
    """
    sizes: dict[str, int] = defaultdict(int)

    def walk(path: str, depth: int, ancestors: tuple[str, ...]) -> None:
        # Track the ancestor chain explicitly instead of re-deriving it from
        # string prefixes, which breaks when ``root`` ends with a separator.
        chain = ancestors + (path,)
        direct_bytes = 0
        saw_file = False
        subdirs: list[str] = []

        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    try:
                        if entry.is_file(follow_symlinks=False):
                            direct_bytes += entry.stat(follow_symlinks=False).st_size
                            saw_file = True
                        elif entry.is_dir(follow_symlinks=False) and depth < max_depth:
                            subdirs.append(entry.path)
                    except OSError:  # includes PermissionError
                        continue
        except OSError:
            # Unreadable directory: keep whatever was gathered before the
            # failure and fall through.
            pass

        if saw_file:
            # One update per directory rather than one per file.
            for directory in chain:
                sizes[directory] += direct_bytes

        # Recurse only after the scandir handle is closed so deep trees do
        # not hold one open descriptor per level.
        for subdir in subdirs:
            walk(subdir, depth + 1, chain)

    walk(root, 0, ())
    return dict(sizes)

Using du as a subprocess for very large trees

Sometimes shelling out to du is faster because it uses optimized C code:

import subprocess

def du_top_dirs(path: str, count: int = 20) -> list[tuple[str, int]]:
    """Use system du for fast directory sizing.

    Runs ``du -x --max-depth=1 -b`` on ``path`` and returns the immediate
    entries it reports, largest first. The line for ``path`` itself (the
    grand total) is excluded.

    Args:
        path: Directory to inspect.
        count: Maximum number of entries to return.

    Returns:
        Up to ``count`` ``(directory, size_in_bytes)`` tuples sorted by size,
        descending. Whatever ``du`` printed is used even when it exits
        non-zero, so unreadable subdirectories do not discard the rest.

    Raises:
        FileNotFoundError: If the ``du`` executable is not available.
        subprocess.TimeoutExpired: If ``du`` runs longer than 300 seconds.
    """
    result = subprocess.run(
        # "--" ends option parsing so a path starting with "-" is treated as
        # a directory rather than as a du flag.
        ['du', '-x', '--max-depth=1', '-b', '--', path],
        capture_output=True,
        text=True,
        # File names are arbitrary bytes; do not crash on undecodable ones.
        errors='surrogateescape',
        timeout=300,
    )

    entries = []
    # Split on "\n" only and do not strip the whole output: directory names
    # may legitimately contain other whitespace characters.
    for line in result.stdout.split('\n'):
        size_text, tab, dir_path = line.partition('\t')
        if not tab or dir_path == path:  # blank/odd line, or the root total
            continue
        try:
            size = int(size_text)
        except ValueError:
            continue
        entries.append((dir_path, size))

    return sorted(entries, key=lambda item: item[1], reverse=True)[:count]

Predictive alerting with linear regression

Simple growth rate calculations can be noisy (a big log dump one day skews the average). Linear regression on historical data gives smoother predictions:

import json
import time
from pathlib import Path

class DiskPredictor:
    """Record disk usage samples and extrapolate when a filesystem fills up.

    Samples are appended to one JSON-lines file per mountpoint inside
    ``history_dir``. Each line holds ``t`` (``time.time()`` at recording),
    ``used`` and ``total``.
    """

    def __init__(self, history_dir: str = '/var/lib/disk-monitor'):
        """Create the history directory (and parents) if it does not exist."""
        self.history_dir = Path(history_dir)
        self.history_dir.mkdir(parents=True, exist_ok=True)

    def _history_file(self, mountpoint: str) -> Path:
        """Return the history file path for ``mountpoint``."""
        # Naming is kept exactly as before so existing history files are
        # still found: every "/" becomes "_"; an empty name maps to "_root".
        safe_name = mountpoint.replace('/', '_') or '_root'
        return self.history_dir / f'{safe_name}.jsonl'

    def record(self, mountpoint: str, used_bytes: int, total_bytes: int):
        """Append one usage sample for ``mountpoint``, stamped with the current time."""
        entry = {
            't': time.time(),
            'used': used_bytes,
            'total': total_bytes,
        }

        with open(self._history_file(mountpoint), 'a', encoding='utf-8') as f:
            f.write(json.dumps(entry) + '\n')

    @staticmethod
    def _load_points(history_file: Path, cutoff: float) -> list[dict]:
        """Read samples with ``t >= cutoff``, skipping blank or unusable lines.

        A half-written or corrupt line (for example from an interrupted
        append) must not make every later prediction fail, so such lines are
        ignored rather than raised.
        """
        points = []
        with open(history_file, encoding='utf-8') as f:
            for raw in f:
                raw = raw.strip()
                if not raw:
                    continue
                try:
                    entry = json.loads(raw)
                    point = {
                        't': entry['t'],
                        'used': entry['used'],
                        'total': entry['total'],
                    }
                    recent = point['t'] >= cutoff
                except (ValueError, KeyError, TypeError):
                    continue
                if recent:
                    points.append(point)
        return points

    def predict_full_date(self, mountpoint: str, lookback_days: int = 14) -> dict:
        """Estimate when ``mountpoint`` becomes full using a least-squares fit.

        Args:
            mountpoint: Mountpoint whose recorded history is analysed.
            lookback_days: Only samples newer than this many days are used.

        Returns:
            A dict. When no estimate is possible it holds ``prediction: None``
            and a ``reason`` (``'no history'``, ``'insufficient data'`` for
            fewer than 10 samples, ``'no variance'``, or
            ``'disk usage is stable or decreasing'`` together with
            ``growth_gb_per_day``). Otherwise it holds ``prediction`` (epoch
            seconds), ``days_until_full``, ``growth_gb_per_day`` and
            ``confidence`` (``'low'`` under 50 samples, ``'medium'`` under
            200, else ``'high'``).
        """
        cutoff = time.time() - (lookback_days * 86400)
        try:
            points = self._load_points(self._history_file(mountpoint), cutoff)
        except FileNotFoundError:
            return {'prediction': None, 'reason': 'no history'}

        if len(points) < 10:
            return {'prediction': None, 'reason': 'insufficient data'}

        # Least-squares slope of used bytes over time. Both axes are shifted
        # to start at the first sample: the slope is unchanged mathematically,
        # but squaring raw epoch seconds (~1e9) loses float precision to
        # cancellation, and identical timestamps then give an exact zero.
        n = len(points)
        t0 = points[0]['t']
        used0 = points[0]['used']
        xs = [p['t'] - t0 for p in points]
        ys = [p['used'] - used0 for p in points]

        sum_x = sum(xs)
        sum_y = sum(ys)
        sum_x2 = sum(x * x for x in xs)
        sum_xy = sum(x * y for x, y in zip(xs, ys))

        denominator = n * sum_x2 - sum_x ** 2
        if denominator == 0:
            return {'prediction': None, 'reason': 'no variance'}

        slope = (n * sum_xy - sum_x * sum_y) / denominator  # bytes per second
        growth_gb_per_day = slope * 86400 / (1024**3)

        if slope <= 0:
            return {
                'prediction': None,
                'reason': 'disk usage is stable or decreasing',
                'growth_gb_per_day': growth_gb_per_day,
            }

        # The file is append-only, so the last sample is the most recent one.
        latest = points[-1]
        remaining = latest['total'] - latest['used']
        seconds_until_full = remaining / slope

        if n < 50:
            confidence = 'low'
        elif n < 200:
            confidence = 'medium'
        else:
            confidence = 'high'

        return {
            'prediction': time.time() + seconds_until_full,
            'days_until_full': seconds_until_full / 86400,
            'growth_gb_per_day': growth_gb_per_day,
            'confidence': confidence,
        }

Automated cleanup pipelines

When monitoring detects a disk approaching capacity, automated cleanup can prevent outages:

from pathlib import Path
from dataclasses import dataclass
import shutil
import os
import time

@dataclass
class CleanupRule:
    """One deletion rule: files under ``path`` matching ``pattern`` that are too old."""

    path: str            # directory the glob pattern is evaluated in
    pattern: str         # pathlib glob pattern, relative to ``path``
    max_age_days: int    # files modified longer ago than this are eligible
    description: str     # human-readable label used in cleanup reports
    dry_run: bool = False  # when True, report sizes but delete nothing


class DiskCleaner:
    """Apply ``CleanupRule``s in order until enough disk space is free."""

    def __init__(self, rules: list[CleanupRule]):
        """Store the rules; they are applied in the order given."""
        self.rules = rules

    @staticmethod
    def _free_gb(mountpoint: str) -> float:
        """Return the free space on ``mountpoint`` in GiB."""
        return shutil.disk_usage(mountpoint).free / (1024**3)

    def run(self, min_free_gb: float = 10.0, mountpoint: str = '/') -> dict:
        """Run cleanup rules if free space is below threshold.

        Args:
            min_free_gb: Free-space target in GiB. Nothing is done when the
                filesystem already has at least this much free.
            mountpoint: Path on the filesystem whose free space is checked.
                Defaults to ``'/'``; pass the mount that the rules actually
                clean when it is not the root filesystem.

        Returns:
            ``{'action': 'none', 'free_gb': ...}`` when no cleanup was
            needed, otherwise ``{'action': 'cleanup', 'rules_applied': [...],
            'total_freed_gb': ..., 'free_gb_after': ...}``. Rules stop being
            applied as soon as the target is met.
        """
        free_gb = self._free_gb(mountpoint)
        if free_gb >= min_free_gb:
            return {'action': 'none', 'free_gb': free_gb}

        results = []
        total_freed = 0

        for rule in self.rules:
            freed = self._apply_rule(rule)
            total_freed += freed
            results.append({
                'rule': rule.description,
                'freed_bytes': freed,
                'freed_gb': freed / (1024**3),
            })

            # Re-check after each rule so later, more aggressive rules only
            # run when they are still needed.
            if self._free_gb(mountpoint) >= min_free_gb:
                break

        return {
            'action': 'cleanup',
            'rules_applied': results,
            'total_freed_gb': total_freed / (1024**3),
            'free_gb_after': self._free_gb(mountpoint),
        }

    def _apply_rule(self, rule: CleanupRule) -> int:
        """Delete (or, in dry-run, just measure) old regular files for ``rule``.

        Only regular files are considered. Directories and symlinks matched
        by the pattern are left alone: they cannot be removed with
        ``unlink`` / would report the size of their target, which made
        dry-run totals overstate what a real run frees.

        Returns:
            Total size in bytes of the files removed, or that would be
            removed when ``rule.dry_run`` is set. Paths that cannot be
            inspected or deleted are skipped.
        """
        # Function-scope import: only this method needs the mode helpers.
        import stat

        freed = 0
        cutoff = time.time() - (rule.max_age_days * 86400)

        for filepath in Path(rule.path).glob(rule.pattern):
            try:
                # One lstat: does not follow symlinks and supplies type,
                # mtime and size together.
                info = filepath.lstat()
                if not stat.S_ISREG(info.st_mode) or info.st_mtime >= cutoff:
                    continue
                if not rule.dry_run:
                    filepath.unlink()
                freed += info.st_size
            except OSError:  # includes PermissionError and vanished files
                continue

        return freed

# Example rules, listed from the safest to the most aggressive; the cleaner
# applies them in this order.
cleanup_rules = [
    CleanupRule(
        path='/tmp',
        pattern='**/*',
        max_age_days=3,
        description='Temp files > 3 days',
    ),
    CleanupRule(
        path='/var/log',
        pattern='**/*.gz',
        max_age_days=14,
        description='Compressed logs > 14 days',
    ),
    CleanupRule(
        path='/var/cache/apt/archives',
        pattern='*.deb',
        max_age_days=7,
        description='APT cache > 7 days',
    ),
    CleanupRule(
        path='/home',
        pattern='**/.cache/**/*',
        max_age_days=7,
        description='User caches > 7 days',
    ),
]

Quota monitoring

For multi-user systems, monitoring per-user or per-directory quotas:

import subprocess
import re

def get_user_quotas() -> list[dict]:
    """Collect block-quota usage for every user reported by ``repquota -a -p``.

    Each output line that looks like ``<user> <flags> <used> <soft> <hard>``
    is parsed; users without a hard limit (hard == 0) are left out.

    Returns:
        A list of dicts with ``user``, ``used_gb``, ``limit_gb`` and
        ``percent`` (used as a percentage of the hard limit), ordered from
        the highest percentage to the lowest.
    """
    report = subprocess.run(
        ['repquota', '-a', '-p'],
        capture_output=True, text=True
    )

    row_pattern = re.compile(r'^(\S+)\s+[+-]+\s+(\d+)\s+(\d+)\s+(\d+)')
    kb_per_gb = 1024**2

    rows = []
    for raw_line in report.stdout.split('\n'):
        hit = row_pattern.match(raw_line)
        if hit is None:
            continue

        # The soft limit is captured by the pattern but not reported.
        user, used_text, _soft_text, hard_text = hit.groups()
        used_kb = int(used_text)
        hard_kb = int(hard_text)
        if hard_kb <= 0:
            continue

        rows.append({
            'user': user,
            'used_gb': used_kb / kb_per_gb,
            'limit_gb': hard_kb / kb_per_gb,
            'percent': used_kb / hard_kb * 100,
        })

    rows.sort(key=lambda row: row['percent'], reverse=True)
    return rows

Monitoring disk I/O alongside space

Disk health is not just about space. I/O latency and throughput matter too:

import psutil
import time

class DiskIOMonitor:
    """Turn cumulative per-disk I/O counters into rates between samples."""

    def __init__(self):
        """Take the baseline counters that the first ``sample()`` is compared to."""
        # ``or {}`` guards against a None result when no counters are available.
        self._prev = psutil.disk_io_counters(perdisk=True) or {}
        # Monotonic clock: wall-clock adjustments must not distort (or
        # invert) the measured interval.
        self._prev_time = time.monotonic()

    def sample(self) -> dict[str, dict]:
        """Return per-disk rates since the previous sample (or construction).

        Returns:
            Mapping of disk name to a dict with ``read_mb_per_sec``,
            ``write_mb_per_sec``, ``read_iops``, ``write_iops``,
            ``avg_read_ms`` and ``avg_write_ms``. Disks that were not present
            in the previous sample are omitted. If no time has elapsed since
            the previous sample an empty dict is returned and the baseline
            is kept, so the next call covers the full interval.
        """
        current = psutil.disk_io_counters(perdisk=True) or {}
        current_time = time.monotonic()
        elapsed = current_time - self._prev_time

        if elapsed <= 0:
            # Back-to-back calls can land on the same clock tick; dividing
            # by zero here would raise instead of reporting.
            return {}

        rates = {
            disk: self._rates_between(self._prev[disk], counters, elapsed)
            for disk, counters in current.items()
            if disk in self._prev
        }

        self._prev = current
        self._prev_time = current_time
        return rates

    @classmethod
    def _rates_between(cls, prev, counters, elapsed: float) -> dict:
        """Compute throughput, IOPS and average latency between two counter snapshots."""
        mib = 1024**2
        reads = counters.read_count - prev.read_count
        writes = counters.write_count - prev.write_count
        return {
            'read_mb_per_sec': (counters.read_bytes - prev.read_bytes) / elapsed / mib,
            'write_mb_per_sec': (counters.write_bytes - prev.write_bytes) / elapsed / mib,
            'read_iops': reads / elapsed,
            'write_iops': writes / elapsed,
            'avg_read_ms': cls._safe_latency(counters.read_time - prev.read_time, reads),
            'avg_write_ms': cls._safe_latency(counters.write_time - prev.write_time, writes),
        }

    @staticmethod
    def _safe_latency(time_ms: int, count: int) -> float:
        """Return ``time_ms / count``, or 0.0 when no operations completed."""
        return time_ms / count if count > 0 else 0.0

Putting it all together

A complete monitoring agent combines these components:

  1. Filesystem discovery that finds all real filesystems
  2. Space and inode checks with configurable thresholds
  3. Growth prediction using historical data and regression
  4. Directory analysis to identify space consumers
  5. I/O monitoring to detect performance degradation
  6. Automated cleanup triggered by threshold breaches
  7. Alerting through email, Slack, or PagerDuty

The collection loop typically runs every 5-15 minutes, with directory size analysis running less frequently (every few hours) due to its cost. Alerts use deduplication to avoid sending the same warning every collection interval.

One thing to remember: Production disk monitoring is a pipeline — discover filesystems, check space and inodes, track growth trends for predictions, identify large directories when space is tight, monitor I/O performance, automate cleanup, and alert with enough context for operators to act quickly.

Tags: python, monitoring, system-administration, storage, devops

See Also