Python Bluetooth Low Energy (BLE) — Deep Dive

BLE Protocol Stack

The BLE protocol stack has three main layers:

Physical Layer — Operates in the 2.4 GHz ISM band, using 40 channels with 2 MHz spacing. Three channels (37, 38, 39) are dedicated to advertising; the remaining 37 are data channels. Frequency hopping across data channels reduces interference.

Link Layer — Manages connections, advertising, and scanning. Defines connection parameters: connection interval (7.5 ms to 4 seconds), slave latency (how many intervals a peripheral can skip), and supervision timeout (disconnection detection).

GATT/ATT Layer — The Attribute Protocol (ATT) provides the read/write primitives. GATT builds the service/characteristic hierarchy on top of ATT. Applications interact primarily with GATT.

Connection Parameters and Performance

Connection parameters directly affect throughput and power consumption:

# bleak doesn't expose connection parameter negotiation directly,
# but understanding them helps debug performance issues:

# Connection Interval: 7.5ms = fast but power-hungry
#                      4000ms = slow but power-efficient
# Typical: 15-30ms for interactive, 100-500ms for sensors

# MTU (Maximum Transmission Unit): default 23 bytes, negotiate up to 517
# Larger MTU = fewer round trips for big data transfers

Advanced Scanning Patterns

Filtered Scanning

from bleak import BleakScanner

async def scan_for_specific_devices():
    # Filter by service UUID
    devices = await BleakScanner.discover(
        timeout=10.0,
        service_uuids=["0000181a-0000-1000-8000-00805f9b34fb"]  # Environmental Sensing
    )
    
    for device in devices:
        print(f"Environmental sensor: {device.name} at {device.address}")

asyncio.run(scan_for_specific_devices())

Continuous Scanning with Callbacks

import asyncio
from bleak import BleakScanner
from collections import defaultdict

class DeviceTracker:
    def __init__(self):
        self.devices = {}
        self.rssi_history = defaultdict(list)
    
    def callback(self, device, advertisement_data):
        self.devices[device.address] = {
            'name': advertisement_data.local_name or device.name,
            'rssi': advertisement_data.rssi,
            'services': advertisement_data.service_uuids,
            'last_seen': asyncio.get_event_loop().time()
        }
        self.rssi_history[device.address].append(advertisement_data.rssi)
    
    def get_nearest(self, n=5):
        sorted_devices = sorted(
            self.devices.items(),
            key=lambda x: x[1]['rssi'],
            reverse=True
        )
        return sorted_devices[:n]
    
    def estimate_distance(self, address, tx_power=-59):
        """Rough distance estimate from RSSI (very approximate)."""
        rssi_values = self.rssi_history.get(address, [])
        if not rssi_values:
            return None
        avg_rssi = sum(rssi_values[-10:]) / len(rssi_values[-10:])
        ratio = (tx_power - avg_rssi) / (10 * 2.0)  # path loss exponent ~2
        return 10 ** ratio

async def track_devices():
    tracker = DeviceTracker()
    scanner = BleakScanner(detection_callback=tracker.callback)
    
    await scanner.start()
    
    for _ in range(30):  # 30 seconds
        await asyncio.sleep(1)
        nearest = tracker.get_nearest(3)
        for addr, info in nearest:
            print(f"  {info['name'] or addr}: RSSI={info['rssi']}")
    
    await scanner.stop()

asyncio.run(track_devices())

Write Operations

Writing to Characteristics

from bleak import BleakClient

async def control_device():
    async with BleakClient("AA:BB:CC:DD:EE:FF") as client:
        # Write with response (acknowledged)
        await client.write_gatt_char(
            "0000ff01-0000-1000-8000-00805f9b34fb",
            bytearray([0x01, 0xFF, 0x00]),  # command bytes
            response=True
        )
        
        # Write without response (faster, fire-and-forget)
        await client.write_gatt_char(
            "0000ff02-0000-1000-8000-00805f9b34fb",
            bytearray([0x00, 0x80]),
            response=False
        )

asyncio.run(control_device())

Controlling an LED Strip via BLE

async def set_led_color(address, r, g, b):
    SERVICE_UUID = "0000fff0-0000-1000-8000-00805f9b34fb"
    CHAR_UUID = "0000fff3-0000-1000-8000-00805f9b34fb"
    
    async with BleakClient(address) as client:
        # Common BLE LED controller protocol
        command = bytearray([0x56, r, g, b, 0x00, 0xF0, 0xAA])
        await client.write_gatt_char(CHAR_UUID, command, response=False)

asyncio.run(set_led_color("AA:BB:CC:DD:EE:FF", 255, 0, 128))

Multi-Device Connection Management

Managing concurrent BLE connections requires careful resource management:

from bleak import BleakClient
import asyncio

class BLEDevicePool:
    def __init__(self, addresses: list, max_concurrent: int = 5):
        self.addresses = addresses
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = {}
    
    async def read_device(self, address: str, char_uuid: str):
        async with self.semaphore:
            try:
                async with BleakClient(address, timeout=10.0) as client:
                    value = await client.read_gatt_char(char_uuid)
                    self.results[address] = {
                        'value': value,
                        'status': 'ok'
                    }
            except Exception as e:
                self.results[address] = {
                    'value': None,
                    'status': f'error: {e}'
                }
    
    async def read_all(self, char_uuid: str):
        tasks = [
            self.read_device(addr, char_uuid)
            for addr in self.addresses
        ]
        await asyncio.gather(*tasks)
        return self.results

