ROS2: Path Planning Algorithms Python Implementation

Adding A-Star (Graph) to our Project

A bit of Theory

We can break the rout to linear segments, and use ends of these segments as nodes in a-star algorithm. That means we can find a route from A to B (as list of nodes) and use straight line path planner to connect these nodes.

This graph was created in map_editor utility (see section above), let's repeat it here:

When we need to create a path, we use these nodes numbers to set the destination for a bot.

Here is, however, a problem: the a-star algorithm doesn't know where the bot is! It can be on any node, or outside, in an "uncharted" territory. This problem, by the way, is common for all navigation programs, one of solutions is the last (well, in our case, first) mile navigation. In other words, we figure out where the bot is, and create a straight line (or some other, if we have this algorithm handy) path to a closest (not first in the list!) node of a graph. So, if the task is to go from A to B, and robot is close to C, we need to drive robot to C, and then to create path C-A-B.

This is not necessarily a good strategy, as closest point might ba an isolated one, so the path will fail. We will use this approach for now. In future, maybe, we'll allow the user to say "drive to D instead of driving to closest point, and then to A and B"

New world and maps

To start with, we need a new world: we have a nice background picture, but it doesn't align with "maze" we used before. So we switch to world_01.sdf world.

Notice that I have placed couple of objects on top of a background image. The reason is, AMCL localization algorithm uses lidar scan data, so we need to provide something lidar can see.

Of course, these objects are located on top of their icons on the background image:

How can we find the right coordinates (of the center of corresponding shape) that we write into the .sdf file? We open map_editor utility (see section above) and choose Keepout and "pipette" tool. Now we can load the base image, and when we click on it, the click coordinates, both screen and world (in pixels and meters, accordingly) are printed in the terminal you used to start the Utility:

```
Selected option: Keepout
Color at (207, 306) / (-6.82, 2.35): (0, 0, 0, 0)

```

The (0, 0, 0, 0) is for pixel color.

Now, to see it properly in RViz, we need to provide RViz with proper map and keepout map. I have created files called world_01.yaml and world_01.pgm, located in maps directory.

To create these files, I have opened map_editor utility, loaded the base image, switched to Keepout mode and drew the keepout map on top of a base image, using brush tool.

This is not a very accurate image, but it will do. You can improve it in some bitmap editor, like Gimp, or wait until I implement more drawing tools in map_editor utility.

Then I have saved the keepout map. As for map, I have copied the keepout map and... that's all. We can actually use same file as map and keepout map. Here is an example of starting the app. As you can see, for both map and keepout map I use world_01.yaml. This is a temporary solution, just because graph algorithm does not use keepout map. In the section below, for a-star grid (as opposed to graph) navigation I am goind to create a different keepout map. But currently, they are the same: map and keepout mask:

```
ros2 launch multi_bot_nav_03 multi_simulation_launch.py world:=src/worlds/world_01.sdf map:=src/maps/world_01.yaml keepout_mask:=src/maps/keepout_world_01.yaml rviz_config_file:=src/multi_bot_nav_03/rviz/map_single.rviz mode:=map

```

Now AMCL can figure out where the robot is.

Coordinator.py

This is a script that manages all activity of our application. Here we set the task (list of nodes) and so on. In other words, nothing new, compared to what you saw in Straight Line Planner section.

