Python PCAP Analysis — Deep Dive
System-level framing
PCAP analysis at scale is a data engineering problem. A day of traffic on a moderately busy network produces gigabytes of capture data containing millions of packets. Manual analysis with Wireshark is useful for small samples, but incident response and threat hunting require automated pipelines that can process captures quickly, extract indicators, and correlate findings across multiple data sources.
Python is the dominant language for this work because it bridges three worlds: fast packet parsing (dpkt), rich protocol decoding (Scapy, pyshark), and data analysis (pandas, matplotlib). This deep dive covers building production analysis pipelines.
Multi-library strategy
No single Python library is best for everything. Use them together:
# dpkt for fast initial pass and statistics
import dpkt
# Scapy for interactive deep inspection of specific packets
from scapy.all import rdpcap, IP, TCP
# pyshark for protocol-specific decoding when you need Wireshark-level detail
import pyshark
Rule of thumb: dpkt for bulk processing, Scapy for targeted analysis, pyshark for complex protocol decoding.
Automated overview report
import dpkt
import socket
from collections import Counter
from datetime import datetime, timezone
def pcap_overview(path: str) -> dict:
    """Generate a high-level overview of a PCAP file.

    Streams the capture once through ``dpkt.pcap.Reader`` and aggregates
    packet/byte totals, the capture time span, per-protocol packet counts,
    the heaviest conversations and the most frequent destination ports.

    Every record counts toward ``total_packets`` / ``total_bytes`` and the
    time span. Only frames that decode as Ethernet carrying ``dpkt.ip.IP``
    contribute to the address, conversation, protocol and port statistics.

    Args:
        path: Path to a capture file readable by ``dpkt.pcap.Reader``.

    Returns:
        A dict with the keys ``total_packets``, ``total_bytes``,
        ``start_time``, ``end_time`` (earliest/latest packet timestamp, or
        ``None`` for an empty capture), ``unique_src_ips``,
        ``unique_dst_ips`` (counts), ``protocols`` (Counter of TCP/UDP/ICMP
        packets), ``top_conversations`` (top 10 address pairs by bytes),
        ``top_ports`` (top 20 destination ports by packets),
        ``duration_seconds`` and ``avg_pps``.
    """
    total_packets = 0
    total_bytes = 0
    start_time = None
    end_time = None
    src_ips = set()
    dst_ips = set()
    protocols = Counter()
    conversations = Counter()
    ports = Counter()

    with open(path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            total_packets += 1
            total_bytes += len(buf)

            # Track min/max rather than first/last: records in a capture are
            # not guaranteed to be in timestamp order.
            if start_time is None or ts < start_time:
                start_time = ts
            if end_time is None or ts > end_time:
                end_time = ts

            try:
                eth = dpkt.ethernet.Ethernet(buf)
            except dpkt.dpkt.UnpackError:
                continue

            ip = eth.data
            if not isinstance(ip, dpkt.ip.IP):
                continue

            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            src_ips.add(src)
            dst_ips.add(dst)

            # Sort the pair so both directions of a flow share one key.
            conversations[tuple(sorted((src, dst)))] += len(buf)

            transport = ip.data
            if isinstance(transport, dpkt.tcp.TCP):
                protocols["TCP"] += 1
                ports[transport.dport] += 1
            elif isinstance(transport, dpkt.udp.UDP):
                protocols["UDP"] += 1
                ports[transport.dport] += 1
            elif isinstance(transport, dpkt.icmp.ICMP):
                protocols["ICMP"] += 1

    # Compare against None explicitly: a timestamp of 0 is a valid value.
    duration = end_time - start_time if start_time is not None else 0

    return {
        "total_packets": total_packets,
        "total_bytes": total_bytes,
        "start_time": start_time,
        "end_time": end_time,
        "unique_src_ips": len(src_ips),
        "unique_dst_ips": len(dst_ips),
        "protocols": protocols,
        "top_conversations": conversations.most_common(10),
        "top_ports": ports.most_common(20),
        "duration_seconds": round(duration, 2),
        # Clamp the divisor to 1s so very short captures do not inflate the rate.
        "avg_pps": round(total_packets / max(duration, 1), 1),
    }
DNS analysis pipeline
Extract and analyze all DNS activity:
import dpkt
import socket
from collections import defaultdict
def analyze_dns(pcap_path: str) -> dict:
    """Extract comprehensive DNS intelligence from a PCAP.

    Only UDP packets with source or destination port 53, carried in
    Ethernet/IPv4 frames, are examined. Packets that fail to decode are
    skipped.

    Args:
        pcap_path: Path to a capture file readable by ``dpkt.pcap.Reader``.

    Returns:
        A dict with:
          * ``unique_domains``: number of distinct queried names.
          * ``top_queried``: up to 20 ``(name, [timestamps])`` pairs, most
            queried first.
          * ``nxdomain_count``: total NXDOMAIN answers seen.
          * ``nxdomains``: the first 50 NXDOMAIN entries
            (``{"domain", "timestamp"}``).
          * ``domains_with_multiple_ips``: names whose A records resolved to
            more than one distinct address, mapped to the sorted addresses.
          * ``domains_queried_by_multiple_hosts``: names queried by more than
            three distinct source addresses, mapped to the sorted sources.
    """
    queries = defaultdict(list)
    responses = defaultdict(list)
    nxdomains = []
    query_sources = defaultdict(set)

    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
                ip = eth.data
                if not isinstance(ip, dpkt.ip.IP):
                    continue
                udp = ip.data
                if not isinstance(udp, dpkt.udp.UDP):
                    continue
                # Cheap port filter before the comparatively costly DNS parse.
                if udp.sport != 53 and udp.dport != 53:
                    continue
                dns = dpkt.dns.DNS(udp.data)
            except (dpkt.dpkt.UnpackError, dpkt.dpkt.NeedData):
                continue

            if dns.qr == 0:  # Query
                src = socket.inet_ntoa(ip.src)
                for q in dns.qd:
                    queries[q.name].append(ts)
                    query_sources[q.name].add(src)
            elif dns.qr == 1:  # Response
                if dns.rcode == dpkt.dns.DNS_RCODE_NXDOMAIN:
                    for q in dns.qd:
                        nxdomains.append({"domain": q.name, "timestamp": ts})
                for a in dns.an:
                    # inet_ntoa needs exactly 4 bytes; a malformed A record
                    # would otherwise raise OSError and abort the whole run.
                    if a.type == dpkt.dns.DNS_A and len(a.rdata) == 4:
                        responses[a.name].append(socket.inet_ntoa(a.rdata))

    multi_ip = {}
    for domain, ips in responses.items():
        distinct = set(ips)
        if len(distinct) > 1:
            # Sorted so the report is reproducible between runs.
            multi_ip[domain] = sorted(distinct)

    return {
        "unique_domains": len(queries),
        "top_queried": sorted(queries.items(), key=lambda x: len(x[1]), reverse=True)[:20],
        "nxdomain_count": len(nxdomains),
        "nxdomains": nxdomains[:50],  # First 50 for inspection
        "domains_with_multiple_ips": multi_ip,
        "domains_queried_by_multiple_hosts": {
            d: sorted(srcs) for d, srcs in query_sources.items() if len(srcs) > 3
        },
    }
HTTP artifact extraction
import dpkt
import socket
import os
import hashlib
from io import BytesIO
import gzip
def extract_http_artifacts(pcap_path: str, output_dir: str) -> list[dict]:
    """Extract files and URLs from HTTP traffic.

    Each TCP payload is parsed on its own as an HTTP request and as an HTTP
    response; there is no TCP stream reassembly, so messages that span
    several segments are only seen when a single segment parses.

    Response bodies longer than 100 bytes are written to ``output_dir``
    under a name built from the first 16 hex digits of their SHA-256 and an
    extension guessed from the Content-Type. Gzip-encoded bodies are
    decompressed when possible; otherwise the raw body is kept.

    Args:
        pcap_path: Path to a capture file readable by ``dpkt.pcap.Reader``.
        output_dir: Directory for extracted bodies (created if missing).

    Returns:
        A list of artifact dicts in capture order. Requests have
        ``type == "request"`` with ``timestamp``, ``src``, ``method``,
        ``host``, ``uri`` and ``user_agent``. Saved bodies have
        ``type == "file"`` with ``timestamp``, ``content_type``, ``size``,
        ``sha256`` and ``saved_as``.
    """
    import zlib  # local import: only needed to name the decompression error

    os.makedirs(output_dir, exist_ok=True)
    artifacts = []
    http_parse_errors = (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError)

    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
            except dpkt.dpkt.UnpackError:
                continue

            ip = eth.data
            if not isinstance(ip, dpkt.ip.IP):
                continue
            tcp = ip.data
            if not isinstance(tcp, dpkt.tcp.TCP):
                continue
            payload = tcp.data
            if not payload:
                continue

            # Try HTTP request
            try:
                req = dpkt.http.Request(payload)
            except http_parse_errors:
                pass
            else:
                artifacts.append({
                    "type": "request",
                    "timestamp": ts,
                    "src": socket.inet_ntoa(ip.src),
                    "method": req.method,
                    "host": req.headers.get("host", "unknown"),
                    "uri": req.uri,
                    "user_agent": req.headers.get("user-agent", ""),
                })

            # Try HTTP response with body
            try:
                resp = dpkt.http.Response(payload)
            except http_parse_errors:
                continue

            content_type = resp.headers.get("content-type", "")
            body = resp.body

            # Decompress if gzipped; a truncated or corrupt stream is expected
            # here, so fall back to the raw bytes instead of failing.
            if body and resp.headers.get("content-encoding") == "gzip":
                try:
                    body = gzip.decompress(body)
                except (OSError, EOFError, zlib.error):
                    pass

            if not body or len(body) <= 100:
                continue

            # Hash once; the digest is used for both the name and the record.
            digest = hashlib.sha256(body).hexdigest()
            filename = f"{digest[:16]}{_guess_extension(content_type)}"
            filepath = os.path.join(output_dir, filename)
            with open(filepath, "wb") as out:
                out.write(body)

            artifacts.append({
                "type": "file",
                "timestamp": ts,
                "content_type": content_type,
                "size": len(body),
                "sha256": digest,
                "saved_as": filepath,
            })

    return artifacts
def _guess_extension(content_type: str) -> str:
mapping = {
"text/html": ".html",
"application/javascript": ".js",
"text/css": ".css",
"image/png": ".png",
"image/jpeg": ".jpg",
"image/gif": ".gif",
"application/pdf": ".pdf",
"application/zip": ".zip",
"application/octet-stream": ".bin",
}
for ct, ext in mapping.items():
if ct in content_type:
return ext
return ".bin"
TLS certificate extraction
import dpkt
import socket
import ssl
from datetime import datetime
def extract_tls_certs(pcap_path: str) -> list[dict]:
    """Extract TLS server certificates from handshakes.

    Scans TCP payloads that begin with a TLS handshake record (content type
    0x16) and reports every record whose first handshake message is a
    Certificate (handshake type 11).

    Limitations visible in the logic below: there is no TCP reassembly and
    only the first message of each record is checked, so a Certificate that
    follows another message in the same record, or that spans segments, is
    not reported. The certificate bytes themselves are not decoded.

    Args:
        pcap_path: Path to a capture file readable by ``dpkt.pcap.Reader``.

    Returns:
        One dict per matching record with ``timestamp``, ``server_ip``,
        ``server_port``, ``client_ip``, ``tls_version`` (record-layer
        version as ``"major.minor"``, e.g. ``"3.3"``) and
        ``cert_data_length`` (length of the record payload).
    """
    certs = []
    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
            except dpkt.dpkt.UnpackError:
                continue

            ip = eth.data
            if not isinstance(ip, dpkt.ip.IP):
                continue
            tcp = ip.data
            if not isinstance(tcp, dpkt.tcp.TCP):
                continue

            payload = tcp.data
            # Look for TLS handshake records
            if len(payload) < 10 or payload[0] != 0x16:  # Not a handshake
                continue

            # Catch only dpkt's own parse errors. The previous catch-all
            # `except Exception` hid a TypeError from the version formatting
            # below, so no certificate was ever reported.
            try:
                records, _ = dpkt.ssl.tls_multi_factory(payload)
            except (dpkt.dpkt.UnpackError, dpkt.ssl.SSL3Exception):
                continue

            for record in records:
                if record.type != 22 or not record.data or record.data[0] != 11:
                    continue  # Not a Certificate message

                # dpkt exposes the record version as one 16-bit integer
                # (e.g. 0x0303); accept a 2-item sequence as well.
                version = record.version
                if isinstance(version, int):
                    major, minor = version >> 8, version & 0xFF
                else:
                    major, minor = version[0], version[1]

                # NOTE(review): the sender is assumed to be the server; a
                # client certificate would be labelled the same way.
                certs.append({
                    "timestamp": ts,
                    "server_ip": socket.inet_ntoa(ip.src),
                    "server_port": tcp.sport,
                    "client_ip": socket.inet_ntoa(ip.dst),
                    "tls_version": f"{major}.{minor}",
                    "cert_data_length": len(record.data),
                })
    return certs
Anomaly detection patterns
Port scan detection
import dpkt
import socket
from collections import defaultdict
def detect_port_scans(pcap_path: str, threshold: int = 50) -> list[dict]:
    """Find source hosts that sent bare SYNs to many ports on one target.

    A packet counts as a connection attempt when its TCP flags have SYN set
    and ACK clear. For every (source, destination) address pair the distinct
    destination ports are collected; pairs reaching ``threshold`` ports are
    reported.

    Args:
        pcap_path: Path to a capture file readable by ``dpkt.pcap.Reader``.
        threshold: Minimum number of distinct destination ports to report.

    Returns:
        Dicts with ``scanner``, ``target``, ``unique_ports`` and
        ``port_sample`` (the 20 lowest ports), ordered by ``unique_ports``
        descending.
    """
    # source address -> destination address -> set of probed ports
    probed = defaultdict(lambda: defaultdict(set))

    with open(pcap_path, "rb") as capture:
        for _ts, frame in dpkt.pcap.Reader(capture):
            try:
                packet = dpkt.ethernet.Ethernet(frame).data
                if not isinstance(packet, dpkt.ip.IP):
                    continue
                segment = packet.data
                if not isinstance(segment, dpkt.tcp.TCP):
                    continue

                flags = segment.flags
                # Keep only SYN-without-ACK (connection attempts).
                if not (flags & dpkt.tcp.TH_SYN) or (flags & dpkt.tcp.TH_ACK):
                    continue

                origin = socket.inet_ntoa(packet.src)
                victim = socket.inet_ntoa(packet.dst)
                probed[origin][victim].add(segment.dport)
            except dpkt.dpkt.UnpackError:
                continue

    findings = [
        {
            "scanner": origin,
            "target": victim,
            "unique_ports": len(port_set),
            "port_sample": sorted(port_set)[:20],
        }
        for origin, victims in probed.items()
        for victim, port_set in victims.items()
        if len(port_set) >= threshold
    ]
    findings.sort(key=lambda entry: entry["unique_ports"], reverse=True)
    return findings
Data exfiltration detection
def detect_exfiltration(pcap_path: str, threshold_bytes: int = 10_000_000) -> list[dict]:
    """Flag large internal-to-external transfers that may be exfiltration.

    Frame bytes are summed per (private source, non-private destination)
    address pair; pairs whose total reaches ``threshold_bytes`` are returned.
    Traffic in any other direction is ignored.

    Args:
        pcap_path: Path to a capture file readable by ``dpkt.pcap.Reader``.
        threshold_bytes: Minimum total bytes for a pair to be reported.

    Returns:
        Dicts with ``internal_host``, ``external_host``, ``total_bytes``,
        ``total_mb``, ``packets`` and ``duration_seconds``, ordered by
        ``total_bytes`` descending.
    """
    outbound = defaultdict(
        lambda: {"bytes": 0, "packets": 0, "first_seen": None, "last_seen": None}
    )

    with open(pcap_path, "rb") as capture:
        for ts, frame in dpkt.pcap.Reader(capture):
            try:
                packet = dpkt.ethernet.Ethernet(frame).data
                if not isinstance(packet, dpkt.ip.IP):
                    continue

                origin = socket.inet_ntoa(packet.src)
                remote = socket.inet_ntoa(packet.dst)

                # Only private -> non-private flows are of interest.
                if not _is_private(origin) or _is_private(remote):
                    continue

                flow = outbound[(origin, remote)]
                flow["bytes"] += len(frame)
                flow["packets"] += 1
                if flow["first_seen"] is None:
                    flow["first_seen"] = ts
                flow["last_seen"] = ts
            except dpkt.dpkt.UnpackError:
                continue

    flagged = []
    for (origin, remote), flow in outbound.items():
        volume = flow["bytes"]
        if volume < threshold_bytes:
            continue
        elapsed = flow["last_seen"] - flow["first_seen"]
        flagged.append({
            "internal_host": origin,
            "external_host": remote,
            "total_bytes": volume,
            "total_mb": round(volume / 1_048_576, 2),
            "packets": flow["packets"],
            "duration_seconds": round(elapsed, 1),
        })

    flagged.sort(key=lambda entry: entry["total_bytes"], reverse=True)
    return flagged
def _is_private(ip: str) -> bool:
parts = ip.split(".")
first = int(parts[0])
second = int(parts[1])
return (first == 10 or (first == 172 and 16 <= second <= 31) or
(first == 192 and second == 168))
Combining with pandas for analysis
import dpkt
import socket
import pandas as pd
def pcap_to_dataframe(pcap_path: str) -> pd.DataFrame:
    """Convert a PCAP file to a pandas DataFrame for analysis.

    One row is produced per Ethernet frame carrying ``dpkt.ip.IP``; other
    frames and frames that fail to decode are skipped.

    Args:
        pcap_path: Path to a capture file readable by ``dpkt.pcap.Reader``.

    Returns:
        A DataFrame with the columns ``timestamp`` (UTC), ``src_ip``,
        ``dst_ip``, ``ip_len``, ``protocol`` (class name of the decoded IP
        payload), ``src_port``, ``dst_port`` and ``payload_len``. The last
        three are only filled for TCP and UDP packets. The column set is
        always the same, including for captures that yield no rows.
    """
    # Fixed schema: without it an empty capture gives a column-less frame and
    # the port columns vanish when no TCP/UDP packet is present.
    columns = [
        "timestamp",
        "src_ip",
        "dst_ip",
        "ip_len",
        "protocol",
        "src_port",
        "dst_port",
        "payload_len",
    ]
    rows = []
    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
            except dpkt.dpkt.UnpackError:
                continue

            ip = eth.data
            if not isinstance(ip, dpkt.ip.IP):
                continue

            transport = ip.data
            row = {
                "timestamp": pd.Timestamp(ts, unit="s", tz="UTC"),
                "src_ip": socket.inet_ntoa(ip.src),
                "dst_ip": socket.inet_ntoa(ip.dst),
                "ip_len": ip.len,
                "protocol": type(transport).__name__,
            }
            if isinstance(transport, (dpkt.tcp.TCP, dpkt.udp.UDP)):
                row["src_port"] = transport.sport
                row["dst_port"] = transport.dport
                row["payload_len"] = len(transport.data)
            rows.append(row)
    return pd.DataFrame(rows, columns=columns)
# Usage example:
# df = pcap_to_dataframe("capture.pcap")
# print(df.groupby("dst_ip")["ip_len"].sum().sort_values(ascending=False).head(10))
# print(df.set_index("timestamp").resample("1s")["ip_len"].sum())  # Bandwidth per second
Performance tips for large captures
- Stream, do not load — Use dpkt’s iterator, never load an entire pcap into memory.
- Filter early — Check protocol and port before parsing payload.
- Use BPF at capture — `tcpdump -w out.pcap 'port 443'` is faster than filtering in Python.
- Parallelize — Split large captures with `editcap` and process chunks in parallel.
- Index by time — For repeated queries, build a timestamp index on first pass.
# Split a large pcap into chunks of 100,000 packets each
# (-c is a packet count, not a size in bytes). Giving the output name a
# .pcap extension makes editcap write chunk_00000_<timestamp>.pcap, etc.,
# so the glob below actually matches the generated files.
editcap -c 100000 large.pcap chunk.pcap
# Process chunks in parallel (4 jobs at a time)
ls chunk_*.pcap | parallel -j4 python analyze.py {}
Tradeoffs
| Approach | Pros | Cons |
|---|---|---|
| Python + dpkt | Fast, scriptable, integrates with data tools | Manual protocol parsing |
| Python + pyshark | Wireshark-level protocol decoding | Subprocess overhead, slower |
| Wireshark GUI | Interactive, visual, comprehensive | Manual, does not scale |
| tshark CLI | Fast, scriptable, full protocol support | Output parsing can be fragile |
| Zeek (formerly Bro) | Connection-level logs, protocol analysis | Separate tool, learning curve |
One thing to remember: PCAP analysis in Python is about building pipelines — automated workflows that transform raw packet captures into actionable intelligence. The libraries give you the parsing; the value comes from knowing what questions to ask and how to correlate the answers across protocols, timestamps, and hosts.
See Also
- Python Dns Resolver — Understand how Python translates website names into addresses, like a phone book for the entire internet.
- Python Dpkt Packet Parsing — Understand how Python reads and decodes captured network traffic, like opening envelopes to see what is inside each message.
- Python Ftp Sftp Transfers — Understand how Python moves files between computers over a network, like a digital delivery truck with a locked or unlocked cargo door.
- Python Impacket Security Tools — Understand how Python speaks the secret languages of Windows networks, helping security teams find weaknesses before attackers do.
- Python Netconf Yang — Understand how Python configures network devices automatically, like a remote control for every router and switch in your building.