Python Disk Usage Monitoring — Deep Dive
Building a production disk monitoring agent
A complete disk monitoring solution needs more than periodic percentage checks. It needs filesystem enumeration, growth tracking, anomaly detection, and automated remediation.
Filesystem discovery and filtering
import psutil
from dataclasses import dataclass
# Filesystem types that are never reported: kernel pseudo-filesystems,
# in-memory mounts, automount placeholders and image/snap-backed mounts.
SKIP_FSTYPES = {
    'autofs',
    'cgroup',
    'cgroup2',
    'debugfs',
    'devpts',
    'devtmpfs',
    'fuse.snapfuse',
    'nsfs',
    'overlay',
    'proc',
    'squashfs',
    'sysfs',
    'tmpfs',
    'tracefs',
}
@dataclass
class FilesystemInfo:
    """Capacity snapshot (blocks and inodes) of one mounted filesystem."""

    # Identity of the mount.
    device: str
    mountpoint: str
    fstype: str

    # Space usage in bytes, plus the percentage in use.
    total_bytes: int
    used_bytes: int
    free_bytes: int
    percent_used: float

    # Inode usage as counts, plus the percentage in use.
    inode_total: int
    inode_used: int
    inode_percent: float
def discover_filesystems() -> list[FilesystemInfo]:
    """Discover all real filesystems, skipping virtual ones.

    Partitions whose type is listed in SKIP_FSTYPES are ignored, and each
    device is reported at most once even if it is mounted several times.

    Returns:
        One FilesystemInfo per readable device, in the order psutil lists
        the partitions. Mountpoints that cannot be queried (any OSError,
        which includes PermissionError) are skipped.
    """
    import os  # imported once per call rather than once per partition

    filesystems = []
    seen_devices = set()
    for part in psutil.disk_partitions(all=False):
        if part.fstype in SKIP_FSTYPES or part.device in seen_devices:
            continue
        try:
            usage = psutil.disk_usage(part.mountpoint)
            vfs = os.statvfs(part.mountpoint)
        except OSError:
            # Best effort: an unreadable mountpoint must not abort discovery.
            continue
        # Mark the device only after a successful read, so that another,
        # readable mountpoint of the same device can still be reported.
        seen_devices.add(part.device)

        inode_total = vfs.f_files
        # f_favail is the count available to unprivileged users, so any
        # inodes reserved for root are treated as used here.
        inode_used = inode_total - vfs.f_favail
        # Some filesystems report zero inodes; avoid dividing by zero.
        inode_pct = inode_used / inode_total * 100 if inode_total > 0 else 0.0
        filesystems.append(FilesystemInfo(
            device=part.device,
            mountpoint=part.mountpoint,
            fstype=part.fstype,
            total_bytes=usage.total,
            used_bytes=usage.used,
            free_bytes=usage.free,
            percent_used=usage.percent,
            inode_total=inode_total,
            inode_used=inode_used,
            inode_percent=inode_pct,
        ))
    return filesystems
Fast directory size computation
Walking a filesystem to compute directory sizes is expensive. For large filesystems (millions of files), a naive rglob('*') can take minutes. Two faster approaches follow:
Using os.scandir for speed
import os
from collections import defaultdict
def fast_dir_sizes(root: str, max_depth: int = 2) -> dict[str, int]:
    """Compute cumulative directory sizes using os.scandir.

    The whole tree below ``root`` is walked so that every reported total
    includes all descendant files; ``max_depth`` only limits which
    directories get their own entry (like ``du --max-depth``).

    Args:
        root: Directory to start from. It is used verbatim as the key of
            the top-level entry.
        max_depth: Deepest directory level (root is level 0) that gets its
            own entry in the result.

    Returns:
        Mapping of directory path to the summed ``st_size`` of all regular
        files beneath it. Symlinks are neither followed nor counted, and
        directories containing no regular files anywhere below them are
        omitted. Unreadable entries and directories are skipped silently.
    """
    sizes: dict[str, int] = defaultdict(int)
    # Each pending directory carries the chain of reported directories
    # (root first) whose totals its files contribute to. An explicit stack
    # avoids recursion limits and keeps only one scandir handle open.
    pending = [(root, (root,))]
    while pending:
        path, report_to = pending.pop()
        dir_bytes = 0
        saw_file = False
        try:
            with os.scandir(path) as entries:
                for entry in entries:
                    try:
                        if entry.is_file(follow_symlinks=False):
                            dir_bytes += entry.stat(follow_symlinks=False).st_size
                            saw_file = True
                        elif entry.is_dir(follow_symlinks=False):
                            if len(report_to) <= max_depth:
                                # Child is shallow enough to get its own entry.
                                chain = report_to + (entry.path,)
                            else:
                                # Too deep: fold it into its reported ancestors.
                                chain = report_to
                            pending.append((entry.path, chain))
                    except OSError:
                        continue
        except OSError:
            # Unreadable directory: keep whatever was counted before the error.
            pass
        if saw_file:
            for reported in report_to:
                sizes[reported] += dir_bytes
    return dict(sizes)
