Python Serial Communication — Deep Dive
Serial Port Internals
At the OS level, serial ports are represented as character devices. On Linux, USB-to-serial adapters appear as /dev/ttyUSB0 or /dev/ttyACM0. The kernel driver converts USB packets to and from the serial byte stream.
The pyserial library abstracts platform differences, but understanding the layers helps debug issues:
Python (pyserial) → OS serial API → USB driver → USB-to-UART chip → UART signals
The USB-to-UART chip (commonly FTDI FT232, CP2102, or CH340) has internal buffers — typically 64 to 512 bytes for receive and transmit. If your Python code reads too slowly, these buffers overflow and data is lost.
Buffer Management
import serial
# Open the port at 115200 baud; `ser` is reused by the later examples.
ser = serial.Serial('/dev/ttyUSB0', 115200)
# Check buffer status
print(f"Bytes waiting to be read: {ser.in_waiting}")
print(f"Bytes waiting to be sent: {ser.out_waiting}")
# Flush buffers
# NOTE: both reset calls throw data away; they do not wait for it.
ser.reset_input_buffer() # discard unread received data
ser.reset_output_buffer() # discard unsent data
# Wait for all data to be transmitted
ser.flush() # blocks until output buffer is empty
Binary Protocol Design
Text-based protocols (sending “TEMP=23.5\n”) are simple but wasteful. Binary protocols pack data tightly, include error detection, and handle framing reliably.
Packet Structure
A typical binary protocol:
[START] [LENGTH] [COMMAND] [PAYLOAD...] [CRC]   [END]
0xAA    1 byte   1 byte    N bytes      2 bytes 0x55
Implementation
import struct
import crcmod
# CRC-16/MODBUS
# Checksum function used by build_packet() and parse_packet(); it is applied
# to the LENGTH, COMMAND and PAYLOAD bytes of a frame.
crc16 = crcmod.predefined.mkCrcFun('modbus')
# Marker placed at the very beginning of every frame.
START_BYTE = 0xAA
# Marker placed at the very end of every frame.
END_BYTE = 0x55
def build_packet(command: int, payload: bytes) -> bytes:
    """Assemble one frame: START, LENGTH, COMMAND, PAYLOAD, CRC, END.

    The checksum is computed over LENGTH, COMMAND and PAYLOAD (the start
    byte is excluded) and appended as a little-endian unsigned 16-bit value.
    """
    header = struct.pack('BBB', START_BYTE, len(payload), command)
    # Everything after the start byte is protected by the CRC.
    protected = header[1:] + payload
    trailer = struct.pack('<H', crc16(protected)) + bytes([END_BYTE])
    return header + payload + trailer
def parse_packet(data: bytes) -> tuple:
    """Validate one complete frame and return ``(command, payload)``.

    Expected layout: START, LENGTH, COMMAND, PAYLOAD (LENGTH bytes),
    CRC (2 bytes, little-endian), END.

    Raises:
        ValueError: if the frame is shorter than the minimum frame size, its
            size disagrees with the LENGTH field, the start/end markers are
            wrong, or the CRC does not match.
    """
    # START + LENGTH + COMMAND + CRC(2) + END, i.e. a frame with no payload.
    overhead = 6
    if len(data) < overhead:
        # Without this check an empty/truncated frame surfaces as IndexError
        # or struct.error instead of the documented ValueError.
        raise ValueError(f"Frame too short: {len(data)} bytes")
    if data[0] != START_BYTE or data[-1] != END_BYTE:
        raise ValueError("Invalid frame markers")

    length = data[1]
    if len(data) != length + overhead:
        raise ValueError(
            f"Frame size {len(data)} does not match declared payload length {length}"
        )

    command = data[2]
    crc_offset = 3 + length
    payload = data[3:crc_offset]
    (received_crc,) = struct.unpack_from('<H', data, crc_offset)
    # The CRC covers LENGTH, COMMAND and PAYLOAD; the start byte is excluded.
    computed_crc = crc16(data[1:crc_offset])
    if received_crc != computed_crc:
        raise ValueError(f"CRC mismatch: {received_crc:#06x} != {computed_crc:#06x}")
    return command, payload
