Python Process Mining — Deep Dive

Event Log Preparation

Real-world event logs are messy. Before mining, you need to clean and structure the data:

import pandas as pd
import pm4py

# Load from CSV (most common source)
df = pd.read_csv("tickets.csv")

# Required columns mapping
df = df.rename(columns={
    "ticket_id": "case:concept:name",
    "action": "concept:name",
    "action_time": "time:timestamp",
    "agent": "org:resource",
})

# Parse timestamps
df["time:timestamp"] = pd.to_datetime(df["time:timestamp"], utc=True)

# Sort by case and timestamp (critical for correct mining)
df = df.sort_values(["case:concept:name", "time:timestamp"])

# Convert to pm4py event log
log = pm4py.convert_to_event_log(df)

# Basic statistics
print(f"Cases: {len(log)}")
print(f"Events: {sum(len(trace) for trace in log)}")
print(f"Activities: {len(pm4py.get_event_attribute_values(log, 'concept:name'))}")

Filtering Noisy Logs

# Remove cases with fewer than 3 events (incomplete)
log_filtered = pm4py.filter_log(
    lambda trace: len(trace) >= 3, log
)

# Keep only the most common 80% of trace variants
from pm4py.algo.filtering.log.variants import variants_filter
log_filtered = variants_filter.apply(
    log,
    parameters={"decreasingFactor": 0.8}
)

# Filter by time range
log_filtered = pm4py.filter_time_range(
    log,
    "2026-01-01 00:00:00",
    "2026-03-31 23:59:59",
    mode="traces_contained",
)

# Filter by specific activities
log_filtered = pm4py.filter_event_attribute_values(
    log,
    attribute_key="concept:name",
    values=["Submit", "Review", "Approve", "Close"],
    level="event",
    retain=True,
)

Process Discovery with Algorithm Comparison

import pm4py

# Alpha Miner — original algorithm
net_alpha, im_alpha, fm_alpha = pm4py.discover_petri_net_alpha(log)

# Inductive Miner — recommended default
net_inductive, im_ind, fm_ind = pm4py.discover_petri_net_inductive(log)

# Inductive Miner with noise threshold
net_inductive_08, im_08, fm_08 = pm4py.discover_petri_net_inductive(
    log, noise_threshold=0.2  # filter out 20% infrequent behavior
)

# Heuristics Miner
net_heuristics, im_heur, fm_heur = pm4py.discover_petri_net_heuristics(
    log,
    dependency_threshold=0.5,
    and_threshold=0.65,
    loop_two_threshold=0.5,
)

# Directly-Follows Graph (simplest visualization)
dfg, start_activities, end_activities = pm4py.discover_dfg(log)
pm4py.view_dfg(dfg, start_activities, end_activities)

# Process Tree (hierarchical model)
tree = pm4py.discover_process_tree_inductive(log)
pm4py.view_process_tree(tree)

Algorithm Selection Guide

ScenarioAlgorithmWhy
Clean, structured logAlpha MinerSimple, interpretable
Noisy real-world logInductive Miner (noise=0.2)Handles noise gracefully
Very messy log, need quick overviewHeuristics MinerFrequency-based filtering
Need formal model for verificationInductive MinerGuarantees sound model
Quick visualization for stakeholdersDFGFast, intuitive

Conformance Checking in Detail

Token-Based Replay

from pm4py.algo.conformance.tokenreplay import algorithm as token_replay

replayed_traces = token_replay.apply(
    log, net_inductive, im_ind, fm_ind
)

# Per-case fitness
for i, result in enumerate(replayed_traces[:5]):
    print(f"Case {i}: "
          f"missing={result['missing_tokens']}, "
          f"remaining={result['remaining_tokens']}, "
          f"produced={result['produced_tokens']}, "
          f"consumed={result['consumed_tokens']}")

# Overall fitness
fitness = pm4py.fitness_token_based_replay(log, net_inductive, im_ind, fm_ind)
print(f"Log fitness: {fitness['log_fitness']:.4f}")
print(f"Average trace fitness: {fitness['average_trace_fitness']:.4f}")

Alignment-Based Conformance

