Link Prediction with Python — Deep Dive

NetworkX provides built-in link prediction functions:

import networkx as nx
import numpy as np

G = nx.karate_club_graph()

# Compute scores for all non-edges
preds_cn = list(nx.common_neighbor_centrality(G))  # Common neighbors
preds_jc = list(nx.jaccard_coefficient(G))          # Jaccard
preds_aa = list(nx.adamic_adar_index(G))            # Adamic-Adar
preds_pa = list(nx.preferential_attachment(G))      # Preferential attachment

# Each returns (u, v, score) tuples for all non-edges
top_10_aa = sorted(preds_aa, key=lambda x: x[2], reverse=True)[:10]
for u, v, score in top_10_aa:
    print(f"({u}, {v}): Adamic-Adar = {score:.4f}")

Resource Allocation Index

A variant of Adamic-Adar that uses 1/degree instead of 1/log(degree):

preds_ra = list(nx.resource_allocation_index(G))

Resource Allocation penalizes high-degree shared neighbors more aggressively, often outperforming Adamic-Adar on social networks.

Temporal train/test split

Proper evaluation requires temporal splitting. For static graphs without timestamps, simulate by removing a percentage of edges:

from sklearn.model_selection import train_test_split

def temporal_split(G, test_fraction=0.2, seed=42):
    """Split edges into train and test sets."""
    edges = list(G.edges())
    train_edges, test_edges = train_test_split(
        edges, test_size=test_fraction, random_state=seed
    )

    G_train = G.copy()
    G_train.remove_edges_from(test_edges)

    # Remove isolated nodes from test set (can't predict without context)
    test_edges = [(u, v) for u, v in test_edges
                  if G_train.has_node(u) and G_train.has_node(v)
                  and G_train.degree(u) > 0 and G_train.degree(v) > 0]

    return G_train, test_edges

G_train, test_edges = temporal_split(G)
print(f"Train edges: {G_train.number_of_edges()}, Test edges: {len(test_edges)}")

Negative sampling

def sample_negatives(G, n_samples, seed=42):
    """Sample non-edges as negative examples."""
    rng = np.random.default_rng(seed)
    nodes = list(G.nodes())
    negatives = set()

    while len(negatives) < n_samples:
        u, v = rng.choice(nodes, size=2, replace=False)
        if u != v and not G.has_edge(u, v) and (u, v) not in negatives:
            negatives.add((u, v))

    return list(negatives)

def sample_hard_negatives(G, positive_edges, seed=42):
    """Sample negatives that are 2-3 hops apart (harder to classify)."""
    rng = np.random.default_rng(seed)
    hard_negs = set()

    for u, v in positive_edges:
        # Find nodes 2 hops from u that aren't connected
        two_hop = set()
        for neighbor in G.neighbors(u):
            two_hop.update(G.neighbors(neighbor))
        two_hop -= set(G.neighbors(u)) | {u}
        two_hop -= {n for n in two_hop if G.has_edge(u, n)}

        if two_hop:
            neg_v = rng.choice(list(two_hop))
            hard_negs.add((u, neg_v))

    return list(hard_negs)

Feature-engineered supervised model

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score, average_precision_score

def extract_pair_features(G, u, v):
    """Compute features for a node pair."""
    cn = len(list(nx.common_neighbors(G, u, v)))
    u_deg = G.degree(u)
    v_deg = G.degree(v)

    # Jaccard
    union = len(set(G.neighbors(u)) | set(G.neighbors(v)))
    jaccard = cn / union if union > 0 else 0

    # Adamic-Adar
    aa = sum(1.0 / np.log(G.degree(w)) for w in nx.common_neighbors(G, u, v)
             if G.degree(w) > 1)

    # Preferential attachment
    pa = u_deg * v_deg

    # Shortest path (capped for efficiency)
    try:
        sp = nx.shortest_path_length(G, u, v)
    except nx.NetworkXNoPath:
        sp = -1

    # Same community (using Louvain)
    # Pre-compute this outside this function for efficiency

    return [cn, jaccard, aa, pa, u_deg, v_deg, sp, abs(u_deg - v_deg)]

