Python Petri Nets — Deep Dive
Petri Net Implementation from Scratch
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple
@dataclass
class Place:
    """A named place of the net together with its current token count.

    Raises:
        ValueError: If constructed with a negative number of tokens.
    """

    name: str
    tokens: int = 0  # number of tokens currently held by this place

    def __post_init__(self) -> None:
        # A marking assigns a non-negative count to every place; reject bad
        # input here so the firing arithmetic always starts from a valid state.
        if self.tokens < 0:
            raise ValueError(
                f"Place {self.name!r} cannot hold a negative number of tokens "
                f"(got {self.tokens})"
            )
@dataclass
class Transition:
    """A named transition with weighted input and output arcs.

    ``inputs`` and ``outputs`` each map a place name to an arc weight.
    A weight of zero is accepted and acts as a no-op arc.

    Raises:
        ValueError: If any arc weight is negative.
    """

    name: str
    inputs: Dict[str, int] = field(default_factory=dict)  # place_name → weight
    outputs: Dict[str, int] = field(default_factory=dict)  # place_name → weight

    def __post_init__(self) -> None:
        # A negative input weight would make "tokens >= weight" trivially
        # true and turn token removal into token creation, so refuse it.
        for direction, arcs in (("input", self.inputs), ("output", self.outputs)):
            for place_name, weight in arcs.items():
                if weight < 0:
                    raise ValueError(
                        f"Transition {self.name!r}: {direction} arc to "
                        f"{place_name!r} has negative weight {weight}"
                    )
class PetriNet:
    """A place/transition net with integer token counts and weighted arcs.

    Places and transitions are stored by name in ``self.places`` and
    ``self.transitions``.
    """

    def __init__(self):
        self.places: Dict[str, Place] = {}
        self.transitions: Dict[str, Transition] = {}

    def add_place(self, name: str, tokens: int = 0) -> Place:
        """Create a place with ``tokens`` initial tokens and register it.

        A place already registered under ``name`` is replaced.
        """
        place = Place(name=name, tokens=tokens)
        self.places[name] = place
        return place

    def add_transition(
        self,
        name: str,
        inputs: Dict[str, int],
        outputs: Dict[str, int],
    ) -> Transition:
        """Create a transition and register it under ``name``.

        Args:
            name: Transition name; an existing entry with this name is replaced.
            inputs: Mapping place_name → weight of tokens consumed on firing.
            outputs: Mapping place_name → weight of tokens produced on firing.
        """
        # Copy the arc mappings so that later mutation of the caller's dicts
        # cannot silently rewire the net.
        transition = Transition(name=name, inputs=dict(inputs), outputs=dict(outputs))
        self.transitions[name] = transition
        return transition

    def is_enabled(self, transition_name: str) -> bool:
        """Check if a transition can fire.

        A transition is enabled when every input place holds at least as many
        tokens as the weight of its arc.

        Raises:
            KeyError: If the transition, or a place named by one of its input
                arcs, is not part of this net.
        """
        t = self.transitions[transition_name]
        return all(
            self.places[place].tokens >= weight
            for place, weight in t.inputs.items()
        )

    def fire(self, transition_name: str) -> bool:
        """Fire a transition if enabled. Returns True if fired.

        The update is all-or-nothing: either every input and output place is
        updated, or the marking is left untouched.

        Raises:
            KeyError: If the transition, or a place named by one of its arcs,
                is not part of this net. No tokens are moved in that case.
        """
        if not self.is_enabled(transition_name):
            return False
        t = self.transitions[transition_name]
        # Resolve every output place BEFORE touching any token count, so an
        # unknown output place fails here instead of after the input tokens
        # have already been removed.
        targets = [(self.places[place], weight) for place, weight in t.outputs.items()]
        # Remove tokens from input places
        for place, weight in t.inputs.items():
            self.places[place].tokens -= weight
        # Add tokens to output places
        for target, weight in targets:
            target.tokens += weight
        return True

    def marking(self) -> Dict[str, int]:
        """Return current marking (token distribution) as place_name → tokens."""
        return {name: place.tokens for name, place in self.places.items()}

    def enabled_transitions(self) -> List[str]:
        """Return all currently enabled transitions, in registration order."""
        return [name for name in self.transitions if self.is_enabled(name)]

    def simulate(self, steps: int = 100, strategy: str = "random") -> List[str]:
        """Run simulation, returning sequence of fired transitions.

        Args:
            steps: Maximum number of firings.
            strategy: ``"random"`` picks uniformly among the enabled
                transitions; any other value picks the first enabled
                transition (deterministic).

        Returns:
            Names of the fired transitions, in firing order. The run stops
            early when no transition is enabled.
        """
        import random

        history: List[str] = []
        for _ in range(steps):
            enabled = self.enabled_transitions()
            if not enabled:
                break
            if strategy == "random":
                choice = random.choice(enabled)
            else:
                choice = enabled[0]  # deterministic
            self.fire(choice)
            history.append(choice)
        return history