# Request a sensor reading: command 0x01 with an empty payload.
packet = build_packet(0x01, b'')
ser.write(packet)

# Fetch the reply frame and validate it.
# NOTE(review): read_framed_packet() is not defined in the visible part of
# this file -- presumably it returns one complete frame from the port.
response = read_framed_packet(ser)
cmd, payload = parse_packet(response)

# The payload is decoded as a single little-endian 32-bit float.
(temperature,) = struct.unpack('<f', payload)
Frame Synchronization
The receiver must find packet boundaries in a continuous byte stream. A state machine approach handles this robustly:
class PacketReader:
    """Incrementally extract framed packets from a continuous byte stream.

    Bytes passed to feed() are accumulated in ``buffer``. Whenever the
    buffer starts with a start byte and holds a complete candidate frame
    (size taken from the LENGTH byte), the frame is handed to parse_packet().
    If validation fails, only the candidate start byte is dropped and the
    remaining bytes are rescanned, so a genuine frame that follows a stray
    start byte or a corrupted frame is still recovered.

    Attributes:
        buffer: unconsumed bytes, trimmed so that it is empty or begins
            with a start byte.
        state: 'WAIT_START', 'READ_LENGTH' or 'READ_BODY', describing what
            the reader is waiting for next.
        expected_length: payload length announced by the frame currently
            being collected.
    """

    # START + LENGTH + COMMAND + CRC(2) + END, i.e. a frame with no payload.
    _OVERHEAD = 6

    def __init__(self):
        self.buffer = bytearray()
        self.state = 'WAIT_START'
        self.expected_length = 0

    def feed(self, data: bytes) -> list:
        """Consume *data* and return the packets completed by it.

        Returns:
            A list of ``(command, payload)`` tuples as produced by
            parse_packet(), in arrival order. Invalid frames are skipped.
        """
        packets = []
        pending = self.buffer
        pending.extend(data)

        while pending:
            start = pending.find(START_BYTE)
            if start < 0:
                # No start byte anywhere: everything buffered is noise.
                pending.clear()
                break
            if start:
                del pending[:start]
            if len(pending) < 2:
                break  # LENGTH byte has not arrived yet
            frame_size = pending[1] + self._OVERHEAD
            if len(pending) < frame_size:
                break  # frame still incomplete
            try:
                packets.append(parse_packet(bytes(pending[:frame_size])))
            except ValueError:
                # False start byte or corrupted frame: skip just this byte
                # and look for the next start byte in what follows.
                del pending[:1]
            else:
                del pending[:frame_size]

        self._update_state()
        return packets

    def _update_state(self):
        """Derive ``state`` / ``expected_length`` from the buffered bytes."""
        buffered = len(self.buffer)
        if buffered == 0:
            self.state = 'WAIT_START'
        elif buffered == 1:
            self.state = 'READ_LENGTH'
        else:
            self.state = 'READ_BODY'
            self.expected_length = self.buffer[1]
