Python MQTT for IoT — Deep Dive
MQTT Protocol Internals
MQTT operates over TCP (port 1883) or TLS-encrypted TCP (port 8883). The protocol uses a binary wire format whose fixed header can be as small as 2 bytes, making it extremely efficient for constrained networks.
Packet Types
MQTT 3.1.1 defines 14 control packet types (MQTT 5.0 adds a 15th, AUTH, for extended authentication exchanges):
- CONNECT/CONNACK — Client authentication and session establishment
- PUBLISH/PUBACK/PUBREC/PUBREL/PUBCOMP — Message delivery with QoS handshakes
- SUBSCRIBE/SUBACK — Topic subscription management
- UNSUBSCRIBE/UNSUBACK — Topic unsubscription
- PINGREQ/PINGRESP — Keep-alive heartbeat
- DISCONNECT — Graceful disconnection
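Every packet starts with one byte holding the packet type (high 4 bits) and flags (low 4 bits). It is followed by the Remaining Length field, which gives the combined size of the variable header and payload using a variable-length encoding: each byte carries 7 bits of the value, and its high bit signals that another byte follows. With at most 4 bytes, this can express up to 268,435,455 bytes (about 256 MB) per packet.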
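The length encoding described above (called Remaining Length in the MQTT specification) is easy to sketch in Python. The function names below are illustrative, not part of any library; clients such as paho-mqtt do this internally. The packet type numbers used (12 for PINGREQ, 3 for PUBLISH) come from the specification.

```python
def encode_remaining_length(n: int) -> bytes:
    """Encode n as an MQTT variable byte integer (1 to 4 bytes)."""
    if not 0 <= n <= 268_435_455:
        raise ValueError("Remaining Length out of range")
    out = bytearray()
    while True:
        byte = n % 128            # low 7 bits carry data
        n //= 128
        if n > 0:
            byte |= 0x80          # high bit set: more bytes follow
        out.append(byte)
        if n == 0:
            return bytes(out)


def decode_remaining_length(data: bytes) -> tuple:
    """Return (value, bytes_consumed) for a variable byte integer at the start of data."""
    value, multiplier = 0, 1
    for i, byte in enumerate(data[:4]):
        value += (byte & 0x7F) * multiplier
        if byte & 0x80 == 0:      # continuation bit clear: this was the last byte
            return value, i + 1
        multiplier *= 128
    raise ValueError("malformed or truncated Remaining Length")


def fixed_header(packet_type: int, flags: int, remaining_length: int) -> bytes:
    """First byte: packet type in the high nibble, flags in the low nibble."""
    first = (packet_type << 4) | (flags & 0x0F)
    return bytes([first]) + encode_remaining_length(remaining_length)


# PINGREQ (type 12) has no body, so the whole packet is this 2-byte header
print(fixed_header(12, 0, 0).hex())  # c000
```

A PUBLISH at QoS 1 (type 3, flags 0b0010) with a 10-byte body would start with the bytes 0x32 0x0A.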
MQTT 5.0 Enhancements
MQTT 5.0, finalized in 2019, adds significant features over 3.1.1:
- User properties — Key-value metadata on any packet
- Shared subscriptions — Load balance messages across subscriber groups
- Message expiry — A per-message lifetime, so stale queued or retained messages are discarded automatically
- Topic aliases — Reduce bandwidth by replacing repeated topic strings with integers
- Flow control — Receive Maximum caps the number of unacknowledged QoS 1/2 messages in flight, so a fast sender cannot overwhelm a slow consumer
- Reason codes — Detailed error information on every acknowledgment
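Topic aliases are easiest to understand as per-connection bookkeeping on the sending side. The sketch below assumes the client publishes and the broker has advertised a Topic Alias Maximum in CONNACK. On first use, a topic is sent together with a new alias number. After that, only the alias is sent with an empty topic name. `TopicAliasAllocator` is a hypothetical helper written for illustration, not a paho-mqtt or aiomqtt API.

```python
from typing import Dict, Optional, Tuple


class TopicAliasAllocator:
    """Client-side bookkeeping for MQTT 5.0 topic aliases (illustrative sketch).

    topic_alias_maximum is the value the broker advertises in CONNACK;
    0 means the broker does not accept aliases.
    """

    def __init__(self, topic_alias_maximum: int):
        self.maximum = topic_alias_maximum
        self.aliases: Dict[str, int] = {}

    def prepare(self, topic: str) -> Tuple[str, Optional[int]]:
        """Return (topic_name_to_send, alias_or_None) for the next PUBLISH."""
        if topic in self.aliases:
            return "", self.aliases[topic]       # alias known: empty topic name
        if len(self.aliases) < self.maximum:
            alias = len(self.aliases) + 1        # alias values start at 1
            self.aliases[topic] = alias
            return topic, alias                  # first use: topic and alias together
        return topic, None                       # no aliases left: full topic only

    def reset(self) -> None:
        """Aliases are scoped to one network connection; clear them on reconnect."""
        self.aliases.clear()


aliases = TopicAliasAllocator(topic_alias_maximum=10)
print(aliases.prepare("acme/factory-1/assembly/sensor/temp-042/temperature"))
# ('acme/factory-1/assembly/sensor/temp-042/temperature', 1)
print(aliases.prepare("acme/factory-1/assembly/sensor/temp-042/temperature"))
# ('', 1)
```

For a 50-character topic published every few seconds, the savings add up quickly. The cost is that both sides must remember the mapping for the lifetime of the connection.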
Production Python Client Patterns
Secure Connection with TLS
import paho.mqtt.client as mqtt
import ssl

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
                     client_id="sensor-001",
                     protocol=mqtt.MQTTv5)

# Option A: TLS with client certificate authentication (mutual TLS)
client.tls_set(
    ca_certs="/etc/mqtt/ca.crt",
    certfile="/etc/mqtt/client.crt",
    keyfile="/etc/mqtt/client.key",
    tls_version=ssl.PROTOCOL_TLS_CLIENT,  # negotiates the best version both sides support
)

# Option B: server-verified TLS plus username/password.
# tls_set() can only be called once per client, so use Option A *or* Option B.
# client.tls_set(ca_certs="/etc/mqtt/ca.crt")
# client.username_pw_set("device_user", "secure_password")

client.connect("broker.example.com", 8883, keepalive=60)
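paho-mqtt also accepts a ready-made `ssl.SSLContext` through `client.tls_set_context()`. This lets you pin a minimum protocol version instead of passing a fixed `tls_version` constant. A minimal sketch using only the standard library follows. The helper name `make_tls_context` is hypothetical, and the certificate paths are the same placeholders used above.

```python
import ssl
from typing import Optional


def make_tls_context(ca_certs: Optional[str] = None,
                     certfile: Optional[str] = None,
                     keyfile: Optional[str] = None) -> ssl.SSLContext:
    """Build a client-side TLS context that refuses anything older than TLS 1.2."""
    # create_default_context() turns on certificate and hostname verification;
    # with cafile=None it trusts the system CA store
    context = ssl.create_default_context(cafile=ca_certs)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    if certfile:
        # Client certificate for mutual TLS (brokers with require_certificate true)
        context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return context


# With paho-mqtt, use this instead of tls_set():
# client.tls_set_context(make_tls_context("/etc/mqtt/ca.crt",
#                                         "/etc/mqtt/client.crt",
#                                         "/etc/mqtt/client.key"))
```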
Persistent Sessions and Clean Start
MQTT supports persistent sessions where the broker stores subscriptions and queued QoS 1/2 messages for disconnected clients:
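# MQTT 5.0: clean_start=False resumes an existing session
import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
                     client_id="sensor-001",   # must stay the same across reconnects
                     protocol=mqtt.MQTTv5)
client.tls_set(ca_certs="/etc/mqtt/ca.crt")    # port 8883 expects TLS

properties = Properties(PacketTypes.CONNECT)
properties.SessionExpiryInterval = 3600  # keep session for 1 hour after disconnect
client.connect("broker.example.com", 8883,
               clean_start=False, properties=properties)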
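When the client reconnects with the same client_id before the session expires, the broker delivers the QoS 1 and QoS 2 messages that matched its subscriptions while it was offline (QoS 0 messages may be dropped). This is critical for devices with intermittent connectivity.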
Reconnection Strategy
Network interruptions are normal in IoT. Implement robust reconnection:
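import paho.mqtt.client as mqtt

class MQTTReliableClient:
    def __init__(self, broker, port=8883, client_id="sensor-001", ca_certs=None):
        self.broker = broker
        self.port = port
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
                                  client_id=client_id)
        self.client.tls_set(ca_certs=ca_certs)  # 8883 is the TLS port; None = system CAs
        # Wait 1 s, 2 s, 4 s ... (capped at 120 s) between reconnect attempts
        self.client.reconnect_delay_set(min_delay=1, max_delay=120)
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.on_message = self._on_message
        self._subscriptions = []

    def _on_connect(self, client, userdata, flags, reason_code, properties):
        if reason_code.is_failure:
            print(f"Connection failed: {reason_code}")
            return
        print("Connected successfully")
        # Re-subscribe on every (re)connect in case the broker dropped the session
        for topic, qos in self._subscriptions:
            client.subscribe(topic, qos)

    def _on_disconnect(self, client, userdata, flags, reason_code, properties):
        if reason_code != 0:
            print(f"Unexpected disconnect ({reason_code}), reconnecting...")

    def _on_message(self, client, userdata, msg):
        print(f"Received: {msg.topic} = {msg.payload.decode()}")

    def subscribe(self, topic, qos=1):
        self._subscriptions.append((topic, qos))
        if self.client.is_connected():  # otherwise _on_connect subscribes later
            self.client.subscribe(topic, qos)

    def start(self):
        # connect_async() defers the first attempt to the network loop, so a broker
        # that is down at startup is retried instead of raising immediately
        self.client.connect_async(self.broker, self.port, keepalive=30)
        self.client.loop_forever(retry_first_connection=True)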
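With loop_forever(), paho-mqtt reconnects automatically after a dropped connection, waiting between attempts with an exponential backoff (1 s doubling up to 120 s by default, adjustable with reconnect_delay_set()). The retry_first_connection=True flag only covers the initial attempt when that attempt happens inside the loop, that is, after connect_async(); a plain connect() to an unreachable broker raises before the loop starts.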
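If you manage reconnects yourself rather than relying on loop_forever(), the same doubling-with-a-cap schedule is a few lines of Python. The sketch below adds optional random jitter, a common refinement so that a fleet of devices does not reconnect in lockstep after a broker restart. The jitter is an addition of this sketch, not a claim about paho-mqtt's behavior, and `backoff_delays` is a hypothetical name.

```python
import random
from typing import Iterator


def backoff_delays(min_delay: float = 1.0, max_delay: float = 120.0,
                   jitter: float = 0.0) -> Iterator[float]:
    """Yield reconnect delays: min_delay, doubling each time, capped at max_delay.

    jitter (0.0 to 1.0) randomly shortens each delay by up to that fraction.
    """
    delay = min_delay
    while True:
        yield delay * (1 - random.uniform(0, jitter))
        delay = min(delay * 2, max_delay)


delays = backoff_delays()
print([next(delays) for _ in range(9)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 120.0, 120.0]
```

A reconnect loop would call `time.sleep(next(delays))` (or `await asyncio.sleep(...)`) after each failed attempt and create a fresh generator once a connection succeeds.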
Async MQTT with aiomqtt
For asyncio-based applications, aiomqtt wraps paho-mqtt with native async support:
import asyncio
import json

import aiomqtt

async def send_notification(alert):
    # Placeholder: replace with your real notification channel (e-mail, push, ...)
    print(f"Notify: {alert}")

async def sensor_publisher():
    async with aiomqtt.Client("broker.example.com") as client:
        while True:
            reading = {"temperature": 23.5, "humidity": 65}
            await client.publish(
                "home/livingroom/climate",
                json.dumps(reading),
                qos=1,
            )
            await asyncio.sleep(30)

async def alert_subscriber():
    async with aiomqtt.Client("broker.example.com") as client:
        await client.subscribe("home/+/alert")
        # aiomqtt >= 2.0: client.messages is an async iterator (1.x used a context manager)
        async for message in client.messages:
            alert = json.loads(message.payload)
            print(f"ALERT on {message.topic}: {alert}")
            await send_notification(alert)

async def main():
    await asyncio.gather(
        sensor_publisher(),
        alert_subscriber(),
    )

asyncio.run(main())
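Unlike paho's loop_forever(), aiomqtt does not reconnect for you. Its documented approach is to wrap the `async with aiomqtt.Client(...)` block in a loop that catches `aiomqtt.MqttError`. The sketch below is a generic, standard-library version of that loop. `run_with_reconnect` is a hypothetical helper. With aiomqtt, `session` would be a coroutine function such as `alert_subscriber`, and `retry_on` would be `(aiomqtt.MqttError,)`.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple, Type


async def run_with_reconnect(session: Callable[[], Awaitable[None]],
                             retry_on: Tuple[Type[BaseException], ...],
                             interval: float = 5.0,
                             max_attempts: Optional[int] = None) -> int:
    """Re-run `session` whenever it raises one of `retry_on`.

    Returns the number of attempts once the session finishes normally;
    re-raises the last error after max_attempts failures (None = retry forever).
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            await session()
            return attempt
        except retry_on as exc:
            if max_attempts is not None and attempt >= max_attempts:
                raise
            print(f"Connection lost ({exc!r}); reconnecting in {interval} s")
            await asyncio.sleep(interval)
```

For example, `asyncio.run(run_with_reconnect(alert_subscriber, (aiomqtt.MqttError,)))` keeps the alert subscriber alive across broker restarts. A fixed interval keeps the sketch short; a growing backoff works the same way.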
Topic Design Patterns
Well-designed topic structures make systems maintainable:
Hierarchical Device Topics
{org}/{site}/{area}/{device_type}/{device_id}/{data_type}
Example:
acme/factory-1/assembly/sensor/temp-042/temperature
acme/factory-1/assembly/sensor/temp-042/status
acme/factory-1/assembly/actuator/valve-007/command
acme/factory-1/assembly/actuator/valve-007/state
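Generating and parsing these topics in one place keeps every device consistent. The sketch below builds and splits the `{org}/{site}/{area}/{device_type}/{device_id}/{data_type}` scheme. It rejects values that would add a level (`/`) or contain wildcard characters (`+`, `#`), which are not allowed in published topic names. The function names are hypothetical.

```python
from typing import Dict

TOPIC_LEVELS = ("org", "site", "area", "device_type", "device_id", "data_type")


def build_topic(**levels: str) -> str:
    """Assemble {org}/{site}/{area}/{device_type}/{device_id}/{data_type}."""
    parts = []
    for name in TOPIC_LEVELS:
        value = levels[name]  # KeyError if a level is missing
        if not value or any(c in value for c in "/+#"):
            raise ValueError(f"invalid value for {name!r}: {value!r}")
        parts.append(value)
    return "/".join(parts)


def parse_topic(topic: str) -> Dict[str, str]:
    """Inverse of build_topic: map each level name to its value."""
    parts = topic.split("/")
    if len(parts) != len(TOPIC_LEVELS):
        raise ValueError(f"expected {len(TOPIC_LEVELS)} levels, got {len(parts)}")
    return dict(zip(TOPIC_LEVELS, parts))


topic = build_topic(org="acme", site="factory-1", area="assembly",
                    device_type="sensor", device_id="temp-042",
                    data_type="temperature")
print(topic)                            # acme/factory-1/assembly/sensor/temp-042/temperature
print(parse_topic(topic)["device_id"])  # temp-042
```

Because every level has a fixed position, a dashboard can subscribe to `acme/factory-1/+/sensor/+/temperature` to receive every temperature sensor at one site.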
Command and Response Pattern
# Commands go down
devices/{device_id}/commands/set-interval
devices/{device_id}/commands/reboot
# Responses come up
devices/{device_id}/responses/set-interval
devices/{device_id}/responses/reboot
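To pair each response with the command that triggered it, the sender needs a correlation identifier. MQTT 5.0 offers Response Topic and Correlation Data properties for this purpose. The sketch below instead assumes the identifier travels inside a JSON payload, so the pattern also works on MQTT 3.1.1. The payload shape (`{"id": ..., "args": ...}`) and the `PendingCommands` class are hypothetical.

```python
import json
import uuid
from typing import Dict, Tuple


def response_topic_for(command_topic: str) -> str:
    """devices/{id}/commands/{name} -> devices/{id}/responses/{name}"""
    prefix, device_id, kind, name = command_topic.split("/")
    if prefix != "devices" or kind != "commands":
        raise ValueError(f"not a command topic: {command_topic}")
    return f"devices/{device_id}/responses/{name}"


class PendingCommands:
    """Track in-flight commands so responses can be matched to requests."""

    def __init__(self):
        self._pending: Dict[str, str] = {}  # correlation id -> command topic

    def new_command(self, device_id: str, name: str, args: dict) -> Tuple[str, bytes]:
        correlation_id = uuid.uuid4().hex
        topic = f"devices/{device_id}/commands/{name}"
        self._pending[correlation_id] = topic
        return topic, json.dumps({"id": correlation_id, "args": args}).encode()

    def match_response(self, topic: str, payload: bytes) -> str:
        """Return the command topic this response answers (KeyError if unknown)."""
        body = json.loads(payload)
        command_topic = self._pending[body["id"]]
        if response_topic_for(command_topic) != topic:
            raise ValueError("response arrived on an unexpected topic")
        del self._pending[body["id"]]  # each command is answered once
        return command_topic
```

A backend would publish the `(topic, payload)` pair returned by `new_command()` and subscribe to `devices/+/responses/#`, passing each incoming message to `match_response()`.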
MQTT 5.0 Shared Subscriptions
Distribute processing across multiple workers:
# Run this same subscription in each of three worker processes;
# the broker delivers each matching message to only one member of "workers"
client.subscribe("$share/workers/telemetry/#", qos=1)
This enables horizontal scaling of message consumers without application-level load balancing.
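A shared subscription filter has the form `$share/{group}/{topic filter}`, and the broker chooses which group member receives each message. The sketch below parses the filter and simulates one possible strategy, round robin, to show the "one message, one worker" behavior. Real brokers may use other strategies. `parse_shared` is a hypothetical helper.

```python
import itertools
from typing import Dict, Tuple


def parse_shared(filter_: str) -> Tuple[str, str]:
    """Split '$share/{group}/{topic filter}' into (group, topic filter)."""
    prefix, group, topic_filter = filter_.split("/", 2)
    if prefix != "$share" or not group or any(c in group for c in "+#"):
        raise ValueError(f"not a valid shared subscription: {filter_}")
    return group, topic_filter


group, topic_filter = parse_shared("$share/workers/telemetry/#")
print(group, topic_filter)  # workers telemetry/#

# Simulated broker-side round robin across the three subscribers in the group
next_worker = itertools.cycle(["worker-1", "worker-2", "worker-3"])
assignments: Dict[str, str] = {f"msg-{i}": next(next_worker) for i in range(4)}
print(assignments)
# {'msg-0': 'worker-1', 'msg-1': 'worker-2', 'msg-2': 'worker-3', 'msg-3': 'worker-1'}
```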
Payload Serialization
JSON (Human-Readable, Common)
import json
payload = json.dumps({
    "timestamp": 1711670400,
    "temperature": 23.5,
    "humidity": 65.2,
    "battery": 85
})
# ~80 bytes
MessagePack (Compact Binary)
import msgpack
payload = msgpack.packb({
    "timestamp": 1711670400,
    "temperature": 23.5,
    "humidity": 65.2,
    "battery": 85
})
# 64 bytes (~19% smaller): key names are still sent, and floats are packed as 8-byte doubles
Protocol Buffers (Schema-Enforced)
# After compiling .proto file
from sensor_pb2 import SensorReading
reading = SensorReading()
reading.timestamp = 1711670400
reading.temperature = 23.5
reading.humidity = 65.2
reading.battery = 85
payload = reading.SerializeToString()
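# ~18 bytes with float fields, ~26 with double (roughly 67-77% smaller than JSON);
# the exact size depends on the field types in the .proto schema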
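For bandwidth-constrained IoT devices, binary formats significantly reduce data usage. Most of the savings come from not repeating field names in every message, which is why schema-based formats shrink payloads much more than MessagePack does.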
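The effect of dropping field names can be measured with the standard library alone. The sketch below uses `struct` with a fixed layout as a stand-in for a schema. This is not the Protocol Buffers wire format. The layout (uint32 timestamp, two float32 readings, uint8 battery) is an assumption that sender and receiver would have to share.

```python
import json
import struct

reading = {"timestamp": 1711670400, "temperature": 23.5, "humidity": 65.2, "battery": 85}

default_json = json.dumps(reading).encode()
compact_json = json.dumps(reading, separators=(",", ":")).encode()  # no spaces

# Fixed little-endian layout: uint32 timestamp, float32 temperature,
# float32 humidity, uint8 battery
LAYOUT = struct.Struct("<IffB")
packed = LAYOUT.pack(reading["timestamp"], reading["temperature"],
                     reading["humidity"], reading["battery"])

print(len(default_json), len(compact_json), len(packed))  # 79 72 13

ts, temp, hum, batt = LAYOUT.unpack(packed)
print(round(hum, 1))  # 65.2 (float32 cannot store 65.2 exactly, so round on read)
```

The trade-off is rigidity: adding a field changes the layout for every device at once, a problem that Protocol Buffers solves with numbered, optional fields.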
Broker Architecture
Mosquitto Configuration for Production
# /etc/mosquitto/mosquitto.conf
# Listeners
listener 8883
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
cafile /etc/mosquitto/certs/ca.crt
require_certificate true
# Authentication
password_file /etc/mosquitto/passwd
acl_file /etc/mosquitto/acl
# Persistence
persistence true
persistence_location /var/lib/mosquitto/
# Logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type all
# Limits
max_connections 10000
max_inflight_messages 20
max_queued_messages 1000
message_size_limit 1048576
Access Control Lists
# /etc/mosquitto/acl
# Sensors can only publish to their own topics
user sensor-001
topic write home/livingroom/temperature
topic write home/livingroom/humidity
# Dashboards can read everything
user dashboard
topic read home/#
# Admin can do everything
user admin
topic readwrite #
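ACL entries use the same wildcard rules as subscriptions: `+` matches exactly one level, and `#` matches the remaining levels (including the parent level itself). The sketch below is a simplified model of how the entries above grant access, written to make those rules concrete. It is not Mosquitto's implementation and ignores features such as `pattern` lines. paho-mqtt ships an equivalent matcher, `topic_matches_sub()`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """True if a concrete topic name matches a filter containing + / # wildcards."""
    # Per the MQTT spec, filters starting with a wildcard never match '$' topics
    if topic.startswith("$") and topic_filter[:1] in ("+", "#"):
        return False
    f_parts = topic_filter.split("/")
    t_parts = topic.split("/")
    for i, f in enumerate(f_parts):
        if f == "#":
            return True                 # '#' must be last; matches the rest
        if i >= len(t_parts):
            return False
        if f != "+" and f != t_parts[i]:
            return False
    return len(f_parts) == len(t_parts)


# The ACL file above, as data
ACL = {
    "sensor-001": [("write", "home/livingroom/temperature"),
                   ("write", "home/livingroom/humidity")],
    "dashboard": [("read", "home/#")],
    "admin": [("readwrite", "#")],
}


def allowed(user: str, access: str, topic: str) -> bool:
    """Is `access` ('read' or 'write') granted to `user` on `topic`?"""
    return any(
        (rule == access or rule == "readwrite") and topic_matches(pattern, topic)
        for rule, pattern in ACL.get(user, [])
    )


print(allowed("sensor-001", "write", "home/kitchen/temperature"))  # False
print(allowed("dashboard", "read", "home/kitchen/temperature"))    # True
```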
Bridge Pattern
MQTT bridges connect brokers to create distributed architectures:
# Bridge local broker to cloud broker
connection cloud-bridge
address cloud-broker.example.com:8883
bridge_cafile /etc/mosquitto/cloud-ca.crt
bridge_certfile /etc/mosquitto/bridge.crt
bridge_keyfile /etc/mosquitto/bridge.key
# Forward local sensor data to cloud
topic home/# out 1
# Receive commands from cloud
topic commands/# in 1
This pattern keeps local MQTT traffic fast (LAN latency) while selectively forwarding data to cloud services.
Monitoring and Observability
Mosquitto exposes system topics under $SYS/:
import aiomqtt

async def monitor_broker():
    async with aiomqtt.Client("localhost") as client:
        await client.subscribe("$SYS/#")
        # aiomqtt >= 2.0: iterate client.messages directly
        async for message in client.messages:
            topic = message.topic.value
            if "connected" in topic or "bytes" in topic:
                print(f"{topic}: {message.payload.decode()}")
Key metrics to track:
- $SYS/broker/clients/connected — Current client count
- $SYS/broker/messages/received — Total messages received
- $SYS/broker/load/messages/received/1min — Message rate
- $SYS/broker/heap/current — Broker memory usage
- $SYS/broker/retained messages/count — Retained message count
For production systems, export these metrics to Prometheus using mqtt2prometheus or a custom exporter.
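The core of a custom exporter is turning each $SYS message into a numeric sample. The sketch below does only that step: it converts a topic and payload into a `(metric_name, value)` pair and skips non-numeric payloads such as version strings. The naming scheme (a `mosquitto_` prefix, with runs of non-alphanumeric characters replaced by `_`) is an assumption made for this example, not something Mosquitto or Prometheus defines. `sys_topic_to_metric` is a hypothetical name.

```python
import re
from typing import Optional, Tuple


def sys_topic_to_metric(topic: str, payload: bytes) -> Optional[Tuple[str, float]]:
    """Turn a $SYS message into (metric_name, value), or None if it is not numeric."""
    if not topic.startswith("$SYS/"):
        return None
    try:
        # Payloads look like b"42" or b"1234 seconds"; keep the leading number
        value = float(payload.decode().split()[0])
    except (ValueError, IndexError, UnicodeDecodeError):
        return None  # e.g. "mosquitto version ..." or an empty payload
    name = re.sub(r"[^a-zA-Z0-9]+", "_", topic[len("$SYS/"):]).strip("_").lower()
    return "mosquitto_" + name, value


print(sys_topic_to_metric("$SYS/broker/clients/connected", b"42"))
# ('mosquitto_broker_clients_connected', 42.0)
```

Feeding these pairs into gauges from a Prometheus client library inside the `monitor_broker()` loop would complete the exporter.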
Testing MQTT Applications
import json
from unittest.mock import MagicMock, patch

import pytest

# TemperatureAlertHandler is the application's own message handler (import it from your code)


def test_message_handler():
    """Test that temperature alerts trigger notifications."""
    handler = TemperatureAlertHandler(threshold=30.0)
    mock_msg = MagicMock()
    mock_msg.topic = "home/kitchen/temperature"
    mock_msg.payload = json.dumps({"value": 35.0}).encode()
    # Replace the real notifier so the test only checks the decision logic
    with patch.object(handler, "send_alert") as mock_alert:
        handler.on_message(None, None, mock_msg)
        mock_alert.assert_called_once_with(
            "kitchen", 35.0, "Temperature exceeds threshold"
        )
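The test assumes a `TemperatureAlertHandler` class that the text does not show. A minimal hypothetical version that satisfies it follows. It assumes topics of the form `home/{room}/temperature` and JSON payloads with a `value` key, and it uses paho's `on_message(client, userdata, msg)` signature so it can be attached to a client directly.

```python
import json


class TemperatureAlertHandler:
    """Hypothetical handler matching the unit test (not a library class)."""

    def __init__(self, threshold: float):
        self.threshold = threshold

    def on_message(self, client, userdata, msg):
        # Topics look like home/{room}/temperature; the room is the second level
        room = msg.topic.split("/")[1]
        value = json.loads(msg.payload)["value"]
        if value > self.threshold:
            self.send_alert(room, value, "Temperature exceeds threshold")

    def send_alert(self, room: str, value: float, reason: str) -> None:
        # Replace with a real notification; the unit test patches this method
        print(f"[{room}] {reason}: {value}")
```

Keeping the decision (`on_message`) separate from the side effect (`send_alert`) is what lets the test patch one method and assert on its arguments without a broker.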
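For integration tests, run a real Mosquitto broker in a Docker container. Mosquitto 2.x accepts only local connections unless a listener is configured, and configured listeners reject anonymous clients by default, so mosquitto-test.conf should contain at least the lines "listener 1883" and "allow_anonymous true":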
# docker-compose.test.yml
services:
  mqtt-broker:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
    volumes:
      - ./mosquitto-test.conf:/mosquitto/config/mosquitto.conf
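Containers take a moment to start, so integration tests should wait for the broker before connecting. A minimal stdlib sketch follows. It treats an open TCP port as the readiness signal, which is reasonable but not perfect, since the port can open slightly before the broker finishes initializing. `wait_for_port` is a hypothetical helper name.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0) -> bool:
    """Poll until a TCP connection to host:port succeeds or timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.2)  # broker container still starting
    return False
```

In a session-scoped pytest fixture, call `wait_for_port("localhost", 1883, timeout=30)` and use `pytest.skip()` when it returns False, so the suite reports a missing broker clearly instead of failing every test.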
One thing to remember: MQTT’s simplicity is deceptive — production IoT messaging requires careful attention to topic design, security, session management, and payload efficiency to build systems that scale to thousands of devices reliably.