# Example: Producer-Consumer with bounded buffer
net = PetriNet()

# Places, given as (name, initial tokens).
# buffer_space starts at 3, i.e. buffer capacity = 3.
for place_name, initial_tokens in (
    ("producer_ready", 1),
    ("buffer_space", 3),
    ("buffer_items", 0),
    ("consumer_ready", 1),
):
    net.add_place(place_name, tokens=initial_tokens)

# Transitions: producing uses up one free slot, consuming frees one again.
net.add_transition(
    "produce",
    inputs={"producer_ready": 1, "buffer_space": 1},
    outputs={"producer_ready": 1, "buffer_items": 1},
)
net.add_transition(
    "consume",
    inputs={"consumer_ready": 1, "buffer_items": 1},
    outputs={"consumer_ready": 1, "buffer_space": 1},
)

# Simulate: up to five steps, always firing the first enabled transition.
print(f"Initial: {net.marking()}")
for _ in range(5):
    enabled = net.enabled_transitions()
    if not enabled:
        continue
    net.fire(enabled[0])
    print(f"Fired: {enabled[0]}, Marking: {net.marking()}")
Reachability Analysis
The reachability graph enumerates all possible markings:
from collections import deque
def build_reachability_graph(net: PetriNet, max_states: Optional[int] = None) -> dict:
    """Build the complete reachability graph by breadth-first exploration.

    Args:
        net: Net to explore, starting from its current marking. Token counts
            are modified during exploration but are put back to the starting
            marking before this function returns or raises.
        max_states: Optional cap on the number of distinct markings that may
            be expanded. ``None`` (the default) explores without limit, which
            does not terminate for a net with infinitely many reachable
            markings.

    Returns:
        A dict mapping each reachable marking to
        ``{transition_name: resulting_marking}``. A marking is a tuple of
        ``(place_name, tokens)`` pairs sorted by place name.

    Raises:
        RuntimeError: If more than ``max_states`` distinct markings are
            reachable.
    """
    starting_marking = net.marking()
    initial = tuple(sorted(starting_marking.items()))
    graph = {}  # marking → {transition_name: resulting_marking}
    queue = deque([initial])
    try:
        while queue:
            current_marking = queue.popleft()
            # A marking can be queued several times before it is expanded.
            if current_marking in graph:
                continue
            if max_states is not None and len(graph) >= max_states:
                raise RuntimeError(
                    f"Reachability graph exceeds max_states={max_states}; "
                    "the net may be unbounded"
                )
            # Put the net into the marking being expanded
            for name, tokens in current_marking:
                net.places[name].tokens = tokens
            graph[current_marking] = {}
            for t_name in net.enabled_transitions():
                net.fire(t_name)
                new_marking = tuple(sorted(net.marking().items()))
                graph[current_marking][t_name] = new_marking
                if new_marking not in graph:
                    queue.append(new_marking)
                # Rewind to the marking being expanded before trying the
                # next transition
                for name, tokens in current_marking:
                    net.places[name].tokens = tokens
    finally:
        # Exploration must not leak into the caller's net: restore the
        # marking it had on entry, even if an exception is propagating.
        for name, tokens in starting_marking.items():
            net.places[name].tokens = tokens
    return graph
