Python Network Interface Monitoring — Deep Dive
Reading /proc/net for detailed interface statistics
While psutil provides a clean API, Linux’s /proc/net/dev contains additional statistics that psutil does not expose:
from dataclasses import dataclass
@dataclass
class InterfaceStats:
    """Counters for a single network interface, one row of /proc/net/dev.

    The fields are declared in the order the parser fills them: the
    interface name, then eight receive counters, then eight transmit
    counters.
    """

    name: str

    # --- receive side ---
    rx_bytes: int
    rx_packets: int
    rx_errors: int
    rx_drops: int
    rx_fifo: int        # FIFO buffer overruns
    rx_frame: int       # framing errors
    rx_compressed: int
    rx_multicast: int

    # --- transmit side ---
    tx_bytes: int
    tx_packets: int
    tx_errors: int
    tx_drops: int
    tx_fifo: int
    tx_collisions: int  # Ethernet collisions
    tx_carrier: int     # carrier errors
    tx_compressed: int
def read_proc_net_dev(path: str = '/proc/net/dev') -> list[InterfaceStats]:
    """Parse /proc/net/dev for detailed interface statistics.

    Args:
        path: File to parse. Defaults to the live kernel table; a different
            path can be given to parse a saved copy.

    Returns:
        One ``InterfaceStats`` per well-formed interface row, in file order.
        Blank rows and rows with fewer than 16 counters are skipped.

    Raises:
        OSError: If *path* cannot be opened.
        ValueError: If a counter column is not an integer.
    """
    # Column order of the 16 counters that follow "<name>:" on each row.
    counter_fields = (
        'rx_bytes', 'rx_packets', 'rx_errors', 'rx_drops',
        'rx_fifo', 'rx_frame', 'rx_compressed', 'rx_multicast',
        'tx_bytes', 'tx_packets', 'tx_errors', 'tx_drops',
        'tx_fifo', 'tx_collisions', 'tx_carrier', 'tx_compressed',
    )

    interfaces = []
    with open(path) as f:
        # Skip the two header lines without loading the whole file.
        for _ in range(2):
            next(f, None)

        for line in f:
            # Split on the colon rather than on whitespace: whitespace after
            # the colon is not guaranteed (rows such as "eth0:123456 ..." have
            # been reported on older kernels), and a whitespace split would
            # then fuse the name with the first counter and shift every column.
            name, sep, counters = line.rpartition(':')
            if not sep:
                continue  # blank or otherwise malformed row

            values = [int(v) for v in counters.split()]
            if len(values) < len(counter_fields):
                continue  # not enough columns to fill every field

            # zip() stops at 16 fields, so extra trailing columns are ignored.
            interfaces.append(
                InterfaceStats(name=name.strip(), **dict(zip(counter_fields, values)))
            )
    return interfaces
