Python Serial Communication — Deep Dive

Serial Port Internals

At the OS level, serial ports are represented as character devices. On Linux, USB-to-serial adapters appear as /dev/ttyUSB0 or /dev/ttyACM0. The kernel driver converts USB packets to and from the serial byte stream.

The pyserial library abstracts platform differences, but understanding the layers helps debug issues:

Python (pyserial) → OS serial API → USB driver → USB-to-UART chip → UART signals

The USB-to-UART chip (commonly FTDI FT232, CP2102, or CH340) has internal buffers — typically 64 to 512 bytes for receive and transmit. If your Python code reads too slowly, these buffers overflow and data is lost.

Buffer Management

import serial

ser = serial.Serial('/dev/ttyUSB0', 115200)

# Check buffer status
print(f"Bytes waiting to be read: {ser.in_waiting}")
print(f"Bytes waiting to be sent: {ser.out_waiting}")

# Flush buffers
ser.reset_input_buffer()   # discard unread received data
ser.reset_output_buffer()  # discard unsent data

# Wait for all data to be transmitted
ser.flush()  # blocks until output buffer is empty

Binary Protocol Design

Text-based protocols (sending “TEMP=23.5\n”) are simple but wasteful. Binary protocols pack data tightly, include error detection, and handle framing reliably.

Packet Structure

A typical binary protocol:

[START] [LENGTH] [COMMAND] [PAYLOAD...] [CRC] [END]
  0xAA    1 byte   1 byte   N bytes    2 bytes  0x55

Implementation

import struct
import crcmod

# CRC-16/MODBUS
crc16 = crcmod.predefined.mkCrcFun('modbus')

START_BYTE = 0xAA
END_BYTE = 0x55

def build_packet(command: int, payload: bytes) -> bytes:
    length = len(payload)
    frame = struct.pack('BBB', START_BYTE, length, command) + payload
    checksum = crc16(frame[1:])  # CRC excludes start byte
    return frame + struct.pack('<H', checksum) + bytes([END_BYTE])

def parse_packet(data: bytes) -> tuple:
    if data[0] != START_BYTE or data[-1] != END_BYTE:
        raise ValueError("Invalid frame markers")
    
    length = data[1]
    command = data[2]
    payload = data[3:3 + length]
    received_crc = struct.unpack('<H', data[3 + length:5 + length])[0]
    
    computed_crc = crc16(data[1:3 + length])
    if received_crc != computed_crc:
        raise ValueError(f"CRC mismatch: {received_crc:#06x} != {computed_crc:#06x}")
    
    return command, payload

# Send a sensor reading command
packet = build_packet(0x01, b'')
ser.write(packet)

# Read response
response = read_framed_packet(ser)
cmd, payload = parse_packet(response)
temperature = struct.unpack('<f', payload)[0]

Frame Synchronization

The receiver must find packet boundaries in a continuous byte stream. A state machine approach handles this robustly:

class PacketReader:
    def __init__(self):
        self.buffer = bytearray()
        self.state = 'WAIT_START'
        self.expected_length = 0
    
    def feed(self, data: bytes) -> list:
        packets = []
        for byte in data:
            if self.state == 'WAIT_START':
                if byte == START_BYTE:
                    self.buffer = bytearray([byte])
                    self.state = 'READ_LENGTH'
            
            elif self.state == 'READ_LENGTH':
                self.buffer.append(byte)
                self.expected_length = byte
                self.state = 'READ_BODY'
            
            elif self.state == 'READ_BODY':
                self.buffer.append(byte)
                # command(1) + payload(N) + crc(2) + end(1)
                remaining = 1 + self.expected_length + 2 + 1
                if len(self.buffer) >= 2 + remaining:
                    if byte == END_BYTE:
                        try:
                            packets.append(parse_packet(bytes(self.buffer)))
                        except ValueError:
                            pass  # corrupted packet
                    self.state = 'WAIT_START'
                    self.buffer = bytearray()
        
        return packets

