Educational Data Mining in Python — Deep Dive
Educational data mining combines domain-specific feature engineering with standard machine learning to extract actionable insights from educational data. This guide implements the core EDM techniques in Python.
Data Loading and Preprocessing
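The Open University Learning Analytics Dataset (OULAD) is a widely used public benchmark for learning analytics. The loader below reads its CSV tables and builds one feature row per student enrolment: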
import pandas as pd
import numpy as np
from pathlib import Path
class OULADLoader:
    """Load and preprocess the Open University Learning Analytics Dataset.

    ``load`` reads whichever of the known CSV tables exist in ``data_dir``;
    ``build_features`` turns those tables into one row per
    (student, module, presentation) with engagement and assessment features.
    """

    # Table name == CSV file stem inside ``data_dir``.
    _TABLE_NAMES = (
        "studentInfo", "studentRegistration", "studentAssessment",
        "studentVle", "assessments", "vle", "courses",
    )
    # Tables that ``build_features`` actually reads.
    _REQUIRED_TABLES = ("studentInfo", "studentVle", "studentAssessment", "assessments")
    # Composite key identifying one student enrolment.
    _KEY_COLS = ["id_student", "code_module", "code_presentation"]

    def __init__(self, data_dir: str):
        self.dir = Path(data_dir)

    def load(self) -> dict[str, pd.DataFrame]:
        """Read every known table whose CSV file exists.

        Returns:
            Mapping of table name to DataFrame. Tables whose file is absent
            are simply left out of the mapping (no error is raised here).
        """
        tables = {}
        for name in self._TABLE_NAMES:
            path = self.dir / f"{name}.csv"
            if path.exists():
                tables[name] = pd.read_csv(path)
        return tables

    def build_features(self, tables: dict) -> pd.DataFrame:
        """Build student-level feature matrix from raw tables.

        Args:
            tables: Mapping as returned by ``load``; must contain
                ``studentInfo``, ``studentVle``, ``studentAssessment`` and
                ``assessments``.

        Returns:
            A copy of ``studentInfo`` with a binary ``dropout`` column plus
            VLE and assessment aggregates left-joined on the enrolment key.

        Raises:
            KeyError: If any required table is missing; the message lists
                all of them rather than only the first one accessed.
        """
        missing = [name for name in self._REQUIRED_TABLES if name not in tables]
        if missing:
            raise KeyError(
                f"build_features is missing required table(s): {', '.join(missing)}"
            )

        keys = self._KEY_COLS
        info = tables["studentInfo"].copy()

        # Encode outcome: both withdrawals and fails count as the positive class.
        info["dropout"] = info["final_result"].isin(["Withdrawn", "Fail"]).astype(int)

        # VLE engagement features
        vle_features = tables["studentVle"].groupby(keys).agg(
            total_clicks=("sum_click", "sum"),
            active_days=("date", "nunique"),
            unique_activities=("id_site", "nunique"),
            first_activity_day=("date", "min"),
            last_activity_day=("date", "max"),
        ).reset_index()

        # Assessment features. The merge brings in the module/presentation
        # keys and the assessment's own ``date`` column.
        assess = tables["studentAssessment"].merge(
            tables["assessments"], on="id_assessment"
        )
        # A "lag" must be relative to something: measure submission day
        # against the assessment's ``date`` (assumed to be the deadline, as
        # in OULAD - confirm for other data). Unparseable or missing dates
        # become NaN and are ignored by the mean.
        assess["submission_lag"] = (
            pd.to_numeric(assess["date_submitted"], errors="coerce")
            - pd.to_numeric(assess["date"], errors="coerce")
        )
        assess_features = assess.groupby(keys).agg(
            avg_score=("score", "mean"),
            score_std=("score", "std"),
            assessments_submitted=("score", "count"),
            avg_submission_lag=("submission_lag", "mean"),
        ).reset_index()
        # std is NaN for a single submission; treat that as "no spread".
        assess_features["score_std"] = assess_features["score_std"].fillna(0)

        # Merge features (left joins keep students with no activity at all)
        features = info.merge(vle_features, on=keys, how="left")
        features = features.merge(assess_features, on=keys, how="left")

        # Fill missing engagement features with 0
        fill_cols = ["total_clicks", "active_days", "unique_activities",
                     "avg_score", "assessments_submitted"]
        features[fill_cols] = features[fill_cols].fillna(0)
        return features
Dropout Prediction with Temporal Features
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GroupShuffleSplit
from sklearn.metrics import roc_auc_score, classification_report
from sklearn.preprocessing import LabelEncoder
class DropoutPredictor:
    """Gradient-boosted dropout classifier evaluated on held-out presentations."""

    # Categorical columns that are label-encoded when present.
    _CATEGORICAL_COLS = ("gender", "region", "highest_education", "imd_band",
                         "age_band", "disability")
    # Identifier / target columns that must never be used as model inputs.
    _NON_FEATURE_COLS = frozenset({
        "id_student", "code_module", "code_presentation",
        "final_result", "dropout", "num_of_prev_attempts",
    })

    def __init__(self):
        self.model = GradientBoostingClassifier(
            n_estimators=300, max_depth=5,
            learning_rate=0.05, subsample=0.8,
            min_samples_leaf=50,
        )
        self.feature_cols = None  # set by prepare_features
        self.encoders = {}        # column name -> fitted LabelEncoder

    def prepare_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Prepare feature matrix with encoding and derived features.

        The first call fits one ``LabelEncoder`` per categorical column;
        later calls reuse it, so a category never seen on the first call will
        make ``transform`` fail (scikit-learn raises ``ValueError``).

        Side effect: sets ``self.feature_cols`` to the numeric, non-identifier
        columns of the returned frame.

        Args:
            df: Frame containing at least ``total_clicks``, ``active_days``,
                ``first_activity_day`` and ``last_activity_day``.

        Returns:
            A copy of ``df`` with encoded categoricals and derived columns.
        """
        features = df.copy()

        # Encode categorical variables
        for col in self._CATEGORICAL_COLS:
            if col not in features.columns:
                continue
            values = features[col].fillna("unknown").astype(str)
            encoder = self.encoders.get(col)
            if encoder is None:
                encoder = self.encoders[col] = LabelEncoder()
                features[col] = encoder.fit_transform(values)
            else:
                features[col] = encoder.transform(values)

        # Derived features; clip(lower=1) avoids division by zero for
        # students with no active days or a zero-length engagement span.
        features["clicks_per_day"] = features["total_clicks"] / features["active_days"].clip(lower=1)
        features["engagement_span"] = features["last_activity_day"] - features["first_activity_day"]
        features["activity_ratio"] = features["active_days"] / features["engagement_span"].clip(lower=1)

        # Credits as numeric (unparseable values fall back to 60)
        if "studied_credits" in features.columns:
            features["studied_credits"] = pd.to_numeric(
                features["studied_credits"], errors="coerce").fillna(60)

        # Select by "is numeric" rather than exact dtype names so that
        # int32/float32 columns are not silently dropped. Booleans are
        # excluded to keep the original integer/float-only intent.
        self.feature_cols = [
            c for c in features.columns
            if c not in self._NON_FEATURE_COLS
            and pd.api.types.is_numeric_dtype(features[c])
            and not pd.api.types.is_bool_dtype(features[c])
        ]
        return features

    def train_evaluate(self, features: pd.DataFrame, test_size: float = 0.2) -> dict:
        """Train with temporal split - past presentations train, recent test.

        Presentations are ordered by sorting their ``code_presentation``
        values (OULAD codes such as ``2013B`` < ``2013J`` < ``2014B`` sort
        chronologically - confirm for other datasets) and the latest ones are
        held out. A shuffled group split would not guarantee that.

        Args:
            features: Frame accepted by ``prepare_features`` that also holds
                ``dropout`` and ``code_presentation``.
            test_size: Approximate fraction of distinct presentations to hold
                out; at least one is held out and at least one is kept for
                training.

        Returns:
            Dict with ``auc_roc``, ``report`` (classification report dict),
            ``feature_importance`` (top 15) and ``test_presentations``.

        Raises:
            ValueError: If fewer than two distinct presentations are present.
        """
        prepared = self.prepare_features(features)
        X = prepared[self.feature_cols].fillna(0)
        y = prepared["dropout"]

        presentations = sorted(prepared["code_presentation"].dropna().unique())
        if len(presentations) < 2:
            raise ValueError(
                "temporal split needs at least two distinct code_presentation values"
            )
        n_test = int(np.ceil(test_size * len(presentations)))
        n_test = min(max(n_test, 1), len(presentations) - 1)
        held_out = presentations[-n_test:]

        is_test = prepared["code_presentation"].isin(held_out).to_numpy()
        X_train, X_test = X.loc[~is_test], X.loc[is_test]
        y_train, y_test = y.loc[~is_test], y.loc[is_test]

        self.model.fit(X_train, y_train)
        y_pred_proba = self.model.predict_proba(X_test)[:, 1]
        y_pred = (y_pred_proba >= 0.5).astype(int)
        return {
            "auc_roc": round(roc_auc_score(y_test, y_pred_proba), 4),
            "report": classification_report(y_test, y_pred, output_dict=True),
            "feature_importance": self._feature_importance(),
            "test_presentations": [str(p) for p in held_out],
        }

    def _feature_importance(self) -> list[dict]:
        """Return the 15 most important features, highest importance first."""
        importances = self.model.feature_importances_
        ranked = sorted(
            ({"feature": name, "importance": round(float(value), 4)}
             for name, value in zip(self.feature_cols, importances)),
            key=lambda item: -item["importance"],
        )
        return ranked[:15]
Learning Behavior Clustering
Discover student archetypes from engagement patterns:
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
class LearnerClustering:
    """Group students into behavioural archetypes with K-means."""

    def __init__(self, n_clusters: int = 4):
        # ``n_clusters`` is only a fallback: cluster_students picks k by
        # silhouette score and uses this value when no candidate k can be
        # scored (e.g. too few rows).
        self.n_clusters = n_clusters
        self.scaler = StandardScaler()
        self.model = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)

    def cluster_students(self, features: pd.DataFrame,
                         behavior_cols: list[str] = None) -> pd.DataFrame:
        """Cluster students by learning behavior patterns.

        Candidate k values 2..6 (capped so that k < number of rows) are
        scored with the silhouette coefficient; the best-scoring fitted model
        is kept in ``self.model``.

        Args:
            features: Student feature frame.
            behavior_cols: Columns to cluster on; columns missing from
                ``features`` are ignored. Defaults to the engagement and
                assessment columns listed below.

        Returns:
            A copy of ``features`` with an added integer ``cluster`` column.

        Raises:
            ValueError: If no requested column is present or there are no rows.
        """
        if behavior_cols is None:
            behavior_cols = ["total_clicks", "active_days", "clicks_per_day",
                             "unique_activities", "avg_score", "assessments_submitted",
                             "activity_ratio"]
        available = [c for c in behavior_cols if c in features.columns]
        if not available:
            raise ValueError("none of the requested behavior columns are present in features")

        X = features[available].fillna(0)
        n_samples = len(X)
        if n_samples == 0:
            raise ValueError("cannot cluster an empty feature frame")
        X_scaled = self.scaler.fit_transform(X)

        # Find optimal k using silhouette score. The silhouette needs
        # 2 <= n_labels <= n_samples - 1, hence the cap on k.
        best_model = None
        best_labels = None
        best_score = -1
        for k in range(2, min(6, n_samples - 1) + 1):
            candidate = KMeans(n_clusters=k, random_state=42, n_init=10)
            labels = candidate.fit_predict(X_scaled)
            if np.unique(labels).size < 2:
                # Degenerate data collapsed into one cluster: score undefined.
                continue
            score = silhouette_score(
                X_scaled, labels,
                sample_size=min(5000, n_samples),
                random_state=42,  # subsampling must be seeded to be reproducible
            )
            if score > best_score:
                best_model, best_labels, best_score = candidate, labels, score

        if best_model is None:
            # No candidate could be scored; fall back to the configured k.
            best_model = KMeans(n_clusters=min(self.n_clusters, n_samples),
                                random_state=42, n_init=10)
            best_labels = best_model.fit_predict(X_scaled)

        # Reuse the already-fitted winner instead of refitting it.
        self.model = best_model
        features = features.copy()
        features["cluster"] = best_labels
        return features

    def describe_clusters(self, features: pd.DataFrame) -> list[dict]:
        """Generate human-readable cluster descriptions.

        Args:
            features: Frame with a ``cluster`` column and, optionally,
                ``dropout`` and the behaviour columns listed below.

        Returns:
            One dict per cluster (ascending cluster id) with its size, share
            of all rows, the metrics whose cluster mean is more than 20%
            above / below the global mean, the dropout rate (``None`` when no
            ``dropout`` column exists) and the rounded cluster means.
        """
        behavior_cols = ["total_clicks", "active_days", "clicks_per_day",
                         "avg_score", "assessments_submitted", "activity_ratio"]
        available = [c for c in behavior_cols if c in features.columns]
        global_means = features[available].mean()
        has_dropout = "dropout" in features
        total_rows = len(features)

        descriptions = []
        for cluster_id in sorted(features["cluster"].unique()):
            members = features[features["cluster"] == cluster_id]
            means = members[available].mean()

            # Compare to global means (+/-20% band)
            above = [col for col in available if means[col] > global_means[col] * 1.2]
            below = [col for col in available if means[col] < global_means[col] * 0.8]

            dropout_rate = None
            if has_dropout:
                # Cast to a builtin float like the other reported metrics.
                dropout_rate = round(float(members["dropout"].mean()), 3)

            descriptions.append({
                "cluster_id": int(cluster_id),
                "size": len(members),
                "pct_of_total": round(len(members) / total_rows * 100, 1),
                "above_average": above,
                "below_average": below,
                "dropout_rate": dropout_rate,
                "avg_metrics": {col: round(float(means[col]), 2) for col in available},
            })
        return descriptions
Sequential Pattern Mining
Discover common learning action sequences:
from collections import Counter, defaultdict
class SequentialPatternMiner:
    """Mine frequent contiguous action n-grams and relate them to outcomes."""

    def __init__(self, min_support: float = 0.05):
        # Minimum fraction of sequences that must contain a pattern.
        self.min_support = min_support

    @staticmethod
    def _distinct_ngrams(seq: list[str], length: int) -> list[tuple]:
        """Return the distinct contiguous n-grams of ``seq`` in first-seen order."""
        return list(dict.fromkeys(
            tuple(seq[i:i + length]) for i in range(len(seq) - length + 1)
        ))

    @staticmethod
    def _contains(seq: list[str], pattern: tuple) -> bool:
        """Return True if ``pattern`` occurs contiguously anywhere in ``seq``."""
        width = len(pattern)
        return any(
            tuple(seq[i:i + width]) == pattern
            for i in range(len(seq) - width + 1)
        )

    def mine_patterns(self, event_sequences: list[list[str]],
                      max_length: int = 4) -> list[dict]:
        """
        Find frequent subsequences in student action sequences.

        event_sequences: list of student sessions, each a list of action types
        max_length: longest n-gram considered (n-grams of length 2..max_length)

        Returns one dict per pattern whose support is at least
        ``min_support``, sorted by descending support. A pattern is counted
        at most once per sequence, however often it repeats inside it.
        """
        n_students = len(event_sequences)
        if n_students == 0:
            return []

        # Round the threshold UP: flooring would admit patterns whose support
        # is below min_support (e.g. 1 of 3 sequences at min_support=0.5).
        # The small epsilon absorbs float noise such as 0.1 * 30.
        min_count = max(1, int(np.ceil(n_students * self.min_support - 1e-9)))

        patterns = []
        for length in range(2, max_length + 1):
            ngram_counts = Counter()
            for seq in event_sequences:
                ngram_counts.update(self._distinct_ngrams(seq, length))
            patterns.extend(
                {
                    "pattern": list(ngram),
                    "support": round(count / n_students, 4),
                    "count": count,
                    "length": length,
                }
                for ngram, count in ngram_counts.items()
                if count >= min_count
            )
        return sorted(patterns, key=lambda p: -p["support"])

    def correlate_with_outcome(self, event_sequences: list[list[str]],
                               outcomes: list[int],
                               patterns: list[dict]) -> list[dict]:
        """Find patterns that correlate with success or failure.

        An outcome of ``0`` counts as success; any other value as failure.

        Args:
            event_sequences: Action sequences, aligned with ``outcomes``.
            outcomes: One outcome per sequence.
            patterns: Pattern dicts as returned by ``mine_patterns``.

        Returns:
            The patterns that occur in at least one sequence, each extended
            with ``success_rate``, ``success_count``, ``failure_count`` and
            ``lift`` (success rate among sequences containing the pattern
            divided by the overall success rate), sorted by how far the lift
            is from 1.

        Raises:
            ValueError: If ``event_sequences`` and ``outcomes`` differ in length.
        """
        if len(event_sequences) != len(outcomes):
            raise ValueError(
                "event_sequences and outcomes must have the same length "
                f"({len(event_sequences)} != {len(outcomes)})"
            )
        if len(outcomes) == 0:
            return []

        # Overall success rate; identical for every pattern, so compute it
        # once. Floored at 0.01 to avoid dividing by zero.
        baseline = max(1 - float(np.mean(outcomes)), 0.01)

        enriched = []
        for pattern_info in patterns:
            pattern = tuple(pattern_info["pattern"])
            success_count = 0
            failure_count = 0
            for seq, outcome in zip(event_sequences, outcomes):
                if not self._contains(seq, pattern):
                    continue
                if outcome == 0:
                    success_count += 1
                else:
                    failure_count += 1

            total = success_count + failure_count
            if total == 0:
                continue
            success_rate = success_count / total
            enriched.append({
                **pattern_info,
                "success_rate": round(success_rate, 3),
                "success_count": success_count,
                "failure_count": failure_count,
                "lift": round(success_rate / baseline, 3),
            })
        return sorted(enriched, key=lambda p: -abs(p["lift"] - 1))
Fairness-Aware Modeling
Ensure predictions do not discriminate against protected groups:
from sklearn.metrics import roc_auc_score
class FairnessAuditor:
    """Audit and adjust flagging decisions across a protected attribute."""

    def __init__(self, protected_attribute: str, min_group_size: int = 30):
        """
        Args:
            protected_attribute: Column of the audited frame holding the
                group membership.
            min_group_size: Groups with fewer rows than this are left out of
                ``audit`` (their metrics would be too noisy).
        """
        self.protected = protected_attribute
        self.min_group_size = min_group_size

    def audit(self, df: pd.DataFrame, predictions: np.ndarray,
              actuals: np.ndarray) -> dict:
        """Audit predictions for fairness across protected groups.

        Args:
            df: Frame whose rows are aligned positionally with
                ``predictions`` and ``actuals``.
            predictions: Predicted probabilities; a row is "flagged" when its
                prediction is >= 0.5.
            actuals: True binary labels.

        Returns:
            Dict with ``overall_auc``, per-group metrics under ``groups`` and,
            when at least one group is large enough,
            ``demographic_parity_ratio`` (lowest / highest group flag rate)
            and ``fair`` (ratio >= 0.8).
        """
        predictions = np.asarray(predictions)
        actuals = np.asarray(actuals)
        attribute = df[self.protected]

        results = {"overall_auc": round(float(roc_auc_score(actuals, predictions)), 4)}
        group_metrics = {}
        # Rows with a missing attribute belong to no group and are skipped.
        for group in attribute.dropna().unique():
            mask = (attribute == group).to_numpy()
            size = int(mask.sum())
            if size < self.min_group_size:
                continue
            g_pred = predictions[mask]
            g_actual = actuals[mask]
            # AUC is undefined when a group contains a single class.
            auc = None
            if np.unique(g_actual).size > 1:
                auc = round(float(roc_auc_score(g_actual, g_pred)), 4)
            group_metrics[str(group)] = {
                "size": size,
                "base_rate": round(float(g_actual.mean()), 4),
                "mean_prediction": round(float(g_pred.mean()), 4),
                "auc": auc,
                "flag_rate": round(float((g_pred >= 0.5).mean()), 4),
            }
        results["groups"] = group_metrics

        # Check demographic parity
        flag_rates = [m["flag_rate"] for m in group_metrics.values()]
        if flag_rates:
            highest = max(flag_rates)
            # Nobody flagged in any group is perfect parity, not a violation.
            ratio = 1.0 if highest == 0 else min(flag_rates) / highest
            results["demographic_parity_ratio"] = round(ratio, 3)
            results["fair"] = results["demographic_parity_ratio"] >= 0.8
        return results

    def mitigate_threshold(self, df: pd.DataFrame, predictions: np.ndarray,
                           target_parity: float = 0.9) -> dict[str, float]:
        """Find group-specific thresholds that achieve demographic parity.

        Each group gets the threshold at which ``prediction >= threshold``
        flags the same share of its members as the global 0.5 cut-off flags
        overall (rounded down to a whole number of members; ties at the
        threshold can flag more).

        Args:
            df: Frame aligned positionally with ``predictions``.
            predictions: Predicted probabilities.
            target_parity: Currently unused; kept for interface
                compatibility.

        Returns:
            Mapping of group label (as ``str``) to threshold. Rows with a
            missing protected attribute are ignored.
        """
        predictions = np.asarray(predictions, dtype=float)
        attribute = df[self.protected]

        # Find threshold per group that achieves similar flag rates
        target_rate = float((predictions >= 0.5).mean())
        thresholds = {}
        for group in attribute.dropna().unique():
            scores = np.sort(predictions[(attribute == group).to_numpy()])[::-1]
            n_flag = int(len(scores) * target_rate)
            if n_flag == 0:
                # Nobody should be flagged: put the bar just above the top score.
                thresholds[str(group)] = float(np.nextafter(scores[0], np.inf))
            else:
                # ``>=`` is inclusive, so the n_flag-th highest score (index
                # n_flag - 1) is the cut-off that flags exactly n_flag members.
                thresholds[str(group)] = float(scores[n_flag - 1])
        return thresholds
Production Pipeline
class EDMPipeline:
    """End-to-end educational data mining pipeline."""

    def __init__(self, data_dir: str):
        self.loader = OULADLoader(data_dir)
        self.predictor = DropoutPredictor()
        self.clusterer = LearnerClustering()
        # Constructed for callers; run_analysis does not use it because it
        # builds no per-student event sequences.
        self.pattern_miner = SequentialPatternMiner()

    def run_analysis(self) -> dict:
        """Execute full EDM analysis pipeline.

        Returns:
            Dict with ``prediction`` (AUC and top-5 features), ``clusters``
            (cluster descriptions) and ``fairness`` (audit by gender).
        """
        # Load and prepare data
        tables = self.loader.load()
        features = self.loader.build_features(tables)
        results = {}

        # Dropout prediction
        pred_results = self.predictor.train_evaluate(features)
        results["prediction"] = {
            "auc": pred_results["auc_roc"],
            "top_features": pred_results["feature_importance"][:5],
        }

        # Derived columns (clicks_per_day, activity_ratio) only exist after
        # prepare_features; the encoders were fitted during training, so this
        # call reuses them.
        prepared = self.predictor.prepare_features(features)

        # Behavior clustering on the prepared frame so the derived behaviour
        # columns are actually available to the clusterer.
        clustered = self.clusterer.cluster_students(prepared)
        results["clusters"] = self.clusterer.describe_clusters(clustered)

        # Fairness audit. Group membership is read from the raw frame so the
        # reported group labels are the original values, not encoder codes.
        # NOTE(review): predictions cover every row, including rows the model
        # was trained on, so the audited AUCs are optimistic.
        auditor = FairnessAuditor("gender")
        X = prepared[self.predictor.feature_cols].fillna(0)
        preds = self.predictor.model.predict_proba(X)[:, 1]
        results["fairness"] = auditor.audit(features, preds, features["dropout"].values)
        return results
The one thing to remember: Educational data mining in Python combines standard ML techniques (classification, clustering, sequence mining) with education-specific considerations — temporal validation, nested data structures, and fairness constraints — to produce insights that are not just statistically valid but genuinely useful for improving student outcomes.
See Also
- Python Adaptive Learning Systems: How Python builds learning apps that adjust to each student, like a personal tutor who knows exactly what you need next.
- Python Airflow: Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
- Python Altair: Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
- Python Automated Grading: How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
- Python Batch Vs Stream Processing: Batch processing is like doing laundry once a week; stream processing is like a shirt that cleans itself continuously.