Asynchronous Serial I/O
For applications that cannot block on serial reads, use threading or asyncio.
Threading Approach
import serial
import threading
import queue
class SerialWorker:
    """Run serial reads and writes on background threads.

    Received packets are placed on ``rx_queue``; frames queued with send()
    are written to the port by the writer thread.
    """

    def __init__(self, port, baudrate=115200):
        # The short read timeout keeps the reader loop responsive to stop().
        self.ser = serial.Serial(port, baudrate, timeout=0.1)
        self.rx_queue = queue.Queue()
        self.tx_queue = queue.Queue()
        self.running = True
        self.rx_thread = None
        self.tx_thread = None

    def start(self):
        """Start the reader and writer threads (both daemon threads)."""
        self.rx_thread = threading.Thread(target=self._reader, daemon=True)
        self.tx_thread = threading.Thread(target=self._writer, daemon=True)
        self.rx_thread.start()
        self.tx_thread.start()

    def _reader(self):
        """Read bytes from the port and queue every decoded packet."""
        reader = PacketReader()
        while self.running:
            # Ask for at least one byte so the call blocks (up to the port
            # timeout) instead of spinning on in_waiting at full CPU.
            data = self.ser.read(self.ser.in_waiting or 1)
            if not data:
                continue  # timed out with nothing received
            for packet in reader.feed(data):
                self.rx_queue.put(packet)

    def _writer(self):
        """Write queued frames to the port until stopped."""
        while self.running:
            try:
                data = self.tx_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            self.ser.write(data)

    def send(self, command, payload=b''):
        """Frame *command* / *payload* and queue it for transmission."""
        packet = build_packet(command, payload)
        self.tx_queue.put(packet)

    def receive(self, timeout=1.0):
        """Return the next received packet, or None if none arrives in time."""
        try:
            return self.rx_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def stop(self):
        """Stop both threads, then close the port."""
        self.running = False
        # Let the workers leave their loops before the port disappears
        # underneath them; both loops wake at least every 0.1 s.
        current = threading.current_thread()
        for worker in (self.rx_thread, self.tx_thread):
            if worker is not None and worker is not current and worker.is_alive():
                worker.join(timeout=1.0)
        self.ser.close()
asyncio with pyserial-asyncio
import asyncio
import serial_asyncio
class SerialProtocol(asyncio.Protocol):
    """Line-oriented protocol: each newline-terminated line is one packet.

    The most recent complete line is stored in ``last_packet`` and
    ``packet_received`` is set whenever a line has been decoded. When several
    lines arrive in one chunk, only the last one remains in ``last_packet``.
    """

    def __init__(self):
        self.buffer = bytearray()
        self.transport = None
        self.packet_received = asyncio.Event()
        self.last_packet = None

    def connection_made(self, transport):
        self.transport = transport
        print(f"Connected to {transport.serial.name}")

    def data_received(self, data):
        self.buffer.extend(data)
        # Process complete lines; a trailing partial line stays buffered.
        while b'\n' in self.buffer:
            line, _, remainder = self.buffer.partition(b'\n')
            self.buffer = remainder
            # Line noise can produce bytes that are not valid UTF-8; replace
            # them rather than letting UnicodeDecodeError escape the callback.
            self.last_packet = line.decode(errors='replace').strip()
            self.packet_received.set()

    def connection_lost(self, exc):
        print(f"Connection lost: {exc}")
async def main():
    """Open the port, send one command and print the first response line."""
    # Inside a coroutine the running loop is the one to use;
    # asyncio.get_event_loop() is deprecated for this purpose.
    loop = asyncio.get_running_loop()
    transport, protocol = await serial_asyncio.create_serial_connection(
        loop, SerialProtocol, '/dev/ttyUSB0', baudrate=115200
    )
    try:
        # Send command
        transport.write(b'READ_SENSORS\n')
        # Wait for response
        await asyncio.wait_for(protocol.packet_received.wait(), timeout=5.0)
        print(f"Response: {protocol.last_packet}")
    finally:
        # Release the port even when the wait times out.
        transport.close()


if __name__ == "__main__":
    asyncio.run(main())
Multi-Device Management
When controlling multiple serial devices simultaneously:
import serial
from serial.tools import list_ports
from dataclasses import dataclass
@dataclass
class DeviceConfig:
    """USB identity and serial settings of one known device."""

    # USB Vendor ID
    vid: int
    # USB Product ID
    pid: int
    # Baud rate used when the device's port is opened
    baudrate: int
    # Human-readable device name
    name: str
