eDiscovery Processing with Python — Deep Dive

Multi-format document processing

eDiscovery data includes dozens of file types. A robust processor handles them all through a unified interface:

import hashlib
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class ProcessedDocument:
    doc_id: str
    source_path: str
    file_type: str
    extracted_text: str
    metadata: dict
    md5_hash: str
    sha256_hash: str
    word_count: int
    attachments: list["ProcessedDocument"] = field(default_factory=list)
    parent_id: str | None = None


class DocumentProcessor:
    def process(self, file_path: Path) -> ProcessedDocument:
        suffix = file_path.suffix.lower()
        extractors = {
            ".pdf": self._extract_pdf,
            ".docx": self._extract_docx,
            ".msg": self._extract_msg,
            ".eml": self._extract_eml,
            ".xlsx": self._extract_xlsx,
            ".txt": self._extract_text,
            ".html": self._extract_html,
        }

        extractor = extractors.get(suffix, self._extract_text)
        text, metadata = extractor(file_path)

        # Generate hashes for deduplication
        content_bytes = text.encode("utf-8")
        md5 = hashlib.md5(content_bytes).hexdigest()
        sha256 = hashlib.sha256(content_bytes).hexdigest()

        return ProcessedDocument(
            doc_id=sha256[:16],
            source_path=str(file_path),
            file_type=suffix,
            extracted_text=text,
            metadata=metadata,
            md5_hash=md5,
            sha256_hash=sha256,
            word_count=len(text.split()),
        )

    def _extract_pdf(self, path: Path) -> tuple[str, dict]:
        import pdfplumber
        pages = []
        metadata = {}
        with pdfplumber.open(path) as pdf:
            metadata = pdf.metadata or {}
            for page in pdf.pages:
                text = page.extract_text()
                if text:
                    pages.append(text)
                else:
                    # Fall back to OCR for scanned pages
                    pages.append(self._ocr_page(page))
        return "\n\n".join(pages), metadata

    def _ocr_page(self, page) -> str:
        """OCR a single PDF page using pytesseract."""
        import pytesseract
        from PIL import Image
        img = page.to_image(resolution=300).original
        return pytesseract.image_to_string(img)

    def _extract_msg(self, path: Path) -> tuple[str, dict]:
        """Extract Outlook .msg files."""
        import extract_msg
        msg = extract_msg.Message(str(path))
        metadata = {
            "from": msg.sender,
            "to": msg.to,
            "cc": msg.cc,
            "subject": msg.subject,
            "date": str(msg.date),
            "message_id": msg.messageId,
            "in_reply_to": getattr(msg, "inReplyTo", None),
        }
        return msg.body or "", metadata

    def _extract_eml(self, path: Path) -> tuple[str, dict]:
        """Extract .eml email files."""
        import email
        from email import policy
        with open(path, "rb") as f:
            msg = email.message_from_binary_file(f, policy=policy.default)

        body = msg.get_body(preferencelist=("plain", "html"))
        text = body.get_content() if body else ""
        metadata = {
            "from": msg["from"],
            "to": msg["to"],
            "cc": msg.get("cc", ""),
            "subject": msg["subject"],
            "date": msg["date"],
            "message_id": msg["message-id"],
            "in_reply_to": msg.get("in-reply-to", ""),
            "references": msg.get("references", ""),
        }
        return text, metadata

    def _extract_docx(self, path: Path) -> tuple[str, dict]:
        from docx import Document
        doc = Document(str(path))
        text = "\n".join(p.text for p in doc.paragraphs)
        props = doc.core_properties
        metadata = {
            "author": props.author,
            "created": str(props.created),
            "modified": str(props.modified),
            "title": props.title,
        }
        return text, metadata

    def _extract_xlsx(self, path: Path) -> tuple[str, dict]:
        import openpyxl
        wb = openpyxl.load_workbook(str(path), read_only=True)
        sheets_text = []
        for sheet in wb.sheetnames:
            ws = wb[sheet]
            rows = []
            for row in ws.iter_rows(values_only=True):
                rows.append(" | ".join(str(c) if c else "" for c in row))
            sheets_text.append(f"[Sheet: {sheet}]\n" + "\n".join(rows))
        return "\n\n".join(sheets_text), {"sheets": wb.sheetnames}

    def _extract_html(self, path: Path) -> tuple[str, dict]:
        from bs4 import BeautifulSoup
        with open(path, "r", errors="ignore") as f:
            soup = BeautifulSoup(f, "html.parser")
        return soup.get_text(separator="\n"), {"title": soup.title.string if soup.title else ""}

    def _extract_text(self, path: Path) -> tuple[str, dict]:
        with open(path, "r", errors="ignore") as f:
            return f.read(), {}

