Python dpkt Packet Parsing — Deep Dive

System-level framing

Network traffic analysis at scale demands a parser that can process millions of packets without becoming a bottleneck. dpkt fills this role in the Python ecosystem — it is a minimal, performance-oriented packet decoding library, written in pure Python, that converts raw capture bytes into structured Python objects. Where Scapy trades speed for flexibility, dpkt trades flexibility for throughput.

This deep dive covers advanced parsing patterns, flow reconstruction, protocol extraction, and building analysis pipelines.

Installation

pip install dpkt

Reading pcap and pcapng files

import dpkt

# Classic libpcap capture: iterating the reader yields (timestamp, frame) pairs
with open("capture.pcap", "rb") as pcap_file:
    for timestamp, frame in dpkt.pcap.Reader(pcap_file):
        print(f"Timestamp: {timestamp}, Size: {len(frame)}")

# pcapng capture (newer format, supports metadata) — same iteration protocol
with open("capture.pcapng", "rb") as pcapng_file:
    for timestamp, frame in dpkt.pcapng.Reader(pcapng_file):
        print(f"Timestamp: {timestamp}, Size: {len(frame)}")

The reader is a lazy iterator — it reads one packet at a time, making it memory-efficient for large files.

Complete packet decoding pipeline

import dpkt
import socket
from datetime import datetime, timezone

def decode_packet(ts: float, buf: bytes) -> dict | None:
    """Decode one captured Ethernet frame into a flat summary dict.

    Args:
        ts: Capture timestamp in seconds since the Unix epoch.
        buf: Raw frame bytes, starting at the Ethernet header.

    Returns:
        ``None`` when the frame carries neither IPv4 nor IPv6. Otherwise a
        dict with these keys:

        * ``timestamp`` – timezone-aware UTC ``datetime`` built from ``ts``
        * ``src_ip`` / ``dst_ip`` – textual addresses
        * ``ip_len`` – the IPv4 ``len`` field, or the IPv6 ``plen`` field
        * ``protocol`` – ``"TCP"``, ``"UDP"``, ``"ICMP"``, ``"ICMPv6"`` or
          ``None`` for any other transport
        * ``src_port`` / ``dst_port`` – set for TCP and UDP, else ``None``

        TCP and UDP results also carry ``payload_len`` (TCP adds
        ``tcp_flags``); ICMP and ICMPv6 results carry ``icmp_type`` and
        ``icmp_code``.

    Raises:
        dpkt.dpkt.UnpackError: Propagated from dpkt when ``buf`` cannot be
            decoded as an Ethernet frame (for example, a truncated capture).
    """
    eth = dpkt.ethernet.Ethernet(buf)
    ip = eth.data

    # Pick the address family and the matching length field in one place.
    if isinstance(ip, dpkt.ip6.IP6):
        family, ip_len = socket.AF_INET6, ip.plen
    elif isinstance(ip, dpkt.ip.IP):
        family, ip_len = socket.AF_INET, ip.len
    else:
        return None

    result = {
        "timestamp": datetime.fromtimestamp(ts, tz=timezone.utc),
        "src_ip": socket.inet_ntop(family, ip.src),
        "dst_ip": socket.inet_ntop(family, ip.dst),
        "ip_len": ip_len,
        "protocol": None,
        "src_port": None,
        "dst_port": None,
    }

    transport = ip.data
    if isinstance(transport, dpkt.tcp.TCP):
        result["protocol"] = "TCP"
        result["src_port"] = transport.sport
        result["dst_port"] = transport.dport
        result["tcp_flags"] = transport.flags
        result["payload_len"] = len(transport.data)
    elif isinstance(transport, dpkt.udp.UDP):
        result["protocol"] = "UDP"
        result["src_port"] = transport.sport
        result["dst_port"] = transport.dport
        result["payload_len"] = len(transport.data)
    elif isinstance(transport, dpkt.icmp.ICMP):
        result["protocol"] = "ICMP"
        result["icmp_type"] = transport.type
        result["icmp_code"] = transport.code
    elif isinstance(transport, dpkt.icmp6.ICMP6):
        # IPv6 is accepted above, so its control protocol must be labelled
        # too; otherwise every ICMPv6 packet is reported with protocol=None.
        result["protocol"] = "ICMPv6"
        result["icmp_type"] = transport.type
        result["icmp_code"] = transport.code

    return result

