Python Network Interface Monitoring — Deep Dive

Reading /proc/net for detailed interface statistics

While psutil provides a clean API, Linux’s /proc/net/dev contains additional statistics that psutil does not expose:

from dataclasses import dataclass

@dataclass
class InterfaceStats:
    """Counters for a single network interface as read from /proc/net/dev.

    The fields after ``name`` are declared in the same order in which
    ``read_proc_net_dev`` maps the numeric columns of a row: eight
    receive-side counters followed by eight transmit-side counters.
    """

    name: str  # interface name with the trailing ':' removed

    # Receive-side counters (columns 0-7 of a row)
    rx_bytes: int
    rx_packets: int
    rx_errors: int
    rx_drops: int
    rx_fifo: int  # FIFO buffer overruns
    rx_frame: int  # framing errors
    rx_compressed: int
    rx_multicast: int

    # Transmit-side counters (columns 8-15 of a row)
    tx_bytes: int
    tx_packets: int
    tx_errors: int
    tx_drops: int
    tx_fifo: int
    tx_collisions: int  # Ethernet collisions
    tx_carrier: int  # carrier errors
    tx_compressed: int

def read_proc_net_dev(path: str = '/proc/net/dev') -> list[InterfaceStats]:
    """Parse /proc/net/dev for detailed interface statistics.

    Args:
        path: File to parse. Defaults to the live kernel file; a different
            path can be supplied to parse a saved snapshot.

    Returns:
        One ``InterfaceStats`` per interface row, in file order.

    Raises:
        OSError: If *path* cannot be opened.
        ValueError: If a counter column is not an integer.

    Blank lines and rows with fewer than 16 counters are skipped rather
    than raising, so one odd row does not abort the whole read.
    """
    expected_counters = 16
    interfaces = []

    with open(path) as f:
        for line_number, line in enumerate(f):
            if line_number < 2:
                continue  # Skip the two header lines

            # Split on the colon instead of on whitespace: the name and the
            # first counter may be fused together (e.g. "eth0:12345678"),
            # in which case a plain split() would swallow the counter.
            name, colon, counters = line.partition(':')
            if not colon:
                continue  # Blank or otherwise unrecognised line

            values = [int(v) for v in counters.split()]
            if len(values) < expected_counters:
                continue

            interfaces.append(InterfaceStats(
                name=name.strip(),
                rx_bytes=values[0], rx_packets=values[1],
                rx_errors=values[2], rx_drops=values[3],
                rx_fifo=values[4], rx_frame=values[5],
                rx_compressed=values[6], rx_multicast=values[7],
                tx_bytes=values[8], tx_packets=values[9],
                tx_errors=values[10], tx_drops=values[11],
                tx_fifo=values[12], tx_collisions=values[13],
                tx_carrier=values[14], tx_compressed=values[15],
            ))

    return interfaces

The additional fields like rx_fifo (FIFO buffer overruns), rx_frame (framing errors), tx_collisions (Ethernet collisions), and tx_carrier (carrier errors) help diagnose specific hardware and protocol issues.

Polling for link state changes wastes CPU and misses events between checks. Linux’s Netlink socket provides real-time kernel notifications:

import socket
import struct
import os

# Netlink constants
NETLINK_ROUTE = 0  # protocol number passed when creating the AF_NETLINK socket
RTMGRP_LINK = 1  # group value used in the bind address to receive link events
RTM_NEWLINK = 16  # nlmsg_type reported as a "NEWLINK" event
RTM_DELLINK = 17  # nlmsg_type reported as a "DELLINK" event
IFLA_IFNAME = 3  # attribute type whose payload is the interface name
IFF_UP = 0x1  # bit in ifi_flags reported as UP
IFF_RUNNING = 0x40  # bit in ifi_flags reported as RUNNING

