Multivariate Time Series in Python — Deep Dive

VAR model estimation details

A VAR(p) model for K variables can be written:

Yₜ = c + A₁Yₜ₋₁ + A₂Yₜ₋₂ + … + AₚYₜ₋ₚ + εₜ

Where Yₜ is a K×1 vector, each Aᵢ is a K×K coefficient matrix, and εₜ ~ N(0, Σ). The total number of parameters is K²p + K (coefficients + intercepts), which grows quadratically with the number of variables.

Stability condition

For the VAR to be stationary, all eigenvalues of the companion matrix must lie inside the unit circle:

fitted = model.fit(maxlags=4)

# Check stability
print(f"Roots: {fitted.roots}")
is_stable = all(abs(r) > 1 for r in fitted.roots)
# Note: statsmodels reports inverse roots, so > 1 means stable
print(f"Model is stable: {is_stable}")

# Alternative
print(f"Is stable: {fitted.is_stable()}")

An unstable VAR produces explosive forecasts.

Residual diagnostics

from statsmodels.stats.stattools import durbin_watson

# Autocorrelation in residuals (per variable)
residuals = fitted.resid
dw = durbin_watson(residuals)
for i, col in enumerate(data.columns):
    print(f"{col}: Durbin-Watson = {dw[i]:.4f}")
    # Values near 2 = no autocorrelation

# Portmanteau test for multivariate residual autocorrelation
from statsmodels.tsa.vector_ar.var_model import VARResults

lb_test = fitted.test_whiteness(nlags=20, signif=0.05, adjusted=True)
print(f"Whiteness test p-value: {lb_test.pvalue:.4f}")

# Normality test
norm_test = fitted.test_normality()
print(f"Normality test p-value: {norm_test.pvalue:.4f}")

If residual tests fail, consider adding more lags, including exogenous variables, or switching to a different model class (VARMAX, VECM).

Structural VAR (SVAR) identification

Reduced-form VAR tells you correlations between shocks but not causation. Structural VAR imposes economic theory to identify causal relationships.

Short-run restrictions (Cholesky decomposition)

The simplest identification scheme assumes a recursive causal ordering — the first variable is not contemporaneously affected by any other, the second is only affected by the first, etc.:

# Order matters: first variable is most exogenous
data_ordered = data[["interest_rate", "gdp_growth", "inflation"]]

model = VAR(data_ordered)
fitted = model.fit(maxlags=4)

# Orthogonalized IRF uses Cholesky decomposition
irf = fitted.irf(periods=20)
irf.plot_cum_effects()  # cumulative impulse responses

Blanchard-Quah long-run restrictions

For identifying permanent vs transitory shocks:

from statsmodels.tsa.vector_ar.svar_model import SVAR

# A-model: contemporaneous restrictions
A = np.array([
    [1, 0, 0],
    ["E", 1, 0],     # "E" = estimate this parameter
    ["E", "E", 1],
])

svar_model = SVAR(data, svar_type="A", A=A)
svar_fitted = svar_model.fit(maxlags=4)

# Structural IRF
svar_irf = svar_fitted.irf(periods=20)

VECM: when variables are cointegrated

Johansen cointegration test

from statsmodels.tsa.vector_ar.vecm import coint_johansen

def find_cointegration_rank(data, max_rank=None, k_ar_diff=2):
    """Determine cointegration rank using Johansen trace test."""
    if max_rank is None:
        max_rank = data.shape[1]
    
    result = coint_johansen(data, det_order=0, k_ar_diff=k_ar_diff)
    
    ranks = []
    for i in range(max_rank):
        trace_stat = result.lr1[i]
        critical_5 = result.cvt[i, 1]  # 5% critical value
        rejected = trace_stat > critical_5
        ranks.append({
            "rank": i,
            "trace_stat": trace_stat,
            "critical_5%": critical_5,
            "reject_null": rejected,
        })
    
    # Cointegration rank = number of rejected nulls
    coint_rank = sum(1 for r in ranks if r["reject_null"])
    return coint_rank, ranks

VECM estimation and forecasting

from statsmodels.tsa.vector_ar.vecm import VECM

coint_rank, _ = find_cointegration_rank(data, k_ar_diff=2)

vecm = VECM(data, k_ar_diff=2, coint_rank=coint_rank, deterministic="ci")
vecm_fitted = vecm.fit()

# Cointegrating vectors
print("Cointegrating vectors (β):")
print(vecm_fitted.beta)

# Adjustment speeds (α)
print("Adjustment coefficients (α):")
print(vecm_fitted.alpha)

# Forecast
forecast = vecm_fitted.predict(steps=12)

The α coefficients reveal how quickly each variable adjusts to deviations from the long-run equilibrium. Large α means fast correction; small α means slow drift.

VARMAX: adding exogenous variables

When external factors influence the system but are not influenced by it:

from statsmodels.tsa.statespace.varmax import VARMAX