# Usage: print the summary of every frame that decode_packet accepts
with open("capture.pcap", "rb") as capture:
    for timestamp, frame in dpkt.pcap.Reader(capture):
        if decoded := decode_packet(timestamp, frame):
            print(decoded)

HTTP request and response extraction

import socket

import dpkt

def extract_http(pcap_path: str) -> list[dict]:
    """Extract HTTP requests and responses from a pcap file.

    Every IPv4/TCP segment with a non-empty payload is parsed on its own,
    first as an HTTP request and, if that fails, as an HTTP response.
    Payloads that dpkt rejects (not HTTP, or an incomplete message) are
    skipped silently.

    Args:
        pcap_path: Path to a classic pcap capture file.

    Returns:
        A list of dicts in capture order. Request entries have ``type``
        (``"request"``), ``method``, ``uri``, ``host``, ``user_agent`` and
        ``src``; response entries have ``type`` (``"response"``), ``status``,
        ``reason``, ``content_type`` and ``content_length``. Missing headers
        are reported as empty strings.
    """
    results = []
    parse_errors = (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError)

    with open(pcap_path, "rb") as f:
        for _ts, buf in dpkt.pcap.Reader(f):
            eth = dpkt.ethernet.Ethernet(buf)
            ip = eth.data
            if not isinstance(ip, dpkt.ip.IP):
                continue

            tcp = ip.data
            if not isinstance(tcp, dpkt.tcp.TCP) or not tcp.data:
                continue

            # Try to parse as HTTP request
            try:
                request = dpkt.http.Request(tcp.data)
            except parse_errors:
                pass
            else:
                results.append({
                    "type": "request",
                    "method": request.method,
                    "uri": request.uri,
                    "host": request.headers.get("host", ""),
                    "user_agent": request.headers.get("user-agent", ""),
                    "src": socket.inet_ntoa(ip.src),
                })
                # Request and response start lines are mutually exclusive,
                # so a second parse attempt could only raise and be ignored.
                continue

            # Try to parse as HTTP response
            try:
                response = dpkt.http.Response(tcp.data)
            except parse_errors:
                continue
            results.append({
                "type": "response",
                "status": response.status,
                "reason": response.reason,
                "content_type": response.headers.get("content-type", ""),
                "content_length": response.headers.get("content-length", ""),
            })

    return results

DNS query and response parsing

import dpkt
import socket

def extract_dns(pcap_path: str) -> list[dict]:
    """Collect DNS messages carried over IPv4/UDP port 53 in a pcap file.

    Args:
        pcap_path: Path to a classic pcap capture file.

    Returns:
        One dict per decodable DNS message with ``timestamp``, ``src``,
        ``dst``, ``id``, ``is_response``, ``queries`` (name/type pairs) and
        ``answers`` (name/type/ttl, plus ``ip`` for A/AAAA records or
        ``cname`` for CNAME records). Payloads dpkt cannot unpack are skipped.
    """
    records = []

    with open(pcap_path, "rb") as capture:
        for ts, buf in dpkt.pcap.Reader(capture):
            ip = dpkt.ethernet.Ethernet(buf).data
            if not isinstance(ip, dpkt.ip.IP):
                continue

            udp = ip.data
            if not isinstance(udp, dpkt.udp.UDP):
                continue
            # Keep only traffic to or from the DNS port
            if 53 not in (udp.sport, udp.dport):
                continue

            try:
                message = dpkt.dns.DNS(udp.data)
            except dpkt.dpkt.UnpackError:
                continue

            record = {
                "timestamp": ts,
                "src": socket.inet_ntoa(ip.src),
                "dst": socket.inet_ntoa(ip.dst),
                "id": message.id,
                "is_response": bool(message.qr),
                "queries": [
                    {"name": question.name, "type": question.type}
                    for question in message.qd
                ],
                "answers": [],
            }

            for rr in message.an:
                item = {"name": rr.name, "type": rr.type, "ttl": rr.ttl}
                # Only address and alias records get a decoded value
                if rr.type == dpkt.dns.DNS_A:
                    item["ip"] = socket.inet_ntoa(rr.rdata)
                elif rr.type == dpkt.dns.DNS_AAAA:
                    item["ip"] = socket.inet_ntop(socket.AF_INET6, rr.rdata)
                elif rr.type == dpkt.dns.DNS_CNAME:
                    item["cname"] = rr.cname
                record["answers"].append(item)

            records.append(record)

    return records