Using du as a subprocess for very large trees
Sometimes shelling out to du is faster because it uses optimized C code:
import subprocess
def du_top_dirs(path: str, count: int = 20) -> list[tuple[str, int]]:
    """Use the system ``du`` to find the largest immediate subdirectories.

    Runs ``du -x --max-depth=1 -b`` (GNU options: stay on one filesystem,
    one level of output, apparent size in bytes). A non-zero exit status,
    e.g. from unreadable subdirectories, is tolerated and whatever ``du``
    printed is still used.

    Args:
        path: Directory to inspect.
        count: Maximum number of entries to return.

    Returns:
        ``(directory, size_in_bytes)`` tuples for the children of ``path``,
        largest first, at most ``count`` of them. The total line for
        ``path`` itself is excluded.

    Raises:
        subprocess.TimeoutExpired: if ``du`` runs longer than 300 seconds.
        FileNotFoundError: if no ``du`` executable is available.
    """
    result = subprocess.run(
        # '--' ends option parsing so a path starting with '-' is treated
        # as a directory name instead of a du option.
        ['du', '-x', '--max-depth=1', '-b', '--', path],
        capture_output=True, text=True, timeout=300,
    )
    entries = []
    for line in result.stdout.split('\n'):
        size_text, tab, dir_path = line.partition('\t')
        if not tab or dir_path == path:  # blank/malformed line or root total
            continue
        try:
            entries.append((dir_path, int(size_text)))
        except ValueError:
            # Not a "<bytes>\t<path>" line; ignore rather than abort.
            continue
    return sorted(entries, key=lambda item: item[1], reverse=True)[:count]