Asynchronous Serial I/O

For applications that cannot block on serial reads, use threading or asyncio.

Threading Approach

import serial
import threading
import queue

class SerialWorker:
    def __init__(self, port, baudrate=115200):
        self.ser = serial.Serial(port, baudrate, timeout=0.1)
        self.rx_queue = queue.Queue()
        self.tx_queue = queue.Queue()
        self.running = True
    
    def start(self):
        self.rx_thread = threading.Thread(target=self._reader, daemon=True)
        self.tx_thread = threading.Thread(target=self._writer, daemon=True)
        self.rx_thread.start()
        self.tx_thread.start()
    
    def _reader(self):
        reader = PacketReader()
        while self.running:
            if self.ser.in_waiting:
                data = self.ser.read(self.ser.in_waiting)
                for packet in reader.feed(data):
                    self.rx_queue.put(packet)
    
    def _writer(self):
        while self.running:
            try:
                data = self.tx_queue.get(timeout=0.1)
                self.ser.write(data)
            except queue.Empty:
                pass
    
    def send(self, command, payload=b''):
        packet = build_packet(command, payload)
        self.tx_queue.put(packet)
    
    def receive(self, timeout=1.0):
        try:
            return self.rx_queue.get(timeout=timeout)
        except queue.Empty:
            return None
    
    def stop(self):
        self.running = False
        self.ser.close()

asyncio with pyserial-asyncio

import asyncio
import serial_asyncio

class SerialProtocol(asyncio.Protocol):
    def __init__(self):
        self.buffer = bytearray()
        self.transport = None
        self.packet_received = asyncio.Event()
        self.last_packet = None
    
    def connection_made(self, transport):
        self.transport = transport
        print(f"Connected to {transport.serial.name}")
    
    def data_received(self, data):
        self.buffer.extend(data)
        # Process complete lines
        while b'\n' in self.buffer:
            line, self.buffer = self.buffer.split(b'\n', 1)
            self.last_packet = line.decode().strip()
            self.packet_received.set()
    
    def connection_lost(self, exc):
        print(f"Connection lost: {exc}")

async def main():
    loop = asyncio.get_event_loop()
    transport, protocol = await serial_asyncio.create_serial_connection(
        loop, SerialProtocol, '/dev/ttyUSB0', baudrate=115200
    )
    
    # Send command
    transport.write(b'READ_SENSORS\n')
    
    # Wait for response
    await asyncio.wait_for(protocol.packet_received.wait(), timeout=5.0)
    print(f"Response: {protocol.last_packet}")

asyncio.run(main())

Multi-Device Management

When controlling multiple serial devices simultaneously:

import serial
from serial.tools import list_ports
from dataclasses import dataclass

@dataclass
class DeviceConfig:
    vid: int   # USB Vendor ID
    pid: int   # USB Product ID
    baudrate: int
    name: str

KNOWN_DEVICES = [
    DeviceConfig(0x2341, 0x0043, 9600, "Arduino Uno"),
    DeviceConfig(0x10C4, 0xEA60, 115200, "CP2102 Sensor Hub"),
    DeviceConfig(0x067B, 0x2303, 9600, "PL2303 GPS Module"),
]

def discover_devices():
    connections = {}
    for port in list_ports.comports():
        for device in KNOWN_DEVICES:
            if port.vid == device.vid and port.pid == device.pid:
                ser = serial.Serial(port.device, device.baudrate, timeout=1)
                connections[device.name] = ser
                print(f"Found {device.name} on {port.device}")
    return connections

devices = discover_devices()
if "Arduino Uno" in devices:
    devices["Arduino Uno"].write(b'STATUS\n')

RS-485 and Modbus

Industrial equipment often uses RS-485 (multi-drop bus) with the Modbus protocol:

from pymodbus.client import ModbusSerialClient

