Knowledge Graph Construction with Python — Deep Dive
Architecture of a KG construction pipeline
A production knowledge graph pipeline has five stages: ingestion and preprocessing, named entity recognition (NER), relation extraction (RE), entity resolution and linking, and graph assembly. Each stage feeds the next and can be run independently for debugging or incremental updates.
Raw Sources → Preprocessor → NER + RE → Entity Linker → Graph Store
     ↑                                                         ↓
     └──────────── Feedback / Quality Audit ←──────────────────┘
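One way to keep stages independently runnable is to register each as a named function and let the runner resume from any stage. The sketch below is illustrative only: `run_pipeline`, the stage names, and the toy lambdas are hypothetical stand-ins, not part of any library or of the code later in this article.

```python
from typing import Any, Callable, Optional

# Hypothetical stage type: a plain function that takes the previous stage's output.
Stage = Callable[[Any], Any]


def run_pipeline(stages: list, data: Any, start_at: Optional[str] = None) -> Any:
    """Run (name, stage) pairs in order, optionally resuming from a named stage."""
    names = [name for name, _ in stages]
    if start_at is not None and start_at not in names:
        raise ValueError(f"unknown stage: {start_at}")
    started = start_at is None
    for name, stage in stages:
        if name == start_at:
            started = True
        if started:
            data = stage(data)
    return data


# Toy stand-ins so the sketch runs without any NLP dependencies.
toy_stages = [
    ("preprocess", lambda text: [s.strip() for s in text.split(".") if s.strip()]),
    ("extract", lambda sents: [tuple(s.split(" ", 2)) for s in sents]),
    ("store", lambda triples: set(triples)),
]

graph = run_pipeline(toy_stages, "Marie born_in Warsaw. Warsaw located_in Poland.")
# Re-run only the last stage on previously extracted triples.
resumed = run_pipeline(toy_stages, [("a", "b", "c")], start_at="store")
```

Resuming by name is what makes debugging a single stage cheap: cached output from an earlier stage can be fed straight into the one under inspection.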
Stage 1: Ingestion and preprocessing
Before extraction, split the text into sentences and drop very short fragments (20 characters or fewer in this example) to improve downstream accuracy:
import spacy
from spacy.lang.en import English
nlp = spacy.load("en_core_web_trf")  # transformer-based pipeline
# Lightweight rule-based sentence splitter, built once and reused across calls
sentencizer = English()
sentencizer.add_pipe("sentencizer")
def preprocess(text: str) -> list[spacy.tokens.Doc]:
    """Split long text into sentence-level docs for extraction."""
    doc = sentencizer(text)
    # Drop fragments of 20 characters or fewer (headings, stray tokens)
    sentences = [sent.text.strip() for sent in doc.sents if len(sent.text.strip()) > 20]
    return list(nlp.pipe(sentences, batch_size=64))
Sentence segmentation matters because relation extraction models perform better on single-sentence inputs than on paragraphs.
Stage 2: Named entity recognition
spaCy’s transformer pipeline (en_core_web_trf) achieves ~90% F1 for named entities on the OntoNotes benchmark. For domain-specific entities, add custom token patterns with a Matcher:
from spacy.matcher import Matcher
matcher = Matcher(nlp.vocab)
# Match chemical formulas like H2O, NaCl.
# Caveat: the regex also matches ordinary capitalized tokens such as "He" or "I",
# so expect false positives unless matches are filtered further.
pattern = [{"TEXT": {"REGEX": r"^[A-Z][a-z]?\d*(?:[A-Z][a-z]?\d*)*$"}}]
matcher.add("CHEMICAL", [pattern])
def extract_entities(doc):
    """Return (text, label, start_char, end_char) tuples for model and custom entities."""
    standard = [(ent.text, ent.label_, ent.start_char, ent.end_char) for ent in doc.ents]
    custom = []
    for _, start, end in matcher(doc):
        span = doc[start:end]
        custom.append((span.text, "CHEMICAL", span.start_char, span.end_char))
    return standard + custom
Entity type filtering
Not all entity types matter for your graph. Filter aggressively — PERSON, ORG, GPE, and PRODUCT cover most business knowledge graphs. DATE and CARDINAL entities often add noise.
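A label filter over the `(text, label, start_char, end_char)` tuples is enough to apply this. The sketch below assumes that tuple shape; `KEEP_LABELS` and `filter_entities` are hypothetical names, and the sample entities are made up for illustration.

```python
# Labels kept for the graph. This set mirrors the types named above and is an
# assumption to adjust per domain (add "CHEMICAL" to keep custom matches).
KEEP_LABELS = {"PERSON", "ORG", "GPE", "PRODUCT"}


def filter_entities(entities, keep=KEEP_LABELS):
    """Keep only (text, label, start_char, end_char) tuples with wanted labels."""
    return [ent for ent in entities if ent[1] in keep]


# Made-up sample entities for illustration.
entities = [
    ("Marie Curie", "PERSON", 0, 11),
    ("1867", "DATE", 24, 28),
    ("Warsaw", "GPE", 32, 38),
    ("two", "CARDINAL", 44, 47),
]
kept = filter_entities(entities)
```

Filtering before relation extraction and linking also saves work, since every dropped entity is one fewer lookup downstream.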
Stage 3: Relation extraction
Rule-based approach
For well-defined domains, dependency parse patterns are reliable and fast:
from spacy.tokens import Doc
def extract_relations_rule_based(doc: Doc) -> list[tuple]:
    relations = []
    for token in doc:
        if token.dep_ == "nsubj" and token.head.pos_ == "VERB":
            subject = token
            verb = token.head
            # Find direct object
            objects = [child for child in verb.children if child.dep_ == "dobj"]
            for obj in objects:
                relations.append((subject.text, verb.lemma_, obj.text))
    return relations
This catches simple Subject-Verb-Object patterns but misses passive constructions and complex sentences.
Transformer-based approach
For higher accuracy, use a pre-trained relation extraction model:
from transformers import pipeline
re_pipeline = pipeline(
    "text2text-generation",
    model="Babelscape/rebel-large",
    tokenizer="Babelscape/rebel-large",
    device=0,  # first GPU; use device=-1 to run on CPU
)
def extract_relations_transformer(text: str) -> list[dict]:
    # Ask for token IDs instead of text: the pipeline's default decoding skips
    # special tokens, which would drop the <triplet>/<subj>/<obj> markers.
    output = re_pipeline(
        text, max_length=256, num_beams=3,
        return_tensors=True, return_text=False,
    )
    token_ids = output[0]["generated_token_ids"]
    raw = re_pipeline.tokenizer.decode(token_ids, skip_special_tokens=False)
    return parse_rebel_output(raw)
def parse_rebel_output(text: str) -> list[dict]:
    """Parse REBEL's linearized output into structured triples.
    Format: <triplet> subject <subj> object <obj> relation, where one subject
    may be followed by several "<subj> object <obj> relation" groups.
    """
    triples = []
    subject, obj, relation = [], [], []
    current = None  # list that the next plain token is appended to
    def flush():
        # Emit a triple only when all three parts have been collected
        if subject and obj and relation:
            triples.append({
                "subject": " ".join(subject),
                "relation": " ".join(relation),
                "object": " ".join(obj),
            })
    for marker in ("<s>", "</s>", "<pad>"):
        text = text.replace(marker, " ")
    for token in text.split():
        if token == "<triplet>":  # a new subject follows
            flush()
            subject, obj, relation = [], [], []
            current = subject
        elif token == "<subj>":  # subject (or previous relation) ended, object follows
            flush()
            obj, relation = [], []
            current = obj
        elif token == "<obj>":  # object ended, relation follows
            relation = []
            current = relation
        elif current is not None:
            current.append(token)
    flush()  # finalize the last triple
    return triples
The REBEL model from Babelscape handles over 200 relation types from Wikidata and works well for general-purpose knowledge extraction.
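Because these relation labels arrive as free text (for example "located in"), it helps to normalize them before they become predicates or relationship types in the graph store. The sketch below is one possible convention, not something REBEL or any graph database requires: `to_relationship_type` and `normalize_triples` are hypothetical helpers that produce UPPER_SNAKE_CASE names.

```python
import re


def to_relationship_type(relation: str) -> str:
    """Turn a free-text relation label into an UPPER_SNAKE_CASE name."""
    # Collapse every run of non-alphanumeric ASCII characters into one underscore.
    cleaned = re.sub(r"[^0-9A-Za-z]+", "_", relation).strip("_").upper()
    if not cleaned:
        raise ValueError(f"relation has no usable characters: {relation!r}")
    return cleaned


def normalize_triples(triples: list) -> list:
    """Return copies of the triples with normalized relation names."""
    return [{**t, "relation": to_relationship_type(t["relation"])} for t in triples]


normalized = normalize_triples(
    [{"subject": "Warsaw", "relation": "located in", "object": "Poland"}]
)
```

Note that this sketch discards non-ASCII characters; a multilingual pipeline would need a different rule.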
Stage 4: Entity resolution and linking
String similarity baseline
from rapidfuzz import fuzz
def find_canonical(mention: str, known_entities: list[str], threshold: float = 85.0) -> str | None:
    best_match = None
    best_score = 0
    for entity in known_entities:
        score = fuzz.token_sort_ratio(mention, entity)
        if score > best_score and score >= threshold:
            best_score = score
            best_match = entity
    return best_match
Wikidata entity linking
For linking to a global knowledge base, query the Wikidata API:
import httpx
async def link_to_wikidata(mention: str) -> dict | None:
    """Return the top Wikidata search hit for a mention, or None if there is none."""
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            "https://www.wikidata.org/w/api.php",
            params={
                "action": "wbsearchentities",
                "search": mention,
                "language": "en",
                "format": "json",
                "limit": 3,
            },
        )
        resp.raise_for_status()  # surface HTTP errors instead of parsing an error body
    results = resp.json().get("search", [])
    if results:
        top = results[0]
        return {"qid": top["id"], "label": top["label"], "description": top.get("description", "")}
    return None
Disambiguation requires context. “Paris” could be the city, Paris Hilton, or the mythological figure. Use the surrounding entities and document topic to pick the right candidate.
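A crude version of context-based disambiguation scores each candidate by how many words its label and description share with the surrounding entities. The sketch below assumes candidates shaped like the dicts returned above (`qid`, `label`, `description`); `disambiguate` is a hypothetical helper, and the QIDs in the example are placeholders rather than real Wikidata identifiers. Production systems typically use embeddings or graph-based coherence instead of word overlap.

```python
import re
from typing import Optional


def _tokens(text: str) -> set:
    """Lowercased word tokens, used for a crude overlap score."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def disambiguate(candidates: list, context_terms: list) -> Optional[dict]:
    """Pick the candidate whose label and description overlap the context most.

    Ties keep the earlier candidate, i.e. the search API's own ranking.
    """
    context = set()
    for term in context_terms:
        context |= _tokens(term)
    best, best_score = None, -1
    for cand in candidates:
        text = f"{cand.get('label', '')} {cand.get('description', '')}"
        score = len(_tokens(text) & context)
        if score > best_score:
            best, best_score = cand, score
    return best


# Placeholder QIDs: these are NOT real Wikidata identifiers.
candidates = [
    {"qid": "Q_A", "label": "Paris", "description": "capital city of France"},
    {"qid": "Q_B", "label": "Paris Hilton", "description": "American media personality"},
    {"qid": "Q_C", "label": "Paris", "description": "prince of Troy in Greek mythology"},
]
myth = disambiguate(candidates, ["Helen", "Troy", "Greek"])
city = disambiguate(candidates, ["France", "Eiffel Tower"])
```

With no usable context the function falls back to the first candidate, which matches the behavior of taking the top search hit.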
Stage 5: Graph assembly
Using RDFLib for small graphs
from rdflib import Graph, Namespace
EX = Namespace("http://example.org/")
g = Graph()
def add_triple(subject: str, predicate: str, obj: str):
    # EX[...] already returns a URIRef, so no extra wrapping is needed
    s = EX[subject.replace(" ", "_")]
    p = EX[predicate.replace(" ", "_")]
    o = EX[obj.replace(" ", "_")]
    g.add((s, p, o))
add_triple("Marie_Curie", "born_in", "Warsaw")
add_triple("Warsaw", "located_in", "Poland")
# Query with SPARQL
results = g.query("""
    SELECT ?person ?country WHERE {
        ?person ex:born_in ?city .
        ?city ex:located_in ?country .
    }
""", initNs={"ex": EX})
for person, country in results:
    print(person, country)
Using Neo4j for production graphs
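def store_triples(tx, triples: list[dict]):
    """Merge entities and relationships in one batch; run inside a write transaction."""
    # apoc.merge.relationship comes from the APOC plugin, which must be installed;
    # it lets the relationship type be supplied as data (t.relation).
    tx.run("""
        UNWIND $triples AS t
        MERGE (s:Entity {name: t.subject})
        MERGE (o:Entity {name: t.object})
        WITH s, o, t
        CALL apoc.merge.relationship(s, t.relation, {}, {}, o, {}) YIELD rel
        RETURN count(rel)
    """, triples=triples)
# Usage with the official neo4j driver (5.x): session.execute_write(store_triples, triples)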
Incremental updates
Production knowledge graphs need continuous updates. A change-detection pipeline compares new extractions against existing triples:
def compute_delta(existing: set[tuple], new: set[tuple]) -> dict:
    return {
        "additions": new - existing,
        "deletions": existing - new,
        "unchanged": existing & new,
    }
Track provenance (source URL, extraction date, confidence score) on each triple so you can audit where facts came from and prioritize high-confidence triples during conflicts.
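One way to carry provenance is a small record type per triple, plus a rule that resolves conflicts by confidence. The sketch below is an assumption-laden illustration: `ProvenancedTriple` and `resolve_conflicts` are hypothetical names, the example.org URLs and dates are made up, and the rule only makes sense for relations that hold a single value per subject.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class ProvenancedTriple:
    subject: str
    relation: str
    obj: str
    source_url: str
    extracted_on: date
    confidence: float

    def key(self) -> tuple:
        """Plain (subject, relation, object) tuple, usable in set-based deltas."""
        return (self.subject, self.relation, self.obj)


def resolve_conflicts(triples: list) -> list:
    """For each (subject, relation), keep the highest-confidence triple.

    Assumes the relation is single-valued (e.g. born_in); multi-valued
    relations should bypass this step. Ties go to the most recent extraction.
    """
    best = {}
    for t in triples:
        slot = (t.subject, t.relation)
        current = best.get(slot)
        if current is None or (t.confidence, t.extracted_on) > (
            current.confidence, current.extracted_on
        ):
            best[slot] = t
    return list(best.values())


# Made-up sources and dates for illustration.
a = ProvenancedTriple("Marie_Curie", "born_in", "Warsaw",
                      "http://example.org/doc1", date(2024, 1, 5), 0.95)
b = ProvenancedTriple("Marie_Curie", "born_in", "Paris",
                      "http://example.org/doc2", date(2024, 2, 1), 0.60)
c = ProvenancedTriple("Warsaw", "located_in", "Poland",
                      "http://example.org/doc1", date(2024, 1, 5), 0.90)
resolved = resolve_conflicts([a, b, c])
keys = {t.key() for t in resolved}
```

The `key()` tuples can be passed straight into a set-based delta, while the full records keep the audit trail.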
Quality assurance
Automated checks catch common extraction errors:
- Type consistency — If “born_in” always connects a PERSON to a GPE, flag triples where it connects two ORGs.
- Cardinality checks — A person has one birth date. Multiple “born_on” triples for the same person indicate an error.
- Temporal consistency — A person can’t work at a company before they were born.
- Confidence thresholds — Discard triples below a model confidence of 0.7 for automated pipelines; route marginal cases to human review.
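Three of these checks (confidence, type consistency, cardinality) can be sketched as a single pass over the triples. Everything here is hypothetical scaffolding: `SCHEMA`, `SINGLE_VALUED`, `check_triples`, and the `(subject, relation, object, confidence)` tuple shape are assumptions for illustration, and temporal consistency is left out because it needs parsed dates.

```python
from collections import defaultdict

# Hypothetical schema: relation -> (expected subject type, expected object type).
SCHEMA = {"born_in": ("PERSON", "GPE")}
# Relations assumed to hold at most one object per subject.
SINGLE_VALUED = {"born_on"}


def check_triples(triples, entity_types, min_confidence=0.7):
    """Return (accepted, flagged), where flagged holds (triple, reason) pairs.

    Each triple is a (subject, relation, object, confidence) tuple and
    entity_types maps an entity name to its NER label.
    """
    accepted, flagged = [], []
    seen = defaultdict(set)  # (subject, relation) -> objects seen so far
    for triple in triples:
        subj, rel, obj, conf = triple
        if conf < min_confidence:
            flagged.append((triple, "low confidence"))
            continue
        expected = SCHEMA.get(rel)
        actual = (entity_types.get(subj), entity_types.get(obj))
        if expected is not None and actual != expected:
            flagged.append((triple, "type mismatch"))
            continue
        if rel in SINGLE_VALUED:
            seen[(subj, rel)].add(obj)
            if len(seen[(subj, rel)]) > 1:
                flagged.append((triple, "cardinality violation"))
                continue
        accepted.append(triple)
    return accepted, flagged


entity_types = {"Marie Curie": "PERSON", "Warsaw": "GPE", "Sorbonne": "ORG", "CNRS": "ORG"}
triples = [
    ("Marie Curie", "born_in", "Warsaw", 0.95),
    ("Sorbonne", "born_in", "CNRS", 0.90),           # two ORGs
    ("Marie Curie", "born_on", "1867-11-07", 0.92),
    ("Marie Curie", "born_on", "1876-11-07", 0.81),  # deliberately wrong second date
    ("Marie Curie", "born_in", "Paris", 0.40),       # below threshold
]
accepted, flagged = check_triples(triples, entity_types)
```

The cardinality check flags only the second and later values it encounters; it cannot tell which one is correct, so flagged triples still need review or a confidence-based tiebreak.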
Performance considerations
- Batch extraction: Process documents in batches of 64-128 through spaCy’s nlp.pipe() for GPU utilization.
- Parallel ingestion: Use concurrent.futures or Celery to parallelize document processing.
- Graph indexing: Create indexes on entity names before bulk insertion. Without indexes, MERGE operations degrade from O(log n) to O(n).
- Memory management: RDFLib graphs above 10 million triples consume significant RAM. Switch to a dedicated triple store at that scale.
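The batching and parallel-ingestion advice can be sketched with the standard library alone. In this illustration `batched`, `process_batch`, and `process_corpus` are hypothetical names, and `process_batch` is a placeholder (it just counts words) standing in for real extraction work.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def batched(items, size):
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


def process_batch(batch):
    """Placeholder for the real per-batch work (e.g. running extraction)."""
    return [len(doc.split()) for doc in batch]


def process_corpus(docs, batch_size=64, max_workers=4):
    """Process batches in parallel, preserving input order in the output."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in submission order, not completion order.
        for batch_result in pool.map(process_batch, batched(docs, batch_size)):
            results.extend(batch_result)
    return results


counts = process_corpus(["a b", "c", "d e f"], batch_size=2)
```

Threads suit work that is I/O-bound or releases the GIL; for CPU-bound pure-Python steps, `ProcessPoolExecutor` from the same module is the usual substitute.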
One thing to remember: The hardest part of knowledge graph construction isn’t the graph database — it’s the extraction pipeline. Invest in entity resolution and relation extraction quality, and the graph practically builds itself.