def check_deadlock_freedom(graph: dict) -> Tuple[bool, list]:
    """Report whether the reachability graph contains a dead marking.

    Args:
        graph: Mapping marking → {transition_name: resulting_marking}.

    Returns:
        ``(is_deadlock_free, dead_markings)`` where ``dead_markings`` lists,
        in graph order, every marking with no outgoing transition.
    """
    dead_markings = []
    for marking, outgoing in graph.items():
        if outgoing:
            continue
        dead_markings.append(marking)
    return not dead_markings, dead_markings
def check_boundedness(graph: dict) -> Dict[str, int]:
    """Return, per place, the largest token count seen in any graph marking.

    Args:
        graph: Mapping whose keys are markings, each an iterable of
            ``(place_name, tokens)`` pairs.

    Returns:
        Mapping place_name → maximum tokens (never below 0).
    """
    peaks = {}
    for marking in graph:
        for place_name, count in marking:
            best_so_far = peaks.get(place_name, 0)
            peaks[place_name] = count if count > best_so_far else best_so_far
    return peaks
Colored Petri Nets with SNAKES
The SNAKES library supports colored tokens with Python expressions on arcs:
# Builds a small SNAKES net named "order_processing" with three places and one
# transition, then fires that transition once with an explicit binding.
# NOTE: the import below rebinds the names PetriNet, Place and Transition, so
# in a single module it shadows the from-scratch classes defined earlier.
import snakes.plugins
snakes.plugins.load("gv", "snakes.nets", "nets")
from nets import PetriNet, Place, Transition, Expression, Variable
# Create a colored Petri net
net = PetriNet("order_processing")
# Places with typed tokens: each initial token is an (order_id, amount) tuple
net.add_place(Place("new_orders", [("ORD-001", 59.99), ("ORD-002", 129.00)]))
net.add_place(Place("validated", []))
# NOTE(review): "rejected" is created but no arc in this snippet connects to it.
net.add_place(Place("rejected", []))
# Transition with guard condition
# NOTE(review): the guard reads "amount", but the only input arc below binds
# "order"; "amount" appears solely in the binding passed to fire() — confirm
# SNAKES accepts a guard variable that no arc binds.
net.add_transition(Transition("validate", guard=Expression("amount > 0")))
# Arcs with variable bindings
net.add_input("new_orders", "validate", Variable("order"))
# NOTE(review): this expression evaluates to None when order[1] >= 100 —
# presumably that deposits a None token in "validated" rather than routing
# the order to "rejected"; verify the intended behavior.
net.add_output("validated", "validate",
               Expression("order if order[1] < 100 else None"))
