How to Detect and Classify Traffic Lights

In this tutorial, we will develop a program that detects traffic lights and then classifies their color (red, yellow, or green). The program will first use a pretrained neural network to detect traffic lights, and then it will run those detections through a second neural network that is trained to identify the color. By the end of this tutorial, you will have a working traffic light detector and color classifier.

Our goal is to build an early prototype of a system that can be used in an autonomous vehicle.

Real-World Applications

  • Self-driving cars/autonomous vehicles

Prerequisites

Helpful Tip

As you work through this tutorial, focus on the end goals I listed in the beginning. Don’t get bogged down in trying to understand every last detail of the math and the libraries we will use to develop the application.

We are trying to build products, not publish research papers. Focus on the inputs, the outputs, and what the algorithm is supposed to do at a high level.

Get a working traffic light detector and classifier up and running. Later, when you want to add more complexity to your project or write a research paper, you can dive deeper under the hood to understand all the details.

Trying to understand every last detail is like trying to build your own database from scratch in order to start a website or taking a course on internal combustion engines to learn how to drive a car. 

Let’s get started!

Find Some Videos and an Image

The first thing we need to do is find some videos and an image to serve as our test cases.

We want to download videos and images that show traffic lights. Images of streets from the point of view of a driver in a car are good candidates.

I found some good candidates on Pixabay.com and Dreamstime.com. Type “driving” or “traffic lights” in the video search on those websites.

The Bosch Small Traffic Lights Dataset is another source for images.

Here is what a sample image might look like:

[Image: a sample street scene with a green traffic light]

Installation and Setup

We now need to make sure we have all the software packages installed. Check to see if you have OpenCV installed on your machine. If you are using Anaconda, you can type:

conda install -c conda-forge opencv

Alternatively, you can type:

pip install opencv-python

Make sure you have NumPy installed, a scientific computing library for Python.

If you’re using Anaconda, you can type:

conda install numpy

Alternatively, you can type:

pip install numpy

Install Matplotlib, a plotting library for Python.

For Anaconda users:

conda install -c conda-forge matplotlib

Otherwise, you can install like this:

pip install matplotlib
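
If you want to confirm that everything installed correctly, a quick sanity check is to import each library and print its version (the exact version numbers you see will depend on your environment; this short script is just a convenience and is not part of the project code):

# check_install.py - confirm that OpenCV, NumPy, and Matplotlib are available
import cv2         # Computer vision library
import numpy as np # Scientific computing library
import matplotlib  # Plotting library

print("OpenCV version:", cv2.__version__)
print("NumPy version:", np.__version__)
print("Matplotlib version:", matplotlib.__version__)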

Extract Traffic Lights From Images

We now need to go through all of our images and extract the traffic lights. We want a good mix of green, yellow, and red traffic light examples. 

To do this, we will create two programs:

extract_traffic_lights.py: This program is the driver. You will run this program to create cropped images of traffic lights.

object_detection.py: This program contains a bunch of methods for traffic light detection (as well as other objects like pedestrians, cars, and stop signs).

Here is the code for extract_traffic_lights.py:

# Project: How to Detect and Classify Traffic Lights
# Author: Addison Sears-Collins
# Date created: January 11, 2021
# Description: This program extracts traffic lights from images.

import cv2 # Computer vision library
import object_detection # Contains methods for object detection in images

# Get a list of jpeg image files containing traffic lights
files = object_detection.get_files('traffic_light_input/*.jpg')

# Load the object detection model
this_model = object_detection.load_ssd_coco()

# Keep track of the number of traffic lights found
traffic_light_count = 0

# Keep track of the number of image files that were processed
file_count = 0

# Display a count of the number of images we need to process
print("Number of Images:", len(files))

# Go through each image file, one at a time
for file in files:

  # Detect objects in the image
  # img_rgb is the original image in RGB format
  # out is a dictionary containing the results of object detection
  # file_name is the name of the file
  (img_rgb, out, file_name) = object_detection.perform_object_detection(model=this_model, file_name=file, save_annotated=None, model_traffic_lights=None)
	
  # Every 10 files that are processed
  if (file_count % 10) == 0:

    # Display a count of the number of files that have been processed
    print("Images processed:", file_count)

    # Display the total number of traffic lights that have been identified so far
    print("Number of Traffic lights identified: ", traffic_light_count)
		
  # Increment the number of files by 1
  file_count = file_count + 1

  # For each traffic light (i.e. bounding box) that was detected
  for idx in range(len(out['boxes'])):

    # Extract the type of object that was detected	
    obj_class = out["detection_classes"][idx]
		
    # If the object that was detected is a traffic light
    if obj_class == object_detection.LABEL_TRAFFIC_LIGHT:
		
      # Extract the coordinates of the bounding box
      box = out["boxes"][idx]
			
      # Extract (i.e. crop) the traffic light from the image     
      traffic_light = img_rgb[box["y"]:box["y2"], box["x"]:box["x2"]]
			
      # Convert the traffic light from RGB format into BGR format
      traffic_light = cv2.cvtColor(traffic_light, cv2.COLOR_RGB2BGR)

      # Store the cropped image in a folder named 'traffic_light_cropped'     
      cv2.imwrite("traffic_light_cropped/" + str(traffic_light_count) + ".jpg", traffic_light)
			
      # Increment the number of traffic lights by 1
      traffic_light_count = traffic_light_count + 1

# Display the total number of traffic lights identified
print("Number of Traffic lights identified:", traffic_light_count)

Here is the code for object_detection.py:

# Project: How to Detect and Classify Traffic Lights
# Author: Addison Sears-Collins
# Date created: January 11, 2021
# Description: This program helps detect objects (e.g. traffic lights) in
#   images.

import tensorflow as tf # Machine learning library
from tensorflow import keras # Library for neural networks
import numpy as np # Scientific computing library
import cv2 # Computer vision library
import glob # Filename handling library

# Inception V3 model for Keras
from tensorflow.keras.applications.inception_v3 import preprocess_input

# To detect objects, we will use a pretrained neural network that has been 
# trained on the COCO data set. You can read more about this data set here: 
#   https://content.alegion.com/datasets/coco-ms-coco-dataset
# COCO labels are here: https://github.com/tensorflow/models/blob/master/research/object_detection/data/mscoco_label_map.pbtxt
LABEL_PERSON = 1
LABEL_CAR = 3
LABEL_BUS = 6
LABEL_TRUCK = 8
LABEL_TRAFFIC_LIGHT = 10
LABEL_STOP_SIGN = 13

def accept_box(boxes, box_index, tolerance):
  """
  Determine whether the bounding box at box_index duplicates (i.e. has nearly
  the same center as) a box earlier in the list. Returns False for duplicates,
  True otherwise.
  """
  box = boxes[box_index]

  for idx in range(box_index):
    other_box = boxes[idx]
    if abs(center(other_box, "x") - center(box, "x")) < tolerance and abs(center(other_box, "y") - center(box, "y")) < tolerance:
      return False

  return True

def get_files(pattern):
  """
  Create a list of all the images in a directory
	
  :param:pattern str The pattern of the filenames
  :return: A list of the files that match the specified pattern	
  """
  files = []

  # For each file that matches the specified pattern
  for file_name in glob.iglob(pattern, recursive=True):

    # Add the image file to the list of files
    files.append(file_name)

  # Return the complete file list
  return files
	
def load_model(model_name):
  """
  Download a pretrained object detection model, and save it to your hard drive.
  :param:model_name str The name of the pretrained object detection model
  """
  url = 'http://download.tensorflow.org/models/object_detection/tf2/20200711/' + model_name + '.tar.gz'
	
  # Download a file from a URL that is not already in the cache
  model_dir = tf.keras.utils.get_file(fname=model_name, untar=True, origin=url)

  print("Model path: ", str(model_dir))
  
  model_dir = str(model_dir) + "/saved_model"
  model = tf.saved_model.load(str(model_dir))

  return model

def load_rgb_images(pattern, shape=None):
  """
  Loads the images in RGB format.
	
  :param:pattern str The pattern of the filenames
  :param:shape Image dimensions (width, height)
  """
  # Get a list of all the image files in a directory
  files = get_files(pattern)

  # For each image in the directory, convert it from BGR format to RGB format
  images = [cv2.cvtColor(cv2.imread(file), cv2.COLOR_BGR2RGB) for file in files]

  # Resize the image if the desired shape is provided
  if shape:
    return [cv2.resize(img, shape) for img in images]
  else:
    return images

def load_ssd_coco():
  """
  Load the neural network that has the SSD architecture, trained on the COCO
  data set.
  """
  return load_model("ssd_resnet50_v1_fpn_640x640_coco17_tpu-8")

def save_image_annotated(img_rgb, file_name, output, model_traffic_lights=None):
  """
  Annotate the image with the object types, and generate cropped images of
  traffic lights.
  """
  # Create annotated image file 
  output_file = file_name.replace('.jpg', '_test.jpg')
	
  # For each bounding box that was detected  
  for idx in range(len(output['boxes'])):

    # Extract the type of the object that was detected
    obj_class = output["detection_classes"][idx]
    
    # How confident the object detection model is on the object's type
    score = int(output["detection_scores"][idx] * 100)
		
    # Extract the bounding box
    box = output["boxes"][idx]

    color = None
    label_text = ""

    if obj_class == LABEL_PERSON:
      color = (0, 255, 255)
      label_text = "Person " + str(score)
    if obj_class == LABEL_CAR:
      color = (255, 255, 0)
      label_text = "Car " + str(score)
    if obj_class == LABEL_BUS:
      color = (255, 255, 0)
      label_text = "Bus " + str(score)
    if obj_class == LABEL_TRUCK:
      color = (255, 255, 0)
      label_text = "Truck " + str(score)
    if obj_class == LABEL_STOP_SIGN:
      color = (128, 0, 0)
      label_text = "Stop Sign " + str(score)
    if obj_class == LABEL_TRAFFIC_LIGHT:
      color = (255, 255, 255)
      label_text = "Traffic Light " + str(score)
			
      if model_traffic_lights:

        # Crop the traffic light and resize it to the input size that the
        # Inception V3 color classifier expects
        img_traffic_light = img_rgb[box["y"]:box["y2"], box["x"]:box["x2"]]
        img_inception = cv2.resize(img_traffic_light, (299, 299))

        # Uncomment this if you want to save a cropped image of the traffic light
        #cv2.imwrite(output_file.replace('.jpg', '_crop.jpg'), cv2.cvtColor(img_inception, cv2.COLOR_RGB2BGR))
        img_inception = np.array([preprocess_input(img_inception)])

        prediction = model_traffic_lights.predict(img_inception)
        label = np.argmax(prediction)
        score_light = str(int(np.max(prediction) * 100))
        if label == 0:
          label_text = "Green " + score_light
        elif label == 1:
          label_text = "Yellow " + score_light
        elif label == 2:
          label_text = "Red " + score_light
        else:
          label_text = 'NO-LIGHT'  # This is not a traffic light

    if color and label_text and accept_box(output["boxes"], idx, 5.0) and score > 50:
      cv2.rectangle(img_rgb, (box["x"], box["y"]), (box["x2"], box["y2"]), color, 2)
      cv2.putText(img_rgb, label_text, (box["x"], box["y"]), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

  cv2.imwrite(output_file, cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR))
  print(output_file)

def center(box, coord_type):
  """
  Get center of the bounding box.
  """
  return (box[coord_type] + box[coord_type + "2"]) / 2

def perform_object_detection(model, file_name, save_annotated=False, model_traffic_lights=None):
  """
  Perform object detection on an image using the predefined neural network.
  """
  # Store the image
  img_bgr = cv2.imread(file_name)
  img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
  input_tensor = tf.convert_to_tensor(img_rgb) # Input needs to be a tensor
  input_tensor = input_tensor[tf.newaxis, ...]

  # Run the model
  output = model(input_tensor)

  print("num_detections:", output['num_detections'], int(output['num_detections']))

  # Convert the tensors to a NumPy array
  num_detections = int(output.pop('num_detections'))
  output = {key: value[0, :num_detections].numpy()
            for key, value in output.items()}
  output['num_detections'] = num_detections

  print('Detection classes:', output['detection_classes'])
  print('Detection Boxes:', output['detection_boxes'])

  # The detected classes need to be integers.
  output['detection_classes'] = output['detection_classes'].astype(np.int64)
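
  # The model returns detection_boxes as normalized [ymin, xmin, ymax, xmax]
  # values, so convert them to pixel coordinates using the image dimensions.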
  output['boxes'] = [
    {"y": int(box[0] * img_rgb.shape[0]), "x": int(box[1] * img_rgb.shape[1]), "y2": int(box[2] * img_rgb.shape[0]),
     "x2": int(box[3] * img_rgb.shape[1])} for box in output['detection_boxes']]

  if save_annotated:
    save_image_annotated(img_rgb, file_name, output, model_traffic_lights)

  return img_rgb, output, file_name
	
def perform_object_detection_video(model, video_frame, model_traffic_lights=None):
  """
  Perform object detection on a video using the predefined neural network.
	
  Returns the annotated video frame.
  """
  # Store the image
  img_rgb = cv2.cvtColor(video_frame, cv2.COLOR_BGR2RGB)
  input_tensor = tf.convert_to_tensor(img_rgb) # Input needs to be a tensor
  input_tensor = input_tensor[tf.newaxis, ...]

  # Run the model
  output = model(input_tensor)

  # Convert the tensors to a NumPy array
  num_detections = int(output.pop('num_detections'))
  output = {key: value[0, :num_detections].numpy()
            for key, value in output.items()}
  output['num_detections'] = num_detections

  # The detected classes need to be integers.
  output['detection_classes'] = output['detection_classes'].astype(np.int64)
  output['boxes'] = [
    {"y": int(box[0] * img_rgb.shape[0]), "x": int(box[1] * img_rgb.shape[1]), "y2": int(box[2] * img_rgb.shape[0]),
     "x2": int(box[3] * img_rgb.shape[1])} for box in output['detection_boxes']]

  # For each bounding box that was detected  
  for idx in range(len(output['boxes'])):

    # Extract the type of the object that was detected
    obj_class = output["detection_classes"][idx]
    
    # How confident the object detection model is on the object's type
    score = int(output["detection_scores"][idx] * 100)
		
    # Extract the bounding box
    box = output["boxes"][idx]

    color = None
    label_text = ""

    # if obj_class == LABEL_PERSON:
      # color = (0, 255, 255)
      # label_text = "Person " + str(score)
    # if obj_class == LABEL_CAR:
      # color = (255, 255, 0)
      # label_text = "Car " + str(score)
    # if obj_class == LABEL_BUS:
      # color = (255, 255, 0)
      # label_text = "Bus " + str(score)
    # if obj_class == LABEL_TRUCK:
      # color = (255, 255, 0)
      # label_text = "Truck " + str(score)
    if obj_class == LABEL_STOP_SIGN:
      color = (128, 0, 0)
      label_text = "Stop Sign " + str(score)
    if obj_class == LABEL_TRAFFIC_LIGHT:
      color = (255, 255, 255)
      label_text = "Traffic Light " + str(score)
			
      if model_traffic_lights:

        # Crop the traffic light and resize it to the input size that the
        # Inception V3 color classifier expects
        img_traffic_light = img_rgb[box["y"]:box["y2"], box["x"]:box["x2"]]
        img_inception = cv2.resize(img_traffic_light, (299, 299))

        img_inception = np.array([preprocess_input(img_inception)])

        prediction = model_traffic_lights.predict(img_inception)
        label = np.argmax(prediction)
        score_light = str(int(np.max(prediction) * 100))
        if label == 0:
          label_text = "Green " + score_light
        elif label == 1:
          label_text = "Yellow " + score_light
        elif label == 2:
          label_text = "Red " + score_light
        else:
          label_text = 'NO-LIGHT'  # This is not a traffic light

    # Use the score variable to indicate how confident we are it is a traffic light (in % terms)
    # On the actual video frame, we display the confidence that the light is either red, green,
    # yellow, or not a valid traffic light.
    if color and label_text and accept_box(output["boxes"], idx, 5.0) and score > 20:
      cv2.rectangle(img_rgb, (box["x"], box["y"]), (box["x2"], box["y2"]), color, 2)
      cv2.putText(img_rgb, label_text, (box["x"], box["y"]), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

  output_frame = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
  return output_frame

def double_shuffle(images, labels):
  """
  Shuffle the images to add some randomness.
  """
  indexes = np.random.permutation(len(images))

  return [images[idx] for idx in indexes], [labels[idx] for idx in indexes]

def reverse_preprocess_inception(img_preprocessed):
  """
  Reverse the preprocessing process.
  """
  img = img_preprocessed + 1.0
  img = img * 127.5
  return img.astype(np.uint8)

In order to detect the traffic lights in the image, we will use a pretrained object detection model available from TensorFlow. This model has been trained on the COCO data set.

You can see a list of available object detection models in the TensorFlow 2 Detection Model Zoo on GitHub.

The first time you run extract_traffic_lights.py, the model will download to the Keras cache directory (e.g. the C:\…\…\.keras\datasets folder on Windows).
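
If you are not sure where the model ended up on your machine, one quick way to check (a small sketch; by default, tf.keras.utils.get_file saves downloads under the .keras directory in your home folder) is:

import os
print(os.path.join(os.path.expanduser("~"), ".keras", "datasets"))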

Now, let’s run the program.

python extract_traffic_lights.py

Here is what you should see when you run it.

[Screenshot: the object detection model downloading the first time the program runs]

When you run the program above, it will generate a bunch of cropped images of traffic lights and save them inside the current directory in the traffic_light_cropped folder.

I had 1,170 cropped images of traffic lights (or pieces of traffic lights).

[Screenshot: output of extract_traffic_lights.py]

Here is what some of the cropped images look like:

[Images: examples of the cropped traffic lights]

Separate the Traffic Light Images by Color

We now need to separate the cropped images of traffic lights by color. 

Create a new folder inside the current directory named traffic_light_dataset.

Inside the traffic_light_dataset folder, create four separate folders:

  • 0_green
  • 1_yellow
  • 2_red
  • 3_not

Go back to the traffic_light_cropped folder. 

  • Grab some good images of green traffic lights, and place them in the 0_green folder.
  • Grab some good images of yellow traffic lights, and place them in the 1_yellow folder.
  • Grab some good images of red traffic lights, and place them in the 2_red folder.
  • Grab some good images of the back and side of a traffic light (or pieces of buildings or something else that isn’t a traffic light), and place them in the 3_not folder. These images are your negative samples. Sometimes your neural network will classify an object as a traffic light, but it will actually be the back or side of a traffic light, so it isn’t useful for a self-driving car.

Here is an example of a 3_not image:

[Image: an example of a 3_not image]

Note that the images are different sizes. This is exactly what we want. 

We now have a custom traffic light data set that will enable us to train a neural network to detect the color of a traffic light.

The more images you have, the better. The performance of neural networks tends to improve with more training examples.
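
Before you train anything, it can be helpful to check how many examples ended up in each folder so you know whether the classes are roughly balanced. Here is a small sketch that counts them (it simply assumes the folder layout described above):

import glob # Filename handling library

# Count the images in each traffic light class folder
for folder in ['0_green', '1_yellow', '2_red', '3_not']:
  count = len(glob.glob('traffic_light_dataset/' + folder + '/*'))
  print(folder, ':', count, 'images')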

Use Transfer Learning to Create a Traffic Light Color Detection System

We are now going to use a technique called transfer learning to create a traffic light color detection system. 

Transfer learning involves storing the knowledge gained while solving one problem and reapplying that knowledge to solve another, similar problem. 

We will use the Inception V3 neural network architecture to perform this task. 
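
The core transfer learning pattern looks like this (a condensed sketch of what the Transfer() function in the full program below does; the full version adds more dense layers, dropout, and batch normalization):

from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Sequential

# Load Inception V3 without its ImageNet classifier head
base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=(299, 299, 3))

# Freeze the pretrained layers so their weights don't change during training
for layer in base_model.layers:
  layer.trainable = False

# Add a small classifier head for our four traffic light classes
model = Sequential([
  base_model,
  GlobalAveragePooling2D(),
  Dense(4, activation='softmax')
])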

Within the same directory that contains the object_detection.py program, open up a new Python program called train_traffic_light_color.py. This program will train a neural network to detect traffic light color. The best neural network will be saved as traffic.h5.

20% of the images in the traffic_light_dataset folder will be the validation data set, and 80% of the images will be the training data set. The validation data set is used to tune the architecture of the neural network. It helps us to find the best neural network architecture to solve the problem (i.e. to determine the traffic light color).

The validation data set is also used to determine when the neural network should stop training. Once the error (i.e. loss) on the validation data set stops improving, training stops. This process is known as early stopping.

Here is the code for train_traffic_light_color.py:

# Project: How to Detect and Classify Traffic Lights
# Author: Addison Sears-Collins
# Date created: January 17, 2021
# Description: This program trains a neural network to detect the color
#   of a traffic light. Performance on the validation data set is saved
#   to a directory. Also, the best neural network model is saved as
#   traffic.h5.

import collections # Handles specialized container datatypes
import cv2 # Computer vision library
import matplotlib.pyplot as plt # Plotting library
import numpy as np # Scientific computing library
import object_detection # Custom object detection program
import sys
import tensorflow as tf # Machine learning library
from tensorflow import keras # Library for neural networks
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.layers import Dense, Flatten, Dropout, GlobalAveragePooling2D, GlobalMaxPooling2D, BatchNormalization
from tensorflow.keras.losses import categorical_crossentropy
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam, Adadelta
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import to_categorical
sys.path.append('../')

# Show the version of TensorFlow and Keras that I am using
print("TensorFlow", tf.__version__)
print("Keras", keras.__version__)

def show_history(history):
  """
  Visualize the neural network model training history
  
  :param:history A record of training loss values and metrics values at 
                 successive epochs, as well as validation loss values 
                 and validation metrics values
  """
  plt.plot(history.history['accuracy'])
  plt.plot(history.history['val_accuracy'])
  plt.title('model accuracy')
  plt.ylabel('accuracy')
  plt.xlabel('epoch')
  plt.legend(['train_accuracy', 'validation_accuracy'], loc='best')
  plt.show()

def Transfer(n_classes, freeze_layers=True):
  """
  Use the InceptionV3 neural network architecture to perform transfer learning.
	
  :param:n_classes Number of classes
  :param:freeze_layers If True, the network's parameters don't change.
  :return The new model built on top of Inception V3
  """
  print("Loading Inception V3...")

  # To understand what the parameters mean, do a Google search 'inceptionv3 keras'. 
  # The first search result should send you to the Keras website, which has an 
  # explanation of what each of these parameters means.
  # include_top=False means we are removing the top part of the Inception model, 
  # which is the classifier. 
  # input_shape needs to have 3 channels, and needs to be at least 75x75 for the
  # resolution.
  # Our neural network will build off of the Inception V3 model (trained on the ImageNet
  # data set).
  base_model = InceptionV3(weights='imagenet', include_top=False, input_shape=(299, 299, 3))

  print("Inception V3 has finished loading.")

  # Display the base network architecture
  print('Layers: ', len(base_model.layers))
  print("Shape:", base_model.output_shape[1:])
  print("Shape:", base_model.output_shape)
  print("Shape:", base_model.outputs)
  base_model.summary()

  # Create the neural network. This network uses the Sequential
  # architecture where each layer has one 
  # input tensor (e.g. vector, matrix, etc.) and one output tensor 
  top_model = Sequential()

  # Our classifier model will build on top of the base model
  top_model.add(base_model)
  top_model.add(GlobalAveragePooling2D())
  top_model.add(Dropout(0.5))
  top_model.add(Dense(1024, activation='relu'))
  top_model.add(BatchNormalization())
  top_model.add(Dropout(0.5))
  top_model.add(Dense(512, activation='relu'))
  top_model.add(Dropout(0.5))
  top_model.add(Dense(128, activation='relu'))
  top_model.add(Dense(n_classes, activation='softmax'))

  # Freeze layers in the model so that they cannot be trained (i.e. the
  # parameters in the neural network will not change)
  if freeze_layers:
    for layer in base_model.layers:
      layer.trainable = False

  return top_model

# Perform image augmentation. 
# Image augmentation enables us to alter the available images
# (e.g. rotate, flip, changing the hue, etc.) to generate more images that our
# neural network can use for training...therefore preventing us from having to
# collect more external images.
datagen = ImageDataGenerator(rotation_range=5, width_shift_range=[-10, -5, -2, 0, 2, 5, 10],
                             zoom_range=[0.7, 1.5], height_shift_range=[-10, -5, -2, 0, 2, 5, 10],
                             horizontal_flip=True)

shape = (299, 299)

# Load the cropped traffic light images from the appropriate directory
img_0_green = object_detection.load_rgb_images("traffic_light_dataset/0_green/*", shape)
img_1_yellow = object_detection.load_rgb_images("traffic_light_dataset/1_yellow/*", shape)
img_2_red = object_detection.load_rgb_images("traffic_light_dataset/2_red/*", shape)
img_3_not_traffic_light = object_detection.load_rgb_images("traffic_light_dataset/3_not/*", shape)

# Create a list of the labels that is the same length as the number of images in each
# category
# 0 = green
# 1 = yellow
# 2 = red
# 3 = not a traffic light
labels = [0] * len(img_0_green)
labels.extend([1] * len(img_1_yellow))
labels.extend([2] * len(img_2_red))
labels.extend([3] * len(img_3_not_traffic_light))

# Create NumPy array
labels_np = np.ndarray(shape=(len(labels), 4))
images_np = np.ndarray(shape=(len(labels), shape[0], shape[1], 3))

# Create a list of all the images in the traffic lights data set
img_all = []
img_all.extend(img_0_green)
img_all.extend(img_1_yellow)
img_all.extend(img_2_red)
img_all.extend(img_3_not_traffic_light)

# Make sure we have the same number of images as we have labels
assert len(img_all) == len(labels)  

# Shuffle the images
img_all = [preprocess_input(img) for img in img_all]
(img_all, labels) = object_detection.double_shuffle(img_all, labels)

# Store images and labels in a NumPy array
for idx in range(len(labels)):
  images_np[idx] = img_all[idx]
  labels_np[idx] = labels[idx]
	
print("Images: ", len(img_all))
print("Labels: ", len(labels))

# Perform one-hot encoding
for idx in range(len(labels_np)):
  # We have four integer labels, representing the different colors of the 
  # traffic lights.
  labels_np[idx] = np.array(to_categorical(labels[idx], 4))
	
# Split the data set into a training set and a validation set
# The training set is the portion of the data set that is used to 
#   determine the parameters (e.g. weights) of the neural network.
# The validation set is the portion of the data set used to
#   fine tune the model-specific parameters (i.e. hyperparameters) that are 
#   fixed before you train and test your neural network on the data. The 
#   validation set helps us select the final model (e.g. learning rate, 
#   number of hidden layers, number of hidden units, activation functions, 
#   number of epochs, etc.).
# In this case, 80% of the data set becomes training data, and 20% of the
# data set becomes validation data.
idx_split = int(len(labels_np) * 0.8)
x_train = images_np[0:idx_split]
x_valid = images_np[idx_split:]
y_train = labels_np[0:idx_split]
y_valid = labels_np[idx_split:]

# Store a count of the number of traffic lights of each color
cnt = collections.Counter(labels)
print('Labels:', cnt)
n = len(labels)
print('0:', cnt[0])
print('1:', cnt[1])
print('2:', cnt[2])
print('3:', cnt[3])

# Calculate the weighting of each traffic light class
class_weight = {0: n / cnt[0], 1: n / cnt[1], 2: n / cnt[2], 3: n / cnt[3]}
print('Class weight:', class_weight)

# Save the best model as traffic.h5
checkpoint = ModelCheckpoint("traffic.h5", monitor='val_loss', mode='min', verbose=1, save_best_only=True)
early_stopping = EarlyStopping(min_delta=0.0005, patience=15, verbose=1)

# Generate model using transfer learning
model = Transfer(n_classes=4, freeze_layers=True)

# Display a summary of the neural network model
model.summary()

# Generate a batch of randomly transformed images 
it_train = datagen.flow(x_train, y_train, batch_size=32)

# Configure the model parameters for training
model.compile(loss=categorical_crossentropy, optimizer=Adadelta(
  lr=1.0, rho=0.95, epsilon=1e-08, decay=0.0), metrics=['accuracy'])

# Train the model on the image batches for a fixed number of epochs
# Store a record of the error on the training data set and metrics values
#   in the history object.
history_object = model.fit(it_train, epochs=250, validation_data=(
  x_valid, y_valid), shuffle=True, callbacks=[
  checkpoint, early_stopping], class_weight=class_weight)

# Display the training history
show_history(history_object)

# Get the loss value and metrics values on the validation data set
score = model.evaluate(x_valid, y_valid, verbose=0)
print('Validation loss:', score[0])
print('Validation accuracy:', score[1])

print('Saving the validation data set...')

print('Length of the validation data set:', len(x_valid))

# Go through the validation data set, and see how the model did on each image
for idx in range(len(x_valid)):

  # Make the image a NumPy array
  img_as_ar = np.array([x_valid[idx]])

  # Generate predictions	
  prediction = model.predict(img_as_ar)

  # Determine what the label is based on the highest probability
  label = np.argmax(prediction)

  # Create the name of the directory and the file for the validation data set
  # After each run, delete this out_valid/ directory so that old files are not
  # hanging around in there.
  file_name = str(idx) + "_" + str(label) + "_" + str(np.argmax(y_valid[idx])) + ".jpg"
  img = img_as_ar[0]

  # Reverse the image preprocessing process
  img = object_detection.reverse_preprocess_inception(img)

  # Save the image file
  cv2.imwrite(file_name, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))

print('The validation data set has been saved!')

To run the program, type:

python train_traffic_light_color.py

[Screenshot: output of train_traffic_light_color.py during training]

Here is a chart of the accuracy statistics:

[Chart: training and validation accuracy by epoch]

Within the current directory, you will now see some new images. These images show how your neural network performed on the validation data set. The second number in each file name is the predicted label, and the third number is the true label. Remember:

  • 0 = green
  • 1 = yellow
  • 2 = red
  • 3 = not

My neural network had >92% accuracy in detecting the colors of traffic lights. Pretty good stuff! 

[Screenshot: validation accuracy of the traffic light color network]

In a real self-driving car scenario, we would want to make sure that a traffic light is detected for several frames before labeling it as one color or another.
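
One simple way to do that is to keep the most recent predictions in a buffer and only commit to a color once it clearly dominates the buffer. The sketch below is my own illustrative addition (the TrafficLightSmoother class and the 10-frame window are not part of the project code):

from collections import Counter, deque

class TrafficLightSmoother:
  """
  Report a traffic light color only after it has been the majority
  prediction over the last several video frames.
  """
  def __init__(self, window_size=10):
    self.recent_labels = deque(maxlen=window_size)

  def update(self, label):
    # label is 0 (green), 1 (yellow), 2 (red), or 3 (not a traffic light)
    self.recent_labels.append(label)

    # Wait until the buffer is full before committing to a color
    if len(self.recent_labels) < self.recent_labels.maxlen:
      return None

    # Only report a color if it accounts for more than half of the buffer
    most_common_label, count = Counter(self.recent_labels).most_common(1)[0]
    if count > self.recent_labels.maxlen // 2:
      return most_common_label
    return None

You would call update() once per video frame with the label produced by the color classifier, and only act on the returned color when it is not None.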

Note how the training accuracy is worse than the validation accuracy. The reason for this is that we are using image augmentation to artificially expand the data set.

Image augmentation is used during training to help improve the model, but it is also making it harder for the network to get the right answers (i.e. correct traffic light colors) during the training phase of the model.

In contrast, the validation data set is what it is. There is no augmentation, so we are getting higher accuracy values.

To clean up the current working directory, I will cut and paste all these validation data set images into a new folder named output_validation_set.
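
Because each saved file name contains both the predicted label and the true label, you can also compute the validation accuracy directly from the file names. Here is a small sketch (it assumes the images have already been moved into the output_validation_set folder):

import glob # Filename handling library
import os

correct = 0
files = glob.glob('output_validation_set/*.jpg')

for file in files:
  # File names look like index_predictedlabel_truelabel.jpg
  parts = os.path.basename(file).replace('.jpg', '').split('_')
  if parts[1] == parts[2]:
    correct = correct + 1

if files:
  print('Validation accuracy:', correct / len(files))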

Test Your Traffic Light Color Detection System on Images

Now that we have our trained neural network saved as traffic.h5, let’s test it out on some fresh images.

Create a new folder inside the main working directory called test_images. Inside that folder, place some images that contain traffic lights. We will see how our network performs on images it hasn’t seen before.

Open a new program called detect_traffic_light_color_img.py. This program will use traffic.h5 and the Single Shot MultiBox Detector (SSD) to detect traffic lights in images and annotate them with their color.

Type the following code:

# Project: How to Detect and Classify Traffic Lights
# Author: Addison Sears-Collins
# Date created: January 17, 2021
# Description: This program uses a trained neural network to 
# detect the color of a traffic light in images.

import cv2 # Computer vision library
import numpy as np # Scientific computing library
import object_detection # Custom object detection program
from tensorflow import keras # Library for neural networks
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input
from tensorflow.keras.applications import imagenet_utils
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.image import load_img

FILENAME = "test_red.jpg"

# Load the Inception V3 model
model_inception = InceptionV3(weights='imagenet', include_top=True, input_shape=(299,299,3))

# Preprocess the image and resize it to the 299x299 input that Inception V3 expects
img = cv2.resize(preprocess_input(cv2.imread(FILENAME)), (299, 299))

# Generate predictions
out_inception = model_inception.predict(np.array([img]))

# Decode the predictions
out_inception = imagenet_utils.decode_predictions(out_inception)

print("Prediction for ", FILENAME , ": ", out_inception[0][0][1], out_inception[0][0][2], "%")

# Show model summary data
model_inception.summary()

# Detect traffic light color in a batch of image files
files = object_detection.get_files('test_images/*.jpg')

# Load the SSD neural network that is trained on the COCO data set
model_ssd = object_detection.load_ssd_coco()

# Load the trained neural network
model_traffic_lights_nn = keras.models.load_model("traffic.h5")

# Go through all image files, and detect the traffic light color. 
for file in files:
  (img, out, file_name) = object_detection.perform_object_detection(
    model_ssd, file, save_annotated=True, model_traffic_lights=model_traffic_lights_nn)
  print(file, out)

To run the program, type:

python detect_traffic_light_color_img.py

[Image: an annotated test image showing detected traffic lights and their colors]

Test Your Traffic Light Color Detection System on Video

Now that we have tested our traffic light color detection system on images, let’s test it out on video. I have a few mp4 files which I will save to my current directory. One of my videos is named ‘las_vegas.mp4’, and I will save the output video as ‘las_vegas_annotated.mp4’.

Here is the code. I named it detect_traffic_light_color_vid.py.

# Project: How to Detect and Classify Traffic Lights
# Author: Addison Sears-Collins
# Date created: February 1, 2021
# Description: This program uses a trained neural network to 
# detect the color of a traffic light in video.

import cv2 # Computer vision library
import numpy as np # Scientific computing library
import object_detection # Custom object detection program
from tensorflow import keras # Library for neural networks
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input
from tensorflow.keras.applications import imagenet_utils
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.preprocessing.image import load_img

# Make sure the video file is in the same directory as your code
filename = 'las_vegas.mp4'
file_size = (1920,1080) # Assumes 1920x1080 mp4
scale_ratio = 1 # Option to scale to fraction of original size. 

# We want to save the output to a video file
output_filename = 'las_vegas_annotated.mp4'
output_frames_per_second = 20.0 

# Load the SSD neural network that is trained on the COCO data set
model_ssd = object_detection.load_ssd_coco()

# Load the trained neural network
model_traffic_lights_nn = keras.models.load_model("traffic.h5")

def main():

  # Load a video
  cap = cv2.VideoCapture(filename)

  # Create a VideoWriter object so we can save the video output
  fourcc = cv2.VideoWriter_fourcc(*'mp4v')
  result = cv2.VideoWriter(output_filename,  
                           fourcc, 
                           output_frames_per_second, 
                           file_size) 
	
  # Process the video
  while cap.isOpened():
		
    # Capture one frame at a time
    success, frame = cap.read() 
		
    # Do we have a video frame? If true, proceed.
    if success:
		
      # Resize the frame
      width = int(frame.shape[1] * scale_ratio)
      height = int(frame.shape[0] * scale_ratio)
      frame = cv2.resize(frame, (width, height))
			
      # Store the original frame
      original_frame = frame.copy()

      output_frame = object_detection.perform_object_detection_video(
        model_ssd, frame, model_traffic_lights=model_traffic_lights_nn)

      # Write the frame to the output video file
      result.write(output_frame)
			
    # No more video frames left
    else:
      break
			
  # Stop when the video is finished
  cap.release()
	
  # Release the video recording
  result.release()
	
  # Close all windows
  cv2.destroyAllWindows() 
	
main()

To run the program, type:

python detect_traffic_light_color_vid.py

You will see in the video below that it isn’t perfect. It doesn’t catch all the traffic lights.

To improve performance, I recommend playing around with the score thresholds in object_detection.py. For images, the relevant line in save_image_annotated is:

if color and label_text and accept_box(output["boxes"], idx, 5.0) and score > 50:

The score is on a scale of 0 to 100. You can lower the threshold to 40, for example. (The video function, perform_object_detection_video, uses a threshold of 20 on the equivalent line.)

Also, if the color on the traffic lights isn’t accurate, consider adding more training examples, and retraining the color detection neural network on these samples. The more data you have, the better.
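
If you find yourself tuning these values often, one option (just a suggestion, not something the original code does) is to pull the thresholds out into module-level constants near the top of object_detection.py so the image and video code paths are easy to adjust in one place:

# Minimum detection confidence (0-100) before a bounding box is drawn
SCORE_THRESHOLD_IMAGE = 50 # Used in save_image_annotated
SCORE_THRESHOLD_VIDEO = 20 # Used in perform_object_detection_video

# ...then, inside save_image_annotated, for example:
# if color and label_text and accept_box(output["boxes"], idx, 5.0) and score > SCORE_THRESHOLD_IMAGE: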

How to Publish Goal Coordinates in ROS2

Last week, I built a mobile robot in Gazebo that uses ROS2 for message passing. I wanted the robot to go to predefined goal coordinates inside a maze. In order to do that, I needed to create a Publisher node that published the points to a topic.

Here is the ROS2 node code (in Python) that publishes a list of target coordinates (i.e. waypoints) for the mobile robot to go to. Feel free to use this code for your ROS2 projects:

''' ####################
    Publish (x,y) coordinate goals for a differential drive robot in a 
    Gazebo maze.
    ==================================
    Author: Addison Sears-Collins
    Date: November 21, 2020
    #################### '''
 
import rclpy # Import the ROS client library for Python
from rclpy.node import Node # Enables the use of rclpy's Node class
from std_msgs.msg import Float64MultiArray # Enable use of std_msgs/Float64MultiArray message
 
class GoalPublisher(Node):
  """
  Create a GoalPublisher class, which is a subclass of the Node class.
  The class publishes the goal positions (x,y) for a mobile robot in a Gazebo maze world.
  """
  
  def __init__(self):
    """
    Class constructor to set up the node
    """
   
    # Initiate the Node class's constructor and give it a name
    super().__init__('goal_publisher')
     
    # Create publisher(s)      
    self.publisher_goal_x_values = self.create_publisher(Float64MultiArray, '/goal_x_values', 10)
    self.publisher_goal_y_values = self.create_publisher(Float64MultiArray, '/goal_y_values', 10)
     
    # Call the callback functions every 0.5 seconds
    timer_period = 0.5
    self.timer1 = self.create_timer(timer_period, self.get_x_values)
    self.timer2 = self.create_timer(timer_period, self.get_y_values)
    
    # Initialize a counter variable
    self.i = 0  
    self.j = 0
     
    # List the goal destinations
    # We create a list of the (x,y) coordinate goals
    self.goal_x_coordinates = [ 0.0, 3.0, 0.0, -1.5, -1.5,  4.5, 0.0]
    self.goal_y_coordinates = [-4.0, 1.0, 1.5,  1.0, -3.0, -4.0, 0.0] 
   
  def get_x_values(self):
    """
    Callback function.
    """
    msg = Float64MultiArray()
    msg.data = self.goal_x_coordinates
    
    # Publish the x coordinates to the topic
    self.publisher_goal_x_values.publish(msg)
     
    # Increment counter variable
    self.i += 1
     
  def get_y_values(self):
    """
    Callback function.
    """
    msg = Float64MultiArray()
    msg.data = self.goal_y_coordinates
    
    # Publish the y coordinates to the topic
    self.publisher_goal_y_values.publish(msg)
     
    # Increment counter variable
    self.j += 1
     
def main(args=None):
 
  # Initialize the rclpy library
  rclpy.init(args=args)
 
  # Create the node
  goal_publisher = GoalPublisher()
 
  # Spin the node so the callback function is called.
  # Publish any pending messages to the topics.
  rclpy.spin(goal_publisher)
 
  # Destroy the node explicitly
  # (optional - otherwise it will be done automatically
  # when the garbage collector destroys the node object)
  goal_publisher.destroy_node()
 
  # Shutdown the ROS client library for Python
  rclpy.shutdown()
 
if __name__ == '__main__':
  main()
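
To check that the coordinates are actually being published, you can echo the topics from another terminal while the node is running (these are standard ros2 command line tools; the topic names come from the code above):

ros2 topic echo /goal_x_values
ros2 topic echo /goal_y_values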

The Bug2 Algorithm for Robot Motion Planning

In this tutorial, we will learn about the Bug2 algorithm for robot motion planning. The Bug2 algorithm is used when you have a mobile robot:

  • With a known starting location
  • With a known goal location
  • Inside an unexplored environment
  • Equipped with a distance sensor that can detect the distances to objects and walls in the environment (e.g. an ultrasonic sensor or a laser distance sensor)
  • Equipped with an encoder that the robot can use to estimate how far it has traveled from the starting location

Here is a video of a simulated robot I developed in Gazebo and ROS2 that uses the Bug2 algorithm to move from a starting point to a goal point, avoiding walls along the way.

Real-World Applications

Imagine an automatic vacuum cleaner like the one below. It vacuums a room and then needs to move autonomously to another room so that it can clean that room. Along the way, the robot must avoid bumping into walls.

[Image: a robotic vacuum cleaner]

Algorithm Description

On a high level, the Bug2 algorithm has two main modes:

  1. Go to Goal Mode: Move from the current location towards the goal (x,y) coordinate.
  2. Wall Following Mode: Move along a wall.

Here is pseudocode for the algorithm:

1. Calculate the start-goal line. The start-goal line is an imaginary line that connects the starting position to the goal position.

2. While not at the goal:

  • Move towards the goal along the start-goal line.
    • If a wall is encountered:
      • Remember the location where the wall was first encountered. This is the “hit point.”
      • Follow the wall until you encounter the start-goal line again. This point is known as the “leave point.”
        • If the leave point is closer to the goal than the hit point, leave the wall, and move towards the goal again.
        • Otherwise, continue following the wall.

That’s the algorithm. In the image below, I have labeled the hit points and leave points.

[Image: hit points and leave points labeled along the Bug2 path]

Python Implementation (ROS2)

The robot you see in the video at the beginning of this tutorial has a laser distance scanner mounted on top of it that enables it to detect distances from 0 degrees (right side of the robot) to 180 degrees (left side of the robot). It uses three Python methods: bug2, follow_wall, and go_to_goal.

Below is my complete code. It runs in ROS2 Foxy Fitzroy and Gazebo and makes use of the Differential Drive plugin. 

Since we’re just focusing on the algorithms in this post, I won’t go into the details of how I created the robot, built the simulated maze, and created publisher and subscriber nodes for the goal locations, robot pose, and laser distance sensor information (I might cover this in a future post).

Don’t be intimidated by the code. It is long but well-commented. Go through each piece and function one step at a time. Focus only on the following three methods:

  • go_to_goal(self) method 
  • follow_wall(self) method
  • bug2(self) method

Those three methods encapsulate the implementation of Bug2. 

I created descriptive variable names so that you can compare the code to the pseudocode I wrote earlier in this tutorial. 
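
One detail worth calling out before the full listing: while the robot is in wall following mode, it needs a way to decide when it is back on the start-goal line. The standalone sketch below shows one way to compute that test using the point-to-line distance formula (the full implementation stores the start-goal line as a slope and y-intercept and performs an equivalent check; this helper is just for illustration):

import math

def distance_to_start_goal_line(x, y, x_start, y_start, x_goal, y_goal):
  """
  Return the perpendicular distance from the point (x, y) to the
  line that passes through the start point and the goal point.
  """
  numerator = abs((y_goal - y_start) * x - (x_goal - x_start) * y
                  + x_goal * y_start - y_goal * x_start)
  denominator = math.sqrt((y_goal - y_start) ** 2 + (x_goal - x_start) ** 2)
  return numerator / denominator

If this distance is below a small threshold (the code below uses +/- 0.1 meters) and the current position is closer to the goal than the hit point was, the robot leaves the wall and switches back to "go to goal mode".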

# Author: Addison Sears-Collins
# Date: December 17, 2020
# ROS Version: ROS 2 Foxy Fitzroy

############## IMPORT LIBRARIES #################
# Python math library
import math 

# ROS client library for Python
import rclpy 

# Enables pauses in the execution of code
from time import sleep 

# Used to create nodes
from rclpy.node import Node

# Enables the use of the string message type
from std_msgs.msg import String 

# Twist is linear and angular velocity
from geometry_msgs.msg import Twist 	
					
# Handles LaserScan messages to sense distance to obstacles (i.e. walls)      	
from sensor_msgs.msg import LaserScan	 

# Handle Pose messages
from geometry_msgs.msg import Pose 

# Handle float64 arrays
from std_msgs.msg import Float64MultiArray
					
# Handles quality of service for LaserScan data
from rclpy.qos import qos_profile_sensor_data 

# Scientific computing library
import numpy as np 

class PlaceholderController(Node):
	"""
	Create a Placeholder Controller class, which is a subclass of the Node 
	class for ROS2.
	"""

	def __init__(self):
		"""
		Class constructor to set up the node
		"""
		####### INITIALIZE ROS PUBLISHERS AND SUBSCRIBERS##############
		# Initiate the Node class's constructor and give it a name
		super().__init__('PlaceholderController')
		
		# Create a subscriber
		# This node subscribes to messages of type Float64MultiArray  
		# over a topic named: /en613/state_est
		# The message represents the current estimated state:
		#   [x, y, yaw]
		# The callback function is called as soon as a message 
		# is received.
		# The maximum number of queued messages is 10.
		self.subscription = self.create_subscription(
			Float64MultiArray,
			'/en613/state_est',
			self.state_estimate_callback,
			10)
		self.subscription  # prevent unused variable warning
		
		# Create a subscriber
		# This node subscribes to messages of type 
		# sensor_msgs/LaserScan		
		self.scan_subscriber = self.create_subscription(
			LaserScan,
			'/en613/scan',
			self.scan_callback,
			qos_profile=qos_profile_sensor_data)
			
		# Create a subscriber
		# This node subscribes to messages of type geometry_msgs/Pose 
		# over a topic named: /en613/goal
		# The message represents the goal position.
		# The callback function is called as soon as a message 
		# is received.
		# The maximum number of queued messages is 10.
		self.subscription_goal_pose = self.create_subscription(
			Pose,
			'/en613/goal',
			self.pose_received,
			10)
			
		# Create a publisher
		# This node publishes the desired linear and angular velocity 
		# of the robot (in the robot chassis coordinate frame) to the 
		# /en613/cmd_vel topic. Using the diff_drive
		# plugin enables the basic_robot model to read this 
		# /en613/cmd_vel topic and execute the motion accordingly.
		self.publisher_ = self.create_publisher(
			Twist, 
			'/en613/cmd_vel', 
			10)
		
		# Initialize the LaserScan sensor readings to some large value
		# Values are in meters.
		self.left_dist = 999999.9 # Left
		self.leftfront_dist = 999999.9 # Left-front
		self.front_dist = 999999.9 # Front
		self.rightfront_dist = 999999.9 # Right-front
		self.right_dist = 999999.9 # Right

		################### ROBOT CONTROL PARAMETERS ##################
		
		# Maximum forward speed of the robot in meters per second
		# Any faster than this and the robot risks falling over.
		self.forward_speed = 0.035	
		
		# Current position and orientation of the robot in the global 
		# reference frame
		self.current_x = 0.0
		self.current_y = 0.0
		self.current_yaw = 0.0
		
		# By changing the value of self.robot_mode, you can alter what
		# the robot will do when the program is launched.
		# 	"obstacle avoidance mode": Robot will avoid obstacles
		#  	"go to goal mode": Robot will head to an x,y coordinate   
		# 	"wall following mode": Robot will follow a wall 
		self.robot_mode = "go to goal mode"
		
		############# OBSTACLE AVOIDANCE MODE PARAMETERS ##############
		
		# Obstacle detection distance threshold
		self.dist_thresh_obs = 0.25 # in meters
		
		# Maximum left-turning speed	
		self.turning_speed = 0.25 # rad/s

		############# GO TO GOAL MODE PARAMETERS ######################
		# Finite states for the go to goal mode
		# 	"adjust heading": Orient towards a goal x, y coordinate
		# 	"go straight": Go straight towards goal x, y coordinate
		# 	"goal achieved": Reached goal x, y coordinate
		self.go_to_goal_state = "adjust heading"
		
		# List the goal destinations
		# We create a list of the (x,y) coordinate goals
		self.goal_x_coordinates = False # [ 0.0, 3.0, 0.0, -1.5, -1.5,  4.5, 0.0]
		self.goal_y_coordinates = False # [-4.0, 1.0, 1.5,  1.0, -3.0, -4.0, 0.0]
		
		# Keep track of which goal we're headed towards
		self.goal_idx = 0 
		
		# Keep track of when we've reached the end of the goal list
		self.goal_max_idx =  None # len(self.goal_x_coordinates) - 1 
		
		# +/- 2.0 degrees of precision
		self.yaw_precision = 2.0 * (math.pi / 180) 
		
		# How quickly we need to turn when we need to make a heading
		# adjustment (rad/s)
		self.turning_speed_yaw_adjustment = 0.0625
		
		# Need to get within +/- 0.2 meter (20 cm) of (x,y) goal
		self.dist_precision = 0.2 

		############# WALL FOLLOWING MODE PARAMETERS ##################		
		# Finite states for the wall following mode
		# 	"turn left": Robot turns towards the left
		#	"search for wall": Robot tries to locate the wall		
		# 	"follow wall": Robot moves parallel to the wall
		self.wall_following_state = "turn left"
		
		# Set turning speeds (to the left) in rad/s 
		# These values were determined by trial and error.
		self.turning_speed_wf_fast = 1.0  # Fast turn
		self.turning_speed_wf_slow = 0.125 # Slow turn
		
		# Wall following distance threshold.
		# We want to try to keep within this distance from the wall.
		self.dist_thresh_wf = 0.45 # in meters	
		
		# We don't want to get too close to the wall though.
		self.dist_too_close_to_wall = 0.15 # in meters
		
		################### BUG2 PARAMETERS ###########################
		
		# Bug2 Algorithm Switch
		# Can turn "ON" or "OFF" depending on if you want to run Bug2
		# Motion Planning Algorithm
		self.bug2_switch = "ON"
		
		# Start-Goal Line Calculated?
		self.start_goal_line_calculated = False
		
		# Start-Goal Line Parameters
		self.start_goal_line_slope_m = 0
		self.start_goal_line_y_intercept = 0
		self.start_goal_line_xstart = 0
		self.start_goal_line_xgoal = 0
		self.start_goal_line_ystart = 0
		self.start_goal_line_ygoal = 0
		
		# Anything less than this distance means we have encountered
		# a wall. Value determined through trial and error.
		self.dist_thresh_bug2 = 0.15 
		
		# Leave point must be within +/- 0.1m of the start-goal line
		# in order to go from wall following mode to go to goal mode
		self.distance_to_start_goal_line_precision = 0.1
		
		# Used to record the (x,y) coordinate where the robot hit
		# a wall.
		self.hit_point_x = 0
		self.hit_point_y = 0
		
		# Distance between the hit point and the goal in meters
		self.distance_to_goal_from_hit_point = 0.0
		
		# Used to record the (x,y) coordinate where the robot left
		# a wall.		
		self.leave_point_x = 0
		self.leave_point_y = 0
		
		# Distance between the leave point and the goal in meters
		self.distance_to_goal_from_leave_point = 0.0
		
		# The hit point and leave point must be far enough 
		# apart to change state from wall following to go to goal
		# This value helps prevent the robot from getting stuck and
		# rotating in endless circles.
		# This distance was determined through trial and error.
		self.leave_point_to_hit_point_diff = 0.25 # in meters
		
	def pose_received(self,msg):
		"""
		Populate the pose.
		"""
		self.goal_x_coordinates = [msg.position.x]
		self.goal_y_coordinates = [msg.position.y]
		self.goal_max_idx = len(self.goal_x_coordinates) - 1		
	
	def scan_callback(self, msg):
		"""
		This method gets called every time a LaserScan message is 
		received on the /en613/scan ROS topic	
		"""
		# Read the laser scan data that indicates distances
		# to obstacles (e.g. wall) in meters and extract
		# 5 distinct laser readings to work with.
		# Each reading is separated by 45 degrees.
		# Assumes 181 laser readings, separated by 1 degree. 
		# (e.g. -90 degrees to 90 degrees....0 to 180 degrees)
		self.left_dist = msg.ranges[180]
		self.leftfront_dist = msg.ranges[135]
		self.front_dist = msg.ranges[90]
		self.rightfront_dist = msg.ranges[45]
		self.right_dist = msg.ranges[0]
		
		# The total number of laser rays. Used for testing.
		#number_of_laser_rays = str(len(msg.ranges))		
			
		# Print the distance values (in meters) for testing
		#self.get_logger().info('L:%f LF:%f F:%f RF:%f R:%f' % (
		#	self.left_dist,
		#	self.leftfront_dist,
		#	self.front_dist,
		#	self.rightfront_dist,
		#	self.right_dist))
		
		if self.robot_mode == "obstacle avoidance mode":
			self.avoid_obstacles()
			
	def state_estimate_callback(self, msg):
		"""
		Extract the position and orientation data. 
		This callback is called each time
		a new message is received on the '/en613/state_est' topic
		"""
		# Update the current estimated state in the global reference frame
		curr_state = msg.data
		self.current_x = curr_state[0]
		self.current_y = curr_state[1]
		self.current_yaw = curr_state[2]
		
		# Wait until we have received some goal destinations.
		if self.goal_x_coordinates == False and self.goal_y_coordinates == False:
			return
				
		# Print the pose of the robot
		# Used for testing
		#self.get_logger().info('X:%f Y:%f YAW:%f' % (
		#	self.current_x,
		#	self.current_y,
		#	np.rad2deg(self.current_yaw)))  # Goes from -pi to pi 
		
		# See if the Bug2 algorithm is activated. If yes, call bug2()
		if self.bug2_switch == "ON":
			self.bug2()
		else:
			
			if self.robot_mode == "go to goal mode":
				self.go_to_goal()
			elif self.robot_mode == "wall following mode":
				self.follow_wall()
			else:
				pass # Do nothing			
		
	def avoid_obstacles(self):
		"""
		Wander around the maze and avoid obstacles.
		"""
		# Create a Twist message and initialize all the values 
		# for the linear and angular velocities
		msg = Twist()
		msg.linear.x = 0.0
		msg.linear.y = 0.0
		msg.linear.z = 0.0
		msg.angular.x = 0.0
		msg.angular.y = 0.0
		msg.angular.z = 0.0	
		
		# Logic for avoiding obstacles (e.g. walls)
		# >d means no obstacle detected by that laser beam
		# <d means an obstacle was detected by that laser beam
		d = self.dist_thresh_obs
		if   self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist > d:
			msg.linear.x = self.forward_speed # Go straight forward
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist > d:
			msg.angular.z = self.turning_speed  # Turn left
		elif self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist < d:
			msg.angular.z = self.turning_speed  
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist > d:
			msg.angular.z = -self.turning_speed # Turn right
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist < d:
			msg.angular.z = self.turning_speed 
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist > d:
			msg.angular.z = -self.turning_speed 
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist < d:
			msg.angular.z = self.turning_speed 
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist < d:
			msg.linear.x = self.forward_speed
		else:
			pass 
			
		# Send the velocity commands to the robot by publishing 
		# to the topic
		self.publisher_.publish(msg)
							
	def go_to_goal(self):
		"""
		This code drives the robot towards the goal destination
		"""
		# Create a geometry_msgs/Twist message
		msg = Twist()
		msg.linear.x = 0.0
		msg.linear.y = 0.0
		msg.linear.z = 0.0
		msg.angular.x = 0.0
		msg.angular.y = 0.0
		msg.angular.z = 0.0
		
		# If Bug2 algorithm is activated
		if self.bug2_switch == "ON":
		
			# If the wall is in the way
			d = self.dist_thresh_bug2
			if (	self.leftfront_dist < d or 
				self.front_dist < d or 
				self.rightfront_dist < d):
			
				# Change the mode to wall following mode.
				self.robot_mode = "wall following mode"
				
				# Record the hit point	
				self.hit_point_x = self.current_x
				self.hit_point_y = self.current_y
				
				# Record the distance to the goal from the 
				# hit point
				self.distance_to_goal_from_hit_point = (
					math.sqrt((
					pow(self.goal_x_coordinates[self.goal_idx] - self.hit_point_x, 2)) + (
					pow(self.goal_y_coordinates[self.goal_idx] - self.hit_point_y, 2)))) 	
					
				# Make a hard left to begin following wall
				msg.angular.z = self.turning_speed_wf_fast
						
				# Send command to the robot
				self.publisher_.publish(msg)
				
				# Exit this function 		
				return
			
		# Fix the heading		
		if (self.go_to_goal_state == "adjust heading"):
			
			# Calculate the desired heading based on the current position 
			# and the desired position
			desired_yaw = math.atan2(
					self.goal_y_coordinates[self.goal_idx] - self.current_y,
					self.goal_x_coordinates[self.goal_idx] - self.current_x)
			
			# How far off is the current heading in radians?		
			yaw_error = desired_yaw - self.current_yaw
			
			# Adjust heading if heading is not good enough
			if math.fabs(yaw_error) > self.yaw_precision:
			
				if yaw_error > 0:	
					# Turn left (counterclockwise)		
					msg.angular.z = self.turning_speed_yaw_adjustment 				
				else:
					# Turn right (clockwise)
					msg.angular.z = -self.turning_speed_yaw_adjustment
				
				# Command the robot to adjust the heading
				self.publisher_.publish(msg)
				
			# Change the state if the heading is good enough
			else:				
				# Change the state
				self.go_to_goal_state = "go straight"
				
				# Command the robot to stop turning
				self.publisher_.publish(msg)		

		# Go straight										
		elif (self.go_to_goal_state == "go straight"):
			
			position_error = math.sqrt(
						pow(
						self.goal_x_coordinates[self.goal_idx] - self.current_x, 2)
						+ pow(
						self.goal_y_coordinates[self.goal_idx] - self.current_y, 2)) 
						
			
			# If we are still too far away from the goal						
			if position_error > self.dist_precision:

				# Move straight ahead
				msg.linear.x = self.forward_speed
					
				# Command the robot to move
				self.publisher_.publish(msg)
			
				# Check our heading			
				desired_yaw = math.atan2(
					self.goal_y_coordinates[self.goal_idx] - self.current_y,
					self.goal_x_coordinates[self.goal_idx] - self.current_x)
				
				# How far off is the heading?	
				yaw_error = desired_yaw - self.current_yaw		
		
				# Check the heading and change the state if there is too much heading error
				if math.fabs(yaw_error) > self.yaw_precision:
					
					# Change the state
					self.go_to_goal_state = "adjust heading"
				
			# We reached our goal. Change the state.
			else:			
				# Change the state
				self.go_to_goal_state = "goal achieved"
				
				# Command the robot to stop
				self.publisher_.publish(msg)
		
		# Goal achieved			
		elif (self.go_to_goal_state == "goal achieved"):
				
			self.get_logger().info('Goal achieved! X:%f Y:%f' % (
				self.goal_x_coordinates[self.goal_idx],
				self.goal_y_coordinates[self.goal_idx]))
			
			# Get the next goal
			self.goal_idx = self.goal_idx + 1
		
			# Do we have any more goals left?			
			# If we have no more goals left, just stop
			if (self.goal_idx > self.goal_max_idx):
				self.get_logger().info('Congratulations! All goals have been achieved.')
				while True:
					pass

			# Let's achieve our next goal
			else: 
				# Change the state
				self.go_to_goal_state = "adjust heading"				

			# We need to recalculate the start-goal line if Bug2 is running
			self.start_goal_line_calculated = False				
		
		else:
			pass
			
	def follow_wall(self):
		"""
		This method causes the robot to follow the boundary of a wall.
		"""
		# Create a geometry_msgs/Twist message
		msg = Twist()
		msg.linear.x = 0.0
		msg.linear.y = 0.0
		msg.linear.z = 0.0
		msg.angular.x = 0.0
		msg.angular.y = 0.0
		msg.angular.z = 0.0			

		# Special code if Bug2 algorithm is activated
		if self.bug2_switch == "ON":
		
			# Calculate the point on the start-goal 
			# line that is closest to the current position
			x_start_goal_line = self.current_x
			y_start_goal_line = (
				self.start_goal_line_slope_m * (
				x_start_goal_line)) + (
				self.start_goal_line_y_intercept)
						
			# Calculate the distance between current position 
			# and the start-goal line
			distance_to_start_goal_line = math.sqrt(pow(
						x_start_goal_line - self.current_x, 2) + pow(
						y_start_goal_line - self.current_y, 2)) 
							
			# If we hit the start-goal line again				
			if distance_to_start_goal_line < self.distance_to_start_goal_line_precision:
			
				# Determine if we need to leave the wall and change the mode
				# to 'go to goal'
				# Let this point be the leave point
				self.leave_point_x = self.current_x
				self.leave_point_y = self.current_y

				# Record the distance to the goal from the leave point
				self.distance_to_goal_from_leave_point = math.sqrt(
					pow(self.goal_x_coordinates[self.goal_idx] 
					- self.leave_point_x, 2)
					+ pow(self.goal_y_coordinates[self.goal_idx]  
					- self.leave_point_y, 2)) 
			
				# Is the leave point closer to the goal than the hit point?
				# If yes, go to goal. 
				diff = self.distance_to_goal_from_hit_point - self.distance_to_goal_from_leave_point
				if diff > self.leave_point_to_hit_point_diff:
						
					# Change the mode. Go to goal.
					self.robot_mode = "go to goal mode"


				# Exit this function
				return				
		
		# Logic for following the wall
		# >d means no wall detected by that laser beam
		# <d means a wall was detected by that laser beam
		d = self.dist_thresh_wf
		
		if self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist > d:
			self.wall_following_state = "search for wall"
			msg.linear.x = self.forward_speed
			msg.angular.z = -self.turning_speed_wf_slow # turn right to find wall
			
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist > d:
			self.wall_following_state = "turn left"
			msg.angular.z = self.turning_speed_wf_fast
			
			
		elif (self.leftfront_dist > d and self.front_dist > d and self.rightfront_dist < d):
			if (self.rightfront_dist < self.dist_too_close_to_wall):
				# Getting too close to the wall
				self.wall_following_state = "turn left"
				msg.linear.x = self.forward_speed
				msg.angular.z = self.turning_speed_wf_fast		
			else: 			
				# Go straight ahead
				self.wall_following_state = "follow wall"  
				msg.linear.x = self.forward_speed	
									
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist > d:
			self.wall_following_state = "search for wall"
			msg.linear.x = self.forward_speed
			msg.angular.z = -self.turning_speed_wf_slow # turn right to find wall
			
		elif self.leftfront_dist > d and self.front_dist < d and self.rightfront_dist < d:
			self.wall_following_state = "turn left"
			msg.angular.z = self.turning_speed_wf_fast
			
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist > d:
			self.wall_following_state = "turn left" 
			msg.angular.z = self.turning_speed_wf_fast
			
		elif self.leftfront_dist < d and self.front_dist < d and self.rightfront_dist < d:
			self.wall_following_state = "turn left" 
			msg.angular.z = self.turning_speed_wf_fast
			
		elif self.leftfront_dist < d and self.front_dist > d and self.rightfront_dist < d:
			self.wall_following_state = "search for wall"
			msg.linear.x = self.forward_speed
			msg.angular.z = -self.turning_speed_wf_slow # turn right to find wall
			
		else:
			pass 

		# Send velocity command to the robot
		self.publisher_.publish(msg)	
		
	def bug2(self):
		"""
		Implement the Bug2 algorithm: drive straight toward the current goal
		and switch to wall following whenever an obstacle blocks the path.
		"""
		# Each time we start towards a new goal, we need to calculate the start-goal line
		if self.start_goal_line_calculated == False:
		
			# Make sure go to goal mode is set.
			self.robot_mode = "go to goal mode"				

			self.start_goal_line_xstart = self.current_x
			self.start_goal_line_xgoal = self.goal_x_coordinates[self.goal_idx]
			self.start_goal_line_ystart = self.current_y
			self.start_goal_line_ygoal = self.goal_y_coordinates[self.goal_idx]
			
			# Calculate the slope of the start-goal line m
			self.start_goal_line_slope_m = (
				(self.start_goal_line_ygoal - self.start_goal_line_ystart) / (
				self.start_goal_line_xgoal - self.start_goal_line_xstart))
			
			# Solve for the intercept b
			self.start_goal_line_y_intercept = self.start_goal_line_ygoal - (
					self.start_goal_line_slope_m * self.start_goal_line_xgoal) 
				
			# We have successfully calculated the start-goal line
			self.start_goal_line_calculated = True
			
		if self.robot_mode == "go to goal mode":
			self.go_to_goal()			
		elif self.robot_mode == "wall following mode":
			self.follow_wall()

def main(args=None):

    # Initialize rclpy library
    rclpy.init(args=args)
    
    # Create the node
    controller = PlaceholderController()

    # Spin the node so the callback function is called
    # Pull messages from any topics this node is subscribed to
    # Publish any pending messages to the topics
    rclpy.spin(controller)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    controller.destroy_node()
    
    # Shutdown the ROS client library for Python
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Here is the state estimator that goes with the code above. I used an Extended Kalman Filter to filter the odometry measurements from the mobile robot.
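
Before reading the listing, it helps to keep the two phases of the filter in mind. The ekf() method near the bottom follows the standard predict/update structure summarized below. This is only a sketch in textbook notation; the code additionally adds constant process-noise and sensor-noise offsets to the predicted state and to the measurement residual, and it uses a pseudoinverse in place of the exact inverse of S.

\begin{aligned}
\text{Predict:}\quad & \hat{x}_t = A\,\hat{x}_{t-1} + B(\hat{\psi}_{t-1},\Delta t)\,u_{t-1},
\qquad P_t = A\,P_{t-1}\,A^{\mathsf T} + Q_t \\
\text{Update:}\quad & y_t = z_t - H_t\,\hat{x}_t,
\qquad S_t = H_t\,P_t\,H_t^{\mathsf T} + R_t,
\qquad K_t = P_t\,H_t^{\mathsf T}\,S_t^{-1} \\
& \hat{x}_t \leftarrow \hat{x}_t + K_t\,y_t,
\qquad P_t \leftarrow P_t - K_t\,H_t\,P_t
\end{aligned}

Here \hat{x} = [x, y, \text{yaw}] is the estimated state, u = [v, v, \dot{\psi}] is the control input built from the commanded velocities, and z_t is the [x, y, yaw] measurement taken from the odometry.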

# Author: Addison Sears-Collins
# Date: December 8, 2020
# ROS Version: ROS 2 Foxy Fitzroy

# Python math library
import math

# ROS client library for Python
import rclpy

# Used to create nodes
from rclpy.node import Node

# Twist is linear and angular velocity
from geometry_msgs.msg import Twist 

# Position, orientation, linear velocity, angular velocity
from nav_msgs.msg import Odometry

# Handles laser distance scan to detect obstacles
from sensor_msgs.msg import LaserScan

# Used for laser scan
from rclpy.qos import qos_profile_sensor_data

# Enable use of std_msgs/Float64MultiArray message
from std_msgs.msg import Float64MultiArray 

# Scientific computing library for Python
import numpy as np

class PlaceholderEstimator(Node):
	"""
	Estimate the robot's state (x, y, yaw) by fusing the commanded velocities
	and the noisy odometry measurements with an Extended Kalman Filter.
	"""
	def __init__(self):
		############## INITIALIZE ROS PUBLISHERS AND SUBSCRIBERS ######
	
	
		# Initiate the Node class's constructor and give it a name
		super().__init__('PlaceholderEstimator')
		
		# Create a subscriber 
		# This node subscribes to messages of type 
		# geometry_msgs/Twist.msg
		# The maximum number of queued messages is 10.
		self.velocity_subscriber = self.create_subscription(
			Twist,
			'/en613/cmd_vel',
			self.velocity_callback,
			10)
			
		# Create a subscriber
		# This node subscribes to messages of type
		# nav_msgs/Odometry
		self.odom_subscriber = self.create_subscription(
			Odometry,
			'/en613/odom',
			self.odom_callback,
			10)
		
		# Create a publisher
		# This node publishes the estimated position (x, y, yaw) 
		# The type of message is std_msgs/Float64MultiArray
		self.publisher_state_est = self.create_publisher(
			Float64MultiArray, 
			'/en613/state_est', 
			10)
			
		############# STATE TRANSITION MODEL PARAMETERS ###############	
		
		# Time step from one time step t-1 to the next time step t
		self.delta_t = 0.002 # seconds
		
		# Keep track of the estimate of the yaw angle
		# for control input vector calculation.
		self.est_yaw = 0.0
		
		# A matrix
		# 3x3 matrix -> number of states x number of states matrix
		# Expresses how the state of the system [x,y,yaw] changes 
		# from t-1 to t when no control command is executed. Typically 
		# a robot on wheels only drives when the wheels are commanded 
		# to turn.
		# For this case, A is the identity matrix.
		# A is sometimes F in the literature.
		self.A_t_minus_1 = np.array([ 	[1.0,  0,   0],
						[  0,1.0,   0],
						[  0,  0, 1.0]])
		
		# The estimated state vector at time t-1 in the global 
		# reference frame
		# [x_t_minus_1, y_t_minus_1, yaw_t_minus_1]
		# [meters, meters, radians]
		self.state_vector_t_minus_1 = np.array([0.0,0.0,0.0])
		
		# The control input vector at time t-1 in the global 
		# reference frame
		# [v,v,yaw_rate]
		# [meters/second, meters/second, radians/second]
		# In the literature, this is commonly u.
		self.control_vector_t_minus_1 = np.array([0.001,0.001,0.001])
		
		# Noise applied to the forward kinematics (calculation
		# of the estimated state at time t from the state transition
		# model of the mobile robot). This is a vector with the
		# number of elements equal to the number of states.
		self.process_noise_v_t_minus_1 = np.array([0.096,0.096,0.032])

		############# MEASUREMENT MODEL PARAMETERS ####################	
		
		# Measurement matrix H_t
		# Used to convert the predicted state estimate at time t
		# into predicted sensor measurements at time t.
		# In this case, H will be the identity matrix since the 
		# estimated state maps directly to state measurements from the 
		# odometry data [x, y, yaw]
		# H has the same number of rows as sensor measurements
		# and same number of columns as states.
		self.H_t = np.array([ 	[1.0,  0,   0],
					[  0,1.0,   0],
					[  0,  0, 1.0]])
				
		# Sensor noise. This is a vector with the same
		# number of elements as the number of sensor measurements.
		self.sensor_noise_w_t = np.array([0.07,0.07,0.04])

		############# EXTENDED KALMAN FILTER PARAMETERS ###############
		
		# State covariance matrix P_t_minus_1
		# This matrix has the same number of rows (and columns) as the 
		# number of states (i.e. 3x3 matrix). P is sometimes referred
		# to as Sigma in the literature. It represents an estimate of 
		# the accuracy of the state estimate at t-1 made using the
		# state transition matrix. We start off with guessed values.
		self.P_t_minus_1 = np.array([ 	[0.1,  0,   0],
						[  0,0.1,   0],
						[  0,  0, 0.1]])
						
		# State model noise covariance matrix Q_t
		# When Q is large, the Kalman Filter tracks large changes in 
		# the sensor measurements more closely than for smaller Q.
		# Q is a square matrix that has the same number of rows as 
		# states.
		self.Q_t = np.array([ 		[1.0,   0,   0],
						[  0, 1.0,   0],
						[  0,   0, 1.0]])
						
		# Sensor measurement noise covariance matrix R_t
		# Has the same number of rows and columns as sensor measurements.
		# If we are sure about the measurements, R will be near zero.
		self.R_t = np.array([ 		[1.0,   0,    0],
						[  0, 1.0,    0],
						[  0,    0, 1.0]])
		
	def velocity_callback(self, msg):
		"""
		Listen to the velocity commands (linear forward velocity 
		in the x direction in the robot's reference frame and 
		angular velocity (yaw rate) around the robot's z-axis).
		Convert those velocity commands into a 3-element control
		input vector ... 
		[v,v,yaw_rate]
		[meters/second, meters/second, radians/second]
		"""
		# Forward velocity in the robot's reference frame
		v = msg.linear.x
		
		# Angular velocity around the robot's z axis
		yaw_rate = msg.angular.z
		
		# [v,v,yaw_rate]		
		self.control_vector_t_minus_1[0] = v
		self.control_vector_t_minus_1[1] = v
		self.control_vector_t_minus_1[2] = yaw_rate

	def odom_callback(self, msg):
		"""
		Receive the odometry information containing the position and orientation
		of the robot in the global reference frame. 
		The position is x, y, z.
		The orientation is a x,y,z,w quaternion. 
		"""						
		roll, pitch, yaw = self.euler_from_quaternion(
					msg.pose.pose.orientation.x,
					msg.pose.pose.orientation.y,
					msg.pose.pose.orientation.z,
					msg.pose.pose.orientation.w)
				
		obs_state_vector_x_y_yaw = [msg.pose.pose.position.x,msg.pose.pose.position.y,yaw]
		
		# These are the measurements taken by the odometry in Gazebo
		z_t_observation_vector =  np.array([obs_state_vector_x_y_yaw[0],
						obs_state_vector_x_y_yaw[1],
						obs_state_vector_x_y_yaw[2]])
		
		# Apply the Extended Kalman Filter
		# This is the updated state estimate after taking the latest
		# sensor (odometry) measurements into account.				
		updated_state_estimate_t = self.ekf(z_t_observation_vector)
		
		# Publish the estimate state
		self.publish_estimated_state(updated_state_estimate_t)

	def publish_estimated_state(self, state_vector_x_y_yaw):
		"""
		Publish the estimated pose (position and orientation) of the 
		robot to the '/en613/state_est' topic. 
		:param: state_vector_x_y_yaw [x, y, yaw] 
			x is in meters, y is in meters, yaw is in radians
		"""
		msg = Float64MultiArray()
		msg.data = state_vector_x_y_yaw
		self.publisher_state_est.publish(msg)

	def euler_from_quaternion(self, x, y, z, w):
		"""
		Convert a quaternion into euler angles (roll, pitch, yaw)
		roll is rotation around x in radians (counterclockwise)
		pitch is rotation around y in radians (counterclockwise)
		yaw is rotation around z in radians (counterclockwise)
		"""
		t0 = +2.0 * (w * x + y * z)
		t1 = +1.0 - 2.0 * (x * x + y * y)
		roll_x = math.atan2(t0, t1)
    
		t2 = +2.0 * (w * y - z * x)
		t2 = +1.0 if t2 > +1.0 else t2
		t2 = -1.0 if t2 < -1.0 else t2
		pitch_y = math.asin(t2)
    
		t3 = +2.0 * (w * z + x * y)
		t4 = +1.0 - 2.0 * (y * y + z * z)
		yaw_z = math.atan2(t3, t4)
    
		return roll_x, pitch_y, yaw_z # in radians
		
	def getB(self,yaw,dt):
		"""
		Calculates and returns the B matrix
		
		3x3 matrix -> number of states x number of control inputs
		The control inputs are the forward speed and the rotation 
		rate around the z axis from the x-axis in the 
		counterclockwise direction.
		[v,v,yaw_rate]
		Expresses how the state of the system [x,y,yaw] changes 
		from t-1 to t
		due to the control commands (i.e. inputs).
		:param yaw: The yaw (rotation angle around the z axis) in rad 
		:param dt: The change in time from time step t-1 to t in sec
		"""
		B = np.array([	[np.cos(yaw) * dt,  0,   0],
				[  0, np.sin(yaw) * dt,   0],
				[  0,  0, dt]]) 				
		return B
		
	def ekf(self,z_t_observation_vector):
		"""
		Extended Kalman Filter. Fuses noisy sensor measurements to 
		create an optimal estimate of the state of the robotic system.
		
		INPUT
		:param z_t_observation_vector: The observation from the Odometry,
			a 3x1 NumPy Array [x,y,yaw] in the global reference frame
			in [meters,meters,radians].
		
		OUTPUT
		:return state_estimate_t: optimal state estimate at time t,
			a 3-element list [x,y,yaw] in
			[meters,meters,radians]
					
		"""
		######################### Predict #############################
		# Predict the state estimate at time t based on the state 
		# estimate at time t-1 and the control input applied at time
		# t-1.
		state_estimate_t = self.A_t_minus_1 @ (
			self.state_vector_t_minus_1) + (
			self.getB(self.est_yaw,self.delta_t)) @ (
			self.control_vector_t_minus_1) + (
			self.process_noise_v_t_minus_1)
			
		# Predict the state covariance estimate based on the previous
		# covariance and some noise
		P_t = self.A_t_minus_1 @ self.P_t_minus_1 @ self.A_t_minus_1.T + (
			self.Q_t)
		
		################### Update (Correct) ##########################
		# Calculate the difference between the actual sensor measurements
		# at time t minus what the measurement model predicted 
		# the sensor measurements would be for the current timestep t.
		measurement_residual_y_t = z_t_observation_vector - (
			(self.H_t @ state_estimate_t) + (
			self.sensor_noise_w_t))
			
		# Calculate the measurement residual covariance
		S_t = self.H_t @ P_t @ self.H_t.T + self.R_t
		
		# Calculate the near-optimal Kalman gain
		# We use pseudoinverse since some of the matrices might be
		# non-square or singular.
		K_t = P_t @ self.H_t.T @ np.linalg.pinv(S_t)
		
		# Calculate an updated state estimate for time t
		state_estimate_t = state_estimate_t + (K_t @ measurement_residual_y_t)
		
		# Update the state covariance estimate for time t
		P_t = P_t - (K_t @ self.H_t @ P_t)
		
		#### Update global variables for the next iteration of EKF ####		
		# Update the estimated yaw		
		self.est_yaw = state_estimate_t[2]
		
		# Update the state vector for t-1
		self.state_vector_t_minus_1 = state_estimate_t
		
		# Update the state covariance matrix
		self.P_t_minus_1 = P_t
		
		######### Return the updated state estimate ###################
		state_estimate_t = state_estimate_t.tolist()
		return state_estimate_t


def main(args=None):
    """
    Entry point for the program.
    """
    
    # Initialize rclpy library
    rclpy.init(args=args)

    # Create the node
    estimator = PlaceholderEstimator()

    # Spin the node so the callback function is called.
    # Pull messages from any topics this node is subscribed to.
    # Publish any pending messages to the topics.
    rclpy.spin(estimator)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    estimator.destroy_node()
    
    # Shutdown the ROS client library for Python
    rclpy.shutdown()

if __name__ == '__main__':
    main()
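
Once both nodes are running (assuming your ROS 2 Foxy workspace is built and sourced, and the Gazebo simulation is publishing odometry on /en613/odom), you can sanity check the pipeline from the command line. The topic names below are the ones used in the listings above:

ros2 topic echo /en613/state_est

ros2 topic echo /en613/cmd_vel

The first command prints the filtered [x, y, yaw] estimate published by the estimator; the second shows the Twist velocity commands on the topic the estimator listens to.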

Wall Following Robot Mode Demo

Just for fun, I created a switch in the first section of the code (see previous section) that enables you to have the robot do nothing but follow walls. Here is a video demonstration of that.
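
The switch itself is defined in the controller's constructor shown earlier in this article. Based on how the switches are checked in state_estimate_callback() above, a wall-following-only run amounts to something like the sketch below (attribute values only; the actual definitions live in the constructor):

# Sketch: values set inside the controller's constructor (shown earlier in the article).
# With Bug2 turned off and the mode set to wall following, every state update
# ends up calling follow_wall() and nothing else.
self.bug2_switch = "OFF"
self.robot_mode = "wall following mode"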

Obstacle Avoidance Robot Mode Demo

Here is a demo of what the robot looks like when it does nothing but wander around the room, avoiding obstacles and walls.
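
The behavior in this demo is driven by the avoid_obstacles() method from the controller listing. The switch that selects this mode is defined in the earlier part of the code and is not shown above, so the wiring below is only a hypothetical illustration (the flag name is made up for this sketch):

# Hypothetical sketch only; the real switch is defined earlier in the article.
if self.obstacle_avoidance_switch == "ON":  # illustrative flag name
	self.avoid_obstacles()  # wander forward and turn away from detected obstacles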