def build_dataset(G, positive_edges, negative_edges):
    X, y = [], []
    for u, v in positive_edges:
        X.append(extract_pair_features(G, u, v))
        y.append(1)
    for u, v in negative_edges:
        X.append(extract_pair_features(G, u, v))
        y.append(0)
    return np.array(X), np.array(y)

# Build train and test datasets
neg_train = sample_negatives(G_train, len(G_train.edges()))
neg_test = sample_negatives(G_train, len(test_edges))

X_train, y_train = build_dataset(G_train, list(G_train.edges())[:len(neg_train)], neg_train)
X_test, y_test = build_dataset(G_train, test_edges, neg_test)

# Train
clf = GradientBoostingClassifier(n_estimators=200, max_depth=5, random_state=42)
clf.fit(X_train, y_train)

# Evaluate
y_prob = clf.predict_proba(X_test)[:, 1]
print(f"AUC-ROC: {roc_auc_score(y_test, y_prob):.4f}")
print(f"Average Precision: {average_precision_score(y_test, y_prob):.4f}")

# Feature importance
feature_names = ["common_neighbors", "jaccard", "adamic_adar", "pref_attach",
                 "deg_u", "deg_v", "shortest_path", "deg_diff"]
for name, imp in sorted(zip(feature_names, clf.feature_importances_),
                        key=lambda x: -x[1]):
    print(f"  {name}: {imp:.4f}")

Using PyTorch Geometric for end-to-end learned link prediction:

import torch
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.utils import negative_sampling
from sklearn.metrics import roc_auc_score

class LinkPredGNN(torch.nn.Module):
    def __init__(self, in_channels, hidden_channels):
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)

    def encode(self, x, edge_index):
        x = F.relu(self.conv1(x, edge_index))
        x = F.dropout(x, p=0.3, training=self.training)
        return self.conv2(x, edge_index)

    def decode(self, z, edge_label_index):
        src, dst = edge_label_index
        return (z[src] * z[dst]).sum(dim=-1)

    def forward(self, x, edge_index, edge_label_index):
        z = self.encode(x, edge_index)
        return self.decode(z, edge_label_index)

def train_link_pred(model, data, optimizer):
    model.train()
    optimizer.zero_grad()

    # Sample negative edges
    neg_edge_index = negative_sampling(
        edge_index=data.edge_index,
        num_nodes=data.num_nodes,
        num_neg_samples=data.edge_index.size(1),
    )

    edge_label_index = torch.cat([data.edge_index, neg_edge_index], dim=-1)
    edge_label = torch.cat([
        torch.ones(data.edge_index.size(1)),
        torch.zeros(neg_edge_index.size(1)),
    ])

    out = model(data.x, data.edge_index, edge_label_index)
    loss = F.binary_cross_entropy_with_logits(out, edge_label)
    loss.backward()
    optimizer.step()
    return loss.item()

Evaluation at scale

Hits@K implementation

def hits_at_k(scores, labels, k=10):
    """Fraction of true positives in top-K predictions."""
    ranked_indices = np.argsort(-scores)
    top_k_labels = labels[ranked_indices[:k]]
    return top_k_labels.sum() / min(k, labels.sum())

def mean_reciprocal_rank(scores, labels):
    """Average of 1/rank for each true positive."""
    ranked_indices = np.argsort(-scores)
    ranked_labels = labels[ranked_indices]
    reciprocals = []
    for i, label in enumerate(ranked_labels):
        if label == 1:
            reciprocals.append(1.0 / (i + 1))
    return np.mean(reciprocals) if reciprocals else 0.0

Standard k-fold doesn’t work — edges are dependent. Use repeated random splits:

