How To Create an Action in ROS 2 Galactic (Advanced)

ros2-actions

In this tutorial, I will show you how to create and implement a complex action using ROS 2 Galactic, the latest version of ROS 2. The official tutorial (with a basic example) is here, and other examples are here. I will walk through all the steps below. 

The action we will create in this tutorial can be used to simulate a mobile robot connecting to a battery charging dock.

Prerequisites

You can find the files for this post here on my Google Drive.

QuickStart

Let’s take a look at the ROS 2 demo repository so we can examine the sample action they created in their official tutorial

Open a new terminal window, and clone the action_tutorials repository from GitHub.

cd ~dev_ws/src
git clone https://github.com/ros2/demos.git

You only need the action_tutorials folder (which is inside the folder named ‘demos’), so cut and paste that folder into the ~/dev_ws/src folder using your File Explorer. You can then delete the demos folder.

1-file-structure

Once you have your ~dev_ws/src/action_tutorials folder, you need to build it. Open a terminal window, and type:

cd ..
colcon build

You now have three new packages: 

  1. action_tutorials_cpp
  2. action_tutorials_py
  3. action_tutorials_interfaces

First, we will work with the action_tutorials_interfaces tutorial package.

To see if everything is working, let’s take a look at the definition of the action created in the official tutorial.

Open a new terminal window, and type:

ros2 interface show action_tutorials_interfaces/action/Fibonacci

Here is the output you should see:

2-fibonacci

Now launch the action server.

cd ~/dev_ws/src/action_tutorials/action_tutorials_py/action_tutorials_py
python3 fibonacci_action_server.py

In another terminal, send a goal to the action server.

python3 fibonacci_action_client.py

Here is the output:

3-action-server
4-action-client

Define the ConnectToChargingDock Action

Now that we see everything is working properly, let’s define a new action. The action we will create will be used to connect a mobile robot to a charging dock.

Open a terminal window, and move to your package.

cd ~/dev_ws/src/action_tutorials/action_tutorials_interfaces/action

Create a file called ConnectToChargingDock.action.

gedit ConnectToChargingDock.action

Add this code.

sensor_msgs/BatteryState desired_battery_state
---
sensor_msgs/BatteryState final_battery_state
---
sensor_msgs/BatteryState current_battery_state

Save the file and close it.

You can see the action definition has three sections:

  1. Request: desired_battery_state
  2. Result: final_battery_state
  3. Feedback: current_battery_state

Build the Action

Go to your CMakeLists.txt file.

cd ~/dev_ws/src/action_tutorials/action_tutorials_interfaces
gedit CMakeLists.txt

Make sure your CMakeLists.txt file has the following lines added in the proper locations.

find_package(sensor_msgs REQUIRED)
find_package(std_msgs REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "action/Fibonacci.action"
  "action/ConnectToChargingDock.action"
  DEPENDENCIES sensor_msgs std_msgs 
)

Save the CMakeLists.txt file, and close it.

Now edit your package.xml file.

cd ~/dev_ws/src/action_tutorials/action_tutorials_interfaces
gedit package.xml

Add the dependencies.

<depend>sensor_msgs</depend>
<depend>std_msgs</depend>

Here is my full package.xml file.

Build the package.

cd ~/dev_ws/
colcon build

Let’s check to see if the action was built properly.

Open another terminal window, and type (all of this is a single command):

ros2 interface show action_tutorials_interfaces/action/ConnectToChargingDock

Here is the output:

5-connect-to-charging-dock-action-definition

Write the Action Server

Now let’s create our action server. We will create an action server that will send velocity commands to the /cmd_vel topic until the power_supply_status of the battery state switches from 3 (i.e. not charging) to 1 (i.e. charging).

Open a terminal window and type:

cd ~/dev_ws/src/action_tutorials/action_tutorials_py/action_tutorials_py

Open a file called connect_to_charging_dock_action_server.py.

gedit connect_to_charging_dock_action_server.py

Add the following code:

#!/usr/bin/env python3 
"""
Description:
  Action Server to connect to a battery charging dock.
  Action is successful when the power_supply_status goes from
  NOT_CHARGING(i.e. 3) to CHARGING(i.e. 1)
-------
Subscription Topics:
  Current battery state
  /battery_status – sensor_msgs/BatteryState
-------
Publishing Topics:
  Velocity command to navigate to the charging dock.
  /cmd_vel - geometry_msgs/Twist
-------
Author: Addison Sears-Collins
Website: AutomaticAddison.com
Date: November 12, 2021
"""
import time