Deduplication engine

Exact deduplication uses hashes. Near-duplicate detection uses MinHash with LSH for efficiency across millions of documents:

from datasketch import MinHash, MinHashLSH
from dataclasses import dataclass


@dataclass
class DeduplicationResult:
    unique_docs: list[ProcessedDocument]
    exact_dupes: list[tuple[str, str]]   # (kept_id, removed_id)
    near_dupes: list[tuple[str, str, float]]  # (id1, id2, similarity)


class DeduplicationEngine:
    def __init__(self, similarity_threshold: float = 0.85):
        self.threshold = similarity_threshold
        self.lsh = MinHashLSH(threshold=similarity_threshold, num_perm=128)
        self.minhashes: dict[str, MinHash] = {}

    def _create_minhash(self, text: str) -> MinHash:
        """Create MinHash from document text using word shingles."""
        m = MinHash(num_perm=128)
        words = text.lower().split()
        # Use 3-word shingles for better near-dupe detection
        for i in range(len(words) - 2):
            shingle = " ".join(words[i:i + 3])
            m.update(shingle.encode("utf-8"))
        return m

    def deduplicate(
        self, documents: list[ProcessedDocument]
    ) -> DeduplicationResult:
        # Phase 1: exact deduplication by hash
        seen_hashes: dict[str, ProcessedDocument] = {}
        exact_dupes = []
        unique_after_exact = []

        for doc in documents:
            if doc.sha256_hash in seen_hashes:
                exact_dupes.append(
                    (seen_hashes[doc.sha256_hash].doc_id, doc.doc_id)
                )
            else:
                seen_hashes[doc.sha256_hash] = doc
                unique_after_exact.append(doc)

        # Phase 2: near-duplicate detection with MinHash LSH
        near_dupes = []
        unique_final = []
        suppressed = set()

        for doc in unique_after_exact:
            if doc.doc_id in suppressed:
                continue

            mh = self._create_minhash(doc.extracted_text)
            self.minhashes[doc.doc_id] = mh

            # Check for near-duplicates
            try:
                candidates = self.lsh.query(mh)
                for cand_id in candidates:
                    if cand_id != doc.doc_id:
                        sim = self.minhashes[cand_id].jaccard(mh)
                        if sim >= self.threshold:
                            near_dupes.append((cand_id, doc.doc_id, sim))
                            suppressed.add(doc.doc_id)
                            break
            except ValueError:
                pass

            if doc.doc_id not in suppressed:
                self.lsh.insert(doc.doc_id, mh)
                unique_final.append(doc)

        return DeduplicationResult(
            unique_docs=unique_final,
            exact_dupes=exact_dupes,
            near_dupes=near_dupes,
        )

Email threading

Reconstructing email threads from individual messages lets reviewers see complete conversations:

from dataclasses import dataclass, field


@dataclass
class EmailThread:
    thread_id: str
    subject: str
    messages: list[ProcessedDocument]
    participants: set[str] = field(default_factory=set)
    inclusive_message_id: str | None = None  # the message containing full thread

    @property
    def date_range(self) -> tuple[str, str]:
        dates = [m.metadata.get("date", "") for m in self.messages]
        dates = [d for d in dates if d]
        return (min(dates), max(dates)) if dates else ("", "")


