Python Pulsar Messaging — Deep Dive
Client Architecture
The pulsar-client Python package wraps the official C++ client via pybind11, giving Python programs access to the same connection pooling, batching, and compression that Java and C++ clients use. This matters for throughput — the Python overhead is minimal since the hot path runs in native code.
import pulsar
client = pulsar.Client(
    service_url="pulsar://broker1:6650,broker2:6650",
    operation_timeout_seconds=30,
    io_threads=4,
    message_listener_threads=4,
    authentication=pulsar.AuthenticationToken("eyJhbGci..."),
)
Connection multiplexing means a single client instance efficiently handles multiple producers and consumers. Creating separate client instances per topic wastes TCP connections and memory.
Partitioned Topics
For high throughput, Pulsar topics are partitioned across brokers. As in Kafka, the partition count of a live topic can be increased but never decreased:
pulsar-admin topics update-partitioned-topic \
    persistent://public/default/events -p 16
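One consequence worth checking before growing a topic: keyed messages are placed by hashing the key modulo the partition count, so raising the count sends some keys to a different partition than before. The sketch below estimates how many keys move when going from 8 to 16 partitions. It uses zlib.crc32 as a stand-in hash (an assumption; the client's own hashing scheme differs), and the key names are made up.

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition index with a stable stand-in hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

def moved_fraction(keys, old_count: int, new_count: int) -> float:
    """Fraction of keys whose partition changes when the count grows."""
    keys = list(keys)
    moved = sum(
        1 for k in keys
        if partition_for(k, old_count) != partition_for(k, new_count)
    )
    return moved / len(keys)

sample_keys = [f"user-{i}" for i in range(10_000)]  # hypothetical keys
fraction = moved_fraction(sample_keys, old_count=8, new_count=16)
print(f"{fraction:.0%} of keys change partition")
```

Ordering is only guaranteed within a partition, so consumers that rely on per-key ordering may see a key's newer messages arrive on a different partition after the change.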
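From Python, the producer routes messages to partitions automatically: messages with a partition key are hashed to a partition, and keyless messages are spread round-robin. Custom routing depends on the client version. The sketch below assumes a release whose create_producer accepts a message_router callable; confirm the parameter name with help(client.create_producer) before relying on it:
import random
import zlib
def route_order(message, num_partitions):
    # Return the index of the partition that should receive this message.
    key = message.partition_key()
    if key:
        # Built-in hash() is salted per process for strings, so it would route
        # the same key differently in each producer; use a stable checksum.
        return zlib.crc32(key.encode("utf-8")) % num_partitions
    return random.randrange(num_partitions)
producer = client.create_producer(
    "persistent://public/default/orders",
    message_routing_mode=pulsar.PartitionsRoutingMode.CustomPartition,
    message_router=route_order,  # assumed parameter name; varies by client version
    batching_enabled=True,
    batching_max_messages=1000,
    batching_max_publish_delay_ms=10,
)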
Producer Patterns
Batching and Compression
The Java client batches by default, but the Python client's create_producer leaves batching_enabled off unless you set it. Enable it, then tune batch size and delay to balance latency against throughput:
producer = client.create_producer(
    topic="persistent://public/default/events",
    batching_enabled=True,
    batching_max_messages=500,
    batching_max_publish_delay_ms=5,
    compression_type=pulsar.CompressionType.ZSTD,
)
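ZSTD compression typically reduces message sizes by 60–80% for JSON payloads, directly reducing network and storage costs. Compression is applied to the whole batch, so larger batches of similar messages usually compress better; measure the ratio on your own payloads rather than relying on a rule of thumb.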
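To see why batching and compression reinforce each other, the sketch below compares compressing messages one by one with compressing them as a single batch. It uses zlib from the standard library as a stand-in for ZSTD, so the absolute ratios will differ from what a producer achieves, and the event fields are hypothetical.

```python
import json
import zlib

# Hypothetical events with the repeated field names typical of JSON payloads.
events = [
    json.dumps({"user_id": i, "event_type": "page_view", "path": "/products/42"}).encode()
    for i in range(500)
]

raw_size = sum(len(e) for e in events)
# Compressing each message alone leaves little redundancy to exploit.
individual_size = sum(len(zlib.compress(e)) for e in events)
# Compressing the whole batch lets repeated field names be encoded once.
batch_size = len(zlib.compress(b"".join(events)))

print(f"raw={raw_size} individual={individual_size} batch={batch_size}")
```

The gap between the last two numbers is the saving that comes from batching alone.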
Async Sending with Callbacks
import logging
results = []
def send_callback(res, msg_id):
    # Invoked by the client once the broker has responded for this message.
    if res == pulsar.Result.Ok:
        results.append(msg_id)
    else:
        logging.error("Send failed: %s", res)
# event_batch is assumed to hold objects exposing encode() and user_id.
for event in event_batch:
    producer.send_async(
        event.encode(),
        callback=send_callback,
        partition_key=event.user_id,
    )
producer.flush()  # Block until every pending async send has completed
The async API is essential for high-throughput pipelines. Synchronous send() blocks on each broker acknowledgment, limiting throughput to one round-trip per message.
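The ceiling on a synchronous send loop follows directly from the round-trip time. A small calculation makes it concrete; the round-trip values are assumed for illustration and are not measurements.

```python
def max_sync_rate(round_trip_ms: float) -> float:
    """Upper bound on messages per second for one thread calling send() in a loop."""
    return 1000.0 / round_trip_ms

for rtt_ms in (0.5, 2.0, 10.0):  # assumed round-trip times
    print(f"RTT {rtt_ms} ms -> at most {max_sync_rate(rtt_ms):,.0f} msg/s per thread")
```

With send_async the producer keeps many messages in flight at once, so throughput is no longer tied to a single round trip.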
Consumer Patterns
Dead-Letter Topics
Messages that fail processing repeatedly should not block the subscription. Pulsar’s dead-letter policy automatically routes them:
consumer = client.subscribe(
    "persistent://public/default/orders",
    subscription_name="order-processor",
    consumer_type=pulsar.ConsumerType.Shared,
    dead_letter_policy=pulsar.ConsumerDeadLetterPolicy(
        max_redeliver_count=3,
        dead_letter_topic="persistent://public/default/orders-dlq",
    ),
    negative_ack_redelivery_delay_ms=5000,
)
while True:
    try:
        msg = consumer.receive(timeout_millis=5000)
    except pulsar.Timeout:
        continue  # Nothing arrived within 5 s; poll again
    try:
        process(msg.data())
        consumer.acknowledge(msg)
    except TransientError:  # Application-defined exception type
        consumer.negative_acknowledge(msg)  # Redelivered after the 5 s delay
Once a message's redelivery count exceeds max_redeliver_count (3 here), it moves to the DLQ topic instead of being redelivered. A separate consumer monitors the DLQ for alerting and manual reprocessing.
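The loop above only handles transient failures; any other exception would escape and leave the message unsettled. One possible policy, sketched below, settles every message exactly once. TransientError is a hypothetical application exception, and consumer only needs the acknowledge and negative_acknowledge methods used above. Sending permanent failures to redelivery so that they end up in the DLQ is a design choice, not the only option.

```python
import logging

class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, lock contention)."""

def handle(consumer, msg, process) -> str:
    """Process one message, settle it, and return the action taken."""
    try:
        process(msg.data())
    except TransientError:
        # Ask for redelivery; repeated failures eventually reach the DLQ.
        consumer.negative_acknowledge(msg)
        return "nack"
    except Exception:
        # Permanent failure (for example a malformed payload). Nack it too so
        # the dead-letter policy captures it, and log the traceback.
        logging.exception("Permanent failure while processing message")
        consumer.negative_acknowledge(msg)
        return "nack-permanent"
    consumer.acknowledge(msg)
    return "ack"
```

Returning the action makes the policy easy to unit-test with a stub consumer and to count in metrics.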
Reader API for Replay
The Reader API provides direct access to a topic without managing a named subscription, which makes it a good fit for rebuilding state or auditing:
reader = client.create_reader(
    "persistent://public/default/orders",
    start_message_id=pulsar.MessageId.earliest,
)
while reader.has_message_available():
    msg = reader.read_next(timeout_millis=1000)
    rebuild_state(msg.data())
Unlike consumers, readers do not track subscriptions or acknowledgments. They simply read from a position.
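As a sketch of what rebuilding state can look like, the function below replays a topic and keeps the latest event per order. It assumes JSON-encoded payloads with an order_id field (both hypothetical) and accepts any object offering the has_message_available and read_next methods used above.

```python
import json

def rebuild_orders(reader, timeout_millis: int = 1000) -> dict:
    """Replay from the reader's start position, keeping the latest event per order."""
    orders = {}
    while reader.has_message_available():
        msg = reader.read_next(timeout_millis=timeout_millis)
        event = json.loads(msg.data())      # assumes JSON-encoded payloads
        orders[event["order_id"]] = event   # later events overwrite earlier ones
    return orders
```

Because the reader starts from a fixed position and acknowledges nothing, running the function twice over the same data yields the same dictionary.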
Exactly-Once Processing
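Pulsar supports transactions that span multiple topics and subscriptions. The transaction API is exposed by the Java client; the calls below (new_transaction and the txn arguments) are Python-style pseudocode for the flow, not confirmed pulsar-client API, so check what your client version offers before relying on it:
txn = client.new_transaction(timeout_ms=30000)
try:
    # Consume from input
    msg = consumer.receive()
    result = transform(msg.data())
    # Produce to output within the same transaction
    producer.send(result.encode(), txn=txn)
    # Acknowledge the input as part of the same transaction
    consumer.acknowledge(msg, txn=txn)
    txn.commit()
except Exception:
    txn.abort()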
The transaction coordinator ensures that the consume acknowledgment and the produce happen atomically. If the process crashes between them, the transaction times out and both operations roll back.
Schema Evolution
Pulsar’s built-in schema registry supports Avro, Protobuf, and JSON schemas with compatibility enforcement:
- BACKWARD — new schema can read data written with the previous schema.
- FORWARD — old schema can read data written with the new schema.
- FULL — both directions work.
# Version 1
class OrderV1(pulsar.schema.Record):
    order_id = pulsar.schema.String()
    amount = pulsar.schema.Float()
# Version 2: adds a nullable field (backward compatible)
class OrderV2(pulsar.schema.Record):
    order_id = pulsar.schema.String()
    amount = pulsar.schema.Float()
    currency = pulsar.schema.String(default="USD")
The broker rejects schema changes that violate the configured compatibility level, protecting downstream consumers from breaking changes.
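The reason a defaulted field is backward compatible can be shown without Pulsar at all. The sketch below uses plain dataclasses, not pulsar.schema, and a hypothetical read_as helper: a reader on the new schema fills the missing field from its default when it meets a record written with the old one. Real Avro schema resolution has more rules than this.

```python
from dataclasses import MISSING, dataclass, fields

@dataclass
class OrderV2:
    order_id: str
    amount: float
    currency: str = "USD"  # new field with a default

def read_as(cls, record: dict):
    """Build cls from a decoded record, filling absent fields from defaults."""
    kwargs = {}
    for f in fields(cls):
        if f.name in record:
            kwargs[f.name] = record[f.name]
        elif f.default is not MISSING:
            kwargs[f.name] = f.default
        else:
            raise ValueError(f"no value or default for field {f.name!r}")
    return cls(**kwargs)

v1_record = {"order_id": "A-1", "amount": 9.5}  # written before currency existed
order = read_as(OrderV2, v1_record)
print(order)
```

Adding a field without a default would make read_as raise for every old record, which is the kind of change a BACKWARD compatibility check exists to reject.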
Operational Considerations
Monitoring
Key metrics to track:
- Publish rate and latency per topic — spikes in latency suggest broker or BookKeeper pressure.
- Subscription backlog — messages waiting to be consumed. Persistent growth means consumers cannot keep up.
- Storage usage per namespace — tiered storage offload reduces costs, but the offload lag indicates how quickly data moves to cold storage.
Pulsar exposes Prometheus metrics at /metrics on each broker and BookKeeper node.
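For the backlog metric in particular, the signal of interest is sustained growth rather than a single high reading. A minimal check over a series of scraped backlog values might look like this; the window of three consecutive increases is an arbitrary assumption to tune.

```python
def backlog_is_growing(samples, min_points: int = 3) -> bool:
    """True when each of the last min_points samples exceeds the one before it."""
    if len(samples) < min_points + 1:
        return False
    recent = samples[-(min_points + 1):]
    return all(later > earlier for earlier, later in zip(recent, recent[1:]))

print(backlog_is_growing([120, 90, 150, 400, 900]))   # sustained growth
print(backlog_is_growing([500, 800, 300, 350, 200]))  # spiky but draining
```

An alert built on this fires when consumers fall steadily behind, not on a brief burst that they later absorb.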
Geo-Replication
Pulsar supports native geo-replication between clusters. Configuring it is an admin operation — Python producers and consumers do not change. Messages published in one cluster are automatically replicated to configured peer clusters, enabling disaster recovery and multi-region architectures.
Python Client Connection Patterns
For long-running services, create the Pulsar client once at startup and reuse it throughout the application lifetime. Creating a new client per request wastes TCP connections and authentication handshakes. Use a connection manager or dependency injection to share the client instance across your application’s producers and consumers.
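One way to share a client is a small holder that creates it lazily and returns the same instance to every caller. SharedClient is a hypothetical helper, not part of pulsar-client; it takes a factory so that the construction arguments stay in one place.

```python
import threading

class SharedClient:
    """Create one client lazily and hand the same instance to every caller."""

    def __init__(self, factory):
        self._factory = factory  # e.g. lambda: pulsar.Client("pulsar://broker1:6650")
        self._client = None
        self._lock = threading.Lock()

    def get(self):
        # The lock prevents two threads from both creating a client at startup.
        with self._lock:
            if self._client is None:
                self._client = self._factory()
            return self._client

    def close(self):
        # Call once at shutdown; a later get() would create a fresh client.
        with self._lock:
            if self._client is not None:
                self._client.close()
                self._client = None
```

Request handlers then call get() instead of constructing a client, and the application's shutdown hook calls close().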
Capacity Planning
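As a rough starting point, a single Pulsar broker handles on the order of 100K messages per second for small messages (< 1 KB), and SSD-backed bookies sustain roughly 50–80 MB/s of writes per node, since BookKeeper write throughput is bound by disk I/O. These figures vary widely with hardware and configuration, so benchmark your own cluster. Plan partitions and bookies based on peak throughput, not average.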
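Turned into arithmetic, the planning rule looks like the sketch below. The per-node defaults are the rough figures quoted above. The write_replicas parameter (how many bookies store each message) and the example workload are assumptions added for illustration, and the result leaves out headroom.

```python
import math

def plan_capacity(peak_msgs_per_sec, avg_msg_bytes, write_replicas,
                  broker_msgs_per_sec=100_000, bookie_mb_per_sec=50):
    """Return (brokers, bookies) needed for the given peak, before headroom."""
    brokers = math.ceil(peak_msgs_per_sec / broker_msgs_per_sec)
    # Every message is written to write_replicas bookies.
    write_mb_per_sec = peak_msgs_per_sec * avg_msg_bytes * write_replicas / 1_000_000
    bookies = max(write_replicas, math.ceil(write_mb_per_sec / bookie_mb_per_sec))
    return brokers, bookies

brokers, bookies = plan_capacity(
    peak_msgs_per_sec=250_000, avg_msg_bytes=800, write_replicas=2
)
print(f"brokers={brokers} bookies={bookies}")
```

Running the same calculation with average instead of peak traffic shows how far an average-based plan would undersize the cluster.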
The one thing to remember: Pulsar’s decoupled architecture gives Python applications unique capabilities — dead-letter policies, transactional exactly-once processing, and transparent tiered storage — but production success requires understanding how brokers, BookKeeper, and the schema registry interact.