eDiscovery Processing with Python — Deep Dive
Multi-format document processing
eDiscovery data includes dozens of file types. A robust processor handles them all through a unified interface:
import hashlib
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class ProcessedDocument:
doc_id: str
source_path: str
file_type: str
extracted_text: str
metadata: dict
md5_hash: str
sha256_hash: str
word_count: int
attachments: list["ProcessedDocument"] = field(default_factory=list)
parent_id: str | None = None
class DocumentProcessor:
    """Extract text and metadata from many file types through one entry point.

    ``process`` dispatches on the lower-cased file suffix; any suffix without
    a dedicated extractor is read as plain text. Third-party parsers are
    imported inside the extractor that uses them, so a parser library is only
    needed once a file of that type is actually processed.
    """

    def process(self, file_path: Path) -> "ProcessedDocument":
        """Extract *file_path* and wrap the result in a ``ProcessedDocument``.

        Both digests are computed over the UTF-8 encoding of the *extracted
        text* (not the raw file bytes), so two files that yield the same text
        get the same hashes. The first 16 hex characters of the SHA-256 digest
        become ``doc_id``.

        Args:
            file_path: Path of the file to process.

        Returns:
            The populated ``ProcessedDocument``; ``file_type`` is the
            lower-cased suffix (including the leading dot).
        """
        suffix = file_path.suffix.lower()
        extractors = {
            ".pdf": self._extract_pdf,
            ".docx": self._extract_docx,
            ".msg": self._extract_msg,
            ".eml": self._extract_eml,
            ".xlsx": self._extract_xlsx,
            ".txt": self._extract_text,
            ".html": self._extract_html,
        }
        # Unknown suffixes are treated as plain text.
        extractor = extractors.get(suffix, self._extract_text)
        text, metadata = extractor(file_path)

        # Generate hashes for deduplication
        content_bytes = text.encode("utf-8")
        md5 = hashlib.md5(content_bytes).hexdigest()
        sha256 = hashlib.sha256(content_bytes).hexdigest()

        return ProcessedDocument(
            doc_id=sha256[:16],
            source_path=str(file_path),
            file_type=suffix,
            extracted_text=text,
            metadata=metadata,
            md5_hash=md5,
            sha256_hash=sha256,
            word_count=len(text.split()),
        )

    def _extract_pdf(self, path: Path) -> tuple[str, dict]:
        """Return the text of every page (joined by blank lines) and PDF metadata.

        Pages for which pdfplumber finds no text are sent through OCR.
        """
        import pdfplumber

        pages = []
        with pdfplumber.open(path) as pdf:
            metadata = pdf.metadata or {}
            for page in pdf.pages:
                text = page.extract_text()
                if text:
                    pages.append(text)
                else:
                    # Fall back to OCR for scanned pages
                    pages.append(self._ocr_page(page))
        return "\n\n".join(pages), metadata

    def _ocr_page(self, page) -> str:
        """OCR a single PDF page using pytesseract."""
        import pytesseract

        # Render the page at 300 DPI and hand the underlying image to OCR.
        img = page.to_image(resolution=300).original
        return pytesseract.image_to_string(img)

    def _extract_msg(self, path: Path) -> tuple[str, dict]:
        """Extract Outlook .msg files."""
        import extract_msg

        msg = extract_msg.Message(str(path))
        try:
            metadata = {
                "from": msg.sender,
                "to": msg.to,
                "cc": msg.cc,
                "subject": msg.subject,
                "date": str(msg.date),
                "message_id": msg.messageId,
                "in_reply_to": getattr(msg, "inReplyTo", None),
            }
            body = msg.body or ""
        finally:
            # Release the underlying file even when a property lookup fails.
            msg.close()
        return body, metadata

    def _extract_eml(self, path: Path) -> tuple[str, dict]:
        """Extract .eml email files."""
        import email
        from email import policy

        with open(path, "rb") as f:
            msg = email.message_from_binary_file(f, policy=policy.default)

        # Prefer the text/plain body part; fall back to text/html.
        body = msg.get_body(preferencelist=("plain", "html"))
        text = body.get_content() if body else ""
        metadata = {
            "from": msg["from"],
            "to": msg["to"],
            "cc": msg.get("cc", ""),
            "subject": msg["subject"],
            "date": msg["date"],
            "message_id": msg["message-id"],
            "in_reply_to": msg.get("in-reply-to", ""),
            "references": msg.get("references", ""),
        }
        return text, metadata

    def _extract_docx(self, path: Path) -> tuple[str, dict]:
        """Return the paragraph text and core properties of a .docx file."""
        from docx import Document

        doc = Document(str(path))
        text = "\n".join(p.text for p in doc.paragraphs)
        props = doc.core_properties
        metadata = {
            "author": props.author,
            "created": str(props.created),
            "modified": str(props.modified),
            "title": props.title,
        }
        return text, metadata

    @staticmethod
    def _cell_to_text(value) -> str:
        """Render one spreadsheet cell value; only a missing value becomes ""."""
        # A truthiness test would also erase 0, 0.0 and False, which are real
        # cell contents, so compare against None explicitly.
        return "" if value is None else str(value)

    def _extract_xlsx(self, path: Path) -> tuple[str, dict]:
        """Return every sheet as pipe-separated rows plus the list of sheet names."""
        import openpyxl

        wb = openpyxl.load_workbook(str(path), read_only=True)
        try:
            sheet_names = wb.sheetnames
            sheets_text = []
            for sheet in sheet_names:
                ws = wb[sheet]
                rows = [
                    " | ".join(self._cell_to_text(c) for c in row)
                    for row in ws.iter_rows(values_only=True)
                ]
                sheets_text.append(f"[Sheet: {sheet}]\n" + "\n".join(rows))
        finally:
            # The workbook was opened read-only; close it explicitly so the
            # file handle is released.
            wb.close()
        return "\n\n".join(sheets_text), {"sheets": sheet_names}

    def _extract_html(self, path: Path) -> tuple[str, dict]:
        """Return the visible text of an HTML file and its <title> (or "")."""
        from bs4 import BeautifulSoup

        # Explicit encoding: the extracted text feeds the hashes, so it must
        # not depend on the locale of the machine doing the processing.
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            soup = BeautifulSoup(f, "html.parser")
        title_tag = soup.title
        # .string is None when the title has no single text child.
        title = (title_tag.string or "") if title_tag is not None else ""
        return soup.get_text(separator="\n"), {"title": title}

    def _extract_text(self, path: Path) -> tuple[str, dict]:
        """Read a file as UTF-8 text, skipping undecodable bytes; no metadata."""
        # Explicit encoding keeps the text (and its hashes) identical across
        # machines with different default locales.
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read(), {}
Deduplication engine
Exact deduplication uses hashes. Near-duplicate detection uses MinHash with LSH for efficiency across millions of documents:
from datasketch import MinHash, MinHashLSH
from dataclasses import dataclass
@dataclass
class DeduplicationResult:
    """Result of a deduplication run: the kept documents and the duplicate pairs."""

    # Documents that remain after duplicates were removed.
    unique_docs: list[ProcessedDocument]
    # Exact duplicates as (kept_id, removed_id) pairs.
    exact_dupes: list[tuple[str, str]]
    # Near duplicates as (id1, id2, similarity) triples.
    near_dupes: list[tuple[str, str, float]]
