Python Bluetooth Low Energy (BLE) — Deep Dive
BLE Protocol Stack
For application code, three layers of the BLE protocol stack matter most:
Physical Layer — Operates in the 2.4 GHz ISM band, using 40 channels with 2 MHz spacing. Three channels (37, 38, 39) are dedicated to advertising; the remaining 37 are data channels. Frequency hopping across data channels reduces interference.
Link Layer — Manages connections, advertising, and scanning. It defines the connection parameters: connection interval (7.5 ms to 4 s), peripheral latency (called "slave latency" in older specifications: the number of consecutive connection events a peripheral may skip), and supervision timeout (how long the link may stay silent before it is considered lost).
GATT/ATT Layer — The Attribute Protocol (ATT) provides the read/write primitives. GATT builds the service/characteristic hierarchy on top of ATT. Applications interact primarily with GATT.
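The channel layout described for the Physical Layer can be sketched as a small lookup. The function name `channel_frequency_mhz` is made up for illustration; the mapping follows the standard BLE channel index assignment, where advertising channels 37, 38, and 39 sit at 2402, 2426, and 2480 MHz and the data channels fill the gaps.

```python
ADVERTISING_CHANNELS = {37: 2402, 38: 2426, 39: 2480}  # channel index -> MHz


def channel_frequency_mhz(index: int) -> int:
    """Return the center frequency in MHz for a BLE channel index (0-39)."""
    if index in ADVERTISING_CHANNELS:
        return ADVERTISING_CHANNELS[index]
    if 0 <= index <= 10:
        # Data channels between advertising channels 37 and 38
        return 2404 + 2 * index
    if 11 <= index <= 36:
        # Data channels between advertising channels 38 and 39
        return 2428 + 2 * (index - 11)
    raise ValueError(f"BLE channel index must be 0-39, got {index}")


if __name__ == "__main__":
    for ch in (0, 10, 11, 36, 37, 38, 39):
        print(ch, channel_frequency_mhz(ch), "MHz")
```

Taken together, the 40 indices cover every 2 MHz step from 2402 to 2480 MHz exactly once.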
Connection Parameters and Performance
Connection parameters directly affect throughput and power consumption:
# bleak doesn't expose connection parameter negotiation directly,
# but understanding the parameters helps debug performance issues:
#
# Connection interval: 7.5 ms  = fast but power-hungry
#                      4000 ms = slow but power-efficient
#                      Typical: 15-30 ms for interactive use, 100-500 ms for sensors
#
# MTU (Maximum Transmission Unit): default 23 bytes, negotiable up to 517
#                      Larger MTU = fewer round trips for big data transfers
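As a rough aid when debugging, the limits above can be checked in plain Python. This is a minimal sketch, not part of bleak: `validate_connection_params` is a hypothetical helper, and the ranges for latency (0 to 499 events) and supervision timeout (100 ms to 32 s), along with the rule that the timeout must exceed (1 + latency) × interval × 2, are taken from the Bluetooth Core Specification as I understand it.

```python
def validate_connection_params(interval_ms: float, latency: int, timeout_ms: float) -> list:
    """Return a list of human-readable problems; an empty list means the values look valid."""
    problems = []
    if not 7.5 <= interval_ms <= 4000:
        problems.append("connection interval must be between 7.5 ms and 4000 ms")
    if not 0 <= latency <= 499:
        problems.append("peripheral latency must be between 0 and 499 events")
    if not 100 <= timeout_ms <= 32000:
        problems.append("supervision timeout must be between 100 ms and 32000 ms")
    # The link must survive at least two missed windows at the effective interval
    minimum_timeout = (1 + latency) * interval_ms * 2
    if timeout_ms <= minimum_timeout:
        problems.append(
            f"supervision timeout must exceed {minimum_timeout:.0f} ms for this interval and latency"
        )
    return problems


if __name__ == "__main__":
    print(validate_connection_params(30, 0, 4000))   # interactive profile
    print(validate_connection_params(500, 4, 4000))  # sensor profile with too short a timeout
```

A combination such as a 500 ms interval with a latency of 4 needs a timeout above 5000 ms, which is a common cause of unexpected disconnects on low-power sensors.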
Advanced Scanning Patterns
Filtered Scanning
import asyncio

from bleak import BleakScanner

async def scan_for_specific_devices():
    # Only report devices that advertise this service UUID
    devices = await BleakScanner.discover(
        timeout=10.0,
        service_uuids=["0000181a-0000-1000-8000-00805f9b34fb"],  # Environmental Sensing
    )
    for device in devices:
        print(f"Environmental sensor: {device.name} at {device.address}")

asyncio.run(scan_for_specific_devices())
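The long UUID in the filter is the 16-bit assigned number 0x181A placed into the Bluetooth Base UUID. A small helper (the name `expand_uuid16` is hypothetical) makes that relationship explicit and avoids typing the 128-bit form by hand:

```python
BASE_UUID_SUFFIX = "-0000-1000-8000-00805f9b34fb"


def expand_uuid16(short: int) -> str:
    """Expand a 16-bit Bluetooth SIG UUID into its 128-bit string form."""
    if not 0 <= short <= 0xFFFF:
        raise ValueError("16-bit UUID must be in the range 0x0000-0xFFFF")
    return f"0000{short:04x}{BASE_UUID_SUFFIX}"


if __name__ == "__main__":
    print(expand_uuid16(0x181A))  # Environmental Sensing service
    print(expand_uuid16(0x2A6E))  # Temperature characteristic
```

This applies only to UUIDs assigned by the Bluetooth SIG; vendor-specific 128-bit UUIDs do not share the base and must be written out in full.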
Continuous Scanning with Callbacks
import asyncio
import time
from collections import defaultdict

from bleak import BleakScanner

class DeviceTracker:
    def __init__(self):
        self.devices = {}
        self.rssi_history = defaultdict(list)

    def callback(self, device, advertisement_data):
        # Invoked by bleak for every advertisement it receives
        self.devices[device.address] = {
            'name': advertisement_data.local_name or device.name,
            'rssi': advertisement_data.rssi,
            'services': advertisement_data.service_uuids,
            'last_seen': time.monotonic(),  # monotonic clock, independent of the event loop
        }
        self.rssi_history[device.address].append(advertisement_data.rssi)

    def get_nearest(self, n=5):
        # Strongest signal first; RSSI is only a rough proxy for distance
        sorted_devices = sorted(
            self.devices.items(),
            key=lambda x: x[1]['rssi'],
            reverse=True,
        )
        return sorted_devices[:n]

    def estimate_distance(self, address, tx_power=-59):
        """Rough distance estimate in meters from RSSI (very approximate)."""
        rssi_values = self.rssi_history.get(address, [])
        if not rssi_values:
            return None
        recent = rssi_values[-10:]  # average the last 10 readings to reduce noise
        avg_rssi = sum(recent) / len(recent)
        ratio = (tx_power - avg_rssi) / (10 * 2.0)  # path loss exponent ~2
        return 10 ** ratio

async def track_devices():
    tracker = DeviceTracker()
    scanner = BleakScanner(detection_callback=tracker.callback)
    await scanner.start()
    try:
        for _ in range(30):  # 30 seconds
            await asyncio.sleep(1)
            nearest = tracker.get_nearest(3)
            for addr, info in nearest:
                print(f"  {info['name'] or addr}: RSSI={info['rssi']}")
    finally:
        await scanner.stop()

asyncio.run(track_devices())
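The distance estimate in the tracker follows the log-distance path loss model, d = 10^((tx_power − RSSI) / (10·n)), where tx_power is the expected RSSI at 1 m and n is the path loss exponent. The standalone sketch below uses a made-up function name, `rssi_to_distance`, and exposes n as a parameter; values around 2 suit open space, and higher values are often assumed indoors.

```python
def rssi_to_distance(rssi: float, tx_power: float = -59.0, path_loss_exponent: float = 2.0) -> float:
    """Estimate distance in meters using the log-distance path loss model.

    tx_power is the expected RSSI at 1 m; both values are in dBm.
    """
    if path_loss_exponent <= 0:
        raise ValueError("path_loss_exponent must be positive")
    return 10 ** ((tx_power - rssi) / (10 * path_loss_exponent))


if __name__ == "__main__":
    for rssi in (-59, -69, -79, -89):
        print(f"RSSI {rssi} dBm -> about {rssi_to_distance(rssi):.1f} m")
```

Because each 20 dB of loss multiplies the estimate by ten when n = 2, a few dB of noise shifts the result considerably, so the output is best treated as a coarse near/far indicator.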
Write Operations
Writing to Characteristics
import asyncio

from bleak import BleakClient

async def control_device():
    async with BleakClient("AA:BB:CC:DD:EE:FF") as client:
        # Write with response (acknowledged by the peripheral)
        await client.write_gatt_char(
            "0000ff01-0000-1000-8000-00805f9b34fb",
            bytearray([0x01, 0xFF, 0x00]),  # command bytes
            response=True,
        )
        # Write without response (faster, fire-and-forget)
        await client.write_gatt_char(
            "0000ff02-0000-1000-8000-00805f9b34fb",
            bytearray([0x00, 0x80]),
            response=False,
        )

asyncio.run(control_device())
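A single write without response can carry at most MTU − 3 bytes, because the ATT header takes one byte for the opcode and two for the attribute handle. For larger payloads, one option is to split the data before writing. The helper below is a sketch with a hypothetical name, `chunk_payload`; it assumes you know the negotiated MTU (bleak reports it through `client.mtu_size`) and that the peripheral's protocol accepts data split across several writes.

```python
ATT_WRITE_OVERHEAD = 3  # 1-byte opcode + 2-byte attribute handle


def chunk_payload(payload: bytes, mtu: int = 23) -> list:
    """Split payload into pieces that each fit in one ATT write for the given MTU."""
    chunk_size = mtu - ATT_WRITE_OVERHEAD
    if chunk_size <= 0:
        raise ValueError("MTU too small to carry any payload")
    return [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]


if __name__ == "__main__":
    data = bytes(range(100))
    print(len(chunk_payload(data)), "writes at the default MTU of 23")
    print(len(chunk_payload(data, mtu=247)), "write at an MTU of 247")
```

Each chunk would then be sent with its own `write_gatt_char` call; whether the receiver reassembles them is device-specific.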
Controlling an LED Strip via BLE
import asyncio

from bleak import BleakClient

async def set_led_color(address, r, g, b):
    SERVICE_UUID = "0000fff0-0000-1000-8000-00805f9b34fb"  # listed for reference; not needed for the write
    CHAR_UUID = "0000fff3-0000-1000-8000-00805f9b34fb"
    async with BleakClient(address) as client:
        # Frame format used by some generic BLE LED controllers;
        # check your device's protocol, as these are not standardized
        command = bytearray([0x56, r, g, b, 0x00, 0xF0, 0xAA])
        await client.write_gatt_char(CHAR_UUID, command, response=False)

asyncio.run(set_led_color("AA:BB:CC:DD:EE:FF", 255, 0, 128))
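Building the command frame in a separate function makes it testable without hardware. The sketch below reuses the 7-byte frame from the example above; the function name `build_led_command` is hypothetical, and the frame layout is whatever the example's controller expects rather than a standard.

```python
def build_led_command(r: int, g: int, b: int) -> bytes:
    """Build the 7-byte color frame: 0x56, R, G, B, 0x00, 0xF0, 0xAA."""
    for name, value in (("r", r), ("g", g), ("b", b)):
        if not 0 <= value <= 255:
            raise ValueError(f"{name} must be in the range 0-255, got {value}")
    return bytes([0x56, r, g, b, 0x00, 0xF0, 0xAA])


if __name__ == "__main__":
    print(build_led_command(255, 0, 128).hex(" "))
```

Validating up front gives a clearer error message than the one raised when an out-of-range value reaches `bytearray`.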
Multi-Device Connection Management
Managing concurrent BLE connections requires careful resource management:
import asyncio

from bleak import BleakClient

class BLEDevicePool:
    def __init__(self, addresses: list, max_concurrent: int = 5):
        self.addresses = addresses
        # Limit simultaneous connections; adapters typically support only a few
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = {}

    async def read_device(self, address: str, char_uuid: str):
        async with self.semaphore:
            try:
                async with BleakClient(address, timeout=10.0) as client:
                    value = await client.read_gatt_char(char_uuid)
                    self.results[address] = {
                        'value': value,
                        'status': 'ok',
                    }
            except Exception as e:
                # Record the failure so one bad device doesn't abort the batch
                self.results[address] = {
                    'value': None,
                    'status': f'error: {e}',
                }

    async def read_all(self, char_uuid: str):
        tasks = [
            self.read_device(addr, char_uuid)
            for addr in self.addresses
        ]
        await asyncio.gather(*tasks)
        return self.results

async def poll_all_sensors():
    sensors = [
        "AA:BB:CC:DD:EE:01",
        "AA:BB:CC:DD:EE:02",
        "AA:BB:CC:DD:EE:03",
    ]
    pool = BLEDevicePool(sensors, max_concurrent=3)
    temp_uuid = "00002a6e-0000-1000-8000-00805f9b34fb"
    results = await pool.read_all(temp_uuid)
    for addr, result in results.items():
        if result['status'] == 'ok':
            # Temperature (0x2A6E) is a signed 16-bit value in units of 0.01 °C
            temp = int.from_bytes(result['value'], 'little', signed=True) / 100
            print(f"{addr}: {temp}°C")
        else:
            print(f"{addr}: {result['status']}")

asyncio.run(poll_all_sensors())
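The pool above records a failure on the first error. Since BLE connections often succeed on a second attempt, a retry wrapper with exponential backoff is a common addition. The following is a generic sketch, not a bleak feature: `with_retries` is a hypothetical helper that takes any zero-argument coroutine function, and it catches `Exception` broadly for brevity where real code would likely narrow that to connection and timeout errors.

```python
import asyncio


async def with_retries(operation, attempts: int = 3, base_delay: float = 0.5):
    """Await operation() up to `attempts` times, doubling the delay after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return await operation()
        except Exception as exc:  # narrow this in real code
            last_error = exc
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * 2 ** attempt)
    raise last_error


if __name__ == "__main__":
    calls = {"count": 0}

    async def flaky_read():
        # Stand-in for a BLE read that fails twice before succeeding
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("device not reachable")
        return b"\x66\x08"

    print(asyncio.run(with_retries(flaky_read, attempts=3, base_delay=0.1)))
```

In the pool, the body of `read_device` could be wrapped in such a helper so that each address gets a few attempts before being marked as failed.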
BLE Peripheral Mode with Python
While bleak is central-only, you can create BLE peripherals using bless:
import asyncio
import struct

from bless import BlessServer, GATTAttributePermissions, GATTCharacteristicProperties

TEMP_SERVICE_UUID = "0000181a-0000-1000-8000-00805f9b34fb"
TEMP_CHAR_UUID = "00002a6e-0000-1000-8000-00805f9b34fb"

def read_system_temperature() -> float:
    # Placeholder (not part of bless): replace with a real sensor read
    return 21.5

async def run_peripheral():
    server = BlessServer(name="PythonTempSensor")
    await server.add_new_service(TEMP_SERVICE_UUID)
    await server.add_new_characteristic(
        TEMP_SERVICE_UUID,
        TEMP_CHAR_UUID,
        GATTCharacteristicProperties.read | GATTCharacteristicProperties.notify,
        None,  # initial value
        GATTAttributePermissions.readable,
    )
    await server.start()
    print("BLE peripheral running...")
    try:
        while True:
            temperature = read_system_temperature()
            # Temperature is a signed 16-bit little-endian value in units of 0.01 °C
            value = bytearray(struct.pack('<h', round(temperature * 100)))
            server.get_characteristic(TEMP_CHAR_UUID).value = value
            server.update_value(TEMP_SERVICE_UUID, TEMP_CHAR_UUID)  # notify subscribers
            await asyncio.sleep(1)
    finally:
        await server.stop()

asyncio.run(run_peripheral())
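The encoding step in the loop above can be isolated and range-checked, since `struct.pack('<h', ...)` raises an error for values outside the signed 16-bit range. This sketch uses a hypothetical name, `encode_temperature`, and rounds rather than truncates so that values just under a step are not biased downward.

```python
import struct


def encode_temperature(celsius: float) -> bytes:
    """Encode a temperature as a signed 16-bit little-endian value in units of 0.01 °C."""
    raw = round(celsius * 100)
    if not -32768 <= raw <= 32767:
        raise ValueError(f"{celsius} °C does not fit the 16-bit temperature format")
    return struct.pack("<h", raw)


if __name__ == "__main__":
    print(encode_temperature(21.5).hex(" "))
    print(encode_temperature(-5.25).hex(" "))
```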
Parsing Standard BLE Data Formats
Heart Rate Measurement
def parse_heart_rate(data: bytearray) -> dict:
    """Parse the Heart Rate Measurement characteristic (0x2A37)."""
    flags = data[0]
    hr_format_16bit = bool(flags & 0x01)
    # Bit 2: sensor contact feature supported; bit 1: contact currently detected
    contact_supported = bool(flags & 0x04)
    sensor_contact = bool(flags & 0x02) if contact_supported else None
    energy_expended = bool(flags & 0x08)
    rr_interval = bool(flags & 0x10)

    offset = 1
    if hr_format_16bit:
        heart_rate = int.from_bytes(data[offset:offset+2], 'little')
        offset += 2
    else:
        heart_rate = data[offset]
        offset += 1

    result = {'heart_rate': heart_rate, 'sensor_contact': sensor_contact}

    if energy_expended:
        result['energy'] = int.from_bytes(data[offset:offset+2], 'little')
        offset += 2

    if rr_interval:
        rr_values = []
        while offset < len(data) - 1:
            rr = int.from_bytes(data[offset:offset+2], 'little')
            rr_values.append(rr / 1024.0 * 1000)  # units of 1/1024 s, converted to ms
            offset += 2
        result['rr_intervals_ms'] = rr_values

    return result
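To exercise a parser like the one above without a heart rate strap, it helps to generate sample packets in the same wire format. The builder below is a sketch with a hypothetical name, `build_heart_rate_packet`; it covers only the heart rate value and RR intervals and leaves the sensor contact and energy expended fields unset.

```python
def build_heart_rate_packet(bpm: int, rr_intervals_ms=()) -> bytes:
    """Build a Heart Rate Measurement payload for testing parsers."""
    flags = 0
    body = bytearray()
    if bpm > 255:
        flags |= 0x01                        # bit 0: heart rate is a 16-bit value
        body += bpm.to_bytes(2, "little")
    else:
        body.append(bpm)
    if rr_intervals_ms:
        flags |= 0x10                        # bit 4: RR intervals present
        for rr_ms in rr_intervals_ms:
            # RR intervals are transmitted in units of 1/1024 s
            body += round(rr_ms * 1024 / 1000).to_bytes(2, "little")
    return bytes([flags]) + bytes(body)


if __name__ == "__main__":
    print(build_heart_rate_packet(72).hex(" "))
    print(build_heart_rate_packet(72, [1000.0, 850.0]).hex(" "))
```

Feeding these packets into the parser and comparing the result with the inputs gives a quick round-trip check.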
Environmental Sensing
def parse_temperature(data: bytearray) -> float:
    """Parse BLE Temperature characteristic (0x2A6E)."""
    raw = int.from_bytes(data[:2], byteorder='little', signed=True)
    return raw / 100.0  # resolution: 0.01 °C

def parse_humidity(data: bytearray) -> float:
    """Parse BLE Humidity characteristic (0x2A6F)."""
    raw = int.from_bytes(data[:2], byteorder='little', signed=False)
    return raw / 100.0  # resolution: 0.01 %

def parse_pressure(data: bytearray) -> float:
    """Parse BLE Pressure characteristic (0x2A6D)."""
    raw = int.from_bytes(data[:4], byteorder='little', signed=False)
    return raw / 10.0  # resolution: 0.1 Pa
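When several characteristics share the same shape (a fixed-width integer plus a scale), a lookup table can replace one function per characteristic. This is one possible design, sketched with hypothetical names (`ENV_FORMATS`, `decode_environmental`) and using the same formats and resolutions as the three parsers above.

```python
import struct

# 16-bit characteristic UUID -> (struct format, divisor, unit)
ENV_FORMATS = {
    0x2A6E: ("<h", 100, "°C"),  # Temperature: signed 16-bit, 0.01 °C
    0x2A6F: ("<H", 100, "%"),   # Humidity: unsigned 16-bit, 0.01 %
    0x2A6D: ("<I", 10, "Pa"),   # Pressure: unsigned 32-bit, 0.1 Pa
}


def decode_environmental(uuid16: int, data: bytes):
    """Decode a supported Environmental Sensing value into (value, unit)."""
    fmt, divisor, unit = ENV_FORMATS[uuid16]
    size = struct.calcsize(fmt)
    if len(data) < size:
        raise ValueError(f"expected at least {size} bytes, got {len(data)}")
    (raw,) = struct.unpack(fmt, bytes(data[:size]))
    return raw / divisor, unit


if __name__ == "__main__":
    print(decode_environmental(0x2A6E, struct.pack("<h", -525)))
    print(decode_environmental(0x2A6D, struct.pack("<I", 1013250)))
```

Adding another characteristic then means adding one table row, and the length check catches truncated reads that the slicing approach would silently accept.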
Debugging BLE Issues
Signal Strength and Reliability
BLE range is typically 10-30 meters indoors but varies wildly based on obstacles, interference, and device quality:
import asyncio

from bleak import BleakScanner

async def rssi_monitor(address: str, duration: int = 60):
    """Monitor RSSI to diagnose connection issues."""
    readings = []

    def callback(device, adv_data):
        # Compare case-insensitively; backends differ in how they format addresses
        if device.address.upper() == address.upper():
            readings.append(adv_data.rssi)

    scanner = BleakScanner(detection_callback=callback)
    await scanner.start()
    await asyncio.sleep(duration)
    await scanner.stop()

    if readings:
        avg = sum(readings) / len(readings)
        print(f"RSSI: avg={avg:.0f} min={min(readings)} max={max(readings)}")
        print(f"Quality: {'Good' if avg > -70 else 'Fair' if avg > -85 else 'Poor'}")
    else:
        print(f"No advertisements seen from {address}")
Common Issues
- Connection timeout — Device may be connected to another central (most peripherals support only one connection). Reset the peripheral or disconnect other centrals.
- Service discovery hangs — Some devices have slow GATT databases. Increase the timeout in BleakClient.
- Notifications stop — Usually the link has dropped after a supervision timeout. Keep the peripheral within range, and use a shorter connection interval if the peripheral allows it.
- Linux permissions — BLE scanning requires root or proper D-Bus permissions. Add your user to the bluetooth group or configure a polkit rule.
Security Considerations
BLE security has evolved through several pairing and privacy mechanisms:
- LE Legacy Pairing — Vulnerable to passive eavesdropping (BLE 4.0-4.1)
- LE Secure Connections — Uses ECDH key exchange, resistant to passive attacks (BLE 4.2+)
- LE Privacy — Devices rotate their MAC address to prevent tracking
For Python applications handling sensitive data, always verify that the connection uses LE Secure Connections and the device supports bonding (persistent encryption keys).
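Address rotation under LE Privacy can be spotted from the address itself. For random device addresses, the two most significant bits encode the subtype: 11 for static random, 01 for resolvable private, and 00 for non-resolvable private. The helper below is a sketch with a hypothetical name, `random_address_type`; it assumes the address is already known to be a random address (public addresses do not follow this scheme) and that the platform exposes real addresses, which macOS does not.

```python
def random_address_type(address: str) -> str:
    """Classify a BLE random device address by its two most significant bits."""
    first_octet = int(address.split(":")[0], 16)
    top_bits = first_octet >> 6
    return {
        0b11: "static random",
        0b01: "resolvable private",
        0b00: "non-resolvable private",
    }.get(top_bits, "reserved")


if __name__ == "__main__":
    for addr in ("C4:12:34:56:78:9A", "4A:12:34:56:78:9A", "1F:12:34:56:78:9A"):
        print(addr, "->", random_address_type(addr))
```

A device showing a resolvable private address will reappear under a different address later, so tracking it by address alone, as the scanning examples do, only works between rotations unless the device is bonded.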
One thing to remember: BLE programming in Python is fundamentally about navigating the GATT hierarchy — discover services, find the right characteristics, and handle the async nature of wireless communication with proper timeouts, reconnection logic, and multi-device management.