Educational Data Mining in Python — Deep Dive

Educational data mining combines domain-specific feature engineering with standard machine learning to extract actionable insights from educational data. This guide implements the core EDM techniques in Python.

Data Loading and Preprocessing

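The Open University Learning Analytics Dataset (OULAD) is a widely used public benchmark for this kind of analysis. The loader below reads its CSV tables and builds one feature row per student registration: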

import pandas as pd
import numpy as np
from pathlib import Path

class OULADLoader:
    """Load and preprocess the Open University Learning Analytics Dataset.

    ``load`` reads whichever of the known CSV tables exist in ``data_dir``;
    ``build_features`` joins them into one row per registration, i.e. per
    (id_student, code_module, code_presentation) combination.
    """

    # CSV base names that ``load`` looks for (``<data_dir>/<name>.csv``).
    TABLE_NAMES = ("studentInfo", "studentRegistration", "studentAssessment",
                   "studentVle", "assessments", "vle", "courses")
    # Tables that ``build_features`` reads.
    REQUIRED_TABLES = ("studentInfo", "studentVle", "studentAssessment",
                       "assessments")
    # Composite key used for every group-by and merge below.
    KEY_COLS = ["id_student", "code_module", "code_presentation"]

    def __init__(self, data_dir: str):
        self.dir = Path(data_dir)

    def load(self) -> dict[str, pd.DataFrame]:
        """Read every known table that exists on disk.

        Returns:
            Mapping of table name to DataFrame. Tables whose CSV file is
            absent are simply left out of the mapping.
        """
        tables = {}
        for name in self.TABLE_NAMES:
            path = self.dir / f"{name}.csv"
            if path.exists():
                tables[name] = pd.read_csv(path)
        return tables

    def build_features(self, tables: dict) -> pd.DataFrame:
        """Build student-level feature matrix from raw tables.

        Args:
            tables: Mapping as returned by ``load``; must contain every
                table named in ``REQUIRED_TABLES``.

        Returns:
            A copy of ``studentInfo`` with a binary ``dropout`` label plus
            VLE engagement and assessment aggregates merged in (left join,
            so registrations without activity are kept).

        Raises:
            KeyError: if a required table is missing from ``tables``.
        """
        missing = [name for name in self.REQUIRED_TABLES if name not in tables]
        if missing:
            raise KeyError(
                f"build_features requires tables that were not loaded: {missing}"
            )

        info = tables["studentInfo"].copy()

        # Encode outcome: both withdrawing and failing count as the positive class.
        info["dropout"] = info["final_result"].isin(["Withdrawn", "Fail"]).astype(int)

        # VLE engagement features
        vle_features = tables["studentVle"].groupby(self.KEY_COLS).agg(
            total_clicks=("sum_click", "sum"),
            active_days=("date", "nunique"),
            unique_activities=("id_site", "nunique"),
            first_activity_day=("date", "min"),
            last_activity_day=("date", "max"),
        ).reset_index()

        # Assessment features
        assess = tables["studentAssessment"].merge(
            tables["assessments"], on="id_assessment"
        )
        # Lag = submission day minus the assessment's ``date`` column (taken
        # to be the deadline — confirm against the dataset documentation).
        # Positive means late, negative means early. Rows without a usable
        # deadline yield NaN and are skipped by the mean.
        submitted = pd.to_numeric(assess["date_submitted"], errors="coerce")
        if "date" in assess.columns:
            deadline = pd.to_numeric(assess["date"], errors="coerce")
            assess["submission_lag"] = submitted - deadline
        else:
            assess["submission_lag"] = float("nan")

        assess_features = assess.groupby(self.KEY_COLS).agg(
            avg_score=("score", "mean"),
            score_std=("score", "std"),
            assessments_submitted=("score", "count"),
            avg_submission_lag=("submission_lag", "mean"),
        ).reset_index()
        # std is NaN for a single submission; treat that as "no spread".
        assess_features["score_std"] = assess_features["score_std"].fillna(0)

        # Merge features
        features = info.merge(vle_features, on=self.KEY_COLS, how="left")
        features = features.merge(assess_features, on=self.KEY_COLS, how="left")

        # Fill missing engagement features with 0
        fill_cols = ["total_clicks", "active_days", "unique_activities",
                     "avg_score", "assessments_submitted"]
        features[fill_cols] = features[fill_cols].fillna(0)

        return features

Dropout Prediction with Temporal Features

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GroupShuffleSplit
from sklearn.metrics import roc_auc_score, classification_report
from sklearn.preprocessing import LabelEncoder

class DropoutPredictor:
    """Gradient-boosted classifier for the binary ``dropout`` label.

    Holds the fitted model, the per-column label encoders, and the list of
    feature columns chosen by the most recent ``prepare_features`` call.
    """

    # Categorical columns that are label-encoded when present.
    CATEGORICAL_COLS = ("gender", "region", "highest_education", "imd_band",
                        "age_band", "disability")
    # Identifier / target columns that are never used as model inputs.
    NON_FEATURE_COLS = frozenset({"id_student", "code_module", "code_presentation",
                                  "final_result", "dropout", "num_of_prev_attempts"})

    def __init__(self):
        self.model = GradientBoostingClassifier(
            n_estimators=300, max_depth=5,
            learning_rate=0.05, subsample=0.8,
            min_samples_leaf=50,
        )
        self.feature_cols = None
        self.encoders = {}

    def prepare_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Prepare feature matrix with encoding and derived features.

        The first call fits one ``LabelEncoder`` per categorical column;
        later calls reuse those encoders via ``transform``.

        Args:
            df: Frame containing at least ``total_clicks``, ``active_days``,
                ``first_activity_day`` and ``last_activity_day``.

        Returns:
            A copy of ``df`` with encoded categoricals and the derived
            columns ``clicks_per_day``, ``engagement_span`` and
            ``activity_ratio``. ``self.feature_cols`` is refreshed as a
            side effect.
        """
        features = df.copy()

        # Encode categorical variables
        for col in self.CATEGORICAL_COLS:
            if col not in features.columns:
                continue
            values = features[col].fillna("unknown").astype(str)
            if col in self.encoders:
                features[col] = self.encoders[col].transform(values)
            else:
                # Register the encoder only once fitting has succeeded, so a
                # failed fit cannot leave an unfitted encoder behind.
                encoder = LabelEncoder()
                features[col] = encoder.fit_transform(values)
                self.encoders[col] = encoder

        # Derived features; clip(lower=1) guards the divisions against zero.
        features["clicks_per_day"] = features["total_clicks"] / features["active_days"].clip(lower=1)
        features["engagement_span"] = features["last_activity_day"] - features["first_activity_day"]
        features["activity_ratio"] = features["active_days"] / features["engagement_span"].clip(lower=1)

        # Credits as numeric
        if "studied_credits" in features.columns:
            features["studied_credits"] = pd.to_numeric(features["studied_credits"], errors="coerce").fillna(60)

        # Accept every numeric dtype; matching only the literal names
        # "int64"/"float64" would silently drop e.g. int32 columns.
        self.feature_cols = [
            c for c in features.columns
            if c not in self.NON_FEATURE_COLS
            and pd.api.types.is_numeric_dtype(features[c])
        ]

        return features

    @staticmethod
    def _temporal_train_mask(presentations: pd.Series,
                             test_fraction: float = 0.2) -> pd.Series:
        """Return a boolean mask that is True for training rows.

        The most recent presentations (about ``test_fraction`` of the distinct
        codes, at least one, never all) are held out for testing.
        NOTE(review): assumes presentation codes sort chronologically as
        strings (e.g. "2013J" < "2014B") — confirm for other datasets.

        Raises:
            ValueError: if fewer than two distinct presentations exist.
        """
        ordered = sorted(presentations.dropna().unique())
        if len(ordered) < 2:
            raise ValueError(
                "temporal split needs at least two distinct presentations"
            )
        n_test = int(np.ceil(len(ordered) * test_fraction))
        n_test = min(max(n_test, 1), len(ordered) - 1)
        held_out = ordered[-n_test:]
        return ~presentations.isin(held_out)

    def train_evaluate(self, features: pd.DataFrame,
                       test_fraction: float = 0.2) -> dict:
        """Train with temporal split — past presentations train, recent test.

        Args:
            features: Student-level frame with a ``dropout`` label and a
                ``code_presentation`` column.
            test_fraction: Approximate share of distinct presentations held
                out (always the most recent ones).

        Returns:
            Dict with ``auc_roc``, the ``classification_report`` dict at a
            0.5 threshold, and the top feature importances.
        """
        features = self.prepare_features(features)
        X = features[self.feature_cols].fillna(0)
        y = features["dropout"]

        # Positional boolean array, so a non-default index cannot misalign rows.
        train_rows = self._temporal_train_mask(
            features["code_presentation"], test_fraction
        ).to_numpy()
        X_train, X_test = X[train_rows], X[~train_rows]
        y_train, y_test = y[train_rows], y[~train_rows]

        self.model.fit(X_train, y_train)
        y_pred_proba = self.model.predict_proba(X_test)[:, 1]
        y_pred = (y_pred_proba >= 0.5).astype(int)

        return {
            "auc_roc": round(roc_auc_score(y_test, y_pred_proba), 4),
            "report": classification_report(y_test, y_pred, output_dict=True),
            "feature_importance": self._feature_importance(),
        }

    def _feature_importance(self) -> list[dict]:
        """Return the 15 most important features, highest importance first."""
        importances = self.model.feature_importances_
        return sorted(
            [{"feature": f, "importance": round(float(i), 4)}
             for f, i in zip(self.feature_cols, importances)],
            key=lambda x: -x["importance"]
        )[:15]

Learning Behavior Clustering

Discover student archetypes from engagement patterns:

from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

class LearnerClustering:
    """K-means clustering of students on standardized behavior features.

    ``n_clusters`` is the fallback number of clusters; when the data set is
    large enough, ``cluster_students`` picks k by silhouette score instead.
    """

    # Range of k values tried during silhouette-based model selection.
    K_SEARCH_MIN = 2
    K_SEARCH_MAX = 6
    # Silhouette is computed on at most this many sampled rows.
    SILHOUETTE_SAMPLE = 5000

    def __init__(self, n_clusters: int = 4):
        self.n_clusters = n_clusters
        self.scaler = StandardScaler()
        self.model = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)

    def cluster_students(self, features: pd.DataFrame,
                         behavior_cols: list[str] = None) -> pd.DataFrame:
        """Cluster students by learning behavior patterns.

        Args:
            features: Student-level frame.
            behavior_cols: Columns to cluster on. Columns absent from
                ``features`` are ignored. Defaults to the engagement and
                assessment columns listed below.

        Returns:
            A copy of ``features`` with an integer ``cluster`` column.

        Raises:
            ValueError: if none of the requested columns is present.
        """
        if behavior_cols is None:
            behavior_cols = ["total_clicks", "active_days", "clicks_per_day",
                           "unique_activities", "avg_score", "assessments_submitted",
                           "activity_ratio"]

        available = [c for c in behavior_cols if c in features.columns]
        if not available:
            raise ValueError(
                "none of the requested behavior columns are present in features"
            )
        X = features[available].fillna(0)
        X_scaled = self.scaler.fit_transform(X)
        n_samples = len(X)

        # Find optimal k using silhouette score. Silhouette needs
        # 2 <= k <= n_samples - 1, so the search range shrinks on tiny inputs
        # and the constructor's n_clusters is used when no k can be scored.
        best_k = max(1, min(self.n_clusters, n_samples))
        best_score = -1
        sample_size = self.SILHOUETTE_SAMPLE if n_samples > self.SILHOUETTE_SAMPLE else None
        upper_k = min(self.K_SEARCH_MAX, n_samples - 1)
        for k in range(self.K_SEARCH_MIN, upper_k + 1):
            km = KMeans(n_clusters=k, random_state=42, n_init=10)
            labels = km.fit_predict(X_scaled)
            if len(set(labels)) < 2:
                # Degenerate fit (e.g. all rows identical): cannot be scored.
                continue
            # Fixed random_state keeps the subsample, and so the chosen k,
            # reproducible between runs.
            score = silhouette_score(X_scaled, labels, sample_size=sample_size,
                                     random_state=42)
            if score > best_score:
                best_score = score
                best_k = k

        self.model = KMeans(n_clusters=best_k, random_state=42, n_init=10)
        features = features.copy()
        features["cluster"] = self.model.fit_predict(X_scaled)

        return features

    def describe_clusters(self, features: pd.DataFrame) -> list[dict]:
        """Generate human-readable cluster descriptions.

        Args:
            features: Frame with a ``cluster`` column (as produced by
                ``cluster_students``) and, optionally, a ``dropout`` column.

        Returns:
            One dict per cluster, ordered by cluster id, with its size, share
            of all rows, the metrics whose cluster mean is more than 20%
            above / below the global mean, the dropout rate (or None), and
            the rounded cluster means.
        """
        descriptions = []
        behavior_cols = ["total_clicks", "active_days", "clicks_per_day",
                        "avg_score", "assessments_submitted", "activity_ratio"]
        available = [c for c in behavior_cols if c in features.columns]

        global_means = features[available].mean()
        has_outcome = "dropout" in features

        for cluster_id in sorted(features["cluster"].unique()):
            cluster_data = features[features["cluster"] == cluster_id]
            cluster_means = cluster_data[available].mean()

            # Compare to global means. NOTE(review): the 1.2 / 0.8 factors
            # assume non-negative metrics — confirm if new columns are added.
            above = [col for col in available
                    if cluster_means[col] > global_means[col] * 1.2]
            below = [col for col in available
                    if cluster_means[col] < global_means[col] * 0.8]

            dropout_rate = float(cluster_data["dropout"].mean()) if has_outcome else None

            descriptions.append({
                "cluster_id": int(cluster_id),
                "size": len(cluster_data),
                "pct_of_total": round(len(cluster_data) / len(features) * 100, 1),
                "above_average": above,
                "below_average": below,
                "dropout_rate": round(dropout_rate, 3) if dropout_rate is not None else None,
                "avg_metrics": {col: round(float(cluster_means[col]), 2) for col in available},
            })

        return descriptions

Sequential Pattern Mining

Discover common learning action sequences:

from collections import Counter, defaultdict

class SequentialPatternMiner:
    """Mine frequent contiguous action n-grams and relate them to outcomes."""

    def __init__(self, min_support: float = 0.05):
        # Minimum fraction of sequences that must contain a pattern.
        self.min_support = min_support

    def mine_patterns(self, event_sequences: list[list[str]],
                      max_length: int = 4) -> list[dict]:
        """
        Find frequent subsequences in student action sequences.
        event_sequences: list of student sessions, each a list of action types

        Only contiguous n-grams of length 2..max_length are considered, and a
        pattern is counted at most once per sequence. A pattern is kept when
        the fraction of sequences containing it is at least ``min_support``.

        Returns:
            Dicts with ``pattern``, ``support``, ``count`` and ``length``,
            sorted by descending support.
        """
        n_students = len(event_sequences)

        patterns = []
        for length in range(2, max_length + 1):
            ngram_counts = Counter()
            for seq in event_sequences:
                # dict.fromkeys de-duplicates while keeping first-seen order,
                # so the output order stays deterministic.
                distinct = dict.fromkeys(
                    tuple(seq[i:i + length])
                    for i in range(len(seq) - length + 1)
                )
                ngram_counts.update(list(distinct))

            for ngram, count in ngram_counts.items():
                # Compare the real support ratio; truncating a count threshold
                # with int() would admit patterns below min_support.
                if count / n_students >= self.min_support:
                    patterns.append({
                        "pattern": list(ngram),
                        "support": round(count / n_students, 4),
                        "count": count,
                        "length": length,
                    })

        return sorted(patterns, key=lambda p: -p["support"])

    @staticmethod
    def _contains(seq: list[str], pattern: tuple) -> bool:
        """Return True when ``pattern`` occurs contiguously in ``seq``."""
        width = len(pattern)
        return any(
            tuple(seq[i:i + width]) == pattern
            for i in range(len(seq) - width + 1)
        )

    def correlate_with_outcome(self, event_sequences: list[list[str]],
                                outcomes: list[int],
                                patterns: list[dict]) -> list[dict]:
        """Find patterns that correlate with success or failure.

        Args:
            event_sequences: One action sequence per student.
            outcomes: One label per sequence; 0 is counted as success and any
                other value as failure.
            patterns: Pattern dicts as returned by ``mine_patterns``.

        Returns:
            The input pattern dicts extended with ``success_rate``,
            ``success_count``, ``failure_count`` and ``lift`` (success rate
            among sequences containing the pattern, divided by the overall
            success rate floored at 0.01). Patterns that match no sequence
            are dropped. Sorted by how far the lift is from 1.

        Raises:
            ValueError: if ``event_sequences`` and ``outcomes`` differ in length.
        """
        if len(event_sequences) != len(outcomes):
            raise ValueError(
                "event_sequences and outcomes must have the same length"
            )

        # Overall success rate is loop-invariant; outcome 1 marks failure.
        baseline = None
        if len(outcomes) > 0:
            baseline = max(1 - float(np.mean(outcomes)), 0.01)

        enriched = []
        for pattern_info in patterns:
            pattern = tuple(pattern_info["pattern"])

            success_count = 0
            failure_count = 0
            for seq, outcome in zip(event_sequences, outcomes):
                if not self._contains(seq, pattern):
                    continue
                if outcome == 0:
                    success_count += 1
                else:
                    failure_count += 1

            total = success_count + failure_count
            if total > 0:
                success_rate = success_count / total
                enriched.append({
                    **pattern_info,
                    "success_rate": round(success_rate, 3),
                    "success_count": success_count,
                    "failure_count": failure_count,
                    "lift": round(success_rate / baseline, 3),
                })

        return sorted(enriched, key=lambda p: -abs(p.get("lift", 1) - 1))

Fairness-Aware Modeling

Ensure predictions do not discriminate against protected groups:

from sklearn.metrics import roc_auc_score

class FairnessAuditor:
    """Audit and adjust predictions across values of a protected attribute."""

    def __init__(self, protected_attribute: str, min_group_size: int = 30):
        # Name of the column in the audited frame that defines the groups.
        self.protected = protected_attribute
        # Groups smaller than this are left out of ``audit``.
        self.min_group_size = min_group_size

    def audit(self, df: pd.DataFrame, predictions: np.ndarray,
              actuals: np.ndarray) -> dict:
        """Audit predictions for fairness across protected groups.

        Args:
            df: Frame holding the protected attribute, one row per prediction.
            predictions: Predicted probabilities, positionally aligned with ``df``.
            actuals: True binary labels, positionally aligned with ``df``.

        Returns:
            Dict with ``overall_auc`` (None when ``actuals`` has one class),
            per-group metrics under ``groups``, and — when at least one group
            is large enough — ``demographic_parity_ratio`` (lowest flag rate
            divided by highest, at a 0.5 threshold) and ``fair`` (ratio >= 0.8).

        Raises:
            ValueError: if the inputs do not all have the same length.
        """
        predictions = np.asarray(predictions)
        actuals = np.asarray(actuals)
        if not (len(df) == len(predictions) == len(actuals)):
            raise ValueError("df, predictions and actuals must have the same length")

        protected = df[self.protected]
        # AUC is undefined for a single class; report None, as for groups below.
        overall_auc = (round(roc_auc_score(actuals, predictions), 4)
                       if len(set(actuals)) > 1 else None)
        results = {"overall_auc": overall_auc}
        group_metrics = {}

        for group in protected.dropna().unique():
            # Positional mask so a non-default DataFrame index cannot misalign.
            mask = (protected == group).to_numpy()
            size = int(mask.sum())
            if size < self.min_group_size:
                continue

            g_pred = predictions[mask]
            g_actual = actuals[mask]

            group_metrics[str(group)] = {
                "size": size,
                "base_rate": round(float(g_actual.mean()), 4),
                "mean_prediction": round(float(g_pred.mean()), 4),
                "auc": round(roc_auc_score(g_actual, g_pred), 4) if len(set(g_actual)) > 1 else None,
                "flag_rate": round(float((g_pred >= 0.5).mean()), 4),
            }

        results["groups"] = group_metrics

        # Check demographic parity
        flag_rates = [m["flag_rate"] for m in group_metrics.values()]
        if flag_rates:
            highest = max(flag_rates)
            # When nobody is flagged in any group the rates are identical,
            # which is perfect parity rather than a ratio of zero.
            ratio = 1.0 if highest == 0 else min(flag_rates) / highest
            results["demographic_parity_ratio"] = round(ratio, 3)
            results["fair"] = results["demographic_parity_ratio"] >= 0.8

        return results

    def mitigate_threshold(self, df: pd.DataFrame, predictions: np.ndarray,
                            target_parity: float = 0.9) -> dict[str, float]:
        """Find group-specific thresholds that achieve demographic parity.

        Every group is given the threshold at which (with ``prediction >=
        threshold``) its flag rate matches the overall flag rate at 0.5.
        Tied prediction values can flag slightly more than the target.

        Args:
            df: Frame holding the protected attribute, one row per prediction.
            predictions: Predicted probabilities, positionally aligned with ``df``.
            target_parity: Currently unused; kept for interface compatibility.

        Returns:
            Mapping of group (as string) to threshold. Rows with a missing
            protected value form no group and get no threshold.
        """
        predictions = np.asarray(predictions, dtype=float)
        protected = df[self.protected]
        thresholds = {}

        # Find threshold per group that achieves similar flag rates
        target_rate = float((predictions >= 0.5).mean())

        for group in protected.dropna().unique():
            mask = (protected == group).to_numpy()
            g_pred = np.sort(predictions[mask])[::-1]
            if len(g_pred) == 0:
                continue
            n_flag = int(round(len(g_pred) * target_rate))
            if n_flag <= 0:
                # Nobody should be flagged: place the threshold just above
                # the highest score in the group.
                thresholds[str(group)] = float(np.nextafter(g_pred[0], np.inf))
            else:
                # The n_flag-th highest score flags exactly n_flag rows under
                # ">="; indexing at n_flag would flag one row too many.
                thresholds[str(group)] = float(g_pred[min(n_flag, len(g_pred)) - 1])

        return thresholds

Production Pipeline

class EDMPipeline:
    """End-to-end educational data mining pipeline.

    Wires together data loading, dropout prediction, behavior clustering and
    a fairness audit. A pattern miner is also constructed but is not used by
    ``run_analysis``.
    """

    def __init__(self, data_dir: str):
        self.loader = OULADLoader(data_dir)
        self.predictor = DropoutPredictor()
        self.clusterer = LearnerClustering()
        self.pattern_miner = SequentialPatternMiner()

    def run_analysis(self) -> dict:
        """Execute full EDM analysis pipeline.

        Returns:
            Dict with ``prediction`` (AUC and the five most important
            features), ``clusters`` (per-cluster descriptions) and
            ``fairness`` (audit by ``gender``).
        """
        # Load and prepare data
        tables = self.loader.load()
        features = self.loader.build_features(tables)

        results = {}

        # Dropout prediction
        pred_results = self.predictor.train_evaluate(features)
        results["prediction"] = {
            "auc": pred_results["auc_roc"],
            "top_features": pred_results["feature_importance"][:5],
        }

        # Re-run preparation with the encoders fitted during training; the
        # result carries the derived columns (clicks_per_day, activity_ratio).
        prepared = self.predictor.prepare_features(features)

        # Behavior clustering on the prepared frame: the raw features lack the
        # derived columns, which clustering would otherwise silently drop.
        clustered = self.clusterer.cluster_students(prepared)
        results["clusters"] = self.clusterer.describe_clusters(clustered)

        # Fairness audit. The un-encoded frame supplies the protected
        # attribute so that groups keep their original labels.
        # NOTE(review): predictions cover every row, including rows the model
        # was trained on, so these metrics are optimistic — consider auditing
        # held-out rows only.
        auditor = FairnessAuditor("gender")
        X = prepared[self.predictor.feature_cols].fillna(0)
        preds = self.predictor.model.predict_proba(X)[:, 1]
        results["fairness"] = auditor.audit(features, preds, features["dropout"].values)

        return results

The one thing to remember: Educational data mining in Python combines standard ML techniques (classification, clustering, sequence mining) with education-specific considerations — temporal validation, nested data structures, and fairness constraints — to produce insights that are not just statistically valid but genuinely useful for improving student outcomes.

Tags: python · educational-data-mining · education-technology · data-science

See Also

  • Python Adaptive Learning Systems — How Python builds learning apps that adjust to each student like a personal tutor who knows exactly what you need next.
  • Python Airflow — Learn Airflow as a timetable manager that makes sure data tasks run in the right order every day.
  • Python Altair — Learn Altair through the idea of drawing charts by describing rules, not by hand-placing every visual element.
  • Python Automated Grading — How Python grades homework and exams automatically, from simple answer keys to understanding written essays.
  • Python Batch Vs Stream Processing — Batch processing is like doing laundry once a week; stream processing is like a self-cleaning shirt that cleans itself constantly.