Time Series Cross-Validation in Python — Deep Dive

The information leakage taxonomy

Understanding the different types of leakage is critical for designing valid evaluation:

1. Direct temporal leakage

Using future values as features. Caught by basic train/test splits.

2. Feature computation leakage

Computing rolling statistics (mean, std) using the entire dataset instead of only past values:

# WRONG — the statistic is computed in one pass over the full dataset,
# with no regard for where the train/test boundary falls.
# NOTE(review): pandas' default rolling window is trailing and includes the
# current row, so what leaks here is the row's own "value" (a problem when
# "value" is the target) rather than strictly later rows — confirm intent.
df["rolling_mean"] = df["value"].rolling(30).mean()

# CORRECT — compute only on training data, then extend
def safe_rolling_features(train, test, window=30):
    """Return rolling means computed over ``train`` followed by ``test``.

    The two frames are stacked in train-then-test order and a rolling mean
    of length ``window`` is taken over the stacked rows. ``min_periods=1``
    means the earliest rows are averaged over however many rows are
    available instead of producing NaN.

    Args:
        train: pandas object holding the training-period rows.
        test: pandas object holding the rows that follow ``train``.
        window: Number of rows in each rolling window.

    Returns:
        The rolling means for every stacked row (train rows first, then
        test rows). No rows are dropped or masked here; the caller decides
        which rows of the result to use.
    """
    stacked = pd.concat([train, test])
    return stacked.rolling(window, min_periods=1).mean()

3. Target encoding leakage

Using target statistics (group means) computed from the full dataset. Each fold must recompute these from training data only.

4. Model selection leakage

Choosing hyperparameters using the test set, then reporting test performance. Requires nested cross-validation.

Nested time series cross-validation

The outer loop evaluates model performance. The inner loop selects hyperparameters:

from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error
import numpy as np

def nested_time_series_cv(X, y, model_class, param_grid,
                          outer_splits=5, inner_splits=3):
    """Estimate out-of-sample MAE with hyperparameters tuned inside each fold.

    For every outer ``TimeSeriesSplit`` fold, each parameter combination
    from ``param_grid`` is scored by its mean MAE over an inner
    ``TimeSeriesSplit`` of the outer training rows. The combination with
    the lowest inner MAE is refit on all outer training rows and scored
    once on the outer test rows. A one-line summary is printed per fold.

    Args:
        X: Feature table indexed positionally via ``.iloc``.
        y: Target values indexed positionally via ``.iloc``.
        model_class: Callable invoked as ``model_class(**params)``; the
            result must provide ``fit(X, y)`` and ``predict(X)``.
        param_grid: Mapping of parameter name to an iterable of candidate
            values, expanded by ``_param_combinations``.
        outer_splits: Number of outer (evaluation) splits.
        inner_splits: Number of inner (selection) splits.

    Returns:
        dict with ``mean_mae``, ``std_mae`` and ``fold_scores`` (the list
        of per-fold outer MAE values).
    """
    outer_scores = []
    outer_splitter = TimeSeriesSplit(n_splits=outer_splits)

    for fold, (fit_rows, holdout_rows) in enumerate(outer_splitter.split(X)):
        X_fit, y_fit = X.iloc[fit_rows], y.iloc[fit_rows]
        X_holdout, y_holdout = X.iloc[holdout_rows], y.iloc[holdout_rows]

        # Selection stage: only rows from the outer training portion are used.
        inner_splitter = TimeSeriesSplit(n_splits=inner_splits)
        best_params, lowest_mae = None, np.inf

        for candidate in _param_combinations(param_grid):
            validation_maes = []
            for sub_fit, sub_val in inner_splitter.split(X_fit):
                estimator = model_class(**candidate)
                estimator.fit(X_fit.iloc[sub_fit], y_fit.iloc[sub_fit])
                forecast = estimator.predict(X_fit.iloc[sub_val])
                validation_maes.append(
                    mean_absolute_error(y_fit.iloc[sub_val], forecast)
                )

            candidate_mae = np.mean(validation_maes)
            # Strict "<" keeps the earliest candidate when scores tie.
            if candidate_mae < lowest_mae:
                best_params, lowest_mae = candidate, candidate_mae

        # Evaluation stage: refit the winner on the whole outer training set.
        final_model = model_class(**best_params)
        final_model.fit(X_fit, y_fit)
        outer_score = mean_absolute_error(
            y_holdout, final_model.predict(X_holdout)
        )
        outer_scores.append(outer_score)

        print(f"Fold {fold}: MAE={outer_score:.4f}, params={best_params}")

    return {
        "mean_mae": np.mean(outer_scores),
        "std_mae": np.std(outer_scores),
        "fold_scores": outer_scores,
    }

def _param_combinations(grid):
    """Generate all parameter combinations from a grid."""
    import itertools
    keys = grid.keys()
    values = grid.values()
    for combo in itertools.product(*values):
        yield dict(zip(keys, combo))

Combinatorial Purged Cross-Validation (CPCV)

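Developed by Marcos López de Prado for financial applications, CPCV generates more test paths than standard k-fold while respecting temporal ordering. The function below shows its building block — contiguous k-fold splits with an embargo after each test block — rather than the full combinatorial scheme: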

def purged_kfold_cv(X, y, n_splits=5, embargo_pct=0.01, purge_size=0):
    """Yield ``(train_idx, test_idx)`` for contiguous k-fold with purge and embargo.

    The observations are cut into ``n_splits`` contiguous test blocks of
    ``len(X) // n_splits`` rows; the last block also takes any leftover
    rows, so every observation is tested exactly once. Training rows are
    everything outside the test block, minus:

    * Purging: ``purge_size`` rows on each side of the test block, for
      labels that span several time steps and would otherwise overlap the
      test labels. The default of 0 disables purging.
    * Embargo: a further ``int(len(X) * embargo_pct)`` rows after the test
      block, to limit leakage from slowly decaying autocorrelation.

    Args:
        X: Any sized container; only ``len(X)`` is used.
        y: Unused; accepted for signature compatibility with other splitters.
        n_splits: Number of contiguous test blocks.
        embargo_pct: Embargo length as a fraction of ``len(X)``.
        purge_size: Number of rows to drop from training on each side of
            the test block.

    Yields:
        Tuples ``(train_idx, test_idx)`` of positional index lists. Folds
        whose training or test list would be empty are skipped.

    Raises:
        ValueError: If ``purge_size`` or ``embargo_pct`` is negative (a
            negative gap would let training rows overlap the test block).
    """
    if purge_size < 0:
        raise ValueError("purge_size must be non-negative")
    if embargo_pct < 0:
        raise ValueError("embargo_pct must be non-negative")

    n = len(X)
    embargo_size = int(n * embargo_pct)
    fold_size = n // n_splits

    for i in range(n_splits):
        test_start = i * fold_size
        # The final fold absorbs the n % n_splits remainder; without this
        # the trailing rows would never be evaluated.
        test_end = n if i == n_splits - 1 else (i + 1) * fold_size

        # Rows just before the test block are purged; rows just after it
        # are purged and then embargoed.
        train_before_end = max(test_start - purge_size, 0)
        train_after_start = min(test_end + purge_size + embargo_size, n)

        train_idx = list(range(train_before_end)) + list(range(train_after_start, n))
        test_idx = list(range(test_start, test_end))

        if not train_idx or not test_idx:
            continue

        yield train_idx, test_idx

The purging removes training observations that could leak information into the test period (important when labels span multiple time steps). The embargo adds a gap after the test period to handle autocorrelated features.

Walk-forward optimization

The standard approach for production trading systems and automated forecasting:

class WalkForwardOptimizer:
    """Rolling-window backtest that periodically re-tunes hyperparameters.

    A fixed-length training window slides forward in steps of
    ``test_window`` rows. At each step a model is fit on the training
    window and scored on the next ``test_window`` rows; hyperparameters are
    re-selected whenever at least ``reoptimize_every`` rows have been
    consumed since the last selection.
    """

    def __init__(self, model_fn, param_grid, train_window, test_window,
                 reoptimize_every=None):
        """Store the configuration.

        Args:
            model_fn: Callable invoked as ``model_fn(**params)``; the result
                must provide ``fit(X, y)`` and ``predict(X)``.
            param_grid: Mapping of parameter name to candidate values.
            train_window: Number of rows in each training window.
            test_window: Number of rows in each test window (also the step).
            reoptimize_every: Rows between re-optimizations; a falsy value
                falls back to ``test_window``.
        """
        self.model_fn = model_fn
        self.param_grid = param_grid
        self.train_window = train_window
        self.test_window = test_window
        self.reoptimize_every = (
            reoptimize_every if reoptimize_every else test_window
        )

    def run(self, data, target_col):
        """Walk forward through ``data`` and return one result dict per step.

        Each dict holds ``start``/``end`` (index labels of the test window),
        ``predictions``, ``actuals``, ``mae`` and the ``params`` used.
        Trailing rows that cannot fill a complete test window are ignored.
        """
        n = len(data)
        folds = []
        best_params = None
        # Seeded at the threshold so the very first step triggers a tuning.
        since_tuning = self.reoptimize_every
        cursor = self.train_window

        while True:
            test_stop = cursor + self.test_window
            if test_stop > n:
                break

            history = data.iloc[cursor - self.train_window:cursor]
            upcoming = data.iloc[cursor:test_stop]

            if since_tuning >= self.reoptimize_every:
                best_params = self._optimize(history, target_col)
                since_tuning = 0

            estimator = self.model_fn(**best_params)
            estimator.fit(history.drop(columns=[target_col]), history[target_col])
            forecast = estimator.predict(upcoming.drop(columns=[target_col]))
            observed = upcoming[target_col].values

            folds.append({
                "start": data.index[cursor],
                "end": data.index[min(test_stop - 1, n - 1)],
                "predictions": forecast,
                "actuals": observed,
                "mae": np.mean(np.abs(observed - forecast)),
                "params": best_params,
            })

            cursor = test_stop
            since_tuning += self.test_window

        return folds

    def _optimize(self, train_data, target_col):
        """Return the grid point with the lowest mean MAE over a 3-split inner CV.

        Only ``train_data`` is used. Ties keep the earliest candidate, and
        ``None`` is returned when the grid yields no candidates.
        """
        features = train_data.drop(columns=[target_col])
        target = train_data[target_col]
        splitter = TimeSeriesSplit(n_splits=3)

        winner, winner_mae = None, np.inf

        for candidate in _param_combinations(self.param_grid):
            fold_maes = []
            for fit_rows, val_rows in splitter.split(features):
                estimator = self.model_fn(**candidate)
                estimator.fit(features.iloc[fit_rows], target.iloc[fit_rows])
                forecast = estimator.predict(features.iloc[val_rows])
                fold_maes.append(np.mean(np.abs(target.iloc[val_rows] - forecast)))

            candidate_mae = np.mean(fold_maes)
            if candidate_mae < winner_mae:
                winner, winner_mae = candidate, candidate_mae

        return winner

Forecast evaluation with prediction intervals

Point forecast accuracy is not enough. Evaluate prediction intervals too:

def evaluate_prediction_intervals(actuals, lower, upper, nominal_coverage=0.95):
    """Evaluate calibration and sharpness of prediction intervals.

    Inputs are converted to float arrays and broadcast against each other,
    so lists, integer arrays and scalar bounds are all accepted.

    Args:
        actuals: Observed values.
        lower: Lower interval bounds.
        upper: Upper interval bounds.
        nominal_coverage: Target coverage of the intervals, strictly
            between 0 and 1.

    Returns:
        dict with:
            ``empirical_coverage``: fraction of actuals inside [lower, upper].
            ``nominal_coverage``: the value passed in.
            ``coverage_gap``: empirical minus nominal coverage.
            ``mean_interval_width``: mean of ``upper - lower``.
            ``mean_winkler_score``: mean Winkler score (lower is better).
            ``is_well_calibrated``: True when the absolute coverage gap is
                below 0.05.

    Raises:
        ValueError: If ``nominal_coverage`` is not strictly between 0 and 1
            (the Winkler penalty ``2 / alpha`` is undefined or has the
            wrong sign outside that range).
    """
    if not 0 < nominal_coverage < 1:
        raise ValueError("nominal_coverage must be strictly between 0 and 1")

    # Float conversion matters: with integer inputs the in-place penalty
    # additions below cannot be written back into an integer array.
    actuals, lower, upper = np.broadcast_arrays(
        np.asarray(actuals, dtype=float),
        np.asarray(lower, dtype=float),
        np.asarray(upper, dtype=float),
    )
    n = actuals.size

    # Coverage: what fraction of actuals fall within the interval?
    inside = (actuals >= lower) & (actuals <= upper)
    empirical_coverage = np.sum(inside) / n

    # Sharpness: how wide are the intervals?
    widths = upper - lower
    mean_width = np.mean(widths)

    # Winkler score: the interval width, plus a penalty of 2/alpha times
    # the distance by which the actual falls outside the interval.
    alpha = 1 - nominal_coverage
    penalty = 2 / alpha
    below = actuals < lower
    above = actuals > upper
    winkler_scores = widths.copy()
    winkler_scores[below] += penalty * (lower[below] - actuals[below])
    winkler_scores[above] += penalty * (actuals[above] - upper[above])

    coverage_gap = empirical_coverage - nominal_coverage
    return {
        "empirical_coverage": float(empirical_coverage),
        "nominal_coverage": nominal_coverage,
        "coverage_gap": float(coverage_gap),
        "mean_interval_width": float(mean_width),
        "mean_winkler_score": float(np.mean(winkler_scores)),
        "is_well_calibrated": bool(abs(coverage_gap) < 0.05),
    }

Backtesting framework for production

class ProductionBacktester:
    """Score several models with several metrics over a set of CV folds."""

    def __init__(self, models, metrics, cv_strategy):
        """Store the configuration.

        Args:
            models: dict of name -> zero-argument factory returning an
                object with ``fit(X, y)`` and ``predict(X)``.
            metrics: dict of name -> ``metric_fn(y_true_values, predictions)``.
            cv_strategy: Object whose ``split(X)`` yields
                ``(train_idx, test_idx)`` pairs of positional indices.
        """
        self.models = models
        self.metrics = metrics
        self.cv_strategy = cv_strategy

    def run(self, X, y):
        """Fit and score every model on every fold.

        Returns a DataFrame with one row per (fold, model). A successful
        row carries ``fold``, ``model`` and one column per metric. If
        fitting, predicting or scoring raises, the row carries ``fold``,
        ``model`` and ``error`` (the exception text) instead, and the
        backtest moves on to the next model.
        """
        rows = []

        for fold_idx, (train_idx, test_idx) in enumerate(self.cv_strategy.split(X)):
            X_train, y_train = X.iloc[train_idx], y.iloc[train_idx]
            X_test, y_test = X.iloc[test_idx], y.iloc[test_idx]

            for model_name, model_fn in self.models.items():
                # Construction happens outside the try block, so a failing
                # factory propagates to the caller.
                estimator = model_fn()

                try:
                    estimator.fit(X_train, y_train)
                    predictions = estimator.predict(X_test)

                    row = {"fold": fold_idx, "model": model_name}
                    row.update({
                        metric_name: metric_fn(y_test.values, predictions)
                        for metric_name, metric_fn in self.metrics.items()
                    })
                except Exception as e:
                    row = {
                        "fold": fold_idx,
                        "model": model_name,
                        "error": str(e),
                    }

                rows.append(row)

        return pd.DataFrame(rows)

    def summary(self, results_df):
        """Return the per-model mean and std of every metric column.

        Metric columns are all columns other than ``fold``, ``model`` and
        ``error``.
        """
        bookkeeping = ("fold", "model", "error")
        metric_cols = [
            column for column in results_df.columns if column not in bookkeeping
        ]
        per_model = results_df.groupby("model")[metric_cols]
        return per_model.agg(["mean", "std"])

Statistical comparison of models

After cross-validation, use the Diebold-Mariano test to determine if one model is significantly better than another:

from scipy import stats

def diebold_mariano_test(errors_1, errors_2, horizon=1):
    """Test if model 1 has significantly different accuracy from model 2.

    Uses squared-error loss. The variance of the mean loss differential is
    estimated from its autocovariances at lags 0 .. ``horizon - 1`` with
    equal weights, and the statistic is compared against a standard normal.

    Args:
        errors_1: Forecast errors of model 1 (array-like).
        errors_2: Forecast errors of model 2, aligned with ``errors_1``.
        horizon: Forecast horizon; ``horizon - 1`` autocovariance lags
            enter the variance estimate.

    Returns:
        dict with:
            ``dm_stat``: the test statistic (negative favours model 1).
            ``p_value``: two-sided p-value.
            ``model_1_better``: True when model 1 has the lower mean
                squared error.
            ``significant``: True when ``p_value < 0.05``.
        All four keys are present on every return path.
    """
    e1 = np.asarray(errors_1, dtype=float)
    e2 = np.asarray(errors_2, dtype=float)
    d = e1 ** 2 - e2 ** 2  # loss differential

    n = len(d)
    mean_d = np.mean(d)

    # Autocovariances all use the full-sample mean and a 1/n divisor, so
    # the lagged terms are on the same scale as gamma_0 = np.var(d).
    centered = d - mean_d
    gamma_0 = np.var(d)
    gamma_sum = 0.0
    for k in range(1, horizon):
        gamma_sum += np.sum(centered[k:] * centered[:-k]) / n

    var_d = (gamma_0 + 2 * gamma_sum) / n

    if var_d <= 0:
        # Degenerate variance estimate. With a zero mean differential the
        # forecasts are equally accurate, which is no evidence of a
        # difference. Otherwise the statistic is unbounded; note this also
        # happens when negative autocovariances push the estimate below
        # zero, in which case the result should be read with caution.
        if mean_d == 0:
            dm_stat, p_value = 0.0, 1.0
        else:
            dm_stat, p_value = float(np.copysign(np.inf, mean_d)), 0.0
    else:
        dm_stat = float(mean_d / np.sqrt(var_d))
        p_value = float(2 * stats.norm.sf(abs(dm_stat)))

    return {
        "dm_stat": dm_stat,
        "p_value": p_value,
        "model_1_better": bool(mean_d < 0),
        "significant": bool(p_value < 0.05),
    }

The one thing to remember: Production-grade time series evaluation requires more than basic expanding-window splits — it demands purging and embargo to prevent leakage from overlapping labels and autocorrelated features, nested CV for unbiased hyperparameter selection, prediction interval calibration, and statistical tests to confirm that apparent performance differences are real and not just noise.

Tags: python, time-series, cross-validation, model-evaluation

See Also