```
# (c) robotics.snowcron.com

# Import necessary modules:
import rclpy
...

class Coordinator(Node):
def __init__(self, planner, path_follower, robot_name = ""):
print("*** Coordinator.init()... ", end = '')

# Coordinator takes path planner as one of its constructor
# arguments. We can pass Straight Line Path Planner from earlier
# section, or the A-Star Planner (see below). So it keeps
# two different arrays: for grid navigation algorithms, it
# uses array of waypoints (self.arrWaypoints) while for graph navigation
# algorithms, it uses array of nodes (which are just numbers, refering
# to keys in nodes dictionary) stored in self.arrWaypointNodes

# Waypoints to go through for grid-based algos
self.arrWaypoints = []

# Indices of waypoint nodes for nav. graph based algos
self.arrWaypointNodes = []

...

# This is a planner I mentioned. Different planners can be oassed
# as parameter to constructor. As for path follower, it is the
# same pure pursuit as in earlier sections. The reason is, the
# resulting path looks the same as before: just a list of poses,
# so we can use pure pursuit. In future I am goind to create different
# types of followers, so we can pass them instead.

self.planner = planner
self.path_follower = path_follower

...

print("*** Coordinator.init(): done")

# ------

...
# This function is called by timer, currently at 10 Hz
# It provides a simple finite automata

def coordinator_callback(self):

# At the beginning (and only if initial pose is available)
# retrieve path from the service (below, in planner section)
# and switch to "started" mode

if(self.status == "not_started" and self.path_follower.initial_pose_received == True):
self.status = "started"

print("*** Coordinator.coordinator_callback(); calling goToPose()/goToNode()")

# Show background image
self.publish_image("/images/background")

# In the following code fragment, commented are examples of
# going to a single waypoint or node, uncommented are examples
# of going through array of waypoints/nodes. You can swap comments
# to see how it works.

if(self.planner.strMode == "straight_line"):
self.goThroughPoses()
# #     goal = PoseStamped()
# #     goal.pose.position.x = 4.0
# #     goal.pose.position.y = 4.0
# #     self.goToPose(goal)
elif(self.planner.strMode == "a_star_graph"):
# self.goToNode(None, 10, True)
self.goThroughNodes()

# If mode is "started", then all the work is done in path follower
elif(self.status == "started"):
# TBD: get status from path_follower
# self.status = "done"

pass

# ---

# As discussed above, when we start, the first thing we need to do is
# moving from current position to the closest node
# This function circles through all nodes, looking for one closest
# to the robot's position.

def getClosestNode(self):
current_pose = self.path_follower.current_pose

dist = float("inf")
nIdxOfMin = 0
for i in range(len(self.planner.nodes)):
pose = Pose()
pose.position.x = self.planner.nodes[i][0]
pose.position.y = self.planner.nodes[i][1]

d = self.path_follower.getDistanceBetweenPoses(current_pose, pose)
if(d < dist):
nIdxOfMin = i
dist = d

return nIdxOfMin

# This function calls the service to get path from nStartNodeIdx
# to nGoalNodeIdx (they are really node names as integers, so "keys"
# would probably be better term than Idx).
# It can be called as a standalone function to go to node, in which
# case nStartNodeIdx in None and bStart is True. This means that
# current position will be used as a starting point and after path
# If we want to go through an array of nodes, we call this function
# multiple times. Second and following calls use nStartNodeIdx that is
# not None (end of previous segment becomes beginning of the next one,
# and current pos used only for first node in array).
# bStart in that case is set to False, so that we can concatenate
# each path for all segments together, and only then call "start"

def goToNode(self, nStartNodeIdx, nGoalNodeIdx, bStart):
print("*** Coordinator.GoToNode()")

# When this is the first segment, we need to get to its start point
if(nStartNodeIdx is None):
nStartNodeIdx = self.getClosestNode()

# Note that in start_pose we pass current pose,
# while in end pose we pack start and goal nodes.
# This is definitely a bad practice, but I did it, so I don't have
# to create a custom message type.

start_pose = PoseStamped()
start_pose.pose = self.path_follower.current_pose
else:
start_pose = PoseStamped()
start_pose.pose.position.x = self.planner.nodes[nStartNodeIdx][0]
start_pose.pose.position.y = self.planner.nodes[nStartNodeIdx][1]

# Once again: start pose contains actual pose, while goal pose contains
# names (keys, ids) of start and end nodes. An ugly way of not creating a
# custom message, and reusing same message type we used for grig based navigation.

goal_pose = PoseStamped()
goal_pose.pose.position.x = float(nStartNodeIdx)
goal_pose.pose.position.y = float(nGoalNodeIdx)

# In case of graph based algorithms, getPath should understand that pervasive
# data format we used. Not a big deal, but please keep it in mind.

path = self.planner.getPath(start_pose, goal_pose)

if(bStart):
if(path is not None):
print("*** Coordinator.GoToPose(); publishing path")
self.path_follower.path_follower_path_publisher.publish(path)
self.path_follower.command("start")

return path

# This function circles through all nodes in an array. Before the first node, it
# adds a closest to current node, so the bot goes to the road first, and only then
# navigates to destination.

def goThroughNodes(self):

print("*** Coordinator.goThroughNodes()")

# From current pose to closest node
path = self.goToNode(None, self.arrWaypointNodes[0], False)

if(path is not None):
local_path = None

# Now, in cycle, through all nodes
for i in range(1, len(self.arrWaypointNodes)):
local_path = self.goToNode(self.arrWaypointNodes[i - 1],
self.arrWaypointNodes[i], False)
if(local_path is not None):
for pt in local_path.poses[:-1]:
path.poses.append(pt)
else:
print(">>> Unable to build path from", self.arrWaypointNodes[i-1],
"to", self.arrWaypointNodes[i])
return
path.poses.append(local_path.poses[-1])
else:
print(">>> Unable to build path from current pose to", self.arrWaypointNodes[0])
return

# After all path arrays are concatenated together, we finally call "start"
# Note that to see path in RViz, we have to also publish it

print("*** Coordinator.goThroughNodes(); publishing path")
self.path_follower.path_follower_path_publisher.publish(path)
self.path_follower.command("start")

# ---

def goToPose(self, goal):
# same as before, this is for grid based algorithms, like
# straight line planner
...

# ---

def goThroughPoses(self):
# same as before, this is for grid based algorithms, like
# straight line planner
...

# This is a demo that uses all above. By switching comments below, you can
# invoke different planners, and so on.

def main(args=None):
print("*** Coordinator - main()")

try:
rclpy.init(args=args)

# This section MUST correspond to the section of launch file

arrRobotNames = [""]
#arrRobotNames = ["robot1", "robot2"]

# Here we set type of a planner. Depending on this flag,, we will
# create planner below

# strPlannerName = "straight_line_planner"
strPlannerName = "a_star_graph_planner"

# Ground display is a 640x640, we project there base image, path
# (planned and actual) and whatever else we want.

node_ground = rclpy.create_node('spawn_model')
client = node_ground.create_client(SpawnEntity, '/spawn_entity')
request = SpawnEntity.Request()
request.name = "my_model"

# TBD. For now: cube.urdf is a very flat cube (plane) to display map image on.
# Its size should be the same as size of a map image (again: make adjustable)
# For example, for image 640x640 and resolution: 0.05, we need
# 640*0.05 = 32m surface.
request.robot_namespace = ""

future = client.call_async(request)
rclpy.spin_until_future_complete(node_ground, future)

# Add all classes to executor. Note that we do different
# things depending on robot name (so robots do not collide, as we do not handle
# collisions yet) and depending on planner type. The part for straight_line_planner
# is same as in the earlier section

img_layer_manager = ImgLayersManager([], "/images/ground_plane", 640, 640, 4, 5)

strTopic = '/images/background'

for robot_name in arrRobotNames:
arrWaypoints = []
arrWaypointNodes = []
if(strPlannerName == "straight_line_planner"):
planner = GlobalPlannerStraightLine(robot_name)

if(robot_name == "" or robot_name == "robot1"):
wp_1 = PoseStamped()
wp_1.pose.position.x = 4.0
wp_1.pose.position.y = 0.0
arrWaypoints.append(wp_1)

wp_2 = PoseStamped()
wp_2.pose.position.x = 4.0
wp_2.pose.position.y = 7.0
arrWaypoints.append(wp_2)

wp_3 = PoseStamped()
wp_3.pose.position.x = 6.0
wp_3.pose.position.y = 1.0
arrWaypoints.append(wp_3)

wp_4 = PoseStamped()
wp_4.pose.position.x = 0.0
wp_4.pose.position.y = 0.0
arrWaypoints.append(wp_4)
else:
wp_1 = PoseStamped()
wp_1.pose.position.x = -4.0
wp_1.pose.position.y = 0.0
arrWaypoints.append(wp_1)

wp_2 = PoseStamped()
wp_2.pose.position.x = -4.0
wp_2.pose.position.y = 2.0
arrWaypoints.append(wp_2)

wp_3 = PoseStamped()
wp_3.pose.position.x = -2.0
wp_3.pose.position.y = 2.0
arrWaypoints.append(wp_3)

elif(strPlannerName == "a_star_graph_planner"):
planner = GlobalPlannerAStar(robot_name)
planner.setMode("a_star_graph")

# Note / TBD: if we want to be able to change roads graph dynamically, we need to
# publish it instead of assigning here, same way it is done for base map image
# in publish_image()

# Here we tell the robot to go to node 22, then to node 3
if(robot_name == "" or robot_name == "robot1"):
arrWaypointNodes.append(22)
arrWaypointNodes.append(3)
else:
arrWaypointNodes.append(35)
arrWaypointNodes.append(13)

path_follower = PathFollower(robot_name)
coordinator = Coordinator(planner, path_follower, robot_name)

coordinator.arrWaypoints = arrWaypoints
coordinator.arrWaypointNodes = arrWaypointNodes

strTopic = coordinator.strSlashRobotName + '/images/robot_path_markers'

try:
executor.spin()
finally:
executor.shutdown()
#planner.destroy_node()
#path_follower.destroy_node()

finally:
# Shutdown
rclpy.shutdown()

# ---

if __name__ == '__main__':
main()

```

