Plagiarism Detection in Python — Deep Dive

A production plagiarism detection system combines multiple similarity algorithms at different granularities. This guide implements each layer in Python and shows how to integrate them into a complete pipeline.

Text Preprocessing

Consistent normalization is critical — minor formatting differences should not affect detection:

import re
import unicodedata

def normalize_text(text: str) -> str:
    """Normalize text for plagiarism comparison.

    Steps, in order: NFKC Unicode normalization, folding of typographic
    apostrophes to ASCII ``'``, lowercasing, whitespace collapsing, and
    expansion of a small fixed set of English contractions.

    Args:
        text: Raw document text.

    Returns:
        The normalized text (lowercase, single-spaced, trimmed).
    """
    text = unicodedata.normalize("NFKC", text)
    # NFKC does not touch typographic apostrophes, so "don’t" would otherwise
    # slip past the contraction table below.
    text = text.translate({0x2018: "'", 0x2019: "'", 0x02BC: "'"})
    text = text.lower()
    text = re.sub(r"\s+", " ", text).strip()

    contractions = {"don't": "do not", "won't": "will not", "can't": "cannot",
                    "it's": "it is", "they're": "they are", "we're": "we are"}
    # Match whole words only: a plain substring replace would rewrite the
    # possessive in "bandit's" to "bandit is".
    pattern = r"\b(?:" + "|".join(re.escape(c) for c in contractions) + r")\b"
    return re.sub(pattern, lambda found: contractions[found.group(0)], text)

def tokenize_words(text: str) -> list[str]:
    """Return the word tokens of ``text`` after normalization.

    The text is first passed through ``normalize_text``; punctuation is
    dropped because only runs of word characters are kept.
    """
    cleaned = normalize_text(text)
    word_pattern = re.compile(r'\b\w+\b')
    return word_pattern.findall(cleaned)

Winnowing Algorithm for Fingerprinting

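Winnowing guarantees that any shared run of at least n + w − 1 tokens (8 tokens with the defaults below: 5-token n-grams and a window of 4 hashes) contributes at least one common fingerprint, while keeping fingerprint sets compact: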

import hashlib

def hash_ngram(ngram: tuple[str, ...]) -> int:
    """Map an n-gram to a 32-bit integer taken from its MD5 digest.

    The tokens are joined with single spaces and MD5-hashed; the first four
    digest bytes are read as a big-endian unsigned integer.
    """
    joined = " ".join(ngram)
    digest = hashlib.md5(joined.encode()).digest()
    return int.from_bytes(digest[:4], "big")

def generate_ngrams(tokens: list[str], n: int = 5) -> list[tuple]:
    """Return every run of ``n`` consecutive tokens, in order, as tuples.

    Adjacent n-grams overlap by ``n - 1`` tokens. A token list shorter than
    ``n`` yields an empty list.
    """
    ngrams = []
    window_count = len(tokens) - n + 1
    for start in range(window_count):
        ngrams.append(tuple(tokens[start:start + n]))
    return ngrams

def winnow(hashes: list[int], window_size: int = 4) -> set[tuple[int, int]]:
    """
    Pick fingerprints from a hash sequence with the Winnowing algorithm.

    A window of ``window_size`` consecutive hashes slides over the input; from
    each window the smallest hash is chosen, preferring the rightmost one on
    ties. When the input is shorter than one window, every hash is kept.

    Returns a set of (hash_value, position) tuples.
    """
    total = len(hashes)
    if total < window_size:
        return set(zip(hashes, range(total)))

    selected: set[tuple[int, int]] = set()
    last_pick = -1

    for start in range(total - window_size + 1):
        window = hashes[start:start + window_size]
        # Smallest value wins; among equal values the largest offset wins.
        offset = min(range(len(window)), key=lambda k: (window[k], -k))
        pick = start + offset

        # Consecutive windows often share their minimum; record it once.
        if pick == last_pick:
            continue
        selected.add((hashes[pick], pick))
        last_pick = pick

    return selected

def document_fingerprint(text: str, n: int = 5, window: int = 4) -> set[int]:
    """Build the Winnowing fingerprint set of a document.

    The text is tokenized into words, cut into ``n``-token n-grams, hashed,
    and winnowed with the given window size. Positions are discarded; only the
    selected hash values are returned.
    """
    word_ngrams = generate_ngrams(tokenize_words(text), n)
    ngram_hashes = list(map(hash_ngram, word_ngrams))
    return {value for value, _position in winnow(ngram_hashes, window)}

def jaccard_similarity(fp1: set[int], fp2: set[int]) -> float:
    """Return |fp1 ∩ fp2| / |fp1 ∪ fp2| for two fingerprint sets.

    If either set is empty the similarity is defined as 0.0.
    """
    if fp1 and fp2:
        shared = fp1 & fp2
        combined = fp1 | fp2
        return len(shared) / len(combined)
    return 0.0

TF-IDF Document Comparison

For corpus-level similarity detection:

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

class TFIDFDetector:
    """Corpus-level similarity search based on TF-IDF vectors and cosine similarity."""

    def __init__(self):
        vectorizer_options = {
            "max_features": 50000,
            "ngram_range": (1, 2),
            "stop_words": "english",
            "sublinear_tf": True,
        }
        self.vectorizer = TfidfVectorizer(**vectorizer_options)
        # Filled by index_corpus(): TF-IDF matrix and the document ids, row-aligned.
        self.corpus_vectors = None
        self.corpus_ids = []

    def index_corpus(self, documents: list[dict]):
        """Fit the vectorizer on the corpus and store its TF-IDF matrix.

        Each document is a dict with a "text" and an "id" entry; texts are
        normalized before vectorization.
        """
        normalized_texts = []
        for doc in documents:
            normalized_texts.append(normalize_text(doc["text"]))
        self.corpus_ids = [doc["id"] for doc in documents]
        self.corpus_vectors = self.vectorizer.fit_transform(normalized_texts)

    def find_similar(self, query_text: str, top_k: int = 10,
                     threshold: float = 0.3) -> list[dict]:
        """Return up to ``top_k`` indexed documents most similar to the query.

        Results are ordered by decreasing cosine similarity, and documents
        scoring below ``threshold`` are left out. Each result is a dict with
        "doc_id" and "similarity".
        """
        query_vec = self.vectorizer.transform([normalize_text(query_text)])
        scores = cosine_similarity(query_vec, self.corpus_vectors)[0]

        # argsort is ascending, so reverse before cutting to the best top_k.
        ranked = np.argsort(scores)[::-1][:top_k]
        return [
            {"doc_id": self.corpus_ids[pos], "similarity": float(scores[pos])}
            for pos in ranked
            if scores[pos] >= threshold
        ]

Sentence-Level Semantic Matching

For paraphrase detection, compare individual sentences using embeddings:

from sentence_transformers import SentenceTransformer, util
import numpy as np

class SemanticMatcher:
    """Finds paraphrased sentences between two documents using sentence embeddings."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        # spaCy pipeline used for sentence splitting; loaded on first use.
        self._nlp = None

    def _get_nlp(self):
        """Return the spaCy pipeline, loading it only once per instance.

        Loading a spaCy model is expensive, so it must not happen on every
        comparison.
        """
        nlp = getattr(self, "_nlp", None)
        if nlp is None:
            import spacy
            nlp = spacy.load("en_core_web_sm")
            self._nlp = nlp
        return nlp

    def _split_sentences(self, text: str) -> list[str]:
        """Split text into stripped sentences, keeping those with more than 5 words."""
        nlp = self._get_nlp()
        return [s.text.strip() for s in nlp(text).sents if len(s.text.split()) > 5]

    def find_matching_passages(self, submitted: str, source: str,
                                threshold: float = 0.8) -> list[dict]:
        """Find semantically similar sentence pairs between two documents.

        Each submitted sentence is paired with its single most similar source
        sentence; the pair is reported when the cosine similarity reaches
        ``threshold``.

        Args:
            submitted: Text of the document being checked.
            source: Text of the potential source document.
            threshold: Minimum cosine similarity for a pair to be reported.

        Returns:
            A list of dicts with keys "submitted_sentence", "source_sentence",
            "similarity", "submitted_index" and "source_index", sorted by
            decreasing similarity. The indices refer to the filtered sentence
            lists (sentences of more than 5 words). The list is empty when
            either document has no such sentence.
        """
        sub_sents = self._split_sentences(submitted)
        src_sents = self._split_sentences(source)

        if not sub_sents or not src_sents:
            return []

        sub_embs = self.model.encode(sub_sents, convert_to_tensor=True)
        src_embs = self.model.encode(src_sents, convert_to_tensor=True)

        # Rows are submitted sentences, columns are source sentences.
        sim_matrix = util.cos_sim(sub_embs, src_embs)

        matches = []
        for i, sentence in enumerate(sub_sents):
            max_sim_idx = int(sim_matrix[i].argmax())
            max_sim = float(sim_matrix[i][max_sim_idx])
            if max_sim >= threshold:
                matches.append({
                    "submitted_sentence": sentence,
                    "source_sentence": src_sents[max_sim_idx],
                    "similarity": max_sim,
                    "submitted_index": i,
                    "source_index": max_sim_idx,
                })

        return sorted(matches, key=lambda m: -m["similarity"])

Passage Alignment and Highlighting

After finding matching sentences, group them into contiguous passages:

def align_passages(matches: list[dict], gap_tolerance: int = 2) -> list[dict]:
    """Group consecutive matching sentences into passage-level alignments.

    Matches are walked in order of "submitted_index". A match extends the
    current passage when it is within ``gap_tolerance`` sentences of the
    previous match on both the submitted side and the source side; otherwise
    it starts a new passage.

    Args:
        matches: Dicts with at least "submitted_index", "source_index" and
            "similarity".
        gap_tolerance: Largest allowed index distance between neighbouring
            matches of the same passage.

    Returns:
        Passages containing at least two matches. Each is a dict with
        "submitted_start", "submitted_end", "source_start", "source_end",
        "matches" and "avg_similarity". ``source_start <= source_end`` always
        holds, even when the source sentences occur in reverse order.
    """
    if not matches:
        return []

    ordered = sorted(matches, key=lambda m: m["submitted_index"])

    def start_passage(match: dict) -> dict:
        return {
            "submitted_start": match["submitted_index"],
            "submitted_end": match["submitted_index"],
            "source_start": match["source_index"],
            "source_end": match["source_index"],
            "matches": [match],
            "avg_similarity": match["similarity"],
        }

    passages = []
    current = start_passage(ordered[0])
    # The source gap is measured against the previous match, not the span
    # edge, so the span can grow in either direction without changing grouping.
    last_source = ordered[0]["source_index"]

    for match in ordered[1:]:
        sub_gap = match["submitted_index"] - current["submitted_end"]
        src_gap = abs(match["source_index"] - last_source)

        if sub_gap <= gap_tolerance and src_gap <= gap_tolerance:
            current["submitted_end"] = match["submitted_index"]
            # Widen the span instead of overwriting its end, which could
            # otherwise leave source_start greater than source_end.
            current["source_start"] = min(current["source_start"], match["source_index"])
            current["source_end"] = max(current["source_end"], match["source_index"])
            current["matches"].append(match)
        else:
            passages.append(current)
            current = start_passage(match)
        last_source = match["source_index"]

    passages.append(current)

    # Single-sentence passages are dropped; averages are computed once per
    # kept passage rather than on every append.
    kept = [p for p in passages if len(p["matches"]) >= 2]
    for passage in kept:
        sims = [m["similarity"] for m in passage["matches"]]
        passage["avg_similarity"] = sum(sims) / len(sims)
    return kept

Complete Detection Pipeline

class PlagiarismDetector:
    """End-to-end plagiarism check.

    Combines fingerprint screening, a TF-IDF fallback for candidate
    retrieval, and sentence-level semantic matching.
    """

    def __init__(self):
        self.tfidf = TFIDFDetector()
        self.semantic = SemanticMatcher()

    @staticmethod
    def _count_sentences(text: str) -> int:
        """Heuristically count sentences in ``text``.

        Counts the non-blank fragments between runs of ``.``, ``!`` or ``?``,
        so a trailing full stop does not add an empty "sentence".
        """
        return sum(1 for piece in re.split(r"[.!?]+", text) if piece.strip())

    def check_document(self, submitted_text: str,
                       corpus: list[dict]) -> dict:
        """Run the full plagiarism check pipeline.

        Args:
            submitted_text: Text of the document being checked.
            corpus: Source documents, each a dict with "id" and "text" and
                optionally "title".

        Returns:
            A dict with "similarity_percentage" (0-100, one decimal),
            "sources" (per-source match details, strongest first) and
            "verdict" (see ``_classify``).
        """
        # Stage 1: Fast fingerprint screening
        sub_fp = document_fingerprint(submitted_text)
        candidates = []
        for doc in corpus:
            jacc = jaccard_similarity(sub_fp, document_fingerprint(doc["text"]))
            if jacc > 0.05:
                candidates.append({**doc, "fingerprint_sim": jacc})
        # Strongest overlap first, so the top-10 cut in stage 3 keeps the best
        # candidates rather than whichever came first in the corpus.
        candidates.sort(key=lambda c: c["fingerprint_sim"], reverse=True)

        # Stage 2: TF-IDF fallback when fingerprinting finds nothing
        # (paraphrased text shares few exact n-grams). Skipped for an empty
        # corpus, which the vectorizer cannot be fitted on.
        if not candidates and corpus:
            self.tfidf.index_corpus(corpus)
            tfidf_results = self.tfidf.find_similar(submitted_text, top_k=20)
            rank = {}
            for position, result in enumerate(tfidf_results):
                rank.setdefault(result["doc_id"], position)
            # Keep the TF-IDF ranking instead of falling back to corpus order.
            candidates = sorted(
                (d for d in corpus if d["id"] in rank),
                key=lambda d: rank[d["id"]],
            )

        # Stage 3: Semantic passage matching on top candidates
        all_matches = []
        matched_indices = set()
        for candidate in candidates[:10]:
            matches = self.semantic.find_matching_passages(
                submitted_text, candidate["text"], threshold=0.75
            )
            if matches:
                # Track which submitted sentences matched, so a sentence found
                # in several sources is counted only once in the overall score.
                matched_indices.update(m["submitted_index"] for m in matches)
                all_matches.append({
                    "source_id": candidate["id"],
                    "source_title": candidate.get("title", "Unknown"),
                    "passages": align_passages(matches),
                    "max_similarity": max(m["similarity"] for m in matches),
                    "matched_sentences": len(matches),
                })

        # Overall score: share of submitted sentences with at least one match.
        total_sentences = self._count_sentences(submitted_text)
        similarity_pct = min(100, (len(matched_indices) / max(total_sentences, 1)) * 100)

        return {
            "similarity_percentage": round(similarity_pct, 1),
            "sources": sorted(all_matches, key=lambda m: -m["max_similarity"]),
            "verdict": self._classify(similarity_pct),
        }

    def _classify(self, pct: float) -> str:
        """Map a similarity percentage to a verdict label.

        Bands: below 10, below 25, below 50, and 50 or above.
        """
        if pct < 10:
            return "low_similarity"
        elif pct < 25:
            return "moderate_similarity"
        elif pct < 50:
            return "high_similarity"
        else:
            return "very_high_similarity"

Scaling with MinHash LSH

For large corpora (millions of documents), exact fingerprint comparison is too slow. MinHash with Locality-Sensitive Hashing provides approximate nearest neighbor search:

from datasketch import MinHash, MinHashLSH

def build_minhash_index(corpus: list[dict], num_perm: int = 128,
                        threshold: float = 0.3) -> MinHashLSH:
    """Build an LSH index over the corpus for approximate similarity search.

    Every document (a dict with "id" and "text") is reduced to its set of
    distinct 5-word shingles, summarized as a MinHash signature, and inserted
    into the index under its id.
    """
    index = MinHashLSH(threshold=threshold, num_perm=num_perm)

    for doc in corpus:
        word_ngrams = generate_ngrams(tokenize_words(doc["text"]), n=5)
        shingles = set(map(" ".join, word_ngrams))
        signature = MinHash(num_perm=num_perm)
        for shingle in shingles:
            signature.update(shingle.encode("utf-8"))
        index.insert(doc["id"], signature)

    return index

def query_minhash(text: str, lsh: MinHashLSH, num_perm: int = 128) -> list[str]:
    """Return the keys of indexed documents that LSH reports as candidates.

    The query text is shingled into distinct 5-word n-grams and summarized as
    a MinHash signature before the index is queried.
    """
    word_ngrams = generate_ngrams(tokenize_words(text), n=5)
    shingles = set(map(" ".join, word_ngrams))
    signature = MinHash(num_perm=num_perm)
    for shingle in shingles:
        signature.update(shingle.encode("utf-8"))
    return lsh.query(signature)

MinHash LSH replaces the O(n) scan over every corpus document with a handful of hash-bucket lookups whose cost is roughly independent of corpus size, making it feasible to check against corpora of 10 million+ documents. The tradeoff is approximate results — some truly similar documents may be missed, some unrelated ones may be returned as candidates, and the threshold must be tuned empirically.

Source Code Plagiarism

For programming assignments, token-based comparison replaces word-based methods. Tools like MOSS (Measure of Software Similarity) tokenize source code, strip variable names and comments, and compare token sequences. Python’s ast module can normalize code structure before comparison, catching renamed-variable plagiarism that text-based tools miss.

The one thing to remember: Effective plagiarism detection layers fast approximate methods (fingerprinting, MinHash LSH) for candidate retrieval with precise semantic methods (embeddings, passage alignment) for detailed comparison — and the system’s output is always a similarity report for human interpretation, not an automated verdict.

Tags: python, plagiarism-detection, nlp, education-technology

See Also

  • Python Adaptive Learning Systems — How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
  • Python Airflow — Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
  • Python Altair — Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
  • Python Automated Grading — How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
  • Python Batch Vs Stream Processing — Batch processing is like doing laundry once a week; stream processing is like a self-cleaning shirt that cleans itself constantly.