Knowledge Graph Construction with Python — Deep Dive

Architecture of a KG construction pipeline

A production knowledge graph pipeline has five stages: ingestion, preprocessing, extraction, resolution, and storage. Each stage feeds the next and can be run independently for debugging or incremental updates.

Raw Sources → Preprocessor → NER + RE → Entity Linker → Graph Store
     ↑                                                        ↓
     └──────────── Feedback / Quality Audit ←──────────────────┘

Stage 1: Ingestion and preprocessing

Before extraction, split the text into sentences and drop very short fragments; cleaner inputs improve downstream accuracy:

import spacy
from spacy.lang.en import English
from spacy.tokens import Doc

nlp = spacy.load("en_core_web_trf")  # transformer-based pipeline

# Lightweight rule-based sentence splitter, built once and reused across calls
sentencizer = English()
sentencizer.add_pipe("sentencizer")

def preprocess(text: str, min_chars: int = 20) -> list[Doc]:
    """Split long text into sentence-level docs for extraction."""
    doc = sentencizer(text)
    # Drop fragments too short to carry a relation (headings, stray tokens)
    stripped = (sent.text.strip() for sent in doc.sents)
    sentences = [s for s in stripped if len(s) > min_chars]
    return list(nlp.pipe(sentences, batch_size=64))

Sentence segmentation matters because relation extraction models perform better on single-sentence inputs than on paragraphs.

Stage 2: Named entity recognition

spaCy’s transformer pipeline (en_core_web_trf) reaches roughly 90% F1 for named entity recognition on the OntoNotes benchmark. For domain-specific entities, add custom patterns:

from spacy.matcher import Matcher

matcher = Matcher(nlp.vocab)

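# Match chemical formulas like H2O, NaCl.
# Caveat: the pattern also matches short capitalized tokens such as "I", "He",
# or "US", so check hits against a list of known formulas or element symbols.
pattern = [{"TEXT": {"REGEX": r"^[A-Z][a-z]?\d*(?:[A-Z][a-z]?\d*)*$"}}]
matcher.add("CHEMICAL", [pattern])

def extract_entities(doc):
    """Return (text, label, start_char, end_char) tuples from NER plus custom patterns."""
    standard = [(ent.text, ent.label_, ent.start_char, ent.end_char) for ent in doc.ents]
    custom = []
    for match_id, start, end in matcher(doc):
        span = doc[start:end]
        # Look up the rule name ("CHEMICAL") from the match id
        custom.append((span.text, nlp.vocab.strings[match_id], span.start_char, span.end_char))
    return standard + custom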

Entity type filtering

Not all entity types matter for your graph. Filter aggressively — PERSON, ORG, GPE, and PRODUCT cover most business knowledge graphs. DATE and CARDINAL entities often add noise.
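
A minimal sketch of that filter, assuming entities arrive as the `(text, label, start_char, end_char)` tuples returned by `extract_entities`. `KEEP_LABELS` and `filter_entities` are illustrative names, and the allow-list should be adjusted to your domain.

```python
# Allow-list of entity labels worth keeping as graph nodes (assumed; tune per domain)
KEEP_LABELS = {"PERSON", "ORG", "GPE", "PRODUCT"}

def filter_entities(entities, keep=KEEP_LABELS):
    """Keep only (text, label, start, end) tuples whose label is on the allow-list."""
    return [ent for ent in entities if ent[1] in keep]

# Entities as they might come back for "Marie Curie was born in 1867 in Warsaw"
entities = [
    ("Marie Curie", "PERSON", 0, 11),
    ("1867", "DATE", 24, 28),
    ("Warsaw", "GPE", 32, 38),
]
kept = filter_entities(entities)  # the DATE entity is dropped
```

An allow-list is usually safer than a block-list here: labels you did not anticipate are excluded by default instead of leaking into the graph.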

Stage 3: Relation extraction

Rule-based approach

For well-defined domains, dependency parse patterns are reliable and fast:

from spacy.tokens import Doc

def extract_relations_rule_based(doc: Doc) -> list[tuple]:
    relations = []
    for token in doc:
        if token.dep_ == "nsubj" and token.head.pos_ == "VERB":
            subject = token
            verb = token.head
            # Find direct object
            objects = [child for child in verb.children if child.dep_ == "dobj"]
            for obj in objects:
                relations.append((subject.text, verb.lemma_, obj.text))
    return relations

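This catches simple subject-verb-object patterns but misses passive constructions (spaCy’s English models label their subjects nsubjpass rather than nsubj) and complex sentences. It also returns only the head token of each argument, so “Marie Curie” comes back as “Curie” unless you expand tokens to their noun chunks or entity spans.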

Transformer-based approach

For higher accuracy, use a pre-trained relation extraction model:

from transformers import pipeline

re_pipeline = pipeline(
    "text2text-generation",
    model="Babelscape/rebel-large",
    device=0,
)

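def extract_relations_transformer(text: str) -> list[dict]:
    # Ask for token ids and decode them manually: the pipeline's default text
    # decoding skips special tokens, which would drop the <triplet>, <subj>,
    # and <obj> markers that the parser depends on.
    output = re_pipeline(text, return_tensors=True, return_text=False,
                         max_length=256, num_beams=3)
    raw = re_pipeline.tokenizer.batch_decode([output[0]["generated_token_ids"]])[0]
    return parse_rebel_output(raw)

def parse_rebel_output(text: str) -> list[dict]:
    """Parse REBEL model output into structured triples.

    REBEL linearizes its output as
        <triplet> subject <subj> object <obj> relation
    and may attach several "<subj> object <obj> relation" groups to one subject.
    """
    triples = []
    fields = {"subject": [], "object": [], "relation": []}
    state = None  # field that plain tokens are currently appended to

    def flush():
        # Emit a triple only when all three parts have been collected
        if all(fields.values()):
            triples.append({
                "subject": " ".join(fields["subject"]),
                "relation": " ".join(fields["relation"]),
                "object": " ".join(fields["object"]),
            })

    for marker in ("<s>", "</s>", "<pad>"):
        text = text.replace(marker, " ")
    for token in text.split():
        if token == "<triplet>":   # a new subject starts
            flush()
            fields["subject"], fields["object"], fields["relation"] = [], [], []
            state = "subject"
        elif token == "<subj>":    # subject is complete, an object starts
            flush()
            fields["object"], fields["relation"] = [], []
            state = "object"
        elif token == "<obj>":     # object is complete, the relation starts
            fields["relation"] = []
            state = "relation"
        elif state is not None:
            fields[state].append(token)
    flush()  # finalize the last triple
    return triples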

The REBEL model from Babelscape handles over 200 relation types from Wikidata and works well for general-purpose knowledge extraction.

Stage 4: Entity resolution and linking

String similarity baseline

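from rapidfuzz import fuzz, utils

def find_canonical(mention: str, known_entities: list[str], threshold: float = 85.0) -> str | None:
    """Return the known entity most similar to the mention, or None below the threshold."""
    best_match = None
    best_score = 0.0
    for entity in known_entities:
        # default_process lowercases and strips punctuation, so "ACME Corp."
        # and "Acme Corp" compare as equal; recent RapidFuzz versions
        # otherwise compare the raw strings
        score = fuzz.token_sort_ratio(mention, entity, processor=utils.default_process)
        if score > best_score and score >= threshold:
            best_score = score
            best_match = entity
    return best_match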

Wikidata entity linking

For linking to a global knowledge base, query the Wikidata API:

import httpx

async def link_to_wikidata(mention: str) -> dict | None:
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            "https://www.wikidata.org/w/api.php",
            params={
                "action": "wbsearchentities",
                "search": mention,
                "language": "en",
                "format": "json",
                "limit": 3,
            },
        )
        resp.raise_for_status()  # surface HTTP errors instead of failing on JSON parsing
        results = resp.json().get("search", [])
        if results:
            top = results[0]
            return {"qid": top["id"], "label": top["label"], "description": top.get("description", "")}
    return None

Disambiguation requires context. “Paris” could be the city, Paris Hilton, or the mythological figure. Use the surrounding entities and document topic to pick the right candidate.

Stage 5: Graph assembly

Using RDFLib for small graphs

from urllib.parse import quote

from rdflib import Graph, Namespace

EX = Namespace("http://example.org/")
g = Graph()

def to_iri(name: str):
    # Percent-encode characters that are not valid in an IRI (quotes, brackets, ...)
    return EX[quote(name.strip().replace(" ", "_"))]

def add_triple(subject: str, predicate: str, obj: str):
    g.add((to_iri(subject), to_iri(predicate), to_iri(obj)))

add_triple("Marie_Curie", "born_in", "Warsaw")
add_triple("Warsaw", "located_in", "Poland")

# Query with SPARQL
results = g.query("""
    SELECT ?person ?country WHERE {
        ?person ex:born_in ?city .
        ?city ex:located_in ?country .
    }
""", initNs={"ex": EX})

Using Neo4j for production graphs

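def store_triples(tx, triples: list[dict]):
    """Upsert triples inside one transaction. Requires the APOC plugin."""
    # Typical call with the Neo4j 5.x Python driver:
    #     session.execute_write(store_triples, triples)
    # Plain Cypher cannot take the relationship type as a parameter,
    # which is why apoc.merge.relationship is used here.
    tx.run("""
        UNWIND $triples AS t
        MERGE (s:Entity {name: t.subject})
        MERGE (o:Entity {name: t.object})
        WITH s, o, t
        CALL apoc.merge.relationship(s, t.relation, {}, {}, o, {}) YIELD rel
        RETURN count(rel)
    """, triples=triples)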

Incremental updates

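Production knowledge graphs need continuous updates. A change-detection pipeline compares new extractions against existing triples. Run the comparison per source document: if the new set covers only part of the corpus, every fact from a document you did not re-process shows up as a deletion.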

def compute_delta(existing: set[tuple], new: set[tuple]) -> dict:
    return {
        "additions": new - existing,
        "deletions": existing - new,
        "unchanged": existing & new,
    }

Track provenance (source URL, extraction date, confidence score) on each triple so you can audit where facts came from and prioritize high-confidence triples during conflicts.
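
A minimal sketch of provenance-aware conflict handling. It assumes each relation is single-valued per subject, which will not hold for every predicate. `ProvenancedTriple`, `resolve_conflicts`, and the example.org URLs are illustrative, not part of any library.

```python
from dataclasses import dataclass
from datetime import date

@dataclass(frozen=True)
class ProvenancedTriple:
    subject: str
    relation: str
    obj: str
    source_url: str       # where the fact was extracted from
    extracted_on: date    # when the extraction ran
    confidence: float     # model confidence in [0, 1]

def resolve_conflicts(triples: list[ProvenancedTriple]) -> list[ProvenancedTriple]:
    """For each (subject, relation), keep the highest-confidence triple.

    Ties on confidence go to the most recent extraction.
    Assumes the relation admits only one value per subject.
    """
    best: dict[tuple[str, str], ProvenancedTriple] = {}
    for t in triples:
        slot = (t.subject, t.relation)
        current = best.get(slot)
        if current is None or (t.confidence, t.extracted_on) > (current.confidence, current.extracted_on):
            best[slot] = t
    return list(best.values())

# Two conflicting extractions for the same fact (illustrative sources and scores)
triples = [
    ProvenancedTriple("Marie Curie", "born_in", "Warsaw",
                      "https://example.org/a", date(2024, 1, 5), 0.95),
    ProvenancedTriple("Marie Curie", "born_in", "Paris",
                      "https://example.org/b", date(2024, 2, 1), 0.60),
]
winners = resolve_conflicts(triples)
```

Keeping the losing triples in an audit table, instead of discarding them, preserves the trail that the provenance fields are meant to provide.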

Quality assurance

Automated checks catch common extraction errors:

  • Type consistency — If “born_in” always connects a PERSON to a GPE, flag triples where it connects two ORGs.
  • Cardinality checks — A person has one birth date. Multiple “born_on” triples for the same person indicate an error.
  • Temporal consistency — A person can’t work at a company before they were born.
  • Confidence thresholds — Discard triples below a model confidence of 0.7 for automated pipelines; route marginal cases to human review.
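
The first two checks can be sketched in a few lines of plain Python. The schema, the set of single-valued relations, and the function names below are assumptions made for illustration; triples are `(subject, relation, object)` tuples and `entity_types` maps entity names to NER labels. The second `born_on` value is deliberately wrong to trigger the cardinality check.

```python
from collections import defaultdict

# Hypothetical schema: relation -> (expected subject type, expected object type)
SCHEMA = {"born_in": ("PERSON", "GPE")}
# Hypothetical set of relations that allow one value per subject
SINGLE_VALUED = {"born_on"}

def check_types(triples, entity_types, schema=SCHEMA):
    """Return triples whose subject/object types do not match the schema."""
    flagged = []
    for s, r, o in triples:
        if r in schema:
            actual = (entity_types.get(s), entity_types.get(o))
            if actual != schema[r]:
                flagged.append((s, r, o))
    return flagged

def check_cardinality(triples, single_valued=SINGLE_VALUED):
    """Return {(subject, relation): values} where a single-valued relation has several values."""
    values = defaultdict(set)
    for s, r, o in triples:
        if r in single_valued:
            values[(s, r)].add(o)
    return {key: vals for key, vals in values.items() if len(vals) > 1}

triples = [
    ("Marie Curie", "born_in", "Warsaw"),
    ("Acme", "born_in", "Globex"),                # two ORGs: type violation
    ("Marie Curie", "born_on", "1867-11-07"),
    ("Marie Curie", "born_on", "1867-11-08"),     # conflicting second value
]
entity_types = {"Marie Curie": "PERSON", "Warsaw": "GPE", "Acme": "ORG", "Globex": "ORG"}

type_errors = check_types(triples, entity_types)
cardinality_errors = check_cardinality(triples)
```

Flagged triples are better routed to review than deleted outright, since the error may lie in the entity typing and not in the relation.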

Performance considerations

  • Batch extraction: Process documents in batches of 64-128 through spaCy’s nlp.pipe() to keep the GPU busy.
  • Parallel ingestion: Use concurrent.futures or Celery to parallelize document processing.
  • Graph indexing: Create indexes on entity names before bulk insertion. Without an index, each MERGE lookup falls back from an index seek (roughly O(log n)) to a scan of every Entity node (O(n)).
  • Memory management: RDFLib graphs above 10 million triples consume significant RAM. Switch to a dedicated triple store at that scale.
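
Batching and parallel ingestion can be combined as in the sketch below, which uses only the standard library. `batched`, `process_batch`, and `process_corpus` are hypothetical names, and `process_batch` is a stand-in for the real per-batch work such as `list(nlp.pipe(batch))`.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice

def batched(items, size):
    """Yield consecutive lists of at most `size` items."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch

def process_batch(batch):
    """Stand-in for real extraction work; here it just counts words per document."""
    return [len(doc.split()) for doc in batch]

def process_corpus(docs, batch_size=64, workers=4):
    """Process batches concurrently; pool.map returns results in input order."""
    results = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for out in pool.map(process_batch, batched(docs, batch_size)):
            results.extend(out)
    return results

docs = ["a b c"] * 130
counts = process_corpus(docs)
```

Threads suit I/O-bound steps such as fetching documents or calling a remote model. For CPU-bound extraction, swapping in `ProcessPoolExecutor` is the usual alternative, at the cost of loading the model once per worker process.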

One thing to remember: The hardest part of knowledge graph construction isn’t the graph database — it’s the extraction pipeline. Invest in entity resolution and relation extraction quality, and the graph practically builds itself.
