Anomaly Detection with Python — Deep Dive

Isolation Forest internals

Understanding how Isolation Forest works under the hood enables better tuning and custom extensions.

Each node in a tree randomly selects a feature and a split value between that feature’s min and max. Anomalies sit in sparse regions and are isolated in fewer splits (shorter path length). The anomaly score is derived from the average path length across all trees, normalized by the expected path length for a sample of the size each tree was built on:

import numpy as np

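def expected_path_length(n: int) -> float:
    """Expected path length in an isolation tree for n samples."""
    if n <= 1:
        return 0.0
    if n == 2:
        return 1.0  # the harmonic-number approximation below is poor for n = 2
    # 2 * H(n - 1) - 2 * (n - 1) / n, with H(i) approximated by ln(i) + Euler's constant
    return 2 * (np.log(n - 1) + 0.5772156649) - 2 * (n - 1) / n

def anomaly_score(avg_path_length: float, n_samples: int) -> float:
    """Score in (0, 1]; higher means more anomalous."""
    c = expected_path_length(n_samples)
    if c == 0:
        raise ValueError("n_samples must be at least 2")
    return 2 ** (-avg_path_length / c)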

Scores close to 1 indicate anomalies; scores well below 0.5 indicate normal points. If every point scores around 0.5, the sample has no distinct anomalies. This normalization makes scores comparable across different sample sizes.
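
To make the score scale concrete, the following sketch evaluates the formula for a few average path lengths. It is self-contained (it repeats the normalizer using only the standard library) and assumes a per-tree sample size of 256, which is an illustrative choice rather than something the text above fixes.

```python
import math

EULER_GAMMA = 0.5772156649

def expected_path_length(n: int) -> float:
    """Normalizer c(n) used to scale average path lengths."""
    if n <= 1:
        return 0.0
    if n == 2:
        return 1.0
    return 2 * (math.log(n - 1) + EULER_GAMMA) - 2 * (n - 1) / n

def anomaly_score(avg_path_length: float, n_samples: int) -> float:
    return 2 ** (-avg_path_length / expected_path_length(n_samples))

n = 256  # assumed per-tree sample size
c = expected_path_length(n)
for path in (2.0, c, 2 * c):
    print(f"avg path {path:6.2f} -> score {anomaly_score(path, n):.3f}")
```

A point whose average path equals c(n) scores exactly 0.5; shorter paths push the score toward 1 and longer paths push it toward 0.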

Extended Isolation Forest

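Standard Isolation Forest uses axis-aligned splits, which create artifacts in the score map: bands of artificially low anomaly scores that run parallel to the axes, even through regions where the data is sparse. The Extended Isolation Forest uses random hyperplane splits: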

# Using the eif library; `data` is assumed to be a numeric pandas DataFrame
# with at least `sample_size` rows
from eif import iForest

model = iForest(
    data.values,
    ntrees=200,
    sample_size=256,
    ExtensionLevel=data.shape[1] - 1,  # full extension
)
scores = model.compute_paths(data.values)

Extended IF tends to produce more reliable scores for correlated features and complex manifold structures.
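
The difference between an axis-aligned cut and a hyperplane cut is easier to see at the level of a single tree node. The sketch below is a standard-library illustration, not the eif implementation: the function name `hyperplane_split` and its arguments are hypothetical. It draws a random normal vector, zeroes out coordinates according to the extension level, picks a random intercept inside the bounding box of the data, and sends each point left or right by the sign of its projection.

```python
import random

def hyperplane_split(points, extension_level, rng):
    """Split points with one random hyperplane (a single tree node)."""
    dim = len(points[0])
    # Random normal vector; zeroing coordinates limits how oblique the cut is.
    normal = [rng.gauss(0.0, 1.0) for _ in range(dim)]
    for idx in rng.sample(range(dim), dim - extension_level - 1):
        normal[idx] = 0.0
    # Random intercept point drawn from the bounding box of the data.
    intercept = [
        rng.uniform(min(p[j] for p in points), max(p[j] for p in points))
        for j in range(dim)
    ]
    left, right = [], []
    for p in points:
        side = sum((p[j] - intercept[j]) * normal[j] for j in range(dim))
        (left if side <= 0 else right).append(p)
    return normal, intercept, left, right

rng = random.Random(0)
points = [(rng.gauss(0, 1), rng.gauss(0, 1), rng.gauss(0, 1)) for _ in range(100)]

# Full extension (dim - 1): every coordinate of the normal may be non-zero.
normal, intercept, left, right = hyperplane_split(points, extension_level=2, rng=rng)
# Extension level 0: only one non-zero coordinate, i.e. an axis-aligned cut.
axis_normal, _, _, _ = hyperplane_split(points, extension_level=0, rng=rng)
```

With extension level 0 the normal vector has a single non-zero coordinate, so the cut reduces to the axis-aligned split of the standard algorithm.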

Streaming anomaly detection

Batch algorithms assume all data is available upfront. Real systems need to detect anomalies in streaming data while adapting to concept drift.

Half-Space Trees for streaming

import numpy as np
from collections import deque

class HalfSpaceTree:
    """Lightweight streaming anomaly detector."""
    
    def __init__(self, n_features: int, max_depth: int = 8, window_size: int = 1000):
        self.n_features = n_features
        self.max_depth = max_depth
        self.window_size = window_size
        self.reference_window = deque(maxlen=window_size)
        self.trees = self._build_trees(n_trees=25)
    
    def _build_trees(self, n_trees):
        """Build random half-space partitions."""
        trees = []
        for _ in range(n_trees):
            splits = []
            for depth in range(self.max_depth):
                feature = np.random.randint(self.n_features)
                # Split points will be set when data arrives
                splits.append({"feature": feature, "value": None, "left_mass": 0, "right_mass": 0})
            trees.append(splits)
        return trees
    
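    def update(self, point: np.ndarray):
        """Update model with a new streaming point."""
        self.reference_window.append(point)
        if len(self.reference_window) < 2:
            return  # need at least two points to define a feature range
        for tree in self.trees:
            for split in tree:
                feature = split["feature"]
                if split["value"] is None:
                    # Simplification: draw the split once, uniformly from the
                    # range of this feature in the current reference window
                    values = [p[feature] for p in self.reference_window]
                    split["value"] = np.random.uniform(min(values), max(values))
                # Mass profile: count how many points fell on each side.
                # Counts are cumulative here; a fuller implementation would
                # reset them whenever the reference window rolls over.
                if point[feature] < split["value"]:
                    split["left_mass"] += 1
                else:
                    split["right_mass"] += 1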
    
    def score(self, point: np.ndarray) -> float:
        """Score a point; lower score = more anomalous."""
        total = 0
        for tree in self.trees:
            for split in tree:
                if split["value"] is not None:
                    if point[split["feature"]] < split["value"]:
                        total += split["left_mass"]
                    else:
                        total += split["right_mass"]
        return total / (len(self.trees) * self.max_depth)

ADWIN for drift detection

ADWIN (Adaptive Windowing) detects distributional changes in a data stream, signaling when the model should retrain:

class ADWIN:
    """Simplified ADWIN drift detector."""
    
    def __init__(self, delta: float = 0.002):
        self.delta = delta
        self.window = []
        self.total = 0.0
        self.variance = 0.0
    
    def update(self, value: float) -> bool:
        """Add value; returns True if drift detected."""
        self.window.append(value)
        self.total += value
        
        if len(self.window) < 10:
            return False
        
        # Check if any split point shows significant difference
        for i in range(1, len(self.window)):
            left = self.window[:i]
            right = self.window[i:]
            
            mean_left = sum(left) / len(left)
            mean_right = sum(right) / len(right)
            
            # Hoeffding-style bound; assumes values are scaled to [0, 1]
            epsilon = np.sqrt(np.log(2 / self.delta) / (2 * min(len(left), len(right))))
            
            if abs(mean_left - mean_right) > epsilon:
                self.window = right  # drop old data
                self.total = sum(right)  # keep the running total in sync
                return True
        
        return False
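
The threshold in the simplified detector shrinks as the smaller sub-window grows, which determines how quickly a given shift can be detected. The sketch below works through that arithmetic with the same formula. It assumes the monitored values lie in [0, 1], and the helper names `hoeffding_epsilon` and `min_subwindow_for_shift` are hypothetical.

```python
import math

def hoeffding_epsilon(delta: float, n_left: int, n_right: int) -> float:
    """Cut threshold used by the simplified detector (values assumed in [0, 1])."""
    return math.sqrt(math.log(2 / delta) / (2 * min(n_left, n_right)))

def min_subwindow_for_shift(delta: float, shift: float) -> int:
    """Sub-window size needed before a mean shift of this size can exceed epsilon."""
    return math.ceil(math.log(2 / delta) / (2 * shift ** 2))

delta = 0.002
for size in (10, 40, 160):
    print(size, round(hoeffding_epsilon(delta, size, size), 3))
print(min_subwindow_for_shift(delta, shift=0.3))
```

With delta = 0.002, a mean shift of 0.3 needs about 39 points on each side of the split before it can trigger. This is why drift in small-magnitude signals is detected with a delay.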

Ensemble anomaly scoring

Combining multiple detectors improves robustness — different algorithms catch different types of anomalies:

import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import MinMaxScaler

class AnomalyEnsemble:
    def __init__(self, contamination: float = 0.02):
        self.detectors = {
            "isolation_forest": IsolationForest(contamination=contamination, random_state=42),
            "lof": LocalOutlierFactor(n_neighbors=20, contamination=contamination, novelty=True),
        }
        self.scaler = MinMaxScaler()
    
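    def _raw_scores(self, X: np.ndarray) -> np.ndarray:
        """One column per detector; negated so higher = more anomalous."""
        return np.column_stack(
            [-detector.decision_function(X) for detector in self.detectors.values()]
        )
    
    def fit(self, X: np.ndarray):
        for detector in self.detectors.values():
            detector.fit(X)
        # Fit the scaler once, on training scores, so that later calls
        # (including single-point calls) are scaled consistently.
        # Note: LOF novelty scores on its own training data are optimistic.
        self.scaler.fit(self._raw_scores(X))
        return self
    
    def score(self, X: np.ndarray) -> np.ndarray:
        """Combined anomaly score; higher = more anomalous."""
        # Normalize with the training-time range; clip values outside it to [0, 1]
        normalized = np.clip(self.scaler.transform(self._raw_scores(X)), 0.0, 1.0)
        
        # Average across detectors
        return normalized.mean(axis=1)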
    
    def predict(self, X: np.ndarray, threshold: float = 0.7) -> np.ndarray:
        scores = self.score(X)
        return (scores > threshold).astype(int)
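
Averaging only makes sense if each detector's scores are mapped onto a common scale, and that scale should not change from one call to the next. The following standard-library sketch illustrates the idea with plain numbers: the scaling range is learned once from reference scores and then reused for every new point. The function names and the example ranges are hypothetical.

```python
def fit_range(reference_scores):
    """Learn the scaling range once, from scores on reference data."""
    return min(reference_scores), max(reference_scores)

def scale(score, low, high):
    """Map a raw score to [0, 1] using a fixed range, clipping outliers."""
    if high == low:
        return 0.0
    return min(1.0, max(0.0, (score - low) / (high - low)))

def combine(raw_by_detector, ranges):
    """Average the scaled scores of all detectors for one point."""
    scaled = [scale(raw_by_detector[name], *ranges[name]) for name in ranges]
    return sum(scaled) / len(scaled)

# Hypothetical reference scores from two detectors with different scales.
ranges = {
    "detector_a": fit_range([0.0, 2.5, 10.0]),
    "detector_b": fit_range([-1.0, 0.2, 1.0]),
}
combined = combine({"detector_a": 5.0, "detector_b": 1.0}, ranges)
```

Because the range is fixed, a single incoming point receives a meaningful score. Rescaling on each batch would instead map a lone point to the bottom of its own range.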

Time series anomaly detection

For sequential data, context matters. A value of 100 is normal during business hours but anomalous at 3 AM.

Seasonal-Trend decomposition with anomaly detection

import pandas as pd
import numpy as np
from statsmodels.tsa.seasonal import STL

def stl_anomaly_detection(
    series: pd.Series,
    period: int = 24,
    threshold_sigma: float = 3.0,
) -> pd.DataFrame:
    """Detect anomalies in time series using STL decomposition."""
    stl = STL(series, period=period, robust=True)
    result = stl.fit()
    
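    residuals = result.resid
    median = residuals.median()
    mad = np.median(np.abs(residuals - median))
    if mad == 0:
        raise ValueError("MAD of the residuals is zero; modified z-score is undefined")
    # 0.6745 rescales the MAD so it is comparable to a standard deviation
    # when the residuals are normally distributed
    modified_z = 0.6745 * (residuals - median) / mad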
    
    return pd.DataFrame({
        "value": series,
        "trend": result.trend,
        "seasonal": result.seasonal,
        "residual": residuals,
        "z_score": modified_z,
        "is_anomaly": np.abs(modified_z) > threshold_sigma,
    })
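
The modified z-score step can be checked by hand on a small set of residuals. The sketch below uses only the standard library and made-up residual values; it applies the same median and MAD formula and the same 3.0 cutoff as the function above.

```python
import statistics

def modified_z_scores(values):
    """Modified z-score based on the median and the median absolute deviation."""
    median = statistics.median(values)
    mad = statistics.median([abs(v - median) for v in values])
    if mad == 0:
        raise ValueError("MAD is zero; modified z-score is undefined")
    return [0.6745 * (v - median) / mad for v in values]

# Made-up residuals: five small values and one spike.
residuals = [0.1, -0.2, 0.0, 0.15, -0.1, 5.0]
z_scores = modified_z_scores(residuals)
flags = [abs(z) > 3.0 for z in z_scores]
```

Only the spike is flagged. Because the median and MAD are barely affected by a single extreme value, the spike does not inflate its own threshold the way it would with a mean and standard deviation.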

LSTM-based sequence anomaly detection

import torch
import torch.nn as nn

class LSTMAnomalyDetector(nn.Module):
    def __init__(self, n_features: int, hidden_size: int = 64, n_layers: int = 2):
        super().__init__()
        self.lstm = nn.LSTM(n_features, hidden_size, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, n_features)
    
    def forward(self, x):
        # Predict next step from sequence
        lstm_out, _ = self.lstm(x)
        return self.fc(lstm_out[:, -1, :])
    
    def anomaly_score(self, x, next_actual):
        """Prediction error (MSE across features) as anomaly score."""
        predicted = self.forward(x)
        return torch.mean((predicted - next_actual) ** 2, dim=1)

Train on normal sequences, then flag time steps where the prediction error exceeds a threshold. The threshold is typically set at the 95th or 99th percentile of the errors observed on normal data, ideally a held-out split rather than the training set itself.
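
Choosing the threshold from a percentile can be sketched without any deep learning code. The example below uses the nearest-rank percentile definition and made-up error values; `percentile_threshold` is a hypothetical helper, and in practice the errors would come from the model's anomaly scores on normal sequences.

```python
import math

def percentile_threshold(errors, q: int) -> float:
    """Nearest-rank q-th percentile of the errors on normal data."""
    ordered = sorted(errors)
    rank = max(1, math.ceil(q * len(ordered) / 100))
    return ordered[rank - 1]

# Made-up prediction errors on normal data: 0.01, 0.02, ..., 1.00
normal_errors = [i / 100 for i in range(1, 101)]
threshold = percentile_threshold(normal_errors, q=95)

# New errors are flagged when they exceed the threshold.
new_errors = [0.5, 0.96, 2.0]
flags = [e > threshold for e in new_errors]
```

A 95th-percentile threshold flags roughly 5% of normal points by construction, so the percentile is effectively a false-positive budget.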

Evaluation without labels

The fundamental challenge: how do you measure performance when you do not know which points are actually anomalous?

Internal metrics

  • Silhouette score on anomaly clusters: do flagged anomalies form coherent groups?
  • Score distribution analysis: a useful detector often produces a bimodal score distribution (clear separation between normal and anomalous), although rare anomalies may show up only as a thin tail.
  • Stability: run the detector multiple times with different random seeds. Consistent anomaly flags indicate reliability.
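
The stability check can be quantified as the overlap between the sets of points flagged in each run. The sketch below uses the Jaccard index averaged over all pairs of runs. That is one reasonable choice of overlap measure rather than a prescribed one, and the flagged index sets are made up.

```python
from itertools import combinations

def jaccard(a: set, b: set) -> float:
    """Overlap between two sets of flagged indices (1.0 = identical)."""
    if not a and not b:
        return 1.0
    return len(a & b) / len(a | b)

def mean_pairwise_jaccard(runs) -> float:
    """Average overlap across every pair of runs."""
    pairs = list(combinations(runs, 2))
    return sum(jaccard(a, b) for a, b in pairs) / len(pairs)

# Made-up flagged indices from three runs with different random seeds.
runs = [{3, 17, 42, 99}, {3, 17, 42, 120}, {3, 17, 42, 99}]
stability = mean_pairwise_jaccard(runs)
```

Values near 1 mean the same points are flagged regardless of the seed. Low values suggest the flags depend more on randomness than on the data.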

Semi-supervised evaluation

When you have a few labeled anomalies:

import numpy as np
from sklearn.metrics import average_precision_score, roc_auc_score

def precision_at_k(scores: np.ndarray, labels: np.ndarray, k: int) -> float:
    """Precision among the top-k scored points."""
    if k <= 0:
        return 0.0  # scores[-0:] would otherwise select every point
    top_k_idx = np.argsort(scores)[-k:]
    return float(labels[top_k_idx].mean())

def evaluate_detector(scores: np.ndarray, labels: np.ndarray) -> dict:
    """Evaluate with partial labels using ranking metrics."""
    return {
        "auc_roc": roc_auc_score(labels, scores),
        "average_precision": average_precision_score(labels, scores),
        "precision_at_k": precision_at_k(scores, labels, k=int(labels.sum())),
    }

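Average Precision is generally more informative than AUC-ROC for anomaly detection because it focuses on the ranking quality at the top of the score list, where decisions are made. With heavy class imbalance, AUC-ROC can stay high even when most of the top-ranked points are false positives.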
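
A small synthetic ranking shows how far the two metrics can diverge. The sketch below implements both metrics in plain Python for the tie-free case, so it is a simplified stand-in for the scikit-learn functions. It ranks 10 anomalies just below 10 normal points in a set of 1,000; all data is made up.

```python
def roc_auc(labels, scores):
    """Fraction of (anomaly, normal) pairs that are ranked correctly."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

def average_precision(labels, scores):
    """Mean of the precision values at the rank of each true anomaly."""
    order = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    hits, total = 0, 0.0
    for rank, i in enumerate(order, start=1):
        if labels[i] == 1:
            hits += 1
            total += hits / rank
    return total / hits

# 1,000 points sorted by descending score; ranks 11-20 are the true anomalies.
scores = [1000 - r for r in range(1000)]
labels = [1 if 10 <= r < 20 else 0 for r in range(1000)]

auc = roc_auc(labels, scores)
ap = average_precision(labels, scores)
print(f"AUC-ROC {auc:.3f}, average precision {ap:.3f}")
```

AUC-ROC comes out close to 0.99 while average precision is about 0.33, because every one of the ten highest-scored points is a false positive.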

Production deployment architecture

from __future__ import annotations  # allows `AnomalyAlert | None` before Python 3.10

from dataclasses import dataclass
from datetime import datetime

import numpy as np

@dataclass
class AnomalyAlert:
    timestamp: datetime
    score: float
    features: dict
    detector: str
    context: str

class AnomalyPipeline:
    def __init__(self, ensemble: AnomalyEnsemble, threshold: float = 0.7):
        self.ensemble = ensemble
        self.threshold = threshold
        self.alert_buffer = []
        self.suppression_window_sec = 300  # suppress duplicate alerts
        self.last_alert_time = {}
    
    def process(self, point: np.ndarray, metadata: dict) -> AnomalyAlert | None:
        score = float(self.ensemble.score(point.reshape(1, -1))[0])
        
        if score <= self.threshold:  # alert only when the score exceeds the threshold
            return None
        
        # Suppress repeated alerts for the same entity
        entity = metadata.get("entity_id", "default")
        now = datetime.now()
        
        if entity in self.last_alert_time:
            elapsed = (now - self.last_alert_time[entity]).total_seconds()
            if elapsed < self.suppression_window_sec:
                return None
        
        self.last_alert_time[entity] = now
        
        alert = AnomalyAlert(
            timestamp=now,
            score=score,
            features=metadata,
            detector="ensemble",
            context=f"Score {score:.3f} exceeds threshold {self.threshold}",
        )
        
        self.alert_buffer.append(alert)
        return alert

Key production concerns:

  • Alert fatigue: use suppression windows and escalation tiers (warning, critical, emergency).
  • Feedback loops: when analysts mark alerts as true/false positives, use that feedback to adjust thresholds.
  • Feature monitoring: track feature distributions over time to detect data quality issues before they trigger false anomalies.
  • Model versioning: track which model version produced each alert for auditability.
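
Escalation tiers can be as simple as a sorted list of score cutoffs. The sketch below is illustrative only: the cutoff values and the helper name `escalation_tier` are hypothetical and would need tuning against analyst feedback.

```python
from typing import Optional

# Hypothetical cutoffs, highest tier first.
TIERS = [(0.95, "emergency"), (0.85, "critical"), (0.70, "warning")]

def escalation_tier(score: float, tiers=TIERS) -> Optional[str]:
    """Return the highest tier whose cutoff the score exceeds, or None."""
    for cutoff, name in tiers:
        if score > cutoff:
            return name
    return None

for score in (0.60, 0.72, 0.90, 0.99):
    print(score, escalation_tier(score))
```

Keeping the cutoffs in data rather than in code makes it straightforward to adjust them from analyst feedback without redeploying the pipeline.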

The one thing to remember: Production anomaly detection is a system, not an algorithm — it requires streaming capability, ensemble scoring for coverage, drift detection for adaptation, and careful alert management to remain useful without overwhelming the humans who act on it.