Predictive alerting with linear regression
Simple growth rate calculations can be noisy (a big log dump one day skews the average). Linear regression on historical data gives smoother predictions:
import json
import time
from pathlib import Path
class DiskPredictor:
    """Record disk usage samples and extrapolate when a filesystem fills up.

    Samples are appended as JSON lines to one file per mountpoint under
    ``history_dir``. Predictions fit a least-squares line through the
    samples that fall inside the lookback window.
    """

    def __init__(self, history_dir: str = '/var/lib/disk-monitor'):
        """Create ``history_dir`` (and missing parents) if it does not exist."""
        self.history_dir = Path(history_dir)
        self.history_dir.mkdir(parents=True, exist_ok=True)

    def _history_file(self, mountpoint: str) -> Path:
        """Return the JSONL history path used for ``mountpoint``."""
        # Naming is unchanged so previously written history files still match.
        safe_name = mountpoint.replace('/', '_') or '_root'
        return self.history_dir / f'{safe_name}.jsonl'

    def record(self, mountpoint: str, used_bytes: int, total_bytes: int):
        """Append one usage sample for ``mountpoint``, stamped with the current time."""
        entry = {
            't': time.time(),
            'used': used_bytes,
            'total': total_bytes,
        }
        with open(self._history_file(mountpoint), 'a', encoding='utf-8') as f:
            f.write(json.dumps(entry) + '\n')

    def predict_full_date(self, mountpoint: str, lookback_days: int = 14) -> dict:
        """Estimate when ``mountpoint`` will be full from its recorded history.

        Args:
            mountpoint: Mountpoint whose history file should be analysed.
            lookback_days: Only samples newer than this many days are used.

        Returns:
            On success a dict with ``prediction`` (epoch seconds),
            ``days_until_full``, ``growth_gb_per_day`` and ``confidence``.
            Otherwise ``prediction`` is None and ``reason`` explains why
            (no history, insufficient data, no variance, or usage that is
            stable or decreasing).
        """
        cutoff = time.time() - (lookback_days * 86400)
        points = []
        try:
            with open(self._history_file(mountpoint), encoding='utf-8') as f:
                for line in f:
                    try:
                        entry = json.loads(line)
                        if entry['t'] >= cutoff:
                            # Read every field now so malformed entries are
                            # dropped here instead of failing later.
                            points.append({
                                't': entry['t'],
                                'used': entry['used'],
                                'total': entry['total'],
                            })
                    except (ValueError, KeyError, TypeError):
                        # Blank, truncated or otherwise corrupt line.
                        continue
        except FileNotFoundError:
            return {'prediction': None, 'reason': 'no history'}

        if len(points) < 10:
            return {'prediction': None, 'reason': 'insufficient data'}

        # Least-squares slope on values shifted to the first sample and then
        # mean-centred. Raw epoch timestamps (~1e9) squared and summed lose
        # most of their precision to cancellation; offsets do not.
        n = len(points)
        t0 = points[0]['t']
        used0 = points[0]['used']
        xs = [p['t'] - t0 for p in points]
        ys = [p['used'] - used0 for p in points]
        mean_x = sum(xs) / n
        mean_y = sum(ys) / n
        var_x = sum((x - mean_x) ** 2 for x in xs)
        if var_x == 0:
            return {'prediction': None, 'reason': 'no variance'}
        covariance = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
        slope = covariance / var_x  # bytes per second

        growth_gb_per_day = slope * 86400 / (1024**3)
        if slope <= 0:
            return {
                'prediction': None,
                'reason': 'disk usage is stable or decreasing',
                'growth_gb_per_day': growth_gb_per_day,
            }

        latest = max(points, key=lambda p: p['t'])
        # Clamp so an already-full (or over-reported) disk yields "now".
        remaining = max(latest['total'] - latest['used'], 0)
        seconds_until_full = remaining / slope
        return {
            'prediction': time.time() + seconds_until_full,
            'days_until_full': seconds_until_full / 86400,
            'growth_gb_per_day': growth_gb_per_day,
            'confidence': 'low' if n < 50 else 'medium' if n < 200 else 'high',
        }
Automated cleanup pipelines
When monitoring detects a disk approaching capacity, automated cleanup can prevent outages:
from pathlib import Path
from dataclasses import dataclass
import shutil
import os
import time
@dataclass
class CleanupRule:
    """One age-based deletion rule consumed by DiskCleaner."""

    path: str  # directory the glob pattern is rooted at
    pattern: str  # glob pattern evaluated relative to `path`
    max_age_days: int  # entries modified longer ago than this are eligible
    description: str  # label reported alongside the rule's results
    dry_run: bool = False  # when True, nothing is unlinked
