Fraud Detection Patterns with Python — Deep Dive
Real-time feature computation
Production fraud systems must score transactions in under 100 milliseconds. The bottleneck is not model inference (typically 1–5ms for gradient boosting) but feature computation, especially velocity and aggregation features that depend on recent history.
In-memory feature store with Redis
import json
import time
import uuid
from dataclasses import dataclass

import redis
@dataclass
class TransactionFeatures:
    """Velocity and aggregation features describing one transaction.

    The field order is part of the constructor interface (positional
    arguments), so it must not be rearranged.
    """

    # Amount of the transaction being scored.
    amount: float

    # How many transactions fall in the trailing 1-hour / 24-hour windows.
    txn_count_1h: int
    txn_count_24h: int

    # Total amount spent in the trailing 1-hour / 24-hour windows.
    amount_sum_1h: float
    amount_sum_24h: float

    # Number of distinct merchants seen in the trailing 24 hours.
    unique_merchants_24h: int

    # Mean and maximum transaction amount over the trailing 30 days.
    avg_amount_30d: float
    max_amount_30d: float
class FeatureStore:
    """Per-user transaction history kept in Redis sorted sets.

    Two sorted sets are maintained per user, both scored by Unix timestamp:
    ``txn:{user_id}`` (one member per transaction, encoding the amount) and
    ``merchants:{user_id}`` (one member per merchant, scored by last use).
    """

    # Window lengths in seconds.
    _HOUR = 3600
    _DAY = 86400
    _RETENTION = 30 * 86400

    def __init__(self, redis_client: redis.Redis):
        self.r = redis_client

    @staticmethod
    def _amount_from_member(member) -> float:
        """Parse the amount out of a ``"<timestamp>:<amount>[:<suffix>]"`` member."""
        # Clients created with decode_responses=True hand back str, not bytes.
        if isinstance(member, bytes):
            member = member.decode()
        return float(member.split(":")[1])

    def update_and_compute(self, user_id: str, amount: float, merchant: str) -> TransactionFeatures:
        """Record a transaction and return the user's refreshed features.

        The new transaction is written before the windows are read, so every
        count and sum in the result includes the transaction being scored.

        Args:
            user_id: Identifier used to build the per-user Redis keys.
            amount: Transaction amount.
            merchant: Merchant identifier for the transaction.

        Returns:
            The features computed over the trailing 1 h, 24 h and 30 d windows.
        """
        now = time.time()
        txn_key = f"txn:{user_id}"
        merchant_key = f"merchants:{user_id}"

        cutoff_1h = now - self._HOUR
        cutoff_24h = now - self._DAY
        cutoff_30d = now - self._RETENTION

        # The random suffix keeps two transactions with the same timestamp and
        # amount from collapsing into a single sorted-set member. Parsing only
        # reads the second field, so members written without it still work.
        txn_member = f"{now}:{amount}:{uuid.uuid4().hex}"

        # One pipeline = one network round trip, and the write + trim + read
        # sequence is not interleaved with other clients' commands.
        pipe = self.r.pipeline()
        pipe.zadd(txn_key, {txn_member: now})
        pipe.zadd(merchant_key, {merchant: now})
        # Trim history older than the 30-day retention window.
        pipe.zremrangebyscore(txn_key, 0, cutoff_30d)
        pipe.zremrangebyscore(merchant_key, 0, cutoff_30d)
        # Let keys of users who stop transacting disappear on their own.
        pipe.expire(txn_key, self._RETENTION)
        pipe.expire(merchant_key, self._RETENTION)
        # The 1 h and 24 h windows are subsets of the 30 d window, so fetch it
        # once with scores and split it locally.
        pipe.zrangebyscore(txn_key, cutoff_30d, now, withscores=True)
        pipe.zrangebyscore(merchant_key, cutoff_24h, now)
        *_, txns_30d, merchants_24h = pipe.execute()

        amounts_1h = []
        amounts_24h = []
        amounts_30d = []
        for member, scored_at in txns_30d:
            value = self._amount_from_member(member)
            amounts_30d.append(value)
            if scored_at >= cutoff_24h:
                amounts_24h.append(value)
            if scored_at >= cutoff_1h:
                amounts_1h.append(value)

        return TransactionFeatures(
            amount=amount,
            txn_count_1h=len(amounts_1h),
            txn_count_24h=len(amounts_24h),
            amount_sum_1h=sum(amounts_1h),
            amount_sum_24h=sum(amounts_24h),
            unique_merchants_24h=len(set(merchants_24h)),
            avg_amount_30d=sum(amounts_30d) / max(len(amounts_30d), 1),
            max_amount_30d=max(amounts_30d) if amounts_30d else 0.0,
        )
