Getting Started With the Simple Commander API – ROS 2 Jazzy

In this tutorial, we will explore the Nav2 Simple Commander API, a powerful tool that simplifies robot navigation programming in Python. We’ll learn how to use this API to control a robot’s movement, plan paths, and execute various navigation tasks without getting bogged down in complex ROS 2 details. This guide will help you get started with the most important features of the Simple Commander API.

By the end of this tutorial, you will be able to build this:

go-to-goal-simple-commander-api

Real-World Applications

You can use the Nav2 Simple Commander API in many real-world situations. Here are some examples:

  1. Home Help: You can create robots that clean your house or bring you things from other rooms.
  2. Warehouse Work: You can build robots that check items on shelves or pick up and move things around the warehouse.
  3. Farming: You can develop vehicles that plant, check, or pick crops.
  4. Security: You can make robots that patrol areas or quickly go to places where there might be trouble.
  5. Healthcare: You can create robots that bring medicine to hospital rooms or clean areas to keep them safe.

Prerequisites

All my code for this project is located here on GitHub.

Create Folders

To get started with the Nav2 Simple Commander API, you’ll need to set up your workspace. Follow these steps to create the necessary folders:

Open a terminal window, and type the following command:

cd ~/ros2_ws/src/yahboom_rosmaster/yahboom_rosmaster_navigation

Create a folder for your Python scripts:

mkdir scripts

Create a folder for the helper files you will create:

mkdir yahboom_rosmaster_navigation
cd yahboom_rosmaster_navigation
touch __init__.py

Create a Goal Pose Message Generator

Now we will create a goal pose message generator. This generator is important for our navigation tasks because:

  1. In ROS 2, robot positions and orientations are often represented using the PoseStamped message type.
  2. This message type includes not just the position and orientation, but also timing and reference frame information, which are essential for accurate navigation.
  3. Creating a function to generate these messages will simplify our code when we need to specify goal positions for our robot.

Open a terminal window:

cd ~/ros2_ws/src/yahboom_rosmaster/yahboom_rosmaster_navigation/yahboom_rosmaster_navigation/
touch posestamped_msg_generator.py

Type this code.

#!/usr/bin/env python3
"""
Create a PoseStamped message for ROS 2 navigation.

This script creates a geometry_msgs/PoseStamped message which is commonly used
for robot navigation in ROS 2. It shows how to properly construct a PoseStamped
message with header information and pose data.

:author: Addison Sears-Collins
:date: December 5, 2024
"""

from rclpy.node import Node
from builtin_interfaces.msg import Time
from geometry_msgs.msg import PoseStamped, Pose, Point, Quaternion
from std_msgs.msg import Header


class PoseStampedGenerator(Node):
    """A ROS 2 node that generates PoseStamped messages."""

    def __init__(self, node_name='pose_stamped_generator'):
        """
        Initialize the ROS 2 node.

        :param node_name: Name of the node
        :type node_name: str
        """
        super().__init__(node_name)

    def create_pose_stamped(self, x=0.0, y=0.0, z=0.0,
                            qx=0.0, qy=0.0, qz=0.0, qw=1.0,
                            frame_id='map'):
        """
        Create a PoseStamped message with the specified parameters.

        :param x: x position coordinate
        :type x: float
        :param y: y position coordinate
        :type y: float
        :param z: z position coordinate
        :type z: float
        :param qx: x quaternion component
        :type qx: float
        :param qy: y quaternion component
        :type qy: float
        :param qz: z quaternion component
        :type qz: float
        :param qw: w quaternion component
        :type qw: float
        :param frame_id: reference frame
        :type frame_id: str
        :return: pose stamped message
        :rtype: PoseStamped
        """
        # Create the PoseStamped message object
        pose_stamped = PoseStamped()

        # Create and fill the header with frame_id and timestamp
        header = Header()
        header.frame_id = frame_id

        # Get current ROS time and convert to Time message
        now = self.get_clock().now()
        header.stamp = Time(
            sec=now.seconds_nanoseconds()[0],
            nanosec=now.seconds_nanoseconds()[1]
        )

        # Create and fill the position message with x, y, z coordinates
        position = Point()
        position.x = float(x)
        position.y = float(y)
        position.z = float(z)

        # Create and fill the orientation message with quaternion values
        orientation = Quaternion()
        orientation.x = float(qx)
        orientation.y = float(qy)
        orientation.z = float(qz)
        orientation.w = float(qw)

        # Create Pose message and combine position and orientation
        pose = Pose()
        pose.position = position
        pose.orientation = orientation

        # Combine header and pose into the final PoseStamped message
        pose_stamped.header = header
        pose_stamped.pose = pose

        return pose_stamped

Save the file, and close it.

This script provides a ROS 2 node class PoseStampedGenerator that creates geometry_msgs/PoseStamped messages. The class provides a create_pose_stamped method that takes:

  • Position coordinates (x, y, z)
  • Orientation as a quaternion (qx, qy, qz, qw)
  • Reference frame (defaults to ‘map’)

Create a Go to Goal Pose Script

Now I will show you how to create a script that will send a robot to a single goal location (i.e. “goal pose”). A barebones official example code for sending a single goal is here on the Nav2 GitHub. However, this code is missing some important functionality that you will need for a real-world production mobile robot that needs to navigate in a dynamic environment with people moving around and objects of all shapes and sizes. For this reason, we are going to use that code as a basis and then add additional functionality.

Our navigation script will assume the initial pose of the robot has already been set either via RViz, the command line, or some other method.

To create this script, open a terminal, and navigate to the scripts directory:

cd ~/ros2_ws/src/yahboom_rosmaster_navigation/yahboom_rosmaster_navigation/scripts
touch nav_to_pose.py

Add this code.

#!/usr/bin/env python3
"""
ROS 2 node for navigating to a goal pose and publishing status updates.

This script creates a ROS 2 node that:
    - Subscribes to a goal pose
    - Navigates the robot to the goal pose
    - Publishes the estimated time of arrival
    - Publishes the goal status
    - Cancels the goal if a stop signal is received

Subscription Topics:
    /goal_pose/goal (geometry_msgs/PoseStamped): The desired goal pose
    /stop/navigation/go_to_goal_pose (std_msgs/Bool): Signal to stop navigation
    /cmd_vel (geometry_msgs/Twist): Velocity command

Publishing Topics:
    /goal_pose/eta (std_msgs/String): Estimated time of arrival in seconds
    /goal_pose/status (std_msgs/String): Goal pose status

:author: Addison Sears-Collins
:date: December 5, 2024
"""

import time
import rclpy
from rclpy.duration import Duration
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from nav2_simple_commander.robot_navigator import BasicNavigator, TaskResult
from std_msgs.msg import Bool, String
from geometry_msgs.msg import Twist, PoseStamped

# Global flags for navigation state
STOP_NAVIGATION_NOW = False
NAV_IN_PROGRESS = False
MOVING_FORWARD = True

COSTMAP_CLEARING_PERIOD = 0.5