The additional fields like rx_fifo (FIFO buffer overruns), rx_frame (framing errors), tx_collisions (Ethernet collisions), and tx_carrier (carrier errors) help diagnose specific hardware and protocol issues.
Real-time link event monitoring with Netlink
Polling for link state changes wastes CPU and misses events between checks. Linux’s Netlink socket provides real-time kernel notifications:
import socket
import struct
import os
# Netlink / rtnetlink constants used by the link-event listener below.
NETLINK_ROUTE = 0       # protocol passed when creating the AF_NETLINK socket
RTMGRP_LINK = 0x1       # multicast-group bitmask handed to bind()
RTM_NEWLINK = 0x10      # nlmsg_type reported as "NEWLINK"
RTM_DELLINK = 0x11      # nlmsg_type reported as "DELLINK"
IFLA_IFNAME = 3         # attribute type whose payload is the interface name
IFF_UP = 1 << 0         # ifi_flags bit reported as UP
IFF_RUNNING = 1 << 6    # ifi_flags bit reported as RUNNING
def _parse_link_messages(data: bytes):
    """Yield ``(event, iface_name, is_up, is_running)`` per link message in *data*.

    *data* is one datagram read from the netlink socket and may contain
    several messages back to back. Messages whose type is not
    RTM_NEWLINK/RTM_DELLINK are skipped. Parsing stops at the first
    malformed or truncated header instead of raising or spinning forever.
    """
    hdr_len = 16       # size of the '=IHHII' nlmsghdr
    ifinfo_len = 16    # size of the '=BBHiII' ifinfomsg
    total = len(data)
    offset = 0

    while offset + hdr_len <= total:
        nlmsg_len, nlmsg_type, _flags, _seq, _pid = \
            struct.unpack_from('=IHHII', data, offset)

        # A length shorter than the header (including 0) would never advance
        # the offset; a length past the buffer means the datagram was cut off.
        if nlmsg_len < hdr_len or offset + nlmsg_len > total:
            break
        msg_end = offset + nlmsg_len

        if (nlmsg_type in (RTM_NEWLINK, RTM_DELLINK)
                and nlmsg_len >= hdr_len + ifinfo_len):
            _family, _pad, _type, ifi_index, ifi_flags, _change = \
                struct.unpack_from('=BBHiII', data, offset + hdr_len)

            # Fall back to the numeric index when no name attribute is present.
            iface_name = f"index-{ifi_index}"
            attr_offset = offset + hdr_len + ifinfo_len
            while attr_offset + 4 <= msg_end:
                attr_len, attr_type = struct.unpack_from('=HH', data, attr_offset)
                if attr_len < 4 or attr_offset + attr_len > msg_end:
                    break  # malformed attribute; keep the fallback name
                if attr_type == IFLA_IFNAME:
                    payload = data[attr_offset + 4:attr_offset + attr_len]
                    # The name is NUL-terminated; keep what precedes the NUL.
                    iface_name = payload.split(b'\0', 1)[0].decode(errors='replace')
                    break
                attr_offset += (attr_len + 3) & ~3  # Align to 4 bytes

            event = "NEWLINK" if nlmsg_type == RTM_NEWLINK else "DELLINK"
            yield (
                event,
                iface_name,
                bool(ifi_flags & IFF_UP),
                bool(ifi_flags & IFF_RUNNING),
            )

        offset += (nlmsg_len + 3) & ~3  # messages are 4-byte aligned


def monitor_link_events():
    """Listen for real-time network interface events via Netlink.

    Opens an AF_NETLINK socket subscribed to the link multicast group and
    prints one line per RTM_NEWLINK/RTM_DELLINK message, forever. The
    function only returns by way of an exception (for example
    KeyboardInterrupt on Ctrl+C); the socket is closed on the way out.

    Raises:
        OSError: If the socket cannot be created or bound.
    """
    with socket.socket(
        socket.AF_NETLINK,
        socket.SOCK_DGRAM,
        NETLINK_ROUTE,
    ) as sock:
        # Port id 0 asks the kernel to assign a unique id. Binding to the
        # process id fails with "address already in use" as soon as another
        # netlink socket in this process has claimed it.
        sock.bind((0, RTMGRP_LINK))
        print("Listening for link events (Ctrl+C to stop)...")

        while True:
            data = sock.recv(65535)
            for event, iface_name, is_up, is_running in _parse_link_messages(data):
                status = f"UP={'Y' if is_up else 'N'} RUNNING={'Y' if is_running else 'N'}"
                print(f"[{event}] {iface_name}: {status}")
