Python PCAP Analysis — Deep Dive

System-level framing

PCAP analysis at scale is a data engineering problem. A day of traffic on a moderately busy network produces gigabytes of capture data containing millions of packets. Manual analysis with Wireshark is useful for small samples, but incident response and threat hunting require automated pipelines that can process captures quickly, extract indicators, and correlate findings across multiple data sources.

Python is the dominant language for this work because it bridges three worlds: fast packet parsing (dpkt), rich protocol decoding (Scapy, pyshark), and data analysis (pandas, matplotlib). This deep dive covers building production analysis pipelines.

Multi-library strategy

No single Python library is best for everything. Use them together:

# dpkt for fast initial pass and statistics
import dpkt

# Scapy for interactive deep inspection of specific packets
from scapy.all import rdpcap, IP, TCP

# pyshark for protocol-specific decoding when you need Wireshark-level detail
import pyshark

Rule of thumb: dpkt for bulk processing, Scapy for targeted analysis, pyshark for complex protocol decoding.

Automated overview report

import dpkt
import socket
from collections import Counter
from datetime import datetime, timezone

def pcap_overview(path: str) -> dict:
    """Generate a high-level overview of a PCAP file."""
    stats = {
        "total_packets": 0,
        "total_bytes": 0,
        "start_time": None,
        "end_time": None,
        "unique_src_ips": set(),
        "unique_dst_ips": set(),
        "protocols": Counter(),
        "top_conversations": Counter(),
        "top_ports": Counter(),
    }

    with open(path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            stats["total_packets"] += 1
            stats["total_bytes"] += len(buf)

            if stats["start_time"] is None:
                stats["start_time"] = ts
            stats["end_time"] = ts

            try:
                eth = dpkt.ethernet.Ethernet(buf)
            except dpkt.dpkt.UnpackError:
                continue

            if not isinstance(eth.data, dpkt.ip.IP):
                continue

            ip = eth.data
            src = socket.inet_ntoa(ip.src)
            dst = socket.inet_ntoa(ip.dst)
            stats["unique_src_ips"].add(src)
            stats["unique_dst_ips"].add(dst)

            conv_key = tuple(sorted([src, dst]))
            stats["top_conversations"][conv_key] += len(buf)

            if isinstance(ip.data, dpkt.tcp.TCP):
                stats["protocols"]["TCP"] += 1
                stats["top_ports"][ip.data.dport] += 1
            elif isinstance(ip.data, dpkt.udp.UDP):
                stats["protocols"]["UDP"] += 1
                stats["top_ports"][ip.data.dport] += 1
            elif isinstance(ip.data, dpkt.icmp.ICMP):
                stats["protocols"]["ICMP"] += 1

    duration = stats["end_time"] - stats["start_time"] if stats["start_time"] else 0
    stats["duration_seconds"] = round(duration, 2)
    stats["avg_pps"] = round(stats["total_packets"] / max(duration, 1), 1)
    stats["unique_src_ips"] = len(stats["unique_src_ips"])
    stats["unique_dst_ips"] = len(stats["unique_dst_ips"])
    stats["top_conversations"] = stats["top_conversations"].most_common(10)
    stats["top_ports"] = stats["top_ports"].most_common(20)

    return stats

DNS analysis pipeline

Extract and analyze all DNS activity:

import dpkt
import socket
from collections import defaultdict

def analyze_dns(pcap_path: str) -> dict:
    """Extract comprehensive DNS intelligence from a PCAP."""
    queries = defaultdict(list)
    responses = defaultdict(list)
    nxdomains = []
    query_sources = defaultdict(set)

    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
                if not isinstance(eth.data, dpkt.ip.IP):
                    continue
                ip = eth.data
                if not isinstance(ip.data, dpkt.udp.UDP):
                    continue
                udp = ip.data
                if udp.sport != 53 and udp.dport != 53:
                    continue

                dns = dpkt.dns.DNS(udp.data)
            except (dpkt.dpkt.UnpackError, dpkt.dpkt.NeedData):
                continue

            src = socket.inet_ntoa(ip.src)

            # Queries
            if dns.qr == 0:  # Query
                for q in dns.qd:
                    queries[q.name].append(ts)
                    query_sources[q.name].add(src)

            # Responses
            elif dns.qr == 1:  # Response
                if dns.rcode == dpkt.dns.DNS_RCODE_NXDOMAIN:
                    for q in dns.qd:
                        nxdomains.append({"domain": q.name, "timestamp": ts})

                for a in dns.an:
                    if a.type == dpkt.dns.DNS_A:
                        responses[a.name].append(socket.inet_ntoa(a.rdata))

    return {
        "unique_domains": len(queries),
        "top_queried": sorted(queries.items(), key=lambda x: len(x[1]), reverse=True)[:20],
        "nxdomain_count": len(nxdomains),
        "nxdomains": nxdomains[:50],  # First 50 for inspection
        "domains_with_multiple_ips": {
            d: list(set(ips)) for d, ips in responses.items() if len(set(ips)) > 1
        },
        "domains_queried_by_multiple_hosts": {
            d: list(srcs) for d, srcs in query_sources.items() if len(srcs) > 3
        },
    }

HTTP artifact extraction

import dpkt
import socket
import os
import hashlib
from io import BytesIO
import gzip

def extract_http_artifacts(pcap_path: str, output_dir: str) -> list[dict]:
    """Extract files and URLs from HTTP traffic."""
    os.makedirs(output_dir, exist_ok=True)
    artifacts = []

    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
                if not isinstance(eth.data, dpkt.ip.IP):
                    continue
                ip = eth.data
                if not isinstance(ip.data, dpkt.tcp.TCP):
                    continue
                tcp = ip.data
                if len(tcp.data) == 0:
                    continue
            except dpkt.dpkt.UnpackError:
                continue

            # Try HTTP request
            try:
                req = dpkt.http.Request(tcp.data)
                artifacts.append({
                    "type": "request",
                    "timestamp": ts,
                    "src": socket.inet_ntoa(ip.src),
                    "method": req.method,
                    "host": req.headers.get("host", "unknown"),
                    "uri": req.uri,
                    "user_agent": req.headers.get("user-agent", ""),
                })
            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
                pass

            # Try HTTP response with body
            try:
                resp = dpkt.http.Response(tcp.data)
                content_type = resp.headers.get("content-type", "")
                body = resp.body

                # Decompress if gzipped
                if resp.headers.get("content-encoding") == "gzip" and body:
                    try:
                        body = gzip.decompress(body)
                    except Exception:
                        pass

                if body and len(body) > 100:
                    file_hash = hashlib.sha256(body).hexdigest()[:16]
                    ext = _guess_extension(content_type)
                    filename = f"{file_hash}{ext}"
                    filepath = os.path.join(output_dir, filename)

                    with open(filepath, "wb") as out:
                        out.write(body)

                    artifacts.append({
                        "type": "file",
                        "timestamp": ts,
                        "content_type": content_type,
                        "size": len(body),
                        "sha256": hashlib.sha256(body).hexdigest(),
                        "saved_as": filepath,
                    })
            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
                pass

    return artifacts

def _guess_extension(content_type: str) -> str:
    mapping = {
        "text/html": ".html",
        "application/javascript": ".js",
        "text/css": ".css",
        "image/png": ".png",
        "image/jpeg": ".jpg",
        "image/gif": ".gif",
        "application/pdf": ".pdf",
        "application/zip": ".zip",
        "application/octet-stream": ".bin",
    }
    for ct, ext in mapping.items():
        if ct in content_type:
            return ext
    return ".bin"

TLS certificate extraction

import dpkt
import socket
import ssl
from datetime import datetime

def extract_tls_certs(pcap_path: str) -> list[dict]:
    """Extract TLS server certificates from handshakes."""
    certs = []

    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
                if not isinstance(eth.data, dpkt.ip.IP):
                    continue
                ip = eth.data
                if not isinstance(ip.data, dpkt.tcp.TCP):
                    continue
                tcp = ip.data
                if len(tcp.data) < 10:
                    continue

                # Look for TLS handshake records
                if tcp.data[0] != 0x16:  # Not a handshake
                    continue

                records, _ = dpkt.ssl.tls_multi_factory(tcp.data)
                for record in records:
                    if (record.type == 22 and len(record.data) > 0
                            and record.data[0] == 11):  # Certificate message
                        certs.append({
                            "timestamp": ts,
                            "server_ip": socket.inet_ntoa(ip.src),
                            "server_port": tcp.sport,
                            "client_ip": socket.inet_ntoa(ip.dst),
                            "tls_version": f"{record.version[0]}.{record.version[1]}",
                            "cert_data_length": len(record.data),
                        })
            except (dpkt.dpkt.UnpackError, dpkt.dpkt.NeedData, Exception):
                continue

    return certs

Anomaly detection patterns

Port scan detection

import dpkt
import socket
from collections import defaultdict

def detect_port_scans(pcap_path: str, threshold: int = 50) -> list[dict]:
    """Detect hosts that connect to many ports on a target (port scanning)."""
    connections = defaultdict(lambda: defaultdict(set))

    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
                if not isinstance(eth.data, dpkt.ip.IP):
                    continue
                ip = eth.data
                if not isinstance(ip.data, dpkt.tcp.TCP):
                    continue
                tcp = ip.data

                # SYN packet (connection attempt)
                if tcp.flags & dpkt.tcp.TH_SYN and not (tcp.flags & dpkt.tcp.TH_ACK):
                    src = socket.inet_ntoa(ip.src)
                    dst = socket.inet_ntoa(ip.dst)
                    connections[src][dst].add(tcp.dport)
            except dpkt.dpkt.UnpackError:
                continue

    scanners = []
    for src, targets in connections.items():
        for dst, ports in targets.items():
            if len(ports) >= threshold:
                scanners.append({
                    "scanner": src,
                    "target": dst,
                    "unique_ports": len(ports),
                    "port_sample": sorted(list(ports))[:20],
                })

    return sorted(scanners, key=lambda x: x["unique_ports"], reverse=True)

Data exfiltration detection

def detect_exfiltration(pcap_path: str, threshold_bytes: int = 10_000_000) -> list[dict]:
    """Detect large outbound data transfers that could indicate exfiltration."""
    transfers = defaultdict(lambda: {"bytes": 0, "packets": 0, "first_seen": None, "last_seen": None})

    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
                if not isinstance(eth.data, dpkt.ip.IP):
                    continue
                ip = eth.data
                src = socket.inet_ntoa(ip.src)
                dst = socket.inet_ntoa(ip.dst)

                # Skip private-to-private traffic
                if _is_private(src) and _is_private(dst):
                    continue

                if _is_private(src) and not _is_private(dst):
                    key = (src, dst)
                    transfers[key]["bytes"] += len(buf)
                    transfers[key]["packets"] += 1
                    if transfers[key]["first_seen"] is None:
                        transfers[key]["first_seen"] = ts
                    transfers[key]["last_seen"] = ts
            except dpkt.dpkt.UnpackError:
                continue

    suspects = []
    for (src, dst), info in transfers.items():
        if info["bytes"] >= threshold_bytes:
            duration = info["last_seen"] - info["first_seen"]
            suspects.append({
                "internal_host": src,
                "external_host": dst,
                "total_bytes": info["bytes"],
                "total_mb": round(info["bytes"] / 1_048_576, 2),
                "packets": info["packets"],
                "duration_seconds": round(duration, 1),
            })

    return sorted(suspects, key=lambda x: x["total_bytes"], reverse=True)

def _is_private(ip: str) -> bool:
    parts = ip.split(".")
    first = int(parts[0])
    second = int(parts[1])
    return (first == 10 or (first == 172 and 16 <= second <= 31) or
            (first == 192 and second == 168))

Combining with pandas for analysis

import dpkt
import socket
import pandas as pd

def pcap_to_dataframe(pcap_path: str) -> pd.DataFrame:
    """Convert a PCAP file to a pandas DataFrame for analysis."""
    rows = []

    with open(pcap_path, "rb") as f:
        for ts, buf in dpkt.pcap.Reader(f):
            try:
                eth = dpkt.ethernet.Ethernet(buf)
                if not isinstance(eth.data, dpkt.ip.IP):
                    continue
                ip = eth.data

                row = {
                    "timestamp": pd.Timestamp(ts, unit="s", tz="UTC"),
                    "src_ip": socket.inet_ntoa(ip.src),
                    "dst_ip": socket.inet_ntoa(ip.dst),
                    "ip_len": ip.len,
                    "protocol": type(ip.data).__name__,
                }

                if isinstance(ip.data, dpkt.tcp.TCP):
                    row["src_port"] = ip.data.sport
                    row["dst_port"] = ip.data.dport
                    row["payload_len"] = len(ip.data.data)
                elif isinstance(ip.data, dpkt.udp.UDP):
                    row["src_port"] = ip.data.sport
                    row["dst_port"] = ip.data.dport
                    row["payload_len"] = len(ip.data.data)

                rows.append(row)
            except dpkt.dpkt.UnpackError:
                continue

    return pd.DataFrame(rows)

# Usage example:
# df = pcap_to_dataframe("capture.pcap")
# print(df.groupby("dst_ip")["ip_len"].sum().sort_values(ascending=False).head(10))
# print(df.set_index("timestamp").resample("1S")["ip_len"].sum())  # Bandwidth per second

Performance tips for large captures

  1. Stream, do not load — Use dpkt’s iterator, never load an entire pcap into memory.
  2. Filter early — Check protocol and port before parsing payload.
  3. Use BPF at capturetcpdump -w out.pcap 'port 443' is faster than filtering in Python.
  4. Parallelize — Split large captures with editcap and process chunks in parallel.
  5. Index by time — For repeated queries, build a timestamp index on first pass.
# Split a large pcap into 100MB chunks
editcap -c 100000 large.pcap chunk_

# Process chunks in parallel
ls chunk_*.pcap | parallel -j4 python analyze.py {}

Tradeoffs

ApproachProsCons
Python + dpktFast, scriptable, integrates with data toolsManual protocol parsing
Python + pysharkWireshark-level protocol decodingSubprocess overhead, slower
Wireshark GUIInteractive, visual, comprehensiveManual, does not scale
tshark CLIFast, scriptable, full protocol supportOutput parsing can be fragile
Zeek (formerly Bro)Connection-level logs, protocol analysisSeparate tool, learning curve

One thing to remember: PCAP analysis in Python is about building pipelines — automated workflows that transform raw packet captures into actionable intelligence. The libraries give you the parsing; the value comes from knowing what questions to ask and how to correlate the answers across protocols, timestamps, and hosts.

pythonnetworkinganalysis

See Also

  • Python Dns Resolver Understand how Python translates website names into addresses, like a phone book for the entire internet.
  • Python Dpkt Packet Parsing Understand how Python reads and decodes captured network traffic, like opening envelopes to see what is inside each message.
  • Python Ftp Sftp Transfers Understand how Python moves files between computers over a network, like a digital delivery truck with a locked or unlocked cargo door.
  • Python Impacket Security Tools Understand how Python speaks the secret languages of Windows networks, helping security teams find weaknesses before attackers do.
  • Python Netconf Yang Understand how Python configures network devices automatically, like a remote control for every router and switch in your building.