Python Behavior Trees for Robotics — Deep Dive
Building a Behavior Tree from Scratch
Before using libraries, understanding the core mechanism:
from enum import Enum
from typing import List, Optional
class Status(Enum):
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
RUNNING = "RUNNING"
class Node:
def __init__(self, name: str):
self.name = name
self.status = Status.FAILURE
def tick(self) -> Status:
raise NotImplementedError
class Action(Node):
def __init__(self, name, action_fn):
super().__init__(name)
self.action_fn = action_fn
def tick(self) -> Status:
self.status = self.action_fn()
return self.status
class Condition(Node):
def __init__(self, name, check_fn):
super().__init__(name)
self.check_fn = check_fn
def tick(self) -> Status:
self.status = Status.SUCCESS if self.check_fn() else Status.FAILURE
return self.status
class Sequence(Node):
def __init__(self, name, children: List[Node]):
super().__init__(name)
self.children = children
self._running_child = 0
def tick(self) -> Status:
for i in range(self._running_child, len(self.children)):
status = self.children[i].tick()
if status == Status.RUNNING:
self._running_child = i
self.status = Status.RUNNING
return self.status
elif status == Status.FAILURE:
self._running_child = 0
self.status = Status.FAILURE
return self.status
self._running_child = 0
self.status = Status.SUCCESS
return self.status
class Selector(Node):
def __init__(self, name, children: List[Node]):
super().__init__(name)
self.children = children
self._running_child = 0
def tick(self) -> Status:
for i in range(self._running_child, len(self.children)):
status = self.children[i].tick()
if status == Status.RUNNING:
self._running_child = i
self.status = Status.RUNNING
return self.status
elif status == Status.SUCCESS:
self._running_child = 0
self.status = Status.SUCCESS
return self.status
self._running_child = 0
self.status = Status.FAILURE
return self.status
Decorator Nodes
class Inverter(Node):
def __init__(self, name, child: Node):
super().__init__(name)
self.child = child
def tick(self) -> Status:
status = self.child.tick()
if status == Status.SUCCESS:
self.status = Status.FAILURE
elif status == Status.FAILURE:
self.status = Status.SUCCESS
else:
self.status = Status.RUNNING
return self.status
class RetryUntilSuccess(Node):
def __init__(self, name, child: Node, max_retries: int = 3):
super().__init__(name)
self.child = child
self.max_retries = max_retries
self._attempts = 0
def tick(self) -> Status:
status = self.child.tick()
if status == Status.SUCCESS:
self._attempts = 0
self.status = Status.SUCCESS
elif status == Status.FAILURE:
self._attempts += 1
if self._attempts >= self.max_retries:
self._attempts = 0
self.status = Status.FAILURE
else:
self.status = Status.RUNNING
else:
self.status = Status.RUNNING
return self.status
class Timeout(Node):
def __init__(self, name, child: Node, max_ticks: int):
super().__init__(name)
self.child = child
self.max_ticks = max_ticks
self._tick_count = 0
def tick(self) -> Status:
self._tick_count += 1
if self._tick_count > self.max_ticks:
self._tick_count = 0
self.status = Status.FAILURE
return self.status
status = self.child.tick()
if status != Status.RUNNING:
self._tick_count = 0
self.status = status
return self.status
py_trees Library
py_trees provides a mature, battle-tested implementation:
import py_trees
# Define custom action
class NavigateToGoal(py_trees.behaviour.Behaviour):
def __init__(self, name, goal_position):
super().__init__(name)
self.goal = goal_position
self.distance = None
def initialise(self):
"""Called once when the behavior first starts running."""
self.distance = self._compute_distance()
self.logger.info(f"Starting navigation to {self.goal}, distance: {self.distance:.1f}m")
def update(self) -> py_trees.common.Status:
"""Called on every tick while running."""
self.distance -= 0.5 # simulate movement
if self.distance <= 0:
return py_trees.common.Status.SUCCESS
self.feedback_message = f"{self.distance:.1f}m remaining"
return py_trees.common.Status.RUNNING
def terminate(self, new_status):
"""Called when status changes (completion, preemption, etc.)."""
if new_status == py_trees.common.Status.INVALID:
self.logger.info("Navigation preempted!")
def _compute_distance(self):
return 10.0 # simplified
class CheckBattery(py_trees.behaviour.Behaviour):
def __init__(self, name, threshold=20):
super().__init__(name)
self.threshold = threshold
def update(self):
battery = self._get_battery_level()
if battery > self.threshold:
return py_trees.common.Status.SUCCESS
self.feedback_message = f"Battery low: {battery}%"
return py_trees.common.Status.FAILURE
def _get_battery_level(self):
return 75 # simplified
Building a Complete Robot Tree
def create_delivery_robot_tree():
"""Behavior tree for an autonomous delivery robot."""
# Emergency behaviors (highest priority)
emergency = py_trees.composites.Sequence("Emergency", memory=True)
emergency.add_children([
py_trees.behaviours.CheckBlackboardVariableValue(
name="Battery Critical?",
check=py_trees.common.ComparisonExpression(
variable="battery_level", value=10, operator=lambda a, b: a < b
)
),
NavigateToGoal("Go to Charger", goal_position=(0, 0)),
])
# Delivery task
delivery = py_trees.composites.Sequence("Deliver Package", memory=True)
delivery.add_children([
py_trees.behaviours.CheckBlackboardVariableValue(
name="Have Package?",
check=py_trees.common.ComparisonExpression(
variable="has_package", value=True, operator=lambda a, b: a == b
)
),
NavigateToGoal("Go to Destination", goal_position=(10, 5)),
DropPackage("Drop Package"),
])
# Pickup task
pickup = py_trees.composites.Sequence("Pick Up Package", memory=True)
pickup.add_children([
py_trees.behaviours.CheckBlackboardVariableValue(
name="Package Available?",
check=py_trees.common.ComparisonExpression(
variable="package_available", value=True, operator=lambda a, b: a == b
)
),
NavigateToGoal("Go to Pickup", goal_position=(2, 8)),
GrabPackage("Grab Package"),
])
# Root selector: try emergency first, then delivery, then pickup, then idle
root = py_trees.composites.Selector("Root", memory=False)
root.add_children([
emergency,
delivery,
pickup,
NavigateToGoal("Idle Patrol", goal_position=(5, 5)),
])
return py_trees.trees.BehaviourTree(root=root)
Blackboard Communication
The blackboard is a shared data store that behavior nodes read from and write to:
def setup_blackboard():
blackboard = py_trees.blackboard.Client(name="Robot")
blackboard.register_key(key="battery_level", access=py_trees.common.Access.WRITE)
blackboard.register_key(key="has_package", access=py_trees.common.Access.WRITE)
blackboard.register_key(key="package_available", access=py_trees.common.Access.WRITE)
blackboard.register_key(key="obstacle_detected", access=py_trees.common.Access.WRITE)
blackboard.register_key(key="current_position", access=py_trees.common.Access.WRITE)
# Initialize values
blackboard.battery_level = 85
blackboard.has_package = False
blackboard.package_available = True
blackboard.obstacle_detected = False
blackboard.current_position = (0, 0)
return blackboard
class UpdateSensorData(py_trees.behaviour.Behaviour):
"""Reads sensors and writes to blackboard."""
def __init__(self, name):
super().__init__(name)
self.blackboard = self.attach_blackboard_client()
self.blackboard.register_key(key="battery_level", access=py_trees.common.Access.WRITE)
self.blackboard.register_key(key="obstacle_detected", access=py_trees.common.Access.WRITE)
def update(self):
self.blackboard.battery_level = read_battery_sensor()
self.blackboard.obstacle_detected = read_proximity_sensor()
return py_trees.common.Status.SUCCESS
Subtree Composition
Large behavior trees use subtrees for modularity:
def create_navigation_subtree(name, goal):
"""Reusable navigation subtree with obstacle avoidance."""
root = py_trees.composites.Sequence(name, memory=True)
# Check path is clear, with fallback to replanning
navigate = py_trees.composites.Selector("Navigate or Replan", memory=False)
direct_path = py_trees.composites.Sequence("Direct Path", memory=True)
direct_path.add_children([
Condition("Path Clear?", lambda: not check_obstacles()),
NavigateToGoal(f"Go to {goal}", goal),
])
replan_path = py_trees.composites.Sequence("Replan Path", memory=True)
replan_path.add_children([
ComputeAlternatePath("Find Alternate Route", goal),
NavigateToGoal(f"Follow Alternate to {goal}", goal),
])
navigate.add_children([direct_path, replan_path])
root.add_children([navigate])
return root
def create_manipulation_subtree(name, object_id):
"""Reusable pick-and-place subtree."""
root = py_trees.composites.Sequence(name, memory=True)
root.add_children([
DetectObject(f"Find {object_id}", object_id),
PlanGrasp(f"Plan Grasp for {object_id}", object_id),
RetryUntilSuccessDecorator(
ExecuteGrasp(f"Grasp {object_id}", object_id),
max_retries=3
),
VerifyGrasp(f"Verify Holding {object_id}", object_id),
])
return root
ROS 2 Integration Pattern
In ROS 2 robotics, behavior trees coordinate action servers:
import rclpy
from rclpy.action import ActionClient
from nav2_msgs.action import NavigateToPose
class ROS2NavigateAction(py_trees.behaviour.Behaviour):
"""Behavior tree node that calls a ROS 2 action server."""
def __init__(self, name, node, goal_pose):
super().__init__(name)
self.ros_node = node
self.goal_pose = goal_pose
self.action_client = ActionClient(node, NavigateToPose, 'navigate_to_pose')
self._goal_handle = None
def initialise(self):
goal_msg = NavigateToPose.Goal()
goal_msg.pose = self.goal_pose
self._send_future = self.action_client.send_goal_async(goal_msg)
self._goal_handle = None
def update(self):
if self._goal_handle is None:
if self._send_future.done():
self._goal_handle = self._send_future.result()
if not self._goal_handle.accepted:
return py_trees.common.Status.FAILURE
self._result_future = self._goal_handle.get_result_async()
return py_trees.common.Status.RUNNING
if self._result_future.done():
result = self._result_future.result()
if result.status == 4: # SUCCEEDED
return py_trees.common.Status.SUCCESS
return py_trees.common.Status.FAILURE
return py_trees.common.Status.RUNNING
def terminate(self, new_status):
if self._goal_handle and new_status == py_trees.common.Status.INVALID:
self._goal_handle.cancel_goal_async()
Testing Behavior Trees
import unittest
class TestDeliveryTree(unittest.TestCase):
def test_low_battery_preempts_delivery(self):
tree = create_delivery_robot_tree()
blackboard = setup_blackboard()
blackboard.battery_level = 5 # critically low
blackboard.has_package = True # has a delivery to make
tree.tick()
# Emergency branch should activate, not delivery
emergency_node = tree.root.children[0]
assert emergency_node.status == py_trees.common.Status.RUNNING
def test_delivery_when_battery_ok(self):
tree = create_delivery_robot_tree()
blackboard = setup_blackboard()
blackboard.battery_level = 80
blackboard.has_package = True
tree.tick()
delivery_node = tree.root.children[1]
assert delivery_node.status in (
py_trees.common.Status.RUNNING,
py_trees.common.Status.SUCCESS
)
def test_idle_when_nothing_to_do(self):
tree = create_delivery_robot_tree()
blackboard = setup_blackboard()
blackboard.battery_level = 80
blackboard.has_package = False
blackboard.package_available = False
tree.tick()
idle_node = tree.root.children[3]
assert idle_node.status == py_trees.common.Status.RUNNING
Visualization and Debugging
py_trees includes ASCII and graphviz rendering:
# ASCII tree visualization
print(py_trees.display.ascii_tree(tree.root))
# Graphviz DOT rendering
py_trees.display.render_dot_tree(tree.root, name="delivery_robot")
# Tick-by-tick logging
py_trees.logging.level = py_trees.logging.Level.DEBUG
for i in range(10):
tree.tick()
print(f"\n--- Tick {i} ---")
print(py_trees.display.ascii_tree(tree.root, show_status=True))
One thing to remember: py_trees provides a production-ready Python behavior tree framework with blackboard communication, decorator patterns, subtree composition, and ROS 2 integration — enabling modular robot behavior that scales from simple patrol routines to complex multi-step manipulation tasks.
See Also
- Python Bluetooth Ble How Python connects to fitness trackers, smart locks, and wireless sensors using the invisible radio signals all around you.
- Python Circuitpython Hardware Why CircuitPython makes wiring up LEDs, sensors, and motors as easy as plugging in a USB drive.
- Python Computer Vision Autonomous How self-driving cars use cameras and Python to see the road, spot pedestrians, read signs, and understand traffic — like giving a car human eyes and a brain.
- Python Home Assistant Automation How Python turns your home into a smart home that reacts to you automatically, like a helpful invisible butler.
- Python Lidar Point Cloud Processing How self-driving cars use millions of laser dots to build a 3D picture of the world around them, and how Python helps make sense of it all.