Python Process Mining — Deep Dive
Event Log Preparation
Real-world event logs are messy. Before mining, you need to clean and structure the data:
import pandas as pd
import pm4py
# Read the raw export; CSV is the most common hand-off format.
raw_events = pd.read_csv("tickets.csv")

# Map the source columns onto the column names pm4py works with.
XES_COLUMN_NAMES = {
    "ticket_id": "case:concept:name",
    "action": "concept:name",
    "action_time": "time:timestamp",
    "agent": "org:resource",
}
df = raw_events.rename(columns=XES_COLUMN_NAMES)

# Timestamps must be real (UTC-aware) datetimes, not strings.
df["time:timestamp"] = pd.to_datetime(df["time:timestamp"], utc=True)

# Events have to be ordered within each case before mining.
df = df.sort_values(by=["case:concept:name", "time:timestamp"])

# Hand the prepared frame over to pm4py.
log = pm4py.convert_to_event_log(df)

# Quick sanity check of what was loaded.
case_count = len(log)
event_count = sum(map(len, log))
activity_count = len(pm4py.get_event_attribute_values(log, "concept:name"))
print(f"Cases: {case_count}")
print(f"Events: {event_count}")
print(f"Activities: {activity_count}")
Filtering Noisy Logs
# NOTE: every example below filters the original `log` on its own and rebinds
# `log_filtered`. To chain filters, pass the previous `log_filtered` instead.

# Remove cases with fewer than 3 events (incomplete).
# The generic pm4py.filter_log(lambda, log) helper is deprecated/removed in
# recent pm4py releases, so the dedicated case-size filter is used instead.
log_filtered = pm4py.filter_case_size(
    log,
    3,       # minimum number of events per case
    10**9,   # maximum number of events: effectively "no upper bound"
)

# Keep only the most common 80% of trace variants.
# variants_filter.apply() needs the explicit list of variants to keep, so rank
# the variants by how many cases follow them and keep the top 80%.
from pm4py.algo.filtering.log.variants import variants_filter

variants = variants_filter.get_variants(log)  # variant -> traces of that variant
ranked_variants = sorted(
    variants,
    key=lambda variant: len(variants[variant]),
    reverse=True,
)
keep_count = max(1, int(len(ranked_variants) * 0.8))  # never drop everything
log_filtered = variants_filter.apply(log, ranked_variants[:keep_count])

# Filter by time range
log_filtered = pm4py.filter_time_range(
    log,
    "2026-01-01 00:00:00",
    "2026-03-31 23:59:59",
    mode="traces_contained",
)

# Filter by specific activities
log_filtered = pm4py.filter_event_attribute_values(
    log,
    attribute_key="concept:name",
    values=["Submit", "Review", "Approve", "Close"],
    level="event",
    retain=True,
)
Process Discovery with Algorithm Comparison
import pm4py
# Alpha Miner: the original discovery algorithm
alpha_model = pm4py.discover_petri_net_alpha(log)
net_alpha, im_alpha, fm_alpha = alpha_model

# Inductive Miner: the recommended default
inductive_model = pm4py.discover_petri_net_inductive(log)
net_inductive, im_ind, fm_ind = inductive_model

# Inductive Miner again, this time with a noise threshold of 0.2
# (intended to filter out infrequent behavior)
noise_level = 0.2
net_inductive_08, im_08, fm_08 = pm4py.discover_petri_net_inductive(
    log, noise_threshold=noise_level
)

# Heuristics Miner with explicit thresholds
heuristics_thresholds = {
    "dependency_threshold": 0.5,
    "and_threshold": 0.65,
    "loop_two_threshold": 0.5,
}
net_heuristics, im_heur, fm_heur = pm4py.discover_petri_net_heuristics(
    log, **heuristics_thresholds
)

# Directly-Follows Graph: the simplest picture of the process
dfg, start_activities, end_activities = pm4py.discover_dfg(log)
pm4py.view_dfg(dfg, start_activities, end_activities)

# Process Tree: a hierarchical model of the same log
tree = pm4py.discover_process_tree_inductive(log)
pm4py.view_process_tree(tree)
Algorithm Selection Guide
| Scenario | Algorithm | Why |
|---|---|---|
| Clean, structured log | Alpha Miner | Simple, interpretable |
| Noisy real-world log | Inductive Miner (noise=0.2) | Handles noise gracefully |
| Very messy log, need quick overview | Heuristics Miner | Frequency-based filtering |
| Need formal model for verification | Inductive Miner | Guarantees sound model |
| Quick visualization for stakeholders | DFG | Fast, intuitive |
Conformance Checking in Detail
Token-Based Replay
from pm4py.algo.conformance.tokenreplay import algorithm as token_replay