class EmailThreader:
    def build_threads(
        self, emails: list[ProcessedDocument]
    ) -> list[EmailThread]:
        """Group emails into conversation threads."""
        # Build message-id index
        by_message_id: dict[str, ProcessedDocument] = {}
        for email in emails:
            msg_id = email.metadata.get("message_id", "")
            if msg_id:
                by_message_id[msg_id] = email

        # Build thread graph using In-Reply-To and References headers
        thread_map: dict[str, list[ProcessedDocument]] = {}
        assigned: dict[str, str] = {}  # doc_id -> thread_id

        for email in emails:
            # Find thread root by following reply chain
            root_id = self._find_root(email, by_message_id)
            if root_id not in thread_map:
                thread_map[root_id] = []
            thread_map[root_id].append(email)
            assigned[email.doc_id] = root_id

        # Build thread objects
        threads = []
        for thread_id, messages in thread_map.items():
            messages.sort(key=lambda m: m.metadata.get("date", ""))
            participants = set()
            for m in messages:
                participants.add(m.metadata.get("from", ""))
                for field_name in ("to", "cc"):
                    val = m.metadata.get(field_name, "")
                    if val:
                        participants.update(
                            addr.strip() for addr in val.split(",")
                        )
            participants.discard("")

            thread = EmailThread(
                thread_id=thread_id,
                subject=messages[0].metadata.get("subject", ""),
                messages=messages,
                participants=participants,
            )

            # The inclusive message is the last reply (contains all quoted text)
            if messages:
                thread.inclusive_message_id = messages[-1].doc_id

            threads.append(thread)

        return threads

    def _find_root(
        self,
        email: ProcessedDocument,
        index: dict[str, ProcessedDocument],
    ) -> str:
        """Follow reply chain to find the thread root message ID."""
        # Check References header first (lists all ancestors)
        refs = email.metadata.get("references", "")
        if refs:
            ref_ids = refs.strip().split()
            if ref_ids:
                return ref_ids[0]  # First reference is the root

        # Fall back to In-Reply-To
        reply_to = email.metadata.get("in_reply_to", "")
        if reply_to and reply_to in index:
            return self._find_root(index[reply_to], index)

        # This message is a root
        return email.metadata.get("message_id", email.doc_id)

Technology-Assisted Review (TAR)

TAR uses active learning to prioritize the most informative documents for human review:

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.calibration import CalibratedClassifierCV
import numpy as np


