Python ROS 2 for Robotics — Deep Dive

ROS 2 Communication Layer: DDS

Unlike ROS 1’s custom TCP-based transport, ROS 2 is built on DDS (Data Distribution Service), an industry-standard middleware for real-time systems. DDS handles discovery, serialization, transport, and quality-of-service in a peer-to-peer architecture — no central master node required.

ROS 2 supports multiple DDS implementations:

  • Fast DDS (eProsima) — Default in most distributions
  • Cyclone DDS (Eclipse) — Popular alternative, often better for large-scale systems
  • Connext DDS (RTI) — Commercial, used in safety-critical applications

Switch implementations via environment variable:

export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp

Quality of Service Profiles

DDS QoS policies let you fine-tune communication behavior:

from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy

sensor_qos = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,  # drop if slow
    durability=DurabilityPolicy.VOLATILE,        # no late-join data
    history=HistoryPolicy.KEEP_LAST,
    depth=1                                       # only latest reading
)

command_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,      # guaranteed delivery
    durability=DurabilityPolicy.TRANSIENT_LOCAL,  # last value for late joiners
    history=HistoryPolicy.KEEP_LAST,
    depth=10
)

# Use in publisher/subscriber
self.create_publisher(LaserScan, '/scan', sensor_qos)
self.create_subscription(Twist, '/cmd_vel', self.cb, command_qos)

Sensor data typically uses BEST_EFFORT (fast, accepts losses), while commands use RELIABLE (guaranteed delivery).

Lifecycle Nodes

Production nodes need managed startup, shutdown, and error recovery. Lifecycle nodes implement a state machine with transitions:

Unconfigured → [configure] → Inactive → [activate] → Active
Active → [deactivate] → Inactive → [cleanup] → Unconfigured
Any state → [shutdown] → Finalized
Any state → [error] → ErrorProcessing → Unconfigured
import rclpy
from rclpy.lifecycle import Node as LifecycleNode
from rclpy.lifecycle import TransitionCallbackReturn

class ManagedSensorNode(LifecycleNode):
    def __init__(self):
        super().__init__('managed_sensor')
        self.sensor = None
        self.publisher = None
    
    def on_configure(self, state):
        self.get_logger().info('Configuring sensor...')
        try:
            self.sensor = initialize_sensor(
                self.get_parameter('port').value
            )
            self.publisher = self.create_lifecycle_publisher(
                SensorData, '/sensor/data', 10
            )
            return TransitionCallbackReturn.SUCCESS
        except Exception as e:
            self.get_logger().error(f'Configuration failed: {e}')
            return TransitionCallbackReturn.FAILURE
    
    def on_activate(self, state):
        self.get_logger().info('Activating sensor...')
        self.timer = self.create_timer(0.1, self.read_sensor)
        return TransitionCallbackReturn.SUCCESS
    
    def on_deactivate(self, state):
        self.get_logger().info('Deactivating sensor...')
        self.destroy_timer(self.timer)
        return TransitionCallbackReturn.SUCCESS
    
    def on_cleanup(self, state):
        self.get_logger().info('Cleaning up...')
        if self.sensor:
            self.sensor.close()
        self.destroy_publisher(self.publisher)
        return TransitionCallbackReturn.SUCCESS
    
    def read_sensor(self):
        if self.publisher.is_activated:
            msg = SensorData()
            msg.value = self.sensor.read()
            self.publisher.publish(msg)

Lifecycle nodes enable orchestrated startup sequences — configure all hardware nodes first, check for errors, then activate in the correct order.

Custom Message and Service Definitions

Define domain-specific interfaces in .msg, .srv, and .action files:

# my_robot_interfaces/msg/Detection.msg
std_msgs/Header header
string object_class
float32 confidence
geometry_msgs/Point position
float32[4] bounding_box
# my_robot_interfaces/srv/SetSpeed.srv
float64 target_speed
---
bool success
string message
# my_robot_interfaces/action/Navigate.action
geometry_msgs/PoseStamped target_pose
---
bool success
float64 total_distance
---
float64 distance_remaining
float64 estimated_time

Build custom interfaces in a CMake package even when your nodes are Python — interface generation requires the CMake build system.

Nav2 is ROS 2’s production navigation framework. It handles path planning, obstacle avoidance, and robot localization. Python nodes interact with Nav2 through its action interfaces:

from nav2_simple_commander.robot_navigator import BasicNavigator
from geometry_msgs.msg import PoseStamped
import rclpy

rclpy.init()
navigator = BasicNavigator()

# Set initial pose
initial_pose = PoseStamped()
initial_pose.header.frame_id = 'map'
initial_pose.pose.position.x = 0.0
initial_pose.pose.position.y = 0.0
navigator.setInitialPose(initial_pose)

# Wait for Nav2 to fully activate
navigator.waitUntilNav2Active()

# Navigate to a goal
goal_pose = PoseStamped()
goal_pose.header.frame_id = 'map'
goal_pose.pose.position.x = 5.0
goal_pose.pose.position.y = 3.0
goal_pose.pose.orientation.w = 1.0

navigator.goToPose(goal_pose)

while not navigator.isTaskComplete():
    feedback = navigator.getFeedback()
    if feedback:
        print(f'Distance remaining: {feedback.distance_remaining:.2f}m')

result = navigator.getResult()
print(f'Navigation result: {result}')

Waypoint Following

waypoints = []
for x, y in [(1.0, 0.0), (2.0, 1.0), (3.0, 0.0), (2.0, -1.0)]:
    wp = PoseStamped()
    wp.header.frame_id = 'map'
    wp.pose.position.x = x
    wp.pose.position.y = y
    wp.pose.orientation.w = 1.0
    waypoints.append(wp)