# Replay every trace of the log on the inductive-miner net.
replayed_traces = token_replay.apply(log, net_inductive, im_ind, fm_ind)

# Token counts for the first five replayed cases
for case_no, tokens in enumerate(replayed_traces[:5]):
    print(
        f"Case {case_no}: "
        f"missing={tokens['missing_tokens']}, "
        f"remaining={tokens['remaining_tokens']}, "
        f"produced={tokens['produced_tokens']}, "
        f"consumed={tokens['consumed_tokens']}"
    )

# Fitness of the whole log against the same model
fitness = pm4py.fitness_token_based_replay(log, net_inductive, im_ind, fm_ind)
log_fitness = fitness["log_fitness"]
average_trace_fitness = fitness["average_trace_fitness"]
print(f"Log fitness: {log_fitness:.4f}")
print(f"Average trace fitness: {average_trace_fitness:.4f}")
Alignment-Based Conformance
from pm4py.algo.conformance.alignments.petri_net import algorithm as alignments

aligned_traces = alignments.apply(log, net_inductive, im_ind, fm_ind)

# Marker used in an alignment step for "no move on this side".
SKIP = ">>"

# Each alignment pairs log moves with model moves; print the first three cases.
for i, alignment in enumerate(aligned_traces[:3]):
    print(f"\nCase {i}:")
    print(f" Cost: {alignment['cost']}")
    print(f" Fitness: {alignment['fitness']}")
    for log_move, model_move in alignment["alignment"]:
        if log_move == SKIP and model_move is None:
            # NOTE(review): a model move without a label is presumably a silent
            # (tau) transition, i.e. routing logic rather than a skipped
            # activity. Reporting it as a deviation would be misleading.
            continue
        if log_move == model_move:
            print(f" ✓ {log_move}")
        elif log_move == SKIP:
            print(f" ⚠ Model move (skipped in log): {model_move}")
        elif model_move == SKIP:
            print(f" ⚠ Log move (not in model): {log_move}")
Four Quality Dimensions
from pm4py.algo.evaluation.generalization import algorithm as gen_eval
from pm4py.algo.evaluation.simplicity import algorithm as simplicity_eval

# The model under evaluation: net plus initial and final marking.
reference_model = (net_inductive, im_ind, fm_ind)

# Fitness: how much of the logged behavior does the model capture?
fitness = pm4py.fitness_token_based_replay(log, *reference_model)

# Precision: does the model allow too much?
precision = pm4py.precision_token_based_replay(log, *reference_model)

# Simplicity: is the model understandable? (needs only the net)
simplicity = simplicity_eval.apply(net_inductive)

# Generalization: will the model hold up for future cases?
generalization = gen_eval.apply(log, *reference_model)

average_trace_fitness = fitness["average_trace_fitness"]
print(f"Fitness: {average_trace_fitness:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Simplicity: {simplicity:.4f}")
print(f"Generalization: {generalization:.4f}")
Performance Mining
# Annotate the directly-follows graph with timing information.
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.visualization.dfg import visualizer as dfg_vis

# Performance DFG: each edge carries an aggregated elapsed time between the
# two activities.
# NOTE(review): no aggregation measure is passed here; pm4py's default appears
# to be the mean rather than the median. Confirm for the installed version.
performance_variant = dfg_discovery.Variants.PERFORMANCE
dfg_perf = dfg_discovery.apply(log, variant=performance_variant)

# Render the graph with time annotations and write it to disk.
render_options = {"format": "png"}
gviz = dfg_vis.apply(
    dfg_perf,
    variant=dfg_vis.Variants.PERFORMANCE,
    parameters=render_options,
)
dfg_vis.save(gviz, "performance_dfg.png")
Bottleneck Detection
def find_bottlenecks(log, top_n=5, start_timestamp_key=None):
    """Rank activities by median sojourn time and print the slowest ones.

    Args:
        log: The pm4py event log to analyse.
        top_n: How many of the slowest activities to print.
        start_timestamp_key: Name of the event attribute holding the start
            timestamp. When omitted, pm4py's own default is used.
            NOTE(review): pm4py documents sojourn time for interval logs
            (events with both a start and a completion timestamp); without a
            separate start timestamp the values are presumably all zero, so
            confirm before trusting the ranking.

    Returns:
        A list of ``(activity, sojourn_time)`` pairs for every activity,
        sorted from slowest to fastest. Non-numeric values sort as 0.
    """
    from pm4py.statistics.sojourn_time.log import get as sojourn_get

    parameters = {
        sojourn_get.Parameters.TIMESTAMP_KEY: "time:timestamp",
        # The report below is labelled "median", so request the median
        # explicitly instead of relying on the library default.
        sojourn_get.Parameters.AGGREGATION_MEASURE: "median",
    }
    if start_timestamp_key is not None:
        parameters[sojourn_get.Parameters.START_TIMESTAMP_KEY] = start_timestamp_key

    # Sojourn time: how long cases spend in each activity.
    sojourn_times = sojourn_get.apply(log, parameters=parameters)

    def _sort_key(item):
        # Non-numeric entries sort as 0 instead of breaking the comparison.
        value = item[1]
        return value if isinstance(value, (int, float)) else 0

    sorted_activities = sorted(sojourn_times.items(), key=_sort_key, reverse=True)

    print(f"Top {top_n} bottleneck activities:")
    for activity, time_val in sorted_activities[:top_n]:
        if isinstance(time_val, (int, float)):
            hours = time_val / 3600  # value is divided by 3600 to report hours
            print(f" {activity}: {hours:.1f} hours median sojourn time")
    return sorted_activities


