Python Behavior Trees for Robotics — Deep Dive

Building a Behavior Tree from Scratch

Before using libraries, understanding the core mechanism:

from enum import Enum
from typing import List, Optional

class Status(Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    RUNNING = "RUNNING"

class Node:
    def __init__(self, name: str):
        self.name = name
        self.status = Status.FAILURE

    def tick(self) -> Status:
        raise NotImplementedError

class Action(Node):
    def __init__(self, name, action_fn):
        super().__init__(name)
        self.action_fn = action_fn

    def tick(self) -> Status:
        self.status = self.action_fn()
        return self.status

class Condition(Node):
    def __init__(self, name, check_fn):
        super().__init__(name)
        self.check_fn = check_fn

    def tick(self) -> Status:
        self.status = Status.SUCCESS if self.check_fn() else Status.FAILURE
        return self.status

class Sequence(Node):
    def __init__(self, name, children: List[Node]):
        super().__init__(name)
        self.children = children
        self._running_child = 0

    def tick(self) -> Status:
        for i in range(self._running_child, len(self.children)):
            status = self.children[i].tick()
            if status == Status.RUNNING:
                self._running_child = i
                self.status = Status.RUNNING
                return self.status
            elif status == Status.FAILURE:
                self._running_child = 0
                self.status = Status.FAILURE
                return self.status

        self._running_child = 0
        self.status = Status.SUCCESS
        return self.status

class Selector(Node):
    def __init__(self, name, children: List[Node]):
        super().__init__(name)
        self.children = children
        self._running_child = 0

    def tick(self) -> Status:
        for i in range(self._running_child, len(self.children)):
            status = self.children[i].tick()
            if status == Status.RUNNING:
                self._running_child = i
                self.status = Status.RUNNING
                return self.status
            elif status == Status.SUCCESS:
                self._running_child = 0
                self.status = Status.SUCCESS
                return self.status

        self._running_child = 0
        self.status = Status.FAILURE
        return self.status

Decorator Nodes

class Inverter(Node):
    def __init__(self, name, child: Node):
        super().__init__(name)
        self.child = child

    def tick(self) -> Status:
        status = self.child.tick()
        if status == Status.SUCCESS:
            self.status = Status.FAILURE
        elif status == Status.FAILURE:
            self.status = Status.SUCCESS
        else:
            self.status = Status.RUNNING
        return self.status

class RetryUntilSuccess(Node):
    def __init__(self, name, child: Node, max_retries: int = 3):
        super().__init__(name)
        self.child = child
        self.max_retries = max_retries
        self._attempts = 0

    def tick(self) -> Status:
        status = self.child.tick()
        if status == Status.SUCCESS:
            self._attempts = 0
            self.status = Status.SUCCESS
        elif status == Status.FAILURE:
            self._attempts += 1
            if self._attempts >= self.max_retries:
                self._attempts = 0
                self.status = Status.FAILURE
            else:
                self.status = Status.RUNNING
        else:
            self.status = Status.RUNNING
        return self.status

class Timeout(Node):
    def __init__(self, name, child: Node, max_ticks: int):
        super().__init__(name)
        self.child = child
        self.max_ticks = max_ticks
        self._tick_count = 0

    def tick(self) -> Status:
        self._tick_count += 1
        if self._tick_count > self.max_ticks:
            self._tick_count = 0
            self.status = Status.FAILURE
            return self.status

        status = self.child.tick()
        if status != Status.RUNNING:
            self._tick_count = 0
        self.status = status
        return self.status

py_trees Library

py_trees provides a mature, battle-tested implementation:

import py_trees

# Define custom action
class NavigateToGoal(py_trees.behaviour.Behaviour):
    def __init__(self, name, goal_position):
        super().__init__(name)
        self.goal = goal_position
        self.distance = None

    def initialise(self):
        """Called once when the behavior first starts running."""
        self.distance = self._compute_distance()
        self.logger.info(f"Starting navigation to {self.goal}, distance: {self.distance:.1f}m")

    def update(self) -> py_trees.common.Status:
        """Called on every tick while running."""
        self.distance -= 0.5  # simulate movement

        if self.distance <= 0:
            return py_trees.common.Status.SUCCESS

        self.feedback_message = f"{self.distance:.1f}m remaining"
        return py_trees.common.Status.RUNNING

    def terminate(self, new_status):
        """Called when status changes (completion, preemption, etc.)."""
        if new_status == py_trees.common.Status.INVALID:
            self.logger.info("Navigation preempted!")

    def _compute_distance(self):
        return 10.0  # simplified

class CheckBattery(py_trees.behaviour.Behaviour):
    def __init__(self, name, threshold=20):
        super().__init__(name)
        self.threshold = threshold

    def update(self):
        battery = self._get_battery_level()
        if battery > self.threshold:
            return py_trees.common.Status.SUCCESS
        self.feedback_message = f"Battery low: {battery}%"
        return py_trees.common.Status.FAILURE

    def _get_battery_level(self):
        return 75  # simplified

Building a Complete Robot Tree

def create_delivery_robot_tree():
    """Behavior tree for an autonomous delivery robot."""

    # Emergency behaviors (highest priority)
    emergency = py_trees.composites.Sequence("Emergency", memory=True)
    emergency.add_children([
        py_trees.behaviours.CheckBlackboardVariableValue(
            name="Battery Critical?",
            check=py_trees.common.ComparisonExpression(
                variable="battery_level", value=10, operator=lambda a, b: a < b
            )
        ),
        NavigateToGoal("Go to Charger", goal_position=(0, 0)),
    ])

    # Delivery task
    delivery = py_trees.composites.Sequence("Deliver Package", memory=True)
    delivery.add_children([
        py_trees.behaviours.CheckBlackboardVariableValue(
            name="Have Package?",
            check=py_trees.common.ComparisonExpression(
                variable="has_package", value=True, operator=lambda a, b: a == b
            )
        ),
        NavigateToGoal("Go to Destination", goal_position=(10, 5)),
        DropPackage("Drop Package"),
    ])

    # Pickup task
    pickup = py_trees.composites.Sequence("Pick Up Package", memory=True)
    pickup.add_children([
        py_trees.behaviours.CheckBlackboardVariableValue(
            name="Package Available?",
            check=py_trees.common.ComparisonExpression(
                variable="package_available", value=True, operator=lambda a, b: a == b
            )
        ),
        NavigateToGoal("Go to Pickup", goal_position=(2, 8)),
        GrabPackage("Grab Package"),
    ])

    # Root selector: try emergency first, then delivery, then pickup, then idle
    root = py_trees.composites.Selector("Root", memory=False)
    root.add_children([
        emergency,
        delivery,
        pickup,
        NavigateToGoal("Idle Patrol", goal_position=(5, 5)),
    ])

    return py_trees.trees.BehaviourTree(root=root)

Blackboard Communication

The blackboard is a shared data store that behavior nodes read from and write to:

def setup_blackboard():
    blackboard = py_trees.blackboard.Client(name="Robot")
    blackboard.register_key(key="battery_level", access=py_trees.common.Access.WRITE)
    blackboard.register_key(key="has_package", access=py_trees.common.Access.WRITE)
    blackboard.register_key(key="package_available", access=py_trees.common.Access.WRITE)
    blackboard.register_key(key="obstacle_detected", access=py_trees.common.Access.WRITE)
    blackboard.register_key(key="current_position", access=py_trees.common.Access.WRITE)

    # Initialize values
    blackboard.battery_level = 85
    blackboard.has_package = False
    blackboard.package_available = True
    blackboard.obstacle_detected = False
    blackboard.current_position = (0, 0)

    return blackboard

class UpdateSensorData(py_trees.behaviour.Behaviour):
    """Reads sensors and writes to blackboard."""
    def __init__(self, name):
        super().__init__(name)
        self.blackboard = self.attach_blackboard_client()
        self.blackboard.register_key(key="battery_level", access=py_trees.common.Access.WRITE)
        self.blackboard.register_key(key="obstacle_detected", access=py_trees.common.Access.WRITE)

    def update(self):
        self.blackboard.battery_level = read_battery_sensor()
        self.blackboard.obstacle_detected = read_proximity_sensor()
        return py_trees.common.Status.SUCCESS

Subtree Composition

Large behavior trees use subtrees for modularity:

def create_navigation_subtree(name, goal):
    """Reusable navigation subtree with obstacle avoidance."""
    root = py_trees.composites.Sequence(name, memory=True)

    # Check path is clear, with fallback to replanning
    navigate = py_trees.composites.Selector("Navigate or Replan", memory=False)

    direct_path = py_trees.composites.Sequence("Direct Path", memory=True)
    direct_path.add_children([
        Condition("Path Clear?", lambda: not check_obstacles()),
        NavigateToGoal(f"Go to {goal}", goal),
    ])

    replan_path = py_trees.composites.Sequence("Replan Path", memory=True)
    replan_path.add_children([
        ComputeAlternatePath("Find Alternate Route", goal),
        NavigateToGoal(f"Follow Alternate to {goal}", goal),
    ])

    navigate.add_children([direct_path, replan_path])
    root.add_children([navigate])

    return root

def create_manipulation_subtree(name, object_id):
    """Reusable pick-and-place subtree."""
    root = py_trees.composites.Sequence(name, memory=True)
    root.add_children([
        DetectObject(f"Find {object_id}", object_id),
        PlanGrasp(f"Plan Grasp for {object_id}", object_id),
        RetryUntilSuccessDecorator(
            ExecuteGrasp(f"Grasp {object_id}", object_id),
            max_retries=3
        ),
        VerifyGrasp(f"Verify Holding {object_id}", object_id),
    ])
    return root

ROS 2 Integration Pattern

In ROS 2 robotics, behavior trees coordinate action servers:

import rclpy
from rclpy.action import ActionClient
from nav2_msgs.action import NavigateToPose

class ROS2NavigateAction(py_trees.behaviour.Behaviour):
    """Behavior tree node that calls a ROS 2 action server."""
    def __init__(self, name, node, goal_pose):
        super().__init__(name)
        self.ros_node = node
        self.goal_pose = goal_pose
        self.action_client = ActionClient(node, NavigateToPose, 'navigate_to_pose')
        self._goal_handle = None

    def initialise(self):
        goal_msg = NavigateToPose.Goal()
        goal_msg.pose = self.goal_pose
        self._send_future = self.action_client.send_goal_async(goal_msg)
        self._goal_handle = None

    def update(self):
        if self._goal_handle is None:
            if self._send_future.done():
                self._goal_handle = self._send_future.result()
                if not self._goal_handle.accepted:
                    return py_trees.common.Status.FAILURE
                self._result_future = self._goal_handle.get_result_async()
            return py_trees.common.Status.RUNNING

        if self._result_future.done():
            result = self._result_future.result()
            if result.status == 4:  # SUCCEEDED
                return py_trees.common.Status.SUCCESS
            return py_trees.common.Status.FAILURE

        return py_trees.common.Status.RUNNING

    def terminate(self, new_status):
        if self._goal_handle and new_status == py_trees.common.Status.INVALID:
            self._goal_handle.cancel_goal_async()

Testing Behavior Trees

import unittest

class TestDeliveryTree(unittest.TestCase):
    def test_low_battery_preempts_delivery(self):
        tree = create_delivery_robot_tree()
        blackboard = setup_blackboard()
        blackboard.battery_level = 5  # critically low
        blackboard.has_package = True  # has a delivery to make

        tree.tick()

        # Emergency branch should activate, not delivery
        emergency_node = tree.root.children[0]
        assert emergency_node.status == py_trees.common.Status.RUNNING

    def test_delivery_when_battery_ok(self):
        tree = create_delivery_robot_tree()
        blackboard = setup_blackboard()
        blackboard.battery_level = 80
        blackboard.has_package = True

        tree.tick()

        delivery_node = tree.root.children[1]
        assert delivery_node.status in (
            py_trees.common.Status.RUNNING,
            py_trees.common.Status.SUCCESS
        )

    def test_idle_when_nothing_to_do(self):
        tree = create_delivery_robot_tree()
        blackboard = setup_blackboard()
        blackboard.battery_level = 80
        blackboard.has_package = False
        blackboard.package_available = False

        tree.tick()

        idle_node = tree.root.children[3]
        assert idle_node.status == py_trees.common.Status.RUNNING

Visualization and Debugging

py_trees includes ASCII and graphviz rendering:

# ASCII tree visualization
print(py_trees.display.ascii_tree(tree.root))

# Graphviz DOT rendering
py_trees.display.render_dot_tree(tree.root, name="delivery_robot")

# Tick-by-tick logging
py_trees.logging.level = py_trees.logging.Level.DEBUG
for i in range(10):
    tree.tick()
    print(f"\n--- Tick {i} ---")
    print(py_trees.display.ascii_tree(tree.root, show_status=True))

One thing to remember: py_trees provides a production-ready Python behavior tree framework with blackboard communication, decorator patterns, subtree composition, and ROS 2 integration — enabling modular robot behavior that scales from simple patrol routines to complex multi-step manipulation tasks.

pythonbehavior-treesroboticsdecision-makingai

See Also