def _iter_link_events(data: bytes):
    """Yield ``(event, iface_name, is_up, is_running)`` for each link message in *data*.

    *data* is one datagram received from the Netlink socket and may hold
    several messages back to back. Messages whose type is neither
    RTM_NEWLINK nor RTM_DELLINK are skipped. Parsing stops early on a
    truncated or corrupt buffer instead of raising or looping forever.
    """
    nlmsghdr_size = 16   # struct.calcsize('=IHHII')
    ifinfomsg_size = 16  # struct.calcsize('=BBHiII')
    rtattr_size = 4      # struct.calcsize('=HH')

    total = len(data)
    offset = 0
    while offset + nlmsghdr_size <= total:
        # Parse nlmsghdr
        nlmsg_len, nlmsg_type, _flags, _seq, _pid = \
            struct.unpack_from('=IHHII', data, offset)
        if nlmsg_len < nlmsghdr_size:
            break  # A zero/short length would never advance the offset

        msg_end = min(offset + nlmsg_len, total)
        payload = offset + nlmsghdr_size

        if (nlmsg_type in (RTM_NEWLINK, RTM_DELLINK)
                and payload + ifinfomsg_size <= msg_end):
            # Parse ifinfomsg
            _family, _pad, _type, ifi_index, ifi_flags, _change = \
                struct.unpack_from('=BBHiII', data, payload)

            # Parse attributes for interface name; fall back to the index
            iface_name = f"index-{ifi_index}"
            attr_offset = payload + ifinfomsg_size
            while attr_offset + rtattr_size <= msg_end:
                attr_len, attr_type = struct.unpack_from('=HH', data, attr_offset)
                if attr_len < rtattr_size:
                    break  # Same guard as above, for attributes
                if attr_type == IFLA_IFNAME:
                    value_end = min(attr_offset + attr_len, msg_end)
                    raw_name = data[attr_offset + rtattr_size:value_end]
                    # The name is NUL-terminated; keep what precedes the NUL
                    iface_name = raw_name.split(b'\0', 1)[0].decode(errors='replace')
                    break
                attr_offset += (attr_len + 3) & ~3  # Align to 4 bytes

            event = "NEWLINK" if nlmsg_type == RTM_NEWLINK else "DELLINK"
            yield (
                event,
                iface_name,
                bool(ifi_flags & IFF_UP),
                bool(ifi_flags & IFF_RUNNING),
            )

        offset += (nlmsg_len + 3) & ~3  # Align to 4 bytes


def monitor_link_events():
    """Listen for real-time network interface events via Netlink.

    Opens an AF_NETLINK socket subscribed to link notifications and prints
    one line per NEWLINK/DELLINK message. Runs until interrupted (e.g.
    Ctrl+C raises KeyboardInterrupt); the socket is closed on the way out.

    Raises:
        OSError: If the socket cannot be created or bound.
    """
    with socket.socket(
        socket.AF_NETLINK,
        socket.SOCK_DGRAM,
        NETLINK_ROUTE
    ) as sock:
        # A port id of 0 lets the kernel pick a unique one; binding with the
        # process id fails if the process already owns a Netlink socket
        # bound to that id.
        sock.bind((0, RTMGRP_LINK))

        print("Listening for link events (Ctrl+C to stop)...")

        while True:
            data = sock.recv(65535)
            for event, iface_name, is_up, is_running in _iter_link_events(data):
                status = f"UP={'Y' if is_up else 'N'} RUNNING={'Y' if is_running else 'N'}"
                print(f"[{event}] {iface_name}: {status}")

SNMP-based monitoring for network devices

For monitoring switches, routers, and remote hosts, SNMP provides standardized interface statistics:

from pysnmp.hlapi import (
    getCmd, nextCmd, SnmpEngine, CommunityData,
    UdpTransportTarget, ContextData, ObjectType, ObjectIdentity
)

def get_interface_stats_snmp(host: str, community: str = 'public') -> list[dict]:
    """Collect per-interface IF-MIB values from *host* over SNMP.

    Each IF-MIB column listed below is walked in turn on UDP port 161 using
    the given community string. Results are grouped by the last component
    of the returned OID (the interface index), and every value is stored
    as a string under the column's name.

    A walk that reports an error indication or error status is abandoned
    without raising; the remaining columns are still attempted.

    Returns:
        One dict per interface index, in the order indices were first seen.
    """
    # IF-MIB table columns, keyed by the name used in the result dicts
    column_oids = {
        'ifDescr': '1.3.6.1.2.1.2.2.1.2',
        'ifOperStatus': '1.3.6.1.2.1.2.2.1.8',
        'ifSpeed': '1.3.6.1.2.1.2.2.1.5',
        'ifInOctets': '1.3.6.1.2.1.2.2.1.10',
        'ifOutOctets': '1.3.6.1.2.1.2.2.1.16',
        'ifInErrors': '1.3.6.1.2.1.2.2.1.14',
        'ifOutErrors': '1.3.6.1.2.1.2.2.1.20',
    }

    snmp_engine = SnmpEngine()
    rows_by_index = {}

    for column, base_oid in column_oids.items():
        walk = nextCmd(
            snmp_engine,
            CommunityData(community),
            UdpTransportTarget((host, 161)),
            ContextData(),
            ObjectType(ObjectIdentity(base_oid)),
            lexicographicMode=False,
        )
        for error_indication, error_status, _error_index, var_binds in walk:
            if error_indication or error_status:
                break

            for var_bind in var_binds:
                # The interface index is the final dotted component of the OID
                if_index = str(var_bind[0]).rsplit('.', 1)[-1]
                row = rows_by_index.setdefault(if_index, {})
                row[column] = str(var_bind[1])

    return list(rows_by_index.values())