bottlenecks = find_bottlenecks(log)
Waiting Time vs Processing Time
def analyze_waiting_times(df):
    """Summarise the elapsed time between consecutive events of each case.

    For every case the events are ordered by timestamp and the gap between
    each event and its predecessor is measured in seconds. Gaps are then
    aggregated per (previous activity, next activity) transition.

    Note that this only measures the time between recorded events; it cannot
    split that time into waiting and processing portions.

    Args:
        df: Event DataFrame with the columns ``case:concept:name``,
            ``concept:name`` and ``time:timestamp`` (datetime values).

    Returns:
        A DataFrame indexed by ``(from_activity, to_activity)`` with the
        columns ``median``, ``mean``, ``std`` and ``count``, sorted by median
        gap, largest first. It is empty (with the same index names and
        columns) when no case has at least two events.
    """
    transition_keys = ["from_activity", "to_activity"]
    stat_columns = ["median", "mean", "std", "count"]

    ordered = df.sort_values(["case:concept:name", "time:timestamp"])

    results = []
    for case_id, group in ordered.groupby("case:concept:name"):
        # Plain column lists avoid building one dict per event.
        activities = group["concept:name"].tolist()
        timestamps = group["time:timestamp"].tolist()
        # Pair every event with its direct predecessor inside the case.
        for prev_act, curr_act, prev_ts, curr_ts in zip(
            activities, activities[1:], timestamps, timestamps[1:]
        ):
            results.append({
                "case": case_id,
                "from_activity": prev_act,
                "to_activity": curr_act,
                "waiting_seconds": (curr_ts - prev_ts).total_seconds(),
            })

    if not results:
        # An empty list would yield a column-less frame and the groupby below
        # would raise KeyError; return a well-formed empty result instead.
        empty_index = pd.MultiIndex.from_arrays([[], []], names=transition_keys)
        return pd.DataFrame(index=empty_index, columns=stat_columns, dtype="float64")

    waiting_df = pd.DataFrame(results)

    # Aggregate by transition
    transition_stats = waiting_df.groupby(transition_keys)["waiting_seconds"].agg(
        stat_columns
    )
    return transition_stats.sort_values("median", ascending=False)
Social Network Analysis
from pm4py.algo.organizational_mining.sna import algorithm as sna
from pm4py.visualization.sna import visualizer as sna_vis

# Handover of work: who passes work to whom?
handover_variant = sna.Variants.HANDOVER_OF_WORK
handover = sna.apply(log, variant=handover_variant)

# Draw the handover matrix as a network and save it as an HTML page.
gviz = sna_vis.apply(handover, variant=sna_vis.Variants.PYVIS)
sna_vis.save(gviz, "handover_network.html")