class DeduplicationEngine:
    """Two-phase deduplication: exact SHA-256 matches, then MinHash/LSH near matches.

    The LSH index and the MinHash cache live on the instance, so documents
    kept by an earlier ``deduplicate`` call remain candidates for later calls.
    """

    def __init__(self, similarity_threshold: float = 0.85):
        """Create an empty index.

        Args:
            similarity_threshold: Minimum estimated Jaccard similarity for two
                documents to be reported as near-duplicates.
        """
        self.threshold = similarity_threshold
        self.lsh = MinHashLSH(threshold=similarity_threshold, num_perm=128)
        self.minhashes: dict[str, MinHash] = {}
        # Keys already inserted into ``self.lsh``; inserting a key twice raises.
        self._indexed_ids: set[str] = set()

    @staticmethod
    def _shingles(text: str, size: int = 3) -> list[str]:
        """Split *text* into lower-cased, overlapping ``size``-word shingles.

        A text with fewer than ``size`` words yields one shingle holding all
        of its words. Without that, every short text would produce an
        identical empty MinHash and unrelated short documents would be
        reported as near-duplicates of each other. Text with no words yields
        an empty list.
        """
        words = text.lower().split()
        if not words:
            return []
        if len(words) < size:
            return [" ".join(words)]
        return [
            " ".join(words[i:i + size])
            for i in range(len(words) - size + 1)
        ]

    def _create_minhash(self, text: str) -> "MinHash":
        """Create MinHash from document text using word shingles."""
        m = MinHash(num_perm=128)
        # Use 3-word shingles for better near-dupe detection
        for shingle in self._shingles(text):
            m.update(shingle.encode("utf-8"))
        return m

    def _best_match(self, doc_id: str, mh: "MinHash"):
        """Return ``(candidate_id, similarity)`` for the closest indexed document.

        Only candidates whose estimated similarity reaches the threshold are
        considered; the highest similarity wins (ties broken by id so the
        result does not depend on candidate iteration order). Returns None
        when nothing qualifies.
        """
        best = None
        for cand_id in self.lsh.query(mh):
            if cand_id == doc_id:
                continue
            sim = self.minhashes[cand_id].jaccard(mh)
            if sim < self.threshold:
                continue
            if best is None or (sim, cand_id) > (best[1], best[0]):
                best = (cand_id, sim)
        return best

    def deduplicate(
        self, documents: list["ProcessedDocument"]
    ) -> "DeduplicationResult":
        """Remove exact and near duplicates from *documents*.

        Args:
            documents: Documents to deduplicate, in priority order; the first
                occurrence of a duplicate group is the one that is kept.

        Returns:
            A ``DeduplicationResult`` holding the kept documents, the exact
            duplicate pairs ``(kept_id, removed_id)`` and the near-duplicate
            triples ``(kept_id, removed_id, similarity)``.
        """
        # Phase 1: exact deduplication by hash
        seen_hashes: dict[str, "ProcessedDocument"] = {}
        exact_dupes = []
        unique_after_exact = []
        for doc in documents:
            kept = seen_hashes.get(doc.sha256_hash)
            if kept is not None:
                exact_dupes.append((kept.doc_id, doc.doc_id))
            else:
                seen_hashes[doc.sha256_hash] = doc
                unique_after_exact.append(doc)

        # Phase 2: near-duplicate detection with MinHash LSH
        near_dupes = []
        unique_final = []
        for doc in unique_after_exact:
            mh = self._create_minhash(doc.extracted_text)
            self.minhashes[doc.doc_id] = mh

            match = self._best_match(doc.doc_id, mh)
            if match is not None:
                cand_id, sim = match
                near_dupes.append((cand_id, doc.doc_id, sim))
                continue

            # Skip the insert for ids that are already indexed (a repeated
            # call with the same documents): the LSH index rejects duplicate
            # keys with ValueError.
            if doc.doc_id not in self._indexed_ids:
                self.lsh.insert(doc.doc_id, mh)
                self._indexed_ids.add(doc.doc_id)
            unique_final.append(doc)

        return DeduplicationResult(
            unique_docs=unique_final,
            exact_dupes=exact_dupes,
            near_dupes=near_dupes,
        )