Traffic pattern analysis

Bandwidth utilization tracking

import time
from collections import deque
from dataclasses import dataclass, field

@dataclass
class InterfaceBandwidthTracker:
    """Turn cumulative byte counters for one interface into bandwidth samples.

    Call ``record`` periodically with the interface's cumulative RX/TX byte
    counters. From the second call onward each call appends a sample dict
    with keys ``timestamp``, ``rx_mbps``, ``tx_mbps`` and
    ``utilization_pct`` to ``samples`` (a bounded deque).
    """

    interface: str
    capacity_mbps: float
    samples: deque = field(default_factory=lambda: deque(maxlen=360))  # 1 hour at 10s
    _prev_counters: dict = field(default_factory=dict)
    _prev_time: float = 0

    def record(self, rx_bytes: int, tx_bytes: int):
        """Record new cumulative counters and append a rate sample.

        No sample is produced on the first call (there is nothing to diff
        against) or when no time has passed since the previous call, which
        would otherwise divide by zero or yield rates of the wrong sign.
        The stored counters and timestamp are updated in every case.
        """
        now = time.time()
        elapsed = now - self._prev_time
        if self._prev_time > 0 and elapsed > 0:
            rx_delta = rx_bytes - self._prev_counters.get('rx', rx_bytes)
            tx_delta = tx_bytes - self._prev_counters.get('tx', tx_bytes)

            # A counter that went backwards (reset/wrap) is clamped to zero.
            rx_mbps = max(0, rx_delta * 8 / elapsed / 1_000_000)
            tx_mbps = max(0, tx_delta * 8 / elapsed / 1_000_000)

            # Utilisation uses the clamped rates so that a reset on one
            # direction cannot cancel out real traffic on the other.
            if self.capacity_mbps > 0:
                utilization_pct = (rx_mbps + tx_mbps) / self.capacity_mbps * 100
            else:
                utilization_pct = 0

            self.samples.append({
                'timestamp': now,
                'rx_mbps': rx_mbps,
                'tx_mbps': tx_mbps,
                'utilization_pct': utilization_pct,
            })

        self._prev_counters = {'rx': rx_bytes, 'tx': tx_bytes}
        self._prev_time = now

    def _recent(self, last_n: int) -> list:
        """Return the newest *last_n* samples (empty when ``last_n <= 0``)."""
        if last_n <= 0:
            # samples[-0:] would be the whole list, not an empty one
            return []
        return list(self.samples)[-last_n:]

    def avg_utilization(self, last_n: int = 30) -> float:
        """Return the mean ``utilization_pct`` of the newest *last_n* samples, or 0.0 if none."""
        recent = self._recent(last_n)
        if not recent:
            return 0.0
        return sum(s['utilization_pct'] for s in recent) / len(recent)

    def peak_utilization(self, last_n: int = 30) -> float:
        """Return the highest ``utilization_pct`` of the newest *last_n* samples, or 0.0 if none."""
        recent = self._recent(last_n)
        if not recent:
            return 0.0
        return max(s['utilization_pct'] for s in recent)

Anomaly detection

The detector below flags unusual traffic spikes that might indicate DDoS, data exfiltration, or misconfigured services:

class TrafficAnomalyDetector:
    """Flag traffic spikes and heavily outbound-skewed traffic per interface.

    A rolling window of combined (RX + TX) bits-per-second readings is kept
    for every interface. A reading is a spike when it exceeds the window
    mean by more than ``threshold_sigma`` standard deviations; the window
    already contains the reading being judged.
    """

    def __init__(self, window_size: int = 60, threshold_sigma: float = 3.0):
        self.window_size = window_size
        self.threshold_sigma = threshold_sigma
        self.history: dict[str, deque] = {}

    def check(self, interface: str, rx_bps: float, tx_bps: float) -> list[str]:
        """Record one reading for *interface* and return anomaly messages.

        Returns an empty list until the window holds at least half of
        ``window_size`` readings.
        """
        window = self.history.get(interface)
        if window is None:
            window = self.history[interface] = deque(maxlen=self.window_size)

        combined_bps = rx_bps + tx_bps
        window.append(combined_bps)

        count = len(window)
        if count < self.window_size // 2:
            return []  # Still warming up

        # Population mean and standard deviation over the window
        mean = sum(window) / count
        spread = (sum((sample - mean) ** 2 for sample in window) / count) ** 0.5

        findings = []

        is_spike = spread > 0 and combined_bps > mean + self.threshold_sigma * spread
        if is_spike:
            findings.append(
                f"{interface}: Traffic spike detected "
                f"({combined_bps / 1_000_000:.1f} Mbps, "
                f"normal: {mean / 1_000_000:.1f} ± {spread / 1_000_000:.1f} Mbps)"
            )

        # Outbound over 10 Mbps and more than ten times inbound
        # (potential exfiltration)
        is_lopsided = tx_bps > 10_000_000 and tx_bps > rx_bps * 10
        if is_lopsided:
            findings.append(
                f"{interface}: Asymmetric traffic "
                f"(TX: {tx_bps / 1_000_000:.1f} Mbps, RX: {rx_bps / 1_000_000:.1f} Mbps)"
            )

        return findings

Building a complete network monitoring agent

import psutil
import time
import json
from pathlib import Path

class NetworkMonitoringAgent:
    """Periodically collect interface metrics, detect problems and log them.

    Combines per-interface throughput derived from psutil counters with
    bandwidth tracking, anomaly detection, link-state change detection and
    an interface health check. ``run`` appends each report as one JSON line
    to a per-day file under ``data_dir``.
    """

    def __init__(self, data_dir: str = '/var/lib/net-monitor'):
        """Create *data_dir* if needed and set up the helper components."""
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)

        self.link_monitor = LinkStateMonitor()
        self.anomaly_detector = TrafficAnomalyDetector()
        self.bandwidth_trackers: dict[str, InterfaceBandwidthTracker] = {}
        self._prev_counters = {}
        self._prev_time = 0

    def _init_trackers(self):
        """Create a bandwidth tracker for every interface that reports a speed."""
        stats = psutil.net_if_stats()
        for iface, stat in stats.items():
            if iface not in self.bandwidth_trackers and stat.speed > 0:
                self.bandwidth_trackers[iface] = InterfaceBandwidthTracker(
                    interface=iface,
                    capacity_mbps=stat.speed,
                )

    def collect(self) -> dict:
        """Take one measurement and return it as a report dict.

        The report has the keys ``timestamp``, ``interfaces``,
        ``link_changes``, ``anomalies`` and ``health_issues``. Rates are 0
        for an interface until two consecutive measurements of it exist.
        """
        self._init_trackers()
        now = time.time()
        counters = psutil.net_io_counters(pernic=True)
        report = {
            'timestamp': now,
            'interfaces': {},
            'link_changes': [],
            'anomalies': [],
            'health_issues': [],
        }

        # Same for every interface, so computed once outside the loop
        elapsed = now - self._prev_time if self._prev_time > 0 else 0

        # Per-interface metrics
        for iface, c in counters.items():
            prev = self._prev_counters.get(iface)
            has_rate = prev is not None and elapsed > 0

            if has_rate:
                # Clamp at zero: a counter reset would otherwise be reported
                # as negative throughput.
                rx_bps = max(0.0, (c.bytes_recv - prev.bytes_recv) / elapsed * 8)
                tx_bps = max(0.0, (c.bytes_sent - prev.bytes_sent) / elapsed * 8)
            else:
                rx_bps = tx_bps = 0

            report['interfaces'][iface] = {
                'rx_mbps': rx_bps / 1_000_000,
                'tx_mbps': tx_bps / 1_000_000,
                'rx_packets': c.packets_recv,
                'tx_packets': c.packets_sent,
                'errors': c.errin + c.errout,
                'drops': c.dropin + c.dropout,
            }

            # Update bandwidth tracker
            if iface in self.bandwidth_trackers:
                self.bandwidth_trackers[iface].record(c.bytes_recv, c.bytes_sent)

            # Check for anomalies only when a real rate was measured, so a
            # newly appeared interface does not seed the detector's history
            # with a made-up 0 bps reading.
            if has_rate:
                anomalies = self.anomaly_detector.check(iface, rx_bps, tx_bps)
                report['anomalies'].extend(anomalies)

        # Link state changes
        report['link_changes'] = self.link_monitor.check()

        # Interface health
        report['health_issues'] = check_interface_health()

        self._prev_counters = counters
        self._prev_time = now

        return report

    def run(self, interval: float = 10.0):
        """Collect, print and persist a report every *interval* seconds, forever."""
        print(f"Network monitoring agent started (interval: {interval}s)")
        while True:
            report = self.collect()

            # Log anomalies and changes
            for anomaly in report['anomalies']:
                print(f"⚠️  {anomaly}")
            for change in report['link_changes']:
                # Separate the two states; without the arrow the line reads
                # e.g. "eth0: updown".
                print(f"🔗 {change['interface']}: {change['previous']} → {change['current']}")

            # Persist metrics
            metrics_file = self.data_dir / f"metrics-{time.strftime('%Y-%m-%d')}.jsonl"
            with open(metrics_file, 'a', encoding='utf-8') as f:
                f.write(json.dumps(report, default=str) + '\n')

            time.sleep(interval)


