Anomaly Detection with Python — Deep Dive
Isolation Forest internals
Understanding how Isolation Forest works under the hood enables better tuning and custom extensions.
Each tree randomly selects a feature and a split value between the feature’s min and max. Anomalies sit in sparse regions and are isolated in fewer splits (shorter path length). The anomaly score is derived from the average path length across all trees, normalized by the expected path length for a dataset of that size:
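import numpy as np

EULER_GAMMA = 0.5772156649  # Euler-Mascheroni constant

def expected_path_length(n: int) -> float:
    """Expected path length in an isolation tree for n samples."""
    if n <= 1:
        return 0.0
    if n == 2:
        return 1.0  # exact value; the approximation below is poor for n = 2
    # c(n) = 2 * H(n - 1) - 2 * (n - 1) / n, with H(i) approximated by ln(i) + gamma
    return 2 * (np.log(n - 1) + EULER_GAMMA) - 2 * (n - 1) / n

def anomaly_score(avg_path_length: float, n_samples: int) -> float:
    """Score in (0, 1]; higher means more anomalous. Requires n_samples >= 2."""
    c = expected_path_length(n_samples)
    return 2 ** (-avg_path_length / c)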
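Scores close to 1 indicate anomalies, and scores well below 0.5 indicate normal points. A point whose average path length equals the expected path length scores exactly 0.5, so a sample in which every score sits near 0.5 contains no distinct anomalies. This normalization makes scores comparable across different dataset sizes.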
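To connect this score to a library you are likely to use, the sketch below fits scikit-learn's `IsolationForest` on synthetic data and recovers the paper-style score. scikit-learn's `score_samples` returns the negated score, so the sign is flipped. The data, seed, and variable names are assumptions made for illustration.

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(0)
# Synthetic data (assumption): 500 Gaussian inliers plus one far-away point.
inliers = rng.normal(loc=0.0, scale=1.0, size=(500, 2))
outlier = np.array([[8.0, 8.0]])
X = np.vstack([inliers, outlier])

forest = IsolationForest(n_estimators=200, max_samples=256, random_state=0).fit(X)

# score_samples returns the negated paper-style score, so flip the sign
# to get values in (0, 1] where higher means more anomalous.
paper_scores = -forest.score_samples(X)

outlier_score = paper_scores[-1]
typical_score = np.median(paper_scores)
print(f"outlier: {outlier_score:.3f}, typical inlier: {typical_score:.3f}")
```

The far-away point should score well above 0.5, while the bulk of the Gaussian cloud scores below it.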
Extended Isolation Forest
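Standard Isolation Forest uses axis-aligned splits, which create artifacts in the score map: bands of artificially low scores that run parallel to the axes, including through regions that contain no data. The Extended Isolation Forest replaces them with random hyperplane splits: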
# Using the eif library; `data` is assumed to be a numeric pandas DataFrame
from eif import iForest

model = iForest(
    data.values,
    ntrees=200,
    sample_size=256,
    ExtensionLevel=data.shape[1] - 1,  # full extension
)
scores = model.compute_paths(data.values)  # higher = more anomalous
Extended IF produces more reliable scores for correlated features and complex manifold structures.
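A single hyperplane split can be sketched in NumPy as follows. This is a minimal illustration of the branching rule, not the `eif` implementation. The function name `random_hyperplane_split` and its arguments are hypothetical. The extension level controls how many coordinates of the normal vector are non-zero, so level 0 reduces to an axis-aligned split.

```python
import numpy as np

def random_hyperplane_split(X, extension_level, rng):
    """Split rows of X with one random hyperplane (sketch of one branch)."""
    n_features = X.shape[1]
    # Random slope: a Gaussian normal vector.
    normal = rng.normal(size=n_features)
    # Zero out coordinates so that extension_level + 1 of them stay non-zero.
    n_zeroed = n_features - extension_level - 1
    zeroed = rng.permutation(n_features)[:n_zeroed]
    normal[zeroed] = 0.0
    # Random intercept: a point drawn uniformly from the data's bounding box.
    intercept = rng.uniform(X.min(axis=0), X.max(axis=0))
    # Points on the non-positive side of the hyperplane go to the left child.
    goes_left = (X - intercept) @ normal <= 0
    return normal, intercept, goes_left

rng = np.random.default_rng(0)
X = rng.normal(size=(256, 3))
normal, intercept, goes_left = random_hyperplane_split(X, extension_level=2, rng=rng)
left, right = X[goes_left], X[~goes_left]
```

A tree is built by applying this split recursively to `left` and `right` until each point is isolated or a depth limit is reached.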
Streaming anomaly detection
Batch algorithms assume all data is available upfront. Real systems need to detect anomalies in streaming data while adapting to concept drift.
Half-Space Trees for streaming
import numpy as np
from collections import deque

class HalfSpaceTree:
    """Lightweight streaming anomaly detector (simplified sketch)."""

    def __init__(self, n_features: int, max_depth: int = 8, window_size: int = 1000):
        self.n_features = n_features
        self.max_depth = max_depth
        self.window_size = window_size
        self.reference_window = deque(maxlen=window_size)
        self.n_seen = 0
        self.trees = self._build_trees(n_trees=25)

    def _build_trees(self, n_trees):
        """Build random half-space partitions."""
        trees = []
        for _ in range(n_trees):
            splits = []
            for depth in range(self.max_depth):
                feature = np.random.randint(self.n_features)
                # Split points will be set when data arrives
                splits.append({"feature": feature, "value": None, "left_mass": 0, "right_mass": 0})
            trees.append(splits)
        return trees

    def update(self, point: np.ndarray):
        """Update model with a new streaming point."""
        self.reference_window.append(point)
        self.n_seen += 1
        # Update split values and mass profiles once per full window
        if self.n_seen % self.window_size == 0:
            self._refresh()

    def _refresh(self):
        """Assumed refresh rule: redraw split values and recount mass from the window."""
        window = np.array(list(self.reference_window))
        for tree in self.trees:
            for split in tree:
                column = window[:, split["feature"]]
                split["value"] = np.random.uniform(column.min(), column.max())
                split["left_mass"] = int(np.sum(column < split["value"]))
                split["right_mass"] = len(column) - split["left_mass"]

    def score(self, point: np.ndarray) -> float:
        """Score a point; lower score = more anomalous (returns 0.0 before the first refresh)."""
        total = 0
        for tree in self.trees:
            for split in tree:
                if split["value"] is not None:
                    if point[split["feature"]] < split["value"]:
                        total += split["left_mass"]
                    else:
                        total += split["right_mass"]
        return total / (len(self.trees) * self.max_depth)
ADWIN for drift detection
ADWIN (Adaptive Windowing) detects distributional changes in a data stream, signaling when the model should retrain:
import numpy as np

class ADWIN:
    """Simplified ADWIN drift detector."""

    def __init__(self, delta: float = 0.002):
        self.delta = delta
        self.window = []
        self.total = 0.0

    def update(self, value: float) -> bool:
        """Add value; returns True if drift detected."""
        self.window.append(value)
        self.total += value
        if len(self.window) < 10:
            return False
        # Check if any split point shows a significant difference in means.
        # The scan is O(n) per update and the window grows until drift is found;
        # the full algorithm compresses the window into exponential buckets.
        n = len(self.window)
        left_sum = 0.0
        for i in range(1, n):
            left_sum += self.window[i - 1]
            mean_left = left_sum / i
            mean_right = (self.total - left_sum) / (n - i)
            # Simplified Hoeffding-style bound; assumes values lie in [0, 1]
            epsilon = np.sqrt(np.log(2 / self.delta) / (2 * min(i, n - i)))
            if abs(mean_left - mean_right) > epsilon:
                self.window = self.window[i:]  # drop old data
                self.total -= left_sum  # keep the running sum in step with the window
                return True
        return False
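The detector above uses a simplified threshold. For comparison, the sketch below computes the cut threshold in the form given in the ADWIN paper, for values bounded in [0, 1]. The function name `adwin_cut_threshold` is hypothetical, and the constants are worth checking against a reference implementation before relying on them.

```python
import math

def adwin_cut_threshold(n_left: int, n_right: int, delta: float) -> float:
    """Cut threshold for two sub-windows of sizes n_left and n_right."""
    n = n_left + n_right
    # m = 1 / (1/n0 + 1/n1); it is dominated by the smaller sub-window.
    m = 1.0 / (1.0 / n_left + 1.0 / n_right)
    # Divide delta by n to account for testing many candidate split points.
    delta_prime = delta / n
    return math.sqrt(math.log(4.0 / delta_prime) / (2.0 * m))

balanced = adwin_cut_threshold(500, 500, delta=0.002)
lopsided = adwin_cut_threshold(10, 990, delta=0.002)
print(f"balanced split: {balanced:.3f}, lopsided split: {lopsided:.3f}")
```

A lopsided split needs a larger difference in means before it counts as drift, because the small side gives a noisy estimate.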
Ensemble anomaly scoring
Combining multiple detectors improves robustness — different algorithms catch different types of anomalies:
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import MinMaxScaler

class AnomalyEnsemble:
    def __init__(self, contamination: float = 0.02):
        self.detectors = {
            "isolation_forest": IsolationForest(contamination=contamination, random_state=42),
            "lof": LocalOutlierFactor(n_neighbors=20, contamination=contamination, novelty=True),
        }
        self.scaler = MinMaxScaler()

    def _raw_scores(self, X: np.ndarray) -> np.ndarray:
        """One column per detector; negated so higher = more anomalous."""
        return np.column_stack([-d.decision_function(X) for d in self.detectors.values()])

    def fit(self, X: np.ndarray):
        for detector in self.detectors.values():
            detector.fit(X)
        # Fit the scaler once on training scores. Refitting it inside score()
        # would rescale every batch independently and map a single point to 0.
        # Note: LOF scores computed on its own training data are optimistic.
        self.scaler.fit(self._raw_scores(X))
        return self

    def score(self, X: np.ndarray) -> np.ndarray:
        """Combined anomaly score; higher = more anomalous."""
        # Normalize each detector's scores to [0, 1] on the training scale
        normalized = np.clip(self.scaler.transform(self._raw_scores(X)), 0.0, 1.0)
        # Average across detectors
        return normalized.mean(axis=1)

    def predict(self, X: np.ndarray, threshold: float = 0.7) -> np.ndarray:
        scores = self.score(X)
        return (scores > threshold).astype(int)
Time series anomaly detection
For sequential data, context matters. A value of 100 is normal during business hours but anomalous at 3 AM.
Seasonal-Trend decomposition with anomaly detection
import pandas as pd
import numpy as np
from statsmodels.tsa.seasonal import STL

def stl_anomaly_detection(
    series: pd.Series,
    period: int = 24,
    threshold_sigma: float = 3.0,
) -> pd.DataFrame:
    """Detect anomalies in time series using STL decomposition."""
    stl = STL(series, period=period, robust=True)
    result = stl.fit()
    residuals = result.resid
    # Modified z-score: median and MAD resist the outliers we are looking for
    median = residuals.median()
    mad = np.median(np.abs(residuals - median))
    if mad == 0:
        raise ValueError("MAD of residuals is zero; modified z-score is undefined")
    modified_z = 0.6745 * (residuals - median) / mad
    return pd.DataFrame({
        "value": series,
        "trend": result.trend,
        "seasonal": result.seasonal,
        "residual": residuals,
        "z_score": modified_z,
        "is_anomaly": np.abs(modified_z) > threshold_sigma,
    })
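The modified z-score step can be checked in isolation. The sketch below applies it to a small hand-made residual array (an assumption, not real data) and compares it with the ordinary z-score, which the spike itself inflates.

```python
import numpy as np

def modified_z_scores(values: np.ndarray) -> np.ndarray:
    """Modified z-score based on the median and the median absolute deviation."""
    median = np.median(values)
    mad = np.median(np.abs(values - median))
    return 0.6745 * (values - median) / mad

# Hand-made residuals (assumption): seven ordinary values and one spike.
residuals = np.array([1.0, 1.1, 0.9, 1.0, 1.2, 0.8, 1.0, 10.0])

z = modified_z_scores(residuals)
flags = np.abs(z) > 3.0

# Ordinary z-score for comparison: the spike drags the mean and std upward.
plain_z = (residuals - residuals.mean()) / residuals.std()
plain_flags = np.abs(plain_z) > 3.0
```

Here the modified z-score flags only the spike, while the ordinary z-score flags nothing because the spike inflates the standard deviation it is measured against.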
LSTM-based sequence anomaly detection
import torch
import torch.nn as nn

class LSTMAnomalyDetector(nn.Module):
    def __init__(self, n_features: int, hidden_size: int = 64, n_layers: int = 2):
        super().__init__()
        self.lstm = nn.LSTM(n_features, hidden_size, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, n_features)

    def forward(self, x):
        # Predict next step from sequence; x has shape (batch, seq_len, n_features)
        lstm_out, _ = self.lstm(x)
        return self.fc(lstm_out[:, -1, :])

    @torch.no_grad()
    def anomaly_score(self, x, next_actual):
        """Prediction error (per-sample MSE) as anomaly score."""
        predicted = self(x)
        return torch.mean((predicted - next_actual) ** 2, dim=1)
Train on normal sequences, then flag time steps where prediction error exceeds a threshold. The threshold is typically set at the 95th or 99th percentile of training errors.
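Setting the threshold from a percentile can be sketched with NumPy alone. The error values below are simulated stand-ins (an assumption); in practice they would come from `anomaly_score` on normal sequences.

```python
import numpy as np

rng = np.random.default_rng(0)
# Stand-in data (assumption): prediction errors measured on normal sequences.
train_errors = rng.gamma(shape=2.0, scale=0.05, size=5_000)
# New errors to check, with one large error at index 3.
new_errors = np.array([0.08, 0.12, 0.05, 2.50, 0.10])

# Flag anything above the 99th percentile of the errors seen on normal data.
threshold = np.percentile(train_errors, 99)
is_anomaly = new_errors > threshold
```

By construction about 1% of the normal errors also exceed this threshold, which is the false-alarm rate the percentile choice accepts.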
Evaluation without labels
The fundamental challenge: how do you measure performance when you do not know which points are actually anomalous?
Internal metrics
- Silhouette score on anomaly clusters: do flagged anomalies form coherent groups?
- Score distribution analysis: a useful detector often produces a bimodal score distribution, with a clear gap between the scores of normal and anomalous points.
- Stability: run the detector multiple times with different random seeds. Consistent anomaly flags indicate reliability.
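The stability check can be sketched as the average Jaccard overlap between the sets of points flagged under different seeds. The synthetic data, the choice of `IsolationForest`, and the helper name `flagged_indices` are assumptions made for illustration.

```python
from itertools import combinations

import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(0)
# Synthetic data (assumption): 500 inliers and 10 outliers on a ring of radius 8.
angles = rng.uniform(0.0, 2.0 * np.pi, size=10)
ring = 8.0 * np.column_stack([np.cos(angles), np.sin(angles)])
X = np.vstack([rng.normal(0.0, 1.0, size=(500, 2)), ring])

def flagged_indices(seed: int) -> set:
    """Indices the detector labels as anomalies (-1) for one random seed."""
    model = IsolationForest(contamination=0.02, random_state=seed)
    labels = model.fit_predict(X)
    return set(np.flatnonzero(labels == -1).tolist())

flag_sets = [flagged_indices(seed) for seed in range(5)]

# Jaccard similarity between every pair of runs: 1.0 means identical flags.
jaccards = [len(a & b) / len(a | b) for a, b in combinations(flag_sets, 2)]
mean_jaccard = float(np.mean(jaccards))
print(f"mean pairwise Jaccard: {mean_jaccard:.2f}")
```

A mean overlap close to 1 suggests the flags are driven by the data. A low overlap suggests they depend on the random seed.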
Semi-supervised evaluation
When you have a few labeled anomalies:
import numpy as np
from sklearn.metrics import roc_auc_score, average_precision_score

def precision_at_k(scores: np.ndarray, labels: np.ndarray, k: int) -> float:
    """Precision among the top-k scored points."""
    if k <= 0:
        raise ValueError("k must be positive")  # scores[-0:] would select every point
    top_k_idx = np.argsort(scores)[-k:]
    return float(labels[top_k_idx].mean())

def evaluate_detector(scores: np.ndarray, labels: np.ndarray) -> dict:
    """Evaluate with partial labels (1 = anomaly, 0 = normal) using ranking metrics."""
    return {
        "auc_roc": roc_auc_score(labels, scores),
        "average_precision": average_precision_score(labels, scores),
        # k is set to the number of labeled anomalies
        "precision_at_k": precision_at_k(scores, labels, k=int(labels.sum())),
    }
Average Precision is generally more informative than AUC-ROC for anomaly detection because it focuses on the ranking quality at the top of the score list, where decisions are made.
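A small constructed ranking makes the difference concrete. The scores and labels below are invented for illustration: 100 points with two true anomalies, one ranked first and one ranked eleventh.

```python
import numpy as np
from sklearn.metrics import average_precision_score, roc_auc_score

# Constructed ranking (assumption): 100 points sorted by descending score,
# with the only two true anomalies at ranks 1 and 11.
scores = np.linspace(1.0, 0.0, 100)
labels = np.zeros(100, dtype=int)
labels[[0, 10]] = 1

auc = roc_auc_score(labels, scores)           # 187 / 196, about 0.954
ap = average_precision_score(labels, scores)  # (1 + 2/11) / 2, about 0.591
print(f"AUC-ROC: {auc:.3f}, average precision: {ap:.3f}")
```

AUC-ROC stays high because both anomalies outrank most normal points. Average Precision drops because an analyst must review nine false positives before reaching the second anomaly.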
Production deployment architecture
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import numpy as np

@dataclass
class AnomalyAlert:
    timestamp: datetime
    score: float
    features: dict
    detector: str
    context: str

class AnomalyPipeline:
    # AnomalyEnsemble is the class from the ensemble scoring section
    def __init__(self, ensemble: "AnomalyEnsemble", threshold: float = 0.7):
        self.ensemble = ensemble
        self.threshold = threshold
        self.alert_buffer = []
        self.suppression_window_sec = 300  # suppress duplicate alerts
        self.last_alert_time = {}

    def process(self, point: np.ndarray, metadata: dict) -> Optional[AnomalyAlert]:
        score = float(self.ensemble.score(point.reshape(1, -1))[0])
        if score < self.threshold:
            return None
        # Suppress repeated alerts for the same entity
        entity = metadata.get("entity_id", "default")
        now = datetime.now()
        if entity in self.last_alert_time:
            elapsed = (now - self.last_alert_time[entity]).total_seconds()
            if elapsed < self.suppression_window_sec:
                return None
        self.last_alert_time[entity] = now
        alert = AnomalyAlert(
            timestamp=now,
            score=score,
            features=metadata,
            detector="ensemble",
            context=f"Score {score:.3f} exceeds threshold {self.threshold}",
        )
        self.alert_buffer.append(alert)
        return alert
Key production concerns:
- Alert fatigue: use suppression windows and escalation tiers (warning, critical, emergency).
- Feedback loops: when analysts mark alerts as true/false positives, use that feedback to adjust thresholds.
- Feature monitoring: track feature distributions over time to detect data quality issues before they trigger false anomalies.
- Model versioning: track which model version produced each alert for auditability.
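The escalation tiers mentioned in the list can be sketched as a simple score-to-tier mapping. The cutoff values and the names `Tier`, `TIER_CUTOFFS`, and `escalation_tier` are hypothetical. Real cutoffs would come from analyst feedback on past alerts.

```python
from enum import Enum
from typing import Optional

class Tier(Enum):
    WARNING = "warning"
    CRITICAL = "critical"
    EMERGENCY = "emergency"

# Hypothetical cutoffs for illustration, ordered from most to least severe.
TIER_CUTOFFS = [(0.95, Tier.EMERGENCY), (0.85, Tier.CRITICAL), (0.70, Tier.WARNING)]

def escalation_tier(score: float) -> Optional[Tier]:
    """Return the most severe tier whose cutoff the score reaches, else None."""
    for cutoff, tier in TIER_CUTOFFS:
        if score >= cutoff:
            return tier
    return None

tiers = [escalation_tier(s) for s in (0.50, 0.72, 0.90, 0.99)]
```

Keeping the cutoffs in one table makes it easy to adjust them as feedback arrives without touching the routing logic.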
The one thing to remember: Production anomaly detection is a system, not an algorithm — it requires streaming capability, ensemble scoring for coverage, drift detection for adaptation, and careful alert management to remain useful without overwhelming the humans who act on it.