SNMP-based monitoring for network devices
For monitoring switches, routers, and remote hosts, SNMP provides standardized interface statistics:
from pysnmp.hlapi import (
getCmd, nextCmd, SnmpEngine, CommunityData,
UdpTransportTarget, ContextData, ObjectType, ObjectIdentity
)
def get_interface_stats_snmp(host: str, community: str = 'public') -> list[dict]:
    """Fetch interface statistics from a network device via SNMP.

    Walks a fixed set of IF-MIB columns on *host* (UDP port 161) and groups
    the values by the last component of each returned OID, which is used as
    the interface index.

    Args:
        host: Address of the SNMP agent to query.
        community: SNMP community string.

    Returns:
        One dict per interface index seen, mapping column name (for example
        ``'ifDescr'``) to the value rendered with ``str()``. If a walk fails,
        a warning is logged and the columns collected so far are returned, so
        the result may be partial or empty.
    """
    import logging  # local import: this snippet has no file-level logging import

    log = logging.getLogger(__name__)

    # OIDs for IF-MIB
    oids = {
        'ifDescr': '1.3.6.1.2.1.2.2.1.2',
        'ifOperStatus': '1.3.6.1.2.1.2.2.1.8',
        'ifSpeed': '1.3.6.1.2.1.2.2.1.5',
        'ifInOctets': '1.3.6.1.2.1.2.2.1.10',
        'ifOutOctets': '1.3.6.1.2.1.2.2.1.16',
        'ifInErrors': '1.3.6.1.2.1.2.2.1.14',
        'ifOutErrors': '1.3.6.1.2.1.2.2.1.20',
    }

    interfaces: dict[str, dict] = {}
    engine = SnmpEngine()

    for oid_name, oid_base in oids.items():
        walk = nextCmd(
            engine,
            CommunityData(community),
            UdpTransportTarget((host, 161)),
            ContextData(),
            ObjectType(ObjectIdentity(oid_base)),
            lexicographicMode=False,  # stop at the end of this column's subtree
        )
        for errorIndication, errorStatus, errorIndex, varBinds in walk:
            if errorIndication or errorStatus:
                # Still best-effort (move on to the next column), but say so:
                # otherwise a timeout or bad community string is
                # indistinguishable from a device with no interfaces.
                log.warning(
                    "SNMP walk of %s on %s stopped early: %s",
                    oid_name, host, errorIndication or errorStatus,
                )
                break

            for varBind in varBinds:
                # Extract interface index from OID (its last component)
                idx = str(varBind[0]).rpartition('.')[2]
                interfaces.setdefault(idx, {})[oid_name] = str(varBind[1])

    return list(interfaces.values())
Traffic pattern analysis
Bandwidth utilization tracking
import time
from collections import deque
from dataclasses import dataclass, field
@dataclass
class InterfaceBandwidthTracker:
    """Rolling bandwidth and utilization history for one interface.

    Feed it cumulative byte counters with ``record()``; each call after the
    first appends a sample holding the RX/TX rate in Mbps and the combined
    rate as a percentage of ``capacity_mbps``.
    """

    interface: str
    capacity_mbps: float
    samples: deque = field(default_factory=lambda: deque(maxlen=360))  # 1 hour at 10s
    _prev_counters: dict = field(default_factory=dict)
    _prev_time: float = 0

    def record(self, rx_bytes: int, tx_bytes: int):
        """Record a new pair of cumulative byte counters.

        The first call only stores a baseline. Later calls append a sample
        computed from the delta against the previous call. A negative delta
        (counter reset or wrap) is reported as a rate of 0 for that direction.

        Args:
            rx_bytes: Cumulative bytes received.
            tx_bytes: Cumulative bytes sent.
        """
        now = time.time()
        elapsed = now - self._prev_time

        # elapsed can be 0 (two calls within clock resolution) or negative
        # (wall clock stepped backwards); skip the sample rather than divide
        # by zero or report a nonsensical rate.
        if self._prev_time > 0 and elapsed > 0:
            rx_delta = rx_bytes - self._prev_counters.get('rx', rx_bytes)
            tx_delta = tx_bytes - self._prev_counters.get('tx', tx_bytes)
            rx_mbps = max(0.0, rx_delta * 8 / elapsed / 1_000_000)
            tx_mbps = max(0.0, tx_delta * 8 / elapsed / 1_000_000)

            # Use the clamped rates so the percentage always agrees with the
            # rx/tx values stored next to it; summing the raw rates lets a
            # counter reset on one side cancel real traffic on the other.
            if self.capacity_mbps > 0:
                utilization_pct = (rx_mbps + tx_mbps) / self.capacity_mbps * 100
            else:
                utilization_pct = 0

            self.samples.append({
                'timestamp': now,
                'rx_mbps': rx_mbps,
                'tx_mbps': tx_mbps,
                'utilization_pct': utilization_pct,
            })

        self._prev_counters = {'rx': rx_bytes, 'tx': tx_bytes}
        self._prev_time = now

    def _recent(self, last_n: int) -> list:
        """Return the newest *last_n* samples (empty when ``last_n <= 0``)."""
        # Guard explicitly: a slice of [-0:] would return the whole history.
        if last_n <= 0:
            return []
        return list(self.samples)[-last_n:]

    def avg_utilization(self, last_n: int = 30) -> float:
        """Return the mean ``utilization_pct`` over the newest *last_n* samples.

        Returns 0.0 when there are no samples to average.
        """
        recent = self._recent(last_n)
        if not recent:
            return 0.0
        return sum(s['utilization_pct'] for s in recent) / len(recent)

    def peak_utilization(self, last_n: int = 30) -> float:
        """Return the highest ``utilization_pct`` among the newest *last_n* samples.

        Returns 0.0 when there are no samples to inspect.
        """
        recent = self._recent(last_n)
        if not recent:
            return 0.0
        return max(s['utilization_pct'] for s in recent)