class LinkStateMonitor:
    """Report interfaces whose up/down state differs from the last observation."""

    def __init__(self):
        # Snapshot the current state so the first check() has a baseline
        self.previous_states = {}
        for iface, stat in psutil.net_if_stats().items():
            self.previous_states[iface] = stat.isup

    def check(self) -> list[dict]:
        """Return one dict per interface whose state flipped since last seen.

        Each dict has the keys ``interface``, ``previous`` and ``current``,
        the latter two being ``'up'`` or ``'down'``. An interface seen for
        the first time is recorded but not reported.
        """
        transitions = []
        for iface, stat in psutil.net_if_stats().items():
            was_up = self.previous_states.get(iface)
            is_up = stat.isup
            self.previous_states[iface] = is_up

            if was_up is None or was_up == is_up:
                continue

            transitions.append({
                'interface': iface,
                'previous': 'up' if was_up else 'down',
                'current': 'up' if is_up else 'down',
            })
        return transitions


def check_interface_health() -> list[dict]:
    """List interfaces whose error or drop ratio exceeds the thresholds.

    Ratios are percentages of the interface's total packet count (received
    plus sent) taken from psutil's per-NIC counters. An interface is listed
    when errors exceed 0.01 % or drops exceed 0.1 %. Interfaces with no
    packets at all are ignored.
    """
    unhealthy = []
    per_nic = psutil.net_io_counters(pernic=True)

    for iface, io in per_nic.items():
        packets = io.packets_recv + io.packets_sent
        if packets == 0:
            continue

        error_pct = (io.errin + io.errout) / packets * 100
        drop_pct = (io.dropin + io.dropout) / packets * 100

        within_limits = error_pct <= 0.01 and drop_pct <= 0.1
        if within_limits:
            continue

        unhealthy.append({
            'interface': iface,
            'error_pct': error_pct,
            'drop_pct': drop_pct,
        })

    return unhealthy

One thing to remember: Production network monitoring combines multiple techniques — psutil for cross-platform basics, /proc/net/dev for detailed Linux statistics, Netlink sockets for real-time link events, SNMP for remote devices, and statistical analysis for anomaly detection. The goal is not just measuring bandwidth but understanding traffic patterns and detecting problems before users notice them.

Tags: python, networking, monitoring, system-administration, devops

See Also

  • Python Crontab Management: How Python can set up automatic timers on your computer — like programming an alarm clock that runs tasks instead of waking you up.
  • Python Disk Usage Monitoring: How Python helps you keep an eye on your computer's storage — like a fuel gauge that warns you before you run out of space.
  • Python Log Rotation Management: Why your program's diary needs page limits — and how Python keeps log files from eating all your disk space.
  • Python Process Management: How Python lets you see and control all the programs running on your computer — like being the manager of a busy office.
  • Python Psutil System Monitoring: How Python's psutil library lets your program check on your computer's health — like a doctor with a stethoscope for your machine.