# Fire with specific binding
# NOTE(review): SNAKES documentation examples pass a Substitution object to
# fire(); confirm that a plain dict is accepted here.
binding = {"order": ("ORD-001", 59.99), "amount": 59.99}
net.transition("validate").fire(binding)
SNAKES is particularly useful for academic modeling and prototyping concurrent protocols where token data matters.
Process Mining: Discovering Petri Nets from Event Logs
pm4py can discover Petri net models from real execution logs:
# Discovers a Petri net from an XES event log with pm4py, displays it, and
# prints token-based-replay fitness and precision of the log against the model.
import pm4py
# Load event log (path is relative to the current working directory)
log = pm4py.read_xes("process_log.xes")
# Discover Petri net using Alpha Miner
net, initial_marking, final_marking = pm4py.discover_petri_net_alpha(log)
# Or use Inductive Miner (more robust)
# NOTE: this rebinds net/initial_marking/final_marking, so everything below
# uses the Inductive Miner result; keep only one of the two discovery calls.
net, initial_marking, final_marking = pm4py.discover_petri_net_inductive(log)
# Visualize
pm4py.view_petri_net(net, initial_marking, final_marking)
# Conformance checking — does the log fit the model?
# The result is indexed by key below, i.e. it is used as a mapping.
fitness = pm4py.fitness_token_based_replay(log, net, initial_marking, final_marking)
print(f"Fitness: {fitness['average_trace_fitness']:.2%}")
# Find precision — does the model allow behavior not in the log?
# The result is formatted directly as a percentage, i.e. it is used as a number.
precision = pm4py.precision_token_based_replay(log, net, initial_marking, final_marking)
print(f"Precision: {precision:.2%}")
Modeling Patterns
Mutual Exclusion (Mutex)
net = PetriNet()
net.add_place("mutex", tokens=1)  # shared resource

# Each process starts idle and outside its critical section.
for proc in ("a", "b"):
    net.add_place(f"process_{proc}_idle", tokens=1)
    net.add_place(f"process_{proc}_critical", tokens=0)

# Entering the critical section takes the mutex token; leaving returns it.
for proc in ("a", "b"):
    idle = f"process_{proc}_idle"
    critical = f"process_{proc}_critical"
    net.add_transition(
        f"{proc}_enter",
        inputs={idle: 1, "mutex": 1},
        outputs={critical: 1},
    )
    net.add_transition(
        f"{proc}_exit",
        inputs={critical: 1},
        outputs={idle: 1, "mutex": 1},
    )
# The single token in "mutex" guarantees mutual exclusion
Fork-Join Parallelism
net = PetriNet()
tasks = ("a", "b", "c")

net.add_place("start", tokens=1)
for task in tasks:
    net.add_place(f"task_{task}", tokens=0)
net.add_place("all_done", tokens=0)

# Fork: one transition produces tokens for parallel tasks
net.add_transition(
    "fork",
    inputs={"start": 1},
    outputs={f"task_{task}": 1 for task in tasks},
)

# Individual completions
for task in tasks:
    net.add_place(f"{task}_done", tokens=0)
for task in tasks:
    net.add_transition(
        f"do_{task}",
        inputs={f"task_{task}": 1},
        outputs={f"{task}_done": 1},
    )

# Join: requires all tasks complete
net.add_transition(
    "join",
    inputs={f"{task}_done": 1 for task in tasks},
    outputs={"all_done": 1},
)
Dining Philosophers (Deadlock Analysis)
def dining_philosophers(n: int = 5) -> PetriNet:
    """Build a dining-philosophers net with ``n`` philosophers and ``n`` forks.

    Each philosopher ``i`` has places ``thinking_i`` (1 token) and
    ``eating_i`` (0 tokens). ``pickup_i`` takes the philosopher's left and
    right fork in one firing; ``putdown_i`` returns both.
    """
    table = PetriNet()

    # Forks (shared resources), one token each
    fork_names = [f"fork_{seat}" for seat in range(n)]
    for fork_name in fork_names:
        table.add_place(fork_name, tokens=1)

    # Philosophers
    for seat in range(n):
        thinking = f"thinking_{seat}"
        eating = f"eating_{seat}"
        table.add_place(thinking, tokens=1)
        table.add_place(eating, tokens=0)

        # Left fork first, then the right neighbour's fork (wrapping around).
        both_forks = {fork_names[seat]: 1, fork_names[(seat + 1) % n]: 1}
        table.add_transition(
            f"pickup_{seat}",
            inputs={thinking: 1, **both_forks},
            outputs={eating: 1},
        )
        table.add_transition(
            f"putdown_{seat}",
            inputs={eating: 1},
            outputs={thinking: 1, **both_forks},
        )
    return table