from pm4py.algo.conformance.alignments.petri_net import algorithm as alignments

aligned_traces = alignments.apply(log, net_inductive, im_ind, fm_ind)

# Each alignment shows optimal correspondence between log and model
for i, alignment in enumerate(aligned_traces[:3]):
    print(f"\nCase {i}:")
    print(f"  Cost: {alignment['cost']}")
    print(f"  Fitness: {alignment['fitness']}")
    for log_move, model_move in alignment['alignment']:
        if log_move == model_move:
            print(f"  ✓ {log_move}")
        elif log_move == ">>":
            print(f"  ⚠ Model move (skipped in log): {model_move}")
        elif model_move == ">>":
            print(f"  ⚠ Log move (not in model): {log_move}")

Four Quality Dimensions

# Fitness — how much behavior is captured?
fitness = pm4py.fitness_token_based_replay(log, net_inductive, im_ind, fm_ind)

# Precision — does the model allow too much?
precision = pm4py.precision_token_based_replay(log, net_inductive, im_ind, fm_ind)

# Simplicity — is the model understandable?
from pm4py.algo.evaluation.simplicity import algorithm as simplicity_eval
simplicity = simplicity_eval.apply(net_inductive)

# Generalization — will it work for future cases?
from pm4py.algo.evaluation.generalization import algorithm as gen_eval
generalization = gen_eval.apply(log, net_inductive, im_ind, fm_ind)

print(f"Fitness:        {fitness['average_trace_fitness']:.4f}")
print(f"Precision:      {precision:.4f}")
print(f"Simplicity:     {simplicity:.4f}")
print(f"Generalization: {generalization:.4f}")

Performance Mining

# Add performance data to the DFG
from pm4py.algo.discovery.dfg import algorithm as dfg_discovery
from pm4py.visualization.dfg import visualizer as dfg_vis

# Performance DFG shows median time between activities
dfg_perf = dfg_discovery.apply(log, variant=dfg_discovery.Variants.PERFORMANCE)

# Visualize with time annotations
gviz = dfg_vis.apply(
    dfg_perf,
    variant=dfg_vis.Variants.PERFORMANCE,
    parameters={"format": "png"},
)
dfg_vis.save(gviz, "performance_dfg.png")

Bottleneck Detection

def find_bottlenecks(log, top_n=5):
    """Identify the slowest transitions between activities."""
    from pm4py.statistics.sojourn_time.log import get as sojourn_get

    # Sojourn time: how long cases spend in each activity
    sojourn_times = sojourn_get.apply(
        log, parameters={"timestamp_key": "time:timestamp"}
    )

    # Sort by median time
    sorted_activities = sorted(
        sojourn_times.items(),
        key=lambda x: x[1] if isinstance(x[1], (int, float)) else 0,
        reverse=True,
    )

    print(f"Top {top_n} bottleneck activities:")
    for activity, time_val in sorted_activities[:top_n]:
        if isinstance(time_val, (int, float)):
            hours = time_val / 3600
            print(f"  {activity}: {hours:.1f} hours median sojourn time")

    return sorted_activities

bottlenecks = find_bottlenecks(log)

Waiting Time vs Processing Time

def analyze_waiting_times(df):
    """Separate waiting time from processing time per activity."""
    df = df.sort_values(["case:concept:name", "time:timestamp"])

    results = []
    for case_id, group in df.groupby("case:concept:name"):
        events = group.to_dict("records")
        for i in range(1, len(events)):
            prev = events[i - 1]
            curr = events[i]
            waiting = (curr["time:timestamp"] - prev["time:timestamp"]).total_seconds()
            results.append({
                "case": case_id,
                "from_activity": prev["concept:name"],
                "to_activity": curr["concept:name"],
                "waiting_seconds": waiting,
            })

    waiting_df = pd.DataFrame(results)

    # Aggregate by transition
    transition_stats = waiting_df.groupby(
        ["from_activity", "to_activity"]
    )["waiting_seconds"].agg(["median", "mean", "std", "count"])

    return transition_stats.sort_values("median", ascending=False)

Social Network Analysis

# Handover of work: who passes work to whom?
from pm4py.algo.organizational_mining.sna import algorithm as sna