TCP flow reconstruction

Individual packets only show fragments. To analyze complete conversations, reconstruct TCP flows:

from collections import defaultdict
import dpkt
import socket

class TCPFlow:
    """Accumulated payload of one TCP conversation, split by direction."""

    def __init__(self):
        # Number of payload-carrying packets recorded via add_packet()
        self.packets = 0
        # Bytes sent by each side, concatenated in the order they were added
        self.client_data = bytearray()
        self.server_data = bytearray()

    def add_packet(self, is_client: bool, data: bytes):
        """Count one packet and append ``data`` to the sender's buffer.

        Args:
            is_client: True when the client sent the payload, False when
                the server did.
            data: Payload bytes to append.
        """
        self.packets += 1
        buffer = self.client_data if is_client else self.server_data
        buffer.extend(data)

def reconstruct_flows(pcap_path: str) -> dict:
    """Group TCP payload bytes from a pcap file into per-connection flows.

    Payload is appended in capture order. Direction is taken from the first
    bare SYN seen for an endpoint pair; connections whose SYN was not
    captured fall back to ordering the two endpoints and treating the
    smaller one as the client.

    Args:
        pcap_path: Path to a classic pcap capture file.

    Returns:
        A plain dict mapping ``((ip, port), (ip, port))`` keys to
        ``TCPFlow`` objects. Only flows that carried payload appear.
    """
    flows: dict[tuple, TCPFlow] = defaultdict(TCPFlow)
    # (client, server) endpoint pairs for which a SYN without ACK was seen
    initiators = set()
    syn_ack_mask = dpkt.tcp.TH_SYN | dpkt.tcp.TH_ACK

    with open(pcap_path, "rb") as capture:
        for ts, buf in dpkt.pcap.Reader(capture):
            ip = dpkt.ethernet.Ethernet(buf).data
            if not isinstance(ip, dpkt.ip.IP):
                continue

            tcp = ip.data
            if not isinstance(tcp, dpkt.tcp.TCP):
                continue

            sender = (socket.inet_ntoa(ip.src), tcp.sport)
            receiver = (socket.inet_ntoa(ip.dst), tcp.dport)
            forward = (sender, receiver)
            reverse = (receiver, sender)

            # SYN set and ACK clear marks the opening packet of a connection
            if tcp.flags & syn_ack_mask == dpkt.tcp.TH_SYN:
                initiators.add(forward)

            # Flow keys are always stored in client→server order
            if forward in initiators:
                flow_key, from_client = forward, True
            elif reverse in initiators:
                flow_key, from_client = reverse, False
            else:
                flow_key = tuple(sorted(forward))
                from_client = sender == flow_key[0]

            if tcp.data:
                flows[flow_key].add_packet(from_client, tcp.data)

    return dict(flows)

Traffic statistics and bandwidth analysis

import dpkt
import socket
from collections import Counter, defaultdict
from datetime import datetime, timezone