class DiskCleaner:
    """Apply age-based cleanup rules when a filesystem runs low on space."""

    def __init__(self, rules: list[CleanupRule]):
        """Store the rules; they are applied in the order given."""
        self.rules = rules

    def run(self, min_free_gb: float = 10.0, mountpoint: str = '/') -> dict:
        """Run cleanup rules if free space is below threshold.

        Args:
            min_free_gb: Free-space target in GiB. Nothing is deleted when
                the filesystem already has at least this much free.
            mountpoint: Filesystem whose free space is measured. Defaults
                to '/'; pass another path when the rules target a
                different filesystem.

        Returns:
            ``{'action': 'none', 'free_gb': ...}`` when no cleanup was
            needed, otherwise a report with per-rule results, the total
            reported as freed and the free space afterwards. Rules are
            applied in order until the target is reached.
        """
        free_gb = shutil.disk_usage(mountpoint).free / (1024**3)
        if free_gb >= min_free_gb:
            return {'action': 'none', 'free_gb': free_gb}

        results = []
        total_freed = 0
        for rule in self.rules:
            freed = self._apply_rule(rule)
            total_freed += freed
            results.append({
                'rule': rule.description,
                'freed_bytes': freed,
                'freed_gb': freed / (1024**3),
            })
            # Re-check after each rule so later, more aggressive rules are
            # skipped once enough space is available.
            if shutil.disk_usage(mountpoint).free / (1024**3) >= min_free_gb:
                break

        return {
            'action': 'cleanup',
            'rules_applied': results,
            'total_freed_gb': total_freed / (1024**3),
            'free_gb_after': shutil.disk_usage(mountpoint).free / (1024**3),
        }

    def _apply_rule(self, rule: CleanupRule) -> int:
        """Delete regular files matched by ``rule`` that are older than its cutoff.

        Symlinks are not followed and, like directories and other special
        files, are never deleted or counted. With ``rule.dry_run`` set,
        nothing is unlinked but the sizes are still summed.

        Returns:
            Total ``st_size`` in bytes of the files deleted (or that would
            have been deleted in a dry run). Files that cannot be
            inspected or removed are skipped.
        """
        from stat import S_ISREG

        freed = 0
        cutoff = time.time() - (rule.max_age_days * 86400)
        for filepath in Path(rule.path).glob(rule.pattern):
            try:
                # lstat: judge the entry itself, never a symlink's target.
                info = filepath.lstat()
                if not S_ISREG(info.st_mode) or info.st_mtime >= cutoff:
                    continue
                if not rule.dry_run:
                    filepath.unlink()
                freed += info.st_size
            except OSError:
                # Vanished or protected file: best effort, move on.
                continue
        return freed
# Example rule set. The list runs from the safest rule to the most
# aggressive one, which matters because rules are applied in list order.
cleanup_rules = [
    CleanupRule(
        path='/tmp',
        pattern='**/*',
        max_age_days=3,
        description='Temp files > 3 days',
    ),
    CleanupRule(
        path='/var/log',
        pattern='**/*.gz',
        max_age_days=14,
        description='Compressed logs > 14 days',
    ),
    CleanupRule(
        path='/var/cache/apt/archives',
        pattern='*.deb',
        max_age_days=7,
        description='APT cache > 7 days',
    ),
    CleanupRule(
        path='/home',
        pattern='**/.cache/**/*',
        max_age_days=7,
        description='User caches > 7 days',
    ),
]
Quota monitoring
For multi-user systems, monitoring per-user or per-directory quotas:
import subprocess
import re
def get_user_quotas(report: 'str | None' = None) -> list[dict]:
    """Parse block-quota information for all users.

    Args:
        report: Text in ``repquota`` output format to parse. When omitted,
            ``repquota -a -p`` is run and its standard output is parsed.

    Returns:
        One dict per user row that has a non-zero hard block limit, with
        keys ``user``, ``device``, ``used_gb``, ``limit_gb`` and
        ``percent``, sorted by ``percent`` descending. ``device`` comes
        from the preceding "*** Report for ... quotas on device ..." line
        and is None if no such line was seen, so the same user on several
        filesystems can be told apart.

    Raises:
        FileNotFoundError: if ``report`` is omitted and ``repquota`` is
            not installed.
    """
    if report is None:
        report = subprocess.run(
            ['repquota', '-a', '-p'],
            capture_output=True, text=True
        ).stdout

    device_header = re.compile(r'^\*\*\* Report for \w+ quotas on device (\S+)')
    # name, flags (e.g. "--" or "+-"), then used / soft / hard block counts
    user_row = re.compile(r'^(\S+)\s+[+-]+\s+(\d+)\s+(\d+)\s+(\d+)')

    quotas = []
    device = None
    for line in report.split('\n'):
        header = device_header.match(line)
        if header:
            device = header.group(1)
            continue
        row = user_row.match(line)
        if not row:
            continue
        used_kb = int(row.group(2))
        hard_kb = int(row.group(4))
        if hard_kb <= 0:
            # No hard limit set: there is nothing to compute a percentage of.
            continue
        quotas.append({
            'user': row.group(1),
            'device': device,
            'used_gb': used_kb / (1024**2),
            'limit_gb': hard_kb / (1024**2),
            'percent': used_kb / hard_kb * 100,
        })
    return sorted(quotas, key=lambda q: q['percent'], reverse=True)