Anomaly detection
Detecting unusual traffic spikes that might indicate DDoS, data exfiltration, or misconfigured services:
class TrafficAnomalyDetector:
    """Flag traffic spikes and heavily outbound-skewed traffic per interface.

    Keeps a sliding window of combined (RX + TX) bit rates for every
    interface it has been shown and compares each new reading against the
    mean and standard deviation of that window.
    """

    def __init__(self, window_size: int = 60, threshold_sigma: float = 3.0):
        """Set the window length (in samples) and the spike threshold (in sigmas)."""
        self.window_size = window_size
        self.threshold_sigma = threshold_sigma
        self.history: dict[str, deque] = {}

    def check(self, interface: str, rx_bps: float, tx_bps: float) -> list[str]:
        """Record one reading and return human-readable anomaly messages.

        The reading is added to the interface's window before any statistics
        are computed. Nothing is reported until the window holds at least
        ``window_size // 2`` readings.
        """
        window = self.history.get(interface)
        if window is None:
            window = self.history[interface] = deque(maxlen=self.window_size)

        combined_bps = rx_bps + tx_bps
        window.append(combined_bps)

        count = len(window)
        if count < self.window_size // 2:
            return []  # Not enough data

        # Population mean and standard deviation over the current window.
        mean = sum(window) / count
        variance = sum((sample - mean) ** 2 for sample in window) / count
        std_dev = variance ** 0.5

        findings = []

        spike_limit = mean + self.threshold_sigma * std_dev
        if std_dev > 0 and combined_bps > spike_limit:
            findings.append(
                f"{interface}: Traffic spike detected "
                f"({combined_bps / 1_000_000:.1f} Mbps, "
                f"normal: {mean / 1_000_000:.1f} ± {std_dev / 1_000_000:.1f} Mbps)"
            )

        # Check for asymmetric traffic (potential exfiltration):
        # more than 10x as much going out as coming in, and over 10 Mbps out.
        mostly_outbound = tx_bps > rx_bps * 10
        if mostly_outbound and tx_bps > 10_000_000:
            findings.append(
                f"{interface}: Asymmetric traffic "
                f"(TX: {tx_bps / 1_000_000:.1f} Mbps, RX: {rx_bps / 1_000_000:.1f} Mbps)"
            )

        return findings
