Fraud Detection Patterns with Python — Deep Dive

Real-time feature computation

Production fraud systems must score transactions in under 100 milliseconds. The bottleneck is not model inference (typically 1–5ms for gradient boosting) but feature computation, especially velocity and aggregation features that depend on recent history.

In-memory feature store with Redis

import redis
import json
import time
from dataclasses import dataclass

@dataclass
class TransactionFeatures:
    amount: float
    txn_count_1h: int
    txn_count_24h: int
    amount_sum_1h: float
    amount_sum_24h: float
    unique_merchants_24h: int
    avg_amount_30d: float
    max_amount_30d: float

class FeatureStore:
    def __init__(self, redis_client: redis.Redis):
        self.r = redis_client
    
    def update_and_compute(self, user_id: str, amount: float, merchant: str) -> TransactionFeatures:
        now = time.time()
        txn_key = f"txn:{user_id}"
        merchant_key = f"merchants:{user_id}"
        
        # Store transaction with timestamp as score
        self.r.zadd(txn_key, {f"{now}:{amount}": now})
        self.r.zadd(merchant_key, {merchant: now})
        
        # Expire old data (keep 30 days)
        cutoff_30d = now - 30 * 86400
        self.r.zremrangebyscore(txn_key, 0, cutoff_30d)
        self.r.zremrangebyscore(merchant_key, 0, cutoff_30d)
        
        # Compute features from sorted sets
        cutoff_1h = now - 3600
        cutoff_24h = now - 86400
        
        txns_1h = self.r.zrangebyscore(txn_key, cutoff_1h, now)
        txns_24h = self.r.zrangebyscore(txn_key, cutoff_24h, now)
        txns_30d = self.r.zrangebyscore(txn_key, cutoff_30d, now)
        merchants_24h = self.r.zrangebyscore(merchant_key, cutoff_24h, now)
        
        amounts_1h = [float(t.decode().split(":")[1]) for t in txns_1h]
        amounts_24h = [float(t.decode().split(":")[1]) for t in txns_24h]
        amounts_30d = [float(t.decode().split(":")[1]) for t in txns_30d]
        
        return TransactionFeatures(
            amount=amount,
            txn_count_1h=len(amounts_1h),
            txn_count_24h=len(amounts_24h),
            amount_sum_1h=sum(amounts_1h),
            amount_sum_24h=sum(amounts_24h),
            unique_merchants_24h=len(set(merchants_24h)),
            avg_amount_30d=sum(amounts_30d) / max(len(amounts_30d), 1),
            max_amount_30d=max(amounts_30d) if amounts_30d else 0,
        )

Redis sorted sets with timestamp scores allow O(log N) range queries for any time window, keeping feature computation under 5ms.

Graph-based fraud detection

Fraudsters operate in networks — stolen cards are tested on the same devices, mules share addresses, and synthetic identities are linked by phone numbers or emails. Graph analysis exposes these connections.

Building the transaction graph

import networkx as nx
import pandas as pd
from collections import defaultdict

class FraudGraph:
    def __init__(self):
        self.G = nx.Graph()
    
    def add_transaction(self, card_id: str, device_id: str, ip: str, merchant: str):
        """Link entities that appear in the same transaction."""
        entities = [
            ("card", card_id),
            ("device", device_id),
            ("ip", ip),
            ("merchant", merchant),
        ]
        
        for entity_type, entity_id in entities:
            node = f"{entity_type}:{entity_id}"
            self.G.add_node(node, type=entity_type)
        
        # Connect entities within the same transaction
        for i, (t1, id1) in enumerate(entities):
            for t2, id2 in entities[i + 1:]:
                n1, n2 = f"{t1}:{id1}", f"{t2}:{id2}"
                if self.G.has_edge(n1, n2):
                    self.G[n1][n2]["weight"] += 1
                else:
                    self.G.add_edge(n1, n2, weight=1)
    
    def card_risk_features(self, card_id: str) -> dict:
        """Extract graph-based risk features for a card."""
        node = f"card:{card_id}"
        if node not in self.G:
            return {"connected_cards": 0, "fraud_neighbor_ratio": 0}
        
        # Cards connected through shared devices or IPs
        connected_cards = set()
        for neighbor in self.G.neighbors(node):
            for second_hop in self.G.neighbors(neighbor):
                if second_hop.startswith("card:") and second_hop != node:
                    connected_cards.add(second_hop)
        
        # Fraud ratio among connected cards
        flagged = sum(
            1 for c in connected_cards
            if self.G.nodes[c].get("flagged", False)
        )
        
        return {
            "connected_cards": len(connected_cards),
            "fraud_neighbor_ratio": flagged / max(len(connected_cards), 1),
            "degree_centrality": nx.degree_centrality(self.G).get(node, 0),
        }

Graph features often provide the highest lift in fraud detection models because they capture organized fraud rings that transaction-level features miss entirely.

Multi-layer scoring architecture

Production systems use cascading layers for efficiency:

from enum import Enum
from dataclasses import dataclass

class Decision(Enum):
    APPROVE = "approve"
    REVIEW = "review"
    DECLINE = "decline"

@dataclass
class ScoringResult:
    decision: Decision
    score: float
    rules_triggered: list[str]
    model_score: float
    graph_score: float
    latency_ms: float