Email threading
Reconstructing email threads from individual messages lets reviewers see complete conversations:
from dataclasses import dataclass, field
@dataclass
class EmailThread:
thread_id: str
subject: str
messages: list[ProcessedDocument]
participants: set[str] = field(default_factory=set)
inclusive_message_id: str | None = None # the message containing full thread
@property
def date_range(self) -> tuple[str, str]:
dates = [m.metadata.get("date", "") for m in self.messages]
dates = [d for d in dates if d]
return (min(dates), max(dates)) if dates else ("", "")
class EmailThreader:
    """Group emails into conversation threads using their reply headers."""

    def build_threads(
        self, emails: list["ProcessedDocument"]
    ) -> list["EmailThread"]:
        """Group emails into conversation threads.

        Args:
            emails: Email documents whose ``metadata`` may carry
                ``message_id``, ``in_reply_to``, ``references``, ``date``,
                ``from``, ``to``, ``cc`` and ``subject``.

        Returns:
            One ``EmailThread`` per thread root. Messages are in
            chronological order (undated messages first), the subject comes
            from the first message, and ``inclusive_message_id`` is the
            ``doc_id`` of the last one.
        """
        # Build message-id index
        by_message_id: dict[str, "ProcessedDocument"] = {}
        for message in emails:
            msg_id = message.metadata.get("message_id")
            if msg_id:
                by_message_id[msg_id] = message

        # Build thread graph using In-Reply-To and References headers
        thread_map: dict[str, list] = {}
        for message in emails:
            # Find thread root by following reply chain
            root_id = self._find_root(message, by_message_id)
            thread_map.setdefault(root_id, []).append(message)

        # Build thread objects
        threads = []
        for thread_id, messages in thread_map.items():
            messages.sort(key=self._date_sort_key)

            participants = set()
            for m in messages:
                sender = m.metadata.get("from")
                if sender:
                    participants.add(sender)
                for field_name in ("to", "cc"):
                    val = m.metadata.get(field_name)
                    if val:
                        participants.update(
                            addr.strip() for addr in val.split(",")
                        )
            participants.discard("")

            threads.append(
                EmailThread(
                    thread_id=thread_id,
                    subject=messages[0].metadata.get("subject") or "",
                    messages=messages,
                    participants=participants,
                    # The inclusive message is the last reply (contains all
                    # quoted text)
                    inclusive_message_id=messages[-1].doc_id,
                )
            )
        return threads

    @staticmethod
    def _date_sort_key(message):
        """Return a sort key that orders messages chronologically.

        Parsed dates sort after unparseable or missing ones, so undated
        messages stay at the front of a thread. Raw date strings cannot be
        compared directly: RFC 2822 dates begin with the weekday name, and a
        missing header is stored as None, which cannot be ordered against
        strings at all. Dates without a UTC offset are treated as UTC.
        NOTE(review): the UTC assumption for offset-less dates should be
        confirmed against the upstream extractors.
        """
        from datetime import datetime, timezone
        from email.utils import parsedate_to_datetime

        text = str(message.metadata.get("date") or "").strip()
        for parser in (datetime.fromisoformat, parsedate_to_datetime):
            try:
                parsed = parser(text)
            except (TypeError, ValueError, IndexError):
                continue
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            return (1, parsed)
        return (0, text)

    @staticmethod
    def _own_id(email) -> str:
        """Return the email's Message-ID, or its doc_id when that is missing/empty."""
        return email.metadata.get("message_id") or email.doc_id

    def _find_root(
        self,
        email: "ProcessedDocument",
        index: dict[str, "ProcessedDocument"],
    ) -> str:
        """Follow reply chain to find the thread root message ID.

        At every step the References header wins (its first entry is the
        root); otherwise In-Reply-To is followed while the parent is present
        in *index*. The walk is iterative and remembers visited messages, so
        a malformed chain in which messages reply to each other cannot
        recurse forever; all members of such a cycle resolve to the same id
        (the smallest one in the cycle).

        Args:
            email: The message whose thread root is wanted.
            index: Message-ID -> message lookup for the whole collection.

        Returns:
            The root's Message-ID, or a ``doc_id`` when the root message has
            no Message-ID.
        """
        chain = []
        position: dict[int, int] = {}  # id(message) -> index in chain
        current = email
        while True:
            # Check References header first (lists all ancestors)
            ref_ids = (current.metadata.get("references") or "").split()
            if ref_ids:
                return ref_ids[0]  # First reference is the root

            position[id(current)] = len(chain)
            chain.append(current)

            # Fall back to In-Reply-To
            reply_to = current.metadata.get("in_reply_to") or ""
            parent = index.get(reply_to) if reply_to else None
            if parent is None:
                # This message is a root
                return self._own_id(current)

            if id(parent) in position:
                # Cyclic reply chain: pick one canonical id for the cycle.
                cycle = chain[position[id(parent)]:]
                return min(self._own_id(member) for member in cycle)
            current = parent