class GoToGoalPose(Node):
    """This class subscribes to the goal pose and publishes the ETA
       and goal status information to ROS 2.
    """

    def __init__(self):
        """Constructor."""
        # Initialize the class using the constructor
        super().__init__('go_to_goal_pose')

        # Create the ROS 2 publishers
        self.publisher_eta = self.create_publisher(String, '/goal_pose/eta', 10)
        self.publisher_status = self.create_publisher(String, '/goal_pose/status', 10)

        # Create a subscriber
        # This node subscribes to messages of type geometry_msgs/PoseStamped
        self.subscription_go_to_goal_pose = self.create_subscription(
            PoseStamped,
            '/goal_pose/goal',
            self.go_to_goal_pose,
            10)

        # Keep track of time for clearing the costmaps
        self.current_time = self.get_clock().now().nanoseconds
        self.last_time = self.current_time
        self.dt = (self.current_time - self.last_time) * 1e-9

        # Clear the costmap every X.X seconds when the robot is not making forward progress
        self.costmap_clearing_period = COSTMAP_CLEARING_PERIOD

        # Launch the ROS 2 Navigation Stack
        self.navigator = BasicNavigator()

        # Wait for navigation to fully activate
        self.navigator.waitUntilNav2Active()

    def go_to_goal_pose(self, msg):
        """Go to goal pose."""
        global NAV_IN_PROGRESS, STOP_NAVIGATION_NOW  # pylint: disable=global-statement

        # Clear all costmaps before sending to a goal
        self.navigator.clearAllCostmaps()

        # Go to the goal pose
        self.navigator.goToPose(msg)

        # As long as the robot is moving to the goal pose
        while rclpy.ok() and not self.navigator.isTaskComplete():
            # Get the feedback message
            feedback = self.navigator.getFeedback()

            if feedback:
                # Publish the estimated time of arrival in seconds
                estimated_time_of_arrival = f"{Duration.from_msg(
                    feedback.estimated_time_remaining).nanoseconds / 1e9:.0f}"
                msg_eta = String()
                msg_eta.data = str(estimated_time_of_arrival)
                self.publisher_eta.publish(msg_eta)

                # Publish the goal status
                msg_status = String()
                msg_status.data = "IN_PROGRESS"
                self.publisher_status.publish(msg_status)
                NAV_IN_PROGRESS = True

                # Clear the costmap at the desired frequency
                # Get the current time
                self.current_time = self.get_clock().now().nanoseconds

                # How much time has passed in seconds since the last costmap clearing
                self.dt = (self.current_time - self.last_time) * 1e-9

                # If we are making no forward progress after X seconds, clear all costmaps
                if not MOVING_FORWARD and self.dt > self.costmap_clearing_period:
                    self.navigator.clearAllCostmaps()
                    self.last_time = self.current_time

            # Stop the robot if necessary
            if STOP_NAVIGATION_NOW:
                self.navigator.cancelTask()
                self.get_logger().info('Navigation cancellation request fulfilled...')

            time.sleep(0.1)

        # Reset the variable values
        STOP_NAVIGATION_NOW = False
        NAV_IN_PROGRESS = False

        # Publish the final goal status
        result = self.navigator.getResult()
        msg_status = String()

        if result == TaskResult.SUCCEEDED:
            self.get_logger().info('Successfully reached the goal!')
            msg_status.data = "SUCCEEDED"
            self.publisher_status.publish(msg_status)

        elif result == TaskResult.CANCELED:
            self.get_logger().info('Goal was canceled!')
            msg_status.data = "CANCELED"
            self.publisher_status.publish(msg_status)

        elif result == TaskResult.FAILED:
            self.get_logger().info('Goal failed!')
            msg_status.data = "FAILED"
            self.publisher_status.publish(msg_status)

        else:
            self.get_logger().info('Goal has an invalid return status!')
            msg_status.data = "INVALID"
            self.publisher_status.publish(msg_status)


class GetStopNavigationSignal(Node):
    """This class subscribes to a Boolean flag that tells the
       robot to stop navigation.
    """

    def __init__(self):
        """Constructor."""
        # Initialize the class using the constructor
        super().__init__('get_stop_navigation_signal')

        # Create a subscriber
        # This node subscribes to messages of type std_msgs/Bool
        self.subscription_stop_navigation = self.create_subscription(
            Bool,
            '/stop/navigation/go_to_goal_pose',
            self.set_stop_navigation,
            10)

    def set_stop_navigation(self, msg):
        """Determine if the robot needs to stop, and adjust the variable value accordingly."""
        global STOP_NAVIGATION_NOW  # pylint: disable=global-statement

        if NAV_IN_PROGRESS and msg.data:
            STOP_NAVIGATION_NOW = msg.data
            self.get_logger().info('Navigation cancellation request received by ROS 2...')


class GetCurrentVelocity(Node):
    """This class subscribes to the current velocity and determines if the robot
       is making forward progress.
    """

    def __init__(self):
        """Constructor."""
        # Initialize the class using the constructor
        super().__init__('get_current_velocity')

        # Create a subscriber
        # This node subscribes to messages of type geometry_msgs/Twist
        self.subscription_current_velocity = self.create_subscription(
            Twist,
            '/cmd_vel',
            self.get_current_velocity,
            1)

    def get_current_velocity(self, msg):
        """Get the current velocity, and determine if the robot is making forward progress."""
        global MOVING_FORWARD  # pylint: disable=global-statement

        linear_x_value = msg.linear.x

        if linear_x_value <= 0.0:
            MOVING_FORWARD = False
        else:
            MOVING_FORWARD = True


def main(args=None):
    """Main function to initialize and run the ROS 2 nodes."""
    # Initialize the rclpy library
    rclpy.init(args=args)

    try:
        # Create the nodes
        go_to_goal_pose = GoToGoalPose()
        get_stop_navigation_signal = GetStopNavigationSignal()
        get_current_velocity = GetCurrentVelocity()

        # Set up multithreading
        executor = MultiThreadedExecutor()
        executor.add_node(go_to_goal_pose)
        executor.add_node(get_stop_navigation_signal)
        executor.add_node(get_current_velocity)

        try:
            # Spin the nodes to execute the callbacks
            executor.spin()
        finally:
            # Shutdown the nodes
            executor.shutdown()
            go_to_goal_pose.destroy_node()
            get_stop_navigation_signal.destroy_node()
            get_current_velocity.destroy_node()

    finally:
        # Shutdown the ROS client library for Python
        rclpy.shutdown()


if __name__ == '__main__':
    main()

Save the code, and close it.

Our navigation script, nav_to_pose.py, works as follows:

It sets up three ROS 2 nodes.

The GoToGoalPose node:

  • Subscribes to goal poses and initiates navigation.
  • Publishes estimated time of arrival (ETA) and navigation status.
  • Handles navigation feedback and result processing.
  • Implements costmap clearing to handle dynamic obstacles.

The GetStopNavigationSignal node allows external cancellation of navigation tasks.

The GetCurrentVelocity node monitors if the robot is moving forward, which is used for costmap clearing decisions.

The script uses a multithreaded executor to run all nodes concurrently.

It implements error handling and proper shutdown procedures.

Key features include:

  • Dynamic obstacle handling through costmap clearing.
  • Real-time status and ETA updates.
  • Ability to cancel navigation tasks.
  • Robust error handling and graceful shutdown.