GlobalPlannerAStar.py

This is a planner that uses a-star graph algorithm. It is derived from AStarGraph class (see earlier sections). Now, it DEFINITELY has to be also derived from GlobalPlannerStraightLine class, as it uses straight line functions. I will do it later.

```
# (c) robotics.snowcron.com

# Import necessary modules

import rclpy
...

class GlobalPlannerAStar(Node, AStarGraph):

def __init__(self, robot_name = ""):

# Part of initialization is done in parent
#super().__init__()

print("*** GlobalPlannerAStar.init()... ")

# As before, we set segment length to 10 cm. TBD. Note that it is not necessarily
# an optimal solution: segment can, technically, be kilometers long, as long
# as it is on a straight line. Something to address in future.

self.dSegmentLen = 0.1

...

# Publisher and service for the plan

self.plan_publisher = self.create_publisher(Path, self.strSlashRobotName + "/plan", 1)

self.make_plan_service = self.create_service(GetPlan, self.strSlashRobotName + "/plan",
self.make_plan_callback)

self.get_plan_client = self.create_client(GetPlan, self.strSlashRobotName + "/plan")
while not self.get_plan_client.wait_for_service(timeout_sec=1.0):
print("\t*** Service not available, waiting...")

self.nodes = None

# Currently, we do not have a-star grid navigation, but I will add it in future.
# So we need to be able to distinguish between them:

self.strMode = "a_star_graph"

print("*** GlobalPlannerAStar.init() done")

# Here we handle request to the service to provide a path.
# As I use "pervasive" data format, please pay attention to what is
# unpacked from the request.

def make_plan_callback(self, request, response):

print("*** GlobalPlannerAStar.make_plan_callback()", flush=True)

# goal is a StampedPose, but instead of coordinates, I pass the nodes
# (which are integer keys from adjacency list) for start and goal nodes.

goal = request.goal

# Indexes of start and finish nodes
nStartIdx = int(goal.pose.position.x)
nFinishIdx = int(goal.pose.position.y)

# TBD: what treshold do we really need here?
nodes_path = self.get_astar_graph_path(nStartIdx, nFinishIdx, treshold=63)

if(nodes_path is None):
return None
else:
print("*** Nodes in path:", nodes_path)

path = Path()

# Unlike goal pose of a request, start pose contains the pose,
# not node names
start_pose = request.start

# Copy first point to a path
path.poses.append(start_pose)

# Now in cycle, create a straight line path(s) between consequtive
# nodes

for nNodeIdx in nodes_path:

# Last pose of prev. segment becomes first pose of next one
start_pose = path.poses[-1]

# Then the already familiar straight line path planning follows

# Calculate delta x and y
dx = self.nodes[nNodeIdx][0] - start_pose.pose.position.x
dy = self.nodes[nNodeIdx][1] - start_pose.pose.position.y

# Simple straight line path

# Calculate distance between start and goal
distance = math.sqrt(dx**2 + dy**2)

# Number of points to include in path
num_points = int(distance / self.dSegmentLen) # 0.1m resolution

for i in range(1, num_points + 1):
pose_prev = path.poses[-1]

pose = PoseStamped()
pose.pose.position.x = start_pose.pose.position.x + i * dx / num_points
pose.pose.position.y = start_pose.pose.position.y + i * dy / num_points

# Calculate quaternion based on segment direction
alpha = math.atan2(pose.pose.position.y - pose_prev.pose.position.y,
pose.pose.position.x - pose_prev.pose.position.x)

quat = self.quaternion_from_euler(0, 0, alpha)

# print(f"\t*** x: {pose.pose.position.x}; y: {pose.pose.position.y}; \
#     alpha: {math.degrees(alpha)}")

# Set orientation (not used)
pose.pose.orientation = quat
# ---

path.poses.append(pose)

self.plan_publisher.publish(path)

response.plan = path
return response

# ---

...

# Create and send request to service, to get path

def getPath(self, start, goal):
print("*** GlobalPlannerAStar.getPath()")

print("*** Waiting for 'GlobalPlannerAStar' service")
while not self.get_plan_client.wait_for_service(timeout_sec=1.0):
print("*** 'GlobalPlannerAStar' service not available, waiting...", flush = True)

request = GetPlan.Request()
request.start = start
request.goal = goal

future = self.get_plan_client.call_async(request)
rclpy.spin_until_future_complete(self, future)

if future.result() is not None:
path = future.result().plan

return path

else:
print('*** Failed to get plan')
return None

# ---

def setMode(self, strMode):
self.strMode = strMode

# This function loads graph (adjacency list for all nodes on a map)
# from the file (created by map editor utility)

with open(strFilePath) as f:

nodes_str = nodes_str.split("nodes = ")[1]

nodes = {}
for k,v in nodes_str.items():
nodes[int(k)] = v

self.nodes = nodes

# ---

...

```

PathFollower.py

The path follower file didn't change at all, compared to earlier section.

```

```