# Import our action definition
from action_tutorials_interfaces.action import ConnectToChargingDock

# ROS Client Library for Python
import rclpy

# ActionServer library for ROS 2 Python
from rclpy.action import ActionServer

# Enables publishers, subscribers, and action servers to be in a single node
from rclpy.executors import MultiThreadedExecutor

# Handles the creation of nodes
from rclpy.node import Node

# Handle Twist messages, linear and angular velocity
from geometry_msgs.msg import Twist

# Handles BatteryState message
from sensor_msgs.msg import BatteryState

# Holds the current state of the battery
this_battery_state = BatteryState()

class ConnectToChargingDockActionServer(Node):
  """
  Create a ConnectToChargingDockActionServer class, 
  which is a subclass of the Node class.
  """
  def __init__(self):
  
    # Initialize the class using the constructor
    super().__init__('connect_to_charging_dock_action_server')
    
    # Instantiate a new action server
    # self, type of action, action name, callback function for executing goals
    self._action_server = ActionServer(
      self,
      ConnectToChargingDock,
      'connect_to_charging_dock',
      self.execute_callback)
      
    # Create a publisher
    # This node publishes the desired linear and angular velocity of the robot
    self.publisher_cmd_vel = self.create_publisher(
      Twist,
      '/cmd_vel',
      10)   

    # Declare velocities
    self.linear_velocity = 0.0
    self.angular_velocity = 0.15
      
  def execute_callback(self, goal_handle):
    """
    Action server callback to execute accepted goals.
    """  
    self.get_logger().info('Executing goal...')

    # Interim feedback
    feedback_msg = ConnectToChargingDock.Feedback()
    feedback_msg.current_battery_state = this_battery_state
    
    # While the battery is not charging
    while this_battery_state.power_supply_status != 1:
    
      # Publish the current battery state
      feedback_msg.current_battery_state = this_battery_state
      self.get_logger().info('NOT CHARGING...')
      goal_handle.publish_feedback(feedback_msg)
      
      # Send the velocity command to the robot by publishing to the topic
      cmd_vel_msg = Twist()
      cmd_vel_msg.linear.x = self.linear_velocity
      cmd_vel_msg.angular.z = self.angular_velocity
      self.publisher_cmd_vel.publish(cmd_vel_msg)
      
      time.sleep(0.1)
    
    # Stop the robot
    cmd_vel_msg = Twist()
    cmd_vel_msg.linear.x = 0.0
    cmd_vel_msg.angular.z = 0.0
    self.publisher_cmd_vel.publish(cmd_vel_msg)

    # Update feedback    
    feedback_msg.current_battery_state = this_battery_state
    goal_handle.publish_feedback(feedback_msg)
  
    # Indicate the goal was successful
    goal_handle.succeed()
    self.get_logger().info('CHARGING...')
    self.get_logger().info('Successfully connected to the charging dock!')
  
    # Create a result message of the action type
    result = ConnectToChargingDock.Result()
    
    # Update the final battery state
    result.final_battery_state = feedback_msg.current_battery_state
    
    return result

class BatteryStateSubscriber(Node):
    """
    Subscriber node to the current battery state
    """      
    def __init__(self):
  
      # Initialize the class using the constructor
      super().__init__('battery_state_subscriber')
    
      # Create a subscriber 
      # This node subscribes to messages of type
      # sensor_msgs/BatteryState
      self.subscription_battery_state = self.create_subscription(
        BatteryState,
        '/battery_status',
        self.get_battery_state,
        10)
      
    def get_battery_state(self, msg):
      """
      Update the current battery state.
      """
      global this_battery_state
      this_battery_state = msg
    