Once we area ready to test the navigation, we will send a goal pose from the command line:

ros2 topic pub /goal_pose/goal geometry_msgs/PoseStamped "{header: {stamp: {sec: 0, nanosec: 0}, frame_id: 'map'}, pose: {position: {x: 2.0, y: 2.0, z: 0.0}, orientation: {x: 0.0, y: 0.0, z: 0.7071068, w: 0.7071068}}}" --once

Alternatively, we can use a script to send the goal. Let’s create a script called test_nav_to_pose.py:

In the same directory, create and open the file:

touch test_nav_to_pose.py

Add this code.

#!/usr/bin/env python3
"""
ROS 2 node for sending a robot to different tables based on user input.

This script creates a ROS 2 node that continuously prompts the user to select a table
number (1-5) in the cafe world and publishes the corresponding goal pose for the
robot to navigate to. The script will keep running until the user enters 'q' or presses Ctrl+C.

Publishing Topics:
    /goal_pose/goal (geometry_msgs/PoseStamped): The desired goal pose for the robot

:author: Addison Sears-Collins
:date: December 5, 2024
"""

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from yahboom_rosmaster_navigation.posestamped_msg_generator import PoseStampedGenerator


class GoalPublisher(Node):
    """
    ROS 2 node for publishing goal poses to different tables.

    This node allows users to input table numbers and publishes the corresponding
    goal poses for robot navigation.

    Attributes:
        publisher: Publisher object for goal poses
        locations (dict): Dictionary containing x,y coordinates for each table
        pose_generator: Generator for PoseStamped messages
    """

    def __init__(self):
        """
        Initialize the GoalPublisher node.

        Sets up the publisher for goal poses and defines the table locations.
        """
        super().__init__('goal_publisher')

        # Create a publisher that will send goal poses to the robot
        self.publisher = self.create_publisher(PoseStamped, '/goal_pose/goal', 10)
        self.get_logger().info('Goal Publisher node has been initialized')

        # Initialize the PoseStamped message generator
        self.pose_generator = PoseStampedGenerator('pose_generator')

        # Dictionary storing the x,y coordinates for each table
        self.locations = {
            '1': {'x': -0.96, 'y': -0.92},   # Table 1 location
            '2': {'x': 1.16, 'y': -4.23},    # Table 2 location
            '3': {'x': 0.792, 'y': -8.27},   # Table 3 location
            '4': {'x': -3.12, 'y': -7.495},  # Table 4 location
            '5': {'x': -2.45, 'y': -3.55}    # Table 5 location
        }

    def publish_goal(self, table_number):
        """
        Publish a goal pose for the selected table.

        Creates and publishes a PoseStamped message with the coordinates
        corresponding to the selected table number.

        Args:
            table_number (str): The selected table number ('1' through '5')
        """
        # Get the coordinates for the selected table
        x = self.locations[table_number]['x']
        y = self.locations[table_number]['y']

        # Create the PoseStamped message using the generator
        # Using default orientation (facing forward) and z=0.0
        pose_msg = self.pose_generator.create_pose_stamped(
            x=x,
            y=y,
            z=0.0,
            qx=0.0,
            qy=0.0,
            qz=0.0,
            qw=1.0,
            frame_id='map'
        )

        # Publish the goal pose
        self.publisher.publish(pose_msg)
        self.get_logger().info(f'Goal pose published for table {table_number}')

    def run_interface(self):
        """
        Run the user interface for table selection.

        Continuously prompts the user to select a table number and publishes
        the corresponding goal pose. Exits when user enters 'q' or Ctrl+C.
        """
        while True:
            # Display available tables
            print("\nAVAILABLE TABLES:")
            for table in self.locations:
                print(f"Table {table}")

            # Get user input
            user_input = input('\nEnter table number (1-5) or "q" to quit: ').strip()

            # Check if user wants to quit
            if user_input.lower() == 'q':
                break

            # Validate and process user input
            if user_input in self.locations:
                self.publish_goal(user_input)
            else:
                print("Invalid table number! Please enter a number between 1 and 5.")


def main(args=None):
    """
    Main function to initialize and run the ROS 2 node.

    Args:
        args: Command-line arguments (default: None)
    """
    # Initialize ROS 2
    rclpy.init(args=args)

    # Create and run the node
    goal_publisher = GoalPublisher()

    try:
        goal_publisher.run_interface()
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        # Clean up
        goal_publisher.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

Save the code, and close it.

This Python script creates a ROS 2 node that lets you send a robot to different predefined table locations in a cafe environment by entering table numbers (1-5) through a command-line interface. When you select a table number, the code generates and publishes a PoseStamped message containing the x,y coordinates for that table location, which tells the robot where to go. The program runs in a continuous loop asking for your table selections until you quit by entering ‘q’ or pressing Ctrl+C, allowing you to send the robot to multiple locations in sequence.

How to Determine Coordinates for Goals

To determine coordinates for goal locations, open a fresh terminal window, and type:

ros2 topic echo /clicked_point

At the top of RViz, click the “Publish Point” button.

Click near the first table, and record the coordinates of that point, which will appear in the terminal window.

  • table_1: (x=-0.96, y=-0.92)

Now do the same for the other four tables. I will go in a clockwise direction:

  • table_2: (x=1.16, y=-4.23)
  • table_3: (x=0.792, y=-8.27)
  • table_4: (x=-3.12, y=-7.495)
  • table_5: (x=-2.45, y=-3.55)

Press CTRL + C when you are done determining the goal locations.

Create an Assisted Teleoperation Script

In this section, we will create an Assisted Teleoperation script for our robot. Assisted Teleoperation allows manual control of the robot while automatically stopping it when obstacles are detected. This feature helps prevent collisions when you are manually sending velocity commands to steer the robot..

You can find a bare-bones example of assisted teleoperation here on the Nav2 GitHub, but we will go beyond that script to make it useful for a real-world environment.

We’ll begin by creating a Python script that implements this functionality:

Open a terminal and navigate to the scripts folder:

cd ~/ros2_ws/src/yahboom_rosmaster/yahboom_rosmaster_navigation/scripts

Create a new file for assisted teleoperation:

touch assisted_teleoperation.py

Add the provided code to this file.

#!/usr/bin/env python3
"""
Assisted Teleop Node for ROS 2 Navigation

This script implements an Assisted Teleop node that interfaces with the Nav2 stack.
It manages the lifecycle of the AssistedTeleop action, handles cancellation requests,
and periodically clears costmaps to remove temporary obstacles.

Subscription Topics:
    /cmd_vel_teleop (geometry_msgs/Twist): Velocity commands for assisted teleop
    /cancel_assisted_teleop (std_msgs/Bool): Cancellation requests for assisted teleop

Parameters:
    ~/costmap_clear_frequency (double): Frequency in Hz for costmap clearing. Default: 2.0

:author: Addison Sears-Collins
:date: December 5, 2024
"""

import rclpy
from rclpy.node import Node
from rclpy.exceptions import ROSInterruptException
from geometry_msgs.msg import Twist
from std_msgs.msg import Bool
from nav2_simple_commander.robot_navigator import BasicNavigator
from rcl_interfaces.msg import ParameterDescriptor


