Python MQTT for IoT — Deep Dive

MQTT Protocol Internals

MQTT operates over TCP (port 1883) or TLS-encrypted TCP (port 8883). The protocol uses a binary wire format whose fixed header can be as small as 2 bytes, which keeps overhead low on constrained networks.

Packet Types

MQTT 3.1.1 defines 14 control packet types (MQTT 5.0 adds a fifteenth, AUTH):

  • CONNECT/CONNACK — Client authentication and session establishment
  • PUBLISH/PUBACK/PUBREC/PUBREL/PUBCOMP — Message delivery with QoS handshakes
  • SUBSCRIBE/SUBACK — Topic subscription management
  • UNSUBSCRIBE/UNSUBACK — Topic unsubscription
  • PINGREQ/PINGRESP — Keep-alive heartbeat
  • DISCONNECT — Graceful disconnection

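The first byte of the fixed header carries the packet type and flags. The bytes that follow it hold the Remaining Length field, a variable-length encoding of 1 to 4 bytes that can represent up to 268,435,455 bytes (just under 256 MB) of variable header plus payload.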
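As a rough illustration of the length encoding, the sketch below packs 7 bits of the value into each byte and uses the top bit as a "more bytes follow" flag. The function names are made up for this example and are not part of any MQTT library.

```python
def encode_remaining_length(length: int) -> bytes:
    """Encode a Remaining Length value as 1 to 4 bytes."""
    if not 0 <= length <= 268_435_455:
        raise ValueError("Remaining Length must be between 0 and 268,435,455")
    out = bytearray()
    while True:
        length, digit = divmod(length, 128)
        if length > 0:
            digit |= 0x80  # continuation bit: more bytes follow
        out.append(digit)
        if length == 0:
            return bytes(out)


def decode_remaining_length(data: bytes) -> tuple[int, int]:
    """Return (value, bytes_consumed) for an encoded Remaining Length."""
    value = 0
    for index, byte in enumerate(data[:4]):
        value |= (byte & 0x7F) << (7 * index)
        if not byte & 0x80:
            return value, index + 1
    raise ValueError("Remaining Length is truncated or longer than 4 bytes")
```

Values up to 127 fit in a single byte, which is why a small packet such as PINGREQ needs only the 2-byte fixed header.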

MQTT 5.0 Enhancements

MQTT 5.0, approved as an OASIS Standard in March 2019, adds significant features over 3.1.1:

  • User properties — Key-value metadata on any packet
  • Shared subscriptions — Load balance messages across subscriber groups
  • Message expiry — Automatic cleanup of stale retained messages
  • Topic aliases — Reduce bandwidth by replacing repeated topic strings with integers
  • Flow control — Client-specified receive maximum prevents overwhelming slow consumers
  • Reason codes — Detailed error information on every acknowledgment
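To make the topic alias idea concrete, here is a minimal sender-side sketch. The class name and the alias_maximum parameter are hypothetical. In a real session the limit comes from the Topic Alias Maximum announced by the receiver, and whether your client library manages aliases for you is something to check in its documentation.

```python
from typing import Optional


class TopicAliasEncoder:
    """Sender-side alias table: maps topic names to small integers."""

    def __init__(self, alias_maximum: int):
        # Stand-in for the Topic Alias Maximum the receiver announced
        self.alias_maximum = alias_maximum
        self._aliases: dict[str, int] = {}

    def encode(self, topic: str) -> tuple[str, Optional[int]]:
        """Return (topic_name_to_send, alias) for one outgoing PUBLISH."""
        alias = self._aliases.get(topic)
        if alias is not None:
            return "", alias  # alias already established: send an empty topic name
        if len(self._aliases) < self.alias_maximum:
            alias = len(self._aliases) + 1  # aliases start at 1
            self._aliases[topic] = alias
            return topic, alias  # first use: send the full name plus the new alias
        return topic, None  # table full: fall back to the full topic name
```

After the first publish, a long topic string shrinks to an empty name plus a small integer property, which is where the bandwidth saving comes from.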

Production Python Client Patterns

Secure Connection with TLS

import paho.mqtt.client as mqtt
import ssl

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
                     client_id="sensor-001",
                     protocol=mqtt.MQTTv5)

# Option 1: TLS with client certificate authentication
client.tls_set(
    ca_certs="/etc/mqtt/ca.crt",
    certfile="/etc/mqtt/client.crt",
    keyfile="/etc/mqtt/client.key",
    tls_version=ssl.PROTOCOL_TLS_CLIENT  # negotiate the highest shared TLS version
)

# Option 2: username/password over TLS. Use this instead of option 1;
# tls_set() can only be called once per client.
# client.tls_set(ca_certs="/etc/mqtt/ca.crt")
# client.username_pw_set("device_user", "secure_password")

client.connect("broker.example.com", 8883, keepalive=60)

Persistent Sessions and Clean Start

MQTT supports persistent sessions where the broker stores subscriptions and queued QoS 1/2 messages for disconnected clients:

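import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties

# MQTT 5.0: clean_start=False preserves session
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
                     client_id="sensor-001",
                     protocol=mqtt.MQTTv5)
client.tls_set(ca_certs="/etc/mqtt/ca.crt")  # port 8883 expects TLS

properties = Properties(PacketTypes.CONNECT)
properties.SessionExpiryInterval = 3600  # keep session for 1 hour

client.connect("broker.example.com", 8883,
               clean_start=False, properties=properties)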

When the client reconnects with the same client ID before the session expires, the broker delivers the QoS 1 and 2 messages that were queued for its subscriptions during the disconnection. This is critical for devices with intermittent connectivity.

Reconnection Strategy

Network interruptions are normal in IoT. Implement robust reconnection:

import paho.mqtt.client as mqtt

class MQTTReliableClient:
    def __init__(self, broker, port=8883):
        self.broker = broker
        self.port = port
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
                                  client_id="sensor-001")
        # Port 8883 expects TLS; with no arguments the system CA store is used
        self.client.tls_set()
        # Wait 1 s before the first retry, doubling up to 120 s
        self.client.reconnect_delay_set(min_delay=1, max_delay=120)
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.on_message = self._on_message
        self._subscriptions = []

    def _on_connect(self, client, userdata, flags, reason_code, properties=None):
        if reason_code == 0:
            print("Connected successfully")
            # Re-subscribe on reconnect
            for topic, qos in self._subscriptions:
                client.subscribe(topic, qos)
        else:
            print(f"Connection failed: {reason_code}")

    def _on_disconnect(self, client, userdata, flags, reason_code, properties=None):
        if reason_code != 0:
            print(f"Unexpected disconnect (rc={reason_code}), reconnecting...")

    def _on_message(self, client, userdata, msg):
        print(f"Received: {msg.topic} = {msg.payload.decode()}")

    def subscribe(self, topic, qos=1):
        # Remember the subscription so _on_connect can restore it
        self._subscriptions.append((topic, qos))
        self.client.subscribe(topic, qos)

    def start(self):
        # connect_async defers the connection to the network loop, so a failed
        # first attempt is retried there instead of raising here
        self.client.connect_async(self.broker, self.port, keepalive=30)
        self.client.loop_forever(retry_first_connection=True)

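The loop_forever method reconnects automatically after a dropped connection. Passing retry_first_connection=True extends this to the initial attempt when the connection is started with connect_async; a blocking connect call raises immediately if the broker is unreachable. The wait between attempts starts at a minimum delay and doubles up to a maximum, both configurable with reconnect_delay_set (1 and 120 seconds by default).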
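The doubling wait between reconnection attempts can be sketched as a small generator. This is an illustration of the schedule, not paho-mqtt's internal code. The jitter parameter is an addition of this example, useful when many devices lose the broker at once and would otherwise retry in lockstep.

```python
import random


def backoff_delays(min_delay=1.0, max_delay=120.0, attempts=8, jitter=0.0, rng=None):
    """Yield the wait before each reconnect attempt, doubling up to max_delay."""
    rng = rng or random.Random()
    delay = min_delay
    for _ in range(attempts):
        # jitter=0.2 spreads each wait by up to +/-20% of its base value
        spread = delay * jitter
        yield delay + rng.uniform(-spread, spread)
        delay = min(delay * 2, max_delay)


for attempt, wait in enumerate(backoff_delays(attempts=9), start=1):
    print(f"attempt {attempt}: wait {wait:.0f} s")
```

With the defaults the waits run 1, 2, 4, 8, 16, 32, 64 seconds and then stay at the 120-second cap.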

Async MQTT with aiomqtt

For asyncio-based applications, aiomqtt wraps paho-mqtt with native async support:

import asyncio
import aiomqtt
import json

async def send_notification(alert):
    # Placeholder: forward the alert to email, SMS, a webhook, etc.
    print(f"Notifying: {alert}")

async def sensor_publisher():
    async with aiomqtt.Client("broker.example.com") as client:
        while True:
            reading = {"temperature": 23.5, "humidity": 65}
            await client.publish(
                "home/livingroom/climate",
                json.dumps(reading),
                qos=1
            )
            await asyncio.sleep(30)

async def alert_subscriber():
    async with aiomqtt.Client("broker.example.com") as client:
        await client.subscribe("home/+/alert")
        # aiomqtt 2.x exposes client.messages as an async iterator;
        # 1.x used "async with client.messages() as messages" instead
        async for message in client.messages:
            alert = json.loads(message.payload)
            print(f"ALERT on {message.topic}: {alert}")
            await send_notification(alert)

async def main():
    # Run publisher and subscriber concurrently, each on its own connection
    await asyncio.gather(
        sensor_publisher(),
        alert_subscriber()
    )

asyncio.run(main())

Topic Design Patterns

Well-designed topic structures make systems maintainable:

Hierarchical Device Topics

{org}/{site}/{area}/{device_type}/{device_id}/{data_type}

Example:
acme/factory-1/assembly/sensor/temp-042/temperature
acme/factory-1/assembly/sensor/temp-042/status
acme/factory-1/assembly/actuator/valve-007/command
acme/factory-1/assembly/actuator/valve-007/state
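One way to keep such a hierarchy consistent is to build and parse topics through a pair of helpers rather than by hand. The sketch below assumes the six-level layout shown above; the function names are illustrative.

```python
FIELDS = ("org", "site", "area", "device_type", "device_id", "data_type")


def build_topic(**levels: str) -> str:
    """Join the six named levels into a topic, rejecting reserved characters."""
    parts = []
    for name in FIELDS:
        value = levels[name]
        if not value or any(ch in value for ch in "/+#"):
            raise ValueError(f"invalid {name!r} level: {value!r}")
        parts.append(value)
    return "/".join(parts)


def parse_topic(topic: str) -> dict[str, str]:
    """Split a topic back into its named levels."""
    parts = topic.split("/")
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} levels, got {len(parts)}")
    return dict(zip(FIELDS, parts))
```

Rejecting "/", "+" and "#" inside a level prevents a device ID from accidentally changing the depth of the topic or introducing a wildcard.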

Command and Response Pattern

# Commands go down
devices/{device_id}/commands/set-interval
devices/{device_id}/commands/reboot

# Responses come up
devices/{device_id}/responses/set-interval
devices/{device_id}/responses/reboot
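A device handling a command needs to know where to publish its reply. Under the naming convention above, that can be derived from the command topic itself; the helper name below is hypothetical.

```python
def response_topic_for(command_topic: str) -> str:
    """Map devices/{device_id}/commands/{name} to the matching responses topic."""
    levels = command_topic.split("/")
    if len(levels) != 4 or levels[0] != "devices" or levels[2] != "commands":
        raise ValueError(f"not a command topic: {command_topic!r}")
    _, device_id, _, name = levels
    return f"devices/{device_id}/responses/{name}"


print(response_topic_for("devices/temp-042/commands/reboot"))
```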

MQTT 5.0 Shared Subscriptions

Distribute processing across multiple workers:

# Three workers share the load
# Each message goes to exactly one worker
client.subscribe("$share/workers/telemetry/#", qos=1)

This enables horizontal scaling of message consumers without application-level load balancing.
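The sketch below simulates the effect with plain Python, without a broker. It assumes round-robin distribution, which is one common strategy; the MQTT specification leaves the choice of strategy to the broker. Both function names are illustrative.

```python
from itertools import cycle


def parse_shared_filter(subscription: str) -> tuple[str, str]:
    """Split '$share/{group}/{filter}' into (group, filter)."""
    parts = subscription.split("/", 2)
    if len(parts) != 3 or parts[0] != "$share" or not parts[1] or not parts[2]:
        raise ValueError(f"not a shared subscription: {subscription!r}")
    return parts[1], parts[2]


def distribute(messages, workers):
    """Assign each message to exactly one worker, taking workers in turn."""
    assignment = {worker: [] for worker in workers}
    for message, worker in zip(messages, cycle(workers)):
        assignment[worker].append(message)
    return assignment


group, topic_filter = parse_shared_filter("$share/workers/telemetry/#")
print(group, topic_filter)
print(distribute(["m1", "m2", "m3", "m4", "m5"], ["w1", "w2", "w3"]))
```

The group name ("workers" here) is what ties subscribers together: clients that subscribe with the same group and filter split the stream, while a different group receives its own full copy.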

Payload Serialization

JSON (Human-Readable, Common)

import json

payload = json.dumps({
    "timestamp": 1711670400,
    "temperature": 23.5,
    "humidity": 65.2,
    "battery": 85
})
# ~80 bytes

MessagePack (Compact Binary)

import msgpack

payload = msgpack.packb({
    "timestamp": 1711670400,
    "temperature": 23.5,
    "humidity": 65.2,
    "battery": 85
})
# 64 bytes (about 19% smaller than the JSON above; floats are packed as 64-bit)

Protocol Buffers (Schema-Enforced)

# After compiling .proto file
from sensor_pb2 import SensorReading

reading = SensorReading()
reading.timestamp = 1711670400
reading.temperature = 23.5
reading.humidity = 65.2
reading.battery = 85

payload = reading.SerializeToString()
# ~20 bytes if the floats are declared as 32-bit fields (about 75% smaller than JSON)

For bandwidth-constrained IoT devices, binary formats significantly reduce data usage.
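Payload sizes are easy to check with the standard library alone. The sketch below measures the same reading as default JSON, as compact JSON, and as a fixed binary layout packed with struct. The struct layout is an assumption of this example, not one of the formats above, and it only works when both ends agree on the field order and types.

```python
import json
import struct

reading = {"timestamp": 1711670400, "temperature": 23.5, "humidity": 65.2, "battery": 85}

default_json = json.dumps(reading).encode()
# Dropping the spaces after ':' and ',' is a free saving
compact_json = json.dumps(reading, separators=(",", ":")).encode()
# "<IffB": little-endian uint32, two 32-bit floats, one unsigned byte
packed = struct.pack("<IffB", reading["timestamp"], reading["temperature"],
                     reading["humidity"], reading["battery"])

for label, payload in [("json", default_json), ("compact json", compact_json),
                       ("struct", packed)]:
    print(f"{label}: {len(payload)} bytes")
# json: 79 bytes
# compact json: 72 bytes
# struct: 13 bytes
```

The binary layout trades self-description and some float precision (65.2 is stored as a 32-bit approximation) for size.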

Broker Architecture

Mosquitto Configuration for Production

# /etc/mosquitto/mosquitto.conf

# Listeners
listener 8883
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
cafile /etc/mosquitto/certs/ca.crt
require_certificate true

# Authentication
password_file /etc/mosquitto/passwd
acl_file /etc/mosquitto/acl

# Persistence
persistence true
persistence_location /var/lib/mosquitto/

# Logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type all

# Limits
max_connections 10000
max_inflight_messages 20
max_queued_messages 1000
message_size_limit 1048576

Access Control Lists

# /etc/mosquitto/acl

# Sensors can only publish to their own topics
user sensor-001
topic write home/livingroom/temperature
topic write home/livingroom/humidity

# Dashboards can read everything
user dashboard
topic read home/#

# Admin can do everything
user admin
topic readwrite #
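The patterns in these rules use the standard MQTT wildcards: "+" matches exactly one level and "#" matches any number of trailing levels. The function below is a simplified sketch of that matching for experimenting with rules. It is not Mosquitto's implementation, and it ignores ACL-specific substitutions.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if topic is covered by topic_filter ('+' and '#' wildcards)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    # Topics starting with $ (such as $SYS/...) are not matched by a leading wildcard
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False
    for index, level in enumerate(filter_levels):
        if level == "#":
            return index == len(filter_levels) - 1  # '#' is only valid as the last level
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    return len(filter_levels) == len(topic_levels)


print(topic_matches("home/#", "home/livingroom/temperature"))
print(topic_matches("home/+/alert", "home/kitchen/alert"))
print(topic_matches("home/+/alert", "home/kitchen/temperature"))
```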

Bridge Pattern

MQTT bridges connect brokers to create distributed architectures:

# Bridge local broker to cloud broker
connection cloud-bridge
address cloud-broker.example.com:8883
bridge_cafile /etc/mosquitto/cloud-ca.crt
bridge_certfile /etc/mosquitto/bridge.crt
bridge_keyfile /etc/mosquitto/bridge.key

# Forward local sensor data to cloud
topic home/# out 1
# Receive commands from cloud
topic commands/# in 1

This pattern keeps local MQTT traffic fast (LAN latency) while selectively forwarding data to cloud services.

Monitoring and Observability

Mosquitto exposes system topics under $SYS/:

import aiomqtt

async def monitor_broker():
    async with aiomqtt.Client("localhost") as client:
        await client.subscribe("$SYS/#")
        # aiomqtt 2.x: client.messages is an async iterator
        async for message in client.messages:
            topic = message.topic.value
            if "connected" in topic or "bytes" in topic:
                print(f"{topic}: {message.payload.decode()}")

Key metrics to track:

  • $SYS/broker/clients/connected — Current client count
  • $SYS/broker/messages/received — Total messages received
  • $SYS/broker/load/messages/received/1min — Message rate
  • $SYS/broker/heap/current — Broker memory usage
  • $SYS/broker/retained messages/count — Retained message count

For production systems, export these metrics to Prometheus using mqtt2prometheus or a custom exporter.
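A custom exporter mostly comes down to turning each $SYS message into a metric name and a number. The sketch below shows one hypothetical naming scheme; it is not the scheme mqtt2prometheus uses, and it skips topics whose payload is not a plain number (version strings, for example).

```python
import re


def sys_topic_to_metric(topic: str, payload: bytes):
    """Convert one $SYS message into (metric_name, value), or None if not numeric."""
    if not topic.startswith("$SYS/"):
        return None
    try:
        value = float(payload.decode())
    except ValueError:  # also covers undecodable bytes
        return None
    # Collapse '/', spaces and other punctuation into single underscores
    name = re.sub(r"[^a-zA-Z0-9]+", "_", topic[len("$SYS/"):]).strip("_")
    return f"mosquitto_{name}", value


print(sys_topic_to_metric("$SYS/broker/clients/connected", b"42"))
print(sys_topic_to_metric("$SYS/broker/retained messages/count", b"17"))
```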

Testing MQTT Applications

import pytest
from unittest.mock import MagicMock, patch
import json

def test_message_handler():
    """Test that temperature alerts trigger notifications."""
    handler = TemperatureAlertHandler(threshold=30.0)
    
    mock_msg = MagicMock()
    mock_msg.topic = "home/kitchen/temperature"
    mock_msg.payload = json.dumps({"value": 35.0}).encode()
    
    with patch.object(handler, 'send_alert') as mock_alert:
        handler.on_message(None, None, mock_msg)
        mock_alert.assert_called_once_with(
            "kitchen", 35.0, "Temperature exceeds threshold"
        )
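The test above assumes a TemperatureAlertHandler class that the article does not show. A minimal hypothetical version that would satisfy it might look like this; the topic layout (home/{room}/temperature) and the payload shape are inferred from the test.

```python
import json


class TemperatureAlertHandler:
    """Calls send_alert when a temperature reading exceeds the threshold."""

    def __init__(self, threshold: float):
        self.threshold = threshold

    def send_alert(self, room: str, value: float, message: str) -> None:
        # Placeholder delivery; a real system might email, page, or publish to MQTT
        print(f"[{room}] {message}: {value}")

    def on_message(self, client, userdata, msg) -> None:
        # Topic layout assumed from the test: home/{room}/temperature
        _, room, _ = msg.topic.split("/")
        value = json.loads(msg.payload)["value"]
        if value > self.threshold:
            self.send_alert(room, value, "Temperature exceeds threshold")
```

Keeping the decision logic in on_message and the side effect in send_alert is what lets the test patch one method and assert on the other without a broker.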

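For integration tests, run a Mosquitto broker in a Docker container. Mosquitto 2 listens only on the loopback interface unless a listener is configured, and once one is configured it rejects anonymous clients by default, so the mounted mosquitto-test.conf needs at least a "listener 1883" line and, for unauthenticated test clients, "allow_anonymous true":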

# docker-compose.test.yml
services:
  mqtt-broker:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
    volumes:
      - ./mosquitto-test.conf:/mosquitto/config/mosquitto.conf

One thing to remember: MQTT’s simplicity is deceptive — production IoT messaging requires careful attention to topic design, security, session management, and payload efficiency to build systems that scale to thousands of devices reliably.