# Devices that discover_devices() looks for, matched by USB vendor/product ID.
# Each entry is (vid, pid, baudrate, name).
KNOWN_DEVICES = [
DeviceConfig(0x2341, 0x0043, 9600, "Arduino Uno"),
DeviceConfig(0x10C4, 0xEA60, 115200, "CP2102 Sensor Hub"),
DeviceConfig(0x067B, 0x2303, 9600, "PL2303 GPS Module"),
]
def discover_devices(known_devices=None):
    """Open a serial connection to every attached known device.

    Args:
        known_devices: iterable of DeviceConfig entries to look for.
            Defaults to the module-level KNOWN_DEVICES.

    Returns:
        dict mapping each found device's name to its open ``serial.Serial``.
        Ports that cannot be opened are reported and skipped.
    """
    if known_devices is None:
        known_devices = KNOWN_DEVICES
    # Materialize once: the list is scanned again for every port.
    candidates = tuple(known_devices)

    connections = {}
    for port in list_ports.comports():
        for device in candidates:
            if port.vid != device.vid or port.pid != device.pid:
                continue
            if device.name in connections:
                # A second adapter with the same IDs would replace the first
                # entry and leave its port open with no reference to close it.
                print(f"Skipping duplicate {device.name} on {port.device}")
                break
            try:
                connection = serial.Serial(port.device, device.baudrate, timeout=1)
            except serial.SerialException as exc:
                # One busy or inaccessible port must not abort discovery.
                print(f"Could not open {device.name} on {port.device}: {exc}")
                break
            connections[device.name] = connection
            print(f"Found {device.name} on {port.device}")
            break
    return connections
# Open every known device, then query the Arduino if it was found.
devices = discover_devices()
arduino = devices.get("Arduino Uno")
if arduino is not None:
    arduino.write(b'STATUS\n')
RS-485 and Modbus
Industrial equipment often uses RS-485 (multi-drop bus) with the Modbus protocol:
from pymodbus.client import ModbusSerialClient
client = ModbusSerialClient(
    port='/dev/ttyUSB0',
    baudrate=9600,
    parity='N',
    stopbits=1,
    bytesize=8,
    timeout=1,
)
# connect() reports failure through its return value; do not talk to a
# port that never opened.
if not client.connect():
    raise ConnectionError("Could not open Modbus port /dev/ttyUSB0")
try:
    # Read holding registers from device at address 1
    result = client.read_holding_registers(address=0, count=10, slave=1)
    if not result.isError():
        print(f"Registers: {result.registers}")
    else:
        print(f"Read failed: {result}")
    # Write a single register
    write_result = client.write_register(address=0, value=100, slave=1)
    if write_result.isError():
        print(f"Write failed: {write_result}")
finally:
    # Release the port even if a request raised.
    client.close()
