Autocorrelation Analysis in Python — Deep Dive
Mathematical foundations
The autocorrelation at lag k is defined as:
ρ(k) = γ(k) / γ(0)
Where γ(k) = Cov(yₜ, yₜ₋ₖ) is the autocovariance at lag k and γ(0) = Var(yₜ). The sample autocorrelation:
r(k) = Σ(yₜ − ȳ)(yₜ₋ₖ − ȳ) / Σ(yₜ − ȳ)²
Under the null of white noise, r(k) is approximately N(0, 1/n) for large n, which is why the significance bands are ±1.96/√n.
Bartlett’s formula for non-white-noise series
When the true process is MA(q), the variance of r(k) for k > q is not simply 1/n. Bartlett’s formula gives:
Var(r(k)) ≈ (1/n)(1 + 2Σρ(i)²) for k > q
This means significance bands should be wider at higher lags when earlier lags are significant. Statsmodels accounts for this when you set bartlett_confint=True:
from statsmodels.tsa.stattools import acf
acf_vals, confint, qstat, pvalues = acf(
series.dropna(),
nlags=40,
alpha=0.05,
fft=True, # faster computation using FFT
bartlett_confint=True,
qstat=True, # also compute Ljung-Box statistics
)
Ljung-Box portmanteau test
Rather than eyeballing individual lags, the Ljung-Box test checks whether a group of autocorrelations are jointly significant:
Q = n(n+2) Σ r(k)² / (n−k)
Under H₀ (no autocorrelation up to lag m), Q follows χ²(m−p−q) where p and q are the fitted ARIMA orders.
from statsmodels.stats.diagnostic import acorr_ljungbox
# Test residuals from a fitted model
lb_results = acorr_ljungbox(
residuals.dropna(),
lags=range(1, 21),
model_df=3, # subtract df for ARIMA(1,1,1) = p+q = 2...
return_df=True,
)
print(lb_results[lb_results["lb_pvalue"] < 0.05])
If any group of lags has p < 0.05, the model has not captured all autocorrelation structure — consider adding more AR or MA terms.
Breusch-Godfrey test (alternative)
Better suited for regression residuals because it handles lagged dependent variables correctly:
from statsmodels.stats.diagnostic import acorr_breusch_godfrey
bg_stat, bg_p, _, _ = acorr_breusch_godfrey(fitted_model, nlags=10)
print(f"BG test p-value: {bg_p:.4f}")
Partial autocorrelation computation methods
The PACF can be computed via several algorithms, each with different numerical properties:
from statsmodels.tsa.stattools import pacf
# Yule-Walker (fast, may have numerical issues)
pacf_yw = pacf(series, nlags=30, method="yw")
# Yule-Walker modified (more stable)
pacf_ywm = pacf(series, nlags=30, method="ywm")
# OLS regression (most robust, slowest)
pacf_ols = pacf(series, nlags=30, method="ols")
# Levinson-Durbin (fast recursive algorithm)
pacf_ld = pacf(series, nlags=30, method="ld")
For most applications, ywm is the best default. Use ols when you need confidence intervals or when other methods produce unstable results.
Spectral density — autocorrelation in the frequency domain
The power spectral density (PSD) is the Fourier transform of the autocorrelation function. Peaks in the PSD correspond to dominant periodic components:
from scipy.signal import welch
import numpy as np
freqs, psd = welch(series.dropna().values, fs=1.0, nperseg=256)
# Convert frequencies to periods
periods = 1.0 / freqs[1:] # skip DC component
psd_no_dc = psd[1:]
# Find dominant periods
top_k = 5
top_indices = np.argsort(psd_no_dc)[-top_k:][::-1]
for idx in top_indices:
print(f"Period: {periods[idx]:.1f}, Power: {psd_no_dc[idx]:.4f}")
This approach is more reliable than visual ACF inspection for detecting subtle periodicities, especially when multiple seasonal patterns overlap.
Cross-correlation with pre-whitening
Raw cross-correlation between two series can produce spurious results when both series have autocorrelation. Pre-whitening removes the autocorrelation from one series and applies the same filter to the other:
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import ccf
def prewhitened_ccf(x, y, max_lag=30):
"""Compute cross-correlation after pre-whitening x."""
# Fit AR model to x
model = ARIMA(x, order=(5, 0, 0)) # AR(5) for whitening
fitted = model.fit()
# Get residuals (whitened x)
alpha_x = fitted.resid
# Apply same filter to y
ar_params = fitted.arparams
y_filtered = y.copy()
for i in range(len(ar_params)):
y_filtered = y_filtered - ar_params[i] * y.shift(i + 1)
y_filtered = y_filtered.dropna()
alpha_x = alpha_x[len(ar_params):]
# Align
min_len = min(len(alpha_x), len(y_filtered))
return ccf(alpha_x[:min_len], y_filtered[:min_len], adjusted=False)
# Positive lag k → x leads y by k periods
Pre-whitening is essential for transfer function model identification (ARIMAX/dynamic regression).
Automated feature extraction from ACF/PACF
For large-scale time series classification or clustering, extract summary features from the autocorrelation structure:
def acf_features(series, max_lag=50):
"""Extract features from ACF for time series classification."""
clean = series.dropna()
n = len(clean)
if n < max_lag + 10:
max_lag = max(n // 3, 5)
acf_vals = acf(clean, nlags=max_lag, fft=True)
pacf_vals = pacf(clean, nlags=min(max_lag, n // 2 - 1), method="ywm")
bound = 1.96 / np.sqrt(n)
features = {
# First autocorrelation (strength of lag-1 dependency)
"acf1": acf_vals[1],
# Sum of first 10 squared ACF values (overall predictability)
"acf_sum_sq_10": np.sum(acf_vals[1:11] ** 2),
# First significant PACF lag
"first_sig_pacf": next(
(i for i in range(1, len(pacf_vals)) if abs(pacf_vals[i]) > bound), 0
),
# Number of significant ACF lags
"n_sig_acf": sum(1 for v in acf_vals[1:] if abs(v) > bound),
# ACF at seasonal lags (if enough data)
"acf_7": acf_vals[7] if max_lag >= 7 else None,
"acf_12": acf_vals[12] if max_lag >= 12 else None,
# Decay rate (exponential fit to absolute ACF)
"acf_decay_rate": _acf_decay_rate(acf_vals[1:21]),
# Difference between first ACF and second ACF
"acf_diff_1_2": acf_vals[1] - acf_vals[2],
}
return features
def _acf_decay_rate(acf_vals):
"""Fit exponential decay to absolute ACF values."""
abs_acf = np.abs(acf_vals)
abs_acf = abs_acf[abs_acf > 0]
if len(abs_acf) < 3:
return 0.0
try:
log_acf = np.log(abs_acf[:10])
x = np.arange(len(log_acf))
slope = np.polyfit(x, log_acf, 1)[0]
return slope
except (ValueError, np.linalg.LinAlgError):
return 0.0
These features power the tsfeatures approach used in the FFORMS (Feature-based FORecast Model Selection) framework, which selects the best forecasting method for each series based on its statistical characteristics rather than trial-and-error.
Handling non-standard autocorrelation
Long memory processes
Some series have autocorrelation that decays hyperbolically (slowly) rather than exponentially. This indicates long memory and calls for ARFIMA (fractionally integrated ARIMA):
from statsmodels.tsa.stattools import acf
import numpy as np
def detect_long_memory(series, max_lag=100):
"""Check for long memory via ACF decay pattern."""
acf_vals = acf(series.dropna(), nlags=max_lag, fft=True)
abs_acf = np.abs(acf_vals[1:])
lags = np.arange(1, len(abs_acf) + 1)
# Fit log-log regression: if slope > -1, suggests long memory
mask = abs_acf > 0
log_lags = np.log(lags[mask])
log_acf = np.log(abs_acf[mask])
slope = np.polyfit(log_lags, log_acf, 1)[0]
return {
"log_log_slope": slope,
"long_memory": slope > -1.0,
"hurst_approx": (1 + slope) / 2,
}
Heteroscedastic autocorrelation
For financial returns, autocorrelation in squared returns indicates volatility clustering (ARCH effects), even when returns themselves show little autocorrelation:
returns = prices.pct_change().dropna()
# Little autocorrelation in returns
acf_returns = acf(returns, nlags=20)
# Strong autocorrelation in squared returns → volatility clustering
acf_squared = acf(returns ** 2, nlags=20)
# This calls for GARCH modeling, not ARIMA
Performance optimization
For large datasets (millions of observations), FFT-based ACF computation is essential:
from statsmodels.tsa.stattools import acf
# FFT is O(n log n) vs O(n²) for direct computation
acf_fast = acf(large_series, nlags=1000, fft=True)
For streaming data where you need to update ACF incrementally, maintain running statistics:
class IncrementalACF:
"""O(1) per-update ACF estimation for lag 1."""
def __init__(self):
self.n = 0
self.mean = 0.0
self.m2 = 0.0
self.lag1_sum = 0.0
self.prev = None
def update(self, value):
self.n += 1
delta = value - self.mean
self.mean += delta / self.n
self.m2 += delta * (value - self.mean)
if self.prev is not None:
self.lag1_sum += (value - self.mean) * (self.prev - self.mean)
self.prev = value
@property
def acf1(self):
if self.n < 3 or self.m2 == 0:
return 0.0
return self.lag1_sum / self.m2
The one thing to remember: Autocorrelation analysis extends far beyond plotting ACF/PACF — rigorous application involves portmanteau tests for model adequacy, spectral analysis for hidden periodicities, pre-whitening for cross-correlation, and feature extraction for large-scale automated model selection.
See Also
- Python Arima Forecasting How ARIMA models use patterns in past numbers to predict the future, explained like a bedtime story.
- Python Exponential Smoothing How exponential smoothing weighs recent events more heavily to predict what happens next, like trusting fresh memories more than old ones.
- Python Multivariate Time Series Why tracking multiple things at once gives you better predictions than tracking each one alone.
- Python Prophet Forecasting How Facebook's Prophet tool predicts the future by breaking data into easy-to-understand pieces.
- Python Seasonal Decomposition How Python breaks apart time data into trend, seasonal patterns, and leftover noise — like separating ingredients in a smoothie.