Redis sorted sets with timestamp scores allow O(log N + M) range queries for any time window (N entries stored, M entries returned), keeping feature computation under 5ms.
Graph-based fraud detection
Fraudsters operate in networks — stolen cards are tested on the same devices, mules share addresses, and synthetic identities are linked by phone numbers or emails. Graph analysis exposes these connections.
Building the transaction graph
import networkx as nx
import pandas as pd
from collections import defaultdict
class FraudGraph:
    """Undirected graph linking cards, devices, IPs and merchants.

    Nodes are strings of the form ``"<type>:<id>"`` and carry a ``type``
    attribute. Edge ``weight`` counts how many transactions the two entities
    appeared in together.
    """

    def __init__(self):
        self.G = nx.Graph()

    def add_transaction(self, card_id: str, device_id: str, ip: str, merchant: str):
        """Link entities that appear in the same transaction."""
        entities = [
            ("card", card_id),
            ("device", device_id),
            ("ip", ip),
            ("merchant", merchant),
        ]

        nodes = []
        for entity_type, entity_id in entities:
            node = f"{entity_type}:{entity_id}"
            self.G.add_node(node, type=entity_type)
            nodes.append(node)

        # Connect every pair of entities within the same transaction.
        for i, n1 in enumerate(nodes):
            for n2 in nodes[i + 1:]:
                if self.G.has_edge(n1, n2):
                    self.G[n1][n2]["weight"] += 1
                else:
                    self.G.add_edge(n1, n2, weight=1)

    def card_risk_features(self, card_id: str, link_types: tuple = ("device", "ip")) -> dict:
        """Extract graph-based risk features for a card.

        Args:
            card_id: Card identifier (without the ``"card:"`` prefix).
            link_types: Entity types through which two cards count as
                connected. Merchants are excluded by default: a popular
                merchant would otherwise link every card that shopped there.

        Returns:
            A dict with ``connected_cards``, ``fraud_neighbor_ratio`` and
            ``degree_centrality``. All three keys are present even when the
            card is unknown, in which case every value is zero.
        """
        node = f"card:{card_id}"
        if node not in self.G:
            return {
                "connected_cards": 0,
                "fraud_neighbor_ratio": 0.0,
                "degree_centrality": 0.0,
            }

        # Cards reachable in two hops through a shared linking entity.
        connected_cards = set()
        for neighbor in self.G.neighbors(node):
            if neighbor.split(":", 1)[0] not in link_types:
                continue
            for second_hop in self.G.neighbors(neighbor):
                if second_hop.startswith("card:") and second_hop != node:
                    connected_cards.add(second_hop)

        # Fraud ratio among connected cards; a card counts as fraudulent when
        # its node carries a truthy "flagged" attribute.
        flagged = sum(
            1 for c in connected_cards
            if self.G.nodes[c].get("flagged", False)
        )

        # Degree centrality of this node only, using the same formula as
        # nx.degree_centrality (degree / (n - 1), or 1 for a one-node graph)
        # without computing it for every node in the graph.
        n_nodes = len(self.G)
        if n_nodes <= 1:
            centrality = 1
        else:
            centrality = self.G.degree(node) * (1.0 / (n_nodes - 1.0))

        return {
            "connected_cards": len(connected_cards),
            "fraud_neighbor_ratio": flagged / max(len(connected_cards), 1),
            "degree_centrality": centrality,
        }