navigator.followWaypoints(waypoints)

Transforms with tf2

Robots have multiple coordinate frames — the base, each wheel, each sensor, the map. The tf2 library manages transformations between them:

from tf2_ros import TransformBroadcaster, Buffer, TransformListener
from geometry_msgs.msg import TransformStamped
import math

class OdometryNode(Node):
    def __init__(self):
        super().__init__('odometry_node')
        self.tf_broadcaster = TransformBroadcaster(self)
        self.timer = self.create_timer(0.02, self.broadcast_transform)
        self.x = 0.0
        self.y = 0.0
        self.theta = 0.0
    
    def broadcast_transform(self):
        t = TransformStamped()
        t.header.stamp = self.get_clock().now().to_msg()
        t.header.frame_id = 'odom'
        t.child_frame_id = 'base_link'
        t.transform.translation.x = self.x
        t.transform.translation.y = self.y
        t.transform.rotation.z = math.sin(self.theta / 2.0)
        t.transform.rotation.w = math.cos(self.theta / 2.0)
        self.tf_broadcaster.sendTransform(t)

Listening for transforms:

class SensorFusionNode(Node):
    def __init__(self):
        super().__init__('sensor_fusion')
        self.tf_buffer = Buffer()
        self.tf_listener = TransformListener(self.tf_buffer, self)
    
    def get_sensor_in_base_frame(self):
        try:
            transform = self.tf_buffer.lookup_transform(
                'base_link', 'camera_link',
                rclpy.time.Time()
            )
            return transform
        except Exception as e:
            self.get_logger().warn(f'Transform not available: {e}')
            return None

Simulation with Gazebo

Gazebo provides physics simulation, sensor emulation, and realistic rendering. ROS 2 nodes interact with simulated robots identically to real hardware:

# launch/simulation.launch.py
from launch import LaunchDescription
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch_ros.actions import Node
import os

def generate_launch_description():
    gazebo = IncludeLaunchDescription(
        PythonLaunchDescriptionSource([
            os.path.join(get_package_share_directory('gazebo_ros'),
                        'launch', 'gazebo.launch.py')
        ]),
        launch_arguments={'world': 'warehouse.world'}.items()
    )
    
    spawn_robot = Node(
        package='gazebo_ros',
        executable='spawn_entity.py',
        arguments=['-topic', 'robot_description',
                   '-entity', 'my_robot']
    )
    
    return LaunchDescription([gazebo, spawn_robot])

Testing ROS 2 Python Nodes

Unit Testing

import pytest
import rclpy
from my_robot.detection_node import DetectionNode

@pytest.fixture
def node():
    rclpy.init()
    node = DetectionNode()
    yield node
    node.destroy_node()
    rclpy.shutdown()

def test_detection_callback(node):
    msg = create_test_image(width=640, height=480)
    node.detect_objects(msg)
    assert len(node.last_detections) > 0

Integration Testing with launch_testing

import launch_testing
import unittest
import rclpy
from std_msgs.msg import String

def generate_test_description():
    return LaunchDescription([
        Node(package='my_robot', executable='publisher_node'),
        launch_testing.actions.ReadyToTest()
    ])

class TestPublisher(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        rclpy.init()
        cls.node = rclpy.create_node('test_subscriber')
    
    def test_messages_published(self):
        messages = []
        self.node.create_subscription(
            String, 'chatter',
            lambda msg: messages.append(msg), 10
        )
        
        end_time = self.node.get_clock().now() + rclpy.duration.Duration(seconds=5)
        while rclpy.ok() and self.node.get_clock().now() < end_time:
            rclpy.spin_once(self.node, timeout_sec=0.1)
        
        self.assertGreater(len(messages), 0)

Deployment Patterns

Docker Containers

FROM ros:humble

RUN apt-get update && apt-get install -y \
    python3-pip ros-humble-nav2-bringup

COPY . /ros2_ws/src/my_robot
WORKDIR /ros2_ws

RUN . /opt/ros/humble/setup.sh && \
    colcon build --packages-select my_robot

ENTRYPOINT ["/ros2_ws/install/my_robot/lib/my_robot/main_node"]

Micro-ROS for Microcontrollers

Micro-ROS brings ROS 2 to microcontrollers (ESP32, STM32), enabling direct communication between embedded firmware and ROS 2 nodes over serial, Wi-Fi, or Ethernet. Sensor microcontrollers publish standard ROS 2 messages that high-level Python nodes consume seamlessly.

One thing to remember: ROS 2’s power comes from composability — lifecycle-managed nodes, typed interfaces, configurable QoS, and a vast ecosystem of packages let you build robot software that scales from a single sensor to a fleet of autonomous vehicles.

pythonros2roboticsautomation

See Also

  • Python Behavior Trees Robotics How robots make decisions using a tree-shaped rulebook that keeps them organized, like a flowchart that tells a robot what to do in every situation.
  • Python Bluetooth Ble How Python connects to fitness trackers, smart locks, and wireless sensors using the invisible radio signals all around you.
  • Python Circuitpython Hardware Why CircuitPython makes wiring up LEDs, sensors, and motors as easy as plugging in a USB drive.
  • Python Computer Vision Autonomous How self-driving cars use cameras and Python to see the road, spot pedestrians, read signs, and understand traffic — like giving a car human eyes and a brain.
  • Python Home Assistant Automation How Python turns your home into a smart home that reacts to you automatically, like a helpful invisible butler.