Technology-Assisted Review (TAR)
TAR uses active learning to prioritize the most informative documents for human review:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.calibration import CalibratedClassifierCV
import numpy as np
class TAREngine:
    """Continuous Active Learning (CAL) for document review."""

    # Minimum number of labeled corpus documents before a model is trained.
    _MIN_LABELS = 10
    # Cross-validation folds used by the calibrated classifier; every class
    # needs at least this many labeled examples for fitting to succeed.
    _CV_FOLDS = 3

    def __init__(self):
        """Set up the TF-IDF vectorizer and calibrated classifier with no data."""
        self.vectorizer = TfidfVectorizer(
            max_features=50000,
            ngram_range=(1, 2),
            stop_words="english",
        )
        self.classifier = CalibratedClassifierCV(
            LogisticRegression(
                C=1.0, max_iter=1000, class_weight="balanced"
            ),
            cv=self._CV_FOLDS,
        )
        self.is_fitted = False
        self.all_texts: list[str] = []
        self.all_ids: list[str] = []
        self.labels: dict[str, int] = {}  # doc_id -> 0/1
        # TF-IDF matrix for all documents; populated by initialize().
        self.features = None

    def initialize(self, documents: list["ProcessedDocument"]):
        """Load all documents for TAR."""
        self.all_texts = [doc.extracted_text for doc in documents]
        self.all_ids = [doc.doc_id for doc in documents]
        self.features = self.vectorizer.fit_transform(self.all_texts)

    def add_labels(self, labels: dict[str, int]):
        """Add human review decisions. 1=relevant, 0=not relevant."""
        self.labels.update(labels)
        self._retrain()

    def _unlabeled_indices(self) -> list[int]:
        """Return the positions in ``all_ids`` of documents without a label."""
        return [
            i for i, doc_id in enumerate(self.all_ids)
            if doc_id not in self.labels
        ]

    def _retrain(self):
        """Retrain classifier on all labeled documents.

        Training is skipped (leaving any previous model in place) when no
        corpus has been loaded, when fewer than ``_MIN_LABELS`` labeled
        documents belong to the corpus, or when either class has fewer
        labeled examples than the number of cross-validation folds, since
        the stratified split could not be built in that case.
        """
        features = getattr(self, "features", None)
        if features is None:
            return

        # One pass to map each id to its first position, instead of a linear
        # list search per labeled document.
        position: dict[str, int] = {}
        for i, doc_id in enumerate(self.all_ids):
            position.setdefault(doc_id, i)

        labeled_indices = [
            position[doc_id] for doc_id in self.labels if doc_id in position
        ]
        if len(labeled_indices) < self._MIN_LABELS:
            return

        y_values = [self.labels[self.all_ids[i]] for i in labeled_indices]
        class_counts: dict[int, int] = {}
        for value in y_values:
            class_counts[value] = class_counts.get(value, 0) + 1
        # Need both classes represented, each often enough for every fold
        if len(class_counts) < 2 or min(class_counts.values()) < self._CV_FOLDS:
            return

        self.classifier.fit(features[labeled_indices], np.array(y_values))
        self.is_fitted = True

    def get_next_batch(self, batch_size: int = 100) -> list[str]:
        """Return the most informative unlabeled documents for review.

        Args:
            batch_size: Maximum number of document ids to return.

        Returns:
            Up to ``batch_size`` ids of unlabeled documents. Before a model
            has been trained these are simply the first unlabeled documents
            in load order; afterwards they are the ones whose predicted
            relevance is closest to 0.5.
        """
        unlabeled_indices = self._unlabeled_indices()
        if not self.is_fitted:
            # No model yet: hand out unlabeled documents in load order.
            return [self.all_ids[i] for i in unlabeled_indices][:batch_size]
        if not unlabeled_indices:
            return []

        # Score all unlabeled documents
        X_unlabeled = self.features[unlabeled_indices]
        probas = self.classifier.predict_proba(X_unlabeled)[:, 1]

        # Uncertainty sampling: pick documents closest to decision boundary
        uncertainty = np.abs(probas - 0.5)
        ranked = np.argsort(uncertainty)[:batch_size]
        return [self.all_ids[unlabeled_indices[i]] for i in ranked]

    def predict_relevance(self) -> dict[str, float]:
        """Predict relevance probability for all unlabeled documents.

        Returns an empty dict when no model has been trained yet or when
        every document already has a label.
        """
        if not self.is_fitted:
            return {}
        unlabeled_indices = self._unlabeled_indices()
        if not unlabeled_indices:
            # Nothing to score; the classifier cannot be asked about zero rows.
            return {}
        X = self.features[unlabeled_indices]
        probas = self.classifier.predict_proba(X)[:, 1]
        return {
            self.all_ids[idx]: float(proba)
            for idx, proba in zip(unlabeled_indices, probas)
        }

    def estimate_recall(self) -> float:
        """Estimate the proportion of relevant documents found so far.

        The estimate is labeled-relevant / (labeled-relevant + unlabeled
        documents predicted relevant with probability above 0.5). Returns
        0.0 before a model is trained or when no relevant documents are
        known or predicted.
        """
        if not self.is_fitted:
            return 0.0
        predictions = self.predict_relevance()
        predicted_relevant = sum(1 for p in predictions.values() if p > 0.5)
        labeled_relevant = sum(1 for v in self.labels.values() if v == 1)
        total_estimated = predicted_relevant + labeled_relevant
        return labeled_relevant / total_estimated if total_estimated > 0 else 0.0