Graph features often provide the highest lift in fraud detection models because they capture organized fraud rings that transaction-level features miss entirely.
Multi-layer scoring architecture
Production systems use cascading layers for efficiency:
from enum import Enum
from dataclasses import dataclass
class Decision(Enum):
    """Possible outcomes of scoring a transaction."""

    # Let the transaction through.
    APPROVE = "approve"
    # Route the transaction to a review queue.
    REVIEW = "review"
    # Reject the transaction.
    DECLINE = "decline"
@dataclass
class ScoringResult:
    """Outcome of scoring one transaction, with per-layer detail.

    The field order is part of the constructor interface and is unchanged.
    """

    # Final decision and the combined score it was derived from.
    decision: Decision
    score: float

    # Names of the rules that fired for this transaction.
    rules_triggered: list[str]

    # Individual layer scores that fed the combined score.
    model_score: float
    graph_score: float

    # Time spent scoring, in milliseconds.
    latency_ms: float
class FraudScoringPipeline:
    """Cascading fraud scorer: hard rules, then ML model, then graph risk.

    The blend weights and thresholds are class attributes so a subclass or
    an instance can override them without editing ``score``.
    """

    # Weights of each layer in the combined score.
    MODEL_WEIGHT = 0.6
    GRAPH_WEIGHT = 0.3
    RULES_WEIGHT = 0.1
    # The graph layer only runs when the model score exceeds this value.
    GRAPH_TRIGGER_SCORE = 0.3
    # Combined-score cut-offs (strictly greater than) for each decision.
    DECLINE_THRESHOLD = 0.8
    REVIEW_THRESHOLD = 0.5

    def __init__(self, rules_engine, ml_model, graph_scorer, feature_store):
        self.rules = rules_engine
        self.model = ml_model
        self.graph = graph_scorer
        self.features = feature_store

    def _decide(self, final_score: float) -> Decision:
        """Map a combined score onto a decision using the class thresholds."""
        if final_score > self.DECLINE_THRESHOLD:
            return Decision.DECLINE
        if final_score > self.REVIEW_THRESHOLD:
            return Decision.REVIEW
        return Decision.APPROVE

    def score(self, transaction: dict) -> ScoringResult:
        """Score a transaction through the three layers.

        Args:
            transaction: Mapping that must contain ``"user_id"``, ``"amount"``
                and ``"merchant"``; it is also passed whole to the rules
                engine and the graph scorer.

        Returns:
            The decision, combined score, per-layer scores, triggered rules
            and the elapsed scoring time in milliseconds.

        Raises:
            KeyError: If a required key is missing from ``transaction``.
        """
        import time

        # perf_counter is monotonic; time.time() can jump when the system
        # clock is adjusted and would then report a wrong or negative latency.
        clock = time.perf_counter
        start = clock()

        # Layer 1: Hard rules (fast, deterministic)
        rule_result = self.rules.evaluate(transaction)
        if rule_result.hard_decline:
            return ScoringResult(
                decision=Decision.DECLINE,
                score=1.0,
                rules_triggered=rule_result.triggered,
                model_score=0.0,
                graph_score=0.0,
                latency_ms=(clock() - start) * 1000,
            )

        # Layer 2: Feature computation + ML model
        features = self.features.update_and_compute(
            transaction["user_id"],
            transaction["amount"],
            transaction["merchant"],
        )
        # NOTE(review): assumes predict_proba returns a single float for this
        # model wrapper; scikit-learn style models return a 2-D array — confirm.
        model_score = self.model.predict_proba(features)

        # Layer 3: Graph risk (only for medium+ scores to save latency)
        graph_score = 0.0
        if model_score > self.GRAPH_TRIGGER_SCORE:
            graph_score = self.graph.score(transaction)

        # Combine scores
        final_score = (
            self.MODEL_WEIGHT * model_score
            + self.GRAPH_WEIGHT * graph_score
            + self.RULES_WEIGHT * rule_result.soft_score
        )

        return ScoringResult(
            decision=self._decide(final_score),
            score=final_score,
            rules_triggered=rule_result.triggered,
            model_score=model_score,
            graph_score=graph_score,
            latency_ms=(clock() - start) * 1000,
        )
