Python Communicating Sequential Processes — Deep Dive
CSP formalism and Python mapping
Hoare’s CSP defines processes as mathematical objects that communicate over named channels with synchronous rendezvous semantics. A process either offers to send on a channel, offers to receive, or chooses between multiple channels (external choice). Python lacks native CSP primitives, but its concurrency tools can model these concepts faithfully.
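For readers who want the notation behind these three behaviours, they are conventionally written as shown below. This is a notation reference only: the process names P, Q and COPY and the channel names c, d, left and right are placeholders, and the markup assumes the amsmath and amssymb packages.

```latex
\begin{align*}
\text{send } v \text{ on } c:     &\quad c!v \rightarrow P \\
\text{receive into } x \text{ from } c: &\quad c?x \rightarrow P(x) \\
\text{external choice}:           &\quad (c?x \rightarrow P(x)) \mathrel{\Box} (d?y \rightarrow Q(y)) \\
\text{one-place copier}:          &\quad COPY = left?x \rightarrow right!x \rightarrow COPY
\end{align*}
```

In the Python sections, c!v corresponds to awaiting send(v) on a channel object and c?x to awaiting recv().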
Building a CSP channel from scratch
A true rendezvous channel has zero buffer — the sender blocks until the receiver arrives, and vice versa:
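import asyncio
from typing import Generic, TypeVar

T = TypeVar("T")


class RendezvousChannel(Generic[T]):
    """Zero-buffer CSP channel with synchronous rendezvous."""

    def __init__(self) -> None:
        self._send_event = asyncio.Event()  # set while a value is on offer
        self._recv_event = asyncio.Event()  # set once that value was taken
        self._value: T | None = None
        self._lock = asyncio.Lock()         # admits one sender at a time

    async def send(self, value: T) -> None:
        async with self._lock:
            self._value = value
            self._recv_event.clear()
            self._send_event.set()
            # Block until a receiver has taken the value.
            await self._recv_event.wait()

    async def recv(self) -> T:
        # Re-check after waking: another receiver may have claimed the offer.
        while not self._send_event.is_set():
            await self._send_event.wait()
        # Claim the offer before yielding so it cannot be read twice.
        self._send_event.clear()
        value = self._value
        self._value = None
        self._recv_event.set()
        return value  # type: ignore[return-value]

This implementation synchronizes both sides on each transfer. The sender deposits a value and waits for the receiver to acknowledge it. The receiver clears the offer as it takes the value, so a second recv() cannot read the same message again, which would otherwise happen whenever the receiver loops before the sender has been rescheduled. Nothing is buffered and messages never accumulate.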
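The blocking behaviour can be checked by logging the order of events around one transfer. The sketch below is self-contained, so it carries a compact stand-in channel; the names Rendezvous and demo are illustrative only and do not come from any library.

```python
import asyncio


class Rendezvous:
    """Compact zero-buffer channel used only for this demonstration."""

    def __init__(self):
        self._offer = asyncio.Event()   # a value is waiting to be taken
        self._taken = asyncio.Event()   # the receiver has taken it
        self._lock = asyncio.Lock()     # one sender at a time
        self._value = None

    async def send(self, value):
        async with self._lock:
            self._value = value
            self._taken.clear()
            self._offer.set()
            await self._taken.wait()

    async def recv(self):
        while not self._offer.is_set():
            await self._offer.wait()
        self._offer.clear()
        value, self._value = self._value, None
        self._taken.set()
        return value


async def demo():
    ch = Rendezvous()
    log = []

    async def sender():
        log.append("send:start")
        await ch.send("ping")
        log.append("send:done")

    async def receiver():
        await asyncio.sleep(0.05)       # arrive late on purpose
        log.append("recv:start")
        log.append(f"recv:{await ch.recv()}")

    await asyncio.gather(sender(), receiver())
    return log


log = asyncio.run(demo())
print(log)
```

The sender records send:done only after the late receiver has arrived and taken the value, which is the rendezvous property the section describes.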
Trio’s memory channels — production-grade CSP
Trio's memory channels are among the closest matches to CSP channels in the Python ecosystem:
import trio


async def producer(send_channel: trio.MemorySendChannel):
    async with send_channel:
        for i in range(10):
            await send_channel.send(f"item-{i}")
            print(f"Sent item-{i}")


async def consumer(receive_channel: trio.MemoryReceiveChannel):
    async with receive_channel:
        async for item in receive_channel:
            print(f"Processing {item}")
            await trio.sleep(0.1)


async def main():
    send_ch, recv_ch = trio.open_memory_channel(max_buffer_size=0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_ch)
        nursery.start_soon(consumer, recv_ch)


trio.run(main)
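With max_buffer_size=0, this is a rendezvous channel: each send() blocks until a matching receive(). Closing the send side, here by leaving the async with block, ends the consumer's async for loop, so no sentinel value is needed. Trio also supports bounded buffers (max_buffer_size=N) for when you want some decoupling between producer and consumer.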
Select: waiting on multiple channels
Go's select statement is a well-known realization of CSP's external choice: wait on several channel operations and proceed with whichever is ready first. In Python, you can approximate it:
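import asyncio


async def csp_select(*channels):
    """Wait on multiple channels, return the first available value."""
    tasks = [asyncio.create_task(ch.recv()) for ch in channels]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel the losing receives and wait until the cancellations land.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    # Caveat: if several channels were ready at once, only one value is kept.
    return done.pop().result()


# Usage in a timeout pattern
async def recv_with_timeout(channel, timeout_seconds):
    timeout_ch = RendezvousChannel()

    async def timer():
        await asyncio.sleep(timeout_seconds)
        await timeout_ch.send("timeout")

    timer_task = asyncio.create_task(timer())
    try:
        return await csp_select(channel, timeout_ch)
    finally:
        # Stop the timer so it does not block forever on an unreceived send.
        timer_task.cancel()

This pattern covers timeouts and cancels the receives that lose the race, all without shared state. It has two limits worth knowing. It gives no channel priority over another, and if two channels become ready in the same event-loop iteration, done holds more than one task and this sketch keeps only one of the values. The caller also cannot tell the "timeout" marker apart from a real message with the same content, so a dedicated sentinel object is safer in practice.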
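A select is often more useful when it also reports which channel fired. The following is a minimal sketch of that idea, not a full implementation of external choice: it uses asyncio.Queue objects as stand-in channels, and the function name select and the (index, value) return shape are assumptions made for illustration. It returns every value that was ready when the wait finished, so nothing is silently dropped.

```python
import asyncio


async def select(*queues):
    """Return a sorted list of (index, value) for each queue that was ready."""
    tasks = {asyncio.create_task(q.get()): i for i, q in enumerate(queues)}
    done, pending = await asyncio.wait(
        list(tasks), return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel the gets that lost; a cancelled Queue.get() leaves its item queued.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted((tasks[t], t.result()) for t in done)


async def demo():
    a, b = asyncio.Queue(), asyncio.Queue()

    # Only queue b has data, so only index 1 is reported.
    await b.put("from-b")
    first = await select(a, b)

    # Both queues have data; one or both may be reported, none is lost.
    await a.put("from-a")
    await b.put("again-b")
    both = await select(a, b)
    leftover = a.qsize() + b.qsize()
    return first, both, leftover


first, both, leftover = asyncio.run(demo())
print(first, both, leftover)
```

Returning the index lets the caller branch on the source, which is what a Go select case does. Items that were not reported stay in their queues for the next call.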
Pipeline composition
CSP excels at data pipelines where each stage is an independent process:
import asyncio


async def read_lines(out_ch: RendezvousChannel):
    lines = ["hello world", "CSP in Python", "channels are great"]
    for line in lines:
        await out_ch.send(line)
    await out_ch.send(None)  # sentinel: no more input


async def tokenize(in_ch: RendezvousChannel, out_ch: RendezvousChannel):
    while True:
        line = await in_ch.recv()
        if line is None:
            await out_ch.send(None)  # pass the sentinel downstream
            break
        for word in line.split():
            await out_ch.send(word.lower())


async def count_words(in_ch: RendezvousChannel):
    counts: dict[str, int] = {}
    while True:
        word = await in_ch.recv()
        if word is None:
            break
        counts[word] = counts.get(word, 0) + 1
    print(counts)


async def pipeline():
    ch1 = RendezvousChannel()
    ch2 = RendezvousChannel()
    # All three stages run concurrently and meet only at the channels.
    await asyncio.gather(
        read_lines(ch1),
        tokenize(ch1, ch2),
        count_words(ch2),
    )


asyncio.run(pipeline())
Each stage is completely independent. You can test them in isolation by feeding synthetic messages. You can replace any stage without touching the others. The type of message each channel carries, including the None sentinel, serves as the interface contract between stages.
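Testing one stage in isolation might look like the following sketch. To stay self-contained it repeats the tokenize stage and uses a hypothetical QueueChannel adapter with the same send/recv surface; that adapter buffers one item, so it is a convenient test double rather than a strict rendezvous channel. The helper name run_stage is also illustrative.

```python
import asyncio


class QueueChannel:
    """Test double exposing send/recv on top of a one-slot asyncio.Queue."""

    def __init__(self):
        self._q = asyncio.Queue(maxsize=1)

    async def send(self, value):
        await self._q.put(value)

    async def recv(self):
        return await self._q.get()


async def tokenize(in_ch, out_ch):
    """Stage under test: split lines into lower-cased words."""
    while True:
        line = await in_ch.recv()
        if line is None:
            await out_ch.send(None)
            break
        for word in line.split():
            await out_ch.send(word.lower())


async def run_stage(lines):
    """Feed synthetic lines into tokenize and collect what comes out."""
    in_ch, out_ch = QueueChannel(), QueueChannel()

    async def feed():
        for line in lines:
            await in_ch.send(line)
        await in_ch.send(None)

    async def drain():
        words = []
        while (word := await out_ch.recv()) is not None:
            words.append(word)
        return words

    _, _, words = await asyncio.gather(feed(), tokenize(in_ch, out_ch), drain())
    return words


words = asyncio.run(run_stage(["Hello World", "CSP in Python"]))
print(words)
```

Because the stage only sees objects with send and recv, the same test works unchanged against any channel implementation that offers those two coroutines.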
Fan-out and fan-in patterns
CSP supports distributing work across multiple consumers:
async def fan_out(in_ch, worker_channels: list):
    """Round-robin distribute work to multiple workers."""
    idx = 0
    while True:
        item = await in_ch.recv()
        if item is None:
            for wch in worker_channels:
                await wch.send(None)
            break
        await worker_channels[idx % len(worker_channels)].send(item)
        idx += 1


async def fan_in(worker_outputs: list, out_ch):
    """Merge results from multiple workers into one channel."""
    active = len(worker_outputs)
    while active > 0:
        result = await csp_select(*worker_outputs)
        if result is None:
            active -= 1
            continue
        await out_ch.send(result)
    await out_ch.send(None)
This gives you controllable parallelism with explicit data flow — no thread pools, no shared counters, no locks.
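An end-to-end run of the pattern can be sketched as follows. Two substitutions are made to keep the example short and self-contained: one-slot asyncio.Queue objects stand in for channels, and the merge step reads from a single shared output queue instead of selecting over one channel per worker. The worker function, its squaring workload and the names w0, w1, w2 are invented for illustration.

```python
import asyncio


async def fan_out(in_q, worker_qs):
    """Round-robin items to workers, then tell every worker to stop."""
    idx = 0
    while (item := await in_q.get()) is not None:
        await worker_qs[idx % len(worker_qs)].put(item)
        idx += 1
    for q in worker_qs:
        await q.put(None)


async def worker(name, in_q, out_q):
    """Square each item and tag the result with the worker's name."""
    while (item := await in_q.get()) is not None:
        await out_q.put((name, item * item))
    await out_q.put(None)  # this worker is finished


async def fan_in(out_q, n_workers, results):
    """Collect results until every worker has sent its sentinel."""
    active = n_workers
    while active:
        msg = await out_q.get()
        if msg is None:
            active -= 1
        else:
            results.append(msg)


async def main(n_workers=3, n_items=6):
    in_q = asyncio.Queue(maxsize=1)
    worker_qs = [asyncio.Queue(maxsize=1) for _ in range(n_workers)]
    out_q = asyncio.Queue(maxsize=1)
    results = []

    async def source():
        for i in range(n_items):
            await in_q.put(i)
        await in_q.put(None)

    await asyncio.gather(
        source(),
        fan_out(in_q, worker_qs),
        *(worker(f"w{k}", worker_qs[k], out_q) for k in range(n_workers)),
        fan_in(out_q, n_workers, results),
    )
    return results


results = asyncio.run(main())
print(sorted(results))
```

The arrival order of results depends on scheduling, so the example sorts them before printing. Which worker handled which item is fixed by the round-robin rule.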
Multiprocessing CSP for CPU-bound work
For true parallelism, use multiprocessing.Pipe as a CSP channel:
import multiprocessing as mp
from multiprocessing.connection import Connection


def heavy_computation(data):
    return sum(x * x for x in range(data))


def cpu_worker(conn: Connection):
    """Process that reads from a pipe and sends results back."""
    while True:
        data = conn.recv()
        if data is None:  # shutdown signal
            break
        conn.send(heavy_computation(data))


if __name__ == "__main__":  # required where child processes are spawned
    # Create channel pair
    parent_conn, child_conn = mp.Pipe()
    worker = mp.Process(target=cpu_worker, args=(child_conn,))
    worker.start()

    # Send work through the channel
    parent_conn.send(1_000_000)
    result = parent_conn.recv()
    print(f"Result: {result}")

    parent_conn.send(None)  # shutdown signal
    worker.join()

Pipe provides bidirectional communication. recv() blocks until data arrives, but send() returns as soon as the operating system has buffered the pickled message, so a Pipe behaves like a small buffered channel rather than a strict rendezvous. For unidirectional channels, pass duplex=False.
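Both properties of Pipe can be observed inside a single process, without starting a worker. The snippet below is a small sketch: it creates a one-way pipe with duplex=False, sends before anyone is receiving, and then tries to send in the wrong direction. The payload and variable names are arbitrary.

```python
import multiprocessing as mp

# With duplex=False the first end can only receive, the second only send.
recv_end, send_end = mp.Pipe(duplex=False)

# A small message is accepted immediately: no receiver is waiting yet.
send_end.send({"job": 1})

ready = recv_end.poll(1.0)   # True once the message is readable
message = recv_end.recv()

# Sending on the receive-only end is rejected.
try:
    recv_end.send("wrong way")
    wrong_direction_allowed = True
except OSError:
    wrong_direction_allowed = False

recv_end.close()
send_end.close()
print(ready, message, wrong_direction_allowed)
```

Since send() succeeded with no receiver present, a Pipe on its own does not give the sender a synchronization point. If the protocol needs one, the receiver can send back an explicit acknowledgement.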
Deadlock detection and prevention
CSP’s synchronous nature means deadlocks are possible: process A waits to send to B, which waits to send to A. Prevention strategies:
- Consistent channel ordering: always acquire channels in the same order
- Timeout on all operations: use asyncio.wait_for with a deadline
- Cycle detection: track which process waits on which channel; detect cycles at runtime
- Design review: draw the channel graph before coding; cycles in the graph are potential deadlocks
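The cycle-detection and design-review items both reduce to finding a cycle in a directed graph whose edges mean "this process blocks on that one". A minimal sketch using depth-first search follows; the function name find_cycle and the edge-list input format are assumptions for illustration, and the process names are taken from the pipeline example.

```python
from collections import defaultdict


def find_cycle(edges):
    """Return one cycle as a list of nodes (first node repeated), or None.

    edges is an iterable of (waiter, waited_on) pairs.
    """
    graph = defaultdict(list)
    for src, dst in edges:
        graph[src].append(dst)

    UNSEEN, ON_PATH, FINISHED = 0, 1, 2
    state = defaultdict(int)
    path = []

    def visit(node):
        state[node] = ON_PATH
        path.append(node)
        for nxt in graph[node]:
            if state[nxt] == ON_PATH:
                # Back edge: the cycle is the tail of the current path.
                return path[path.index(nxt):] + [nxt]
            if state[nxt] == UNSEEN:
                found = visit(nxt)
                if found:
                    return found
        path.pop()
        state[node] = FINISHED
        return None

    for node in list(graph):
        if state[node] == UNSEEN:
            found = visit(node)
            if found:
                return found
    return None


# A linear pipeline has no cycle.
linear = find_cycle([("read_lines", "tokenize"), ("tokenize", "count_words")])

# Two processes that each wait to send to the other form a cycle.
circular = find_cycle([("A", "B"), ("B", "A")])
print(linear, circular)
```

A reported cycle marks a potential deadlock, not a certain one: a request and reply pair also forms a cycle but is safe as long as the two sides agree on who sends first.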
Performance considerations
| Channel type | Throughput (msgs/sec) | Latency (per message) | Use case |
|---|---|---|---|
| asyncio.Queue(1) | ~500K | ~2μs | Async I/O pipelines |
| trio.open_memory_channel(0) | ~200K | ~5μs | Structured concurrency |
| multiprocessing.Pipe | ~50K | ~20μs | CPU-bound parallelism |
| Custom rendezvous | ~300K | ~3μs | Strict CSP semantics |
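Treat these as rough order-of-magnitude figures; actual numbers vary with hardware, Python version, and message size. The dominant cost is pickling and inter-process copying for multiprocessing, and event-loop scheduling for the asyncio and Trio channels. For most applications, these throughputs are more than sufficient.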
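Figures like these are easy to measure on your own machine. The harness below is a minimal sketch for the asyncio.Queue(1) row only; the function name measure and the message count are arbitrary choices, and the result it prints will differ from the table depending on the environment.

```python
import asyncio
import time


async def measure(n=10_000):
    """Pass n integers through a one-slot queue and report the rate."""
    q = asyncio.Queue(maxsize=1)

    async def producer():
        for i in range(n):
            await q.put(i)

    async def consumer():
        received = 0
        for _ in range(n):
            await q.get()
            received += 1
        return received

    start = time.perf_counter()
    _, received = await asyncio.gather(producer(), consumer())
    elapsed = time.perf_counter() - start
    return received, received / elapsed


received, rate = asyncio.run(measure())
print(f"{received} messages at {rate:,.0f} msgs/sec")
```

Swapping the queue for another channel type while keeping the producer and consumer loops identical gives a like-for-like comparison.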
CSP vs other concurrency models
CSP works best when your problem looks like a dataflow graph — data enters, flows through transformations, and exits. If your problem looks more like independent entities managing their own lifecycle (chat sessions, game objects), the actor model may be a better fit. If you need raw shared-memory performance, threading with locks might be necessary despite the complexity cost.
The one thing to remember: CSP gives you composable concurrency through synchronized channels. Each process is simple and sequential; the concurrency emerges from how they’re wired together. In Python, Trio’s memory channels are the closest to production-grade CSP, while asyncio and multiprocessing offer the building blocks for custom implementations.