async def poll_all_sensors():
    sensors = [
        "AA:BB:CC:DD:EE:01",
        "AA:BB:CC:DD:EE:02",
        "AA:BB:CC:DD:EE:03",
    ]
    
    pool = BLEDevicePool(sensors, max_concurrent=3)
    temp_uuid = "00002a6e-0000-1000-8000-00805f9b34fb"
    
    results = await pool.read_all(temp_uuid)
    for addr, result in results.items():
        if result['status'] == 'ok':
            temp = int.from_bytes(result['value'], 'little') / 100
            print(f"{addr}: {temp}°C")
        else:
            print(f"{addr}: {result['status']}")

asyncio.run(poll_all_sensors())

BLE Peripheral Mode with Python

While bleak is central-only, you can create BLE peripherals using bless:

from bless import BlessServer, BlessGATTCharacteristic, GATTCharacteristicProperties
import asyncio
import struct

TEMP_SERVICE_UUID = "0000181a-0000-1000-8000-00805f9b34fb"
TEMP_CHAR_UUID = "00002a6e-0000-1000-8000-00805f9b34fb"

async def run_peripheral():
    server = BlessServer(name="PythonTempSensor")
    
    await server.add_new_service(TEMP_SERVICE_UUID)
    
    await server.add_new_characteristic(
        TEMP_SERVICE_UUID,
        TEMP_CHAR_UUID,
        GATTCharacteristicProperties.read | GATTCharacteristicProperties.notify,
        value=None,
        permissions=1  # readable
    )
    
    await server.start()
    print("BLE peripheral running...")
    
    try:
        while True:
            temperature = read_system_temperature()
            value = struct.pack('<h', int(temperature * 100))
            server.get_characteristic(TEMP_CHAR_UUID).value = value
            server.update_value(TEMP_SERVICE_UUID, TEMP_CHAR_UUID)
            await asyncio.sleep(1)
    finally:
        await server.stop()

asyncio.run(run_peripheral())

Parsing Standard BLE Data Formats

Heart Rate Measurement

def parse_heart_rate(data: bytearray) -> dict:
    flags = data[0]
    hr_format_16bit = bool(flags & 0x01)
    sensor_contact = bool(flags & 0x06)
    energy_expended = bool(flags & 0x08)
    rr_interval = bool(flags & 0x10)
    
    offset = 1
    if hr_format_16bit:
        heart_rate = int.from_bytes(data[offset:offset+2], 'little')
        offset += 2
    else:
        heart_rate = data[offset]
        offset += 1
    
    result = {'heart_rate': heart_rate, 'sensor_contact': sensor_contact}
    
    if energy_expended:
        result['energy'] = int.from_bytes(data[offset:offset+2], 'little')
        offset += 2
    
    if rr_interval:
        rr_values = []
        while offset < len(data) - 1:
            rr = int.from_bytes(data[offset:offset+2], 'little')
            rr_values.append(rr / 1024.0 * 1000)  # convert to ms
            offset += 2
        result['rr_intervals_ms'] = rr_values
    
    return result

Environmental Sensing

def parse_temperature(data: bytearray) -> float:
    """Parse BLE Temperature characteristic (0x2A6E)."""
    raw = int.from_bytes(data[:2], byteorder='little', signed=True)
    return raw / 100.0  # resolution: 0.01 °C

def parse_humidity(data: bytearray) -> float:
    """Parse BLE Humidity characteristic (0x2A6F)."""
    raw = int.from_bytes(data[:2], byteorder='little', signed=False)
    return raw / 100.0  # resolution: 0.01 %

def parse_pressure(data: bytearray) -> float:
    """Parse BLE Pressure characteristic (0x2A6D)."""
    raw = int.from_bytes(data[:4], byteorder='little', signed=False)
    return raw / 10.0  # resolution: 0.1 Pa

Debugging BLE Issues

Signal Strength and Reliability

BLE range is typically 10-30 meters indoors but varies wildly based on obstacles, interference, and device quality:

async def rssi_monitor(address: str, duration: int = 60):
    """Monitor RSSI to diagnose connection issues."""
    readings = []
    
    def callback(device, adv_data):
        if device.address == address:
            readings.append(adv_data.rssi)
    
    scanner = BleakScanner(detection_callback=callback)
    await scanner.start()
    await asyncio.sleep(duration)
    await scanner.stop()
    
    if readings:
        avg = sum(readings) / len(readings)
        print(f"RSSI: avg={avg:.0f} min={min(readings)} max={max(readings)}")
        print(f"Quality: {'Good' if avg > -70 else 'Fair' if avg > -85 else 'Poor'}")

Common Issues

  • Connection timeout — Device may be connected to another central (most peripherals support only one connection). Reset the peripheral or disconnect other centrals.
  • Service discovery hangs — Some devices have slow GATT databases. Increase the timeout in BleakClient.
  • Notification stops — Connection supervision timeout. Reduce connection interval or ensure the peripheral stays within range.
  • Linux permissions — BLE scanning requires root or proper D-Bus permissions. Add your user to the bluetooth group or configure a polkit rule.

Security Considerations

BLE has evolved through several security modes:

  • LE Legacy Pairing — Vulnerable to passive eavesdropping (BLE 4.0-4.1)
  • LE Secure Connections — Uses ECDH key exchange, resistant to passive attacks (BLE 4.2+)
  • LE Privacy — Devices rotate their MAC address to prevent tracking

For Python applications handling sensitive data, always verify that the connection uses LE Secure Connections and the device supports bonding (persistent encryption keys).

One thing to remember: BLE programming in Python is fundamentally about navigating the GATT hierarchy — discover services, find the right characteristics, and handle the async nature of wireless communication with proper timeouts, reconnection logic, and multi-device management.

pythonbluetoothbleiot

See Also