net = dining_philosophers(5)
# Reachability analysis would show this net is deadlock-free
# because pickup requires BOTH forks atomically: each pickup_i transition
# takes its left and right fork in a single firing, so no philosopher can
# hold one fork while waiting for the other.
# To check: check_deadlock_freedom(build_reachability_graph(net))
Invariant Analysis
Place invariants (P-invariants) identify conservation laws — weighted sums of tokens that remain constant regardless of which transitions fire:
import numpy as np
def incidence_matrix(net: PetriNet) -> Tuple[np.ndarray, list, list]:
    """Build the incidence matrix C where C[i,j] = output[i,j] - input[i,j].

    Returns:
        ``(C, places, transitions_list)``: ``C`` is an integer array with one
        row per place and one column per transition, both in sorted name
        order; the two lists give those row and column names.
    """
    place_names = sorted(net.places.keys())
    transition_names = sorted(net.transitions.keys())
    row_of = {p_name: row for row, p_name in enumerate(place_names)}

    C = np.zeros((len(place_names), len(transition_names)), dtype=int)
    for col, t_name in enumerate(transition_names):
        arcs = net.transitions[t_name]
        # Only places touched by an arc can have a non-zero entry; arcs that
        # name a place outside the net are ignored.
        for p_name in arcs.outputs.keys() | arcs.inputs.keys():
            if p_name not in row_of:
                continue
            net_change = arcs.outputs.get(p_name, 0) - arcs.inputs.get(p_name, 0)
            C[row_of[p_name], col] = net_change
    return C, place_names, transition_names
def find_p_invariants(C: np.ndarray) -> np.ndarray:
    """Find place invariants: vectors y where y^T * C = 0.

    Args:
        C: Incidence matrix with one row per place and one column per
            transition.

    Returns:
        Array with one invariant per row. The rows are the null-space basis
        vectors computed by ``scipy.linalg.null_space`` for ``C^T``.
    """
    from scipy.linalg import null_space

    # y^T C = 0 is the same condition as C^T y = 0, so the invariants are
    # exactly the null space of the transposed incidence matrix.
    basis_as_columns = null_space(C.T)
    return basis_as_columns.T
A P-invariant y means that y · M is constant for all reachable markings M. This proves conservation properties — for the mutex example, tokens(mutex) + tokens(process_a_critical) + tokens(process_b_critical) = 1 always holds.
Performance Considerations
| Operation | Complexity | Notes |
|---|---|---|
| Enable check | O(inputs per transition) | Fast for sparse nets |
| Fire | O(inputs + outputs) | Token arithmetic |
| Reachability graph | O(states × transitions) | Exponential in places/tokens |
| P-invariant computation | O(places³) | Linear algebra |
The reachability graph is the bottleneck. A net with p places each holding up to k tokens has up to (k+1)^p possible markings. For analysis of large nets, use structural techniques (invariants, traps, siphons) that avoid state enumeration.
One thing to remember: Petri nets let you model and formally verify concurrent systems in Python — from simple producer-consumer patterns to complex distributed protocols — and their mathematical properties (reachability, boundedness, liveness) give you provable guarantees that testing alone can’t provide.
See Also
- Python Finite Automata — How finite automata work like vending machines: they read input step by step and decide whether to accept or reject based on simple rules.
- CI/CD — Why big apps can ship updates every day without turning your phone into a glitchy mess: CI/CD is the behind-the-scenes quality gate and delivery truck.
- Containerization — Why does software that works on your computer break on everyone else's? Containers fix that — and they're why Netflix can deploy 100 updates a day without the site going down.
- Python 3.10 New Features — Python 3.10 gave programmers a shape-sorting machine, friendlier error messages, and cleaner ways to say 'this or that' in type hints.
- Python 3.11 New Features — Python 3.11 made everything faster, error messages smarter, and let you catch several mistakes at once instead of stopping at the first one.