def traffic_stats(pcap_path: str) -> dict:
    """Summarise a pcap file: volume, protocol mix, talkers, ports and rate.

    Args:
        pcap_path: Path to a classic pcap capture file.

    Returns:
        A dict with ``total_packets`` and ``total_bytes`` (all frames),
        ``packets_per_second`` (frame count keyed by whole-second timestamp),
        and, for IPv4 frames only, ``protocols`` (TCP/UDP/ICMP packet
        counts), ``top_talkers`` (bytes per ``(src, dst)`` pair) and
        ``port_distribution`` (TCP/UDP packets per destination port).
    """
    packet_count = 0
    byte_count = 0
    protocols = Counter()
    talkers = Counter()
    ports = Counter()
    per_second = defaultdict(int)

    with open(pcap_path, "rb") as capture:
        for ts, buf in dpkt.pcap.Reader(capture):
            frame_size = len(buf)
            packet_count += 1
            byte_count += frame_size

            # Bucket by the integer second of the capture timestamp
            per_second[int(ts)] += 1

            ip = dpkt.ethernet.Ethernet(buf).data
            if not isinstance(ip, dpkt.ip.IP):
                continue

            pair = (socket.inet_ntoa(ip.src), socket.inet_ntoa(ip.dst))
            talkers[pair] += frame_size

            transport = ip.data
            if isinstance(transport, dpkt.tcp.TCP):
                label = "TCP"
            elif isinstance(transport, dpkt.udp.UDP):
                label = "UDP"
            elif isinstance(transport, dpkt.icmp.ICMP):
                label = "ICMP"
            else:
                continue

            protocols[label] += 1
            # ICMP has no ports; only TCP and UDP feed the port histogram
            if label != "ICMP":
                ports[transport.dport] += 1

    return {
        "total_packets": packet_count,
        "total_bytes": byte_count,
        "protocols": protocols,
        "top_talkers": talkers,
        "port_distribution": ports,
        "packets_per_second": per_second,
    }

TLS/SSL connection analysis

import dpkt
import socket

def extract_tls_handshakes(pcap_path: str) -> list[dict]:
    """Extract TLS Client Hello messages (including SNI) from a pcap file.

    Only IPv4/TCP segments whose payload begins with a TLS handshake record
    (content type 0x16) are examined, and each segment is parsed on its own.
    Records that dpkt cannot unpack from a single segment are skipped.

    Args:
        pcap_path: Path to a classic pcap capture file.

    Returns:
        One dict per Client Hello with ``timestamp``, ``src``, ``dst``,
        ``dst_port``, ``tls_version`` (the record-layer version rendered as
        ``"major.minor"``, e.g. ``"3.1"`` for 0x0301) and ``sni`` (the
        result of ``_extract_sni`` on the handshake bytes).
    """
    results = []

    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            eth = dpkt.ethernet.Ethernet(buf)
            ip = eth.data
            if not isinstance(ip, dpkt.ip.IP):
                continue

            tcp = ip.data
            if not isinstance(tcp, dpkt.tcp.TCP):
                continue

            payload = tcp.data
            # A TLS record header is 5 bytes; 0x16 is the handshake type
            if len(payload) < 5 or payload[0] != 0x16:
                continue

            try:
                record = dpkt.ssl.TLSRecord(payload)
            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
                continue

            handshake = record.data
            if not isinstance(handshake, bytes) or not handshake:
                continue
            # Handshake type 0x01 = Client Hello
            if handshake[0] != 0x01:
                continue

            # TLSRecord.version is one 16-bit integer (0x0303, ...), not a
            # byte pair, so split it arithmetically instead of indexing it.
            major, minor = record.version >> 8, record.version & 0xFF
            results.append({
                "timestamp": ts,
                "src": socket.inet_ntoa(ip.src),
                "dst": socket.inet_ntoa(ip.dst),
                "dst_port": tcp.dport,
                "tls_version": f"{major}.{minor}",
                "sni": _extract_sni(handshake),
            })

    return results

def _extract_sni(handshake_data: bytes) -> str | None:
    """Extract Server Name Indication from Client Hello."""
    try:
        # Skip handshake header (4 bytes) + client hello fields
        # This is a simplified parser; production code should use a proper TLS parser
        idx = handshake_data.find(b"\x00\x00")  # SNI extension type
        if idx == -1:
            return None
        # Parse SNI extension — simplified
        name_start = idx + 9
        name_len = int.from_bytes(handshake_data[name_start - 2:name_start], "big")
        return handshake_data[name_start:name_start + name_len].decode(errors="ignore")
    except (IndexError, ValueError):
        return None

Performance: dpkt vs alternatives

Benchmark context: parsing a 1 GB pcap file with 2 million packets.