class AssistedTeleopNode(Node):
    """
    A ROS 2 node for managing Assisted Teleop functionality.
    """

    def __init__(self):
        """Initialize the AssistedTeleopNode."""
        super().__init__('assisted_teleop_node')

        # Declare and get parameters
        self.declare_parameter(
            'costmap_clear_frequency',
            2.0,
            ParameterDescriptor(description='Frequency in Hz for costmap clearing')
        )
        clear_frequency = self.get_parameter('costmap_clear_frequency').value

        # Initialize the BasicNavigator for interfacing with Nav2
        self.navigator = BasicNavigator('assisted_teleop_navigator')

        # Create subscribers for velocity commands and cancellation requests
        self.cmd_vel_sub = self.create_subscription(
            Twist, '/cmd_vel_teleop', self.cmd_vel_callback, 10)
        self.cancel_sub = self.create_subscription(
            Bool, '/cancel_assisted_teleop', self.cancel_callback, 10)

        # Initialize state variables
        self.assisted_teleop_active = False
        self.cancellation_requested = False

        # Create a timer for periodic costmap clearing with configurable frequency
        period = 1.0 / clear_frequency
        self.clear_costmaps_timer = self.create_timer(period, self.clear_costmaps_callback)

        self.get_logger().info(
            f'Assisted Teleop Node initialized with costmap clearing frequency: {
                clear_frequency} Hz')

        # Wait for navigation to fully activate.
        self.navigator.waitUntilNav2Active()

    def cmd_vel_callback(self, twist_msg: Twist) -> None:
        """Process incoming velocity commands and activate assisted teleop if needed."""
        # Reset cancellation flag when new velocity commands are received
        if (abs(twist_msg.linear.x) > 0.0 or
            abs(twist_msg.linear.y) > 0.0 or
                abs(twist_msg.angular.z) > 0.0):
            self.cancellation_requested = False

        if not self.assisted_teleop_active and not self.cancellation_requested:
            if (abs(twist_msg.linear.x) > 0.0 or
                abs(twist_msg.linear.y) > 0.0 or
                    abs(twist_msg.angular.z) > 0.0):
                self.start_assisted_teleop()

    def start_assisted_teleop(self) -> None:
        """Start the Assisted Teleop action with indefinite duration."""
        self.assisted_teleop_active = True
        self.cancellation_requested = False
        self.navigator.assistedTeleop(time_allowance=0)  # 0 means indefinite duration
        self.get_logger().info('AssistedTeleop activated with indefinite duration')

    def cancel_callback(self, msg: Bool) -> None:
        """Handle cancellation requests for assisted teleop."""
        if msg.data and self.assisted_teleop_active and not self.cancellation_requested:
            self.cancel_assisted_teleop()

    def cancel_assisted_teleop(self) -> None:
        """Cancel the currently running Assisted Teleop action."""
        if self.assisted_teleop_active:
            self.navigator.cancelTask()
            self.assisted_teleop_active = False
            self.cancellation_requested = True
            self.get_logger().info('AssistedTeleop cancelled')

    def clear_costmaps_callback(self) -> None:
        """Periodically clear all costmaps to remove temporary obstacles."""
        if not self.assisted_teleop_active:
            return

        self.navigator.clearAllCostmaps()
        self.get_logger().debug('Costmaps cleared')


def main():
    """Initialize and run the AssistedTeleopNode."""
    rclpy.init()
    node = None

    try:
        node = AssistedTeleopNode()
        rclpy.spin(node)

    except KeyboardInterrupt:
        if node:
            node.get_logger().info('Node shutting down due to keyboard interrupt')
    except ROSInterruptException:
        if node:
            node.get_logger().info('Node shutting down due to ROS interrupt')
    finally:
        if node:
            node.cancel_assisted_teleop()
            node.navigator.lifecycleShutdown()
        rclpy.shutdown()


if __name__ == '__main__':
    main()

This script:

  • Subscribes to the /cmd_vel_teleop topic to receive velocity commands from the user.
  • Manages the Assisted Teleoperation action in the Nav2 stack.
  • Handles cancellation requests for Assisted Teleoperation.
  • Periodically clears costmaps to remove temporary obstacles.
  • The Nav2 stack processes these commands and publishes the resulting safe velocity commands to the /cmd_vel topic.

It’s important to note that /cmd_vel_teleop is the input topic for user commands, while /cmd_vel is the output topic that sends commands to the robot after processing for obstacle avoidance.

Save and close the file.

Add the Go to Goal Pose and Assisted Teleop Scripts to the Bringup Launch File

Open a terminal window, and type:

cd ~/ros2_ws/src/yahboom_rosmaster/yahboom_rosmaster_bringup/launch

Open rosmaster_x3_navigation.launch.py.

Add our two new nodes:

   start_nav_to_pose_cmd = Node(
        package='yahboom_rosmaster_navigation',
        executable='nav_to_pose.py',
        output='screen',
        parameters=[{'use_sim_time': use_sim_time}]
    )

   start_assisted_teleop_cmd = Node(
        package='yahboom_rosmaster_navigation',
        executable='assisted_teleoperation.py',
        output='screen',
        parameters=[{'use_sim_time': use_sim_time}]
    )

….

   ld.add_action(start_assisted_teleop_cmd)
   ld.add_action(start_nav_to_pose_cmd)

Edit package.xml

To ensure our package has all the necessary dependencies, we need to edit the package.xml file:

Navigate to the package directory:

cd ~/ros2_ws/src/yahboom_rosmaster/yahboom_rosmaster_navigation/

Make sure your package.xml looks like this:

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>yahboom_rosmaster_navigation</name>
  <version>0.0.0</version>
  <description>Navigation package for ROSMASTER series robots by Yahboom</description>
  <maintainer email="automaticaddison@todo.com">ubuntu</maintainer>
  <license>BSD-3-Clause</license>

  <buildtool_depend>ament_cmake</buildtool_depend>
  <buildtool_depend>ament_cmake_python</buildtool_depend>

  <depend>rclcpp</depend>
  <depend>rclpy</depend>
  <depend>builtin_interfaces</depend>
  <depend>geometry_msgs</depend>
  <depend>navigation2</depend>
  <depend>nav2_bringup</depend>
  <depend>nav2_simple_commander</depend>
  <depend>slam_toolbox</depend>
  <depend>std_msgs</depend>
  <depend>tf2_ros</depend>
  <depend>tf_transformations</depend>


  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

These dependencies are required for the Assisted Teleoperation functionality to work properly with the Nav2 stack.

Save and close the file.

Edit CMakeLists.txt

To properly build and install all our Python scripts, including the new Assisted Teleoperation script, we need to update the CMakeLists.txt file:

Navigate to the package directory if you’re not already there:

cd ~/ros2_ws/src/yahboom_rosmaster/yahboom_rosmaster_navigation/

Open the CMakeLists.txt file:

cmake_minimum_required(VERSION 3.8)
project(yahboom_rosmaster_navigation)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(navigation2 REQUIRED)
find_package(nav2_bringup REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclpy REQUIRED)
find_package(slam_toolbox REQUIRED)
find_package(std_msgs REQUIRED)
find_package(tf2_ros REQUIRED)

add_executable(cmd_vel_relay src/cmd_vel_relay_node.cpp)
ament_target_dependencies(cmd_vel_relay
  rclcpp
  geometry_msgs
)

install(TARGETS
  cmd_vel_relay
  DESTINATION lib/${PROJECT_NAME}
)

install (
  DIRECTORY config maps rviz
  DESTINATION share/${PROJECT_NAME}
)

# Install Python modules
ament_python_install_package(${PROJECT_NAME})

# Install Python executables
install(PROGRAMS
  scripts/assisted_teleoperation.py
  scripts/nav_to_pose.py
  scripts/test_nav_to_pose.py
  DESTINATION lib/${PROJECT_NAME}
)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

ament_package()

Make sure it looks like this.

These changes to CMakeLists.txt will ensure that all our scripts, including the new Assisted Teleoperation script, are properly included when building and installing the package.

Build the Package

Now that we’ve created our scripts and updated the necessary configuration files, it’s time to build our package to incorporate all these changes. This process compiles our code and makes it ready for use in the ROS 2 environment.

Open a terminal window, and type:

cd ~/ros2_ws
rosdep install --from-paths src --ignore-src -y -r
colcon build 
source ~/.bashrc

Launch the Robot 

Open a new terminal window, and type the following command to launch the robot. Wait until Gazebo and RViz come fully up, which can take up to 30 seconds:

nav

or

bash ~/ros2_ws/src/yahboom_rosmaster/yahboom_rosmaster_bringup/scripts/rosmaster_x3_navigation.sh 

Run Each Script

Let’s test each of the navigation scripts we’ve created. I recommend rerunning the navigation launch file for each script you test below.

You might run into this error from time to time:

[RTPS_TRANSPORT_SHM Error] Failed init_port fastrtps_port7000: open_and_lock_file failed -> Function open_port_internal

This error typically occurs when nodes didn’t close cleanly after you pressed CTRL + C. To fix this, I have an aggressive cleanup command that you can run:

sudo pkill -9 -f "ros2|gazebo|gz|nav2|amcl|bt_navigator|nav_to_pose|rviz2|python3|assisted_teleop|cmd_vel_relay|robot_state_publisher|joint_state_publisher|move_to_free|mqtt|autodock|cliff_detection|moveit|move_group|basic_navigator"

You can also run it without sudo:

pkill -9 -f "ros2|gazebo|gz|nav2|amcl|bt_navigator|nav_to_pose|rviz2|python3|assisted_teleop|cmd_vel_relay|robot_state_publisher|joint_state_publisher|move_to_free|mqtt|autodock|cliff_detection|moveit|move_group|basic_navigator"

If that doesn’t work, this command will guarantee to terminate all zombie nodes and processes.

sudo reboot

Go to Goal Pose

Af the the robot and its controllers come fully up, run these commands in separate terminal windows:

ros2 topic echo /goal_pose/eta
ros2 topic echo /goal_pose/status

In another terminal, publish a goal pose:

ros2 topic pub /goal_pose/goal geometry_msgs/PoseStamped "{header: {stamp: {sec: 0, nanosec: 0}, frame_id: 'map'}, pose: {position: {x: 2.0, y: 2.0, z: 0.0}, orientation: {x: 0.0, y: 0.0, z: 0.7071068, w: 0.7071068}}}" 

Press CTRL + C when the robot starts moving.

You will see the robot move to the (x=2.0, y=2.0) position.

Alternatively, you can send the robot to different tables in the cafe world using our test script:

ros2 run yahboom_rosmaster_navigation test_nav_to_pose.py --ros-args -p use_sim_time:=true
1-go-to-table-2
Robot going from Table 1 to Table 2

Deending on your CPU, you may have to send commands more than once. I had some issues where some messages simply didn’t get sent the first time I sent a table number.

If you want to cancel an ongoing goal, just type:

ros2 topic pub /stop/navigation/go_to_goal_pose std_msgs/msg/Bool "data: true"

Then press CTRL + C.

Press CTRL + C to close these nodes when you are done.

Assisted Teleoperation

Launch teleoperation in another terminal window.

rqt_robot_steering

In the rqt_robot_steering window, set the topic to /cmd_vel_teleop. Use the sliders to control the robot’s movement. Watch what happens when you teleoperate the robot towards an obstacle like a wall. You will notice the robot stops just before the obstacle.

3-assisted-teleop

Stop the robot, by clicking the Stop button.

Close the rqt_robot_steering tool by pressing CTRL + C.

Cancel Assisted Teleoperation

To cancel assisted teleoperation, publish a cancellation message:

ros2 topic pub /cancel_assisted_teleop std_msgs/Bool "data: true" --once

To restart assisted teleoperation, launch the rqt_robot_steering tool again, and move the sliders:

rqt_robot_steering

Press CTRL + C to close these nodes when you are done.

In this tutorial, we explored the Nav2 Simple Commander API and implemented various navigation functionalities for our robot, including basic goal navigation, predefined locations, and assisted teleoperation. You can find other code examples here at the Nav2 GitHub.

These implementations demonstrate the versatility of Nav2, combining autonomous navigation with user input for flexible control in various scenarios. The skills you’ve gained form a strong foundation for developing more complex robotic systems, applicable to real-world robots with minimal modifications.

How to Create Unit Tests with Pytest – ROS 2 Jazzy

Imagine you’re building a complex robot using ROS 2, the latest version of the Robot Operating System. Your robot is made up of many different parts, each controlled by a piece of software. Now, how do you make sure all these pieces work correctly? That’s where unit testing comes in.

Unit testing is like doing a health check-up for each part of your robot’s software. It’s a way to automatically test small pieces of your code to make sure they’re doing exactly what they’re supposed to do. 

Unit tests are essential tools in robotics software development for two key reasons:

  • Early Bug Detection: Automate the testing of code to ensure that changes or updates don’t introduce bugs.
  • Improved Code Design: Writing testable code leads to cleaner, more modular software.

In this tutorial, I’ll show you how to use a tool called Pytest to write and run these unit tests. With Pytest, you can write tests for individual parts of your code, such as functions and classes, to make sure each part behaves as expected.

Pytest is designed primarily for Python code, so it’s commonly used for testing Python-based ROS 2 packages. For C++ code in ROS 2, other testing frameworks, like GTest, are typically used instead.

Prerequisites

You can find all the code here on GitHub.

Create a Unit Test 

Let’s write a basic unit test for our ROS 2 publisher node. This test will verify three key things: 

  1. that our node is created correctly
  2. that its message counter works properly
  3. that it formats messages as expected. 

We’ll create a Python test file that uses Pytest to run these checks automatically.

Open a terminal window, and move to your workspace.

cd ~/ros2_ws/
colcon build && source ~/.bashrc
cd ~/ros2_ws/src/ros2_fundamentals_examples/
mkdir -p test/pytest

Create a test file test/pytest/test_publisher.py:

#!/usr/bin/env python3
"""
Test suite for the ROS2 minimal publisher node.

This script contains unit tests for verifying the functionality of a minimal ROS2 publisher.
It tests the node creation, message counter increment, and message content formatting.

Subscription Topics:
    None

Publishing Topics:
    /py_example_topic (std_msgs/String): Example messages with incrementing counter

:author: Addison Sears-Collins
:date: November 6, 2024
"""

import pytest
import rclpy
from std_msgs.msg import String
from ros2_fundamentals_examples.py_minimal_publisher import MinimalPyPublisher


def test_publisher_creation():
    """
    Test if the publisher node is created correctly.

    This test verifies:
    1. The node name is set correctly
    2. The publisher object exists
    3. The topic name is correct

    :raises: AssertionError if any of the checks fail
    """
    # Initialize ROS2 communication
    rclpy.init()
    try:
        # Create an instance of our publisher node
        node = MinimalPyPublisher()

        # Test 1: Verify the node has the expected name
        assert node.get_name() == 'minimal_py_publisher'

        # Test 2: Verify the publisher exists and has the correct topic name
        assert hasattr(node, 'publisher_1')
        assert node.publisher_1.topic_name == '/py_example_topic'
    finally:
        # Clean up ROS2 communication
        rclpy.shutdown()


def test_message_counter():
    """
    Test if the message counter increments correctly.

    This test verifies that the counter (node.i) increases by 1 after
    each timer callback execution.

    :raises: AssertionError if the counter doesn't increment properly
    """
    rclpy.init()
    try:
        node = MinimalPyPublisher()
        initial_count = node.i
        node.timer_callback()
        assert node.i == initial_count + 1
    finally:
        rclpy.shutdown()


def test_message_content():
    """
    Test if the message content is formatted correctly.

    This test verifies that the message string is properly formatted
    using an f-string with the current counter value.

    :raises: AssertionError if the message format doesn't match expected output
    """
    rclpy.init()
    try:
        node = MinimalPyPublisher()
        # Set counter to a known value for testing
        node.i = 5
        msg = String()
        # Using f-string instead of % formatting
        msg.data = f'Hello World: {node.i}'
        assert msg.data == 'Hello World: 5'
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    pytest.main(['-v'])

Edit CMakeLists.txt

Now open CMakeLists.txt, and replace the if(BUILD_TESTING) block with this code:

if(BUILD_TESTING)
  #find_package(ament_lint_auto REQUIRED)
  find_package(ament_cmake_pytest REQUIRED)
  #set(ament_cmake_copyright_FOUND TRUE)
  #set(ament_cmake_cpplint_FOUND TRUE)
  #ament_lint_auto_find_test_dependencies()

  ament_add_pytest_test(minimal_publisher_test test/pytest/test_publisher.py
   TIMEOUT 60
  )
endif()

Edit the package.xml File

We need to add this to the package.xml so that our tests are discoverable to cmake:

<test_depend>ament_cmake_pytest</test_depend>

Save the file, and close it.

Build and Run

Compile and run the tests.

cd ~/ros2_ws
rosdep install --from-paths src --ignore-src -r -y
colcon build --packages-select ros2_fundamentals_examples
source ~/.bashrc
colcon test --packages-select ros2_fundamentals_examples
1-colcon-test

To get more detail, you could also have done:

colcon test --packages-select ros2_fundamentals_examples --event-handlers console_direct+

When we run our unit tests with this command above (Starting >>> ros2_fundamentals_examples), it first starts by loading our ROS 2 package.

The system then collects and runs our three test cases, shown as the three passing tests in the output (test/pytest/test_publisher.py …). These dots represent successful runs of our three tests – publisher node creation, message counter, and message content.

Your test results should show all tests passed successfully (============================== 3 passed in 0.28s ===============================). 

My final summary shows that our single test suite (1/1 Test #1: minimal_publisher_test) passed successfully in 0.76 seconds, with CMake reporting 100% success rate as no tests failed.

Now once that is finished, check the results:

colcon test-result --all

To see which test cases failed, type:

colcon test-result --all --verbose

That’s it.

Keep building!

How to Create a ROS 2 Python Publisher – Jazzy

In this tutorial, we will create a Python publisher for ROS 2. Knowing how to write a publisher node is one of the most important skills in robotics software engineering. 

In ROS 2 (Robot Operating System 2), a Python publisher is a script written in Python that sends messages across the ROS network to other parts of the system.

On a real robot, you will write many different publishers that publish data that gets shared by the different components of a robot: strings, LIDAR scan readings, ultrasonic sensor readings, camera frames, 3D point cloud data, integers, float values, battery voltage readings, odometry data, and much more. 

The official instructions for creating a publisher are here, but I will walk you through the entire process, step by step.

Follow along with me click by click, keystroke by keystroke.

We will be following the ROS 2 Python Style Guide.

Let’s get started!

Prerequisites

Understand Important ROS 2 Vocabulary

Before we write our first publisher node, let’s understand some ROS 2 vocabulary.

What Are Publisher Nodes?

In a complex robot, you are going to have many pieces of code in your system that need to communicate with each other. 

For example, the code that is responsible for making navigation decisions needs to subscribe to laser scan messages from the LIDAR to be able to avoid obstacles properly. 

You might have another piece of code that reads from the robot’s battery and publishes the percentage battery remaining.

In ROS 2, the parts of your system that are responsible for publishing messages to the rest of the robot – like the distance to an obstacle – are known as publisher nodes. These nodes are usually written in C++ and Python.

You have many different types of built-in messages that you can publish over ROS 2. 

For example, if you want to send distance information from a LIDAR sensor, you have a special sensor message called LaserScan. You can see the definition of this message type on this link.

For battery health, you even have a special sensor message type for that called BatteryState.

You also have other sensor message types, which you can see on this list.

In a previous tutorial, we published the text message “Hello World” from a publisher node called talker. This message type is a string, which is one of the standard message types in ROS 2. 

On this link, you can see we have standard message types for integers, floating-point numbers, booleans (like True and False), and many more.

The beauty of ROS 2 is that there is a message type for 95% of robotic scenarios. For the other 5%, you can create your own custom message type. I will show you how to do that in a future tutorial.

What Are Subscriber Nodes?

The parts of your system that subscribe to messages sent from publishers are known as subscriber nodes. 

Subscriber nodes are pieces of code, usually written in C++ or Python, that are responsible for receiving information. 

For example, the robot’s path planning code could be a subscriber node since it subscribes to LIDAR data to plan a collision-free path from an initial location to a goal location.

What Are Messages?

Messages are the data packets that the publisher nodes send out. They can include information like LIDAR scan data, camera images, ultrasonic sensor readings, numbers, text, or even complex data structures. 

You can find a master list of the different types of messages by going here to the ROS 2 documentation. You even have other message types to represent things like bounding boxes for object detection.

What Are Topics?

Topics are the named channels over which messages are sent out. 

For example, in many real-world mobile robotics applications where you are using a LIDAR, messages about the distance to obstacles are sent on a topic named “/scan”. By convention, the name of a topic in ROS 2 has a leading forward slash before the name.

Topics allow different parts of a robot or rather different nodes in a robotic system to communicate with each other by subscribing to and publishing messages on these named channels.

Write the Code

Now that we have covered some fundamental terminology, let’s write some code.

cd ~/ros2_ws/ && code .

First, right-click on src/ros2_fundamentals/

Type “ros2_fundamentals_examples” to create a new folder for our Python script.

1-scripts-folder

Pay careful attention to the name of the folder where we will house our Python scripts. The name of this folder must have the same name as the package. 

Right-click on the ros2_fundamentals_examples folder to create a new publisher file called “py_minimal_publisher.py”.

Type the following code inside py_minimal_publisher.py:

#! /usr/bin/env python3

"""
Description:
    This ROS 2 node periodically publishes "Hello World" messages on a topic.
    It demonstrates basic ROS concepts such as node creation, publishing, and
    timer usage.
-------
Publishing Topics:
    The channel containing the "Hello World" messages
    /py_example_topic - std_msgs/String
-------
Subscription Topics:
    None
-------
Author: Addison Sears-Collins
Date: November 4, 2024
"""

import rclpy  # Import the ROS 2 client library for Python
from rclpy.node import Node  # Import the Node class for creating ROS 2 nodes

from std_msgs.msg import String  # Import the String message type for publishing


class MinimalPyPublisher(Node):
    """Create MinimalPyPublisher node.

    """

    def __init__(self):
        """ Create a custom node class for publishing messages

        """

        # Initialize the node with a name
        super().__init__('minimal_py_publisher')

        # Creates a publisher on the topic "topic" with a queue size of 10 messages
        self.publisher_1 = self.create_publisher(String, '/py_example_topic', 10)

        # Create a timer with a period of 0.5 seconds to trigger the callback function
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)

        # Initialize a counter variable for message content
        self.i = 0

    def timer_callback(self):
        """Callback function executed periodically by the timer.

        """
        # Create a new String message object
        msg = String()

        # Set the message data with a counter
        msg.data = 'Hello World: %d' % self.i

        # Publish the message on the topic
        self.publisher_1.publish(msg)

        # Log a message indicating that the message has been published
        self.get_logger().info('Publishing: "%s"' % msg.data)

        # Increment the counter for the next message
        self.i = self.i + 1