def repeated_evaluation(G, n_repeats=10):
    """Evaluate with multiple random train/test splits."""
    aucs = []
    for seed in range(n_repeats):
        G_train, test_edges = temporal_split(G, test_fraction=0.2, seed=seed)
        neg_test = sample_negatives(G_train, len(test_edges), seed=seed)

        # Compute Adamic-Adar scores on train graph
        scores_pos = [sum(1/np.log(G_train.degree(w))
                         for w in nx.common_neighbors(G_train, u, v)
                         if G_train.degree(w) > 1)
                      for u, v in test_edges]
        scores_neg = [sum(1/np.log(G_train.degree(w))
                         for w in nx.common_neighbors(G_train, u, v)
                         if G_train.degree(w) > 1)
                      for u, v in neg_test]

        labels = [1]*len(scores_pos) + [0]*len(scores_neg)
        scores = scores_pos + scores_neg
        aucs.append(roc_auc_score(labels, scores))

    return np.mean(aucs), np.std(aucs)

Production considerations

Candidate generation

Scoring all O(n²) pairs is infeasible for large graphs. Use candidate generation:

def generate_candidates(G, max_distance=3, max_candidates_per_node=100):
    """Generate candidate pairs within k hops."""
    candidates = set()
    for node in G.nodes():
        # BFS up to max_distance
        visited = {node}
        frontier = {node}
        for _ in range(max_distance):
            next_frontier = set()
            for n in frontier:
                next_frontier.update(G.neighbors(n))
            next_frontier -= visited
            for candidate in next_frontier:
                if not G.has_edge(node, candidate):
                    candidates.add((min(node, candidate), max(node, candidate)))
            visited.update(next_frontier)
            frontier = next_frontier
    return list(candidates)

Caching and incremental updates

Pre-compute features that are expensive to calculate:

from functools import lru_cache

class LinkPredictorService:
    def __init__(self, G):
        self.G = G
        self._precompute_communities()

    def _precompute_communities(self):
        from networkx.algorithms.community import louvain_communities
        communities = louvain_communities(self.G, seed=42)
        self.node_community = {}
        for i, comm in enumerate(communities):
            for node in comm:
                self.node_community[node] = i

    def predict_top_k(self, node, k=10):
        candidates = self._get_candidates(node)
        scored = [(v, self._score(node, v)) for v in candidates]
        scored.sort(key=lambda x: -x[1])
        return scored[:k]

    def _score(self, u, v):
        cn = len(list(nx.common_neighbors(self.G, u, v)))
        same_comm = 1 if self.node_community.get(u) == self.node_community.get(v) else 0
        aa = sum(1/np.log(self.G.degree(w))
                 for w in nx.common_neighbors(self.G, u, v)
                 if self.G.degree(w) > 1)
        return aa + 0.5 * same_comm

Method comparison on common benchmarks

MethodCora AUCCiteSeer AUCPPI AUC
Common Neighbors0.810.780.72
Adamic-Adar0.850.820.76
Node2Vec + Dot0.890.860.81
GCN Link Pred0.910.890.85
SEAL (subgraph GNN)0.930.910.88

GNN-based methods consistently outperform heuristics, but the gap narrows on dense networks where common neighbors already capture most of the signal.

One thing to remember: Link prediction is only as good as its evaluation. Always use temporal splits (train on the past, test on the future), never random splits. A model that performs well on random splits may perform terribly in production because it learned from data leakage.

pythonmachine-learninggraph-theory

See Also

  • Python Community Detection How Python finds hidden groups in networks — friend circles, customer segments, and research clusters — just by looking at who connects to whom.
  • Python Graph Embeddings How Python turns tangled webs of connections into neat lists of numbers that computers can actually work with.
  • Python Graph Neural Networks How Python teaches computers to learn from connections — not just data points — by combining neural networks with graph structures.
  • Python Networkx Graph Analysis How Python maps connections between things — friends, roads, websites — and finds hidden patterns in those connections.
  • Activation Functions Why neural networks need these tiny mathematical functions — and how ReLU's simplicity accidentally made deep learning possible.