def main(args=None):
  """
  Entry point for the program.
  """

  # Initialize the rclpy library
  rclpy.init(args=args)
  
  try: 
  
    # Create the Action Server node
    connect_to_charging_dock_action_server = ConnectToChargingDockActionServer()
    
    # Create the Battery State subscriber node
    battery_state_subscriber = BatteryStateSubscriber()
    
    # Set up mulithreading
    executor = MultiThreadedExecutor(num_threads=4)
    executor.add_node(connect_to_charging_dock_action_server)
    executor.add_node(battery_state_subscriber)
    
    try:
      # Spin the nodes to execute the callbacks
      executor.spin()
    finally:
      # Shutdown the nodes
      executor.shutdown()
      connect_to_charging_dock_action_server.destroy_node()
      battery_state_subscriber.destroy_node()

  finally:
    # Shutdown
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Save the code and close it.

Open your package.xml file.

cd ~/dev_ws/src/action_tutorials/action_tutorials_py/
gedit package.xml

Add the dependencies:

<exec_depend>geometry_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<exec_depend>std_msgs</exec_depend>

Save the file and close it.

cd ~/dev_ws/src/action_tutorials/action_tutorials_py/

Open your setup.py file.

Add the following line inside the entry_points block. 

'connect_to_charging_dock_server = action_tutorials_py.connect_to_charging_dock_action_server:main',

Save the file and close it.

Run the node.

cd ~/dev_ws
colcon build

Run the Action Server

Now let’s run a simulation. First, let’s assume the battery voltage on our robot is below 25%, and the robot is currently positioned at a location just in front of the charging dock. Our robot is running on a 9V battery, and we want the robot to connect to the charging dock so the power supply status switches from NOT CHARGING to CHARGING.

To simulate a ‘low battery’, we publish a message to the /battery_status topic.

Open a terminal window, and type:

ros2 topic pub /battery_status sensor_msgs/BatteryState '{voltage: 2.16, percentage: 0.24, power_supply_status: 3}' 

The above command means the battery voltage is 2.16V (24% of 9V), and the battery is NOT CHARGING because the power_supply_status variable is 3.

Now, let’s launch the action server. Open a new terminal window, and type:

ros2 run action_tutorials_py connect_to_charging_dock_server

Open a new terminal window, and send the action server a goal.

ros2 action send_goal --feedback connect_to_charging_dock action_tutorials_interfaces/action/ConnectToChargingDock "desired_battery_state: {voltage: 2.16, percentage: 0.24, power_supply_status: 1}}"

The goal above means we want the power_supply_status to be 1 (i.e. CHARGING). Remember, it is currently 3 (NOT CHARGING).

You should see this feedback:

6-this-feedback

Let’s see what is going on behind the scenes. Open a new terminal window, and check out the list of topics:

ros2 topic list
7-ros2-topic-list

To see the node graphs, type:

rqt_graph

To see the active nodes, type:

ros2 node list
8-active-nodes

To see the available actions, type:

ros2 action list
9-action-list

You can see full information about the node, by typing:

ros2 node info /connect_to_charging_dock_action_server

You can see more information about the action by typing:

ros2 action info /connect_to_charging_dock

Now type:

ros2 topic echo /cmd_vel

You can see that our action server is publishing velocity commands to the /cmd_vel topic. Right now, I have the robot doing a continuous spin at 0.15 radians per second. In a real-world application, you will want to add code to the action server to read from infrared sensors or an ARTag in order to send the appropriate velocity commands.

10-velocity-commands

Finally, let’s assume our robot has connected to the charging dock. To make the power_supply_status switch from 3 (NOT CHARGING) to 1 (CHARGING), stop the /battery_status publisher by going back to that terminal window, and typing:

CTRL + C

Then open a terminal window, and type:

ros2 topic pub /battery_status sensor_msgs/BatteryState '{voltage: 2.16, percentage: 0.24, power_supply_status: 1}' 

Here is the output on the action server window:

11-output-on-actionserver-window

The action client window has the following output:

12-action-client

Write the Action Client

Now that we have written the action server, let’s write the action client. The official tutorial is here.

The action client’s job is to send a goal to the action server. In the previous section, we used a terminal command to do this, but now we will use code.

Open a terminal window and type:

cd ~/dev_ws/src/action_tutorials/action_tutorials_py/action_tutorials_py

Open a file called connect_to_charging_dock_action_client.py.

gedit connect_to_charging_dock_action_client.py

Add the following code:

#!/usr/bin/env python3 
"""
Description:
  Action Client to connect to a battery charging dock.
  Action is successful when the power_supply_status goes from
  NOT_CHARGING(i.e. 3) to CHARGING(i.e. 1)
-------
Author: Addison Sears-Collins
Website: AutomaticAddison.com
Date: November 13, 2021
"""
# Import our action definition
from action_tutorials_interfaces.action import ConnectToChargingDock

# ROS Client Library for Python
import rclpy

# ActionClient library for ROS 2 Python
from rclpy.action import ActionClient

# Handles the creation of nodes
from rclpy.node import Node

# Handles BatteryState message
from sensor_msgs.msg import BatteryState

class ConnectToChargingDockActionClient(Node):
  """
  Create a ConnectToChargingDockActionClient class, 
  which is a subclass of the Node class.
  """
  def __init__(self):
  
    # Initialize the class using the constructor
    super().__init__('connect_to_charging_dock_action_client')
    
    # Instantiate a new action client
    # self, type of action, action name
    self._action_client = ActionClient(
      self,
      ConnectToChargingDock,
      'connect_to_charging_dock')
      
  def send_goal(self, desired_battery_state):
    """
    Action client to send the goal
    """  
    # Set the goal message
    goal_msg = ConnectToChargingDock.Goal()
    goal_msg.desired_battery_state = desired_battery_state    
   
    # Wait for the Action Server to launch
    self._action_client.wait_for_server()
    
    # Register a callback for when the future is complete
    self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)    
    self._send_goal_future.add_done_callback(self.goal_response_callback) 
    
  def goal_response_callback(self, future): 
    """
    Get the goal_handle
    """
    goal_handle = future.result()
    if not goal_handle.accepted:
      self.get_logger().info('Goal rejected...')
      return
    
    self.get_logger().info('Goal accepted...')
    
    self._get_result_future = goal_handle.get_result_async()
    self._get_result_future.add_done_callback(self.get_result_callback)
  
  def get_result_callback(self, future):
    """
    Gets the result 
    """
    result = future.result().result
    self.get_logger().info('Result (1 = CHARGING, 3 = NOT CHARGING): {0}'.format(
      result.final_battery_state.power_supply_status))
    rclpy.shutdown()
    
  def feedback_callback(self, feedback_msg):
    feedback = feedback_msg.feedback
    self.get_logger().info(
      'Received feedback (1 = CHARGING, 3 = NOT CHARGING): {0}'.format(
      feedback.current_battery_state.power_supply_status))

def main(args=None):
  """
  Entry point for the program.
  """

  # Initialize the rclpy library
  rclpy.init(args=args)
  
  # Create the goal message
  desired_battery_state = BatteryState()
  desired_battery_state.voltage = 2.16
  desired_battery_state.percentage = 0.24
  desired_battery_state.power_supply_status = 1
  
  # Create the Action Client node
  action_client = ConnectToChargingDockActionClient()
  
  # Send the goal  
  action_client.send_goal(desired_battery_state)
  
  # Spin to execute callbacks
  rclpy.spin(action_client)
  
if __name__ == '__main__':
    main()

Save the code and close it.

Now type:

cd ~/dev_ws/src/action_tutorials/action_tutorials_py/

Open your setup.py file.

Add the following line inside the entry_points block. 

'connect_to_charging_dock_client = action_tutorials_py.connect_to_charging_dock_action_client:main',

Save the file and close it.

Run the node.

cd ~/dev_ws
colcon build

Run the Action Server and Client Together

To simulate a ‘low battery’, we publish a message to the /battery_status topic.

Open a terminal window, and type:

ros2 topic pub /battery_status sensor_msgs/BatteryState '{voltage: 2.16, percentage: 0.24, power_supply_status: 3}' 

Now, let’s launch the action server. Open a new terminal window, and type:

ros2 run action_tutorials_py connect_to_charging_dock_server

Open a new terminal window, and send the action server a goal using the action client.

ros2 run action_tutorials_py connect_to_charging_dock_client

Stop the /battery_status publisher by going back to that terminal window, and typing:

CTRL + C

Then open a terminal window, and type:

ros2 topic pub /battery_status sensor_msgs/BatteryState '{voltage: 2.16, percentage: 0.24, power_supply_status: 1}' 

Here is the output on the action server window:

13-action-server-window

The action client window has the following output:

14-action-client-window

There you have it. The code in this tutorial can be used as a template for autonomous docking at a charging station. 

Keep building!