Python ROS 2 for Robotics — Deep Dive
ROS 2 Communication Layer: DDS
Unlike ROS 1’s custom TCP-based transport, ROS 2 is built on DDS (Data Distribution Service), an industry-standard middleware for real-time systems. DDS handles discovery, serialization, transport, and quality-of-service in a peer-to-peer architecture — no central master node required.
ROS 2 supports multiple DDS implementations:
- Fast DDS (eProsima) — Default in most distributions
- Cyclone DDS (Eclipse) — Popular alternative, often better for large-scale systems
- Connext DDS (RTI) — Commercial, used in safety-critical applications
Switch implementations via environment variable:
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
Quality of Service Profiles
DDS QoS policies let you fine-tune communication behavior:
from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT, # drop if slow
durability=DurabilityPolicy.VOLATILE, # no late-join data
history=HistoryPolicy.KEEP_LAST,
depth=1 # only latest reading
)
command_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE, # guaranteed delivery
durability=DurabilityPolicy.TRANSIENT_LOCAL, # last value for late joiners
history=HistoryPolicy.KEEP_LAST,
depth=10
)
# Use in publisher/subscriber
self.create_publisher(LaserScan, '/scan', sensor_qos)
self.create_subscription(Twist, '/cmd_vel', self.cb, command_qos)
Sensor data typically uses BEST_EFFORT (fast, accepts losses), while commands use RELIABLE (guaranteed delivery).
Lifecycle Nodes
Production nodes need managed startup, shutdown, and error recovery. Lifecycle nodes implement a state machine with transitions:
Unconfigured → [configure] → Inactive → [activate] → Active
Active → [deactivate] → Inactive → [cleanup] → Unconfigured
Any state → [shutdown] → Finalized
Any state → [error] → ErrorProcessing → Unconfigured
import rclpy
from rclpy.lifecycle import Node as LifecycleNode
from rclpy.lifecycle import TransitionCallbackReturn
class ManagedSensorNode(LifecycleNode):
def __init__(self):
super().__init__('managed_sensor')
self.sensor = None
self.publisher = None
def on_configure(self, state):
self.get_logger().info('Configuring sensor...')
try:
self.sensor = initialize_sensor(
self.get_parameter('port').value
)
self.publisher = self.create_lifecycle_publisher(
SensorData, '/sensor/data', 10
)
return TransitionCallbackReturn.SUCCESS
except Exception as e:
self.get_logger().error(f'Configuration failed: {e}')
return TransitionCallbackReturn.FAILURE
def on_activate(self, state):
self.get_logger().info('Activating sensor...')
self.timer = self.create_timer(0.1, self.read_sensor)
return TransitionCallbackReturn.SUCCESS
def on_deactivate(self, state):
self.get_logger().info('Deactivating sensor...')
self.destroy_timer(self.timer)
return TransitionCallbackReturn.SUCCESS
def on_cleanup(self, state):
self.get_logger().info('Cleaning up...')
if self.sensor:
self.sensor.close()
self.destroy_publisher(self.publisher)
return TransitionCallbackReturn.SUCCESS
def read_sensor(self):
if self.publisher.is_activated:
msg = SensorData()
msg.value = self.sensor.read()
self.publisher.publish(msg)
Lifecycle nodes enable orchestrated startup sequences — configure all hardware nodes first, check for errors, then activate in the correct order.
Custom Message and Service Definitions
Define domain-specific interfaces in .msg, .srv, and .action files:
# my_robot_interfaces/msg/Detection.msg
std_msgs/Header header
string object_class
float32 confidence
geometry_msgs/Point position
float32[4] bounding_box
# my_robot_interfaces/srv/SetSpeed.srv
float64 target_speed
---
bool success
string message
# my_robot_interfaces/action/Navigate.action
geometry_msgs/PoseStamped target_pose
---
bool success
float64 total_distance
---
float64 distance_remaining
float64 estimated_time
Build custom interfaces in a CMake package even when your nodes are Python — interface generation requires the CMake build system.
Navigation Stack (Nav2)
Nav2 is ROS 2’s production navigation framework. It handles path planning, obstacle avoidance, and robot localization. Python nodes interact with Nav2 through its action interfaces:
from nav2_simple_commander.robot_navigator import BasicNavigator
from geometry_msgs.msg import PoseStamped
import rclpy
rclpy.init()
navigator = BasicNavigator()
# Set initial pose
initial_pose = PoseStamped()
initial_pose.header.frame_id = 'map'
initial_pose.pose.position.x = 0.0
initial_pose.pose.position.y = 0.0
navigator.setInitialPose(initial_pose)
# Wait for Nav2 to fully activate
navigator.waitUntilNav2Active()
# Navigate to a goal
goal_pose = PoseStamped()
goal_pose.header.frame_id = 'map'
goal_pose.pose.position.x = 5.0
goal_pose.pose.position.y = 3.0
goal_pose.pose.orientation.w = 1.0
navigator.goToPose(goal_pose)
while not navigator.isTaskComplete():
feedback = navigator.getFeedback()
if feedback:
print(f'Distance remaining: {feedback.distance_remaining:.2f}m')
result = navigator.getResult()
print(f'Navigation result: {result}')
Waypoint Following
waypoints = []
for x, y in [(1.0, 0.0), (2.0, 1.0), (3.0, 0.0), (2.0, -1.0)]:
wp = PoseStamped()
wp.header.frame_id = 'map'
wp.pose.position.x = x
wp.pose.position.y = y
wp.pose.orientation.w = 1.0
waypoints.append(wp)
navigator.followWaypoints(waypoints)
Transforms with tf2
Robots have multiple coordinate frames — the base, each wheel, each sensor, the map. The tf2 library manages transformations between them:
from tf2_ros import TransformBroadcaster, Buffer, TransformListener
from geometry_msgs.msg import TransformStamped
import math
class OdometryNode(Node):
def __init__(self):
super().__init__('odometry_node')
self.tf_broadcaster = TransformBroadcaster(self)
self.timer = self.create_timer(0.02, self.broadcast_transform)
self.x = 0.0
self.y = 0.0
self.theta = 0.0
def broadcast_transform(self):
t = TransformStamped()
t.header.stamp = self.get_clock().now().to_msg()
t.header.frame_id = 'odom'
t.child_frame_id = 'base_link'
t.transform.translation.x = self.x
t.transform.translation.y = self.y
t.transform.rotation.z = math.sin(self.theta / 2.0)
t.transform.rotation.w = math.cos(self.theta / 2.0)
self.tf_broadcaster.sendTransform(t)
Listening for transforms:
class SensorFusionNode(Node):
def __init__(self):
super().__init__('sensor_fusion')
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
def get_sensor_in_base_frame(self):
try:
transform = self.tf_buffer.lookup_transform(
'base_link', 'camera_link',
rclpy.time.Time()
)
return transform
except Exception as e:
self.get_logger().warn(f'Transform not available: {e}')
return None
Simulation with Gazebo
Gazebo provides physics simulation, sensor emulation, and realistic rendering. ROS 2 nodes interact with simulated robots identically to real hardware:
# launch/simulation.launch.py
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch_ros.actions import Node
import os
def generate_launch_description():
gazebo = IncludeLaunchDescription(
PythonLaunchDescriptionSource([
os.path.join(get_package_share_directory('gazebo_ros'),
'launch', 'gazebo.launch.py')
]),
launch_arguments={'world': 'warehouse.world'}.items()
)
spawn_robot = Node(
package='gazebo_ros',
executable='spawn_entity.py',
arguments=['-topic', 'robot_description',
'-entity', 'my_robot']
)
return LaunchDescription([gazebo, spawn_robot])
Testing ROS 2 Python Nodes
Unit Testing
import pytest
import rclpy
from my_robot.detection_node import DetectionNode
@pytest.fixture
def node():
rclpy.init()
node = DetectionNode()
yield node
node.destroy_node()
rclpy.shutdown()
def test_detection_callback(node):
msg = create_test_image(width=640, height=480)
node.detect_objects(msg)
assert len(node.last_detections) > 0
Integration Testing with launch_testing
import launch_testing
import unittest
import rclpy
from std_msgs.msg import String
def generate_test_description():
return LaunchDescription([
Node(package='my_robot', executable='publisher_node'),
launch_testing.actions.ReadyToTest()
])
class TestPublisher(unittest.TestCase):
@classmethod
def setUpClass(cls):
rclpy.init()
cls.node = rclpy.create_node('test_subscriber')
def test_messages_published(self):
messages = []
self.node.create_subscription(
String, 'chatter',
lambda msg: messages.append(msg), 10
)
end_time = self.node.get_clock().now() + rclpy.duration.Duration(seconds=5)
while rclpy.ok() and self.node.get_clock().now() < end_time:
rclpy.spin_once(self.node, timeout_sec=0.1)
self.assertGreater(len(messages), 0)
Deployment Patterns
Docker Containers
FROM ros:humble
RUN apt-get update && apt-get install -y \
python3-pip ros-humble-nav2-bringup
COPY . /ros2_ws/src/my_robot
WORKDIR /ros2_ws
RUN . /opt/ros/humble/setup.sh && \
colcon build --packages-select my_robot
ENTRYPOINT ["/ros2_ws/install/my_robot/lib/my_robot/main_node"]
Micro-ROS for Microcontrollers
Micro-ROS brings ROS 2 to microcontrollers (ESP32, STM32), enabling direct communication between embedded firmware and ROS 2 nodes over serial, Wi-Fi, or Ethernet. Sensor microcontrollers publish standard ROS 2 messages that high-level Python nodes consume seamlessly.
One thing to remember: ROS 2’s power comes from composability — lifecycle-managed nodes, typed interfaces, configurable QoS, and a vast ecosystem of packages let you build robot software that scales from a single sensor to a fleet of autonomous vehicles.
See Also
- Python Behavior Trees Robotics How robots make decisions using a tree-shaped rulebook that keeps them organized, like a flowchart that tells a robot what to do in every situation.
- Python Bluetooth Ble How Python connects to fitness trackers, smart locks, and wireless sensors using the invisible radio signals all around you.
- Python Circuitpython Hardware Why CircuitPython makes wiring up LEDs, sensors, and motors as easy as plugging in a USB drive.
- Python Computer Vision Autonomous How self-driving cars use cameras and Python to see the road, spot pedestrians, read signs, and understand traffic — like giving a car human eyes and a brain.
- Python Home Assistant Automation How Python turns your home into a smart home that reacts to you automatically, like a helpful invisible butler.