# Build handover matrix
handover = sna.apply(log, variant=sna.Variants.HANDOVER_OF_WORK)

# Visualize as network
from pm4py.visualization.sna import visualizer as sna_vis
gviz = sna_vis.apply(handover, variant=sna_vis.Variants.PYVIS)
sna_vis.save(gviz, "handover_network.html")

# Working together: who works on the same cases?
working_together = sna.apply(log, variant=sna.Variants.WORKING_TOGETHER)

Building a Continuous Monitoring Pipeline

import pm4py
import pandas as pd
from datetime import datetime, timedelta

class ProcessMonitor:
    def __init__(self, reference_model, reference_im, reference_fm):
        self.model = reference_model
        self.im = reference_im
        self.fm = reference_fm
        self.baseline_fitness = None

    def set_baseline(self, log):
        """Establish baseline metrics from historical data."""
        fitness = pm4py.fitness_token_based_replay(
            log, self.model, self.im, self.fm
        )
        self.baseline_fitness = fitness["average_trace_fitness"]
        print(f"Baseline fitness: {self.baseline_fitness:.4f}")

    def check_period(self, recent_log) -> dict:
        """Check a recent period against the baseline."""
        fitness = pm4py.fitness_token_based_replay(
            recent_log, self.model, self.im, self.fm
        )
        current_fitness = fitness["average_trace_fitness"]

        # Detect deviating cases
        replayed = pm4py.conformance_diagnostics_token_based_replay(
            recent_log, self.model, self.im, self.fm
        )
        deviating = [
            r for r in replayed
            if r.get("trace_fitness", 1.0) < 0.8
        ]

        drift_detected = (
            self.baseline_fitness is not None
            and abs(current_fitness - self.baseline_fitness) > 0.05
        )

        return {
            "period_fitness": current_fitness,
            "baseline_fitness": self.baseline_fitness,
            "drift_detected": drift_detected,
            "deviating_cases": len(deviating),
            "total_cases": len(recent_log),
            "deviation_rate": len(deviating) / max(len(recent_log), 1),
        }

# Usage
monitor = ProcessMonitor(net_inductive, im_ind, fm_ind)
monitor.set_baseline(historical_log)

# Run weekly
report = monitor.check_period(this_week_log)
if report["drift_detected"]:
    print(f"⚠ Process drift detected! "
          f"Fitness dropped from {report['baseline_fitness']:.3f} "
          f"to {report['period_fitness']:.3f}")

Performance at Scale

Log SizeDiscovery (Inductive)Conformance (Token)Conformance (Alignment)
1K cases< 1 sec< 1 sec~5 sec
10K cases~3 sec~2 sec~2 min
100K cases~30 sec~20 sec~30 min
1M cases~5 min~3 minHours (use sampling)

For large logs, use Parquet format and filter before loading:

# Read directly from Parquet (much faster than CSV/XES)
df = pm4py.read_parquet("events.parquet")

# Sample for alignment-based conformance
import random
sampled_indices = random.sample(range(len(log)), min(5000, len(log)))
sampled_log = pm4py.objects.log.obj.EventLog([log[i] for i in sampled_indices])

One thing to remember: Process mining with pm4py turns raw event logs into actionable insights — discover the real process, measure conformance against rules, find bottlenecks with performance mining, and set up continuous monitoring to detect process drift before it causes problems.

pythonprocess-miningdata-analysis

See Also

  • Ci Cd Why big apps can ship updates every day without turning your phone into a glitchy mess — CI/CD is the behind-the-scenes quality gate and delivery truck.
  • Containerization Why does software that works on your computer break on everyone else's? Containers fix that — and they're why Netflix can deploy 100 updates a day without the site going down.
  • Python 310 New Features Python 3.10 gave programmers a shape-sorting machine, friendlier error messages, and cleaner ways to say 'this or that' in type hints.
  • Python 311 New Features Python 3.11 made everything faster, error messages smarter, and let you catch several mistakes at once instead of stopping at the first one.
  • Python 312 New Features Python 3.12 made type hints shorter, f-strings more powerful, and started preparing Python's engine for a world without the GIL.