Python dpkt Packet Parsing — Deep Dive
System-level framing
Network traffic analysis at scale demands a parser that can process millions of packets without becoming a bottleneck. dpkt fills this role in the Python ecosystem — it is a minimal, speed-oriented, pure-Python packet decoding library that converts raw capture bytes into structured Python objects. Where Scapy trades speed for flexibility, dpkt trades flexibility for throughput.
This deep dive covers advanced parsing patterns, flow reconstruction, protocol extraction, and building analysis pipelines.
Installation
pip install dpkt
Reading pcap and pcapng files
import dpkt
# Each entry pairs a capture file with the dpkt reader class for its format.
# Both readers yield (timestamp, raw_bytes) tuples one packet at a time.
CAPTURES = (
    ("capture.pcap", dpkt.pcap.Reader),      # Standard pcap
    ("capture.pcapng", dpkt.pcapng.Reader),  # pcapng format (newer, supports metadata)
)

for path, reader_cls in CAPTURES:
    with open(path, "rb") as fh:
        for stamp, frame in reader_cls(fh):
            print(f"Timestamp: {stamp}, Size: {len(frame)}")
The reader is a lazy iterator — it reads one packet at a time, making it memory-efficient for large files.
Complete packet decoding pipeline
import dpkt
import socket
from datetime import datetime, timezone
def decode_packet(ts: float, buf: bytes) -> dict | None:
    """Decode a single Ethernet frame into a structured dict.

    Args:
        ts: Capture timestamp in seconds since the epoch.
        buf: Raw frame bytes, as yielded by a dpkt reader.

    Returns:
        A dict with ``timestamp`` (timezone-aware UTC datetime), ``src_ip``,
        ``dst_ip``, ``ip_len``, ``protocol``, ``src_port`` and ``dst_port``.
        TCP and UDP packets add ``payload_len`` (TCP also adds ``tcp_flags``);
        ICMP and ICMPv6 packets add ``icmp_type`` and ``icmp_code``.
        ``protocol`` stays ``None`` for any other transport.
        Returns ``None`` when the frame cannot be unpacked or does not carry
        IPv4/IPv6.
    """
    try:
        eth = dpkt.ethernet.Ethernet(buf)
    except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
        # Truncated or malformed frame: report "not decodable" instead of
        # aborting the caller's whole capture loop.
        return None

    ip = eth.data
    if isinstance(ip, dpkt.ip6.IP6):
        # IPv6 has no total-length field; plen is the payload length.
        family, ip_len = socket.AF_INET6, ip.plen
    elif isinstance(ip, dpkt.ip.IP):
        family, ip_len = socket.AF_INET, ip.len
    else:
        return None

    result = {
        "timestamp": datetime.fromtimestamp(ts, tz=timezone.utc),
        "src_ip": socket.inet_ntop(family, ip.src),
        "dst_ip": socket.inet_ntop(family, ip.dst),
        "ip_len": ip_len,
        "protocol": None,
        "src_port": None,
        "dst_port": None,
    }

    transport = ip.data
    if isinstance(transport, dpkt.tcp.TCP):
        result["protocol"] = "TCP"
        result["src_port"] = transport.sport
        result["dst_port"] = transport.dport
        result["tcp_flags"] = transport.flags
        result["payload_len"] = len(transport.data)
    elif isinstance(transport, dpkt.udp.UDP):
        result["protocol"] = "UDP"
        result["src_port"] = transport.sport
        result["dst_port"] = transport.dport
        result["payload_len"] = len(transport.data)
    elif isinstance(transport, dpkt.icmp.ICMP):
        result["protocol"] = "ICMP"
        result["icmp_type"] = transport.type
        result["icmp_code"] = transport.code
    elif isinstance(transport, dpkt.icmp6.ICMP6):
        # IPv6 frames are accepted above, so label their ICMP variant too.
        result["protocol"] = "ICMPv6"
        result["icmp_type"] = transport.type
        result["icmp_code"] = transport.code
    return result
# Usage: stream the capture through decode_packet and print every packet
# that decoded to a non-empty result.
with open("capture.pcap", "rb") as capture:
    decoded = (decode_packet(stamp, frame) for stamp, frame in dpkt.pcap.Reader(capture))
    for pkt in filter(None, decoded):
        print(pkt)
HTTP request and response extraction
import dpkt
def extract_http(pcap_path: str) -> list[dict]:
    """Extract HTTP requests and responses from a pcap file.

    Each non-empty TCP payload (IPv4 only) is parsed on its own, so only
    messages that dpkt can parse from a single segment are reported.

    Args:
        pcap_path: Path to a classic pcap capture.

    Returns:
        A list of dicts in capture order. Requests have the keys ``type``
        ("request"), ``method``, ``uri``, ``host``, ``user_agent`` and ``src``;
        responses have ``type`` ("response"), ``status``, ``reason``,
        ``content_type`` and ``content_length``. Missing headers are reported
        as empty strings.
    """
    # Imported here because this snippet's import block only brings in dpkt.
    import socket

    parse_errors = (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError)
    results = []
    with open(pcap_path, "rb") as f:
        for _ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
            except parse_errors:
                continue  # truncated or malformed frame
            if not isinstance(eth.data, dpkt.ip.IP):
                continue
            ip = eth.data
            if not isinstance(ip.data, dpkt.tcp.TCP):
                continue
            payload = ip.data.data
            if len(payload) == 0:
                continue

            # Try to parse as HTTP request
            try:
                request = dpkt.http.Request(payload)
            except parse_errors:
                request = None
            if request is not None:
                results.append({
                    "type": "request",
                    "method": request.method,
                    "uri": request.uri,
                    "host": request.headers.get("host", ""),
                    "user_agent": request.headers.get("user-agent", ""),
                    "src": socket.inet_ntoa(ip.src),
                })
                # A payload that parsed as a request cannot also be a
                # response, so skip the second parse attempt.
                continue

            # Try to parse as HTTP response
            try:
                response = dpkt.http.Response(payload)
            except parse_errors:
                continue
            results.append({
                "type": "response",
                "status": response.status,
                "reason": response.reason,
                "content_type": response.headers.get("content-type", ""),
                "content_length": response.headers.get("content-length", ""),
            })
    return results
DNS query and response parsing
import dpkt
import socket
def extract_dns(pcap_path: str) -> list[dict]:
    """Collect one record per DNS message carried over IPv4/UDP port 53.

    Args:
        pcap_path: Path to a classic pcap capture.

    Returns:
        A list of dicts with ``timestamp``, ``src``, ``dst``, ``id``,
        ``is_response``, ``queries`` (name/type pairs) and ``answers``
        (name/type/ttl, plus ``ip`` for A/AAAA records or ``cname`` for
        CNAME records). Payloads dpkt cannot unpack as DNS are skipped.
    """
    records = []
    with open(pcap_path, "rb") as capture:
        for stamp, frame in dpkt.pcap.Reader(capture):
            ip = dpkt.ethernet.Ethernet(frame).data
            if not isinstance(ip, dpkt.ip.IP):
                continue
            udp = ip.data
            if not isinstance(udp, dpkt.udp.UDP):
                continue
            # Keep only traffic to or from the DNS port.
            if 53 not in (udp.sport, udp.dport):
                continue
            try:
                message = dpkt.dns.DNS(udp.data)
            except dpkt.dpkt.UnpackError:
                continue

            record = {
                "timestamp": stamp,
                "src": socket.inet_ntoa(ip.src),
                "dst": socket.inet_ntoa(ip.dst),
                "id": message.id,
                "is_response": bool(message.qr),
                "queries": [
                    {"name": question.name, "type": question.type}
                    for question in message.qd
                ],
                "answers": [],
            }
            for rr in message.an:
                item = {"name": rr.name, "type": rr.type, "ttl": rr.ttl}
                if rr.type == dpkt.dns.DNS_A:
                    item["ip"] = socket.inet_ntoa(rr.rdata)
                elif rr.type == dpkt.dns.DNS_AAAA:
                    item["ip"] = socket.inet_ntop(socket.AF_INET6, rr.rdata)
                elif rr.type == dpkt.dns.DNS_CNAME:
                    item["cname"] = rr.cname
                record["answers"].append(item)
            records.append(record)
    return records
TCP flow reconstruction
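Individual packets only show fragments. To analyze complete conversations, reconstruct TCP flows. Note that the simplified example below appends payloads in capture order; it does not reorder segments by sequence number or discard retransmissions: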
from collections import defaultdict
import dpkt
import socket
class TCPFlow:
    """Accumulates the payload bytes seen in each direction of one TCP flow."""

    def __init__(self):
        # One growing buffer per direction, plus a count of add_packet() calls.
        self.client_data = bytearray()
        self.server_data = bytearray()
        self.packets = 0

    def add_packet(self, is_client: bool, data: bytes):
        """Append ``data`` to the client or server buffer and count the packet.

        Args:
            is_client: True to append to ``client_data``, False for
                ``server_data``.
            data: Payload bytes to append.
        """
        target = self.client_data if is_client else self.server_data
        target.extend(data)
        self.packets += 1
def reconstruct_flows(pcap_path: str) -> dict:
    """Group the TCP payloads of a pcap file into per-connection flows.

    Payloads are appended in capture order. A flow only appears in the
    result once at least one non-empty payload has been seen for it.

    Args:
        pcap_path: Path to a classic pcap capture.

    Returns:
        A plain dict mapping a pair of ``(ip, port)`` endpoints to a TCPFlow.
        When the opening SYN was captured the key is ordered
        (client, server); otherwise the endpoints are sorted.
    """
    flows: dict[tuple, TCPFlow] = defaultdict(TCPFlow)
    # (client, server) endpoint pairs that were seen sending a SYN without ACK.
    initiators = set()
    handshake_bits = dpkt.tcp.TH_SYN | dpkt.tcp.TH_ACK

    with open(pcap_path, "rb") as capture:
        for _stamp, frame in dpkt.pcap.Reader(capture):
            ip = dpkt.ethernet.Ethernet(frame).data
            if not isinstance(ip, dpkt.ip.IP):
                continue
            segment = ip.data
            if not isinstance(segment, dpkt.tcp.TCP):
                continue

            sender = (socket.inet_ntoa(ip.src), segment.sport)
            receiver = (socket.inet_ntoa(ip.dst), segment.dport)
            forward = (sender, receiver)
            reverse = (receiver, sender)

            # SYN set with ACK clear identifies the side opening the connection.
            if (segment.flags & handshake_bits) == dpkt.tcp.TH_SYN:
                initiators.add(forward)

            # Flow key is always in client→server order when that is known.
            if forward in initiators:
                key, from_client = forward, True
            elif reverse in initiators:
                key, from_client = reverse, False
            else:
                # No SYN captured: use a direction-independent key and treat
                # the lower-sorting endpoint as the client.
                key = (min(sender, receiver), max(sender, receiver))
                from_client = sender == key[0]

            if len(segment.data) > 0:
                flows[key].add_packet(from_client, segment.data)
    return dict(flows)
Traffic statistics and bandwidth analysis
import dpkt
import socket
from collections import Counter, defaultdict
from datetime import datetime, timezone
def traffic_stats(pcap_path: str) -> dict:
    """Summarise a pcap file: volume, protocols, talkers, ports, packet rate.

    Args:
        pcap_path: Path to a classic pcap capture.

    Returns:
        A dict with ``total_packets`` and ``total_bytes`` (all frames),
        ``protocols`` (TCP/UDP/ICMP packet counts), ``top_talkers`` (bytes per
        ``(src, dst)`` IPv4 pair), ``port_distribution`` (packets per TCP/UDP
        destination port) and ``packets_per_second`` (packets per whole-second
        timestamp).
    """
    packet_count = 0
    byte_count = 0
    protocols = Counter()
    talkers = Counter()
    ports = Counter()
    per_second = defaultdict(int)

    with open(pcap_path, "rb") as capture:
        for stamp, frame in dpkt.pcap.Reader(capture):
            size = len(frame)
            packet_count += 1
            byte_count += size
            # Bucket by the integer part of the capture timestamp.
            per_second[int(stamp)] += 1

            ip = dpkt.ethernet.Ethernet(frame).data
            if not isinstance(ip, dpkt.ip.IP):
                continue
            pair = (socket.inet_ntoa(ip.src), socket.inet_ntoa(ip.dst))
            talkers[pair] += size

            transport = ip.data
            if isinstance(transport, (dpkt.tcp.TCP, dpkt.udp.UDP)):
                label = "TCP" if isinstance(transport, dpkt.tcp.TCP) else "UDP"
                protocols[label] += 1
                ports[transport.dport] += 1
            elif isinstance(transport, dpkt.icmp.ICMP):
                protocols["ICMP"] += 1

    return {
        "total_packets": packet_count,
        "total_bytes": byte_count,
        "protocols": protocols,
        "top_talkers": talkers,
        "port_distribution": ports,
        "packets_per_second": per_second,
    }
TLS/SSL connection analysis
import dpkt
import socket
def extract_tls_handshakes(pcap_path: str) -> list[dict]:
    """Extract TLS Client Hello messages (including SNI) from a pcap file.

    Only IPv4/TCP payloads that start with a TLS handshake record are
    examined, and each payload is parsed on its own: a record that dpkt
    cannot unpack from a single segment is skipped.

    Args:
        pcap_path: Path to a classic pcap capture.

    Returns:
        A list of dicts with ``timestamp``, ``src``, ``dst``, ``dst_port``,
        ``tls_version`` (record-layer version as ``"major.minor"``) and
        ``sni`` (whatever ``_extract_sni`` returns for the handshake bytes).
    """
    parse_errors = (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError)
    results = []
    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
            except parse_errors:
                continue  # truncated or malformed frame
            if not isinstance(eth.data, dpkt.ip.IP):
                continue
            ip = eth.data
            if not isinstance(ip.data, dpkt.tcp.TCP):
                continue
            tcp = ip.data
            payload = tcp.data
            # A TLS record header is 5 bytes; content type 0x16 = handshake.
            if len(payload) < 5 or payload[0] != 0x16:
                continue
            try:
                tls = dpkt.ssl.TLSRecord(payload)
            except parse_errors:
                continue
            handshake = tls.data
            if not isinstance(handshake, bytes) or len(handshake) == 0:
                continue
            # Handshake type 0x01 = Client Hello
            if handshake[0] != 0x01:
                continue
            # The record version is a single 16-bit integer (e.g. 0x0303),
            # not a sequence, so split it into major/minor with bit ops.
            version = tls.version
            results.append({
                "timestamp": ts,
                "src": socket.inet_ntoa(ip.src),
                "dst": socket.inet_ntoa(ip.dst),
                "dst_port": tcp.dport,
                "tls_version": f"{version >> 8}.{version & 0xFF}",
                "sni": _extract_sni(handshake),
            })
    return results
def _extract_sni(handshake_data: bytes) -> str | None:
"""Extract Server Name Indication from Client Hello."""
try:
# Skip handshake header (4 bytes) + client hello fields
# This is a simplified parser; production code should use a proper TLS parser
idx = handshake_data.find(b"\x00\x00") # SNI extension type
if idx == -1:
return None
# Parse SNI extension — simplified
name_start = idx + 9
name_len = int.from_bytes(handshake_data[name_start - 2:name_start], "big")
return handshake_data[name_start:name_start + name_len].decode(errors="ignore")
except (IndexError, ValueError):
return None
Performance: dpkt vs alternatives
Benchmark context: parsing a 1 GB pcap file with 2 million packets.
| Library | Parse Time | Memory | Notes |
|---|---|---|---|
| dpkt | ~15 seconds | Low (streaming) | Best for batch processing |
| Scapy rdpcap | ~120 seconds | High (loads all) | Flexible but slow |
| pyshark | ~45 seconds | Medium | Wraps tshark (subprocess) |
| C libpcap | ~3 seconds | Minimal | Not Python-native |
dpkt’s streaming iterator keeps memory constant regardless of file size. Scapy’s rdpcap() loads everything into memory.
Writing pcap files
import dpkt
import struct
import time
# Craft a packet: Ethernet broadcast frame carrying an IPv4/TCP SYN
eth = dpkt.ethernet.Ethernet(
    dst=b"\xff\xff\xff\xff\xff\xff",
    src=b"\x00\x11\x22\x33\x44\x55",
    type=dpkt.ethernet.ETH_TYPE_IP,
)
ip = dpkt.ip.IP(
    src=b"\xc0\xa8\x01\x01",  # 192.168.1.1
    dst=b"\xc0\xa8\x01\x02",  # 192.168.1.2
    p=dpkt.ip.IP_PROTO_TCP,
)
tcp = dpkt.tcp.TCP(sport=12345, dport=80, flags=dpkt.tcp.TH_SYN)
ip.data = tcp
ip.len = len(ip)
eth.data = ip

# Create a new pcap file and write the packet. The with-block closes the
# underlying file even if writing raises, so the handle is never leaked.
with open("output.pcap", "wb") as f:
    writer = dpkt.pcap.Writer(f)
    writer.writepkt(bytes(eth), ts=time.time())
Error handling patterns
import dpkt
def safe_parse(pcap_path: str):
    """Walk a capture file, counting packets that fail to parse.

    The file is first opened as classic pcap; if the reader rejects it with
    ValueError, it is rewound and read as pcapng instead. The error count is
    printed at the end.

    Args:
        pcap_path: Path to a pcap or pcapng capture.
    """
    failures = 0
    with open(pcap_path, "rb") as fh:
        try:
            packets = dpkt.pcap.Reader(fh)
        except ValueError:
            # Try pcapng format
            fh.seek(0)
            packets = dpkt.pcapng.Reader(fh)

        for _stamp, frame in packets:
            try:
                eth = dpkt.ethernet.Ethernet(frame)
            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
                # NeedData: truncated packet; UnpackError: malformed packet
                failures += 1
                continue

            if not isinstance(eth.data, dpkt.ip.IP):
                continue
            try:
                ip = eth.data
                # Process IP packet
            except Exception:
                failures += 1
    print(f"Parse errors: {failures}")
Tradeoffs
| Approach | Pros | Cons |
|---|---|---|
| dpkt | Fast, lightweight, streaming, good protocol coverage | Read-focused, limited crafting, smaller community |
| Scapy | Full read/write, interactive, huge community | Slow for large captures, high memory |
| pyshark | Leverages Wireshark’s dissectors, most protocols | Subprocess overhead, slower |
| gopacket (Go) | Very fast, compiled | Not Python |
| libpcap (C) | Maximum performance | C API, harder to use |
One thing to remember: dpkt is the workhorse for batch packet analysis in Python. Its streaming architecture and lightweight parsing make it the right tool when you need to process large captures efficiently. For interactive exploration or packet crafting, look to Scapy; for raw speed, look to C. dpkt sits at the sweet spot of Python convenience and real-world performance.
See Also
- Python DNS Resolver — Understand how Python translates website names into addresses, like a phone book for the entire internet.
- Python FTP/SFTP Transfers — Understand how Python moves files between computers over a network, like a digital delivery truck with a locked or unlocked cargo door.
- Python Impacket Security Tools — Understand how Python speaks the secret languages of Windows networks, helping security teams find weaknesses before attackers do.
- Python NETCONF/YANG — Understand how Python configures network devices automatically, like a remote control for every router and switch in your building.
- Python Pcap Analysis — Understand how Python reads recordings of network traffic, like playing back security camera footage to see what happened on your network.