def main(args=None):
    """Main function to start the ROS 2 node.

    Args:
        args (List, optional): Command-line arguments. Defaults to None.
    """

    # Initialize ROS 2 communication
    rclpy.init(args=args)

    # Create an instance of the MinimalPublisher node
    minimal_py_publisher = MinimalPyPublisher()

    # Keep the node running and processing events.
    rclpy.spin(minimal_py_publisher)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_py_publisher.destroy_node()

    # Shutdown ROS 2 communication
    rclpy.shutdown()


if __name__ == '__main__':
    # Execute the main function if the script is run directly
    main()

I added detailed comments to the code so you can understand what each piece is doing.

To generate the comments for each class and function, you follow these steps for the autoDocstring package.

Your cursor must be on the line directly below the definition of the function or class to generate all the comments.

Then press enter after opening docstring with triple quotes (“””).

What we are going to do in this node is publish the string “Hello World” to a topic named /py_example_topic. The string message will also contain a counter that keeps track of how many times the message has been published.

We chose the name /py_example_topic for the topic, but you could have chosen any name.

Create the __init__.py file

Now, we need to configure our package so that ROS 2 can discover this Python node we just created. To do that, we need to create a special initialization file.

Right-click on the name of the ros2_ws/src/ros2_fundamentals_examples/ros2_fundamentals_examples folder, and create an empty script called

__init__.py

Here is what the file should look like:

2-init-py

The presence of _ _init_ _.py explicitly designates a directory as a Python package. This enables Python’s import machinery to recognize and treat it as a cohesive collection of Python code.

Create a README.md

Now let’s create a README.md file. A README.md file is a plain text file that serves as an introduction and explanation for a project, software, or package. It’s like a welcome mat for anyone encountering your work, providing essential information and guidance to get them started.

Right-click on the package (…/src/ros2_fundamentals_examples/), and create a new file named README.md.

3-readme-file

You can find a syntax guide on how to write a README.md file here on GitHub.

To see what the README.md file looks like, you can right-click on README.md on the left pane and click “Open Preview”.

# ROS 2 Fundamentals Examples

This package contains examples demonstrating fundamental ROS 2 concepts and patterns.

## Description

The package includes minimalist ROS 2 code to demonstrate important ROS 2 concepts and patterns.
- Following ROS 2 Python style guidelines

## Prerequisites

- ROS 2 installed
- Python 3
- Created ROS 2 workspace (`ros2_ws`)

## Author

Addison Sears-Collins

Modify the package.xml File

Now let’s open the package.xml file. Make sure it looks like this.

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>ros2_fundamentals_examples</name>
  <version>0.0.0</version>
  <description>Basic examples demonstrating ROS 2 concepts</description>
  <maintainer email="automaticaddison@example.com">Addison Sears-Collins</maintainer>
  <license>Apache-2.0</license>
 
  <!--Specify build tools that are needed to compile the package-->
  <buildtool_depend>ament_cmake</buildtool_depend>
  <buildtool_depend>ament_cmake_python</buildtool_depend>
 
  <!--Declares package dependencies that are required for building the package-->
  <depend>rclcpp</depend>
  <depend>rclpy</depend>
  <depend>std_msgs</depend>
 
  <!--Specifies dependencies that are only needed for testing the package-->
  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>
 
  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

The package.xml file is an important part of any ROS 2 package. Think of package.xml as your ROS 2 package’s identification card. Just like how your ID card contains essential information about you – your name, date of birth, ID number, and address – package.xml holds key details about your package.

Here’s a breakdown of the key elements you’ll find in a typical package.xml file. You don’t need to memorize this. Just come back to this tutorial if you are ever in doubt:

1. Basic Information:

  • name: The unique identifier for the package, often corresponding to the folder name.
  • version: The package’s version.
  • description: A brief explanation of the package’s purpose and functionality.
  • maintainer: Who is maintaining the package.
  • license: Specifies the license under which the package is distributed, indicating how others are permitted to use, modify, and redistribute the package’s code and other assets.

2. Dependencies:

  • buildtool_depend: Build tools (like compilers) needed for building the package.
  • depend: Package dependencies that are required for building the package. Since we are publishing a message of type std_msgs/String, we need to make sure we declare a dependency on the std_msgs ROS 2 package.
  • test_depend: Tools used for checking the code for bugs and errors.

3. Build Configuration:

  • export: Defines properties and settings used during package installation.
  • build_type: Specifies the build system (e.g., ament_cmake).

Modify the CMakeLists.txt File

Now let’s configure the CMakeLists.txt file. A CMakeLists.txt file in ROS 2 defines how a ROS 2 package should be built. It contains instructions for building and linking the package’s executables, libraries, and other files.
I have commented out sections which we are going to use in the future:

cmake_minimum_required(VERSION 3.8)
project(ros2_fundamentals_examples)

