The Bug2 Algorithm for Robot Motion Planning

In this tutorial, we will learn about the Bug2 algorithm for robot motion planning. The Bug2 algorithm is used when you have a mobile robot:

  • With a known starting location
  • With a known goal location
  • Inside an unexplored environment
  • With a distance sensor that can detect the distances to objects and walls in the environment (e.g. an ultrasonic sensor or a laser distance sensor)
  • With an encoder that the robot can use to estimate how far it has traveled from the starting location

Here is a video of a simulated robot I developed in Gazebo and ROS2 that uses the Bug2 algorithm to move from a starting point to a goal point, avoiding walls along the way.

Real-World Applications

Imagine an automatic vacuum cleaner like the one below. It vacuums a room and then needs to move autonomously to another room so that it can clean that room. Along the way, the robot must avoid bumping into walls.

[Image: a robot vacuum cleaner]

Algorithm Description

On a high level, the Bug2 algorithm has two main modes:

  1. Go to Goal Mode: Move from the current location towards the goal (x,y) coordinate.
  2. Wall Following Mode: Move along a wall.

Here is pseudocode for the algorithm:

1. Calculate a start-goal line. The start-goal line is an imaginary line that connects the starting position to the goal position.

2. While not at the goal:

  • Move towards the goal along the start-goal line.
    • If a wall is encountered:
      • Remember the location where the wall was first encountered. This is the “hit point.”
      • Follow the wall until you encounter the start-goal line again. This point is known as the “leave point.”
        • If the leave point is closer to the goal than the hit point, leave the wall, and move towards the goal again.
        • Otherwise, continue following the wall.

That’s the algorithm. In the image below, I have labeled the hit points and leave points. Right after the image, you will find a minimal Python sketch of this decision logic.

[Image: hit points and leave points labeled along an example Bug2 path]
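Before we get to the full ROS2 implementation, here is a minimal, ROS-free sketch of the Bug2 decision logic. Everything here is illustrative: the robot object and its helpers (position, wall_detected, move_toward, follow_wall_step) are hypothetical placeholders, not functions from the code later in this tutorial.

import math

def _dist(p, q):
    """Euclidean distance between two (x, y) points."""
    return math.hypot(p[0] - q[0], p[1] - q[1])

def on_start_goal_line(point, start, goal, tol=0.1):
    """Return True if point lies within tol meters of the start-goal line."""
    (x, y), (x1, y1), (x2, y2) = point, start, goal
    # Perpendicular distance from the point to the infinite line
    # through (x1, y1) and (x2, y2)
    numerator = abs((y2 - y1) * x - (x2 - x1) * y + x2 * y1 - y2 * x1)
    denominator = math.hypot(y2 - y1, x2 - x1)
    return (numerator / denominator) < tol

def bug2_step(robot, goal, state):
    """
    One iteration of the Bug2 decision logic. 'robot' is a
    hypothetical object exposing position(), wall_detected(),
    move_toward(point), and follow_wall_step(); 'state' is a dict
    carrying the mode, start position, and hit point between calls.
    """
    position = robot.position()

    if state["mode"] == "go to goal":
        if robot.wall_detected():
            # Record the hit point and switch to wall following
            state["hit_point"] = position
            state["mode"] = "follow wall"
        else:
            robot.move_toward(goal)
    else:  # wall following mode
        if on_start_goal_line(position, state["start"], goal):
            # Candidate leave point: leave the wall only if it is
            # closer to the goal than the hit point was
            if _dist(position, goal) < _dist(state["hit_point"], goal):
                state["mode"] = "go to goal"
                return
        robot.follow_wall_step()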

Python Implementation (ROS2)

The robot you see in the video at the beginning of this tutorial has a laser distance scanner mounted on top of it that enables it to detect distances from 0 degrees (the right side of the robot) to 180 degrees (the left side of the robot). The implementation is built around three Python methods: bug2, follow_wall, and go_to_goal.

Below is my complete code. It runs in ROS2 Foxy Fitzroy and Gazebo and makes use of the Differential Drive plugin. 

Since we’re just focusing on the algorithms in this post, I won’t go into the details of how I created the robot, built the simulated maze, and created publisher and subscriber nodes for the goal locations, robot pose, and laser distance sensor information (I might cover this in a future post).

Don’t be intimidated by the code. It is long but well-commented. Go through each piece and function one step at a time. Focus only on the following three methods:

  • go_to_goal(self) method 
  • follow_wall(self) method
  • bug2(self) method

Those three methods encapsulate the implementation of Bug2. 

I created descriptive variable names so that you can compare the code to the pseudocode I wrote earlier in this tutorial. 

# Author: Addison Sears-Collins
# Date: December 17, 2020
# ROS Version: ROS 2 Foxy Fitzroy

############## IMPORT LIBRARIES #################
# Python math library
import math 

# ROS client library for Python
import rclpy 

# Enables pauses in the execution of code
from time import sleep 

# Used to create nodes
from rclpy.node import Node

# Enables the use of the string message type
from std_msgs.msg import String 

# Twist is linear and angular velocity
from geometry_msgs.msg import Twist 	
					
# Handles LaserScan messages to sense distance to obstacles (i.e. walls)      	
from sensor_msgs.msg import LaserScan	 

# Handle Pose messages
from geometry_msgs.msg import Pose 

# Handle float64 arrays
from std_msgs.msg import Float64MultiArray
					
# Handles quality of service for LaserScan data
from rclpy.qos import qos_profile_sensor_data 

# Scientific computing library
import numpy as np 

class PlaceholderController(Node):
	"""
	Create a Placeholder Controller class, which is a subclass of the Node 
	class for ROS2.
	"""

	def __init__(self):
		"""
		Class constructor to set up the node
		"""
		####### INITIALIZE ROS PUBLISHERS AND SUBSCRIBERS##############
		# Initiate the Node class's constructor and give it a name
		super().__init__('PlaceholderController')
		
		# Create a subscriber
		# This node subscribes to messages of type Float64MultiArray  
		# over a topic named: /en613/state_est
		# The message represents the current estimated state:
		#   [x, y, yaw]
		# The callback function is called as soon as a message 
		# is received.
		# The maximum number of queued messages is 10.
		self.subscription = self.create_subscription(
			Float64MultiArray,
			'/en613/state_est',
			self.state_estimate_callback,
			10)
		self.subscription  # prevent unused variable warning
		
		# Create a subscriber
		# This node subscribes to messages of type 
		# sensor_msgs/LaserScan		
		self.scan_subscriber = self.create_subscription(
			LaserScan,
			'/en613/scan',
			self.scan_callback,
			qos_profile=qos_profile_sensor_data)
			
		# Create a subscriber
		# This node subscribes to messages of type geometry_msgs/Pose 
		# over a topic named: /en613/goal
		# The message represents the goal position.
		# The callback function is called as soon as a message 
		# is received.
		# The maximum number of queued messages is 10.
		self.subscription_goal_pose = self.create_subscription(
			Pose,
			'/en613/goal',
			self.pose_received,
			10)
			
		# Create a publisher
		# This node publishes the desired linear and angular velocity 
		# of the robot (in the robot chassis coordinate frame) to the 
		# /en613/cmd_vel topic. Using the diff_drive
		# plugin enables the basic_robot model to read this 
		# /en613/cmd_vel topic and execute the motion accordingly.
		self.publisher_ = self.create_publisher(
			Twist, 
			'/en613/cmd_vel', 
			10)
		
		# Initialize the LaserScan sensor readings to some large value
		# Values are in meters.
		self.left_dist = 999999.9 # Left
		self.leftfront_dist = 999999.9 # Left-front
		self.front_dist = 999999.9 # Front
		self.rightfront_dist = 999999.9 # Right-front
		self.right_dist = 999999.9 # Right

		################### ROBOT CONTROL PARAMETERS ##################
		
		# Maximum forward speed of the robot in meters per second
		# Any faster than this and the robot risks falling over.
		self.forward_speed = 0.035	
		
		# Current position and orientation of the robot in the global 
		# reference frame
		self.current_x = 0.0
		self.current_y = 0.0
		self.current_yaw = 0.0
		
		# By changing the value of self.robot_mode, you can alter what
		# the robot will do when the program is launched.
		# 	"obstacle avoidance mode": Robot will avoid obstacles
		#  	"go to goal mode": Robot will head to an x,y coordinate   
		# 	"wall following mode": Robot will follow a wall 
		self.robot_mode = "go to goal mode"
		
		############# OBSTACLE AVOIDANCE MODE PARAMETERS ##############
		
		# Obstacle detection distance threshold
		self.dist_thresh_obs = 0.25 # in meters
		
		# Maximum left-turning speed	
		self.turning_speed = 0.25 # rad/s

		############# GO TO GOAL MODE PARAMETERS ######################
		# Finite states for the go to goal mode
		# 	"adjust heading": Orient towards a goal x, y coordinate
		# 	"go straight": Go straight towards goal x, y coordinate
		# 	"goal achieved": Reached goal x, y coordinate
		self.go_to_goal_state = "adjust heading"
		
		# List the goal destinations
		# We create a list of the (x,y) coordinate goals
		self.goal_x_coordinates = False # [ 0.0, 3.0, 0.0, -1.5, -1.5,  4.5, 0.0]
		self.goal_y_coordinates = False # [-4.0, 1.0, 1.5,  1.0, -3.0, -4.0, 0.0]
		
		# Keep track of which goal we're headed towards
		self.goal_idx = 0 
		
		# Keep track of when we've reached the end of the goal list
		self.goal_max_idx =  None # len(self.goal_x_coordinates) - 1 
		
		# +/- 2.0 degrees of precision
		self.yaw_precision = 2.0 * (math.pi / 180) 
		
		# How quickly we need to turn when we need to make a heading
		# adjustment (rad/s)
		self.turning_speed_yaw_adjustment = 0.0625
		
		# Need to get within +/- 0.2 meter (20 cm) of (x,y) goal
		self.dist_precision = 0.2 

		############# WALL FOLLOWING MODE PARAMETERS ##################		
		# Finite states for the wall following mode
		# 	"turn left": Robot turns towards the left
		#	"search for wall": Robot tries to locate the wall		
		# 	"follow wall": Robot moves parallel to the wall
		self.wall_following_state = "turn left"
		
		# Set turning speeds (to the left) in rad/s 
		# These values were determined by trial and error.
		self.turning_speed_wf_fast = 1.0  # Fast turn
		self.turning_speed_wf_slow = 0.125 # Slow turn
		
		# Wall following distance threshold.
		# We want to try to keep within this distance from the wall.
		self.dist_thresh_wf = 0.45 # in meters	
		
		# We don't want to get too close to the wall though.
		self.dist_too_close_to_wall = 0.15 # in meters
		
		################### BUG2 PARAMETERS ###########################
		
		# Bug2 Algorithm Switch
		# Can turn "ON" or "OFF" depending on if you want to run Bug2
		# Motion Planning Algorithm
		self.bug2_switch = "ON"
		
		# Start-Goal Line Calculated?
		self.start_goal_line_calculated = False
		
		# Start-Goal Line Parameters
		self.start_goal_line_slope_m = 0
		self.start_goal_line_y_intercept = 0
		self.start_goal_line_xstart = 0
		self.start_goal_line_xgoal = 0
		self.start_goal_line_ystart = 0
		self.start_goal_line_ygoal = 0
		
		# Anything less than this distance means we have encountered
		# a wall. Value determined through trial and error.
		self.dist_thresh_bug2 = 0.15 
		
		# Leave point must be within +/- 0.1m of the start-goal line
		# in order to go from wall following mode to go to goal mode
		self.distance_to_start_goal_line_precision = 0.1
		
		# Used to record the (x,y) coordinate where the robot hit
		# a wall.
		self.hit_point_x = 0
		self.hit_point_y = 0
		
		# Distance between the hit point and the goal in meters
		self.distance_to_goal_from_hit_point = 0.0
		
		# Used to record the (x,y) coordinate where the robot left
		# a wall.		
		self.leave_point_x = 0
		self.leave_point_y = 0
		
		# Distance between the leave point and the goal in meters
		self.distance_to_goal_from_leave_point = 0.0
		
		# The hit point and leave point must be far enough 
		# apart to change state from wall following to go to goal
		# This value helps prevent the robot from getting stuck and
		# rotating in endless circles.
		# This distance was determined through trial and error.
		self.leave_point_to_hit_point_diff = 0.25 # in meters
		
	def pose_received(self,msg):
		"""
		Populate the pose.
		"""
		self.goal_x_coordinates = [msg.position.x]
		self.goal_y_coordinates = [msg.position.y]
		self.goal_max_idx = len(self.goal_x_coordinates) - 1		
	
	def scan_callback(self, msg):
		"""
		This method gets called every time a LaserScan message is 
		received on the /en613/scan ROS topic	
		"""
		# Read the laser scan data that indicates distances
		# to obstacles (e.g. wall) in meters and extract
		# 5 distinct laser readings to work with.
		# Each reading is separated by 45 degrees.
		# Assumes 181 laser readings, separated by 1 degree
		# (i.e. 0 degrees at the right side of the robot up to
		# 180 degrees at the left side of the robot).
		self.left_dist = msg.ranges[180]
		self.leftfront_dist = msg.ranges[135]
		self.front_dist = msg.ranges[90]
		self.rightfront_dist = msg.ranges[45]
		self.right_dist = msg.ranges[0]
		
		# The total number of laser rays. Used for testing.
		#number_of_laser_rays = str(len(msg.ranges))		
			
		# Print the distance values (in meters) for testing
		#self.get_logger().info('L:%f LF:%f F:%f RF:%f R:%f' % (
		#	self.left_dist,
		#	self.leftfront_dist,
		#	self.front_dist,
		#	self.rightfront_dist,
		#	self.right_dist))
		
		if self.robot_mode == "obstacle avoidance mode":
			self.avoid_obstacles()
			
	def state_estimate_callback(self, msg):
		"""
		Extract the position and orientation data. 
		This callback is called each time
		a new message is received on the '/en613/state_est' topic
		"""
		# Update the current estimated state in the global reference frame
		curr_state = msg.data
		self.current_x = curr_state[0]
		self.current_y = curr_state[1]
		self.current_yaw = curr_state[2]
		
		# Wait until we have received some goal destinations.
		if self.goal_x_coordinates == False and self.goal_y_coordinates == False:
			return
				
		# Print the pose of the robot
		# Used for testing
		#self.get_logger().info('X:%f Y:%f YAW:%f' % (
		#	self.current_x,
		#	self.current_y,
		#	np.rad2deg(self.current_yaw)))  # Goes from -pi to pi 
		
		# See if the Bug2 algorithm is activated. If yes, call bug2()
		if self.bug2_switch == "ON":
			self.bug2()
		else:
			
			if self.robot_mode == "go to goal mode":
				self.go_to_goal()
			elif self.robot_mode == "wall following mode":
				self.follow_wall()
			else:
				pass # Do nothing			
		
	def avoid_obstacles(self):
		"""
		Wander around the maze and avoid obstacles.
		"""
		# Create a Twist message and initialize all the values 
		# for the linear and angular velocities
		msg = Twist()
		msg.linear.x = 0.0
		msg.linear.y = 0.0
		msg.linear.z = 0.0
		msg.angular.x = 0.0
		msg.angular.y = 0.0
		msg.angular.z = 0.0	
		
		# Logic for avoiding obstacles (e.g. walls)
		# >d means no obstacle detected by that laser beam
		# <d means an obstacle was detected by that laser beam
		d = self.dist_thresh_obs
		if   self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist > d:
			msg.linear.x = self.forward_speed # Go straight forward
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist > d:
			msg.angular.z = self.turning_speed  # Turn left
		elif self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist < d:
			msg.angular.z = self.turning_speed  
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist > d:
			msg.angular.z = -self.turning_speed # Turn right
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist < d:
			msg.angular.z = self.turning_speed 
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist > d:
			msg.angular.z = -self.turning_speed 
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist < d:
			msg.angular.z = self.turning_speed 
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist < d:
			msg.linear.x = self.forward_speed
		else:
			pass 
			
		# Send the velocity commands to the robot by publishing 
		# to the topic
		self.publisher_.publish(msg)
							
	def go_to_goal(self):
		"""
		This method drives the robot towards the goal destination
		"""
		# Create a geometry_msgs/Twist message
		msg = Twist()
		msg.linear.x = 0.0
		msg.linear.y = 0.0
		msg.linear.z = 0.0
		msg.angular.x = 0.0
		msg.angular.y = 0.0
		msg.angular.z = 0.0
		
		# If Bug2 algorithm is activated
		if self.bug2_switch == "ON":
		
			# If the wall is in the way
			d = self.dist_thresh_bug2
			if (self.leftfront_dist < d or 
				self.front_dist < d or 
				self.rightfront_dist < d):
			
				# Change the mode to wall following mode.
				self.robot_mode = "wall following mode"
				
				# Record the hit point	
				self.hit_point_x = self.current_x
				self.hit_point_y = self.current_y
				
				# Record the distance to the goal from the 
				# hit point
				self.distance_to_goal_from_hit_point = (
					math.sqrt((
					pow(self.goal_x_coordinates[self.goal_idx] - self.hit_point_x, 2)) + (
					pow(self.goal_y_coordinates[self.goal_idx] - self.hit_point_y, 2)))) 	
					
				# Make a hard left to begin following wall
				msg.angular.z = self.turning_speed_wf_fast
						
				# Send command to the robot
				self.publisher_.publish(msg)
				
				# Exit this function 		
				return
			
		# Fix the heading		
		if (self.go_to_goal_state == "adjust heading"):
			
			# Calculate the desired heading based on the current position 
			# and the desired position
			desired_yaw = math.atan2(
					self.goal_y_coordinates[self.goal_idx] - self.current_y,
					self.goal_x_coordinates[self.goal_idx] - self.current_x)
			
			# How far off is the current heading in radians?		
			yaw_error = desired_yaw - self.current_yaw
			
			# Adjust heading if heading is not good enough
			if math.fabs(yaw_error) > self.yaw_precision:
			
				if yaw_error > 0:	
					# Turn left (counterclockwise)		
					msg.angular.z = self.turning_speed_yaw_adjustment 				
				else:
					# Turn right (clockwise)
					msg.angular.z = -self.turning_speed_yaw_adjustment
				
				# Command the robot to adjust the heading
				self.publisher_.publish(msg)
				
			# Change the state if the heading is good enough
			else:				
				# Change the state
				self.go_to_goal_state = "go straight"
				
				# Command the robot to stop turning
				self.publisher_.publish(msg)		

		# Go straight										
		elif (self.go_to_goal_state == "go straight"):
			
			position_error = math.sqrt(
						pow(
						self.goal_x_coordinates[self.goal_idx] - self.current_x, 2)
						+ pow(
						self.goal_y_coordinates[self.goal_idx] - self.current_y, 2)) 
						
			
			# If we are still too far away from the goal						
			if position_error > self.dist_precision:

				# Move straight ahead
				msg.linear.x = self.forward_speed
					
				# Command the robot to move
				self.publisher_.publish(msg)
			
				# Check our heading			
				desired_yaw = math.atan2(
					self.goal_y_coordinates[self.goal_idx] - self.current_y,
					self.goal_x_coordinates[self.goal_idx] - self.current_x)
				
				# How far off is the heading?	
				yaw_error = desired_yaw - self.current_yaw		
		
				# Check the heading and change the state if there is too much heading error
				if math.fabs(yaw_error) > self.yaw_precision:
					
					# Change the state
					self.go_to_goal_state = "adjust heading"
				
			# We reached our goal. Change the state.
			else:			
				# Change the state
				self.go_to_goal_state = "goal achieved"
				
				# Command the robot to stop
				self.publisher_.publish(msg)
		
		# Goal achieved			
		elif (self.go_to_goal_state == "goal achieved"):
				
			self.get_logger().info('Goal achieved! X:%f Y:%f' % (
				self.goal_x_coordinates[self.goal_idx],
				self.goal_y_coordinates[self.goal_idx]))
			
			# Get the next goal
			self.goal_idx = self.goal_idx + 1
		
			# Do we have any more goals left?			
			# If we have no more goals left, just stop
			if (self.goal_idx > self.goal_max_idx):
				self.get_logger().info('Congratulations! All goals have been achieved.')
				# Halt here forever; every goal has been achieved
				while True:
					pass

			# Let's achieve our next goal
			else: 
				# Change the state
				self.go_to_goal_state = "adjust heading"				

			# We need to recalculate the start-goal line if Bug2 is running
			self.start_goal_line_calculated = False				
		
		else:
			pass
			
	def follow_wall(self):
		"""
		This method causes the robot to follow the boundary of a wall.
		"""
		# Create a geometry_msgs/Twist message
		msg = Twist()
		msg.linear.x = 0.0
		msg.linear.y = 0.0
		msg.linear.z = 0.0
		msg.angular.x = 0.0
		msg.angular.y = 0.0
		msg.angular.z = 0.0			

		# Special code if Bug2 algorithm is activated
		if self.bug2_switch == "ON":
		
			# Find the point on the start-goal line that has the same
			# x-coordinate as the current position
			x_start_goal_line = self.current_x
			y_start_goal_line = (
				self.start_goal_line_slope_m * (
				x_start_goal_line)) + (
				self.start_goal_line_y_intercept)
						
			# Calculate the (vertical) distance between the current
			# position and the start-goal line
			distance_to_start_goal_line = math.sqrt(pow(
						x_start_goal_line - self.current_x, 2) + pow(
						y_start_goal_line - self.current_y, 2)) 
							
			# If we hit the start-goal line again				
			if distance_to_start_goal_line < self.distance_to_start_goal_line_precision:
			
				# Determine if we need to leave the wall and change the mode
				# to 'go to goal'
				# Let this point be the leave point
				self.leave_point_x = self.current_x
				self.leave_point_y = self.current_y

				# Record the distance to the goal from the leave point
				self.distance_to_goal_from_leave_point = math.sqrt(
					pow(self.goal_x_coordinates[self.goal_idx] 
					- self.leave_point_x, 2)
					+ pow(self.goal_y_coordinates[self.goal_idx]  
					- self.leave_point_y, 2)) 
			
				# Is the leave point closer to the goal than the hit point?
				# If yes (by more than self.leave_point_to_hit_point_diff
				# meters, which prevents leaving at a point that barely
				# improves on the hit point), go to goal.
				diff = self.distance_to_goal_from_hit_point - self.distance_to_goal_from_leave_point
				if diff > self.leave_point_to_hit_point_diff:
						
					# Change the mode. Go to goal.
					self.robot_mode = "go to goal mode"


				# Exit this function
				return				
		
		# Logic for following the wall
		# >d means no wall detected by that laser beam
		# <d means a wall was detected by that laser beam
		d = self.dist_thresh_wf
		
		if self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist > d:
			self.wall_following_state = "search for wall"
			msg.linear.x = self.forward_speed
			msg.angular.z = -self.turning_speed_wf_slow # turn right to find wall
			
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist > d:
			self.wall_following_state = "turn left"
			msg.angular.z = self.turning_speed_wf_fast
			
			
		elif (self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist < d):
			if (self.rightfront_dist < self.dist_too_close_to_wall):
				# Getting too close to the wall
				self.wall_following_state = "turn left"
				msg.linear.x = self.forward_speed
				msg.angular.z = self.turning_speed_wf_fast		
			else: 			
				# Go straight ahead
				self.wall_following_state = "follow wall"  
				msg.linear.x = self.forward_speed	
									
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist > d:
			self.wall_following_state = "search for wall"
			msg.linear.x = self.forward_speed
			msg.angular.z = -self.turning_speed_wf_slow # turn right to find wall
			
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist < d:
			self.wall_following_state = "turn left"
			msg.angular.z = self.turning_speed_wf_fast
			
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist > d:
			self.wall_following_state = "turn left" 
			msg.angular.z = self.turning_speed_wf_fast
			
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist < d:
			self.wall_following_state = "turn left" 
			msg.angular.z = self.turning_speed_wf_fast
			
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist < d:
			self.wall_following_state = "search for wall"
			msg.linear.x = self.forward_speed
			msg.angular.z = -self.turning_speed_wf_slow # turn right to find wall
			
		else:
			pass 

		# Send velocity command to the robot
		self.publisher_.publish(msg)	
		
	def bug2(self):
		"""
		Implement the Bug2 algorithm: calculate the start-goal line
		once per goal, then alternate between go to goal mode and
		wall following mode.
		"""
		# Each time we start towards a new goal, we need to calculate the start-goal line
		if self.start_goal_line_calculated == False:
		
			# Make sure go to goal mode is set.
			self.robot_mode = "go to goal mode"				

			self.start_goal_line_xstart = self.current_x
			self.start_goal_line_xgoal = self.goal_x_coordinates[self.goal_idx]
			self.start_goal_line_ystart = self.current_y
			self.start_goal_line_ygoal = self.goal_y_coordinates[self.goal_idx]
			
			# Calculate the slope of the start-goal line m
			# (assumes xgoal != xstart; a vertical start-goal line
			# would make this division undefined)
			self.start_goal_line_slope_m = (
				(self.start_goal_line_ygoal - self.start_goal_line_ystart) / (
				self.start_goal_line_xgoal - self.start_goal_line_xstart))
			
			# Solve for the intercept b
			self.start_goal_line_y_intercept = self.start_goal_line_ygoal - (
					self.start_goal_line_slope_m * self.start_goal_line_xgoal) 
				
			# We have successfully calculated the start-goal line
			self.start_goal_line_calculated = True
			
		if self.robot_mode == "go to goal mode":
			self.go_to_goal()			
		elif self.robot_mode == "wall following mode":
			self.follow_wall()

def main(args=None):

    # Initialize rclpy library
    rclpy.init(args=args)
    
    # Create the node
    controller = PlaceholderController()

    # Spin the node so the callback function is called
    # Pull messages from any topics this node is subscribed to
    # Publish any pending messages to the topics
    rclpy.spin(controller)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    controller.destroy_node()
    
    # Shutdown the ROS client library for Python
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Here is the state estimator that goes with the code above. I used an Extended Kalman Filter to filter the odometry measurements from the mobile robot.

# Author: Addison Sears-Collins
# Date: December 8, 2020
# ROS Version: ROS 2 Foxy Fitzroy

# Python math library
import math

# ROS client library for Python
import rclpy

# Used to create nodes
from rclpy.node import Node

# Twist is linear and angular velocity
from geometry_msgs.msg import Twist 

# Position, orientation, linear velocity, angular velocity
from nav_msgs.msg import Odometry

# Handles laser distance scan to detect obstacles
from sensor_msgs.msg import LaserScan

# Used for laser scan
from rclpy.qos import qos_profile_sensor_data

# Enable use of std_msgs/Float64MultiArray message
from std_msgs.msg import Float64MultiArray 

# Scientific computing library for Python
import numpy as np

class PlaceholderEstimator(Node):
	"""
	Estimate the state (x, y, yaw) of the mobile robot using an
	Extended Kalman Filter.
	"""
	def __init__(self):
		"""
		Class constructor to set up the node
		"""
		############## INITIALIZE ROS PUBLISHERS AND SUBSCRIBERS ######
	
	
		# Initiate the Node class's constructor and give it a name
		super().__init__('PlaceholderEstimator')
		
		# Create a subscriber 
		# This node subscribes to messages of type 
		# geometry_msgs/Twist.msg
		# The maximum number of queued messages is 10.
		self.velocity_subscriber = self.create_subscription(
			Twist,
			'/en613/cmd_vel',
			self.velocity_callback,
			10)
			
		# Create a subscriber
		# This node subscribes to messages of type
		# nav_msgs/Odometry
		self.odom_subscriber = self.create_subscription(
			Odometry,
			'/en613/odom',
			self.odom_callback,
			10)
		
		# Create a publisher
		# This node publishes the estimated position (x, y, yaw) 
		# The type of message is std_msgs/Float64MultiArray
		self.publisher_state_est = self.create_publisher(
			Float64MultiArray, 
			'/en613/state_est', 
			10)
			
		############# STATE TRANSITION MODEL PARAMETERS ###############	
		
		# Time step from one time step t-1 to the next time step t
		self.delta_t = 0.002 # seconds
		
		# Keep track of the estimate of the yaw angle
		# for control input vector calculation.
		self.est_yaw = 0.0
		
		# A matrix
		# 3x3 matrix -> number of states x number of states matrix
		# Expresses how the state of the system [x,y,yaw] changes 
		# from t-1 to t when no control command is executed. Typically 
		# a robot on wheels only drives when the wheels are commanded 
		# to turn.
		# For this case, A is the identity matrix.
		# A is sometimes F in the literature.
		self.A_t_minus_1 = np.array([ 	[1.0,  0,   0],
						[  0,1.0,   0],
						[  0,  0, 1.0]])
		
		# The estimated state vector at time t-1 in the global 
		# reference frame
		# [x_t_minus_1, y_t_minus_1, yaw_t_minus_1]
		# [meters, meters, radians]
		self.state_vector_t_minus_1 = np.array([0.0,0.0,0.0])
		
		# The control input vector at time t-1 in the global 
		# reference frame
		# [v,v,yaw_rate]
		# [meters/second, meters/second, radians/second]
		# In the literature, this is commonly u.
		self.control_vector_t_minus_1 = np.array([0.001,0.001,0.001])
		
		# Noise applied to the forward kinematics (i.e. the calculation
		# of the estimated state at time t from the state transition
		# model of the mobile robot). This is a vector with the
		# number of elements equal to the number of states.
		self.process_noise_v_t_minus_1 = np.array([0.096,0.096,0.032])

		############# MEASUREMENT MODEL PARAMETERS ####################	
		
		# Measurement matrix H_t
		# Used to convert the predicted state estimate at time t
		# into predicted sensor measurements at time t.
		# In this case, H will be the identity matrix since the 
		# estimated state maps directly to state measurements from the 
		# odometry data [x, y, yaw]
		# H has the same number of rows as sensor measurements
		# and same number of columns as states.
		self.H_t = np.array([ 	[1.0,  0,   0],
					[  0,1.0,   0],
					[  0,  0, 1.0]])
				
		# Sensor noise. This is a vector with the
		# number of elements as the number of sensor measurements.
		self.sensor_noise_w_t = np.array([0.07,0.07,0.04])

		############# EXTENDED KALMAN FILTER PARAMETERS ###############
		
		# State covariance matrix P_t_minus_1
		# This matrix has the same number of rows (and columns) as the 
		# number of states (i.e. 3x3 matrix). P is sometimes referred
		# to as Sigma in the literature. It represents an estimate of 
		# the accuracy of the state estimate at time t-1 made using the
		# state transition matrix. We start off with guessed values.
		self.P_t_minus_1 = np.array([ 	[0.1,  0,   0],
						[  0,0.1,   0],
						[  0,  0, 0.1]])
						
		# State model noise covariance matrix Q_t
		# When Q is large, the Kalman Filter tracks large changes in 
		# the sensor measurements more closely than for smaller Q.
		# Q is a square matrix that has the same number of rows as 
		# states.
		self.Q_t = np.array([ 		[1.0,   0,   0],
						[  0, 1.0,   0],
						[  0,   0, 1.0]])
						
		# Sensor measurement noise covariance matrix R_t
		# Has the same number of rows and columns as sensor measurements.
		# If we are sure about the measurements, R will be near zero.
		self.R_t = np.array([ 		[1.0,   0,    0],
						[  0, 1.0,    0],
						[  0,    0, 1.0]])
		
	def velocity_callback(self, msg):
		"""
		Listen to the velocity commands: the linear forward velocity 
		in the x direction in the robot's reference frame, and the
		angular velocity (yaw rate) around the robot's z-axis.
		Convert those velocity commands into a 3-element control
		input vector ... 
		[v,v,yaw_rate]
		[meters/second, meters/second, radians/second]
		"""
		# Forward velocity in the robot's reference frame
		v = msg.linear.x
		
		# Angular velocity around the robot's z axis
		yaw_rate = msg.angular.z
		
		# [v,v,yaw_rate]		
		self.control_vector_t_minus_1[0] = v
		self.control_vector_t_minus_1[1] = v
		self.control_vector_t_minus_1[2] = yaw_rate

	def odom_callback(self, msg):
		"""
		Receive the odometry information containing the position and orientation
		of the robot in the global reference frame. 
		The position is x, y, z.
		The orientation is a x,y,z,w quaternion. 
		"""						
		roll, pitch, yaw = self.euler_from_quaternion(
					msg.pose.pose.orientation.x,
					msg.pose.pose.orientation.y,
					msg.pose.pose.orientation.z,
					msg.pose.pose.orientation.w)
				
		obs_state_vector_x_y_yaw = [msg.pose.pose.position.x,msg.pose.pose.position.y,yaw]
		
		# These are the measurements taken by the odometry in Gazebo
		z_t_observation_vector =  np.array([obs_state_vector_x_y_yaw[0],
						obs_state_vector_x_y_yaw[1],
						obs_state_vector_x_y_yaw[2]])
		
		# Apply the Extended Kalman Filter
		# This is the updated state estimate after taking the latest
		# sensor (odometry) measurements into account.				
		updated_state_estimate_t = self.ekf(z_t_observation_vector)
		
		# Publish the estimate state
		self.publish_estimated_state(updated_state_estimate_t)

	def publish_estimated_state(self, state_vector_x_y_yaw):
		"""
		Publish the estimated pose (position and orientation) of the 
		robot to the '/en613/state_est' topic. 
		:param: state_vector_x_y_yaw [x, y, yaw] 
			x is in meters, y is in meters, yaw is in radians
		"""
		msg = Float64MultiArray()
		msg.data = state_vector_x_y_yaw
		self.publisher_state_est.publish(msg)

	def euler_from_quaternion(self, x, y, z, w):
		"""
		Convert a quaternion into euler angles (roll, pitch, yaw)
		roll is rotation around x in radians (counterclockwise)
		pitch is rotation around y in radians (counterclockwise)
		yaw is rotation around z in radians (counterclockwise)
		"""
		t0 = +2.0 * (w * x + y * z)
		t1 = +1.0 - 2.0 * (x * x + y * y)
		roll_x = math.atan2(t0, t1)
    
		t2 = +2.0 * (w * y - z * x)
		t2 = +1.0 if t2 > +1.0 else t2
		t2 = -1.0 if t2 < -1.0 else t2
		pitch_y = math.asin(t2)
    
		t3 = +2.0 * (w * z + x * y)
		t4 = +1.0 - 2.0 * (y * y + z * z)
		yaw_z = math.atan2(t3, t4)
    
		return roll_x, pitch_y, yaw_z # in radians
		
	def getB(self,yaw,dt):
		"""
		Calculates and returns the B matrix
		
		3x3 matrix -> number of states x number of control inputs
		The control inputs are the forward speed and the rotation 
		rate around the z axis from the x-axis in the 
		counterclockwise direction.
		[v,v,yaw_rate]
		Expresses how the state of the system [x,y,yaw] changes 
		from t-1 to t
		due to the control commands (i.e. inputs).
		:param yaw: The yaw (rotation angle around the z axis) in rad 
		:param dt: The change in time from time step t-1 to t in sec
		"""
		B = np.array([	[np.cos(yaw) * dt,  0,   0],
				[  0, np.sin(yaw) * dt,   0],
				[  0,  0, dt]]) 				
		return B
		
	def ekf(self,z_t_observation_vector):
		"""
		Extended Kalman Filter. Fuses noisy sensor measurement to 
		create an optimal estimate of the state of the robotic system.
		
		INPUT
		:param z_t_observation_vector The observation from the Odometry
			3x1 NumPy Array [x,y,yaw] in the global reference frame
			in [meters,meters,radians].
		
		OUTPUT
		:return state_estimate_t optimal state estimate at time t	
			[x,y,yaw]....3x1 list --->
			[meters,meters,radians]
					
		"""
		######################### Predict #############################
		# Predict the state estimate at time t based on the state 
		# estimate at time t-1 and the control input applied at time
		# t-1.
		state_estimate_t = self.A_t_minus_1 @ (
			self.state_vector_t_minus_1) + (
			self.getB(self.est_yaw,self.delta_t)) @ (
			self.control_vector_t_minus_1) + (
			self.process_noise_v_t_minus_1)
			
		# Predict the state covariance estimate based on the previous
		# covariance and some noise
		P_t = self.A_t_minus_1 @ self.P_t_minus_1 @ self.A_t_minus_1.T + (
			self.Q_t)
		
		################### Update (Correct) ##########################
		# Calculate the difference between the actual sensor measurements
		# at time t minus what the measurement model predicted 
		# the sensor measurements would be for the current timestep t.
		measurement_residual_y_t = z_t_observation_vector - (
			(self.H_t @ state_estimate_t) + (
			self.sensor_noise_w_t))
			
		# Calculate the measurement residual covariance
		S_t = self.H_t @ P_t @ self.H_t.T + self.R_t
		
		# Calculate the near-optimal Kalman gain
		# We use pseudoinverse since some of the matrices might be
		# non-square or singular.
		K_t = P_t @ self.H_t.T @ np.linalg.pinv(S_t)
		
		# Calculate an updated state estimate for time t
		state_estimate_t = state_estimate_t + (K_t @ measurement_residual_y_t)
		
		# Update the state covariance estimate for time t
		P_t = P_t - (K_t @ self.H_t @ P_t)
		
		#### Update global variables for the next iteration of EKF ####		
		# Update the estimated yaw		
		self.est_yaw = state_estimate_t[2]
		
		# Update the state vector for t-1
		self.state_vector_t_minus_1 = state_estimate_t
		
		# Update the state covariance matrix
		self.P_t_minus_1 = P_t
		
		######### Return the updated state estimate ###################
		state_estimate_t = state_estimate_t.tolist()
		return state_estimate_t


def main(args=None):
    """
    Entry point for the program.
    """
    
    # Initialize rclpy library
    rclpy.init(args=args)

    # Create the node
    estimator = PlaceholderEstimator()

    # Spin the node so the callback function is called.
    # Pull messages from any topics this node is subscribed to.
    # Publish any pending messages to the topics.
    rclpy.spin(estimator)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    estimator.destroy_node()
    
    # Shutdown the ROS client library for Python
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Wall Following Robot Mode Demo

Just for fun, I created a switch in the code above (the self.robot_mode variable) that enables you to have the robot do nothing but follow walls. Here is a video demonstration of that.

Obstacle Avoidance Robot Mode Demo

Here is a demo of what the robot looks like when it does nothing but wander around the room, avoiding obstacles and walls.

Combine the Extended Kalman Filter With LQR

I recently built a project to combine the Extended Kalman Filter (for state estimation) and the Linear Quadratic Regulator (for control) to drive a simulated path-following race car (which is modeled as a point) around a track that is the same shape as the Formula 1 Singapore Grand Prix Marina Bay Street Circuit race track. 

[Image: output of the program I developed (see the Python code later in this post)]

[Image: Marina Bay Street Circuit in Singapore]

How the Program Works

In a real-world application, it is common for a robot to use the Extended Kalman Filter to calculate near-optimal estimates of the state of a robotic system and to use LQR to generate the control values that move the robot from one state to the next.

Here is a block diagram of that process:

[Diagram: feedback control loop combining the LQR controller and the Extended Kalman Filter]

At each timestep (a code sketch of this loop follows the list): 

  • Control commands are sent by the LQR algorithm
  • The robot changes state
  • Sensor measurements are made
  • The sensor measurements are used to generate near-optimal estimates of the state
  • These estimates of the state get fed into the LQR controller to determine what the next control commands should be
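Here is a compact sketch of that loop in Python. The names intentionally mirror the functions defined in the programs below (dLQR, forward, measure, EKF), but treat this as a schematic of the data flow rather than a runnable drop-in.

# Schematic of the LQR + EKF feedback loop. The functions referenced
# here (dLQR, DiffDrive.forward, LandSens.measure, EKF) are defined
# in the programs later in this post.
while not reached_goal:
    # 1. LQR computes control commands from the latest state estimate
    u_lqr = dLQR(DiffDrive, Q, R, state_est, waypoint, dt)

    # 2. The robot changes state (the true state is never known exactly)
    state = DiffDrive.forward(state, u_lqr, process_noise, dt)

    # 3. Sensors measure the range and bearing to each landmark
    y = LandSens.measure(state, measurement_noise)

    # 4. and 5. The EKF fuses the measurement into a near-optimal
    # state estimate, which feeds the next LQR iteration
    state_est, sigma = EKF(DiffDrive, LandSens, y, state_est, sigma, u_lqr, dt)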

The kinematics of the system are modeled according to the following diagrams:

[Diagrams: kinematic model of the differential drive race car]
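
Since the race car is modeled as a point robot with a heading, the state transition boils down to a few lines. This snippet mirrors the noise-free part of the forward function in kinematics.py below:

import numpy as np

def propagate_state(state, v, omega, dt):
    """
    Discrete-time kinematic model of the race car (a point robot).
    state = [x, y, theta]; v is the linear velocity (m/s) and omega
    is the angular velocity (rad/s). Matches the noise-free part of
    DifferentialDrive.forward in kinematics.py.
    """
    x, y, theta = state
    return np.array([
        x + v * np.cos(theta) * dt,   # new x position
        y + v * np.sin(theta) * dt,   # new y position
        theta + omega * dt])          # new heading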

In my application, I assume there are sensors mounted on the race car that help it to localize itself in the x-y coordinate plane. These sensors measure the range (r) and bearing (b) from the race car to each landmark. Each landmark has a known x and y position.

[Image: measuring the range and bearing from the race car to a landmark]
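
For reference, here is the standard range-bearing measurement model for a single landmark. This is a sketch of the relationship the LandmarkDetector class in sensors.py is built around, not a copy of its measure method:

import numpy as np

def range_bearing(robot_state, landmark):
    """
    Standard range-bearing measurement to one landmark.
    robot_state = [x, y, yaw]; landmark = [l_x, l_y].
    Returns the range r (meters) and the bearing b (radians)
    relative to the robot's heading, wrapped to [-pi, pi).
    """
    x, y, yaw = robot_state
    dx = landmark[0] - x
    dy = landmark[1] - y
    r = np.hypot(dx, dy)                    # straight-line distance
    b = np.arctan2(dy, dx) - yaw            # bearing relative to heading
    b = (b + np.pi) % (2 * np.pi) - np.pi   # wrap the angle
    return r, b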

The programs I developed were written in Python and use the same sequence of actions indicated in the block diagram above in order to follow the race track (which is a list of waypoints).

Code

You will need to put these five Python (3.7 or higher) programs into a single directory.

cubic_spine_planner.py

"""
Cubic spline planner

Program: cubic_spine_planner.py

Author: Atsushi Sakai(@Atsushi_twi)
Source: https://github.com/AtsushiSakai/PythonRobotics

This program implements a cubic spline. For more information on 
cubic splines, check out this link:
https://mathworld.wolfram.com/CubicSpline.html
"""
import math
import numpy as np
import bisect

class Spline:
    """
    Cubic Spline class
    """

    def __init__(self, x, y):
        self.b, self.c, self.d, self.w = [], [], [], []

        self.x = x
        self.y = y

        self.nx = len(x)  # dimension of x
        h = np.diff(x)

        # calc coefficient a
        self.a = [iy for iy in y]

        # calc coefficient c
        A = self.__calc_A(h)
        B = self.__calc_B(h)
        self.c = np.linalg.solve(A, B)
        #  print(self.c1)

        # calc spline coefficient b and d
        for i in range(self.nx - 1):
            self.d.append((self.c[i + 1] - self.c[i]) / (3.0 * h[i]))
            tb = (self.a[i + 1] - self.a[i]) / h[i] - h[i] * \
                (self.c[i + 1] + 2.0 * self.c[i]) / 3.0
            self.b.append(tb)

    def calc(self, t):
        """
        Calc position

        if t is outside of the input x, return None

        """

        if t < self.x[0]:
            return None
        elif t > self.x[-1]:
            return None

        i = self.__search_index(t)
        dx = t - self.x[i]
        result = self.a[i] + self.b[i] * dx + \
            self.c[i] * dx ** 2.0 + self.d[i] * dx ** 3.0

        return result

    def calcd(self, t):
        """
        Calc first derivative

        if t is outside of the input x, return None
        """

        if t < self.x[0]:
            return None
        elif t > self.x[-1]:
            return None

        i = self.__search_index(t)
        dx = t - self.x[i]
        result = self.b[i] + 2.0 * self.c[i] * dx + 3.0 * self.d[i] * dx ** 2.0
        return result

    def calcdd(self, t):
        """
        Calc second derivative
        """

        if t < self.x[0]:
            return None
        elif t > self.x[-1]:
            return None

        i = self.__search_index(t)
        dx = t - self.x[i]
        result = 2.0 * self.c[i] + 6.0 * self.d[i] * dx
        return result

    def __search_index(self, x):
        """
        search data segment index
        """
        return bisect.bisect(self.x, x) - 1

    def __calc_A(self, h):
        """
        calc matrix A for spline coefficient c
        """
        A = np.zeros((self.nx, self.nx))
        A[0, 0] = 1.0
        for i in range(self.nx - 1):
            if i != (self.nx - 2):
                A[i + 1, i + 1] = 2.0 * (h[i] + h[i + 1])
            A[i + 1, i] = h[i]
            A[i, i + 1] = h[i]

        A[0, 1] = 0.0
        A[self.nx - 1, self.nx - 2] = 0.0
        A[self.nx - 1, self.nx - 1] = 1.0
        #  print(A)
        return A

    def __calc_B(self, h):
        """
        calc matrix B for spline coefficient c
        """
        B = np.zeros(self.nx)
        for i in range(self.nx - 2):
            B[i + 1] = 3.0 * (self.a[i + 2] - self.a[i + 1]) / \
                h[i + 1] - 3.0 * (self.a[i + 1] - self.a[i]) / h[i]
        return B


class Spline2D:
    """
    2D Cubic Spline class

    """

    def __init__(self, x, y):
        self.s = self.__calc_s(x, y)
        self.sx = Spline(self.s, x)
        self.sy = Spline(self.s, y)

    def __calc_s(self, x, y):
        dx = np.diff(x)
        dy = np.diff(y)
        self.ds = [math.sqrt(idx ** 2 + idy ** 2)
                   for (idx, idy) in zip(dx, dy)]
        s = [0]
        s.extend(np.cumsum(self.ds))
        return s

    def calc_position(self, s):
        """
        calc position
        """
        x = self.sx.calc(s)
        y = self.sy.calc(s)

        return x, y

    def calc_curvature(self, s):
        """
        calc curvature
        """
        dx = self.sx.calcd(s)
        ddx = self.sx.calcdd(s)
        dy = self.sy.calcd(s)
        ddy = self.sy.calcdd(s)
        k = (ddy * dx - ddx * dy) / ((dx ** 2 + dy ** 2)**(3 / 2))
        return k

    def calc_yaw(self, s):
        """
        calc yaw
        """
        dx = self.sx.calcd(s)
        dy = self.sy.calcd(s)
        yaw = math.atan2(dy, dx)
        return yaw


def calc_spline_course(x, y, ds=0.1):
    sp = Spline2D(x, y)
    s = list(np.arange(0, sp.s[-1], ds))

    rx, ry, ryaw, rk = [], [], [], []
    for i_s in s:
        ix, iy = sp.calc_position(i_s)
        rx.append(ix)
        ry.append(iy)
        ryaw.append(sp.calc_yaw(i_s))
        rk.append(sp.calc_curvature(i_s))

    return rx, ry, ryaw, rk, s


def main():
    print("Spline 2D test")
    import matplotlib.pyplot as plt
    x = [-2.5, 0.0, 2.5, 5.0, 7.5, 3.0, -1.0]
    y = [0.7, -6, 5, 6.5, 0.0, 5.0, -2.0]
    ds = 0.1  # [m] distance between interpolated points

    sp = Spline2D(x, y)
    s = np.arange(0, sp.s[-1], ds)

    rx, ry, ryaw, rk = [], [], [], []
    for i_s in s:
        ix, iy = sp.calc_position(i_s)
        rx.append(ix)
        ry.append(iy)
        ryaw.append(sp.calc_yaw(i_s))
        rk.append(sp.calc_curvature(i_s))

    flg, ax = plt.subplots(1)
    plt.plot(x, y, "xb", label="input")
    plt.plot(rx, ry, "-r", label="spline")
    plt.grid(True)
    plt.axis("equal")
    plt.xlabel("x[m]")
    plt.ylabel("y[m]")
    plt.legend()

    flg, ax = plt.subplots(1)
    plt.plot(s, [math.degrees(iyaw) for iyaw in ryaw], "-r", label="yaw")
    plt.grid(True)
    plt.legend()
    plt.xlabel("line length[m]")
    plt.ylabel("yaw angle[deg]")

    flg, ax = plt.subplots(1)
    plt.plot(s, rk, "-r", label="curvature")
    plt.grid(True)
    plt.legend()
    plt.xlabel("line length[m]")
    plt.ylabel("curvature [1/m]")

    plt.show()

if __name__ == '__main__':
    main()

kinematics.py

""" 
Implementation of the two-wheeled differential drive robot car 
and its controller.

Our goal in using LQR is to find the optimal control inputs:
  [linear velocity of the car, angular velocity of the car]
	
We want to both minimize the error between the current state 
and a desired state, while minimizing actuator effort 
(e.g. wheel rotation rate). These are competing objectives because a 
large u (i.e. wheel rotation rates) expends a lot of
actuator energy but can drive the state error to 0 really fast.
LQR helps us balance these competing objectives.

If a system is linear, LQR gives the optimal control inputs that 
take a system's state to 0, where the state is
"current state - desired state".

Implemented by Addison Sears-Collins
Date: December 10, 2020

"""
# Import required libraries
import numpy as np
import scipy.linalg as la

class DifferentialDrive(object):
  """
  Implementation of Differential Drive kinematics.
  This represents a two-wheeled vehicle defined by the following states
  state = [x,y,theta] where theta is the yaw angle
  and accepts the following control inputs
  input = [linear velocity of the car, angular velocity of the car]
  """
  def __init__(self):
    """
    Initializes the class
    """
    # Covariance matrix representing action noise 
    # (i.e. noise on the control inputs)
    self.V = None

  def get_state_size(self):
    """
    The state is (X, Y, THETA) in global coordinates, so the state size is 3.
    """
    return 3

  def get_input_size(self):
    """
    The control input is ([linear velocity of the car, angular velocity of the car]), 
    so the input size is 2.
    """
    return 2

  def get_V(self):
    """
    This function provides the covariance matrix V which 
    describes the noise that can be applied to the forward kinematics.
    
    Feel free to experiment with different levels of noise.

    Output
      :return: V: noise covariance matrix (3 x 3 matrix)
    """
    # The np.eye function returns a 2D array with ones on the diagonal
    # and zeros elsewhere.
    if self.V is None:
      self.V = np.eye(3)
      self.V[0,0] = 0.01
      self.V[1,1] = 0.01
      self.V[2,2] = 0.1
    return 1e-5*self.V

  def forward(self,x0,u,v,dt=0.1):
    """
    Computes the forward kinematics for the system.

    Input
      :param x0: The starting state (position) of the system (units:[m,m,rad])  
                 np.array with shape (3,) -> 
                 (X, Y, THETA)
      :param u:  The control input to the system  
                 2x1 NumPy Array given the control input vector is  
                 [linear velocity of the car, angular velocity of the car] 
                 [meters per second, radians per second]
      :param v:  The noise applied to the system (units:[m, m, rad]) ->np.array 
                 with shape (3,)
      :param dt: Change in time (units: [s])

    Output
      :return: x1: The new state of the system (X, Y, THETA)
    """
    u0 = u 
		
    # If there is no noise applied to the system, use a zero
    # noise vector (one element per state)
    if v is None:
      v = np.zeros(3)
    
    # Starting state of the vehicle
    X = x0[0]
    Y = x0[1]
    THETA = x0[2]

    # Control input
    u_linvel = u0[0]
    u_angvel = u0[1]

    # Velocity in the x and y direction in m/s
    x_dot = u_linvel * np.cos(THETA)
    y_dot = u_linvel * np.sin(THETA)
  
    # The new state of the system
    x1 = np.empty(3)
	
    # Calculate the new state of the system,
    # adding noise to each state component
    x1[0] = x0[0] + x_dot * dt + v[0] # X
    x1[1] = x0[1] + y_dot * dt + v[1] # Y
    x1[2] = x0[2] + u_angvel * dt + v[2] # THETA

    return x1

  def linearize(self, x, dt=0.1):
    """
    Creates a linearized version of the dynamics of the differential 
    drive robotic system (i.e. a robotic car where each wheel is 
    controlled separately).

    The system's forward kinematics are nonlinear due to the sines and 
    cosines, so we need to linearize 
    it by taking the Jacobian of the forward kinematics equations with respect 
     to the control inputs.

    Our goal is to have a discrete time system of the following form: 
    x_t+1 = Ax_t + Bu_t

    Input
      :param x: The state of the system (units:[m,m,rad]) -> 
                np.array with shape (3,) ->
                (X, Y, THETA) ->
                X_system = [x1, x2, x3]
      :param dt: The change in time from time step t to time step t+1      

    Output
      :return: A: Matrix A is a 3x3 matrix (because there are 3 states) that 
                  describes how the state of the system changes from t to t+1 
                  when no control command is executed. Typically, 
                  a robotic car only drives when the wheels are turning. 
                  Therefore, in this case, A is the identity matrix.
      :return: B: Matrix B is a 3 x 2 matrix (because there are 3 states and 
                  2 control inputs) that describes how
                  the state (X, Y, and THETA) changes from t to t + 1 due to 
                  the control command u.
                  Matrix B is found by taking the Jacobian of the three 
                  forward kinematics equations (for X, Y, THETA) 
                  with respect to u (3 x 2 matrix)

    """
    THETA = x[2]

    ####### A Matrix #######
    # A matrix is the identity matrix
    A = np.array([[1.0,   0,  0],
                  [  0, 1.0,  0],
                  [  0,   0, 1.0]])

    ####### B Matrix #######
    B = np.array([[np.cos(THETA)*dt, 0],
                  [np.sin(THETA)*dt, 0],
                  [0, dt]])
		
    return A, B

def dLQR(F,Q,R,x,xf,dt=0.1):
    """
    Discrete-time linear quadratic regulator for a non-linear system.

    Compute the optimal control given a nonlinear system, cost matrices, 
    a current state, and a final state.
    Compute the control variables that minimize the cumulative cost.

    Solve for P using the dynamic programming method.

    Assume that Qf = Q

    Input:
      :param F: The dynamics class object (has forward and linearize functions 
                implemented)
      :param Q: The state cost matrix Q -> np.array with shape (3,3)
      :param R: The input cost matrix R -> np.array with shape (2,2)
      :param x: The current state of the system x -> np.array with shape (3,)
      :param xf: The desired state of the system xf -> np.array with shape (3,)
      :param dt: The size of the timestep -> float

    Output
      :return: u_t_star: Optimal action u for the current state 
                   [linear velocity of the car, angular velocity of the car]
                   [meters per second, radians per second]
    """
    # We want the system to stabilize at xf, 
    # so we let x - xf be the state.
    # Actual state - desired state
    x_error = x - xf

    # Calculate the A and B matrices
    A, B = F.linearize(x, dt)

    # Solutions to discrete LQR problems are obtained using dynamic 
    # programming.
    # The optimal solution is obtained recursively, starting at the last 
    # time step and working backwards.
    N = 50

    # Create a list of N + 1 elements
    P = [None] * (N + 1)

    # Assume Qf = Q
    Qf = Q

    # 1. LQR via Dynamic Programming 
    P[N] = Qf 

    # 2. For t = N, ..., 1
    for t in range(N, 0, -1):

      # Discrete-time Algebraic Riccati equation to calculate the optimal 
      # state cost matrix
      P[t-1] = Q + A.T @ P[t] @ A - (A.T @ P[t] @ B) @ la.pinv(
               R + B.T @ P[t] @ B) @ (B.T @ P[t] @ A)      

    # Create a list of N elements
    K = [None] * N
    u = [None] * N

    # 3 and 4. For t = 0, ..., N - 1
    for t in range(N):

      # Calculate the optimal feedback gain K_t
      K[t] = -la.pinv(R + B.T @ P[t+1] @ B) @ B.T @ P[t+1] @ A

    for t in range(N):
	
      # Calculate the optimal control input
      u[t] = K[t] @ x_error

    # Optimal control input is u_t_star
    u_t_star = u[N-1]
  
    # Return the optimal control inputs
    return u_t_star

def get_R():
    """
    This function provides the R matrix to the lqr_ekf_control simulator.
    
    Returns the input cost matrix R.

    Experiment with different gains.
    This matrix penalizes actuator effort 
    (i.e. rotation of the motors on the wheels).
    The R matrix has the same number of rows as there are actuator states 
    [linear velocity of the car, angular velocity of the car]
    [meters per second, radians per second]
    This matrix often has positive values along the diagonal.
    We can target actuator states where we want low actuator 
    effort by making the corresponding value of R large.   

    Output
      :return: R: Input cost matrix
    """
    R = np.array([[0.01, 0],  # Penalization for linear velocity effort
                  [0, 0.01]]) # Penalization for angular velocity effort

    return R

def get_Q():
    """
    This function provides the Q matrix to the lqr_ekf_control simulator.
    
    Returns the state cost matrix Q.

    Experiment with different gains to see their effect on the vehicle's 
    behavior.
    Q helps us weight the relative importance of each state in the state 
    vector (X, Y, THETA). 
    Q is a square matrix that has the same number of rows as there are states.
    Q penalizes bad performance.
    Q has positive values along the diagonal and zeros elsewhere.
    Q enables us to target states where we want low error by making the 
    corresponding value of Q large.
    We can start with the identity matrix and tweak the values through trial 
    and error.

    Output
      :return: Q: State cost matrix (3x3 matrix because the state vector is 
                  (X, Y, THETA))
    """
    Q = np.array([[0.4, 0, 0], # Penalize X position error (global coordinates)
                  [0, 0.4, 0], # Penalize Y position error (global coordinates)
                  [0, 0, 0.4]])# Penalize heading error (global coordinates)
    
    return Q

lqr_ekf_control.py

"""
A robot will follow a racetrack using an LQR controller and EKF (with landmarks)
to estimate the state (i.e. x position, y position, yaw angle) at each timestep

####################

Adapted from code authored by Atsushi Sakai (@Atsushi_twi)
Source: https://github.com/AtsushiSakai/PythonRobotics

"""
# Import important libraries
import numpy as np
import math
import matplotlib.pyplot as plt
from kinematics import *
from sensors import *
from utils import *
from time import sleep

show_animation = True

def closed_loop_prediction(desired_traj, landmarks):

    ## Simulation Parameters
    T = desired_traj.shape[0]  # Maximum simulation time
    goal_dis = 0.1 # How close we need to get to the goal
    goal = desired_traj[-1,:] # Coordinates of the goal
    dt = 0.1 # Timestep interval
    time = 0.0 # Starting time


    ## Initial States 
    state = np.array([8.3,0.69,0]) # Initial state of the car
    state_est = state.copy()
    sigma = 0.1*np.ones(3) # State covariance matrix

    ## Get the Cost-to-go and input cost matrices for LQR
    Q = get_Q() # Defined in kinematics.py
    R = get_R() # Defined in kinematics.py


    ## Initialize the Car and the Car's landmark sensor 
    DiffDrive = DifferentialDrive()
    LandSens = LandmarkDetector(landmarks)
		
    # Process noise and sensor measurement noise
    V = DiffDrive.get_V()
    W = LandSens.get_W()

    ## Create objects for storing states and estimated state
    t = [time]
    traj = np.array([state])
    traj_est = np.array([state_est])

    ind = 0
    while T >= time:
		
        ## Point to track
        ind = int(np.floor(time))
        goal_i = desired_traj[ind,:]

        ## Generate noise
        # v = process noise, w = measurement noise
        v = np.random.multivariate_normal(np.zeros(V.shape[0]),V)
        w = np.random.multivariate_normal(np.zeros(W.shape[0]),W)

        ## Generate optimal control commands
        u_lqr = dLQR(DiffDrive,Q,R,state_est,goal_i[0:3],dt)

        ## Move forward in time
        state = DiffDrive.forward(state,u_lqr,v,dt)
				
        # Take a sensor measurement for the new state
        y = LandSens.measure(state,w)

        # Update the estimate of the state using the EKF
        state_est, sigma = EKF(DiffDrive,LandSens,y,state_est,sigma,u_lqr,dt)

        # Increment time
        time = time + dt

        # Store the trajectory and estimated trajectory
        t.append(time)
        traj = np.concatenate((traj,[state]),axis=0)
        traj_est = np.concatenate((traj_est,[state_est]),axis=0)

        # Check to see if the robot reached goal
        if np.linalg.norm(state[0:2]-goal[0:2]) <= goal_dis:
            print("Goal reached")
            break

        ## Plot the vehicle's trajectory
        if time % 1 < 0.1 and show_animation:
            plt.cla()
            plt.plot(desired_traj[:,0], desired_traj[:,1], "-r", label="course")
            plt.plot(traj[:,0], traj[:,1], "ob", label="trajectory")
            plt.plot(traj_est[:,0], traj_est[:,1], "sk", label="estimated trajectory")

            plt.plot(goal_i[0], goal_i[1], "xg", label="target")
            plt.axis("equal")
            plt.grid(True)
            plt.title("SINGAPORE GRAND PRIX\n" + "speed[m/s]:" + str(
											round(np.mean(u_lqr), 2)) +
                      ",target index:" + str(ind))
            plt.pause(0.0001)


    return t, traj


def main():

    # Countdown to start
    print("\n*** SINGAPORE GRAND PRIX ***\n")
    print("Start your engines!")
    print("3.0")
    sleep(1.0)
    print("2.0")
    sleep(1.0)
    print("1.0\n")
    sleep(1.0)
    print("LQR + EKF steering control tracking start!!")

    # Create the track waypoints
    ax = [8.3,8.0, 7.2, 6.5, 6.2, 6.5, 1.5,-2.0,-3.5,-5.0,-7.9,
       -6.7,-6.7,-5.2,-3.2,-1.2, 0.0, 0.2, 2.5, 2.8, 4.4, 4.5, 7.8, 8.5, 8.3]
    ay = [0.7,4.3, 4.5, 5.2, 4.0, 0.7, 1.3, 3.3, 1.5, 3.0,-1.0,
       -2.0,-3.0,-4.5, 1.1,-0.7,-1.0,-2.0,-2.2,-1.2,-1.5,-2.4,-2.7,-1.7,-0.1]
    
    # These landmarks help the mobile robot localize itself
    landmarks = [[4,3],
                 [8,2],
                 [-1,-4]]
		
    # Compute the desired trajectory
    desired_traj = compute_traj(ax,ay)

    t, traj = closed_loop_prediction(desired_traj,landmarks)

    # Display the trajectory that the mobile robot executed
    if show_animation:
        plt.close()
        fig, _ = plt.subplots(1)
        plt.plot(ax, ay, "xb", label="input")
        plt.plot(desired_traj[:,0], desired_traj[:,1], "-r", label="spline")
        plt.plot(traj[:,0], traj[:,1], "-g", label="tracking")
        plt.grid(True)
        plt.axis("equal")
        plt.xlabel("x[m]")
        plt.ylabel("y[m]")
        plt.legend()

        plt.show()


if __name__ == '__main__':
    main()

sensors.py

"""
Extended Kalman Filter implementation using landmarks to localize

Implemented by Addison Sears-Collins
Date: December 10, 2020
"""
import numpy as np
import scipy.linalg as la
import math

class LandmarkDetector(object):
  """
  This class represents the sensor mounted on the robot that is used 
  to detect the location of landmarks in the environment.
  """
  def __init__(self,landmark_list):
    """
    Calculates the sensor measurements for the landmark sensor
		
    Input
      :param landmark_list: 2D list of landmarks [L1,L2,L3,...,LN], 
                            where each row is a 2D landmark defined by 
						                Li =[l_i_x,l_i_y]
                            which corresponds to its x position and y 
                            position in the global coordinate frame.
    """
    # Store the x and y position of each landmark in the world
    self.landmarks = np.array(landmark_list) 

    # Record the total number of landmarks
    self.N_landmarks = self.landmarks.shape[0]
		
    # Variable that represents landmark sensor noise (i.e. error)
    self.W = None 

  def get_W(self):
    """
    This function provides the covariance matrix W which describes the noise 
    that can be applied to the sensor measurements.

    Feel free to experiment with different levels of noise.
    
    Output
      :return: W: measurement noise matrix (2N x 2N given N landmarks, so 
                  6x6 in this tutorial because there are three landmarks). 
                  Each landmark yields a range measurement and a bearing 
                  measurement, so there are 2N individual sensor 
                  measurements. 
    """
    # In the EKF code, we will sample from this matrix a 2Nx1 landmark 
    # sensor measurement noise vector (i.e. [range_1 noise, bearing_1 noise, 
    # range_2 noise, bearing_2 noise, ...])
    if self.W is None:
      self.W = 0.0095*np.eye(2*self.N_landmarks)
    return self.W

  def measure(self,x,w=None):
    """
    Computes the landmark sensor measurements for the system. 
    This will be a list of range (i.e. distance) 
    and relative angles between the robot and a 
    set of landmarks defined with [x,y] positions.

    Input
      :param x: The state [x_t, y_t, theta_t] of the system -> np.array 
                with shape (3,)
      :param w: The noise (assumed Gaussian) applied to the measurement -> 
                np.array with shape (2 * N_landmarks,)
    Output
      :return: y: The resulting observation vector with shape 
                  (2 * N_landmarks,) -> 6x1 in this implementation because 
                  there are 3 landmarks. The resulting measurement is 
                  y = [range_1, bearing_1, range_2, bearing_2, ...]
    """
    # Create the observation vector
    y = np.zeros(shape=(2*self.N_landmarks,))

    # Initialize state variables	
    x_t = x[0]
    y_t = x[1]
    THETA_t = x[2]
	
    # Fill in the observation model y
    for i in range(0, self.N_landmarks):

      # Set the x and y position for landmark i
      x_l_i = self.landmarks[i][0]
      y_l_i = self.landmarks[i][1]

      # Set the noise value (assume a noiseless measurement if w is None)
      w_r = w[i*2] if w is not None else 0.0
      w_b = w[i*2+1] if w is not None else 0.0

      # Calculate the range (distance from robot to the landmark) and 
      # bearing angle to the landmark
      range_i = math.sqrt(((x_t - x_l_i)**2) + ((y_t - y_l_i)**2)) + w_r
      angle_i = math.atan2(y_t - y_l_i,x_t - x_l_i) - THETA_t + w_b

      # Populate the predicted sensor observation vector	  
      y[i*2] = range_i
      y[i*2+1] = angle_i

    return y

  def jacobian(self,x):
    """
    Computes the first order jacobian around the state x

    Input
      :param x: The starting state (position) of the system -> np.array 
                with shape (3,)

    Output
      :return: H: The resulting Jacobian matrix H for the sensor. 
                  Each landmark contributes 2 rows (one for the range 
                  (distance) measurement and one for the bearing 
                  measurement), and there are 3 columns, one for each 
                  state (X, Y, THETA).

                  In other words, the number of rows is equal to the total 
                  number of individual sensor measurements. Since we have 
                  3 landmarks in this tutorial, H will be a 6x3 matrix 
                  (2 * N_landmarks x number of states).
    """
    # Create the Jacobian matrix H
    # For example, the first two rows of the Jacobian matrix H pertain to 
    # the first landmark:      
    #   dh_range/dx_t    dh_range/dy_t    dh_range/dtheta_t
    #   dh_bearing/dx_t  dh_bearing/dy_t  dh_bearing/dtheta_t
    # Note that 'bearing' and 'angle' are synonymous in this program.
    H = np.zeros(shape=(2 * self.N_landmarks, 3))

    # Extract the state
    X_t = x[0]
    Y_t = x[1]
    THETA_t = x[2]
   
    # Fill in H, the Jacobian matrix with the partial derivatives of the 
    # observation (measurement) model h
    # Partial derivatives were calculated using an online partial derivatives 
    # calculator.
    # atan2 derivatives are listed here https://en.wikipedia.org/wiki/Atan2
    for i in range(0, self.N_landmarks):
	
      # Set the x and y position for landmark i
      x_l_i = self.landmarks[i][0]
      y_l_i = self.landmarks[i][1]
	  
      dh_range_dx_t = (X_t-x_l_i) / math.sqrt((X_t-x_l_i)**2 + (Y_t-y_l_i)**2)
      H[i*2][0] = dh_range_dx_t      

      dh_range_dy_t = (Y_t-y_l_i) / math.sqrt((X_t-x_l_i)**2 + (Y_t-y_l_i)**2)
      H[i*2][1] = dh_range_dy_t     

      dh_range_dtheta_t = 0
      H[i*2][2] = dh_range_dtheta_t

      dh_bearing_dx_t = -(Y_t-y_l_i) / ((X_t-x_l_i)**2 + (Y_t-y_l_i)**2)
      H[i*2+1][0] = dh_bearing_dx_t	  
	  
      dh_bearing_dy_t = (X_t-x_l_i) / ((X_t-x_l_i)**2 + (Y_t-y_l_i)**2)
      H[i*2+1][1] = dh_bearing_dy_t    

      dh_bearing_dtheta_t = -1.0   
      H[i*2+1][2] = dh_bearing_dtheta_t

    return H

def EKF(DiffDrive,Sensor,y,x_hat,sigma,u,dt,V=None,W=None):
    """
    Some of these matrices will be non-square or singular. Utilize the 
    pseudo-inverse function la.pinv instead of inv to avoid these errors.

    Input
      :param DiffDrive: The DifferentialDrive object defined in kinematics.py
      :param Sensor: The Landmark Sensor object defined in this class
      :param y: The observation vector (4x1 in this implementation because 
                there are 2 landmarks). The measurement is 
                y = [h1_range_1,h1_angle_1, h2_range_2, h2_angle_2]
      :param x_hat: The starting estimate of the state at time t -> 
                    np.array with shape (3,)
                    (X_t, Y_t, THETA_t)
      :param sigma: The state covariance matrix at time t -> np.array with 
                    shape (3,1) initially, then 3x3
      :param u: The input to the system at time t -> np.array with shape (2,1)
                These are the control inputs to the system.
                [left wheel rotational velocity, right wheel rotational 
                velocity]
      :param dt: timestep size delta t
      :param V: The state noise covariance matrix  -> np.array with shape (3,3)
      :param W: The measurment noise covariance matrix -> np.array with shape 
                (2*N_Landmarks,2*N_Landmarks)
                4x4 matrix

    Output
      :return: x_hat_2: The estimate of the state at time t+1 
                        [X_t+1, Y_t+1, THETA_t+1]
      :return: sigma_est: The new covariance matrix at time t+1 (3x3 matrix)
      
    """	
    # Use the default noise covariance matrices unless they were passed in
    if V is None:
      V = DiffDrive.get_V() # 3x3 matrix for the state noise
    if W is None:
      W = Sensor.get_W() # 2Nx2N (here 6x6) matrix for the measurement noise

    ## Generate noise
    # v = process noise, w = measurement noise
    v = np.random.multivariate_normal(np.zeros(V.shape[0]),V) # 3x1 vector
    w = np.random.multivariate_normal(np.zeros(W.shape[0]),W) # 2Nx1 vector

    ##### Prediction Step #####

    # Predict the state estimate based on the previous state and the 
    # control input
    # x_predicted is a 3x1 vector (X_t+1, Y_t+1, THETA_t+1)
    x_predicted = DiffDrive.forward(x_hat,u,v,dt)

    # Calculate the A and B matrices	
    A, B = DiffDrive.linearize(x=x_hat)
    
    # Predict the covariance estimate based on the 
    # previous covariance and some noise
    # A and V are 3x3 matrices
    sigma_3by3 = None
    if (sigma.size == 3):
      sigma_3by3 = sigma * np.eye(3)
    else:
      sigma_3by3 = sigma
	  
    sigma_new = A @ sigma_3by3 @ A.T + V

    ##### Correction Step #####  

    # Get H, the 2Nx3 Jacobian matrix for the sensor 
    # (6x3 here because there are 3 landmarks)
    H = Sensor.jacobian(x_hat) 

    # Calculate the observation model
    # y_predicted is a 2Nx1 vector
    y_predicted = Sensor.measure(x_predicted, w)

    # Measurement residual (delta_y is a 2Nx1 vector)
    delta_y = y - y_predicted

    # Residual covariance 
    # 2Nx3 @ 3x3 @ 3x2N -> 2Nx2N matrix
    S = H @ sigma_new @ H.T + W

    # Compute the Kalman gain
    # The Kalman gain indicates how certain you are about
    # the observations with respect to the motion
    # 3x3 @ 3x2N @ 2Nx2N -> 3x2N
    K = sigma_new @ H.T @ la.pinv(S)

    # Update the state estimate
    # 3x1 + (3x2N @ 2Nx1 -> 3x1)
    x_hat_2 = x_predicted + (K @ delta_y)

    # Update the covariance estimate
    # 3x3 - (3x2N @ 2Nx3) @ 3x3
    sigma_est = sigma_new - (K @ H @ sigma_new)

    return x_hat_2 , sigma_est

utils.py

"""
Program: utils.py
This program helps calculate the waypoints along the racetrack
that the robot needs to follow.

Modified from code developed by Atsushi Sakai
Source: https://github.com/AtsushiSakai/PythonRobotics
"""
import numpy as np
import math
import cubic_spline_planner

def pi_2_pi(angle):
    """Wrap an angle in radians to the range [-pi, pi)."""
    return (angle + math.pi) % (2*math.pi) - math.pi

def compute_traj(ax,ay):
    cx, cy, cyaw, ck, s = cubic_spline_planner.calc_spline_course(
        ax, ay, ds=0.1)
    target_speed = 1  # target speed in meters per second (simulation parameter)
    sp = calc_speed_profile(cx, cy, cyaw, target_speed)

    desired_traj = np.array([cx,cy,cyaw,sp]).T
    return desired_traj

def calc_nearest_index(state, traj):
    cx = traj[:,0]
    cy = traj[:,1]
    cyaw = traj[:,2]
    dx = [state[0] - icx for icx in cx]
    dy = [state[1] - icy for icy in cy]

    d = [idx ** 2 + idy ** 2 for (idx, idy) in zip(dx, dy)]

    mind = min(d)

    ind = d.index(mind)

    mind = math.sqrt(mind)

    dxl = cx[ind] - state[0]
    dyl = cy[ind] - state[1]

    angle = pi_2_pi(cyaw[ind] - math.atan2(dyl, dxl))
    if angle < 0:
        mind *= -1

    return ind, mind


def calc_speed_profile(cx, cy, cyaw, target_speed):
    speed_profile = [target_speed] * len(cx)

    direction = 1.0

    # Set stop point
    for i in range(len(cx) - 1):
        dyaw = abs(cyaw[i + 1] - cyaw[i])
        switch = math.pi / 4.0 <= dyaw < math.pi / 2.0

        if switch:
            direction *= -1

        if direction != 1.0:
            speed_profile[i] = - target_speed
        else:
            speed_profile[i] = target_speed

        if switch:
            speed_profile[i] = 0.0

    speed_profile[-1] = 0.0

    return speed_profile

To run the code, navigate to the directory and run the lqr_ekf_control.py program. I’m using Anaconda to run my Python programs, so I’ll type:

python lqr_ekf_control.py

Keep building!

Linear Quadratic Regulator (LQR) With Python Code Example

In this tutorial, we will learn about the Linear Quadratic Regulator (LQR). At the end, I’ll show you my example implementation of LQR in Python.

To get started, let’s take a look at what LQR is all about. Since LQR is an optimal feedback control technique, let’s start with the definition of optimal feedback control and then build our way up to LQR.

Real-World Applications

drone-in-rain

Here are a couple of real-world examples where you might find LQR control:

  • Enabling a self-driving car to stay in the center of a lane or maintain a certain speed
  • Enabling a drone to hover at a specific altitude
  • Enabling a wheeled robot to follow a predetermined path

Prerequisites

  • To understand this tutorial, it is helpful if you have completed my state space model tutorial.
  • If you already understand the concept of state space representation, jump right into this tutorial.

What is Optimal Feedback Control?

Consider these two real world examples:

  • Example 1: You want a robot car to go from point A to point B along a predetermined path. 
  • Example 2: You have a drone, and you want it to hover in the air at a specific altitude. You want it to take aerial photos of you. 

How do you accomplish both of these tasks?

Both the robot and the drone need to have a feedback control algorithm running inside them. 

The control algorithm’s job will be to output control signals (e.g. velocity of the wheels on the robot car, propeller speed, etc.) that reduce the error between the actual state (e.g. current position of the robot car or altitude for the drone) and the desired state. 

The sensors on the robot car/drone need to read the state of the system and feed that information back to the controller. 

The controller compares the actual state (i.e. the sensor information) with the desired state and then generates the best possible (i.e. optimal) control signals (often known as “system input”) that will reduce or eliminate that error.

In Example 1, the desired state is the (x,y) coordinate or GPS position along a predetermined path. At each timestep, you want the control algorithm to generate velocity commands for your robot that minimize the error between your current actual state (i.e. position) and the desired position (i.e. on top of the predetermined path).

In Example 2, with the drone, you want to minimize the error between your desired altitude of the drone and the actual altitude of the drone.

Here is how all that looks in a picture:

2-feedback-controlJPG

Your robot car and drone go through a continuous process of sensing the state of the system and then generating the best possible control commands to minimize the difference between the desired state x_desired and the actual state x_t.

The goal at each timestep (as the robot car drives along the path or the drone hovers in the air) is to use sensor feedback to drive the state error towards 0, where:

State error = x_t – x_desired
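To see how the pieces of this loop fit together in code, here is a minimal sketch of one cycle in Python. It is not part of this tutorial's code: read_sensors and send_velocity_command are hypothetical placeholders for your robot's sensor and motor interfaces, and the feedback gain matrix K is a hand-picked value for illustration (computing the best possible K is exactly what LQR will do for us later in this tutorial):

import numpy as np

# Hypothetical placeholder: in a real robot, this would read the actual
# state [x, y, yaw angle] from your sensors
def read_sensors():
    return np.array([0.5, 0.2, 0.1])

# Hypothetical placeholder: in a real robot, this would command the motors
def send_velocity_command(u):
    print(f"Sending control input u = {u}")

x_desired = np.array([2.0, 2.0, 1.5708]) # desired state [x, y, yaw angle]

# A hand-picked feedback gain matrix (2 control inputs x 3 states).
# The minus sign makes the control inputs push the state error toward 0.
K = -0.5 * np.array([[1.0, 0.0, 0.0],
                     [0.0, 0.0, 1.0]])

# One cycle of the feedback loop
x_t = read_sensors()            # 1. Sense the actual state
state_error = x_t - x_desired   # 2. Compare it to the desired state
u = K @ state_error             # 3. Generate the control inputs
send_velocity_command(u)        # 4. Act, then repeat at the next timestep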

How Can We Drive State Error to 0 Efficiently?

Let’s revisit our examples from the previous section. 

Suppose in Example 2, a strong wind from a thunderstorm began to blow the drone downward, causing it to fall way below the desired altitude. 

3-drone_flying_technology_motion

To correct its altitude, the drone could:

  • Action 1: Spin its propellers at top speed so that it shoots right up to the desired altitude as quickly as possible.
  • Action 2: Gradually increase the speed of its propellers until it reaches the desired altitude.

Both Action 1 and 2 have pros and cons. 

Action 1

  • Pros: Your drone returns to your desired altitude quickly. 
  • Cons: Because the motors were spinning at top speed, you drained your drone’s battery significantly. Your drone now has only a few minutes of battery left before it dies, and the drone will soon come crashing down to Earth.

Action 2

  • Pros: Less battery usage than Action 1
  • Cons: It took seemingly forever for the drone to return to the desired altitude. 

How can we write a smart controller that balances both objectives? We want the system to both:

  1. Have good control (i.e. stay on the desired path or altitude as closely as possible).
  2. Minimize actuator (i.e. motor) effort as much as possible so the control inputs don’t cause the battery to drain so quickly. 

This, my friends, is where the Linear Quadratic Regulator (LQR) comes to the rescue. 

LQR Problem

LQR solves a common problem when we’re trying to optimally control a robotic system. Going back to our robot car example, let’s draw its diagram. Here is our real world robot:

obstacle_avoiding_robot

Let’s model this robot in a 3D diagram with coordinate axes x, y, and z:

4-model-in-3d

Here is the bird’s-eye view in 2D form:

5-birds-eyeJPG

Suppose the robot car’s state at time t (position, orientation) is represented by the vector x_t, where x_t = [x_global, y_global, yaw angle].

6-state-vectorJPG

A state space model for this robotic system might look like the following:

7-state-space-modelJPG

where 

  • x_t is the current state vector at time t in the global coordinate frame… [x_t, y_t, θ_t] = [x coordinate, y coordinate, yaw angle (i.e. rotation angle around the z-axis)]
  • x_t-1 is the state vector at time t-1 in the global coordinate frame… [x_t-1, y_t-1, θ_t-1]
  • u_t-1 represents the control input vector at time t-1… [v_t-1, ω_t-1] = [linear velocity, angular velocity]
  • A and B are the state transition matrices.

(If that equation above doesn’t make a lot of sense, please check out the tutorial in the Prerequisites of this article where I dive into it in detail)

We want to choose control inputs u_t-1 such that:

  1. x_t actual – x_t desired is small… i.e. we get good control
  2. u_t-1 is small… i.e. we use a minimal amount of actuator effort (i.e. control input, motor commands, etc.)

Both of these are competing objectives. A large u can drive the state error down really fast, but it costs a lot of actuator effort. A small u conserves actuator effort, but the state error shrinks slowly. LQR was designed to solve this problem.

How LQR Works

The purpose of LQR is to compute the optimal control inputs u_t-1 (i.e. velocity of the wheels of a robotic car, motor speeds for the propellers of a drone) given:

  1. Actual state x_t
  2. Desired state x_f
  3. State cost matrix Q (I’ll explain this term later in this tutorial)
  4. Input cost matrix R (I’ll explain this term later in this tutorial) 
  5. A matrix (See my state space modeling tutorial for how to calculate A)
  6. B matrix (See my state space modeling tutorial for how to calculate B)
  7. Time interval dt

that minimize the cumulative “cost.”

LQR is all about the cost function. It does some fancy math to come up with a function (this Wikipedia page shows the function) that expresses a “cost” (you could also substitute the word “penalty”) that is a function of both state error and actuator effort.

LQR’s goal is to minimize the “cost” of the state error, while minimizing the “cost” of actuator effort. 

  • The higher the state error term, the higher the overall cost. 
  • The higher the actuator effort term, the higher the overall cost. 

LQR determines the point along the cost curve where the “cost function” is minimized…thereby balancing both objectives of keeping state error and actuator effort minimized.

The cost curve below is for illustration purposes only. I’m using it just to explain this concept. In reality, the cost curve does not look like this. I’m using this graphic to convey to you the idea that LQR is all about finding the optimal control commands (i.e. linear velocity, angular velocity in the case of our robot car example) at which the actuator-effort state-error cost function is at a minimum. And from calculus, we know that the minimum of this cost function occurs at the point where the derivative (i.e. slope) is equal to 0.

We use some fancy math (I’ll get into this later) to find out what this point is.

cost-curve
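If it helps to see the “cost” idea in code form, here is a minimal sketch of the standard quadratic cost that LQR charges at each timestep: a state error penalty x_error^T Q x_error plus an actuator effort penalty u^T R u. The specific numbers below are made up purely for illustration:

import numpy as np

def stage_cost(x_error, u, Q, R):
    # Quadratic LQR cost for one timestep: a penalty on state error
    # plus a penalty on actuator effort
    return x_error.T @ Q @ x_error + u.T @ R @ u

Q = np.eye(3) # state cost matrix (3 states)
R = np.eye(2) # input cost matrix (2 control inputs)

x_error = np.array([1.0, 0.5, 0.1]) # [x error, y error, yaw angle error]
u = np.array([2.0, 0.3])            # [linear velocity, angular velocity]

# A larger state error or larger actuator effort produces a larger cost
print(stage_cost(x_error, u, Q, R)) # prints approximately 5.35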

What is Q?

Q is called the state cost matrix. Q helps us weigh the relative importance of each state in the state vector (i.e. [x error,y error,yaw angle error]). 

Q is a square matrix that has the same number of rows and columns as there are states.

Q penalizes bad performance (e.g. penalizes large differences between where you want the robot to be vs. where it actually is in the world)

Q has positive values along the diagonal and zeros elsewhere.

Q enables us to target states where we want low error by making the corresponding value of Q large.

I typically start out with the identity matrix and then tweak the values through trial and error. In a 3-state system, such as with our two-wheeled robot car, Q would be the following 3×3 matrix:

rsz_1q-value
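Here is that starting point in NumPy, along with an example tweak. The specific weight below is an arbitrary value I chose just to illustrate the trial-and-error process:

import numpy as np

# Start with the identity matrix: equal weight on x error, y error,
# and yaw angle error
Q = np.eye(3)

# Example tweak: penalize yaw angle error more heavily, so the
# controller works harder to hold the desired heading
Q[2, 2] = 2.0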

 

What is R?

Similarly, R is the input cost matrix. This matrix penalizes actuator effort (i.e. rotation of the motors on the wheels). 

The R matrix has the same number of rows and columns as there are control inputs. 

For example, imagine if we had a two-wheeled car with two control inputs, one for the linear velocity v and one for the angular velocity ω. 

8-imagine-ifJPG

The control input vector ut-1 would be [linear velocity v in meters per second, angular velocity ω in radians per second].

The input cost matrix often has positive values along the diagonal and zeros elsewhere. We can use this matrix to target control inputs where we want low actuator effort by making the corresponding value of R large.   

A lot of people start with the identity matrix, but I’ll start out with some low numbers. You want to tweak the values through trial and error.

In a case where we have two control inputs into a robotic system, here would be my starting 2×2 R matrix:

rmatrix
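In NumPy, that starting 2×2 R matrix looks like this. As an illustration of the tuning process, raising a diagonal value (the tweak below is an arbitrary number) tells the controller to be more sparing with that actuator:

import numpy as np

R = np.array([[0.01, 0.0],  # penalty on linear velocity effort
              [0.0, 0.01]]) # penalty on angular velocity effort

# Example tweak: make turning more "expensive," so the controller
# uses the angular velocity input less aggressively
R[1, 1] = 0.5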

LQR Algorithm Pseudocode

LQR uses a technique called dynamic programming. You don’t need to worry what that means. 

At each timestep t as the robot or drone moves in the world, the optimal control input is calculated using a series of for loops (where we run each for loop ~N times) that spit out the u (i.e. control inputs) that corresponds to the minimal overall cost.

Here is the pseudocode for LQR. Hang tight. Go through this slowly. There is a lot of mathematical notation thrown around, and I want you to leave this tutorial with a solid understanding of LQR.

Let’s walk through this together. The pseudocode is in bold italics, and my comments are in plain text. You see below that the LQR function accepts 7 parameters.

LQR(Actual State x, Desired State xf, Q, R, A, B, dt):

x_error = Actual State x – Desired State xf

Initialize N = ## 

I typically set N to some arbitrary integer like 50. Solutions to LQR problems are obtained recursively, starting at the last time step, and working backwards in time to the first time step. 

In other words, the for loops (we’ll get to these in a second) need to go through lots of iterations (50 in this case) to reach a stable value for P (we’ll cover P in the next step).

Set P = [Empty list of N + 1 elements]

Let P[N] = Q

For i = N, …, 1

P[i-1] = Q + A^T P[i] A – (A^T P[i] B)(R + B^T P[i] B)^-1 (B^T P[i] A)

This ugly equation above is called the Discrete-time Algebraic Riccati equation. Don’t be intimidated by it. You’ll notice that you have the values for all the terms on the right side of the equation. All that is required is plugging in values and computing the answer at each iteration step i so that you can fill in the list P.
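If the matrix notation looks opaque, here is a minimal sketch of the same recursion for a hypothetical one-state, one-input system, where every matrix collapses to a plain number (a = b = q = r = 1 are made-up values, purely for illustration). You can watch P settle to a stable value long before the loop reaches i = 1:

# Scalar version of the Discrete-time Algebraic Riccati equation
# (1 state, 1 control input, made-up system and cost values)
a, b, q, r = 1.0, 1.0, 1.0, 1.0

N = 50
P = [None] * (N + 1)
P[N] = q # the recursion starts from the state cost at the final timestep

for i in range(N, 0, -1):
    P[i-1] = q + a*P[i]*a - (a*P[i]*b) * (1.0/(r + b*P[i]*b)) * (b*P[i]*a)

print(P[0]) # settles at about 1.618 after roughly ten iterations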

K = [Empty list of N elements]

u = [Empty list of N elements]

For i = 0, …, N – 1

      K[i] = -(R + B^T P[i+1] B)^-1 B^T P[i+1] A

K will hold the optimal feedback gain values. This is the important matrix that, when multiplied by the state error, will generate the optimal control inputs (see next step).

For i = 0, …, N – 1

      u[i] = K[i] @ x_error

We do N iterations of the for loop until we get a stable value for the optimal control input u (e.g. u = [linear velocity v, angular velocity ω]). I assume that a stable value is achieved when N=50.

u_star = u[N-1]  

Optimal control input is u_star. It is the last value we calculated in the previous step above. It is at the very end of the u list.

Return u_star  

Return the optimal control inputs. These inputs will be sent to our robot so that it can move to a new state (i.e. new x position, y position, and yaw angle γ).

Just for your future reference, you’ll commonly see the LQR algorithm written like this in the literature (and in college lectures):

lqr_algorithmJPG

LQR Python Code

Assumptions

Let’s get back to our two-wheeled mobile robot. It has three states (like in the state space modeling tutorial). The forward kinematics can be modeled as:

9-assumptionsJPG

The state space model is as follows:

10-state-space-modelJPG

The model above shows the state of the robot at time t [x position, y position, yaw angle] given the state at time t-1 and the control inputs that were applied at time t-1.

The control inputs in this example are the linear velocity (v), which has components in the x and y directions, and the angular velocity around the z-axis, ω (also known as the “yaw rate”).

Our robot starts out at the origin (x=0 meters, y=0 meters), and the yaw angle is 0 radians. Thus the state at t-1 is:

11-assumption1JPG
11-assumption2JPG

We let dt = 1 second. dt represents the interval between timesteps.

Now suppose we want our robot to drive to coordinate (x=2,y=2). This is our desired state (i.e. our goal destination).

12-goal-pointJPG

Also, we want the robot to face due north (i.e. 90 degrees (1.5708 radians) yaw angle) once it reaches the desired state. Therefore, our desired state is:

13-desired-stateJPG
14-desired-stateJPG

We launch our robot at time t-1, and we feed it our desired state. The robot must use the LQR algorithm to solve for the optimal control inputs that minimize both the state error and actuator effort. In other words, we want the robot to get from (x=0, y=0) to (x=2, y=2) and face due north once we get there. However, we don’t want to get there so fast that we drain the car’s battery.

These optimal control inputs then get fed into the control input vector to predict the state at time t.

In other words, we use LQR to solve for this:

15-actuator-effortJPG

And then plug that vector into this:

16-plug-into-thisJPG

to get the state at time t.
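As a made-up numeric example of that last step, suppose LQR returned u = [1.0 m/s, 0.5 rad/s] while the robot sits at the origin with a yaw angle of 0 and dt = 1 second. Plugging those numbers into the state space model gives:

import numpy as np

A = np.eye(3)                      # state transition matrix
yaw, dt = 0.0, 1.0
B = np.array([[np.cos(yaw)*dt, 0], # how the control inputs move the state
              [np.sin(yaw)*dt, 0],
              [0, dt]])

x_t_minus_1 = np.array([0.0, 0.0, 0.0]) # state at time t-1: [x, y, yaw]
u_t_minus_1 = np.array([1.0, 0.5])      # made-up LQR output: [v, omega]

x_t = A @ x_t_minus_1 + B @ u_t_minus_1
print(x_t) # [1.  0.  0.5] -> moved 1 meter along x and turned 0.5 radians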

One more thing will make this simulation more realistic: let’s assume the maximum linear velocity of the robot car is 3.0 meters per second, and the maximum angular velocity is 90 degrees per second (1.5708 radians per second).

Let’s take a look at all this in Python code.

Code

Here is my Python implementation of the LQR. Don’t be intimidated by all the lines of code. Just go through it one step at a time. Take your time. You can copy and paste the entire thing into your favorite IDE.

Remember to play with the values along the diagonals of the Q and R matrices. You’ll see in my code that the Q matrix has been tweaked a bit.

import numpy as np

# Author: Addison Sears-Collins
# https://automaticaddison.com
# Description: Linear Quadratic Regulator example 
#	(two-wheeled differential drive robot car)

######################## DEFINE CONSTANTS #####################################
# Suppress scientific notation when printing NumPy arrays
np.set_printoptions(precision=3,suppress=True)

# Optional Variables
max_linear_velocity = 3.0 # meters per second
max_angular_velocity = 1.5708 # radians per second

def getB(yaw, deltat):
	"""
	Calculates and returns the B matrix
	3x2 matrix ---> number of states x number of control inputs

	Expresses how the state of the system [x,y,yaw] changes
	from t-1 to t due to the control commands (i.e. control inputs).
	
	:param yaw: The yaw angle (rotation angle around the z axis) in radians 
	:param deltat: The change in time from timestep t-1 to t in seconds
	
	:return: B matrix ---> 3x2 NumPy array
	"""
	B = np.array([[np.cos(yaw)*deltat, 0],
	              [np.sin(yaw)*deltat, 0],
	              [0, deltat]])
	return B


def state_space_model(A, state_t_minus_1, B, control_input_t_minus_1):
	"""
	Calculates the state at time t given the state at time t-1 and
	the control inputs applied at time t-1
	
	:param: A	The A state transition matrix
		3x3 NumPy Array
	:param: state_t_minus_1 	The state at time t-1  
		3x1 NumPy Array given the state is [x,y,yaw angle] ---> 
		[meters, meters, radians]
	:param: B 	The B state transition matrix
		3x2 NumPy Array
	:param: control_input_t_minus_1 	Optimal control inputs at time t-1  
		2x1 NumPy Array given the control input vector is 
		[linear velocity of the car, angular velocity of the car]
		[meters per second, radians per second]
		
	:return: State estimate at time t
		3x1 NumPy Array given the state is [x,y,yaw angle] --->
		[meters, meters, radians]
	"""	
	# These next 6 lines of code which place limits on the angular and linear 
	# velocities of the robot car can be removed if you desire.
	control_input_t_minus_1[0] = np.clip(control_input_t_minus_1[0],
	                                     -max_linear_velocity,
	                                     max_linear_velocity)
	control_input_t_minus_1[1] = np.clip(control_input_t_minus_1[1],
	                                     -max_angular_velocity,
	                                     max_angular_velocity)
	state_estimate_t = (A @ state_t_minus_1) + (B @ control_input_t_minus_1) 
			
	return state_estimate_t
	
def lqr(actual_state_x, desired_state_xf, Q, R, A, B, dt):
	"""
	Discrete-time linear quadratic regulator for a nonlinear system.

	Compute the optimal control inputs given a nonlinear system, cost matrices, 
	current state, and a final state.
	
	Compute the control variables that minimize the cumulative cost.

	Solve for P using the dynamic programming method.

	:param actual_state_x: The current state of the system 
		3x1 NumPy Array given the state is [x,y,yaw angle] --->
		[meters, meters, radians]
	:param desired_state_xf: The desired state of the system
		3x1 NumPy Array given the state is [x,y,yaw angle] --->
		[meters, meters, radians]	
	:param Q: The state cost matrix
		3x3 NumPy Array
	:param R: The input cost matrix
		2x2 NumPy Array
	:param A: The A state transition matrix
		3x3 NumPy Array
	:param B: The B state transition matrix
		3x2 NumPy Array
	:param dt: The size of the timestep in seconds -> float

	:return: u_star: Optimal action u for the current state 
		2x1 NumPy Array given the control input vector is
		[linear velocity of the car, angular velocity of the car]
		[meters per second, radians per second]
	"""
	# We want the system to stabilize at desired_state_xf.
	x_error = actual_state_x - desired_state_xf

	# Solutions to discrete LQR problems are obtained using the dynamic 
	# programming method.
	# The optimal solution is obtained recursively, starting at the last 
	# timestep and working backwards.
	# You can play with this number
	N = 50

	# Create a list of N + 1 elements
	P = [None] * (N + 1)
	
	Qf = Q

	# LQR via Dynamic Programming
	P[N] = Qf

	# For i = N, ..., 1
	for i in range(N, 0, -1):

		# Discrete-time Algebraic Riccati equation to calculate the optimal 
		# state cost matrix
		P[i-1] = Q + A.T @ P[i] @ A - (A.T @ P[i] @ B) @ np.linalg.pinv(
			R + B.T @ P[i] @ B) @ (B.T @ P[i] @ A)      

	# Create a list of N elements
	K = [None] * N
	u = [None] * N

	# For i = 0, ..., N - 1
	for i in range(N):

		# Calculate the optimal feedback gain K
		K[i] = -np.linalg.pinv(R + B.T @ P[i+1] @ B) @ B.T @ P[i+1] @ A

		u[i] = K[i] @ x_error

	# Optimal control input is u_star
	u_star = u[N-1]

	return u_star

def main():
	
	# Let the time interval be 1.0 seconds
	dt = 1.0
	
	# Actual state
	# Our robot starts out at the origin (x=0 meters, y=0 meters), and 
	# the yaw angle is 0 radians. 
	actual_state_x = np.array([0,0,0]) 

	# Desired state [x,y,yaw angle]
	# [meters, meters, radians]
	desired_state_xf = np.array([2.000,2.000,np.pi/2])	
	
	# A matrix
	# 3x3 matrix -> number of states x number of states matrix
	# Expresses how the state of the system [x,y,yaw] changes 
	# from t-1 to t when no control command is executed.
	# Typically a robot on wheels only drives when the wheels are told to turn.
	# For this case, A is the identity matrix.
	# Note: A is sometimes F in the literature.
	A = np.array([[1.0,   0,   0],
	              [  0, 1.0,   0],
	              [  0,   0, 1.0]])

	# R matrix
	# The control input cost matrix
	# Experiment with different R matrices
	# This matrix penalizes actuator effort (i.e. rotation of the 
	# motors on the wheels that drive the linear velocity and angular velocity).
	# The R matrix has the same number of rows as the number of control
	# inputs and same number of columns as the number of 
	# control inputs.
	# This matrix has positive values along the diagonal and 0s elsewhere.
	# We can target control inputs where we want low actuator effort 
	# by making the corresponding value of R large. 
	R = np.array([[0.01,    0],  # Penalty for linear velocity effort
	              [   0, 0.01]]) # Penalty for angular velocity effort

	# Q matrix
	# The state cost matrix.
	# Experiment with different Q matrices.
	# Q helps us weigh the relative importance of each state in the 
	# state vector (X, Y, YAW ANGLE). 
	# Q is a square matrix that has the same number of rows as 
	# there are states.
	# Q penalizes bad performance.
	# Q has positive values along the diagonal and zeros elsewhere.
	# Q enables us to target states where we want low error by making the 
	# corresponding value of Q large.
	Q = np.array([[0.639,   0,   0],  # Penalize X position error 
	              [    0, 1.0,   0],  # Penalize Y position error 
	              [    0,   0, 1.0]]) # Penalize YAW ANGLE heading error 
				  
	# Launch the robot, and have it move to the desired goal destination
	for i in range(100):
		print(f'iteration = {i} seconds')
		print(f'Current State = {actual_state_x}')
		print(f'Desired State = {desired_state_xf}')
		
		state_error = actual_state_x - desired_state_xf
		state_error_magnitude = np.linalg.norm(state_error)		
		print(f'State Error Magnitude = {state_error_magnitude}')
		
		B = getB(actual_state_x[2], dt)
		
		# LQR returns the optimal control input
		optimal_control_input = lqr(actual_state_x, 
									desired_state_xf, 
									Q, R, A, B, dt)	
		
		print(f'Control Input = {optimal_control_input}')
									
		
		# We apply the optimal control to the robot
		# so we can get a new actual (estimated) state.
		actual_state_x = state_space_model(A, actual_state_x, B, 
										optimal_control_input)	

		# Stop as soon as we reach the goal
		# Feel free to change this threshold value.
		if state_error_magnitude < 0.01:
			print("\nGoal Has Been Reached Successfully!")
			break
			
		print()

# Entry point for the program
main()

Output

You can see in the output that it took about 3 seconds for the robot to reach the goal position and orientation. The LQR algorithm did its job well.

outputJPG

Note that you can remove the maximum linear and angular velocity constraints if you have trouble getting the correct output. You can remove these constraints by editing the state_space_model method inside the code I pasted in the previous section.

Conclusion

That’s it for the fundamentals of LQR. If you ever run across LQR in the future and need a refresher, just come back to this tutorial. 

Keep building!