Building a complete network monitoring agent
import psutil
import time
import json
from pathlib import Path
class NetworkMonitoringAgent:
    """Periodic collector tying together rates, link state, anomalies and health.

    Each ``collect()`` call samples psutil's per-NIC counters, derives
    per-interface rates against the previous call, feeds the bandwidth
    trackers and the anomaly detector, and gathers link-state changes and
    health issues into one report dict. ``run()`` does this in a loop and
    appends each report as a JSON line to a per-day file in ``data_dir``.
    """

    def __init__(self, data_dir: str = '/var/lib/net-monitor'):
        """Create the agent; *data_dir* is created if it does not exist."""
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.link_monitor = LinkStateMonitor()
        self.anomaly_detector = TrafficAnomalyDetector()
        self.bandwidth_trackers: dict[str, InterfaceBandwidthTracker] = {}
        self._prev_counters = {}
        self._prev_time = 0

    def _init_trackers(self):
        """Create a bandwidth tracker for each interface that reports a speed.

        Interfaces already tracked, or reporting a speed of 0, are left alone.
        """
        stats = psutil.net_if_stats()
        for iface, stat in stats.items():
            if iface not in self.bandwidth_trackers and stat.speed > 0:
                self.bandwidth_trackers[iface] = InterfaceBandwidthTracker(
                    interface=iface,
                    capacity_mbps=stat.speed,
                )

    def collect(self) -> dict:
        """Take one sample and return it as a report dict.

        Returns:
            A dict with keys ``timestamp``, ``interfaces`` (per-interface
            rates and cumulative packet/error/drop counts), ``link_changes``,
            ``anomalies`` and ``health_issues``. Rates are 0 on the first
            call and for interfaces that were absent from the previous call.
        """
        self._init_trackers()
        now = time.time()
        counters = psutil.net_io_counters(pernic=True)

        # The interval is the same for every interface, so compute it once.
        elapsed = now - self._prev_time if self._prev_time > 0 else 0

        report = {
            'timestamp': now,
            'interfaces': {},
            'link_changes': [],
            'anomalies': [],
            'health_issues': [],
        }

        # Per-interface metrics
        for iface, c in counters.items():
            prev = self._prev_counters.get(iface)
            have_baseline = prev is not None and elapsed > 0

            if have_baseline:
                # Clamp at 0: after a counter reset or wrap the delta is
                # negative, and a negative rate is meaningless downstream.
                rx_bps = max(0, c.bytes_recv - prev.bytes_recv) / elapsed * 8
                tx_bps = max(0, c.bytes_sent - prev.bytes_sent) / elapsed * 8
            else:
                rx_bps = tx_bps = 0

            report['interfaces'][iface] = {
                'rx_mbps': rx_bps / 1_000_000,
                'tx_mbps': tx_bps / 1_000_000,
                'rx_packets': c.packets_recv,
                'tx_packets': c.packets_sent,
                'errors': c.errin + c.errout,
                'drops': c.dropin + c.dropout,
            }

            # Update bandwidth tracker
            if iface in self.bandwidth_trackers:
                self.bandwidth_trackers[iface].record(c.bytes_recv, c.bytes_sent)

            # Check for anomalies. Only measured rates are fed in: without a
            # baseline the rate is a placeholder 0, which would skew the
            # detector's window for an interface that has just appeared.
            if have_baseline:
                anomalies = self.anomaly_detector.check(iface, rx_bps, tx_bps)
                report['anomalies'].extend(anomalies)

        # Link state changes
        report['link_changes'] = self.link_monitor.check()

        # Interface health
        report['health_issues'] = check_interface_health()

        self._prev_counters = counters
        self._prev_time = now
        return report

    def run(self, interval: float = 10.0):
        """Collect forever, printing notable events and persisting every report.

        Args:
            interval: Seconds to sleep between collections.
        """
        print(f"Network monitoring agent started (interval: {interval}s)")
        while True:
            report = self.collect()

            # Log anomalies and changes
            for anomaly in report['anomalies']:
                print(f"⚠️ {anomaly}")
            for change in report['link_changes']:
                print(f"🔗 {change['interface']}: {change['previous']} → {change['current']}")

            # Persist metrics: one JSON object per line, one file per day
            metrics_file = self.data_dir / f"metrics-{time.strftime('%Y-%m-%d')}.jsonl"
            with open(metrics_file, 'a') as f:
                f.write(json.dumps(report, default=str) + '\n')

            time.sleep(interval)