Production considerations
Chain of custody — Every processing step must be logged: which files were ingested, what hashes they produced, which were deduplicated, and what text was extracted. Courts require defensible processing, meaning you must demonstrate that no data was altered or lost.
Privilege detection — Attorney-client privileged documents must be identified and withheld from production. Python can flag potential privilege by detecting attorney names, law firm domains in email addresses, and privilege-indicating phrases (“attorney-client privileged,” “work product”).
Production formats — Opposing counsel may require specific formats: TIFF images with load files (Concordance, Relativity), native files, or PDF. Python generates these using Pillow for TIFF conversion and custom load file generators for review platform compatibility.
Proportionality — Federal Rule of Civil Procedure 26(b)(1) requires that discovery be proportional to the needs of the case. TAR helps demonstrate proportionality by showing statistical recall estimates — proving you’ve found a defensible percentage of relevant documents.
The one thing to remember: A production eDiscovery pipeline chains multi-format extraction, hash-based and MinHash deduplication, thread reconstruction, and active-learning TAR into a system that reduces millions of documents to a targeted review set — with full audit trails that satisfy court requirements for defensibility.
See Also
- Python Contract Analysis NLP — How Python reads through legal contracts to find the important parts, risky clauses, and hidden surprises before you sign.
- Python Legal Citation Extraction — How Python finds and understands references to laws, court cases, and regulations buried inside legal documents.
- Python Legal Document Parsing — How Python breaks apart complex legal documents into organized, searchable pieces that computers and people can actually use.
- Activation Functions — Why neural networks need these tiny mathematical functions, and how ReLU's simplicity accidentally made deep learning possible.
- AI Agents Architecture — How AI systems go from answering questions to actually doing things: the design patterns that turn language models into autonomous agents that browse, code, and plan.