client = ModbusSerialClient(
    port='/dev/ttyUSB0',
    baudrate=9600,
    parity='N',
    stopbits=1,
    bytesize=8,
    timeout=1
)

client.connect()

# Read holding registers from device at address 1
result = client.read_holding_registers(address=0, count=10, slave=1)
if not result.isError():
    print(f"Registers: {result.registers}")

# Write a single register
client.write_register(address=0, value=100, slave=1)

client.close()

RS-485 supports up to 32 devices on a single bus, each with a unique address. Direction control (transmit vs receive) is handled automatically by most USB-to-RS-485 adapters.

Error Handling and Recovery

Production serial code must handle every failure mode:

import serial
import time
import logging

logger = logging.getLogger(__name__)

class RobustSerialConnection:
    def __init__(self, port, baudrate, max_retries=5):
        self.port = port
        self.baudrate = baudrate
        self.max_retries = max_retries
        self.ser = None
    
    def connect(self):
        for attempt in range(self.max_retries):
            try:
                self.ser = serial.Serial(
                    self.port, self.baudrate,
                    timeout=1, write_timeout=1
                )
                time.sleep(2)  # device startup
                logger.info(f"Connected to {self.port}")
                return True
            except serial.SerialException as e:
                wait = min(2 ** attempt, 30)
                logger.warning(f"Attempt {attempt + 1} failed: {e}. "
                             f"Retrying in {wait}s")
                time.sleep(wait)
        return False
    
    def send_command(self, command: bytes, expect_response=True):
        try:
            self.ser.reset_input_buffer()
            self.ser.write(command)
            self.ser.flush()
            
            if expect_response:
                response = self.ser.readline()
                if not response:
                    raise serial.SerialTimeoutException("No response")
                return response.decode().strip()
        except (serial.SerialException, OSError) as e:
            logger.error(f"Communication error: {e}")
            self.reconnect()
            raise
    
    def reconnect(self):
        logger.info("Attempting reconnection...")
        try:
            if self.ser:
                self.ser.close()
        except Exception:
            pass
        time.sleep(1)
        self.connect()

Performance Optimization

Batch Reads

Reading one byte at a time is slow due to system call overhead:

# Slow: one syscall per byte
while True:
    byte = ser.read(1)
    process(byte)

# Fast: read all available data
while True:
    if ser.in_waiting:
        data = ser.read(ser.in_waiting)
        process_chunk(data)
    else:
        time.sleep(0.001)

Baud Rate Selection

Higher baud rates reduce transmission time but increase susceptibility to noise:

Baud RateTime for 1 KBTypical Use
96001.07 secondsSimple sensors, GPS NMEA
576000.18 secondsData logging
1152000.09 secondsGeneral purpose
9216000.011 secondsHigh-speed data transfer
1000000+<0.01 secondsESP32, modern MCUs

For USB-serial adapters, baud rates up to 1 Mbps typically work reliably. Beyond that, cable quality and adapter chipset matter significantly.

One thing to remember: Reliable serial communication is about more than just reading and writing bytes — it requires proper framing, error detection, timeout handling, and reconnection logic to build systems that work in the real world where cables get unplugged and devices reset unexpectedly.

pythonserialhardwarecommunication

See Also

  • Python Behavior Trees Robotics How robots make decisions using a tree-shaped rulebook that keeps them organized, like a flowchart that tells a robot what to do in every situation.
  • Python Bluetooth Ble How Python connects to fitness trackers, smart locks, and wireless sensors using the invisible radio signals all around you.
  • Python Circuitpython Hardware Why CircuitPython makes wiring up LEDs, sensors, and motors as easy as plugging in a USB drive.
  • Python Computer Vision Autonomous How self-driving cars use cameras and Python to see the road, spot pedestrians, read signs, and understand traffic — like giving a car human eyes and a brain.
  • Python Home Assistant Automation How Python turns your home into a smart home that reacts to you automatically, like a helpful invisible butler.