class LinkStateMonitor:
    """Detect up/down transitions by comparing successive psutil snapshots."""

    def __init__(self):
        """Remember the current up/down state of every interface."""
        self.previous_states = {}
        for iface, stat in psutil.net_if_stats().items():
            self.previous_states[iface] = stat.isup

    def check(self) -> list[dict]:
        """Return the interfaces whose state flipped since the last snapshot.

        Each entry has ``interface``, ``previous`` and ``current`` keys, the
        latter two being ``'up'`` or ``'down'``. An interface seen for the
        first time is recorded but not reported.
        """
        transitions = []
        for iface, stat in psutil.net_if_stats().items():
            was_up = self.previous_states.get(iface)
            now_up = stat.isup
            self.previous_states[iface] = now_up

            # No earlier state to compare against, or nothing changed.
            if was_up is None or was_up == now_up:
                continue

            transitions.append({
                'interface': iface,
                'previous': 'up' if was_up else 'down',
                'current': 'up' if now_up else 'down',
            })
        return transitions
def check_interface_health(
    counters=None,
    error_threshold_pct: float = 0.01,
    drop_threshold_pct: float = 0.1,
) -> list[dict]:
    """Report interfaces whose error or drop ratio exceeds a threshold.

    Ratios are computed from the counters as given: errors (in + out) and
    drops (in + out), each as a percentage of packets (received + sent).

    Args:
        counters: Mapping of interface name to an object with
            ``packets_recv``, ``packets_sent``, ``errin``, ``errout``,
            ``dropin`` and ``dropout`` attributes. When omitted, a fresh
            ``psutil.net_io_counters(pernic=True)`` snapshot is taken, so a
            caller that already holds one can pass it in and avoid a second
            read.
        error_threshold_pct: An interface is reported when its error
            percentage is strictly greater than this.
        drop_threshold_pct: An interface is reported when its drop
            percentage is strictly greater than this.

    Returns:
        One dict per unhealthy interface with keys ``interface``,
        ``error_pct`` and ``drop_pct``. Interfaces with no packets at all
        are skipped.
    """
    if counters is None:
        counters = psutil.net_io_counters(pernic=True)

    issues = []
    for iface, c in counters.items():
        total = c.packets_recv + c.packets_sent
        if total == 0:
            continue  # no traffic, so no meaningful ratio

        error_pct = (c.errin + c.errout) / total * 100
        drop_pct = (c.dropin + c.dropout) / total * 100
        if error_pct > error_threshold_pct or drop_pct > drop_threshold_pct:
            issues.append({
                'interface': iface,
                'error_pct': error_pct,
                'drop_pct': drop_pct,
            })
    return issues
One thing to remember: Production network monitoring combines multiple techniques — psutil for cross-platform basics, /proc/net/dev for detailed Linux statistics, Netlink sockets for real-time link events, SNMP for remote devices, and statistical analysis for anomaly detection. The goal is not just measuring bandwidth but understanding traffic patterns and detecting problems before users notice them.
See Also
- Python Crontab Management — How Python can set up automatic timers on your computer — like programming an alarm clock that runs tasks instead of waking you up.
- Python Disk Usage Monitoring — How Python helps you keep an eye on your computer's storage — like a fuel gauge that warns you before you run out of space.
- Python Log Rotation Management — Why your program's diary needs page limits — and how Python keeps log files from eating all your disk space.
- Python Process Management — How Python lets you see and control all the programs running on your computer — like being the manager of a busy office.
- Python Psutil System Monitoring — How Python's psutil library lets your program check on your computer's health — like a doctor with a stethoscope for your machine.