The Ultimate Guide to Real-Time Lane Detection Using OpenCV

In this tutorial, we will go step by step through the entire process of detecting lanes on a road in real time using the OpenCV computer vision library and Python. By the end of this tutorial, you will know how to build (from scratch) an application that can automatically detect lanes in a video stream from a front-facing camera mounted on a car. You’ll be able to generate the video shown below.

final_lane_detection_gif-1

Our goal is to create a program that can read a video stream and output an annotated video that shows the following:

  1. The current lane
  2. The radius of curvature of the lane
  3. The position of the vehicle relative to the middle of the lane

In a future post, we will use #3 to control the steering angle of a self-driving car in the CARLA autonomous driving simulator.
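To make goals 2 and 3 concrete, here is a minimal sketch of the two calculations, assuming each lane line is modeled as a second-order polynomial x = A*y^2 + B*y + C (the same kind of fit that lane.py produces with np.polyfit). The function names radius_of_curvature and center_offset, the synthetic circle, and the sample pixel positions are illustrative assumptions, and the units are arbitrary rather than meters.

```python
import numpy as np

def radius_of_curvature(fit, y):
    """Radius of curvature of x = A*y**2 + B*y + C, evaluated at y."""
    a, b, _ = fit
    return (1 + (2 * a * y + b) ** 2) ** 1.5 / abs(2 * a)

def center_offset(left_x, right_x, frame_width):
    """Image center minus lane center (same units as the inputs)."""
    lane_center = (left_x + right_x) / 2
    return frame_width / 2 - lane_center

# Hypothetical lane line: points sampled from a circle of radius 500
ys = np.linspace(-50, 50, 101)
xs = 500 - np.sqrt(500 ** 2 - ys ** 2)

# Fit x as a function of y, because lane lines are close to vertical
fit = np.polyfit(ys, xs, 2)
radius = radius_of_curvature(fit, 0.0)

# Hypothetical lane line positions at the bottom of a 600-pixel-wide frame
offset = center_offset(left_x=200, right_x=420, frame_width=600)
print(radius, offset)
```

The recovered radius should land close to 500, and the offset is negative here because the image center sits to the left of the lane center.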

Real-World Applications

  • Self-Driving Cars

Prerequisites

Helpful Tip

As you work through this tutorial, focus on the end goals I listed in the beginning. Don’t get bogged down in trying to understand every last detail of the math and the OpenCV operations we’ll use in our code (e.g. bitwise AND, the Sobel edge detection algorithm, etc.). Trust the developers who maintain the OpenCV computer vision package, a project that originated at Intel.

We are trying to build products, not publish research papers. Focus on the inputs, the outputs, and what the algorithm is supposed to do at a high level.

Get a working lane detection application up and running first. At some later date, when you want to add more complexity to your project or write a research paper, you can dive deeper under the hood to understand all the details.

Trying to understand every last detail is like trying to build your own database from scratch in order to start a website or taking a course on internal combustion engines to learn how to drive a car. 

Let’s get started!

Find Some Videos and an Image

The first thing we need to do is find some videos and an image to serve as our test cases.

We want to download videos and an image that show a road with lanes from the perspective of a person driving a car. 

I found some good candidates on Pixabay.com. Type “driving” or “lanes” in the video search on that website.

Here is an example of what a frame from one of your videos should look like. This frame is 600 pixels in width and 338 pixels in height:

original_lane_detection_5
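One detail worth checking early is how that frame size shows up in code. The sketch below uses a blank NumPy array as a stand-in for the 600 x 338 frame (an assumption, so that no image file is needed) and shows that the array shape is ordered (height, width, channels), along with the slicing trick lane.py uses to get (width, height).

```python
import numpy as np

# Stand-in for a 600 x 338 BGR frame; a real frame would come from cv2.imread
frame = np.zeros((338, 600, 3), dtype=np.uint8)

# Image arrays are indexed as (rows, columns, channels)
height, width, channels = frame.shape

# Reverse the shape to (3, 600, 338), then drop the channel count
size_wh = frame.shape[::-1][1:]

print(frame.shape, size_wh)
```

OpenCV functions that take a size argument, such as cv2.warpPerspective, expect (width, height), which is why the reversal is needed.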

Installation and Setup

We now need to make sure we have all the software packages installed. Check to see if you have OpenCV installed on your machine. If you are using Anaconda, you can type:

conda install -c conda-forge opencv

Alternatively, you can type:

pip install opencv-python

Make sure you also have NumPy, a scientific computing library for Python, installed.

If you’re using Anaconda, you can type:

conda install numpy

Alternatively, you can type:

pip install numpy

Install Matplotlib, a plotting library for Python.

For Anaconda users:

conda install -c conda-forge matplotlib

Otherwise, you can install like this:

pip install matplotlib
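If you are not sure whether the installs succeeded, a quick check like the following can help. It is only a sketch: the helper name missing_packages is an assumption, and it relies on the fact that the three packages are imported as cv2, numpy, and matplotlib.

```python
import importlib.util

def missing_packages(modules=("cv2", "numpy", "matplotlib")):
    """Return the module names that cannot be found in this environment."""
    return [name for name in modules
            if importlib.util.find_spec(name) is None]

missing = missing_packages()
print("Missing:", missing if missing else "none")
```

Any name that is printed as missing needs to be installed with one of the commands above before you continue.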

Python Code for Detection of Lane Lines in an Image

Before we get started, here is the full code you need to detect lane lines in an image. It consists of two programs.

Make sure that you save both programs below, edge_detection.py and lane.py, in the same directory as the image.

edge_detection.py is a collection of methods that help isolate lane line edges and lane lines.
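The core idea behind most of those methods is turning a grayscale channel into a binary mask. Here is a simplified sketch of that idea using only NumPy. The function name in_range_mask and the sample values are assumptions for illustration; note that the binary_array function in edge_detection.py inverts the mask when called with its default value=0, which this sketch does not do.

```python
import numpy as np

def in_range_mask(channel, low, high):
    """Return a uint8 mask: 1 where low <= pixel <= high, otherwise 0."""
    mask = np.zeros_like(channel, dtype=np.uint8)
    mask[(channel >= low) & (channel <= high)] = 1
    return mask

# Tiny 2 x 3 "image channel" with made-up intensity values
channel = np.array([[10, 130, 200],
                    [90, 255, 79]], dtype=np.uint8)

mask = in_range_mask(channel, 80, 255)
print(mask)
```

Pixels with intensities from 80 to 255 are kept (1) and everything else is blacked out (0).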

lane.py is where we will implement a Lane class that represents a lane on a road or highway.
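One of the first things the Lane class does after warping the image is to look for the columns that contain the most white pixels, which it uses as starting points for the left and right lane lines. The sketch below illustrates that step on a tiny synthetic mask. The function name find_lane_bases and the 8 x 12 test array are assumptions, not part of lane.py.

```python
import numpy as np

def find_lane_bases(binary_warped):
    """Return the column indices of the left and right histogram peaks."""
    # Only the bottom half of the frame is used, closest to the car
    bottom_half = binary_warped[binary_warped.shape[0] // 2:, :]

    # Sum each column to count the white pixels it contains
    histogram = np.sum(bottom_half, axis=0)

    # Search for one peak on each side of the image midpoint
    midpoint = histogram.shape[0] // 2
    left_base = int(np.argmax(histogram[:midpoint]))
    right_base = int(np.argmax(histogram[midpoint:])) + midpoint
    return left_base, right_base

# Synthetic bird's-eye mask with vertical lines at columns 2 and 9
warped = np.zeros((8, 12), dtype=np.uint8)
warped[:, 2] = 1
warped[:, 9] = 1

bases = find_lane_bases(warped)
print(bases)
```

In the full program, these two x coordinates seed the sliding windows that climb up the image and collect lane line pixels.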

Don’t be scared by how long the code appears. I always include a lot of comments in my code since I have the tendency to forget why I did what I did. I always want to be able to revisit my code at a later date and have a clear understanding of what I did and why.

Here is edge_detection.py. Don’t worry, I’ll explain the code later in this post.

import cv2 # Import the OpenCV library to enable computer vision
import numpy as np # Import the NumPy scientific computing library

# Author: Addison Sears-Collins
# https://automaticaddison.com
# Description: A collection of methods to help with edge detection

def binary_array(array, thresh, value=0):
  """
  Return a 2D binary array (mask) in which all pixels are either 0 or 1
	
  :param array: NumPy 2D array that we want to convert to binary values
  :param thresh: Values used for thresholding (inclusive)
  :param value: Output value when between the supplied threshold
  :return: Binary 2D array...
           number of rows x number of columns = 
           number of pixels from top to bottom x number of pixels from
             left to right 
  """
  if value == 0:
    # Create an array of ones with the same shape and type as 
    # the input 2D array.
    binary = np.ones_like(array) 
		
  else:
    # Creates an array of zeros with the same shape and type as 
    # the input 2D array.
    binary = np.zeros_like(array)  
    value = 1

  # If value == 0, make all values in binary equal to 0 if the 
  # corresponding value in the input array is between the threshold 
  # (inclusive). Otherwise, the value remains as 1. Therefore, the pixels 
  # with the high Sobel derivative values (i.e. sharp pixel intensity 
  # discontinuities) will have 0 in the corresponding cell of binary.
  binary[(array >= thresh[0]) & (array <= thresh[1])] = value

  return binary

def blur_gaussian(channel, ksize=3):
  """
  Implementation for Gaussian blur to reduce noise and detail in the image
	
  :param channel: 2D or 3D array to be blurred
  :param ksize: Size of the small matrix (i.e. kernel) used to blur
                i.e. number of rows and number of columns
  :return: Blurred 2D image
  """
  return cv2.GaussianBlur(channel, (ksize, ksize), 0)
		
def mag_thresh(image, sobel_kernel=3, thresh=(0, 255)):
  """
  Implementation of Sobel edge detection

  :param image: 2D array (single channel) on which to detect edges
  :param sobel_kernel: Size of the small matrix (i.e. kernel) 
                       i.e. number of rows and columns
  :param thresh: Range (inclusive) of gradient magnitudes to threshold
  :return: Binary (black and white) 2D mask image
  """
  # Get the magnitude of the edges that are vertically aligned on the image
  sobelx = np.absolute(sobel(image, orient='x', sobel_kernel=sobel_kernel))
		
  # Get the magnitude of the edges that are horizontally aligned on the image
  sobely = np.absolute(sobel(image, orient='y', sobel_kernel=sobel_kernel))

  # Find areas of the image that have the strongest pixel intensity changes
  # in both the x and y directions. These have the strongest gradients and 
  # represent the strongest edges in the image (i.e. potential lane lines)
  # mag is a 2D array .. number of rows x number of columns = number of pixels
  # from top to bottom x number of pixels from left to right
  mag = np.sqrt(sobelx ** 2 + sobely ** 2)

  # Return a 2D array that contains 0s and 1s	
  return binary_array(mag, thresh)

def sobel(img_channel, orient='x', sobel_kernel=3):
  """
  Find edges that are aligned vertically and horizontally on the image
	
  :param img_channel: Channel from an image
  :param orient: Across which axis of the image are we detecting edges?
  :sobel_kernel: No. of rows and columns of the kernel (i.e. 3x3 small matrix)
  :return: Image with Sobel edge detection applied
  """
  # cv2.Sobel(input image, data type, order of the derivative x, order of the
  # derivative y, ksize=size of the small matrix used to calculate the
  # derivative). ksize is passed by keyword because the fifth positional
  # argument of cv2.Sobel is the optional output array (dst), not the kernel.
  if orient == 'x':
    # Will detect differences in pixel intensities going from 
    # left to right on the image (i.e. edges that are vertically aligned)
    sobel = cv2.Sobel(img_channel, cv2.CV_64F, 1, 0, ksize=sobel_kernel)
  if orient == 'y':
    # Will detect differences in pixel intensities going from 
    # top to bottom on the image (i.e. edges that are horizontally aligned)
    sobel = cv2.Sobel(img_channel, cv2.CV_64F, 0, 1, ksize=sobel_kernel)

  return sobel

def threshold(channel, thresh=(128,255), thresh_type=cv2.THRESH_BINARY):
  """
  Apply a threshold to the input channel
	
  :param channel: 2D array of the channel data of an image/video frame
  :param thresh: Tuple of (threshold value, maximum value)
  :param thresh_type: The technique of the threshold to apply
  :return: Two outputs are returned:
             ret: Threshold that was used
             thresholded_image: 2D thresholded data.
  """
  # With the default cv2.THRESH_BINARY type, a pixel whose intensity is
  # greater than thresh[0] is set to thresh[1] (white, 255, by default);
  # every other pixel is set to black (0)
  return cv2.threshold(channel, thresh[0], thresh[1], thresh_type)

Here is lane.py.

import cv2 # Import the OpenCV library to enable computer vision
import numpy as np # Import the NumPy scientific computing library
import edge_detection as edge # Handles the detection of lane lines
import matplotlib.pyplot as plt # Used for plotting and error checking

# Author: Addison Sears-Collins
# https://automaticaddison.com
# Description: Implementation of the Lane class 

filename = 'original_lane_detection_5.jpg'

class Lane:
  """
  Represents a lane on a road.
  """
  def __init__(self, orig_frame):
    """
    Default constructor
		
    :param orig_frame: Original camera image (i.e. frame)
    """
    self.orig_frame = orig_frame

    # This will hold an image with the lane lines		
    self.lane_line_markings = None

    # This will hold the image after perspective transformation
    self.warped_frame = None
    self.transformation_matrix = None
    self.inv_transformation_matrix = None

    # (Width, Height) of the original video frame (or image)
    self.orig_image_size = self.orig_frame.shape[::-1][1:]

    width = self.orig_image_size[0]
    height = self.orig_image_size[1]
    self.width = width
    self.height = height
	
    # Four corners of the trapezoid-shaped region of interest
    # You need to find these corners manually.
    self.roi_points = np.float32([
      (274,184), # Top-left corner
      (0, 337), # Bottom-left corner			
      (575,337), # Bottom-right corner
      (371,184) # Top-right corner
    ])
		
    # The desired corner locations  of the region of interest
    # after we perform perspective transformation.
    # Assume image width of 600, padding == 150.
    self.padding = int(0.25 * width) # padding from side of the image in pixels
    self.desired_roi_points = np.float32([
      [self.padding, 0], # Top-left corner
      [self.padding, self.orig_image_size[1]], # Bottom-left corner			
      [self.orig_image_size[
        0]-self.padding, self.orig_image_size[1]], # Bottom-right corner
      [self.orig_image_size[0]-self.padding, 0] # Top-right corner
    ]) 
		
    # Histogram that shows the white pixel peaks for lane line detection
    self.histogram = None
		
    # Sliding window parameters
    self.no_of_windows = 10
    self.margin = int((1/12) * width)  # Window width is +/- margin
    self.minpix = int((1/24) * width)  # Min no. of pixels to recenter window
		
    # Best fit polynomial lines for left line and right line of the lane
    self.left_fit = None
    self.right_fit = None
    self.left_lane_inds = None
    self.right_lane_inds = None
    self.ploty = None
    self.left_fitx = None
    self.right_fitx = None
    self.leftx = None
    self.rightx = None
    self.lefty = None
    self.righty = None
		
    # Pixel parameters for x and y dimensions
    self.YM_PER_PIX = 10.0 / 1000 # meters per pixel in y dimension
    self.XM_PER_PIX = 3.7 / 781 # meters per pixel in x dimension
		
    # Radii of curvature and offset
    self.left_curvem = None
    self.right_curvem = None
    self.center_offset = None

  def calculate_car_position(self, print_to_terminal=False):
    """
    Calculate the position of the car relative to the center
		
    :param: print_to_terminal Display data to console if True		
    :return: Offset from the center of the lane
    """
    # Assume the camera is centered in the image.
    # Get the position of the car in pixels
    car_location = self.orig_frame.shape[1] / 2

    # Find the x coordinate of the lane line bottom
    height = self.orig_frame.shape[0]
    bottom_left = self.left_fit[0]*height**2 + self.left_fit[
      1]*height + self.left_fit[2]
    bottom_right = self.right_fit[0]*height**2 + self.right_fit[
      1]*height + self.right_fit[2]

    center_lane = (bottom_right - bottom_left)/2 + bottom_left 
    center_offset = (np.abs(car_location) - np.abs(
      center_lane)) * self.XM_PER_PIX * 100

    if print_to_terminal == True:
      print(str(center_offset) + 'cm')
			
    self.center_offset = center_offset
      
    return center_offset

  def calculate_curvature(self, print_to_terminal=False):
    """
    Calculate the road curvature in meters.

    :param: print_to_terminal Display data to console if True
    :return: Radii of curvature
    """
    # Set the y-value where we want to calculate the road curvature.
    # Select the maximum y-value, which is the bottom of the frame.
    y_eval = np.max(self.ploty)    

    # Fit polynomial curves to the real world environment
    left_fit_cr = np.polyfit(self.lefty * self.YM_PER_PIX, self.leftx * (
      self.XM_PER_PIX), 2)
    right_fit_cr = np.polyfit(self.righty * self.YM_PER_PIX, self.rightx * (
      self.XM_PER_PIX), 2)
			
    # Calculate the radii of curvature
    left_curvem = ((1 + (2*left_fit_cr[0]*y_eval*self.YM_PER_PIX + left_fit_cr[
                    1])**2)**1.5) / np.absolute(2*left_fit_cr[0])
    right_curvem = ((1 + (2*right_fit_cr[
                    0]*y_eval*self.YM_PER_PIX + right_fit_cr[
                    1])**2)**1.5) / np.absolute(2*right_fit_cr[0])
	
    # Display on terminal window
    if print_to_terminal == True:
      print(left_curvem, 'm', right_curvem, 'm')
			
    self.left_curvem = left_curvem
    self.right_curvem = right_curvem

    return left_curvem, right_curvem		
		
  def calculate_histogram(self,frame=None,plot=True):
    """
    Calculate the image histogram to find peaks in white pixel count
		
    :param frame: The warped image
    :param plot: Create a plot if True
    """
    if frame is None:
      frame = self.warped_frame
			
    # Generate the histogram
    self.histogram = np.sum(frame[int(
		      frame.shape[0]/2):,:], axis=0)

    if plot == True:
		
      # Draw both the image and the histogram
      figure, (ax1, ax2) = plt.subplots(2,1) # 2 rows, 1 column
      figure.set_size_inches(10, 5)
      ax1.imshow(frame, cmap='gray')
      ax1.set_title("Warped Binary Frame")
      ax2.plot(self.histogram)
      ax2.set_title("Histogram Peaks")
      plt.show()
			
    return self.histogram

  def display_curvature_offset(self, frame=None, plot=False):
    """
    Display curvature and offset statistics on the image
		
    :param: plot Display the plot if True
    :return: Image with lane lines and curvature
    """	
    image_copy = None
    if frame is None:
      image_copy = self.orig_frame.copy()
    else:
      image_copy = frame

    cv2.putText(image_copy,'Curve Radius: '+str((
      self.left_curvem+self.right_curvem)/2)[:7]+' m', (int((
      5/600)*self.width), int((
      20/338)*self.height)), cv2.FONT_HERSHEY_SIMPLEX, (float((
      0.5/600)*self.width)),(
      255,255,255),2,cv2.LINE_AA)
    cv2.putText(image_copy,'Center Offset: '+str(
      self.center_offset)[:7]+' cm', (int((
      5/600)*self.width), int((
      40/338)*self.height)), cv2.FONT_HERSHEY_SIMPLEX, (float((
      0.5/600)*self.width)),(
      255,255,255),2,cv2.LINE_AA)
			
    if plot==True:       
      cv2.imshow("Image with Curvature and Offset", image_copy)

    return image_copy
    
  def get_lane_line_previous_window(self, left_fit, right_fit, plot=False):
    """
    Use the lane line from the previous sliding window to get the parameters
    for the polynomial line for filling in the lane line
    :param: left_fit Polynomial function of the left lane line
    :param: right_fit Polynomial function of the right lane line
    :param: plot To display an image or not
    """
    # margin is a sliding window parameter
    margin = self.margin

    # Find the x and y coordinates of all the nonzero 
    # (i.e. white) pixels in the frame.			
    nonzero = self.warped_frame.nonzero()  
    nonzeroy = np.array(nonzero[0])
    nonzerox = np.array(nonzero[1])
		
    # Store left and right lane pixel indices
    left_lane_inds = ((nonzerox > (left_fit[0]*(
      nonzeroy**2) + left_fit[1]*nonzeroy + left_fit[2] - margin)) & (
      nonzerox < (left_fit[0]*(
      nonzeroy**2) + left_fit[1]*nonzeroy + left_fit[2] + margin))) 
    right_lane_inds = ((nonzerox > (right_fit[0]*(
      nonzeroy**2) + right_fit[1]*nonzeroy + right_fit[2] - margin)) & (
      nonzerox < (right_fit[0]*(
      nonzeroy**2) + right_fit[1]*nonzeroy + right_fit[2] + margin))) 			
    self.left_lane_inds = left_lane_inds
    self.right_lane_inds = right_lane_inds

    # Get the left and right lane line pixel locations	
    leftx = nonzerox[left_lane_inds]
    lefty = nonzeroy[left_lane_inds] 
    rightx = nonzerox[right_lane_inds]
    righty = nonzeroy[right_lane_inds]	

    self.leftx = leftx
    self.rightx = rightx
    self.lefty = lefty
    self.righty = righty		
	
    # Fit a second order polynomial curve to each lane line
    left_fit = np.polyfit(lefty, leftx, 2)
    right_fit = np.polyfit(righty, rightx, 2)
    self.left_fit = left_fit
    self.right_fit = right_fit
		
    # Create the x and y values to plot on the image
    ploty = np.linspace(
      0, self.warped_frame.shape[0]-1, self.warped_frame.shape[0]) 
    left_fitx = left_fit[0]*ploty**2 + left_fit[1]*ploty + left_fit[2]
    right_fitx = right_fit[0]*ploty**2 + right_fit[1]*ploty + right_fit[2]
    self.ploty = ploty
    self.left_fitx = left_fitx
    self.right_fitx = right_fitx
		
    if plot==True:
		
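      # Generate images to draw on. warped_frame already holds 0 or 255
      # (uint8), so no scaling is needed; multiplying by 255 would overflow.
      out_img = np.dstack((self.warped_frame, self.warped_frame, (
                           self.warped_frame)))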
      window_img = np.zeros_like(out_img)
			
      # Add color to the left and right line pixels
      out_img[nonzeroy[left_lane_inds], nonzerox[left_lane_inds]] = [255, 0, 0]
      out_img[nonzeroy[right_lane_inds], nonzerox[right_lane_inds]] = [
                                                                     0, 0, 255]
      # Create a polygon to show the search window area, and recast 
      # the x and y points into a usable format for cv2.fillPoly()
      margin = self.margin
      left_line_window1 = np.array([np.transpose(np.vstack([
                                    left_fitx-margin, ploty]))])
      left_line_window2 = np.array([np.flipud(np.transpose(np.vstack([
                                    left_fitx+margin, ploty])))])
      left_line_pts = np.hstack((left_line_window1, left_line_window2))
      right_line_window1 = np.array([np.transpose(np.vstack([
                                     right_fitx-margin, ploty]))])
      right_line_window2 = np.array([np.flipud(np.transpose(np.vstack([
                                     right_fitx+margin, ploty])))])
      right_line_pts = np.hstack((right_line_window1, right_line_window2))
			
      # Draw the lane onto the warped blank image
      cv2.fillPoly(window_img, np.int_([left_line_pts]), (0,255, 0))
      cv2.fillPoly(window_img, np.int_([right_line_pts]), (0,255, 0))
      result = cv2.addWeighted(out_img, 1, window_img, 0.3, 0)
      
      # Plot the figures 
      figure, (ax1, ax2, ax3) = plt.subplots(3,1) # 3 rows, 1 column
      figure.set_size_inches(10, 10)
      figure.tight_layout(pad=3.0)
      ax1.imshow(cv2.cvtColor(self.orig_frame, cv2.COLOR_BGR2RGB))
      ax2.imshow(self.warped_frame, cmap='gray')
      ax3.imshow(result)
      ax3.plot(left_fitx, ploty, color='yellow')
      ax3.plot(right_fitx, ploty, color='yellow')
      ax1.set_title("Original Frame")  
      ax2.set_title("Warped Frame")
      ax3.set_title("Warped Frame With Search Window")
      plt.show()
			
  def get_lane_line_indices_sliding_windows(self, plot=False):
    """
    Get the indices of the lane line pixels using the 
    sliding windows technique.
		
    :param: plot Show plot or not
    :return: Best fit lines for the left and right lines of the current lane 
    """
    # Sliding window width is +/- margin
    margin = self.margin

    frame_sliding_window = self.warped_frame.copy()

    # Set the height of the sliding windows
    window_height = int(self.warped_frame.shape[0]/self.no_of_windows)

    # Find the x and y coordinates of all the nonzero 
    # (i.e. white) pixels in the frame.	
    nonzero = self.warped_frame.nonzero()
    nonzeroy = np.array(nonzero[0])
    nonzerox = np.array(nonzero[1])	
		
    # Store the pixel indices for the left and right lane lines
    left_lane_inds = []
    right_lane_inds = []
		
    # Current positions for pixel indices for each window,
    # which we will continue to update
    leftx_base, rightx_base = self.histogram_peak()
    leftx_current = leftx_base
    rightx_current = rightx_base

    # Go through one window at a time
    no_of_windows = self.no_of_windows
		
    for window in range(no_of_windows):
      
      # Identify window boundaries in x and y (and right and left)
      win_y_low = self.warped_frame.shape[0] - (window + 1) * window_height
      win_y_high = self.warped_frame.shape[0] - window * window_height
      win_xleft_low = leftx_current - margin
      win_xleft_high = leftx_current + margin
      win_xright_low = rightx_current - margin
      win_xright_high = rightx_current + margin
      cv2.rectangle(frame_sliding_window,(win_xleft_low,win_y_low),(
        win_xleft_high,win_y_high), (255,255,255), 2)
      cv2.rectangle(frame_sliding_window,(win_xright_low,win_y_low),(
        win_xright_high,win_y_high), (255,255,255), 2)

      # Identify the nonzero pixels in x and y within the window
      good_left_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & 
                          (nonzerox >= win_xleft_low) & (
                           nonzerox < win_xleft_high)).nonzero()[0]
      good_right_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & 
                           (nonzerox >= win_xright_low) & (
                            nonzerox < win_xright_high)).nonzero()[0]
														
      # Append these indices to the lists
      left_lane_inds.append(good_left_inds)
      right_lane_inds.append(good_right_inds)
        
      # If you found > minpix pixels, recenter next window on mean position
      minpix = self.minpix
      if len(good_left_inds) > minpix:
        leftx_current = int(np.mean(nonzerox[good_left_inds]))
      if len(good_right_inds) > minpix:        
        rightx_current = int(np.mean(nonzerox[good_right_inds]))
					
    # Concatenate the arrays of indices
    left_lane_inds = np.concatenate(left_lane_inds)
    right_lane_inds = np.concatenate(right_lane_inds)

    # Extract the pixel coordinates for the left and right lane lines
    leftx = nonzerox[left_lane_inds]
    lefty = nonzeroy[left_lane_inds] 
    rightx = nonzerox[right_lane_inds] 
    righty = nonzeroy[right_lane_inds]

    # Fit a second order polynomial curve to the pixel coordinates for
    # the left and right lane lines
    left_fit = np.polyfit(lefty, leftx, 2)
    right_fit = np.polyfit(righty, rightx, 2) 
		
    self.left_fit = left_fit
    self.right_fit = right_fit

    if plot==True:
		
      # Create the x and y values to plot on the image  
      ploty = np.linspace(
        0, frame_sliding_window.shape[0]-1, frame_sliding_window.shape[0])
      left_fitx = left_fit[0]*ploty**2 + left_fit[1]*ploty + left_fit[2]
      right_fitx = right_fit[0]*ploty**2 + right_fit[1]*ploty + right_fit[2]

      # Generate an image to visualize the result. The frame already holds
      # 0 or 255 (uint8), so no scaling is needed.
      out_img = np.dstack((
        frame_sliding_window, frame_sliding_window, (
        frame_sliding_window)))
			
      # Add color to the left line pixels and right line pixels
      out_img[nonzeroy[left_lane_inds], nonzerox[left_lane_inds]] = [255, 0, 0]
      out_img[nonzeroy[right_lane_inds], nonzerox[right_lane_inds]] = [
        0, 0, 255]
				
      # Plot the figure with the sliding windows
      figure, (ax1, ax2, ax3) = plt.subplots(3,1) # 3 rows, 1 column
      figure.set_size_inches(10, 10)
      figure.tight_layout(pad=3.0)
      ax1.imshow(cv2.cvtColor(self.orig_frame, cv2.COLOR_BGR2RGB))
      ax2.imshow(frame_sliding_window, cmap='gray')
      ax3.imshow(out_img)
      ax3.plot(left_fitx, ploty, color='yellow')
      ax3.plot(right_fitx, ploty, color='yellow')
      ax1.set_title("Original Frame")  
      ax2.set_title("Warped Frame with Sliding Windows")
      ax3.set_title("Detected Lane Lines with Sliding Windows")
      plt.show()  		
			
    return self.left_fit, self.right_fit

  def get_line_markings(self, frame=None):
    """
    Isolates lane lines.
  
    :param frame: The camera frame that contains the lanes we want to detect
    :return: Binary (i.e. black and white) image containing the lane lines.
    """
    if frame is None:
      frame = self.orig_frame
			
    # Convert the video frame from BGR (blue, green, red) 
    # color space to HLS (hue, saturation, lightness).
    hls = cv2.cvtColor(frame, cv2.COLOR_BGR2HLS)

    ################### Isolate possible lane line edges ######################
		
    # Perform Sobel edge detection on the L (lightness) channel of 
    # the image to detect sharp discontinuities in the pixel intensities 
    # along the x and y axis of the video frame.		     
    # sxbinary is a matrix full of 0s (black) and 255 (white) intensity values
    # Relatively light pixels get made white. Dark pixels get made black.
    _, sxbinary = edge.threshold(hls[:, :, 1], thresh=(120, 255))
    sxbinary = edge.blur_gaussian(sxbinary, ksize=3) # Reduce noise
		
    # Called with the default value=0, mag_thresh returns 0 in the cells
    # whose Sobel gradient magnitude lies inside the threshold range
    # (i.e. candidate lane line edges) and 1 everywhere else
    sxbinary = edge.mag_thresh(sxbinary, sobel_kernel=3, thresh=(110, 255))

    ######################## Isolate possible lane lines ######################
  
    # Perform binary thresholding on the S (saturation) channel 
    # of the video frame. A high saturation value means the hue color is pure.
    # We expect lane lines to be nice, pure colors (i.e. solid white, yellow)
    # and have high saturation channel values.
    # s_binary is matrix full of 0s (black) and 255 (white) intensity values
    # White in the regions with the purest hue colors (e.g. >80...play with
    # this value for best results).
    s_channel = hls[:, :, 2] # use only the saturation channel data
    _, s_binary = edge.threshold(s_channel, (80, 255))
	
    # Perform binary thresholding on the R (red) channel of the 
    # original BGR video frame. 
    # r_thresh is a matrix full of 0s (black) and 255 (white) intensity values
    # White in the regions with the richest red channel values (e.g. >120).
    # Remember, pure white is bgr(255, 255, 255).
    # Pure yellow is bgr(0, 255, 255). Both have high red channel values.
    _, r_thresh = edge.threshold(frame[:, :, 2], thresh=(120, 255))

    # Lane lines should be pure in color and have high red channel values 
    # Bitwise AND operation to reduce noise and black-out any pixels that
    # don't appear to be nice, pure, solid colors (like white or yellow lane 
    # lines.)		
    rs_binary = cv2.bitwise_and(s_binary, r_thresh)

    ### Combine the possible lane lines with the possible lane line edges ##### 
    # If you show rs_binary visually, you'll see that it is not that different 
    # from this return value. The edges of lane lines are thin lines of pixels.
    self.lane_line_markings = cv2.bitwise_or(rs_binary, sxbinary.astype(
                              np.uint8))	
    return self.lane_line_markings
		
  def histogram_peak(self):
    """
    Get the left and right peak of the histogram

    Return the x coordinate of the left histogram peak and the right histogram
    peak.
    """
    midpoint = int(self.histogram.shape[0]/2)
    leftx_base = np.argmax(self.histogram[:midpoint])
    rightx_base = np.argmax(self.histogram[midpoint:]) + midpoint

    # (x coordinate of left peak, x coordinate of right peak)
    return leftx_base, rightx_base
		
  def overlay_lane_lines(self, plot=False):
    """
    Overlay lane lines on the original frame
    :param: plot Plot the lane lines if True
    :return: Lane with overlay
    """
    # Generate an image to draw the lane lines on 
    warp_zero = np.zeros_like(self.warped_frame).astype(np.uint8)
    color_warp = np.dstack((warp_zero, warp_zero, warp_zero))		
		
    # Recast the x and y points into usable format for cv2.fillPoly()
    pts_left = np.array([np.transpose(np.vstack([
                         self.left_fitx, self.ploty]))])
    pts_right = np.array([np.flipud(np.transpose(np.vstack([
                          self.right_fitx, self.ploty])))])
    pts = np.hstack((pts_left, pts_right))
		
    # Draw lane on the warped blank image
    cv2.fillPoly(color_warp, np.int_([pts]), (0,255, 0))

    # Warp the blank back to original image space using inverse perspective 
    # matrix (Minv)
    newwarp = cv2.warpPerspective(color_warp, self.inv_transformation_matrix, (
                                  self.orig_frame.shape[
                                  1], self.orig_frame.shape[0]))
    
    # Combine the result with the original image
    result = cv2.addWeighted(self.orig_frame, 1, newwarp, 0.3, 0)
		
    if plot==True:
     
      # Plot the figures 
      figure, (ax1, ax2) = plt.subplots(2,1) # 2 rows, 1 column
      figure.set_size_inches(10, 10)
      figure.tight_layout(pad=3.0)
      ax1.imshow(cv2.cvtColor(self.orig_frame, cv2.COLOR_BGR2RGB))
      ax2.imshow(cv2.cvtColor(result, cv2.COLOR_BGR2RGB))
      ax1.set_title("Original Frame")  
      ax2.set_title("Original Frame With Lane Overlay")
      plt.show()   

    return result			
	
  def perspective_transform(self, frame=None, plot=False):
    """
    Perform the perspective transform.
    :param: frame Current frame
    :param: plot Plot the warped image if True
    :return: Bird's eye view of the current lane
    """
    if frame is None:
      frame = self.lane_line_markings
			
    # Calculate the transformation matrix
    self.transformation_matrix = cv2.getPerspectiveTransform(
      self.roi_points, self.desired_roi_points)

    # Calculate the inverse transformation matrix			
    self.inv_transformation_matrix = cv2.getPerspectiveTransform(
      self.desired_roi_points, self.roi_points)

    # Perform the transform using the transformation matrix
    self.warped_frame = cv2.warpPerspective(
      frame, self.transformation_matrix, self.orig_image_size, flags=(
     cv2.INTER_LINEAR))	

    # Convert image to binary
    (thresh, binary_warped) = cv2.threshold(
      self.warped_frame, 127, 255, cv2.THRESH_BINARY)			
    self.warped_frame = binary_warped

    # Display the perspective transformed (i.e. warped) frame
    if plot == True:
      warped_copy = self.warped_frame.copy()
      warped_plot = cv2.polylines(warped_copy, np.int32([
                    self.desired_roi_points]), True, (147,20,255), 3)

      # Display the image
      while(1):
        cv2.imshow('Warped Image', warped_plot)
			
        # Press any key to stop
        if cv2.waitKey(0):
          break

      cv2.destroyAllWindows()	
			
    return self.warped_frame		
	
  def plot_roi(self, frame=None, plot=False):
    """
    Plot the region of interest on an image.
    :param: frame The current image frame
    :param: plot Plot the roi image if True
    """
    if plot == False:
      return
			
    if frame is None:
      frame = self.orig_frame.copy()

    # Overlay trapezoid on the frame
    this_image = cv2.polylines(frame, np.int32([
      self.roi_points]), True, (147,20,255), 3)

    # Display the image until any key is pressed
    cv2.imshow('ROI Image', this_image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
	
def main():
	
  # Load a frame (or image)
  original_frame = cv2.imread(filename)

  # Create a Lane object
  lane_obj = Lane(orig_frame=original_frame)

  # Perform thresholding to isolate lane lines
  lane_line_markings = lane_obj.get_line_markings()

  # Plot the region of interest on the image
  lane_obj.plot_roi(plot=False)

  # Perform the perspective transform to generate a bird's eye view
  # If Plot == True, show image with new region of interest
  warped_frame = lane_obj.perspective_transform(plot=False)

  # Generate the image histogram to serve as a starting point
  # for finding lane line pixels
  histogram = lane_obj.calculate_histogram(plot=False)	
	
  # Find lane line pixels using the sliding window method 
  left_fit, right_fit = lane_obj.get_lane_line_indices_sliding_windows(
    plot=False)

  # Fill in the lane line
  lane_obj.get_lane_line_previous_window(left_fit, right_fit, plot=False)
	
  # Overlay lines on the original frame
  frame_with_lane_lines = lane_obj.overlay_lane_lines(plot=False)

  # Calculate lane line curvature (left and right lane lines)
  lane_obj.calculate_curvature(print_to_terminal=False)

  # Calculate center offset  																
  lane_obj.calculate_car_position(print_to_terminal=False)
	
  # Display curvature and center offset on image
  frame_with_lane_lines2 = lane_obj.display_curvature_offset(
    frame=frame_with_lane_lines, plot=True)
	
  # Create the output file name by removing the '.jpg' part
  size = len(filename)
  new_filename = filename[:size - 4]
  new_filename = new_filename + '_thresholded.jpg'		
    
  # Save the new image in the working directory
  #cv2.imwrite(new_filename, lane_line_markings)

  # Display the image 
  #cv2.imshow("Image", lane_line_markings) 
	
  # Display the window until any key is pressed
  cv2.waitKey(0) 
	
  # Close all windows
  cv2.destroyAllWindows() 
	
main()

Now that you have all the code to detect lane lines in an image, let’s explain what each piece of the code does.

Isolate Pixels That Could Represent Lane Lines

The first part of the lane detection process is to apply thresholding (I’ll explain what this term means in a second) to each video frame so that we can eliminate things that make it difficult to detect lane lines. By applying thresholding, we can isolate the pixels that represent lane lines.

Glare from the sun, shadows, car headlights, and road surface changes can all make it difficult to find lanes in a video frame or image.

What does thresholding mean? Basic thresholding replaces each pixel in a video frame with a black pixel if its intensity is below some constant, or with a white pixel if its intensity is above that constant. The end result is a binary (black and white) image of the road. A binary image is one in which each pixel takes one of two values: black (0) or white (1, or 255 in an 8-bit image).

Before thresholding:

pavlovsk_railing_of_bridge_yellow_palace_winter
Image Source: Wikipedia

After thresholding:

pavlovsk_railing_of_bridge_yellow_palace_winter_bw_threshold
Image Source: Wikipedia
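
To make the idea concrete, here is a minimal sketch of basic thresholding in NumPy. The function name binary_threshold and the 4x4 sample patch are made up for illustration and are not part of lane.py. The rule is the same one cv2.threshold applies with cv2.THRESH_BINARY: pixels strictly above the threshold become white, and everything else becomes black.

```python
import numpy as np

def binary_threshold(gray, thresh=127, max_value=255):
    """Return a binary image: max_value where gray > thresh, else 0."""
    return np.where(gray > thresh, max_value, 0).astype(np.uint8)

# Hypothetical 4x4 grayscale patch (0 = black, 255 = white)
patch = np.array([[ 10,  50, 200, 250],
                  [ 30, 128, 127, 220],
                  [  0,  90, 180, 255],
                  [ 60, 130,  20, 240]], dtype=np.uint8)

binary = binary_threshold(patch)
print(binary)
```

Note that the pixel with intensity 127 ends up black, because the comparison is strictly greater than the threshold.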

Thresholding Steps

1. Convert the video frame from BGR (blue, green, red) color space to HLS (hue, lightness, saturation).

There are a lot of ways to represent colors in an image. If you’ve ever used a program like Microsoft Paint or Adobe Photoshop, you know that one way to represent a color is by using the RGB color space (in OpenCV it is BGR instead of RGB), where every color is a mixture of three colors: red, green, and blue. You can play around with the RGB color space using any online RGB color picker.

The HLS color space is better than the BGR color space for dealing with lighting issues such as shadows, glare from the sun, and headlights, because it stores lightness in its own channel, separate from hue and saturation. We want to eliminate all these things to make it easier to detect lane lines. For this reason, we use the HLS color space, which splits every color into hue, lightness, and saturation values.

If you want to play around with the HLS color space, there are a lot of HLS color picker websites to choose from if you do a Google search.
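
If you would rather experiment in code, here is a small sketch that uses Python’s built-in colorsys module to split a few colors into hue, lightness, and saturation. The helper bgr_to_hls and the sample colors are hypothetical. Keep in mind that colorsys returns all three values in the 0 to 1 range, whereas OpenCV scales them differently for 8-bit images (hue from 0 to 179, lightness and saturation from 0 to 255).

```python
import colorsys

def bgr_to_hls(b, g, r):
    """Convert 8-bit BGR values to (hue, lightness, saturation) in the 0-1 range."""
    return colorsys.rgb_to_hls(r / 255.0, g / 255.0, b / 255.0)

# Hypothetical sample colors, written in OpenCV's BGR order
samples = {
    "pure yellow": (0, 255, 255),
    "pure white": (255, 255, 255),
    "off-white paint": (245, 255, 250),
    "gray asphalt": (100, 100, 100),
}

for name, bgr in samples.items():
    h, l, s = bgr_to_hls(*bgr)
    print(f"{name:16s} H={h:.2f} L={l:.2f} S={s:.2f}")
```

The gray asphalt and the perfectly neutral white both come out with zero saturation, while the yellow and the slightly tinted off-white come out fully saturated.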

2. Perform Sobel edge detection on the L (lightness) channel of the image to detect sharp discontinuities in the pixel intensities along the x and y axes of the video frame.

A sharp change in intensity from one pixel to a neighboring pixel means that an edge is likely present. We want to detect the strongest edges in the image so that we can isolate potential lane line edges.
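
Here is a rough sketch of what Sobel edge detection computes, written in plain NumPy so you can see the arithmetic. It is a hand-rolled illustration, not the code in edge_detection.py and not OpenCV’s cv2.Sobel. The helper names and the tiny test image are assumptions made for this example.

```python
import numpy as np

# Standard 3x3 Sobel kernels for horizontal (x) and vertical (y) gradients
SOBEL_X = np.array([[-1, 0, 1],
                    [-2, 0, 2],
                    [-1, 0, 1]], dtype=np.float64)
SOBEL_Y = SOBEL_X.T

def filter3x3(image, kernel):
    """Slide a 3x3 kernel over the image (no padding, so the output shrinks by 2)."""
    rows, cols = image.shape
    out = np.zeros((rows - 2, cols - 2), dtype=np.float64)
    for dy in range(3):
        for dx in range(3):
            out += kernel[dy, dx] * image[dy:dy + rows - 2, dx:dx + cols - 2]
    return out

def sobel_magnitude(image):
    """Combine the x and y gradients into a single edge strength per pixel."""
    gx = filter3x3(image, SOBEL_X)
    gy = filter3x3(image, SOBEL_Y)
    return np.sqrt(gx ** 2 + gy ** 2)

# Hypothetical lightness channel: dark road with a bright vertical stripe
lightness = np.full((5, 8), 40.0)
lightness[:, 3:5] = 220.0

magnitude = sobel_magnitude(lightness)
print(magnitude[0])
```

The edge strength is zero over the flat road and large on both sides of the stripe, which is exactly where the intensity jumps.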

3. Perform binary thresholding on the S (saturation) channel of the video frame. 

Doing this helps to eliminate dull road colors. 

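A high saturation value means the hue color is pure. We expect lane lines to be nice, pure colors, such as solid white and solid yellow. Solid yellow has a high saturation value. Pure white, bgr(255, 255, 255), technically has a saturation of 0 in HLS, but white paint in a real photo is rarely perfectly neutral, and very light pixels with even a slight tint come out with high saturation values.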

Binary thresholding generates an image in which every pixel is either 0 (black) or 255 (white). Pixels with high saturation values (e.g. > 80 on a scale from 0 to 255) will be set to white, while everything else will be set to black.

Feel free to play around with that threshold value. I set it to 80, but you can set it to another number, and see if you get better results.

4. Perform binary thresholding on the R (red) channel of the original BGR video frame. 

This step helps extract the yellow and white color values, which are the typical colors of lane lines. 

Remember, pure white is bgr(255, 255, 255). Pure yellow is bgr(0, 255, 255). Both have high red channel values.

To generate our binary image at this stage, pixels that have rich red channel values (e.g. > 120 on a scale from 0 to 255) will be set to white. All other pixels will be set to black.

5. Perform the bitwise AND operation to reduce noise in the image caused by shadows and variations in the road color.

Lane lines should be pure in color and have high red channel values. The bitwise AND operation keeps only the pixels that pass both tests, which reduces noise and blacks out any pixels that don’t appear to be nice, pure, solid colors (like white or yellow lane lines).
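
Steps 3 to 5 can be sketched in a few lines of NumPy. This example assumes the AND is applied to the saturation mask from step 3 and the red channel mask from step 4. The pixel values and the helper to_binary are made up for illustration.

```python
import numpy as np

# Hypothetical strip of 5 pixels: yellow paint, bright gray concrete,
# a saturated blue sign, off-white paint, and a dark shadow
s_channel = np.array([[200,  30, 250, 220, 10]], dtype=np.uint8)  # HLS saturation
r_channel = np.array([[240, 230,  60, 255, 20]], dtype=np.uint8)  # BGR red

def to_binary(channel, thresh):
    """White (255) where the channel exceeds thresh, black (0) elsewhere."""
    return np.where(channel > thresh, 255, 0).astype(np.uint8)

s_binary = to_binary(s_channel, 80)    # step 3: saturation threshold
r_binary = to_binary(r_channel, 120)   # step 4: red channel threshold
rs_binary = np.bitwise_and(s_binary, r_binary)  # step 5: keep pixels passing both

print(rs_binary)
```

Only the two paint pixels survive. The concrete fails the saturation test, the sign fails the red test, and the shadow fails both.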

The get_line_markings(self, frame=None) method in lane.py performs all the steps I have mentioned above.

If you uncomment this line below, you will see the output:

cv2.imshow("Image", lane_line_markings)

To see the output, run this command from within the directory that contains your test image and the lane.py and edge_detection.py programs.

python lane.py
Image_screenshot_03.01.2021

For best results, play around with this line in the lane.py program. Move the 80 value up or down, and see what results you get.

_, s_binary = edge.threshold(s_channel, (80, 255))

Now that we know how to isolate lane lines in an image, let’s continue on to the next step of the lane detection process.

Apply Perspective Transformation to Get a Bird’s Eye View

We now know how to isolate lane lines in an image, but we still have some problems. Remember that one of the goals of this project was to calculate the radius of curvature of the road lane. Calculating the radius of curvature will enable us to know which direction the road is turning. But we can’t do this yet at this stage due to the perspective of the camera. Let me explain.

Why We Need to Do Perspective Transformation

Imagine you’re a bird. You’re flying high above the road lanes below. From a bird’s-eye view, the lines on either side of the lane look like they are parallel.

However, from the perspective of the camera mounted on a car below, the lane lines make a trapezoid-like shape. We can’t properly calculate the radius of curvature of the lane because, from the camera’s perspective, the lane width appears to decrease the farther away you get from the car. 

In fact, way out on the horizon, the lane lines appear to converge to a point (known in computer vision jargon as the vanishing point). You can see this effect in the image below:

road_endless_straight_vanishing

The camera’s perspective is therefore not an accurate representation of what is going on in the real world. We need to fix this so that we can calculate the curvature of the lane and the road (which will later help us when we want to steer the car appropriately).

How Perspective Transformation Works

perspective_transform

Fortunately, OpenCV has methods that help us perform perspective transformation. These methods warp the camera’s perspective into a bird’s-eye view (i.e. aerial view) perspective.

For the first step of perspective transformation, we need to identify a region of interest (ROI). This step helps remove parts of the image we’re not interested in. We are only interested in the lane segment that is immediately in front of the car.

You can run lane.py from the previous section. With the image displayed, hover your cursor over the image and find the four key corners of the trapezoid. 

Write these corners down. These will be the roi_points (roi = region of interest) for the lane. In the code (which I’ll show below), these points appear in the __init__ constructor of the Lane class. They are stored in the self.roi_points variable.

In the following line of code in lane.py, change the parameter value from False to True so that the region of interest image will appear.

lane_obj.plot_roi(plot=True)

Run lane.py

python lane.py

Here is an example ROI output:

ROI-Image_screenshot_03.01.2021

You can see that the ROI is the shape of a trapezoid, with four distinct corners.

trapezoidsvg
Image Source: Wikipedia

Now that we have the region of interest, we use OpenCV’s getPerspectiveTransform and warpPerspective methods to transform the trapezoid-like perspective into a rectangle-like perspective. 

Change the parameter value in this line of code in lane.py from False to True.

warped_frame = lane_obj.perspective_transform(plot=True)

Here is an example of an image after this process. You can see how the perspective is now from a bird’s-eye view. The ROI lines are now parallel to the sides of the image, making it easier to calculate the curvature of the road and the lane.

warped-image-perspective-transformation
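
If you are curious what kind of matrix getPerspectiveTransform returns, here is a NumPy-only sketch that solves the same four-point mapping. It is a stand-in for illustration, not OpenCV’s implementation. The corner coordinates are hypothetical, and the helper names perspective_matrix and warp_point are my own.

```python
import numpy as np

def perspective_matrix(src, dst):
    """Solve for the 3x3 matrix that maps each src point to its dst point."""
    a, b = [], []
    for (x, y), (u, v) in zip(src, dst):
        # Two linear equations per point pair, with the bottom-right entry fixed at 1
        a.append([x, y, 1, 0, 0, 0, -u * x, -u * y])
        a.append([0, 0, 0, x, y, 1, -v * x, -v * y])
        b.extend([u, v])
    h = np.linalg.solve(np.array(a, dtype=np.float64), np.array(b, dtype=np.float64))
    return np.append(h, 1.0).reshape(3, 3)

def warp_point(matrix, point):
    """Apply the perspective matrix to a single (x, y) point."""
    x, y, w = matrix @ np.array([point[0], point[1], 1.0])
    return x / w, y / w

# Hypothetical corners: top-left, bottom-left, bottom-right, top-right
roi_points = [(560, 460), (200, 700), (1100, 700), (720, 460)]
desired_roi_points = [(300, 0), (300, 720), (980, 720), (980, 0)]

H = perspective_matrix(roi_points, desired_roi_points)      # camera view -> bird's-eye view
H_inv = perspective_matrix(desired_roi_points, roi_points)  # bird's-eye view -> camera view

print(warp_point(H, roi_points[0]))
```

Swapping the source and destination points gives the inverse matrix, which is what you need later to project the detected lane back onto the original camera image.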

Identify Lane Line Pixels

We now need to identify the pixels on the warped image that make up lane lines. Looking at the warped image, we can see that white pixels represent pieces of the lane lines.

We start lane line pixel detection by generating a histogram to locate areas of the image that have high concentrations of white pixels. 

Ideally, when we draw the histogram, we will have two peaks. There will be a left peak and a right peak, corresponding to the left lane line and the right lane line, respectively.

In lane.py, make sure to change the parameter value in this line of code (inside the main() method) from False to True so that the histogram will display.

histogram = lane_obj.calculate_histogram(plot=True)
histogram-lane-detection
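
Here is a minimal sketch of the histogram idea in NumPy. It assumes we count white pixels per column in the bottom half of the warped image and take the tallest column on each side of the midpoint as the starting x position of each lane line. The function find_lane_bases and the synthetic image are hypothetical, so the details may differ from calculate_histogram in lane.py.

```python
import numpy as np

def find_lane_bases(binary_warped):
    """Return (histogram, left_peak_x, right_peak_x) from the bottom half of the image."""
    height = binary_warped.shape[0]
    # Count the white pixels in each column of the bottom half
    histogram = np.sum(binary_warped[height // 2:, :] > 0, axis=0)
    midpoint = histogram.shape[0] // 2
    left_base = int(np.argmax(histogram[:midpoint]))
    right_base = int(np.argmax(histogram[midpoint:])) + midpoint
    return histogram, left_base, right_base

# Hypothetical 100x200 warped binary image with two vertical lane lines
warped = np.zeros((100, 200), dtype=np.uint8)
warped[:, 48:53] = 255    # left lane line
warped[:, 150:155] = 255  # right lane line

histogram, left_base, right_base = find_lane_bases(warped)
print(left_base, right_base)
```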

Set Sliding Windows for White Pixel Detection

The next step is to use a sliding window technique where we start at the bottom of the image and scan all the way to the top of the image. Each time we search within a sliding window, we add potential lane line pixels to a list. If we have enough lane line pixels in a window, the mean position of these pixels becomes the center of the next sliding window.

Once we have identified the pixels that correspond to the left and right lane lines, we draw a polynomial best-fit line through the pixels. This line represents our best estimate of the lane lines.

In this line of code, change the value from False to True.

left_fit, right_fit = lane_obj.get_lane_line_indices_sliding_windows(
     plot=True)

Here is the output:

sliding-window-pixels
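
To show the mechanics, here is a simplified sketch of the sliding window search for a single lane line, followed by a second-order polynomial fit with np.polyfit. The function name, the window count, the margin, and the minimum pixel count are all assumptions chosen for this toy image. They are not the values used in lane.py.

```python
import numpy as np

def sliding_window_fit(binary_warped, base_x, n_windows=10, margin=20, minpix=5):
    """Track one lane line from the bottom up and fit x = a*y^2 + b*y + c."""
    height = binary_warped.shape[0]
    window_height = height // n_windows
    nonzero_y, nonzero_x = binary_warped.nonzero()
    current_x = base_x
    lane_indices = []

    for window in range(n_windows):
        # Window boundaries, starting at the bottom of the image
        y_low = height - (window + 1) * window_height
        y_high = height - window * window_height
        x_low, x_high = current_x - margin, current_x + margin

        inside = ((nonzero_y >= y_low) & (nonzero_y < y_high) &
                  (nonzero_x >= x_low) & (nonzero_x < x_high)).nonzero()[0]
        lane_indices.append(inside)

        # Re-center the next window on the mean x position of the pixels found
        if len(inside) > minpix:
            current_x = int(np.mean(nonzero_x[inside]))

    lane_indices = np.concatenate(lane_indices)
    return np.polyfit(nonzero_y[lane_indices], nonzero_x[lane_indices], 2)

# Hypothetical warped image containing one gently curving lane line
warped = np.zeros((200, 200), dtype=np.uint8)
rows = np.arange(200)
cols = np.round(0.001 * (rows - 199) ** 2 + 60).astype(int)
warped[rows, cols] = 255

fit = sliding_window_fit(warped, base_x=60)
print(fit)
```

The fit is x as a function of y rather than the other way around, because lane lines in the warped image are close to vertical.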

Fill in the Lane Line

Now let’s fill in the lane line. Change the parameter value on this line from False to True.

lane_obj.get_lane_line_previous_window(left_fit, right_fit, plot=True)

Here is the output:

warped-frame-search-window

Overlay Lane Lines on Original Image

Now that we’ve identified the lane lines, we need to overlay that information on the original image.

Change the parameter on this line from False to True and run lane.py.

frame_with_lane_lines = lane_obj.overlay_lane_lines(plot=True)
overlay-lane-line-original-image

Calculate Lane Line Curvature

radii-of-curvature-1
Image Source: Wikipedia

Now, we need to calculate the curvature of the lane line. Change the parameter value on this line from False to True.

lane_obj.calculate_curvature(print_to_terminal=True)

Here is the output. You can see the radius of curvature from the left and right lane lines:

radii-of-curvature
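
For a lane line described by x = a*y^2 + b*y + c, the standard calculus formula for the radius of curvature at a given y is (1 + (2*a*y + b)^2)^1.5 / |2*a|. Here is a small sketch that applies this formula to points sampled from a circle of known radius. The function name and the test data are hypothetical, and the sketch leaves out any pixel-to-meter conversion.

```python
import numpy as np

def radius_of_curvature(fit, y_eval):
    """Radius of curvature of x = a*y^2 + b*y + c at y = y_eval."""
    a, b, _ = fit
    return (1 + (2 * a * y_eval + b) ** 2) ** 1.5 / abs(2 * a)

# Points sampled from a circle of radius 500 (hypothetical units: meters)
radius = 500.0
y = np.linspace(-50, 50, 101)
x = radius - np.sqrt(radius ** 2 - y ** 2)

fit = np.polyfit(y, x, 2)
estimate = radius_of_curvature(fit, y_eval=0.0)
print(round(estimate, 1))
```

The estimate lands close to 500 but not exactly on it, because a parabola only approximates a circular arc.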

Calculate the Center Offset

Now we need to calculate how far the center of the car is from the middle of the lane (i.e. the “center offset”).

On the following line, change the parameter value from False to True.

lane_obj.calculate_car_position(print_to_terminal=True)

Run the program.

python lane.py

Here is the output. You can see the center offset in centimeters:

center-offset
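
Here is one way the center offset could be computed, as a sketch under a few assumptions: the camera is mounted at the center of the car, the lane lines are second-order polynomial fits in pixel coordinates, and a known meters-per-pixel scale is available. The function name, the sign convention, and the scale value are all hypothetical.

```python
def center_offset_cm(left_fit, right_fit, image_width, image_height, xm_per_pix):
    """Offset of the car from the lane center in centimeters.

    Positive means the car sits to the right of the lane center.
    """
    y = image_height - 1  # bottom row of the image, closest to the car
    left_x = left_fit[0] * y ** 2 + left_fit[1] * y + left_fit[2]
    right_x = right_fit[0] * y ** 2 + right_fit[1] * y + right_fit[2]
    lane_center = (left_x + right_x) / 2
    car_position = image_width / 2  # assumes the camera is centered on the car
    return (car_position - lane_center) * xm_per_pix * 100

# Hypothetical straight lane lines at x = 300 and x = 900 in a 1280x720 frame
offset = center_offset_cm(
    left_fit=(0.0, 0.0, 300.0),
    right_fit=(0.0, 0.0, 900.0),
    image_width=1280,
    image_height=720,
    xm_per_pix=3.7 / 600,  # assumed scale: a 3.7 m lane spans 600 pixels
)
print(round(offset, 1))
```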

Display Final Image

Now we will display the final image with the curvature and offset annotations as well as the highlighted lane.

In lane.py, change this line of code from False to True:

frame_with_lane_lines2 = lane_obj.display_curvature_offset(
     frame=frame_with_lane_lines, plot=True)

Run lane.py.

python lane.py

Here is the output:

Image-with-Curvature-and-Offset_screenshot_03.01.2021

You’ll notice that the curve radius is the average of the radius of curvature for the left and right lane lines.

Detect Lane Lines in a Video

Now that we know how to detect lane lines in an image, let’s see how to detect lane lines in a video stream.

All we need to do is make some minor changes to the main method in lane.py to accommodate video frames as opposed to images.

Here is the code for lane.py. It takes a video in mp4 format as input and outputs an annotated video with the lanes. The code is provided as a PDF file, so I recommend you convert it into a text file and then save it as a Python .py file so that you can run the code. There are a bunch of online tools that can do this, so do a Google search to find one.

Video Output

Here is the video output that results from running the code above on an mp4 video file.

Troubleshooting

For best results, play around with this line in the lane.py program. Move the 80 value up or down, and see what results you get.

_, s_binary = edge.threshold(s_channel, (80, 255))

If you run the code on different videos, you may see a warning that says “RankWarning: Polyfit may be poorly conditioned”. If you see this warning, try playing around with the dimensions of the region of interest as well as the thresholds.

You can also play with the length of the moving averages. I used a 10-frame moving average, but you can try another value like 5 or 25:

if len(prev_left_fit2) > 10:

Using an exponential moving average instead of a simple moving average might yield better results as well.
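
Here is a small sketch comparing the two smoothing approaches on polynomial coefficients. The class names, the alpha value of 0.2, and the sample fits are all hypothetical. This is not the smoothing code in lane.py, just an illustration of the difference.

```python
from collections import deque

class SimpleMovingAverage:
    """Average the last `length` polynomial fits."""
    def __init__(self, length=10):
        self.history = deque(maxlen=length)

    def update(self, fit):
        self.history.append(fit)
        n = len(self.history)
        return tuple(sum(f[i] for f in self.history) / n for i in range(len(fit)))

class ExponentialMovingAverage:
    """Blend each new fit into a running estimate; alpha weights the newest frame."""
    def __init__(self, alpha=0.2):
        self.alpha = alpha
        self.value = None

    def update(self, fit):
        if self.value is None:
            self.value = tuple(fit)
        else:
            self.value = tuple(self.alpha * new + (1 - self.alpha) * old
                               for new, old in zip(fit, self.value))
        return self.value

# Hypothetical per-frame fits (a, b, c); the third frame is a noisy outlier
fits = [(0.001, -0.4, 100.0), (0.001, -0.4, 102.0), (0.001, -0.4, 140.0)]
sma, ema = SimpleMovingAverage(length=10), ExponentialMovingAverage(alpha=0.2)
for fit in fits:
    smoothed_sma = sma.update(fit)
    smoothed_ema = ema.update(fit)

print(round(smoothed_sma[2], 2), round(smoothed_ema[2], 2))
```

The exponential version needs no frame history, and a smaller alpha makes it react more slowly to a single bad frame.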

That’s it for lane line detection. Keep building!

How to Publish Goal Coordinates in ROS2

Last week, I built a mobile robot in Gazebo that uses ROS2 for message passing. I wanted the robot to go to predefined goal coordinates inside a maze. In order to do that, I needed to create a Publisher node that published the points to a topic.

Here is the ROS2 node code (in Python) that publishes a list of target coordinates (i.e. waypoints) for the mobile robot to go to. Feel free to use this code for your ROS2 projects:

''' ####################
    Publish (x,y) coordinate goals for a differential drive robot in a 
    Gazebo maze.
    ==================================
    Author: Addison Sears-Collins
    Date: November 21, 2020
    #################### '''
 
import rclpy # Import the ROS client library for Python
from rclpy.node import Node # Enables the use of rclpy's Node class
from std_msgs.msg import Float64MultiArray # Enable use of std_msgs/Float64MultiArray message
 
class GoalPublisher(Node):
  """
  Create a GoalPublisher class, which is a subclass of the Node class.
  The class publishes the goal positions (x,y) for a mobile robot in a Gazebo maze world.
  """
  
  def __init__(self):
    """
    Class constructor to set up the node
    """
   
    # Initiate the Node class's constructor and give it a name
    super().__init__('goal_publisher')
     
    # Create publisher(s)      
    self.publisher_goal_x_values = self.create_publisher(Float64MultiArray, '/goal_x_values', 10)
    self.publisher_goal_y_values = self.create_publisher(Float64MultiArray, '/goal_y_values', 10)
     
    # Call the callback functions every 0.5 seconds
    timer_period = 0.5
    self.timer1 = self.create_timer(timer_period, self.get_x_values)
    self.timer2 = self.create_timer(timer_period, self.get_y_values)
    
    # Initialize a counter variable
    self.i = 0  
    self.j = 0
     
    # List the goal destinations
    # We create a list of the (x,y) coordinate goals
    self.goal_x_coordinates = [ 0.0, 3.0, 0.0, -1.5, -1.5,  4.5, 0.0]
    self.goal_y_coordinates = [-4.0, 1.0, 1.5,  1.0, -3.0, -4.0, 0.0] 
   
  def get_x_values(self):
    """
    Callback function.
    """
    msg = Float64MultiArray()
    msg.data = self.goal_x_coordinates
    
    # Publish the x coordinates to the topic
    self.publisher_goal_x_values.publish(msg)
     
    # Increment counter variable
    self.i += 1
     
  def get_y_values(self):
    """
    Callback function.
    """
    msg = Float64MultiArray()
    msg.data = self.goal_y_coordinates
    
    # Publish the y coordinates to the topic
    self.publisher_goal_y_values.publish(msg)
     
    # Increment counter variable
    self.j += 1
     
def main(args=None):
 
  # Initialize the rclpy library
  rclpy.init(args=args)
 
  # Create the node
  goal_publisher = GoalPublisher()
 
  # Spin the node so the callback function is called.
  # Publish any pending messages to the topics.
  rclpy.spin(goal_publisher)
 
  # Destroy the node explicitly
  # (optional - otherwise it will be done automatically
  # when the garbage collector destroys the node object)
  goal_publisher.destroy_node()
 
  # Shutdown the ROS client library for Python
  rclpy.shutdown()
 
if __name__ == '__main__':
  main()

The Bug2 Algorithm for Robot Motion Planning

In this tutorial, we will learn about the Bug2 algorithm for robot motion planning. The Bug2 algorithm is used when you have a mobile robot:

  • With a known starting location
  • With a known goal location
  • Inside an unexplored environment
  • With a distance sensor that can detect the distances to objects and walls in the environment (e.g. an ultrasonic sensor or a laser distance sensor)
  • With an encoder that the robot can use to estimate how far it has traveled from the starting location

Here is a video of a simulated robot I developed in Gazebo and ROS2 that uses the Bug2 algorithm to move from a starting point to a goal point, avoiding walls along the way.

Real-World Applications

Imagine an automatic vacuum cleaner like the one below. It vacuums a room and then needs to move autonomously to another room so that it can clean that room. Along the way, the robot must avoid bumping into walls.

roomba_discovery

Algorithm Description

On a high level, the Bug2 algorithm has two main modes:

  1. Go to Goal Mode: Move from the current location towards the goal (x,y) coordinate.
  2. Wall Following Mode: Move along a wall.

Here is pseudocode for the algorithm:

1.      Calculate a start-goal line. The start-goal line is an imaginary line that connects the starting position to the goal position.

2.      While Not at the Goal

  • Move towards the goal along the start-goal line.
    • If a wall is encountered:
      • Remember the location where the wall was first encountered. This is the “hit point.”
      • Follow the wall until you encounter the start-goal line. This point is known as the “leave point.”
        •  If the leave point is closer to the goal than the hit point, leave the wall, and move towards the goal again.
        • Otherwise, continue following the wall.

That’s the algorithm. In the image below, I have labeled the hit points and leave points.

1-hit-points-leave-points-bug2-algorithmJPG
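
The geometry behind the pseudocode can be sketched in plain Python. This example assumes a non-vertical start-goal line written as y = slope * x + y_intercept, and a tolerance of 0.1 meters for deciding that the robot is back on the line. The function names and the sample coordinates are hypothetical, and this is a sketch of the leave test only, not a full controller.

```python
import math

def start_goal_line(start, goal):
    """Return (slope, y_intercept) of the line through start and goal.

    Assumes the line is not vertical (start and goal have different x values).
    """
    slope = (goal[1] - start[1]) / (goal[0] - start[0])
    return slope, goal[1] - slope * goal[0]

def distance_to_line(point, slope, y_intercept):
    """Perpendicular distance from point to the line y = slope * x + y_intercept."""
    x, y = point
    return abs(slope * x - y + y_intercept) / math.sqrt(slope ** 2 + 1)

def distance(p, q):
    """Straight-line distance between two (x, y) points."""
    return math.hypot(p[0] - q[0], p[1] - q[1])

def should_leave_wall(position, hit_point, goal, slope, y_intercept, tolerance=0.1):
    """Bug2 leave test: on the start-goal line and closer to the goal than the hit point."""
    on_line = distance_to_line(position, slope, y_intercept) <= tolerance
    closer = distance(position, goal) < distance(hit_point, goal)
    return on_line and closer

start, goal = (0.0, 0.0), (4.0, 4.0)
slope, y_intercept = start_goal_line(start, goal)
hit_point = (1.0, 1.0)

print(should_leave_wall((3.0, 3.0), hit_point, goal, slope, y_intercept))  # on the line, closer
print(should_leave_wall((0.5, 0.5), hit_point, goal, slope, y_intercept))  # on the line, farther
print(should_leave_wall((3.0, 1.0), hit_point, goal, slope, y_intercept))  # off the line
```

Only the first position satisfies both conditions, so that is the only one where the robot would leave the wall and head for the goal again.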

Python Implementation (ROS2)

The robot you see in the video at the beginning of this tutorial has a laser distance scanner mounted on top of it that enables it to detect distances from 0 degrees (right side of the robot) to 180 degrees (left side of the robot). It is using three Python functions, bug2, follow_wall, and go_to_goal.  

Below is my complete code. It runs in ROS2 Foxy Fitzroy and Gazebo and makes use of the Differential Drive plugin. 

Since we’re just focusing on the algorithms in this post, I won’t go into the details of how I created the robot, built the simulated maze, and created publisher and subscriber nodes for the goal locations, robot pose, and laser distance sensor information (I might cover this in a future post).

Don’t be intimidated by the code. It is long but well-commented. Go through each piece and function one step at a time. Focus only on the following three methods:

  • go_to_goal(self) method 
  • follow_wall(self) method
  • bug2(self) method

Those three methods encapsulate the implementation of Bug2. 

I created descriptive variable names so that you can compare the code to the pseudocode I wrote earlier in this tutorial. 

# Author: Addison Sears-Collins
# Date: December 17, 2020
# ROS Version: ROS 2 Foxy Fitzroy

############## IMPORT LIBRARIES #################
# Python math library
import math 

# ROS client library for Python
import rclpy 

# Enables pauses in the execution of code
from time import sleep 

# Used to create nodes
from rclpy.node import Node

# Enables the use of the string message type
from std_msgs.msg import String 

# Twist is linear and angular velocity
from geometry_msgs.msg import Twist 	
					
# Handles LaserScan messages to sense distance to obstacles (i.e. walls)      	
from sensor_msgs.msg import LaserScan	 

# Handle Pose messages
from geometry_msgs.msg import Pose 

# Handle float64 arrays
from std_msgs.msg import Float64MultiArray
					
# Handles quality of service for LaserScan data
from rclpy.qos import qos_profile_sensor_data 

# Scientific computing library
import numpy as np 

class PlaceholderController(Node):
	"""
	Create a Placeholder Controller class, which is a subclass of the Node 
	class for ROS2.
	"""

	def __init__(self):
		"""
		Class constructor to set up the node
		"""
		####### INITIALIZE ROS PUBLISHERS AND SUBSCRIBERS##############
		# Initiate the Node class's constructor and give it a name
		super().__init__('PlaceholderController')
		
		# Create a subscriber
		# This node subscribes to messages of type Float64MultiArray  
		# over a topic named: /en613/state_est
		# The message represents the current estimated state:
		#   [x, y, yaw]
		# The callback function is called as soon as a message 
		# is received.
		# The maximum number of queued messages is 10.
		self.subscription = self.create_subscription(
			Float64MultiArray,
			'/en613/state_est',
			self.state_estimate_callback,
			10)
		self.subscription  # prevent unused variable warning
		
		# Create a subscriber
		# This node subscribes to messages of type 
		# sensor_msgs/LaserScan		
		self.scan_subscriber = self.create_subscription(
			LaserScan,
			'/en613/scan',
			self.scan_callback,
			qos_profile=qos_profile_sensor_data)
			
		# Create a subscriber
		# This node subscribes to messages of type geometry_msgs/Pose 
		# over a topic named: /en613/goal
		# The message represents the goal position.
		# The callback function is called as soon as a message 
		# is received.
		# The maximum number of queued messages is 10.
		self.subscription_goal_pose = self.create_subscription(
			Pose,
			'/en613/goal',
			self.pose_received,
			10)
			
		# Create a publisher
		# This node publishes the desired linear and angular velocity 
		# of the robot (in the robot chassis coordinate frame) to the 
		# /en613/cmd_vel topic. Using the diff_drive
		# plugin enables the basic_robot model to read this 
		# /en613/cmd_vel topic and execute the motion accordingly.
		self.publisher_ = self.create_publisher(
			Twist, 
			'/en613/cmd_vel', 
			10)
		
		# Initialize the LaserScan sensor readings to some large value
		# Values are in meters.
		self.left_dist = 999999.9 # Left
		self.leftfront_dist = 999999.9 # Left-front
		self.front_dist = 999999.9 # Front
		self.rightfront_dist = 999999.9 # Right-front
		self.right_dist = 999999.9 # Right

		################### ROBOT CONTROL PARAMETERS ##################
		
		# Maximum forward speed of the robot in meters per second
		# Any faster than this and the robot risks falling over.
		self.forward_speed = 0.035	
		
		# Current position and orientation of the robot in the global 
		# reference frame
		self.current_x = 0.0
		self.current_y = 0.0
		self.current_yaw = 0.0
		
		# By changing the value of self.robot_mode, you can alter what
		# the robot will do when the program is launched.
		# 	"obstacle avoidance mode": Robot will avoid obstacles
		#  	"go to goal mode": Robot will head to an x,y coordinate   
		# 	"wall following mode": Robot will follow a wall 
		self.robot_mode = "go to goal mode"
		
		############# OBSTACLE AVOIDANCE MODE PARAMETERS ##############
		
		# Obstacle detection distance threshold
		self.dist_thresh_obs = 0.25 # in meters
		
		# Maximum left-turning speed	
		self.turning_speed = 0.25 # rad/s

		############# GO TO GOAL MODE PARAMETERS ######################
		# Finite states for the go to goal mode
		# 	"adjust heading": Orient towards a goal x, y coordinate
		# 	"go straight": Go straight towards goal x, y coordinate
		# 	"goal achieved": Reached goal x, y coordinate
		self.go_to_goal_state = "adjust heading"
		
		# List the goal destinations
		# We create a list of the (x,y) coordinate goals
		self.goal_x_coordinates = False # [ 0.0, 3.0, 0.0, -1.5, -1.5,  4.5, 0.0]
		self.goal_y_coordinates = False # [-4.0, 1.0, 1.5,  1.0, -3.0, -4.0, 0.0]
		
		# Keep track of which goal we're headed towards
		self.goal_idx = 0 
		
		# Keep track of when we've reached the end of the goal list
		self.goal_max_idx =  None # len(self.goal_x_coordinates) - 1 
		
		# +/- 2.0 degrees of precision
		self.yaw_precision = 2.0 * (math.pi / 180) 
		
		# How quickly we need to turn when we need to make a heading
		# adjustment (rad/s)
		self.turning_speed_yaw_adjustment = 0.0625
		
		# Need to get within +/- 0.2 meter (20 cm) of (x,y) goal
		self.dist_precision = 0.2 

		############# WALL FOLLOWING MODE PARAMETERS ##################		
		# Finite states for the wall following mode
		# 	"turn left": Robot turns towards the left
		#	"search for wall": Robot tries to locate the wall		
		# 	"follow wall": Robot moves parallel to the wall
		self.wall_following_state = "turn left"
		
		# Set turning speeds (to the left) in rad/s 
		# These values were determined by trial and error.
		self.turning_speed_wf_fast = 1.0  # Fast turn
		self.turning_speed_wf_slow = 0.125 # Slow turn
		
		# Wall following distance threshold.
		# We want to try to keep within this distance from the wall.
		self.dist_thresh_wf = 0.45 # in meters	
		
		# We don't want to get too close to the wall though.
		self.dist_too_close_to_wall = 0.15 # in meters
		
		################### BUG2 PARAMETERS ###########################
		
		# Bug2 Algorithm Switch
		# Can turn "ON" or "OFF" depending on if you want to run Bug2
		# Motion Planning Algorithm
		self.bug2_switch = "ON"
		
		# Start-Goal Line Calculated?
		self.start_goal_line_calculated = False
		
		# Start-Goal Line Parameters
		self.start_goal_line_slope_m = 0
		self.start_goal_line_y_intercept = 0
		self.start_goal_line_xstart = 0
		self.start_goal_line_xgoal = 0
		self.start_goal_line_ystart = 0
		self.start_goal_line_ygoal = 0
		
		# Anything less than this distance means we have encountered
		# a wall. Value determined through trial and error.
		self.dist_thresh_bug2 = 0.15 
		
		# Leave point must be within +/- 0.1m of the start-goal line
		# in order to go from wall following mode to go to goal mode
		self.distance_to_start_goal_line_precision = 0.1
		
		# Used to record the (x,y) coordinate where the robot hit
		# a wall.
		self.hit_point_x = 0
		self.hit_point_y = 0
		
		# Distance between the hit point and the goal in meters
		self.distance_to_goal_from_hit_point = 0.0
		
		# Used to record the (x,y) coordinate where the robot left
		# a wall.		
		self.leave_point_x = 0
		self.leave_point_y = 0
		
		# Distance between the leave point and the goal in meters
		self.distance_to_goal_from_leave_point = 0.0
		
		# The hit point and leave point must be far enough 
		# apart to change state from wall following to go to goal
		# This value helps prevent the robot from getting stuck and
		# rotating in endless circles.
		# This distance was determined through trial and error.
		self.leave_point_to_hit_point_diff = 0.25 # in meters
		
	def pose_received(self,msg):
		"""
		Populate the pose.
		"""
		self.goal_x_coordinates = [msg.position.x]
		self.goal_y_coordinates = [msg.position.y]
		self.goal_max_idx = len(self.goal_x_coordinates) - 1		
	
	def scan_callback(self, msg):
		"""
		This method gets called every time a LaserScan message is 
		received on the /en613/scan ROS topic	
		"""
		# Read the laser scan data that indicates distances
		# to obstacles (e.g. wall) in meters and extract
		# 5 distinct laser readings to work with.
		# Each reading is separated by 45 degrees.
		# Assumes 181 laser readings, separated by 1 degree. 
		# (e.g. -90 degrees to 90 degrees....0 to 180 degrees)
		self.left_dist = msg.ranges[180]
		self.leftfront_dist = msg.ranges[135]
		self.front_dist = msg.ranges[90]
		self.rightfront_dist = msg.ranges[45]
		self.right_dist = msg.ranges[0]
		
		# The total number of laser rays. Used for testing.
		#number_of_laser_rays = str(len(msg.ranges))		
			
		# Print the distance values (in meters) for testing
		#self.get_logger().info('L:%f LF:%f F:%f RF:%f R:%f' % (
		#	self.left_dist,
		#	self.leftfront_dist,
		#	self.front_dist,
		#	self.rightfront_dist,
		#	self.right_dist))
		
		if self.robot_mode == "obstacle avoidance mode":
			self.avoid_obstacles()
			
	def state_estimate_callback(self, msg):
		"""
		Extract the position and orientation data. 
		This callback is called each time
		a new message is received on the '/en613/state_est' topic
		"""
		# Update the current estimated state in the global reference frame
		curr_state = msg.data
		self.current_x = curr_state[0]
		self.current_y = curr_state[1]
		self.current_yaw = curr_state[2]
		
		# Wait until we have received some goal destinations.
		if self.goal_x_coordinates == False and self.goal_y_coordinates == False:
			return
				
		# Print the pose of the robot
		# Used for testing
		#self.get_logger().info('X:%f Y:%f YAW:%f' % (
		#	self.current_x,
		#	self.current_y,
		#	np.rad2deg(self.current_yaw)))  # Goes from -pi to pi 
		
		# See if the Bug2 algorithm is activated. If yes, call bug2()
		if self.bug2_switch == "ON":
			self.bug2()
		else:
			
			if self.robot_mode == "go to goal mode":
				self.go_to_goal()
			elif self.robot_mode == "wall following mode":
				self.follow_wall()
			else:
				pass # Do nothing			
		
	def avoid_obstacles(self):
		"""
		Wander around the maze and avoid obstacles.
		"""
		# Create a Twist message and initialize all the values 
		# for the linear and angular velocities
		msg = Twist()
		msg.linear.x = 0.0
		msg.linear.y = 0.0
		msg.linear.z = 0.0
		msg.angular.x = 0.0
		msg.angular.y = 0.0
		msg.angular.z = 0.0	
		
		# Logic for avoiding obstacles (e.g. walls)
		# >d means no obstacle detected by that laser beam
		# <d means an obstacle was detected by that laser beam
		d = self.dist_thresh_obs
		if   self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist > d:
			msg.linear.x = self.forward_speed # Go straight forward
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist > d:
			msg.angular.z = self.turning_speed  # Turn left
		elif self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist < d:
			msg.angular.z = self.turning_speed  
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist > d:
			msg.angular.z = -self.turning_speed # Turn right
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist < d:
			msg.angular.z = self.turning_speed 
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist > d:
			msg.angular.z = -self.turning_speed 
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist < d:
			msg.angular.z = self.turning_speed 
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist < d:
			msg.linear.x = self.forward_speed
		else:
			pass 
			
		# Send the velocity commands to the robot by publishing 
		# to the topic
		self.publisher_.publish(msg)
							
	def go_to_goal(self):
		"""
		Drive the robot toward the goal destination.
		"""
		# Create a geometry_msgs/Twist message
		msg = Twist()
		msg.linear.x = 0.0
		msg.linear.y = 0.0
		msg.linear.z = 0.0
		msg.angular.x = 0.0
		msg.angular.y = 0.0
		msg.angular.z = 0.0
		
		# If Bug2 algorithm is activated
		if self.bug2_switch == "ON":
		
			# If the wall is in the way
			d = self.dist_thresh_bug2
			if (	self.leftfront_dist < d or 
				self.front_dist < d or 
				self.rightfront_dist < d):
			
				# Change the mode to wall following mode.
				self.robot_mode = "wall following mode"
				
				# Record the hit point	
				self.hit_point_x = self.current_x
				self.hit_point_y = self.current_y
				
				# Record the distance to the goal from the 
				# hit point
				self.distance_to_goal_from_hit_point = (
					math.sqrt((
					pow(self.goal_x_coordinates[self.goal_idx] - self.hit_point_x, 2)) + (
					pow(self.goal_y_coordinates[self.goal_idx] - self.hit_point_y, 2)))) 	
					
				# Make a hard left to begin following wall
				msg.angular.z = self.turning_speed_wf_fast
						
				# Send command to the robot
				self.publisher_.publish(msg)
				
				# Exit this function 		
				return
			
		# Fix the heading		
		if (self.go_to_goal_state == "adjust heading"):
			
			# Calculate the desired heading based on the current position 
			# and the desired position
			desired_yaw = math.atan2(
					self.goal_y_coordinates[self.goal_idx] - self.current_y,
					self.goal_x_coordinates[self.goal_idx] - self.current_x)
			
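			# How far off is the current heading in radians?
			# Wrap the difference to [-pi, pi] so the robot turns the short way
			yaw_error = desired_yaw - self.current_yaw
			yaw_error = math.atan2(math.sin(yaw_error), math.cos(yaw_error))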
			
			# Adjust heading if heading is not good enough
			if math.fabs(yaw_error) > self.yaw_precision:
			
				if yaw_error > 0:	
					# Turn left (counterclockwise)		
					msg.angular.z = self.turning_speed_yaw_adjustment 				
				else:
					# Turn right (clockwise)
					msg.angular.z = -self.turning_speed_yaw_adjustment
				
				# Command the robot to adjust the heading
				self.publisher_.publish(msg)
				
			# Change the state if the heading is good enough
			else:				
				# Change the state
				self.go_to_goal_state = "go straight"
				
				# Command the robot to stop turning
				self.publisher_.publish(msg)		

		# Go straight										
		elif (self.go_to_goal_state == "go straight"):
			
			position_error = math.sqrt(
						pow(
						self.goal_x_coordinates[self.goal_idx] - self.current_x, 2)
						+ pow(
						self.goal_y_coordinates[self.goal_idx] - self.current_y, 2)) 
						
			
			# If we are still too far away from the goal						
			if position_error > self.dist_precision:

				# Move straight ahead
				msg.linear.x = self.forward_speed
					
				# Command the robot to move
				self.publisher_.publish(msg)
			
				# Check our heading			
				desired_yaw = math.atan2(
					self.goal_y_coordinates[self.goal_idx] - self.current_y,
					self.goal_x_coordinates[self.goal_idx] - self.current_x)
				
				# How far off is the heading? Wrap the difference to [-pi, pi].
				yaw_error = desired_yaw - self.current_yaw
				yaw_error = math.atan2(math.sin(yaw_error), math.cos(yaw_error))
		
				# Check the heading and change the state if there is too much heading error
				if math.fabs(yaw_error) > self.yaw_precision:
					
					# Change the state
					self.go_to_goal_state = "adjust heading"
				
			# We reached our goal. Change the state.
			else:			
				# Change the state
				self.go_to_goal_state = "goal achieved"
				
				# Command the robot to stop
				self.publisher_.publish(msg)
		
		# Goal achieved			
		elif (self.go_to_goal_state == "goal achieved"):
				
			self.get_logger().info('Goal achieved! X:%f Y:%f' % (
				self.goal_x_coordinates[self.goal_idx],
				self.goal_y_coordinates[self.goal_idx]))
			
			# Get the next goal
			self.goal_idx = self.goal_idx + 1
		
			# Do we have any more goals left?			
			# If we have no more goals left, just stop
			if (self.goal_idx > self.goal_max_idx):
				self.get_logger().info('Congratulations! All goals have been achieved.')
				while True:
					pass

			# Let's achieve our next goal
			else: 
				# Change the state
				self.go_to_goal_state = "adjust heading"				

			# We need to recalculate the start-goal line if Bug2 is running
			self.start_goal_line_calculated = False				
		
		else:
			pass
			
	def follow_wall(self):
		"""
		This method causes the robot to follow the boundary of a wall.
		"""
		# Create a geometry_msgs/Twist message
		msg = Twist()
		msg.linear.x = 0.0
		msg.linear.y = 0.0
		msg.linear.z = 0.0
		msg.angular.x = 0.0
		msg.angular.y = 0.0
		msg.angular.z = 0.0			

		# Special code if Bug2 algorithm is activated
		if self.bug2_switch == "ON":
		
			# Find the point on the start-goal line that has the
			# same x coordinate as the current position
			x_start_goal_line = self.current_x
			y_start_goal_line = (
				self.start_goal_line_slope_m * (
				x_start_goal_line)) + (
				self.start_goal_line_y_intercept)
						
			# Calculate the vertical (y-axis) distance between the
			# current position and the start-goal line
			distance_to_start_goal_line = math.sqrt(pow(
						x_start_goal_line - self.current_x, 2) + pow(
						y_start_goal_line - self.current_y, 2)) 
							
			# If we hit the start-goal line again				
			if distance_to_start_goal_line < self.distance_to_start_goal_line_precision:
			
				# Determine if we need to leave the wall and change the mode
				# to 'go to goal'
				# Let this point be the leave point
				self.leave_point_x = self.current_x
				self.leave_point_y = self.current_y

				# Record the distance to the goal from the leave point
				self.distance_to_goal_from_leave_point = math.sqrt(
					pow(self.goal_x_coordinates[self.goal_idx] 
					- self.leave_point_x, 2)
					+ pow(self.goal_y_coordinates[self.goal_idx]  
					- self.leave_point_y, 2)) 
			
				# Is the leave point closer to the goal than the hit point?
				# If yes, go to goal. 
				diff = self.distance_to_goal_from_hit_point - self.distance_to_goal_from_leave_point
				if diff > self.leave_point_to_hit_point_diff:
						
					# Change the mode. Go to goal.
					self.robot_mode = "go to goal mode"


				# Exit this function
				return				
		
		# Logic for following the wall
		# >d means no wall detected by that laser beam
		# <d means a wall was detected by that laser beam
		d = self.dist_thresh_wf
		
		if self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist > d:
			self.wall_following_state = "search for wall"
			msg.linear.x = self.forward_speed
			msg.angular.z = -self.turning_speed_wf_slow # turn right to find wall
			
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist > d:
			self.wall_following_state = "turn left"
			msg.angular.z = self.turning_speed_wf_fast
			
			
		elif (self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist < d):
			if (self.rightfront_dist < self.dist_too_close_to_wall):
				# Getting too close to the wall
				self.wall_following_state = "turn left"
				msg.linear.x = self.forward_speed
				msg.angular.z = self.turning_speed_wf_fast		
			else: 			
				# Go straight ahead
				self.wall_following_state = "follow wall"  
				msg.linear.x = self.forward_speed	
									
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist > d:
			self.wall_following_state = "search for wall"
			msg.linear.x = self.forward_speed
			msg.angular.z = -self.turning_speed_wf_slow # turn right to find wall
			
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist < d:
			self.wall_following_state = "turn left"
			msg.angular.z = self.turning_speed_wf_fast
			
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist > d:
			self.wall_following_state = "turn left" 
			msg.angular.z = self.turning_speed_wf_fast
			
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist < d:
			self.wall_following_state = "turn left" 
			msg.angular.z = self.turning_speed_wf_fast
			
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist < d:
			self.wall_following_state = "search for wall"
			msg.linear.x = self.forward_speed
			msg.angular.z = -self.turning_speed_wf_slow # turn right to find wall
			
		else:
			pass 

		# Send velocity command to the robot
		self.publisher_.publish(msg)	
		
	def bug2(self):
	
		# Each time we start towards a new goal, we need to calculate the start-goal line
		if self.start_goal_line_calculated == False:
		
			# Make sure go to goal mode is set.
			self.robot_mode = "go to goal mode"				

			self.start_goal_line_xstart = self.current_x
			self.start_goal_line_xgoal = self.goal_x_coordinates[self.goal_idx]
			self.start_goal_line_ystart = self.current_y
			self.start_goal_line_ygoal = self.goal_y_coordinates[self.goal_idx]
			
			# Calculate the slope of the start-goal line m
			self.start_goal_line_slope_m = (
				(self.start_goal_line_ygoal - self.start_goal_line_ystart) / (
				self.start_goal_line_xgoal - self.start_goal_line_xstart))
			
			# Solve for the intercept b
			self.start_goal_line_y_intercept = self.start_goal_line_ygoal - (
					self.start_goal_line_slope_m * self.start_goal_line_xgoal) 
				
			# We have successfully calculated the start-goal line
			self.start_goal_line_calculated = True
			
		if self.robot_mode == "go to goal mode":
			self.go_to_goal()			
		elif self.robot_mode == "wall following mode":
			self.follow_wall()

def main(args=None):

    # Initialize rclpy library
    rclpy.init(args=args)
    
    # Create the node
    controller = PlaceholderController()

    # Spin the node so the callback function is called
    # Pull messages from any topics this node is subscribed to
    # Publish any pending messages to the topics
    rclpy.spin(controller)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    controller.destroy_node()
    
    # Shutdown the ROS client library for Python
    rclpy.shutdown()

if __name__ == '__main__':
    main()
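Two geometric details in the controller are worth isolating. A heading error computed as the difference of two angles can land outside [-pi, pi] when the robot faces near the ±pi boundary, and a start-goal line stored as a slope and intercept has no finite slope when the start and goal share an x coordinate. Here is a minimal sketch of two helpers that handle those cases. The names `wrap_to_pi` and `distance_to_line` are hypothetical and are not part of the node above; treat this as an illustration, not a drop-in replacement.

```python
import math


def wrap_to_pi(angle):
    """Wrap an angle in radians to the interval [-pi, pi]."""
    return math.atan2(math.sin(angle), math.cos(angle))


def distance_to_line(px, py, x1, y1, x2, y2):
    """Perpendicular distance from the point (px, py) to the line
    through (x1, y1) and (x2, y2). Works for vertical lines too."""
    dx = x2 - x1
    dy = y2 - y1
    length = math.hypot(dx, dy)
    if length == 0.0:
        # Start and goal coincide: fall back to point-to-point distance
        return math.hypot(px - x1, py - y1)
    return abs(dy * (px - x1) - dx * (py - y1)) / length


# Heading error across the -pi/+pi boundary
desired_yaw = math.radians(-175.0)
current_yaw = math.radians(175.0)
raw_error = desired_yaw - current_yaw   # -350 degrees: the long way around
yaw_error = wrap_to_pi(raw_error)       # about +10 degrees: the short way
print(math.degrees(raw_error), math.degrees(yaw_error))

# Vertical start-goal line x = 2 (assumed values): the slope is
# undefined, but the perpendicular distance is still well defined
print(distance_to_line(3.0, 5.0, 2.0, 0.0, 2.0, 10.0))
```

The perpendicular distance is never larger than the vertical distance to the line, so a threshold such as `distance_to_start_goal_line_precision` would likely need retuning if you swapped this in.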

Here is the state estimator that goes with the controller node above. I used an Extended Kalman Filter (EKF) to filter the odometry measurements from the mobile robot.

# Author: Addison Sears-Collins
# Date: December 8, 2020
# ROS Version: ROS 2 Foxy Fitzroy

# Python math library
import math

# ROS client library for Python
import rclpy

# Used to create nodes
from rclpy.node import Node

# Twist is linear and angular velocity
from geometry_msgs.msg import Twist 

# Position, orientation, linear velocity, angular velocity
from nav_msgs.msg import Odometry

# Handles laser distance scan to detect obstacles
from sensor_msgs.msg import LaserScan

# Used for laser scan
from rclpy.qos import qos_profile_sensor_data

# Enable use of std_msgs/Float64MultiArray message
from std_msgs.msg import Float64MultiArray 

# Scientific computing library for Python
import numpy as np

class PlaceholderEstimator(Node):
	"""
	Class constructor to set up the node
	"""
	def __init__(self):
		############## INITIALIZE ROS PUBLISHERS AND SUBSCRIBERS ######
	
	
		# Initiate the Node class's constructor and give it a name
		super().__init__('PlaceholderEstimator')
		
		# Create a subscriber 
		# This node subscribes to messages of type 
		# geometry_msgs/Twist.msg
		# The maximum number of queued messages is 10.
		self.velocity_subscriber = self.create_subscription(
			Twist,
			'/en613/cmd_vel',
			self.velocity_callback,
			10)
			
		# Create a subscriber
		# This node subscribes to messages of type
		# nav_msgs/Odometry
		self.odom_subscriber = self.create_subscription(
			Odometry,
			'/en613/odom',
			self.odom_callback,
			10)
		
		# Create a publisher
		# This node publishes the estimated position (x, y, yaw) 
		# The type of message is std_msgs/Float64MultiArray
		self.publisher_state_est = self.create_publisher(
			Float64MultiArray, 
			'/en613/state_est', 
			10)
			
		############# STATE TRANSITION MODEL PARAMETERS ###############	
		
		# Time step from one time step t-1 to the next time step t
		self.delta_t = 0.002 # seconds
		
		# Keep track of the estimate of the yaw angle
		# for control input vector calculation.
		self.est_yaw = 0.0
		
		# A matrix
		# 3x3 matrix -> number of states x number of states matrix
		# Expresses how the state of the system [x,y,yaw] changes 
		# from t-1 to t when no control command is executed. Typically 
		# a robot on wheels only drives when the wheels are commanded 
		# to turn.
		# For this case, A is the identity matrix.
		# A is sometimes F in the literature.
		self.A_t_minus_1 = np.array([ 	[1.0,  0,   0],
						[  0,1.0,   0],
						[  0,  0, 1.0]])
		
		# The estimated state vector at time t-1 in the global 
		# reference frame
		# [x_t_minus_1, y_t_minus_1, yaw_t_minus_1]
		# [meters, meters, radians]
		self.state_vector_t_minus_1 = np.array([0.0,0.0,0.0])
		
		# The control input vector at time t-1 in the global 
		# reference frame
		# [v,v,yaw_rate]
		# [meters/second, meters/second, radians/second]
		# In the literature, this is commonly u.
		self.control_vector_t_minus_1 = np.array([0.001,0.001,0.001])
		
		# Noise applied to the forward kinematics (calculation
		# of the estimated state at time t from the state transition
		# model of the mobile robot). This is a vector with the
		# number of elements equal to the number of states.
		self.process_noise_v_t_minus_1 = np.array([0.096,0.096,0.032])

		############# MEASUREMENT MODEL PARAMETERS ####################	
		
		# Measurement matrix H_t
		# Used to convert the predicted state estimate at time t
		# into predicted sensor measurements at time t.
		# In this case, H will be the identity matrix since the 
		# estimated state maps directly to state measurements from the 
		# odometry data [x, y, yaw]
		# H has the same number of rows as sensor measurements
		# and same number of columns as states.
		self.H_t = np.array([ 	[1.0,  0,   0],
					[  0,1.0,   0],
					[  0,  0, 1.0]])
				
		# Sensor noise. This is a vector with the same number of
		# elements as the number of sensor measurements.
		self.sensor_noise_w_t = np.array([0.07,0.07,0.04])

		############# EXTENDED KALMAN FILTER PARAMETERS ###############
		
		# State covariance matrix P_t_minus_1
		# This matrix has the same number of rows (and columns) as the 
		# number of states (i.e. 3x3 matrix). P is sometimes referred
		# to as Sigma in the literature. It represents an estimate of 
		# the accuracy of the state estimate at t=1 made using the
		# state transition matrix. We start off with guessed values.
		self.P_t_minus_1 = np.array([ 	[0.1,  0,   0],
						[  0,0.1,   0],
						[  0,  0, 0.1]])
						
		# State model noise covariance matrix Q_t
		# When Q is large, the Kalman Filter tracks large changes in 
		# the sensor measurements more closely than for smaller Q.
		# Q is a square matrix that has the same number of rows as 
		# states.
		self.Q_t = np.array([ 		[1.0,   0,   0],
						[  0, 1.0,   0],
						[  0,   0, 1.0]])
						
		# Sensor measurement noise covariance matrix R_t
		# Has the same number of rows and columns as sensor measurements.
		# If we are sure about the measurements, R will be near zero.
		self.R_t = np.array([ 		[1.0,   0,    0],
						[  0, 1.0,    0],
						[  0,    0, 1.0]])
		
	def velocity_callback(self, msg):
		"""
		Listen to the velocity commands (linear forward velocity 
		in the x direction in the robot's reference frame and 
		angular velocity (yaw rate) around the robot's z-axis).
		Convert those velocity commands into a 3-element control
		input vector ... 
		[v,v,yaw_rate]
		[meters/second, meters/second, radians/second]
		"""
		# Forward velocity in the robot's reference frame
		v = msg.linear.x
		
		# Angular velocity around the robot's z axis
		yaw_rate = msg.angular.z
		
		# [v,v,yaw_rate]		
		self.control_vector_t_minus_1[0] = v
		self.control_vector_t_minus_1[1] = v
		self.control_vector_t_minus_1[2] = yaw_rate

	def odom_callback(self, msg):
		"""
		Receive the odometry information containing the position and orientation
		of the robot in the global reference frame. 
		The position is x, y, z.
		The orientation is an x, y, z, w quaternion.
		"""						
		roll, pitch, yaw = self.euler_from_quaternion(
					msg.pose.pose.orientation.x,
					msg.pose.pose.orientation.y,
					msg.pose.pose.orientation.z,
					msg.pose.pose.orientation.w)
				
		obs_state_vector_x_y_yaw = [msg.pose.pose.position.x,msg.pose.pose.position.y,yaw]
		
		# These are the measurements taken by the odometry in Gazebo
		z_t_observation_vector =  np.array([obs_state_vector_x_y_yaw[0],
						obs_state_vector_x_y_yaw[1],
						obs_state_vector_x_y_yaw[2]])
		
		# Apply the Extended Kalman Filter
		# This is the updated state estimate after taking the latest
		# sensor (odometry) measurements into account.				
		updated_state_estimate_t = self.ekf(z_t_observation_vector)
		
		# Publish the estimated state
		self.publish_estimated_state(updated_state_estimate_t)

	def publish_estimated_state(self, state_vector_x_y_yaw):
		"""
		Publish the estimated pose (position and orientation) of the 
		robot to the '/en613/state_est' topic. 
		:param: state_vector_x_y_yaw [x, y, yaw] 
			x is in meters, y is in meters, yaw is in radians
		"""
		msg = Float64MultiArray()
		msg.data = state_vector_x_y_yaw
		self.publisher_state_est.publish(msg)

	def euler_from_quaternion(self, x, y, z, w):
		"""
		Convert a quaternion into euler angles (roll, pitch, yaw)
		roll is rotation around x in radians (counterclockwise)
		pitch is rotation around y in radians (counterclockwise)
		yaw is rotation around z in radians (counterclockwise)
		"""
		t0 = +2.0 * (w * x + y * z)
		t1 = +1.0 - 2.0 * (x * x + y * y)
		roll_x = math.atan2(t0, t1)
    
		t2 = +2.0 * (w * y - z * x)
		t2 = +1.0 if t2 > +1.0 else t2
		t2 = -1.0 if t2 < -1.0 else t2
		pitch_y = math.asin(t2)
    
		t3 = +2.0 * (w * z + x * y)
		t4 = +1.0 - 2.0 * (y * y + z * z)
		yaw_z = math.atan2(t3, t4)
    
		return roll_x, pitch_y, yaw_z # in radians
		
	def getB(self,yaw,dt):
		"""
		Calculates and returns the B matrix
		
		3x3 matrix -> number of states x number of control inputs
		The control inputs are the forward speed and the rotation 
		rate around the z axis from the x-axis in the 
		counterclockwise direction.
		[v,v,yaw_rate]
		Expresses how the state of the system [x,y,yaw] changes 
		from t-1 to t
		due to the control commands (i.e. inputs).
		:param yaw: The yaw (rotation angle around the z axis) in rad 
		:param dt: The change in time from time step t-1 to t in sec
		"""
		B = np.array([	[np.cos(yaw) * dt,  0,   0],
				[  0, np.sin(yaw) * dt,   0],
				[  0,  0, dt]]) 				
		return B
		
	def ekf(self,z_t_observation_vector):
		"""
		Extended Kalman Filter. Fuses noisy sensor measurements to 
		create an optimal estimate of the state of the robotic system.
		
		INPUT
		:param z_t_observation_vector The observation from the Odometry
			3x1 NumPy Array [x,y,yaw] in the global reference frame
			in [meters,meters,radians].
		
		OUTPUT
		:return state_estimate_t optimal state estimate at time t	
			[x,y,yaw]....3x1 list --->
			[meters,meters,radians]
					
		"""
		######################### Predict #############################
		# Predict the state estimate at time t based on the state 
		# estimate at time t-1 and the control input applied at time
		# t-1.
		state_estimate_t = self.A_t_minus_1 @ (
			self.state_vector_t_minus_1) + (
			self.getB(self.est_yaw,self.delta_t)) @ (
			self.control_vector_t_minus_1) + (
			self.process_noise_v_t_minus_1)
			
		# Predict the state covariance estimate based on the previous
		# covariance and some noise
		P_t = self.A_t_minus_1 @ self.P_t_minus_1 @ self.A_t_minus_1.T + (
			self.Q_t)
		
		################### Update (Correct) ##########################
		# Calculate the difference between the actual sensor measurements
		# at time t minus what the measurement model predicted 
		# the sensor measurements would be for the current timestep t.
		measurement_residual_y_t = z_t_observation_vector - (
			(self.H_t @ state_estimate_t) + (
			self.sensor_noise_w_t))
			
		# Calculate the measurement residual covariance
		S_t = self.H_t @ P_t @ self.H_t.T + self.R_t
		
		# Calculate the near-optimal Kalman gain
		# We use pseudoinverse since some of the matrices might be
		# non-square or singular.
		K_t = P_t @ self.H_t.T @ np.linalg.pinv(S_t)
		
		# Calculate an updated state estimate for time t
		state_estimate_t = state_estimate_t + (K_t @ measurement_residual_y_t)
		
		# Update the state covariance estimate for time t
		P_t = P_t - (K_t @ self.H_t @ P_t)
		
		#### Update global variables for the next iteration of EKF ####		
		# Update the estimated yaw		
		self.est_yaw = state_estimate_t[2]
		
		# Update the state vector for t-1
		self.state_vector_t_minus_1 = state_estimate_t
		
		# Update the state covariance matrix
		self.P_t_minus_1 = P_t
		
		######### Return the updated state estimate ###################
		state_estimate_t = state_estimate_t.tolist()
		return state_estimate_t


def main(args=None):
    """
    Entry point for the program.
    """
    
    # Initialize rclpy library
    rclpy.init(args=args)

    # Create the node
    estimator = PlaceholderEstimator()

    # Spin the node so the callback function is called.
    # Pull messages from any topics this node is subscribed to.
    # Publish any pending messages to the topics.
    rclpy.spin(estimator)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    estimator.destroy_node()
    
    # Shutdown the ROS client library for Python
    rclpy.shutdown()

if __name__ == '__main__':
    main()
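If you want to experiment with the predict and update steps without launching ROS 2 or Gazebo, here is a minimal standalone sketch of one filter cycle in NumPy. It mirrors the structure of the `ekf` method above with A and H as identity matrices, but it is a simplification: the constant process and sensor noise offsets are left out, and the function names `get_b` and `ekf_step` and the odometry readings are made up for illustration.

```python
import numpy as np


def get_b(yaw, dt):
    """Control matrix for the [v, v, yaw_rate] input vector."""
    return np.array([[np.cos(yaw) * dt, 0.0, 0.0],
                     [0.0, np.sin(yaw) * dt, 0.0],
                     [0.0, 0.0, dt]])


def ekf_step(x_prev, P_prev, u_prev, z, dt, Q, R):
    """One predict/update cycle for the state [x, y, yaw].
    Assumes A = H = identity; noise offsets are omitted."""
    A = np.eye(3)
    H = np.eye(3)

    # Predict the state and its covariance
    x_pred = A @ x_prev + get_b(x_prev[2], dt) @ u_prev
    P_pred = A @ P_prev @ A.T + Q

    # Update with the measurement z
    residual = z - H @ x_pred
    S = H @ P_pred @ H.T + R
    K = P_pred @ H.T @ np.linalg.pinv(S)
    x_new = x_pred + K @ residual
    P_new = P_pred - K @ H @ P_pred
    return x_new, P_new


x = np.zeros(3)                  # initial [x, y, yaw]
P = 0.1 * np.eye(3)              # initial state covariance guess
u = np.array([0.5, 0.5, 0.0])    # 0.5 m/s forward, no rotation
Q = np.eye(3)
R = np.eye(3)
dt = 0.002

# Hypothetical odometry readings [x, y, yaw]
for z in ([0.01, 0.0, 0.0], [0.02, 0.0, 0.0], [0.03, 0.0, 0.0]):
    x, P = ekf_step(x, P, u, np.array(z), dt, Q, R)
    print(np.round(x, 4))
```

With Q and R both set to the identity matrix, the filter settles at weighting the prediction and the measurement in a fixed ratio. Shrinking R makes the estimate follow the odometry more closely, and shrinking Q makes it trust the motion model more.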

Wall Following Robot Mode Demo

Just for fun, I created a switch in the first section of the code (see previous section) that enables you to have the robot do nothing but follow walls. Here is a video demonstration of that.

Obstacle Avoidance Robot Mode Demo

Here is a demo of what the robot looks like when it does nothing but wander around the room, avoiding obstacles and walls.