# endogenous: variables that interact
# exogenous: external drivers
model = VARMAX(
    endog=data[["sales", "inventory"]],
    exog=data[["temperature", "holiday_flag"]],
    order=(2, 0),  # VAR(2) — no MA terms
    trend="c",
)
fitted = model.fit(disp=False, maxiter=200)

# Forecast requires future exogenous values
future_exog = pd.DataFrame({
    "temperature": temp_forecast,
    "holiday_flag": holiday_calendar,
})
forecast = fitted.forecast(steps=len(future_exog), exog=future_exog)

Dynamic Factor Models

When many variables share common underlying drivers, dynamic factor models reduce dimensionality:

from statsmodels.tsa.statespace.dynamic_factor import DynamicFactor

# Extract 2 common factors from 10 variables
model = DynamicFactor(
    data,           # 10 columns
    k_factors=2,    # number of latent factors
    factor_order=2, # AR order for factor dynamics
)
fitted = model.fit(disp=False, maxiter=500)

# Factors
factors = fitted.factors.filtered
print(f"Factor 1 explained variance: {factors.iloc[:, 0].var():.4f}")

# Factor loadings — how each variable relates to factors
print(fitted.coefficients_of_estimation)

This is the multivariate time series analog of PCA, with the added benefit that factors evolve dynamically.

Multivariate forecasting pipeline

import pandas as pd
import numpy as np
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import adfuller
from sklearn.metrics import mean_absolute_error

class MultivariateForecaster:
    """End-to-end multivariate forecasting pipeline."""
    
    def __init__(self, max_lags=12):
        self.max_lags = max_lags
        self.diff_orders = {}
        self.model = None
        self.fitted = None
    
    def _make_stationary(self, data):
        """Difference each column to achieve stationarity."""
        stationary = data.copy()
        for col in data.columns:
            d = 0
            series = data[col]
            while adfuller(series.dropna())[1] > 0.05 and d < 2:
                series = series.diff().dropna()
                d += 1
                stationary[col] = data[col].diff(d)
            self.diff_orders[col] = d
        return stationary.dropna()
    
    def fit(self, data):
        """Fit VAR model with automatic differencing and lag selection."""
        stationary = self._make_stationary(data)
        
        self.model = VAR(stationary)
        lag_order = self.model.select_order(maxlags=self.max_lags)
        optimal_lag = lag_order.bic
        
        self.fitted = self.model.fit(optimal_lag)
        self.last_values = data.tail(max(self.diff_orders.values()) + optimal_lag)
        
        return self
    
    def forecast(self, steps):
        """Produce forecasts and invert differencing."""
        raw_forecast = self.fitted.forecast(
            self.fitted.endog[-self.fitted.k_ar:], steps=steps
        )
        
        forecast_df = pd.DataFrame(
            raw_forecast,
            columns=self.fitted.endog_names,
        )
        
        # Invert differencing (approximate — for exact, track cumulative sums)
        for col in forecast_df.columns:
            d = self.diff_orders.get(col, 0)
            if d > 0:
                last_level = self.last_values[col].iloc[-1]
                forecast_df[col] = forecast_df[col].cumsum() + last_level
        
        return forecast_df
    
    def diagnostics(self):
        """Run model diagnostics."""
        return {
            "is_stable": self.fitted.is_stable(),
            "whiteness_pvalue": self.fitted.test_whiteness(nlags=20).pvalue,
            "normality_pvalue": self.fitted.test_normality().pvalue,
            "lag_order": self.fitted.k_ar,
            "diff_orders": self.diff_orders,
        }

Dimensionality challenges

The curse of dimensionality hits multivariate time series hard. A VAR(4) with 10 variables has 10×10×4 + 10 = 410 parameters. Solutions:

  1. Variable selection — use Granger causality to prune irrelevant variables before fitting.
  2. Regularized VAR — apply Lasso or Ridge penalties to shrink small coefficients:
from sklearn.linear_model import LassoCV
import numpy as np

def sparse_var(data, max_lag=4):
    """Fit VAR with Lasso regularization for variable selection."""
    cols = data.columns
    results = {}
    
    for target in cols:
        # Build lagged features
        X_parts = []
        for lag in range(1, max_lag + 1):
            lagged = data.shift(lag).add_suffix(f"_lag{lag}")
            X_parts.append(lagged)
        
        X = pd.concat(X_parts, axis=1).dropna()
        y = data[target].loc[X.index]
        
        lasso = LassoCV(cv=5, max_iter=5000)
        lasso.fit(X, y)
        
        nonzero = np.sum(lasso.coef_ != 0)
        results[target] = {
            "n_features": len(lasso.coef_),
            "n_selected": nonzero,
            "alpha": lasso.alpha_,
            "r2": lasso.score(X, y),
        }
    
    return results
  1. Factor models — reduce K variables to k << K factors, then model the factors with a small VAR.

The one thing to remember: Multivariate time series modeling is fundamentally about understanding and exploiting the dynamic interactions between variables — but the quadratic growth in parameters demands careful variable selection, cointegration analysis, and diagnostic validation to avoid models that overfit in-sample and fail spectacularly out-of-sample.

pythontime-seriesmultivariateforecasting

See Also