Python Barrier Synchronization — Deep Dive
threading.Barrier internals
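CPython's threading.Barrier is built on a Condition that wraps a Lock. Each call to wait() records the caller's arrival index and increments a counter. If the counter hasn't reached parties, the thread waits on the condition. When the last thread arrives, it runs the optional action and notifies all waiting threads. The barrier then returns to its initial state so it can be reused. The sketch below uses a generation counter in place of the filling/draining state flags that CPython tracks internally.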
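```python
# Simplified sketch of how a reusable barrier can work internally
import threading

class SimpleBarrier:
    def __init__(self, parties):
        self.parties = parties
        self._count = 0
        self._lock = threading.Lock()
        self._condition = threading.Condition(self._lock)
        self._generation = 0  # identifies the current round
        self._broken = False

    def wait(self, timeout=None):
        with self._lock:
            if self._broken:
                raise threading.BrokenBarrierError
            gen = self._generation
            index = self._count  # arrival order: 0 .. parties - 1
            self._count += 1
            if self._count == self.parties:
                # Last to arrive: start a new round and release everyone
                self._count = 0
                self._generation += 1
                self._condition.notify_all()
                return index
            # Sleep until this round ends; wait_for re-checks the predicate
            # after spurious wakeups and enforces one overall timeout
            self._condition.wait_for(
                lambda: self._generation != gen or self._broken, timeout
            )
            if self._generation != gen:
                return index
            # Timed out (or another thread broke the barrier): break it for
            # everyone so the remaining waiters don't hang
            self._broken = True
            self._condition.notify_all()
            raise threading.BrokenBarrierError
```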
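The generation counter ties the wake-up condition to a single round: a waiter leaves only when the generation it joined has ended. Without it, a fast thread could re-enter wait() for the next round and increment _count before the slower threads wake. Those threads could then misread the count and go back to sleep. This subtle bug is easy to miss in custom implementations.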
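The real threading.Barrier behaves the way the sketch aims to. wait() gives each thread in a round a distinct index in range(parties). A wait() that times out puts the barrier into the broken state. The small check below exercises both on the standard-library class. The helper names, thread counts and timeout are arbitrary choices for illustration.

```python
import threading

def collect_indices(parties=4, rounds=3):
    """Run `rounds` trips of one Barrier and record each wait() index per round."""
    barrier = threading.Barrier(parties)
    seen = {r: [] for r in range(rounds)}
    lock = threading.Lock()

    def worker():
        for r in range(rounds):
            idx = barrier.wait()  # same Barrier object reused every round
            with lock:
                seen[r].append(idx)

    threads = [threading.Thread(target=worker) for _ in range(parties)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen

def lonely_wait(timeout=0.1):
    """A lone waiter on a 2-party barrier times out and breaks the barrier."""
    barrier = threading.Barrier(2)
    try:
        barrier.wait(timeout=timeout)
    except threading.BrokenBarrierError:
        return barrier.broken  # later wait() calls would fail too
    return False

print(collect_indices())
print(lonely_wait())
```

Within each round, the recorded indices are a permutation of 0 to parties - 1. That is why "index 0 is the leader" is a safe way to pick exactly one thread.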
Barrier with action callback
threading.Barrier accepts an optional action callable that runs exactly once when the barrier trips, before any thread is released:
```python
import threading

results = {}
results_lock = threading.Lock()

def aggregate_results():
    """Runs once when all threads reach the barrier."""
    total = sum(results.values())
    print(f"Phase complete. Total: {total}")

barrier = threading.Barrier(4, action=aggregate_results)

def worker(worker_id):
    # Phase 1: compute
    partial_result = worker_id * 10
    with results_lock:
        results[worker_id] = partial_result
    # Sync: action runs here between phases
    barrier.wait()
    # Phase 2: use aggregated results
    print(f"Worker {worker_id} continuing after barrier")

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```
The action callback is powerful for reduction operations — summing partial results, updating shared state, or logging phase transitions.
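One detail about action matters for error handling. If the action raises, the barrier is put into the broken state. In CPython, the thread that ran the action (the last to arrive) sees the action's own exception. The threads already waiting get BrokenBarrierError instead. The sketch below records what each thread sees. failing_action and run_with_failing_action are illustrative names.

```python
import threading

def run_with_failing_action(parties=3):
    """Return the sorted per-thread outcomes and whether the barrier broke."""
    def failing_action():
        raise ValueError("reduction failed")

    barrier = threading.Barrier(parties, action=failing_action)
    outcomes = []
    lock = threading.Lock()

    def worker():
        try:
            barrier.wait()
            outcome = "passed"
        except threading.BrokenBarrierError:
            outcome = "broken"        # threads that were already waiting
        except ValueError:
            outcome = "action error"  # the thread that ran the action
        with lock:
            outcomes.append(outcome)

    threads = [threading.Thread(target=worker) for _ in range(parties)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(outcomes), barrier.broken

print(run_with_failing_action())  # (['action error', 'broken', 'broken'], True)
```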
Multiprocessing barriers
For true parallelism across CPU cores, use multiprocessing.Barrier:
```python
import multiprocessing as mp

def parallel_worker(barrier, worker_id, shared_array):
    # Phase 1: each process fills its own 10-element section
    for i in range(10):
        shared_array[worker_id * 10 + i] = worker_id * 100 + i
    print(f"Worker {worker_id} done with phase 1")
    barrier.wait()
    # Phase 2: all processes can now read the full array
    total = sum(shared_array)
    print(f"Worker {worker_id} sees total: {total}")

if __name__ == "__main__":
    num_workers = 4
    barrier = mp.Barrier(num_workers)
    shared_array = mp.Array("i", num_workers * 10)  # shared C int array, zero-initialized
    processes = [
        mp.Process(target=parallel_worker, args=(barrier, i, shared_array))
        for i in range(num_workers)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```
Async barrier for asyncio
Since Python 3.11, asyncio provides asyncio.Barrier. On older versions you can build a minimal one yourself:
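```python
import asyncio
import threading  # for BrokenBarrierError, available on every Python 3 version


class _Round:
    """State shared by all tasks waiting in one round of the barrier."""

    def __init__(self):
        self.event = asyncio.Event()
        self.broken = False


class AsyncBarrier:
    def __init__(self, parties: int):
        self.parties = parties
        self._count = 0
        self._lock = asyncio.Lock()
        self._round = _Round()  # per-round state; plays the role of a generation counter

    async def wait(self) -> int:
        async with self._lock:
            current = self._round
            idx = self._count  # arrival order: 0 .. parties - 1
            self._count += 1
            if self._count == self.parties:
                # Last to arrive: start a fresh round, then release this one
                self._count = 0
                self._round = _Round()
                current.event.set()
                return idx
        # Wait outside the lock on this round's own Event, so tasks that
        # already entered the next round cannot be confused with this one
        await current.event.wait()
        if current.broken:
            raise threading.BrokenBarrierError
        return idx

    async def reset(self):
        # Wake current waiters with BrokenBarrierError and start an empty round
        async with self._lock:
            self._round.broken = True
            self._round.event.set()
            self._count = 0
            self._round = _Round()
```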
Parallel iterative algorithm: Jacobi relaxation
Barriers enable parallel iterative algorithms where each iteration depends on the previous one being complete across all workers:
```python
import threading
import numpy as np

def jacobi_worker(grid, new_grid, start_row, end_row, barrier, iterations):
    """Parallel Jacobi iteration for solving the Laplace equation."""
    rows, cols = grid.shape
    for _ in range(iterations):
        # Compute phase: update assigned interior rows from the old grid
        for i in range(max(1, start_row), min(rows - 1, end_row)):
            for j in range(1, cols - 1):
                new_grid[i, j] = 0.25 * (
                    grid[i - 1, j] + grid[i + 1, j] +
                    grid[i, j - 1] + grid[i, j + 1]
                )
        # Sync 1: everyone must finish computing before the copy.
        # The return value elects exactly one thread (index 0) as leader.
        idx = barrier.wait()
        if idx == 0:
            grid[:] = new_grid  # leader copies the new values into grid
        # Sync 2: ensure the copy is complete before the next iteration reads grid
        barrier.wait()

def parallel_jacobi(size=100, num_threads=4, iterations=1000):
    grid = np.zeros((size, size))
    # Set boundary conditions
    grid[0, :] = 100.0  # top edge hot
    # Copy after setting boundaries so new_grid carries them too;
    # otherwise the leader's copy would overwrite the hot edge with zeros
    new_grid = grid.copy()
    barrier = threading.Barrier(num_threads)
    rows_per_thread = size // num_threads
    threads = []
    for t in range(num_threads):
        start = t * rows_per_thread
        end = start + rows_per_thread if t < num_threads - 1 else size
        thread = threading.Thread(
            target=jacobi_worker,
            args=(grid, new_grid, start, end, barrier, iterations),
        )
        threads.append(thread)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return grid
```
Each iteration needs two barrier synchronizations. The first comes after the compute phase, so no thread copies or reads grid while another is still writing new_grid; its return value also elects the leader. The second comes after the leader's copy, so no thread starts the next iteration on a half-updated grid. This compute, synchronize, update pattern is ubiquitous in parallel scientific computing. Under CPython's GIL, however, the pure-Python inner loops do not actually run in parallel. The example shows the synchronization structure rather than a speedup. Vectorized NumPy operations (which can release the GIL) or processes are the usual route to real gains.
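A common variation avoids the copy entirely; it is an alternative shown for comparison, not the article's method. Keep two buffers and have every thread swap its local references after the barrier. All threads swap in lockstep, so one barrier per iteration is enough. The row update here is also vectorized with NumPy slicing. jacobi_swap_worker and parallel_jacobi_swap are hypothetical names, and the defaults are arbitrary.

```python
import threading
import numpy as np

def jacobi_swap_worker(src, dst, start_row, end_row, barrier, iterations):
    rows, _ = src.shape
    lo, hi = max(1, start_row), min(rows - 1, end_row)
    for _ in range(iterations):
        # Update this thread's interior rows of dst from src (read-only this sweep)
        dst[lo:hi, 1:-1] = 0.25 * (
            src[lo - 1:hi - 1, 1:-1] + src[lo + 1:hi + 1, 1:-1]
            + src[lo:hi, :-2] + src[lo:hi, 2:]
        )
        # One barrier: no thread writes the other buffer until all reads are done
        barrier.wait()
        # Every thread swaps its local references the same way, in lockstep
        src, dst = dst, src

def parallel_jacobi_swap(size=32, num_threads=4, iterations=200):
    a = np.zeros((size, size))
    a[0, :] = 100.0  # top edge hot
    b = a.copy()     # both buffers hold the boundary values
    barrier = threading.Barrier(num_threads)
    rows_per_thread = size // num_threads
    threads = []
    for t in range(num_threads):
        start = t * rows_per_thread
        end = start + rows_per_thread if t < num_threads - 1 else size
        threads.append(threading.Thread(
            target=jacobi_swap_worker,
            args=(a, b, start, end, barrier, iterations),
        ))
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    # Sweep k writes into b when k is odd, so the newest values are in b
    # after an odd number of iterations and in a otherwise
    return b if iterations % 2 else a

print(parallel_jacobi_swap()[1:4, 1:4])
```

Dropping the copy trades memory (two full buffers) for one fewer barrier and no serial leader step.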
Cyclic barriers and phases
Python’s threading.Barrier is inherently cyclic — it resets after each trip. This enables multi-phase algorithms without creating new barriers:
```python
import threading

def compute_phase(worker_id, phase):
    return worker_id * phase

def phased_worker(barrier, worker_id, num_phases):
    for phase in range(num_phases):
        # Do phase-specific work
        result = compute_phase(worker_id, phase)
        # Barrier returns the index; 0 = leader
        idx = barrier.wait()
        if idx == 0:
            # Leader aggregates, logs, or transitions
            print(f"Phase {phase} complete")
        # Everyone continues to next phase
        barrier.wait()  # ensure leader finished before next phase
```
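To see the cyclic reuse in action, the sketch below runs one Barrier through several phases. Each worker logs the phase number just before waiting. The barrier trips once per phase, so no worker can log phase p + 1 until every worker has logged phase p, and the log comes out in order. run_phases is a hypothetical helper name.

```python
import threading

def run_phases(num_workers=4, num_phases=3):
    barrier = threading.Barrier(num_workers)  # one barrier, reused every phase
    log = []
    log_lock = threading.Lock()

    def worker(worker_id):
        for phase in range(num_phases):
            with log_lock:
                log.append(phase)  # record that this worker reached `phase`
            barrier.wait()         # nobody moves on until all have reached it

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log

print(run_phases())  # [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2]
```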
Error handling and broken barriers
When a participant fails, the barrier must not hang the remaining participants:
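```python
import threading

barrier = threading.Barrier(4, timeout=30)  # default timeout for every wait()

def risky_computation(worker_id):
    # Hypothetical placeholder for work that may raise
    return worker_id * 2

def resilient_worker(worker_id):
    try:
        result = risky_computation(worker_id)
        barrier.wait()  # timing out here also breaks the barrier
        return result
    except Exception:
        barrier.abort()  # breaks the barrier for all
        raise

def safe_wait(barrier):
    try:
        return barrier.wait()
    except threading.BrokenBarrierError:
        # Handle gracefully: log, cleanup, retry with fewer workers
        return -1  # sentinel, never a valid arrival index
```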
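abort() immediately puts the barrier into the broken state. Every thread blocked in wait() and every later caller receives BrokenBarrierError until reset() is called. This is a hard stop. reset() makes the barrier usable again but does not change its party count. Graceful degradation (continuing with fewer workers) therefore needs either a new Barrier or a custom barrier that can reduce its party count at runtime.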
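The sketch below exercises abort() and reset() on a real threading.Barrier. One worker fails before reaching the barrier (a simulated ValueError) and aborts it, so the others get BrokenBarrierError. reset() then makes the same barrier usable again. run_round and fail_id are hypothetical names. BrokenBarrierError is a subclass of RuntimeError, so it should be caught before any broader handler.

```python
import threading

def run_round(barrier, fail_id=None):
    """Run one round on `barrier`; worker `fail_id` fails before reaching it."""
    outcomes = []
    lock = threading.Lock()

    def worker(worker_id):
        try:
            if worker_id == fail_id:
                raise ValueError("simulated failure")  # stand-in for risky work
            barrier.wait()
            outcome = "passed"
        except threading.BrokenBarrierError:
            outcome = "broken"
        except ValueError:
            barrier.abort()  # wake everyone with BrokenBarrierError
            outcome = "failed"
        with lock:
            outcomes.append(outcome)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(barrier.parties)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(outcomes)

barrier = threading.Barrier(3, timeout=5)  # timeout only as a safety net
print(run_round(barrier, fail_id=0))  # ['broken', 'broken', 'failed']
print(barrier.broken)                 # True
barrier.reset()                       # back to the empty, usable state
print(run_round(barrier))             # ['passed', 'passed', 'passed']
```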
Performance considerations
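Barriers add synchronization overhead that grows with the number of participants, since every waiter must be woken on each trip. Treat the figures below as rough orders of magnitude; actual costs depend on hardware, OS, and Python version: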
| Participants | Barrier overhead (per sync) | Notes |
|---|---|---|
| 2-4 | ~5-10μs | Negligible for most workloads |
| 8-16 | ~20-50μs | Fine for phases lasting >1ms |
| 64+ | ~100-500μs | Consider hierarchical barriers |
| 1000+ | ~1-5ms | Barrier becomes the bottleneck |
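Because these costs vary so much between machines and interpreters, it is worth measuring locally. Below is a rough sketch. It assumes that timing many back-to-back trips with no other work approximates the per-trip cost. Thread start-up is included in the total, so the results lean high. barrier_overhead is a hypothetical helper.

```python
import threading
import time

def barrier_overhead(parties=4, rounds=1000):
    """Average wall-clock seconds per barrier trip when threads do no other work."""
    barrier = threading.Barrier(parties)

    def worker():
        for _ in range(rounds):
            barrier.wait()

    threads = [threading.Thread(target=worker) for _ in range(parties)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return (time.perf_counter() - start) / rounds

for n in (2, 4, 8, 16):
    print(f"{n:>3} threads: {barrier_overhead(n) * 1e6:.1f} μs per trip")
```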
For large numbers of participants, use hierarchical barriers: groups of 8-16 workers synchronize locally, then group leaders synchronize globally. This reduces contention on the condition variable.
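Below is a minimal two-level sketch of that idea, built from ordinary threading.Barrier objects. HierarchicalBarrier is a hypothetical helper, not a standard-library class. It assumes every caller passes its own worker_id so it can be mapped to a group. In CPython the GIL limits how much contention any design can remove, so time this against a single Barrier before adopting it.

```python
import threading

class HierarchicalBarrier:
    """Local group barriers plus one global barrier among group representatives."""

    def __init__(self, parties, group_size):
        self.group_size = group_size
        sizes = [min(group_size, parties - s) for s in range(0, parties, group_size)]
        self._groups = [threading.Barrier(n) for n in sizes]
        self._top = threading.Barrier(len(sizes))  # one representative per group

    def wait(self, worker_id):
        group = self._groups[worker_id // self.group_size]
        if group.wait() == 0:  # whole group has arrived; index 0 represents it
            self._top.wait()   # representatives meet once per round
        group.wait()           # release the group only after its representative returns

def run_demo(parties=10, group_size=4, rounds=5):
    """Each worker logs its round number before waiting; returns the log."""
    hb = HierarchicalBarrier(parties, group_size)
    log = []
    lock = threading.Lock()

    def worker(worker_id):
        for r in range(rounds):
            with lock:
                log.append(r)
            hb.wait(worker_id)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(parties)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log

print(run_demo())  # round numbers never interleave across workers
```

The second group.wait() keeps ordinary members from racing ahead while their representative is still blocked in the global barrier.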
The one thing to remember: barriers coordinate phased parallel execution by ensuring all workers reach the same point before any proceed. Python’s threading.Barrier handles cyclic reuse, action callbacks, timeouts, and broken-barrier error propagation. Use them for iterative algorithms, parallel simulations, and any workload where phases must complete atomically across all workers.