The cascading design keeps latency low: hard rules in Layer 1 decline obvious fraud immediately, low-scoring transactions (most legitimate traffic) skip the graph layer entirely, and only suspicious ones get progressively deeper analysis.
Handling adversarial concept drift
Fraudsters actively probe detection systems. When a new model is deployed, fraud patterns shift within weeks. Three strategies for resilience:
Champion-challenger framework
class ChampionChallenger:
    """Serve the champion model while shadow-scoring challengers.

    A deterministic fraction of transactions (chosen by hashing the
    transaction id) is also scored by every challenger; both scores are
    handed to ``_log_comparison`` for offline comparison. The value returned
    to the caller always comes from the champion.
    """

    # Number of hash buckets; 10,000 allows traffic splits as fine as 0.01%.
    _BUCKETS = 10_000

    def __init__(self, champion_model, challenger_models: list, traffic_split: float = 0.05):
        self.champion = champion_model
        self.challengers = challenger_models
        self.traffic_split = traffic_split

    def score(self, features, transaction_id: str) -> tuple[float, str]:
        """Score with champion; additionally score with challengers for comparison.

        Args:
            features: Model input, passed unchanged to every model.
            transaction_id: Used to decide deterministically whether this
                transaction is part of the shadow sample.

        Returns:
            ``(champion_score, "champion")``.
        """
        champion_score = self.champion.predict_proba(features)
        if self._in_shadow_sample(transaction_id):
            self._shadow_score(features, transaction_id, champion_score)
        return champion_score, "champion"

    def _in_shadow_sample(self, transaction_id: str) -> bool:
        """Return True when the transaction hashes into the shadow fraction."""
        import hashlib

        # MD5 is used only for bucketing, not for security; saying so keeps
        # the call working on FIPS-restricted Python builds.
        digest = hashlib.md5(transaction_id.encode(), usedforsecurity=False).hexdigest()
        bucket = int(digest, 16) % self._BUCKETS
        return bucket < self.traffic_split * self._BUCKETS

    def _shadow_score(self, features, transaction_id: str, champion_score) -> None:
        """Score every challenger and log the comparison, never raising."""
        import logging

        for challenger in self.challengers:
            # Shadow traffic is experimental: a broken challenger or logging
            # sink must not fail the live champion decision.
            try:
                challenger_score = challenger.predict_proba(features)
                # Log both scores for offline comparison
                self._log_comparison(transaction_id, champion_score, challenger_score)
            except Exception:
                logging.getLogger(__name__).exception(
                    "Shadow scoring failed for transaction %s", transaction_id
                )

    def _log_comparison(self, txn_id, champion_score, challenger_score):
        pass  # Write to analytics pipeline
Feature drift monitoring
import numpy as np
from scipy.stats import ks_2samp
def detect_feature_drift(
    reference: np.ndarray,
    current: np.ndarray,
    feature_names: list[str],
    p_threshold: float = 0.01,
) -> list[dict]:
    """Report features whose distribution shifted, per a two-sample KS test.

    Column ``i`` of ``reference`` and ``current`` is compared for the i-th
    name in ``feature_names``. A feature is reported when its p-value is
    below ``p_threshold``.

    Returns:
        One dict per drifted feature (name, KS statistic, p-value, both means
        and the absolute difference of the means), ordered from the largest
        KS statistic to the smallest.
    """
    drifted = []
    for col, feature in enumerate(feature_names):
        ref_col = reference[:, col]
        cur_col = current[:, col]
        ks_stat, p_val = ks_2samp(ref_col, cur_col)

        # Written as "not below" so a NaN p-value is skipped as well.
        if not p_val < p_threshold:
            continue

        ref_mean = np.mean(ref_col)
        cur_mean = np.mean(cur_col)
        drifted.append({
            "feature": feature,
            "ks_statistic": ks_stat,
            "p_value": p_val,
            "reference_mean": ref_mean,
            "current_mean": cur_mean,
            "drift_magnitude": abs(cur_mean - ref_mean),
        })

    drifted.sort(key=lambda entry: entry["ks_statistic"], reverse=True)
    return drifted