class FraudScoringPipeline:
    def __init__(self, rules_engine, ml_model, graph_scorer, feature_store):
        self.rules = rules_engine
        self.model = ml_model
        self.graph = graph_scorer
        self.features = feature_store
    
    def score(self, transaction: dict) -> ScoringResult:
        import time
        start = time.time()
        
        # Layer 1: Hard rules (fast, deterministic)
        rule_result = self.rules.evaluate(transaction)
        if rule_result.hard_decline:
            return ScoringResult(
                decision=Decision.DECLINE,
                score=1.0,
                rules_triggered=rule_result.triggered,
                model_score=0,
                graph_score=0,
                latency_ms=(time.time() - start) * 1000,
            )
        
        # Layer 2: Feature computation + ML model
        features = self.features.update_and_compute(
            transaction["user_id"],
            transaction["amount"],
            transaction["merchant"],
        )
        model_score = self.model.predict_proba(features)
        
        # Layer 3: Graph risk (only for medium+ scores to save latency)
        graph_score = 0.0
        if model_score > 0.3:
            graph_score = self.graph.score(transaction)
        
        # Combine scores
        final_score = 0.6 * model_score + 0.3 * graph_score + 0.1 * rule_result.soft_score
        
        if final_score > 0.8:
            decision = Decision.DECLINE
        elif final_score > 0.5:
            decision = Decision.REVIEW
        else:
            decision = Decision.APPROVE
        
        return ScoringResult(
            decision=decision,
            score=final_score,
            rules_triggered=rule_result.triggered,
            model_score=model_score,
            graph_score=graph_score,
            latency_ms=(time.time() - start) * 1000,
        )

The cascading design ensures most legitimate transactions are approved quickly (Layer 1 passes them through), while suspicious ones get progressively deeper analysis.

Handling adversarial concept drift

Fraudsters actively probe detection systems. When a new model is deployed, fraud patterns shift within weeks. Three strategies for resilience:

Champion-challenger framework

class ChampionChallenger:
    def __init__(self, champion_model, challenger_models: list, traffic_split: float = 0.05):
        self.champion = champion_model
        self.challengers = challenger_models
        self.traffic_split = traffic_split
    
    def score(self, features, transaction_id: str) -> tuple[float, str]:
        """Score with champion; additionally score with challengers for comparison."""
        champion_score = self.champion.predict_proba(features)
        
        # Shadow-score a fraction of traffic with challengers
        import hashlib
        hash_val = int(hashlib.md5(transaction_id.encode()).hexdigest(), 16)
        
        if (hash_val % 100) < self.traffic_split * 100:
            for challenger in self.challengers:
                challenger_score = challenger.predict_proba(features)
                # Log both scores for offline comparison
                self._log_comparison(transaction_id, champion_score, challenger_score)
        
        return champion_score, "champion"
    
    def _log_comparison(self, txn_id, champion_score, challenger_score):
        pass  # Write to analytics pipeline

Feature drift monitoring

import numpy as np
from scipy.stats import ks_2samp

def detect_feature_drift(
    reference: np.ndarray,
    current: np.ndarray,
    feature_names: list[str],
    p_threshold: float = 0.01,
) -> list[dict]:
    """Kolmogorov-Smirnov test for distribution shift per feature."""
    drift_report = []
    
    for i, name in enumerate(feature_names):
        stat, p_value = ks_2samp(reference[:, i], current[:, i])
        
        if p_value < p_threshold:
            drift_report.append({
                "feature": name,
                "ks_statistic": stat,
                "p_value": p_value,
                "reference_mean": np.mean(reference[:, i]),
                "current_mean": np.mean(current[:, i]),
                "drift_magnitude": abs(np.mean(current[:, i]) - np.mean(reference[:, i])),
            })
    
    return sorted(drift_report, key=lambda x: x["ks_statistic"], reverse=True)

Adaptive retraining schedule

Instead of fixed retraining intervals, trigger retraining when:

  • Feature drift is detected above threshold.
  • Model precision drops below a target (based on analyst feedback).
  • A new fraud pattern is identified by the investigations team.

Model explainability for compliance

Financial regulators (OCC, FCA, GDPR) require that decisions affecting customers are explainable. SHAP values provide per-prediction explanations:

import shap

def explain_decision(model, features: np.ndarray, feature_names: list[str]) -> dict:
    """Generate SHAP explanation for a single transaction."""
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(features.reshape(1, -1))
    
    # Sort features by importance for this prediction
    contributions = sorted(
        zip(feature_names, shap_values[0]),
        key=lambda x: abs(x[1]),
        reverse=True,
    )
    
    return {
        "top_factors": [
            {"feature": name, "contribution": float(val)}
            for name, val in contributions[:5]
        ],
        "base_value": float(explainer.expected_value),
        "prediction": float(explainer.expected_value + sum(shap_values[0])),
    }

Example output: “This transaction was flagged because: (1) amount is 15× user average (+0.35), (2) new device (+0.22), (3) 8 transactions in last hour (+0.18).” This level of transparency satisfies regulatory requirements and helps analysts prioritize their review queue.

Metrics and reporting

Track system health with:

  • Detection rate at fixed false-positive rate: “We catch 92% of fraud while only flagging 0.5% of legitimate transactions.”
  • Time to detection: median time between fraud occurrence and flag.
  • Dollar recovery rate: percentage of fraud dollars prevented.
  • Analyst efficiency: true positive rate in the review queue (should be above 20–30%).
  • Model decay curve: detection rate over time since last retraining.

The one thing to remember: Production fraud detection is a multi-layered, continuously evolving system — rules for speed, ML for pattern recognition, graphs for network analysis — with adversarial resilience built in through monitoring, drift detection, and rapid retraining cycles.

pythonfinancefraud-detectionmachine-learning

See Also