ROS2 and Gazebo: Localization and Path Planning

# Pure Pursuite Path Follower

## PathFollower.py

This class implements a simplified version of a Pure Pursuite algorithm.

Pure Pursuite works by choosing a point along the route, some distance ahead, and moving towards it. As robot moves, it constantly picks the next point, so the "pursuite" never ends. I am not going to explain details here: in the "links" section below you will fine some excellent tutorials, for any level, from beginner to advanced.

```
# (c) robotics.snowcron.com

# --- Importing necessary modules

import rclpy
from rclpy.node import Node

...

class PathFollower(Node):

def __init__(self, robot_name = ""):
print("*** PathFollower.init()")

# Prefixes for frames and message topics. Details are provided
# in the earlier sections, where I created a multi-robot project.

if(robot_name != ""):
super().__init__(robot_name + '_PathFollower')
self.strSlashRobotName = "/" + robot_name
self.strRobotNameSlash = robot_name + "/"
else:
super().__init__('PathFollower')
self.strSlashRobotName = ""
self.strRobotNameSlash = ""

self.strRobotName = robot_name

# This is a segment closest to the robot. Our algorithm will
# look forward starting from this segment, ignoring ones left
# behind.
self.nCurrentSegment = 0

self.nTotalSegments = -1

# We look 1 meter forward. Note that if we approach the end
# of the route, the lookahead distance will not exceed the
# distance to the last point.

# Our sensors are not perfect. We need to decide how close we should
# get to the destination before we state that we have arrived.

# As was mentioned above, when we are close to finish, we should
# keep in mind that we can not look ahead too far.
self.bNearFinish = False

# Point we currently move towards
self.target_pose = Pose()

# --- Used to present data as charts

self.arrChartTime         = []   # Integer numbers in seconds
self.arrChartAMCL         = []
self.arrChartOdom         = []
self.arrChartFilteredOdom = []
self.arrChartGPS          = []
self.arrChartFilteredGPS  = []
self.arrChartGT           = []

# Used for charts only
self.nPointIdx = 0

# Do we have path to follow?
self.bPathSet = False

self.nTotalSegments = -1

# Temp! AMCL requires initial pose, and it can be pretty much
# anything. So we provide this point. In a real robot app, it
# can be coordinates of a charger/docking station, or initial
# GPS fix.
self.current_pose = Pose()
self.current_pose.position.x = -2.0
self.current_pose.position.y = -2.0
self.current_pose.position.z = 0.0
self.current_pose.orientation.x = 0.0
self.current_pose.orientation.y = 0.0
self.current_pose.orientation.z = 0.0
self.current_pose.orientation.w = 1.0

# As this is a localization example, I am going to keep
# track of some raw sensor coordinates and filtered
# coordinates.
self.amcl_pose = copy.deepcopy(self.current_pose)
self.odom_pose = copy.deepcopy(self.current_pose)
self.filtered_odom_pose = copy.deepcopy(self.current_pose)
self.ground_truth_pose = copy.deepcopy(self.current_pose)
self.gps_pose = copy.deepcopy(self.current_pose)
self.gps_filtered_pose = copy.deepcopy(self.current_pose)

# Subscribe to "path_follower_command" TEXT message
# Currently, we can command "start" and "stop": see
self.command_subscription = self.create_subscription(
String,
self.strSlashRobotName + "/path_follower_command",
self.command_callback,
10  # QoS profile depth
)

self.path_follower_command_publisher = self.create_publisher(
String,
self.strSlashRobotName + "/path_follower_command",
10  # QoS profile depth
)

# Probably, /path_follower_set_path is not the best possible
# name for a topic. Anyway, this is where I am going to
# publish the path for path follower to use.

# Subscribe to "path_follower_path" nav_msgs/Path message
self.path_subscription = self.create_subscription(
Path,
self.strSlashRobotName + "/path_follower_set_path",
self.set_path_callback,
10  # QoS profile depth
)
self.path_subscription  # Prevent unused variable warning

self.path_follower_path_publisher = self.create_publisher(
Path,
self.strSlashRobotName + "/path_follower_set_path",
10  # QoS profile depth
)

# ---

# Publish the desired linear and angular velocity of the robot (in the
# robot chassis coordinate frame) to the /cmd_vel_unstamped topic.
# The diff_drive controller that our robot uses will read this topic
# and move the robot accordingly.
# In other words, this is our way to send messages to robot's motors.
self.publisherCmdVel = self.create_publisher(
Twist, #TwistStamped,

# For ROS2 diff. drive controller
#'/diff_cont/cmd_vel_unstamped' + self.strSlashRobotName,

# For Gazebo diff. drive controller
self.strSlashRobotName + "/cmd_vel",
10)

# Maximum values for robot's linear and angular speed
self.angular_speed = 1.0 #0.5
self.linear_speed = 1.0 #0.5

# ---

# As always, we are going to implement a simple finite automata,
# this is its initial state.
self.navStatus = "idle"

# We have "navigation_callback" function, that implements
# a simple finite automata. We call it by timer.

timer_period = 0.1

# Used to publish status
self.prev_time = datetime.now()

# This is to get ground truth coordinates. Gazebo, of course,
# knows exactly where the robot is, so we can use this info
# to estimate accuracy of other location methods. However, keep
# in mind that in the real world you do not have access to such
# information: it is called cheating. Accordingly, I am not
# using GT coordinates for anything but getting accuracy estimation.

ground_truth_pos_sub = self.create_subscription(
ModelStates,
'/gazebo/model_states',
self.ground_truth_pose_callback,
10  # QoS profile depth
)

# ---

# Subscribe to messages of type nav_msgs/Odometry
# that provides position and orientation of the robot
self.odom_subscriber = self.create_subscription(
Odometry,
self.strSlashRobotName + '/odom',
self.odom_callback,
10)

# localization node)

qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
durability=QoSDurabilityPolicy.VOLATILE  # You can use TRANSIENT_LOCAL for durability
)

self.filtered_odom_subscriber = self.create_subscription(
Odometry,
self.strSlashRobotName + '/odometry/filtered',
self.filtered_odom_callback,
qos_profile = qos_profile)

# --- GPS

# Set GPS initial pos, this is to tell ROS2 where
# on a globe our map is.
self.origin_latitude = 44.98
self.origin_longitude = -93.27
self.origin_altitude = 254.99

self.gps_subscriber = self.create_subscription(
NavSatFix,
self.strSlashRobotName + '/gps/fix',
self.gps_callback,
qos_profile = qos_profile  # Set the queue size
)

# I am not using it in this example
self.gps_filtered_subscriber = self.create_subscription(
NavSatFix,
self.strSlashRobotName + '/gps/filtered',
self.gps_filtered_callback,
qos_profile = qos_profile  # Set the queue size
)

# --- AMCL related;
# TBD: move starting AMCL and getting init. position
# to a separate class/node?

self.bNodesActive = False

self.pose_qos = QoSProfile(
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1)

self.localization_pose_sub = self.create_subscription(
PoseWithCovarianceStamped,
self.strSlashRobotName + '/amcl_pose',
self._amclPoseCallback, self.pose_qos)

self.initial_pose_pub = self.create_publisher(
PoseWithCovarianceStamped,
self.strSlashRobotName + '/initialpose', 10)

self.waitForNodesToStart()

# ---

print("*** PathFollower.init(): done")

# ------
# We can send commands to this node, like
# "start" and "end"

def command(self, strCommand):
command_msg = String()
command_msg.data = strCommand
self.path_follower_command_publisher.publish(command_msg)

# ------
# This is a rather standard way of waiting for all the
# necessary nodes to come up. Here I wait for AMCL.

def waitForNodesToStart(self):
print("*** PathFollower.waitForNodesToStart()...")
print("\t*** amcl starting...")
self._waitForNodeToActivate(self.strRobotNameSlash + 'amcl')
print("\t*** amcl started successfully")

self._waitForInitialPose()

print("*** PathFollower.waitForNodesToStart() done")

self.bNodesActive = True

# ------

def _waitForNodeToActivate(self, node_name):
print("\t\t*** Waiting for ", node_name, " to become active...")
node_service = node_name + "/get_state"
state_client = self.create_client(GetState, node_service)
while not state_client.wait_for_service(timeout_sec=1.0):
print("\t\t*** ", node_service, " service not available, waiting...")

req = GetState.Request()
state = "unknown"
while (state != "active"):
print("\t\t*** Getting ", node_name, " state...")
future = state_client.call_async(req)
rclpy.spin_until_future_complete(self, future)
if future.result() is not None:
state = future.result().current_state.label
print("\t\t*** Result of get_state: %s" % state)
#time.sleep(1)
time.sleep(1)
return

# ------
# AMCL related. We need to make sure AMCL received its
# initial position. Note that we are explicitely setting
# it in the code, so yes, it will be received.

def _waitForInitialPose(self):
print("\t***_waitForInitialPose...")
print("\t\t*** Setting initial pose")
self._setInitialPose()
print("\t\t***Waiting for amcl_pose to be received")
rclpy.spin_once(self, timeout_sec=1.0)
#time.sleep(1)
time.sleep(1)
return

# ------

def _setInitialPose(self):

initial_pose = PoseStamped()

# In ROS 2, when you need to work with time,
# it's recommended to use the Time class from rclpy.time
# rather than the Python standard library time module
# or datetime module.

initial_pose.pose.position.x = -2.0
initial_pose.pose.position.y = -0.0
initial_pose.pose.position.z = 0.0
initial_pose.pose.orientation.x = 0.0
initial_pose.pose.orientation.y = 0.0
initial_pose.pose.orientation.z = 0.0
initial_pose.pose.orientation.w = 1.0

msg = PoseWithCovarianceStamped()
msg.pose.pose = initial_pose.pose
print("\t\t\t*** Publishing Initial Pose")
self.initial_pose_pub.publish(msg)

return

# ------

# The frequency of pose updates can be controlled by
# the following parameters:
# update_min_d: This parameter specifies the minimum translation (in meters)
#    that the robot must move before a new pose estimate is published.
#    If the robot's translation (position change) is less than this value,
#    no new pose estimate is published.
#
# update_min_a: This parameter specifies the minimum rotation (in radians)
# that the robot must turn before a new pose estimate is published.

def _amclPoseCallback(self, msg):
#print("*** PathFollower._amclPoseCallback()")

self.amcl_pose = msg.pose.pose

self.printPoseInfo()

# ---
# This is a reporting function, it doesn't affect the
# actual navigation. It prints something like
# X        Y        Z      Yaw
# AMCL:              2.53     0.36     0.00     -162
# Odom:              2.40     0.31     0.15     -165
# Filtered Odom:     2.42     0.32    -0.00     -168
# GPS:               2.03    -0.58   255.50     ---
# Filtered GPS:     -2.00    -2.00     0.00     ---
# GT:                3.00     0.60     0.15     -153
# ===

def printPoseInfo(self):
if(self.strRobotName != "" or self.initial_pose_received == False):
return

now = datetime.now()
delta_ms = (now - self.prev_time).seconds * 1000 + (now - self.prev_time).microseconds / 1000.

if delta_ms >= 1000:
amcl_pose = self.amcl_pose

_, _, amcl_yaw = self.euler_from_quaternion(
amcl_pose.orientation.x,
amcl_pose.orientation.y,
amcl_pose.orientation.z,
amcl_pose.orientation.w)

# ---

odom_pose = self.odom_pose

_, _, odom_yaw = self.euler_from_quaternion(
odom_pose.orientation.x,
odom_pose.orientation.y,
odom_pose.orientation.z,
odom_pose.orientation.w)

# ---

filtered_odom_pose = self.filtered_odom_pose

_, _, filtered_odom_yaw = self.euler_from_quaternion(
filtered_odom_pose.orientation.x,
filtered_odom_pose.orientation.y,
filtered_odom_pose.orientation.z,
filtered_odom_pose.orientation.w)

# ---

ground_truth_pose = self.ground_truth_pose
# ground_truth_pose = self.getGroundTruthCoordinates()

_, _, ground_truth_yaw = self.euler_from_quaternion(
ground_truth_pose.orientation.x,
ground_truth_pose.orientation.y,
ground_truth_pose.orientation.z,
ground_truth_pose.orientation.w)

# ---
gps_pose = self.gps_pose
gps_filtered_pose = self.gps_filtered_pose

print("*** Status: %s; \n\
X        Y        Z      Yaw \n\
AMCL:           %7.2f  %7.2f  %7.2f  %7.0f\n\
Odom:           %7.2f  %7.2f  %7.2f  %7.0f\n\
Filtered Odom:  %7.2f  %7.2f  %7.2f  %7.0f\n\
GPS:            %7.2f  %7.2f  %7.2f     ---\n\
Filtered GPS:   %7.2f  %7.2f  %7.2f     ---\n\
GT:             %7.2f  %7.2f  %7.2f  %7.0f\n===\n\n" %

(self.navStatus,
amcl_pose.position.x, amcl_pose.position.y,
amcl_pose.position.z, math.degrees(amcl_yaw),

odom_pose.position.x, odom_pose.position.y,
odom_pose.position.z, math.degrees(odom_yaw),

filtered_odom_pose.position.x, filtered_odom_pose.position.y,
filtered_odom_pose.position.z, math.degrees(filtered_odom_yaw),

gps_pose.position.x, gps_pose.position.y,
gps_pose.position.z,

gps_filtered_pose.position.x, gps_filtered_pose.position.y,
gps_filtered_pose.position.z,

ground_truth_pose.position.x, ground_truth_pose.position.y,
ground_truth_pose.position.z, math.degrees(ground_truth_yaw)
)
)

if(self.strRobotName == "" and self.nPointIdx > 0):
self.arrChartTime.append(len(self.arrChartTime))
self.arrChartAMCL.append(
math.sqrt(
(ground_truth_pose.position.x - amcl_pose.position.x)**2 +
(ground_truth_pose.position.y - amcl_pose.position.y)**2))
self.arrChartOdom.append(
math.sqrt(
(ground_truth_pose.position.x - odom_pose.position.x)**2 +
(ground_truth_pose.position.y - odom_pose.position.y)**2))
self.arrChartFilteredOdom.append(
math.sqrt(
(ground_truth_pose.position.x - filtered_odom_pose.position.x)**2 +
(ground_truth_pose.position.y - filtered_odom_pose.position.y)**2))
self.arrChartGPS.append(
math.sqrt(
(ground_truth_pose.position.x - gps_pose.position.x)**2 +
(ground_truth_pose.position.y - gps_pose.position.y)**2))
self.arrChartFilteredGPS.append(
math.sqrt(
(ground_truth_pose.position.x - gps_filtered_pose.position.x)**2 +
(ground_truth_pose.position.y - gps_filtered_pose.position.y)**2))

self.nPointIdx += 1

# Stores the current Ground Truth coordinates

def ground_truth_pose_callback(self, msg):
strName = self.strRobotName
if(self.strRobotName == ""):
strName = "multi_bot_nav_01"
robot_index = msg.name.index(strName)
self.ground_truth_pose = msg.pose[robot_index]
#print(f"Robot Ground-Truth Pose: x={robot_pose.position.x}, y={robot_pose.position.y}, z={robot_pose.position.z}")

# Stores the current odom coordinates
def odom_callback(self, msg):
self.odom_pose = msg.pose.pose
self.printPoseInfo()

# A very primitive transformation for GPS to x,y
# Note: ROS2 provides a better one in navsat_transform node

def lat_lon_to_xy(self, latitude, longitude):

# Earth radius in meters (approximate)

# Calculate the differences in latitude and longitude
delta_lat = lat1 - lat2
delta_lon = lon1 - lon2

# Calculate X and Y coordinates in meters
x = -earth_radius * delta_lon * math.cos((lat1 + lat2) / 2)

return x, y

# Store x,y coordinates obtained from GPS coordinates

def gps_callback(self, msg):

x, y = self.lat_lon_to_xy(msg.latitude, msg.longitude)

self.gps_pose.position.x = x
self.gps_pose.position.y = y
self.gps_pose.position.z = msg.altitude
self.gps_pose.orientation.x = 0.0
self.gps_pose.orientation.y = 0.0
self.gps_pose.orientation.z = 0.0
self.gps_pose.orientation.w = 1.0

# self.printPoseInfo()
# print("Latitude: ", msg.latitude, " Longitude: ", msg.longitude, " Altitude: ", msg.altitude)
self.printPoseInfo()

# Currently, not used (note that it always returns initial pose,
# this is because navsat_transform node is commented out)

def gps_filtered_callback(self, msg):

x, y = self.lat_lon_to_xy(msg.latitude, msg.longitude)

self.gps_filtered_pose.position.x = x
self.gps_filtered_pose.position.y = y
self.gps_filtered_pose.position.z = msg.longitude
self.gps_filtered_pose.orientation.x = 0.0
self.gps_filtered_pose.orientation.y = 0.0
self.gps_filtered_pose.orientation.z = 0.0
self.gps_filtered_pose.orientation.w = 1.0

# self.printPoseInfo()
# print("Latitude: ", msg.latitude, " Longitude: ", msg.longitude, " Altitude: ", msg.altitude)
self.printPoseInfo()

def filtered_odom_callback(self, msg):
#print(">>> Filtered odometry callback! <<<")
self.current_pose = msg.pose.pose

self.filtered_odom_pose = msg.pose.pose
self.printPoseInfo()

# Publish a command for the robot (to its diff. drive) to move

def drive(self, linear_speed, angular_speed, dAngleToTurnRobot):
msg = Twist()
msg.linear.x = float(linear_speed)
msg.linear.y = 0.0
msg.linear.z = 0.0
msg.angular.x = 0.0
msg.angular.y = 0.0

if(dAngleToTurnRobot < 0):
angular_speed *= -1

msg.angular.z = float(angular_speed)

self.publisherCmdVel.publish(msg)

# We can pass a command to the finite automata.
# Currently, "start" and "stop" are supported.

def command_callback(self, msg):

# "start", "stop"
self.navStatus = msg.data

# Perform any necessary actions based on the received command
# For example, you can process the command and send a response

# Publish a response message
# response_msg = String()
# response_msg.data = 'Command received and processed.'
# self.response_publisher.publish(response_msg)

# ------
# We can publish a path and receive it using this callback
def set_path_callback(self, msg):
print("*** PathFollower.set_path_callback()")

# TBD: stop current execution
self.navStatus == "idle"

self.path = msg

# Initialize progress
self.nTotalSegments = len(self.path.poses)

self.bPathSet = True
self.bNearFinish = False

# ------
# As we look for the next lookahead point, we start from the closest
# point on the path and scan forward, this way we will not move
# in circles.
# Also note that as we scan closest segments first, we are not
# going to jump over large sections of the path.

def closest_point_on_segment(self, x1, y1, x2, y2, x, y):
pt = Pose()

if x1 == x2 and y1 == y2:
pt.position.x = x1
pt.position.y = y1
return pt  # Both endpoints are the same

# Calculate the direction vector of the segment
dx = x2 - x1
dy = y2 - y1

# Calculate the length of the segment
segment_length = dx * dx + dy * dy

# Handle degenerate case where the segment has zero length
if segment_length == 0:
return x1, y1

# Calculate the vector from (x1, y1) to the point (x, y)
vx = x - x1
vy = y - y1

# Calculate the dot product of the segment and the vector to the point
dot_product = (vx * dx + vy * dy) / segment_length

# Clamp the dot product to the range [0, 1]
dot_product = max(0, min(1, dot_product))

# Calculate the closest point on the segment
closest_x = x1 + dot_product * dx
closest_y = y1 + dot_product * dy

pt.position.x = closest_x
pt.position.y = closest_y
return pt

# ------ Finite automata

# Current time
now = datetime.now()

# Time passed since prev.
delta_ms = (now - self.prev_time).seconds * 1000
+ (now - self.prev_time).microseconds / 1000.

# There are actions, for example, printing status,
# that we only want to perform if status changed.
bStatusChanged = False

# If AMCL haven't received its initial pose, do nothing
return

# "stop" pauses navigation and displays statistics it collected
# as a chart
if(self.navStatus == "stop"):
self.drive(0, 0, 0)    # stop
self.navStatus = "idle"
bStatusChanged = True

# If we have two robots, do not show chart
if(self.strRobotName == ""):
# Create a figure and axis
fig, ax = plt.subplots()

# Plot the data with different colors and labels
ax.plot(self.arrChartTime, self.arrChartAMCL,
label='self.arrChartAMCL', color='red', linestyle='-')
ax.plot(self.arrChartTime, self.arrChartOdom,
label='self.arrChartOdom', color='blue', linestyle='-')
ax.plot(self.arrChartTime, self.arrChartFilteredOdom,
label='self.arrChartFilteredOdom', color='green', linestyle='-')
ax.plot(self.arrChartTime, self.arrChartGPS,
label='self.arrChartGPS', color='purple', linestyle='-')
#ax.plot(self.arrChartTime, self.arrChartFilteredGPS,
label='self.arrChartFilteredGPS', color='orange', linestyle='-')
#ax.plot(self.arrChartTime, self.arrChartGT,
label='self.arrChartGT', color='brown', linestyle='-')

ax.legend(loc='upper left')

# Set labels for X and Y axes
ax.set_xlabel('Time')
ax.set_ylabel('Values')

# Show the plot
plt.show()

# Cleanup
self.arrChartTime         = []
self.arrChartAMCL         = []
self.arrChartOdom         = []
self.arrChartFilteredOdom = []
self.arrChartGPS          = []
self.arrChartFilteredGPS  = []
#self.arrChartGT           = []

elif(self.navStatus == "idle"):
if delta_ms >= 1000:
print("*** Status: %s" % (self.navStatus))

elif(self.navStatus == "start"):
if(self.bPathSet == False):
print("Path not set, see set_path_callback()")
self.navStatus = "idle"

else:
# We reset variables that need to be reset,
# and change status, so the next time it is
# processed as "calculate"

self.bNearFinish = False
self.target_pose = Pose()
self.dDistanceToFinish = float("inf")

self.navStatus = "calculate"

bStatusChanged = True

elif(self.navStatus == "calculate"):
# 1. Find closest segment end
dMinDist = float("inf")
if(self.nCurrentSegment == self.nTotalSegments - 1):
nMinIdx = self.nCurrentSegment
else:
nMinIdx = self.nCurrentSegment + 1

for nIdx in range(self.nCurrentSegment, self.nTotalSegments):
dDistance = self.getDistanceBetweenPoses(
self.current_pose, self.path.poses[nIdx].pose)

if(dDistance <= dMinDist):
dMinDist = dDistance
nMinIdx = nIdx
self.nCurrentSegment = nMinIdx - 1

# Break as soon as distance start increasing
# As our sensors are imperfect, we might have large
# localization error near the finish, at the moment
# when we are supposed to stop. Due to the error,
# we will continue moving, and the distance to
# finish will keep increasing.
# So we need to stop, if distance to finish increases,
# even if we were not able to get close enough.
else:
break

# 2. From closest segment end, scan for first segment
# after it, ending outside of lookahead range
bFound = False
for nIdx in range(nMinIdx, self.nTotalSegments):
dDistance = self.getDistanceBetweenPoses(
self.current_pose, self.path.poses[nIdx].pose)

bFound = True
break
else:
self.bNearFinish = True

# 3. On the first segment ending outside of lookahead range,
# find a point that is self.lookaheadDistance away.
# That will give us 2 points of intersection of circle
# with segment, we need one closer to segment end.
# Note: nIdx > 0, as we always start with robot at
# 0th point of a route
if(bFound == False):
self.target_pose = self.closest_point_on_segment(
self.path.poses[nIdx - 1].pose.position.x,
self.path.poses[nIdx - 1].pose.position.y,
self.path.poses[nIdx].pose.position.x,
self.path.poses[nIdx].pose.position.y,
self.current_pose.position.x,
self.current_pose.position.y)
else:
# Point we need to use as our target is located
# within a segment, so we divide it by half, until
# we are close enough

pt_1 = self.path.poses[nIdx - 1].pose
pt_2 = self.path.poses[nIdx].pose

pt = Pose()

dDistance = self.getDistanceBetweenPoses(self.current_pose, pt_1)
pt = pt_2
else:
while(True):
pt.position.x = (pt_1.position.x + pt_2.position.x) / 2
pt.position.y = (pt_1.position.y + pt_2.position.y) / 2

dDistance = self.getDistanceBetweenPoses(self.current_pose, pt)
if(dDistance >= dMinus and dDistance <= dPlus):
break
elif(dDistance <= dMinus):
pt_2 = pt
else:
pt_1 = pt

self.target_pose = pt
#print("3: pt: ", pt.position.x, pt.position.y)

# 4. Turn the robot towards target point
alpha = math.atan2((self.target_pose.position.y
- self.current_pose.position.y),
(self.target_pose.position.x
- self.current_pose.position.x))

_, _, beta = self.euler_from_quaternion(
self.current_pose.orientation.x,
self.current_pose.orientation.y,
self.current_pose.orientation.z,
self.current_pose.orientation.w)

dAngleToTurnRobot = alpha - beta

nSign = 1 if dAngleToTurnRobot >= 0 else -1
if(dAngleToTurnRobot > math.pi or dAngleToTurnRobot < -math.pi):
dAngleToTurnRobot = -1 * nSign
* (2 * math.pi - abs(dAngleToTurnRobot))

# At this point, dDistance holds value from
# cycle above: current point to target
dSuggestedLinearSpeed = self.linear_speed
dSuggestedAngularSpeed = self.angular_speed

# Depending on the angle, we need to adjust
# the speed of our robot

# If angle is too large, turn in place
if(abs(dAngleToTurnRobot) > math.pi / 2.0):
dSuggestedLinearSpeed = 0.0

elif(dDistance != 0):
dSpeedCoef = 1.0

dCurvature = abs(dAngleToTurnRobot / dDistance)
if(dCurvature < 0.3):
dSpeedCoef = 1.0
elif(dCurvature < 0.6):
dSpeedCoef = 0.9
elif(dCurvature < 1.2):
dSpeedCoef = 0.75
else:
dSpeedCoef = 0.5

dSuggestedLinearSpeed = dSuggestedLinearSpeed * dSpeedCoef

dTimeEstimate = dDistance / dSuggestedLinearSpeed
if(dTimeEstimate != 0):
dSuggestedAngularSpeed = min(self.angular_speed,
2 * abs(dAngleToTurnRobot) / dTimeEstimate)
else:
dSuggestedAngularSpeed = 0

self.drive(dSuggestedLinearSpeed,
dSuggestedAngularSpeed,
dAngleToTurnRobot)
self.navStatus = "driving"
bStatusChanged = True

elif(self.navStatus == "driving"):
dDistance = self.getDistanceBetweenPoses(
self.current_pose, self.target_pose)

# Note that we do recalc minimum once per second
if(self.bNearFinish == False
and delta_ms >= 1000
and self.nCurrentSegment < self.nTotalSegments - 1):
self.navStatus = "calculate"
bStatusChanged = True
elif(True):
self.navStatus = "calculate"
bStatusChanged = True

dDistanceToFinish = \
self.getDistanceBetweenPoses(self.current_pose, self.path.poses[-1].pose)

if(self.bNearFinish == True):
#print(dDistanceToFinish, self.dDistanceToFinish)
if(dDistanceToFinish <= self.dDistanceToFinish):
self.dDistanceToFinish = dDistanceToFinish
else:
self.navStatus = "stop"
bStatusChanged = True

#print("Distance to Finish: ", dDistanceToFinish)

# ---

# if(bStatusChanged):
#     print("*** Status: %s" % (self.navStatus))

if delta_ms >= 1000:
self.prev_time = now

# Angle to turn robot while driving to a target pose

def getAngleBetweenPoses(self, start, goal):

roll1, pitch1, yaw1 = self.euler_from_quaternion(start.orientation.x,
start.orientation.y, start.orientation.z, start.orientation.w)
roll2, pitch2, yaw2 = self.euler_from_quaternion(goal.orientation.x,
goal.orientation.y, goal.orientation.z, goal.orientation.w)

angle = yaw2 - yaw1

#print("### Angle between poses: ", math.degrees(yaw1), math.degrees(yaw2), math.degrees(yaw2 - yaw1))

if angle > math.pi:
angle -= 2*math.pi
elif angle < -math.pi:
angle += 2*math.pi

return angle

def getDistanceBetweenPoses(self, start, goal):

dx = goal.position.x - start.position.x
dy = goal.position.y - start.position.y
dz = goal.position.z - start.position.z

distance = math.sqrt(dx**2 + dy**2 + dz**2)
return distance

# ---

def euler_from_quaternion(self, x, y, z, w):
"""
Convert a quaternion into euler angles (roll, pitch, yaw)
roll is rotation around x in radians (counterclockwise)
pitch is rotation around y in radians (counterclockwise)
yaw is rotation around z in radians (counterclockwise)
"""
t0 = +2.0 * (w * x + y * z)
t1 = +1.0 - 2.0 * (x * x + y * y)
roll_x = math.atan2(t0, t1)

t2 = +2.0 * (w * y - z * x)
t2 = +1.0 if t2 > +1.0 else t2
t2 = -1.0 if t2 < -1.0 else t2
pitch_y = math.asin(t2)

t3 = +2.0 * (w * z + x * y)
t4 = +1.0 - 2.0 * (y * y + z * z)
yaw_z = math.atan2(t3, t4)

return roll_x, pitch_y, yaw_z # in radians

# ------

def create_path_marker(self, x, y, z, r, g, b, a=1.0):
marker = Marker()

marker.type = Marker.SPHERE

marker.pose.position.x = x
marker.pose.position.y = y
marker.pose.position.z = z

marker.scale.x = 0.2
marker.scale.y = 0.2
marker.scale.z = 0.2

marker.color.r = r / 255.0
marker.color.g = g / 255.0
marker.color.b = b / 255.0
marker.color.a = a

return marker

# ------

def main(args=None):
print("*** PathFollower - main()")

rclpy.init(args=args)
path_follower = PathFollower()

# ---

if __name__ == '__main__':
main()

```