| Library | Parse Time | Memory | Notes |
|---|---|---|---|
| dpkt | ~15 seconds | Low (streaming) | Best for batch processing |
| Scapy rdpcap | ~120 seconds | High (loads all) | Flexible but slow |
| pyshark | ~45 seconds | Medium | Wraps tshark (subprocess) |
| C libpcap | ~3 seconds | Minimal | Not Python-native |

dpkt’s streaming iterator keeps memory constant regardless of file size. Scapy’s rdpcap() loads everything into memory.

Writing pcap files

import dpkt
import struct
import time

# Craft a packet: Ethernet broadcast frame carrying an IPv4 TCP SYN
eth = dpkt.ethernet.Ethernet(
    dst=b"\xff\xff\xff\xff\xff\xff",
    src=b"\x00\x11\x22\x33\x44\x55",
    type=dpkt.ethernet.ETH_TYPE_IP,
)
ip = dpkt.ip.IP(
    src=b"\xc0\xa8\x01\x01",  # 192.168.1.1
    dst=b"\xc0\xa8\x01\x02",  # 192.168.1.2
    p=dpkt.ip.IP_PROTO_TCP,
)
tcp = dpkt.tcp.TCP(sport=12345, dport=80, flags=dpkt.tcp.TH_SYN)

# Nest the layers innermost-first so the IP length covers the TCP header
ip.data = tcp
ip.len = len(ip)
eth.data = ip

# Create the pcap file only once the packet is ready. The `with` block
# closes the file even if writing raises, so the handle is never leaked.
with open("output.pcap", "wb") as out:
    writer = dpkt.pcap.Writer(out)
    writer.writepkt(bytes(eth), ts=time.time())

Error handling patterns

import dpkt

def safe_parse(pcap_path: str):
    """Walk a capture file, counting frames that fail to decode.

    The file is opened as classic pcap first and, if that reader rejects it
    with ``ValueError``, re-read from the start as pcapng. The number of
    frames that could not be decoded is printed at the end.

    Args:
        pcap_path: Path to a pcap or pcapng capture file.
    """
    failed = 0

    with open(pcap_path, "rb") as capture:
        try:
            packets = dpkt.pcap.Reader(capture)
        except ValueError:
            # Not classic pcap: rewind and retry as pcapng
            capture.seek(0)
            packets = dpkt.pcapng.Reader(capture)

        for ts, buf in packets:
            try:
                frame = dpkt.ethernet.Ethernet(buf)
            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
                # NeedData: truncated packet; UnpackError: malformed packet
                failed += 1
                continue

            if not isinstance(frame.data, dpkt.ip.IP):
                continue

            try:
                ip = frame.data
                # Process IP packet
            except Exception:
                failed += 1

    print(f"Parse errors: {failed}")

Tradeoffs

| Approach | Pros | Cons |
|---|---|---|
| dpkt | Fast, lightweight, streaming, good protocol coverage | Read-focused, limited crafting, smaller community |
| Scapy | Full read/write, interactive, huge community | Slow for large captures, high memory |
| pyshark | Leverages Wireshark’s dissectors, most protocols | Subprocess overhead, slower |
| gopacket (Go) | Very fast, compiled | Not Python |
| libpcap (C) | Maximum performance | C API, harder to use |

One thing to remember: dpkt is the workhorse for batch packet analysis in Python. Its streaming architecture and lightweight parsing make it the right tool when you need to process large captures efficiently. For interactive exploration or packet crafting, look to Scapy; for raw speed, look to C. dpkt sits at the sweet spot of Python convenience and real-world performance.

Tags: python · networking · packet-analysis

See Also

  • Python DNS Resolver — Understand how Python translates website names into addresses, like a phone book for the entire internet.
  • Python FTP/SFTP Transfers — Understand how Python moves files between computers over a network, like a digital delivery truck with a locked or unlocked cargo door.
  • Python Impacket Security Tools — Understand how Python speaks the secret languages of Windows networks, helping security teams find weaknesses before attackers do.
  • Python NETCONF/YANG — Understand how Python configures network devices automatically, like a remote control for every router and switch in your building.
  • Python Pcap Analysis — Understand how Python reads recordings of network traffic, like playing back security camera footage to see what happened on your network.