Monitoring disk I/O alongside space
Disk health is not just about space. I/O latency and throughput matter too:
import psutil
import time
class DiskIOMonitor:
    """Turn psutil's cumulative per-disk I/O counters into per-interval rates."""

    def __init__(self):
        """Take the baseline counters that the first ``sample()`` is compared to."""
        self._prev = psutil.disk_io_counters(perdisk=True)
        # Monotonic clock: wall-clock adjustments must not distort the interval.
        self._prev_time = time.monotonic()

    def sample(self) -> dict[str, dict]:
        """Return I/O rates per disk since the previous sample (or construction).

        Returns:
            Mapping of disk name to a dict with ``read_mb_per_sec``,
            ``write_mb_per_sec``, ``read_iops``, ``write_iops``,
            ``avg_read_ms`` and ``avg_write_ms``. Disks that were not
            present in the previous sample are left out. An empty dict is
            returned, and the baseline kept, if no time has elapsed.
        """
        current = psutil.disk_io_counters(perdisk=True)
        current_time = time.monotonic()
        elapsed = current_time - self._prev_time
        if elapsed <= 0:
            # Back-to-back calls: no interval to divide by.
            return {}

        rates = {
            disk: self._rates_between(self._prev[disk], counters, elapsed)
            for disk, counters in current.items()
            if disk in self._prev
        }
        self._prev = current
        self._prev_time = current_time
        return rates

    @staticmethod
    def _rates_between(prev, counters, elapsed: float) -> dict[str, float]:
        """Compute the rates between two counter snapshots taken ``elapsed`` seconds apart."""
        def delta(field: str) -> int:
            # Counters restart when a device is reset or re-attached; report
            # zero for that interval instead of a negative rate.
            return max(0, getattr(counters, field) - getattr(prev, field))

        reads = delta('read_count')
        writes = delta('write_count')
        return {
            'read_mb_per_sec': delta('read_bytes') / elapsed / (1024**2),
            'write_mb_per_sec': delta('write_bytes') / elapsed / (1024**2),
            'read_iops': reads / elapsed,
            'write_iops': writes / elapsed,
            'avg_read_ms': DiskIOMonitor._safe_latency(delta('read_time'), reads),
            'avg_write_ms': DiskIOMonitor._safe_latency(delta('write_time'), writes),
        }

    @staticmethod
    def _safe_latency(time_ms: int, count: int) -> float:
        """Return the average time per operation, or 0.0 when there were none."""
        return time_ms / count if count > 0 else 0.0
Putting it all together
A complete monitoring agent combines these components:
- Filesystem discovery that finds all real filesystems
- Space and inode checks with configurable thresholds
- Growth prediction using historical data and regression
- Directory analysis to identify space consumers
- I/O monitoring to detect performance degradation
- Automated cleanup triggered by threshold breaches
- Alerting through email, Slack, or PagerDuty
The collection loop typically runs every 5-15 minutes, with directory size analysis running less frequently (every few hours) due to its cost. Alerts use deduplication to avoid sending the same warning every collection interval.
One thing to remember: Production disk monitoring is a pipeline — discover filesystems, check space and inodes, track growth trends for predictions, identify large directories when space is tight, monitor I/O performance, automate cleanup, and alert with enough context for operators to act quickly.
See Also
- Python Crontab Management — How Python can set up automatic timers on your computer, like programming an alarm clock that runs tasks instead of waking you up.
- Python Log Rotation Management — Why your program's diary needs page limits, and how Python keeps log files from eating all your disk space.
- Python Network Interface Monitoring — How Python watches your computer's network connections, like having a traffic counter on every road leading to your house.
- Python Process Management — How Python lets you see and control all the programs running on your computer, like being the manager of a busy office.
- Python Psutil System Monitoring — How Python's psutil library lets your program check on your computer's health, like a doctor with a stethoscope for your machine.