Adaptive retraining schedule
Instead of fixed retraining intervals, trigger retraining when:
- Feature drift is detected above threshold.
- Model precision drops below a target (based on analyst feedback).
- A new fraud pattern is identified by the investigations team.
Model explainability for compliance
Financial regulators and regulations (e.g. OCC, FCA, GDPR) require that decisions affecting customers be explainable. SHAP values provide per-prediction explanations:
import shap
def explain_decision(
    model,
    features: np.ndarray,
    feature_names: list[str],
    explainer=None,
    top_k: int = 5,
) -> dict:
    """Generate SHAP explanation for a single transaction.

    Args:
        model: Tree model to explain; only used when ``explainer`` is None.
        features: Feature vector of one transaction; reshaped to one row.
        feature_names: Names matching the feature vector, in order.
        explainer: Optional pre-built explainer exposing ``shap_values`` and
            ``expected_value``. Pass one to avoid rebuilding a
            ``shap.TreeExplainer`` for every transaction.
        top_k: Number of highest-impact features to return.

    Returns:
        A dict with ``top_factors`` (the ``top_k`` features by absolute
        contribution), ``base_value`` and ``prediction`` (base value plus the
        sum of all contributions).
    """
    if explainer is None:
        explainer = shap.TreeExplainer(model)

    shap_values = explainer.shap_values(features.reshape(1, -1))
    # NOTE(review): assumes a single-output model, where shap_values has one
    # row per sample; multi-class explainers may return one array per class
    # — confirm against the model in use.
    row = shap_values[0]

    # Sort features by importance for this prediction
    contributions = sorted(
        zip(feature_names, row),
        key=lambda pair: abs(pair[1]),
        reverse=True,
    )

    # expected_value may be a scalar or a 1-element array; .item() handles
    # both without the deprecated float(ndarray) conversion.
    base_value = float(np.asarray(explainer.expected_value).item())

    return {
        "top_factors": [
            {"feature": name, "contribution": float(val)}
            for name, val in contributions[:top_k]
        ],
        "base_value": base_value,
        "prediction": float(base_value + sum(row)),
    }
Example output: “This transaction was flagged because: (1) amount is 15× user average (+0.35), (2) new device (+0.22), (3) 8 transactions in last hour (+0.18).” This level of transparency satisfies regulatory requirements and helps analysts prioritize their review queue.
Metrics and reporting
Track system health with:
- Detection rate at fixed false-positive rate: “We catch 92% of fraud while only flagging 0.5% of legitimate transactions.”
- Time to detection: median time between fraud occurrence and flag.
- Dollar recovery rate: percentage of fraud dollars prevented.
- Analyst efficiency: precision of the review queue, i.e. the share of reviewed alerts that turn out to be real fraud (should be above 20–30%).
- Model decay curve: detection rate over time since last retraining.
The one thing to remember: Production fraud detection is a multi-layered, continuously evolving system — rules for speed, ML for pattern recognition, graphs for network analysis — with adversarial resilience built in through monitoring, drift detection, and rapid retraining cycles.
See Also
- Python Backtesting Trading Strategies — Why traders use Python to test their ideas on old data before risking real money, in plain language.
- Python Portfolio Optimization — How Python helps you pick the right mix of investments so you get the best return for the risk you are willing to take.
- Python Quantitative Finance — How Python helps people use math and data to make smarter money decisions, explained without any jargon.
- Python Risk Analysis Monte Carlo — How rolling virtual dice thousands of times helps investors understand what could go wrong with their money.
- Python Technical Indicators — What technical indicators are and how Python calculates them, explained as if you have never seen a stock chart.