# Working together: who works on the same cases?
together_variant = sna.Variants.WORKING_TOGETHER
working_together = sna.apply(log, variant=together_variant)
Building a Continuous Monitoring Pipeline
import pm4py
import pandas as pd
from datetime import datetime, timedelta
class ProcessMonitor:
    """Compare recent event logs against a reference model and a baseline.

    The monitor replays logs on a fixed reference Petri net (token-based
    replay) and reports when the average trace fitness of a recent period
    moves away from the stored baseline.
    """

    def __init__(self, reference_model, reference_im, reference_fm,
                 drift_tolerance=0.05, min_trace_fitness=0.8):
        """Store the reference model and the alerting thresholds.

        Args:
            reference_model: Petri net used as the reference process.
            reference_im: Initial marking of the reference net.
            reference_fm: Final marking of the reference net.
            drift_tolerance: Absolute change in average trace fitness,
                relative to the baseline, above which drift is reported.
            min_trace_fitness: Traces whose replay fitness is below this
                value are counted as deviating.
        """
        self.model = reference_model
        self.im = reference_im
        self.fm = reference_fm
        self.drift_tolerance = drift_tolerance
        self.min_trace_fitness = min_trace_fitness
        # Set by set_baseline(); stays None until a baseline exists.
        self.baseline_fitness = None

    def set_baseline(self, log):
        """Establish baseline metrics from historical data."""
        fitness = pm4py.fitness_token_based_replay(
            log, self.model, self.im, self.fm
        )
        self.baseline_fitness = fitness["average_trace_fitness"]
        print(f"Baseline fitness: {self.baseline_fitness:.4f}")

    def check_period(self, recent_log) -> dict:
        """Check a recent period against the baseline.

        Args:
            recent_log: Event log covering the period to check.

        Returns:
            A dict with the keys ``period_fitness``, ``baseline_fitness``,
            ``drift_detected``, ``deviating_cases``, ``total_cases`` and
            ``deviation_rate``. ``drift_detected`` is always False while no
            baseline has been set.
        """
        fitness = pm4py.fitness_token_based_replay(
            recent_log, self.model, self.im, self.fm
        )
        current_fitness = fitness["average_trace_fitness"]

        # Per-trace diagnostics are needed to count the deviating cases.
        replayed = pm4py.conformance_diagnostics_token_based_replay(
            recent_log, self.model, self.im, self.fm
        )
        # A result without a fitness value is treated as fully fitting.
        deviating = [
            r for r in replayed
            if r.get("trace_fitness", 1.0) < self.min_trace_fitness
        ]

        # Drift is a change in either direction, hence the absolute value.
        drift_detected = (
            self.baseline_fitness is not None
            and abs(current_fitness - self.baseline_fitness) > self.drift_tolerance
        )

        total_cases = len(recent_log)
        return {
            "period_fitness": current_fitness,
            "baseline_fitness": self.baseline_fitness,
            "drift_detected": drift_detected,
            "deviating_cases": len(deviating),
            "total_cases": total_cases,
            # max(..., 1) avoids dividing by zero for an empty log.
            "deviation_rate": len(deviating) / max(total_cases, 1),
        }
# Usage
monitor = ProcessMonitor(net_inductive, im_ind, fm_ind)
monitor.set_baseline(historical_log)
# Run weekly
report = monitor.check_period(this_week_log)
if report["drift_detected"]:
print(f"⚠ Process drift detected! "
f"Fitness dropped from {report['baseline_fitness']:.3f} "
f"to {report['period_fitness']:.3f}")
Performance at Scale
| Log Size | Discovery (Inductive) | Conformance (Token) | Conformance (Alignment) |
|---|---|---|---|
| 1K cases | < 1 sec | < 1 sec | ~5 sec |
| 10K cases | ~3 sec | ~2 sec | ~2 min |
| 100K cases | ~30 sec | ~20 sec | ~30 min |
| 1M cases | ~5 min | ~3 min | Hours (use sampling) |
For large logs, use Parquet format and filter before loading:
# Read directly from Parquet (much faster than CSV/XES).
# NOTE(review): pm4py.read_parquet does not appear to be part of the current
# pm4py public API, so the pandas reader is used; pandas is already imported
# as `pd` in this file.
df = pd.read_parquet("events.parquet")

# Sample for alignment-based conformance
import random

sample_size = min(5000, len(log))  # never ask for more cases than exist
sampled_indices = random.sample(range(len(log)), sample_size)
sampled_log = pm4py.objects.log.obj.EventLog([log[i] for i in sampled_indices])
One thing to remember: Process mining with pm4py turns raw event logs into actionable insights — discover the real process, measure conformance against rules, find bottlenecks with performance mining, and set up continuous monitoring to detect process drift before it causes problems.
See Also
- CI/CD — Why big apps can ship updates every day without turning your phone into a glitchy mess: CI/CD is the behind-the-scenes quality gate and delivery truck.
- Containerization — Why does software that works on your computer break on everyone else's? Containers fix that — and they're why Netflix can deploy 100 updates a day without the site going down.
- Python 3.10 New Features — Python 3.10 gave programmers a shape-sorting machine, friendlier error messages, and cleaner ways to say 'this or that' in type hints.
- Python 3.11 New Features — Python 3.11 made everything faster, error messages smarter, and let you catch several mistakes at once instead of stopping at the first one.
- Python 3.12 New Features — Python 3.12 made type hints shorter, f-strings more powerful, and started preparing Python's engine for a world without the GIL.