RS-485 supports up to 32 devices on a single bus, each with a unique address. Direction control (transmit vs receive) is handled automatically by most USB-to-RS-485 adapters.
Error Handling and Recovery
Production serial code must handle every failure mode:
import serial
import time
import logging
logger = logging.getLogger(__name__)
class RobustSerialConnection:
    """Serial connection with connect retries and reconnect-on-error.

    Args:
        port: device name passed to ``serial.Serial``.
        baudrate: baud rate passed to ``serial.Serial``.
        max_retries: number of connection attempts made by connect().
        startup_delay: seconds to wait after the port opens so the device
            can finish starting up.
    """

    def __init__(self, port, baudrate, max_retries=5, startup_delay=2):
        self.port = port
        self.baudrate = baudrate
        self.max_retries = max_retries
        self.startup_delay = startup_delay
        self.ser = None

    def connect(self):
        """Try to open the port, backing off exponentially between attempts.

        Returns:
            True once the port is open, False if every attempt failed.
        """
        for attempt in range(self.max_retries):
            try:
                self.ser = serial.Serial(
                    self.port, self.baudrate,
                    timeout=1, write_timeout=1
                )
            except serial.SerialException as e:
                if attempt + 1 >= self.max_retries:
                    # Nothing follows the last attempt, so do not sleep.
                    logger.warning("Attempt %d failed: %s. Giving up", attempt + 1, e)
                    break
                wait = min(2 ** attempt, 30)  # exponential backoff, capped at 30 s
                logger.warning("Attempt %d failed: %s. Retrying in %ss",
                               attempt + 1, e, wait)
                time.sleep(wait)
            else:
                time.sleep(self.startup_delay)  # device startup
                logger.info("Connected to %s", self.port)
                return True
        return False

    def send_command(self, command: bytes, expect_response=True):
        """Send *command* and optionally return one response line.

        Returns:
            The stripped response line as ``str`` when *expect_response* is
            true, otherwise None.

        Raises:
            serial.SerialException: if connect() has not opened a port, or a
                communication error occurs (``SerialTimeoutException`` when
                no response arrives). After a communication error a
                reconnect is attempted before the exception is re-raised.
            OSError: on an OS-level I/O failure (re-raised after a reconnect
                attempt).
        """
        if self.ser is None:
            # Fail with a clear serial error instead of an AttributeError.
            raise serial.SerialException("Not connected; call connect() first")
        try:
            # Drop stale input so the reply read below belongs to this command.
            self.ser.reset_input_buffer()
            self.ser.write(command)
            self.ser.flush()
            if not expect_response:
                return None
            response = self.ser.readline()
            if not response:
                raise serial.SerialTimeoutException("No response")
            # Line noise may not be valid UTF-8; replace instead of raising.
            return response.decode(errors='replace').strip()
        except (serial.SerialException, OSError) as e:
            logger.error("Communication error: %s", e)
            self.reconnect()
            raise

    def reconnect(self):
        """Close the current port (best effort) and connect again.

        Returns:
            The result of connect().
        """
        logger.info("Attempting reconnection...")
        try:
            if self.ser:
                self.ser.close()
        except Exception as e:
            # Closing a broken port may fail; that must not stop reconnecting.
            logger.debug("Ignoring error while closing port: %s", e)
        time.sleep(1)
        return self.connect()
Performance Optimization
Batch Reads
Reading one byte at a time is slow due to system call overhead:
# Slow: every iteration issues a one-byte read.
while True:
    process(ser.read(1))

# Fast: drain whatever has already arrived in a single read.
while True:
    if not ser.in_waiting:
        # Nothing buffered yet; back off briefly before polling again.
        time.sleep(0.001)
        continue
    data = ser.read(ser.in_waiting)
    process_chunk(data)
Baud Rate Selection
Higher baud rates reduce transmission time but increase susceptibility to noise:
| Baud Rate | Time for 1 KB | Typical Use |
|---|---|---|
| 9600 | 1.07 seconds | Simple sensors, GPS NMEA |
| 57600 | 0.18 seconds | Data logging |
| 115200 | 0.09 seconds | General purpose |
| 921600 | 0.011 seconds | High-speed data transfer |
| 1000000+ | ~0.01 seconds or less | ESP32, modern MCUs |
For USB-serial adapters, baud rates up to 1 Mbps typically work reliably. Beyond that, cable quality and adapter chipset matter significantly.
One thing to remember: Reliable serial communication is about more than just reading and writing bytes — it requires proper framing, error detection, timeout handling, and reconnection logic to build systems that work in the real world where cables get unplugged and devices reset unexpectedly.
See Also
- Python Behavior Trees Robotics — How robots make decisions using a tree-shaped rulebook that keeps them organized, like a flowchart that tells a robot what to do in every situation.
- Python Bluetooth Ble — How Python connects to fitness trackers, smart locks, and wireless sensors using the invisible radio signals all around you.
- Python Circuitpython Hardware — Why CircuitPython makes wiring up LEDs, sensors, and motors as easy as plugging in a USB drive.
- Python Computer Vision Autonomous — How self-driving cars use cameras and Python to see the road, spot pedestrians, read signs, and understand traffic — like giving a car human eyes and a brain.
- Python Home Assistant Automation — How Python turns your home into a smart home that reacts to you automatically, like a helpful invisible butler.