# Check if the compiler being used is GNU's C++ compiler (g++) or Clang.
# Add compiler flags for all targets that will be defined later in the
# CMakeLists file. These flags enable extra warnings to help catch
# potential issues in the code.
# Add options to the compilation process
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# Locate and configure packages required by the project.
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclpy REQUIRED)
find_package(std_msgs REQUIRED)

# Define a CMake variable named dependencies that lists all
# ROS 2 packages and other dependencies the project requires.
set(dependencies
  rclcpp
  std_msgs
)

# Add the specified directories to the list of paths that the compiler
# uses to search for header files. This is important for C++
# projects where you have custom header files that are not located
# in the standard system include paths.
include_directories(
  include
)

# Tells CMake to create an executable target named minimal_cpp_publisher
# from the source file src/minimal_cpp_publisher.cpp. Also make sure CMake
# knows about the program's dependencies.
#add_executable(minimal_cpp_publisher src/minimal_cpp_publisher.cpp)
#ament_target_dependencies(minimal_cpp_publisher ${dependencies})

#add_executable(minimal_cpp_subscriber src/minimal_cpp_subscriber.cpp)
#ament_target_dependencies(minimal_cpp_subscriber ${dependencies})

# Copy necessary files to designated locations in the project
install (
  DIRECTORY ros2_fundamentals_examples
  DESTINATION share/${PROJECT_NAME}
)

install(
  DIRECTORY include/
  DESTINATION include
)

# Install cpp executables
#install(
#  TARGETS
#  minimal_cpp_publisher
#  minimal_cpp_subscriber
#  DESTINATION lib/${PROJECT_NAME}
#)

# Install Python modules for import
ament_python_install_package(${PROJECT_NAME})

# Add this section to install Python scripts
install(
  PROGRAMS
  ros2_fundamentals_examples/py_minimal_publisher.py
  DESTINATION lib/${PROJECT_NAME}
)

# Automates the process of setting up linting for the package, which
# is the process of running tools that analyze the code for potential
# errors, style issues, and other discrepancies that do not adhere to
# specified coding standards or best practices.
if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

# Used to export include directories of a package so that they can be easily
# included by other packages that depend on this package.
ament_export_include_directories(include)

# Generate and install all the necessary CMake and environment hooks that
# allow other packages to find and use this package.
ament_package()

The standard sections of a CMakeLists.txt file for ROS 2 are as follows:

1. cmake_minimum_required(VERSION X.X)

Specifies the minimum required version of CMake for building the package. This is typically set to a version that is known to be compatible with ROS 2.

2. project(ros2_fundamentals_examples)

Specifies the name of the ROS 2 package. 

3. find_package(ament_cmake REQUIRED)

Finds and loads the necessary dependencies for the ROS 2 package. 

4. set(dependencies…):

Lists all the packages and libraries needed by the project.

6. include_directories(include):

Tells the compiler where to find header files for C++ code. We aren’t going to use this line yet because our publisher is written in Python.

7. add_executable(…): 

Creates an executable program using the specified source file. We will add this publisher in the next tutorial.

8. ament_target_dependencies(<program name>. ${dependencies}): 

Links the program with the required dependencies.

9. install(DIRECTORY <project_name> scripts DESTINATION share/${PROJECT_NAME}): 

Copies folders to the project’s install directory for sharing.

10. install(DIRECTORY include/ DESTINATION include): 

Installs header files to the project’s install directory.

11. install(TARGETS <program names go here separated by space> DESTINATION lib/${PROJECT_NAME}): 

Installs the built C++ programs to the project’s install directory. We will write these C++ programs later. 

12. ament_python_install_package(${PROJECT_NAME}): 

Installs Python modules for the project.

13. if(BUILD_TESTING) … endif():

Sets up code checking (linting) if testing is enabled.

14. ament_export_include_directories(include): 

Allows other packages to use this package’s header files.

15. ament_package(): 

Generates necessary files for other packages to find and use this package.

Build the Workspace

Now that we have created our script and configured our build files, we need to build everything into executables so that we can run our code.

Open a terminal window, and type:

build

If this command doesn’t work, type these commands:

echo "alias build='cd ~/dev_ws/ && colcon build && source ~/.bashrc'" >> ~/.bashrc
build

Run the Node 

Let’s run our node. Here’s the general syntax:

ros2 run <package_name> <python_script_name>.py

Here’s a breakdown of the components:

  • <package_name>: Replace this with the name of your ROS 2 package containing the Python script.
  • <python_script_name>.py: Replace this with the name of your Python script file that contains the ROS 2 node.

Note that, you can use the tab button to autocomplete a partial command. For example, type the following and then press the TAB button on your keyboard. 

ros2 run ros2_fundamentals_examples py [TAB]

After autocompletion, the command looks like this:

ros2 run ros2_fundamentals_examples py_minimal_publisher.py

Now, press Enter.

Here is what the output looks like:

4-output-terminal

Examine Common ROS 2 Commands

Topics

Open a new terminal window.

Let’s see a list of all currently active topics.

ros2 topic list
5-ros2-topic-list

We see we have three active topics:

/parameter_events and /rosout topics appear even when no nodes are actively running due to the presence of system-level components and the underlying architecture of the ROS 2 middleware. 

The /parameter_events topic facilitates communication about parameter changes, and the /rosout topic provides a centralized way to access log messages generated by different nodes within the ROS 2 network. You can ignore both topics.

/py_example_topic is the topic we created with our Python node. Let’s see what data is being published to this topic.

ros2 topic echo /py_example_topic

You can see the string message that is being published to this topic, including the counter integer we created in the Python script.

6-ros2-topic-echo

Press CTRL + C in the terminal to stop the output.

At what frequency is data being published to this topic?

ros2 topic hz /py_example_topic

Data is being published at 2Hz, or every 0.5 seconds.

7-ros2-topic-hz

Press CTRL + C in the terminal to stop the output.

What type of data is being published to this topic, and how many nodes are publishing to this topic?

ros2 topic info /py_example_topic
8-ros2-topic-info

To get more detailed information about the topic, you could have typed:

ros2 topic info /py_example_topic --verbose
9-py-example-topic-verbose

You can see a list of the ROS 2 topic commands at this page here.

Nodes

What are the currently active nodes?

ros2 node list
10-ros2-node-list

Let’s find out some more information about our node.

ros2 node info /minimal_py_publisher
11-minimal-py-publisher

Check out how all the nodes are communicating using this command:

rqt_graph

Close the Node

Now go back to the terminal where your py_minimal_publisher.py script is running and press CTRL + C to stop its execution.

To clear the terminal window, type:

clear

Congratulations! You have written your first publisher in ROS 2.

In this example, you have written a publisher to publish a basic string message. And although it is not the most exciting node, it is similar to the kinds of nodes you will write again and again over the course of your robotics career.

On a real robot, you will write many different publishers that publish data that gets shared by the different components of a robot: strings, LIDAR scan readings, ultrasonic sensor readings, camera frames, 3D point cloud data, integers, float values, battery voltage readings, odometry data, and much more. 

The code you wrote in this tutorial serves as a template for creating these more complex publishers. All publishers in ROS 2 are based on the same basic framework as the node you just wrote, py_minimal_publisher.py.

That’s it. Keep building!