class TAREngine:
    """Continuous Active Learning (CAL) for document review."""

    def __init__(self):
        self.vectorizer = TfidfVectorizer(
            max_features=50000,
            ngram_range=(1, 2),
            stop_words="english",
        )
        self.classifier = CalibratedClassifierCV(
            LogisticRegression(
                C=1.0, max_iter=1000, class_weight="balanced"
            ),
            cv=3,
        )
        self.is_fitted = False
        self.all_texts: list[str] = []
        self.all_ids: list[str] = []
        self.labels: dict[str, int] = {}  # doc_id -> 0/1

    def initialize(self, documents: list[ProcessedDocument]):
        """Load all documents for TAR."""
        self.all_texts = [doc.extracted_text for doc in documents]
        self.all_ids = [doc.doc_id for doc in documents]
        self.features = self.vectorizer.fit_transform(self.all_texts)

    def add_labels(self, labels: dict[str, int]):
        """Add human review decisions. 1=relevant, 0=not relevant."""
        self.labels.update(labels)
        self._retrain()

    def _retrain(self):
        """Retrain classifier on all labeled documents."""
        if len(self.labels) < 10:
            return

        labeled_indices = [
            self.all_ids.index(doc_id)
            for doc_id in self.labels
            if doc_id in self.all_ids
        ]
        X_train = self.features[labeled_indices]
        y_train = np.array([self.labels[self.all_ids[i]] for i in labeled_indices])

        # Need both classes represented
        if len(set(y_train)) < 2:
            return

        self.classifier.fit(X_train, y_train)
        self.is_fitted = True

    def get_next_batch(self, batch_size: int = 100) -> list[str]:
        """Return the most informative unlabeled documents for review."""
        if not self.is_fitted:
            # Before model is trained, return random sample
            unlabeled = [
                doc_id for doc_id in self.all_ids
                if doc_id not in self.labels
            ]
            return unlabeled[:batch_size]

        # Score all unlabeled documents
        unlabeled_indices = [
            i for i, doc_id in enumerate(self.all_ids)
            if doc_id not in self.labels
        ]

        if not unlabeled_indices:
            return []

        X_unlabeled = self.features[unlabeled_indices]
        probas = self.classifier.predict_proba(X_unlabeled)[:, 1]

        # Uncertainty sampling: pick documents closest to decision boundary
        uncertainty = np.abs(probas - 0.5)
        ranked = np.argsort(uncertainty)[:batch_size]

        return [self.all_ids[unlabeled_indices[i]] for i in ranked]

    def predict_relevance(self) -> dict[str, float]:
        """Predict relevance probability for all unlabeled documents."""
        if not self.is_fitted:
            return {}

        unlabeled_indices = [
            i for i, doc_id in enumerate(self.all_ids)
            if doc_id not in self.labels
        ]
        X = self.features[unlabeled_indices]
        probas = self.classifier.predict_proba(X)[:, 1]

        return {
            self.all_ids[unlabeled_indices[i]]: float(probas[i])
            for i in range(len(unlabeled_indices))
        }

    def estimate_recall(self) -> float:
        """Estimate the proportion of relevant documents found so far."""
        if not self.is_fitted:
            return 0.0
        predictions = self.predict_relevance()
        predicted_relevant = sum(1 for p in predictions.values() if p > 0.5)
        labeled_relevant = sum(1 for v in self.labels.values() if v == 1)
        total_estimated = predicted_relevant + labeled_relevant
        return labeled_relevant / total_estimated if total_estimated > 0 else 0.0

Production considerations

Chain of custody — Every processing step must be logged: which files were ingested, what hashes they produced, which were deduplicated, and what text was extracted. Courts require defensible processing, meaning you must demonstrate that no data was altered or lost.

Privilege detection — Attorney-client privileged documents must be identified and withheld from production. Python can flag potential privilege by detecting attorney names, law firm domains in email addresses, and privilege-indicating phrases (“attorney-client privileged,” “work product”).

Production formats — Opposing counsel may require specific formats: TIFF images with load files (Concordance, Relativity), native files, or PDF. Python generates these using Pillow for TIFF conversion and custom load file generators for review platform compatibility.

Proportionality — Federal Rule of Civil Procedure 26(b)(1) requires that discovery be proportional to the needs of the case. TAR helps demonstrate proportionality by showing statistical recall estimates — proving you’ve found a defensible percentage of relevant documents.

The one thing to remember: A production eDiscovery pipeline chains multi-format extraction, hash-based and MinHash deduplication, thread reconstruction, and active-learning TAR into a system that reduces millions of documents to a targeted review set — with full audit trails that satisfy court requirements for defensibility.

pythonlegal-techeDiscoverydata-processing

See Also

  • Python Contract Analysis Nlp How Python reads through legal contracts to find the important parts, risky clauses, and hidden surprises before you sign
  • Python Legal Citation Extraction How Python finds and understands references to laws, court cases, and regulations buried inside legal documents
  • Python Legal Document Parsing How Python breaks apart complex legal documents into organized, searchable pieces that computers and people can actually use
  • Activation Functions Why neural networks need these tiny mathematical functions — and how ReLU's simplicity accidentally made deep learning possible.
  • Ai Agents Architecture How AI systems go from answering questions to actually doing things — the design patterns that turn language models into autonomous agents that browse, code, and plan.