ROS2: Path Planning Algorithms Python Implementation

# First and last mile

Imagine that you have parked you robot at a parking lot, next to a highway. There is a nice paved road connecting parking with a highway, and a ditch that is occupying the rest of space between them.

So far we used A Star Graph navigation algorithm on a road that had graph and if the initial position of a robot was outside the graph, well, we just used straight line navigation to get to the closest node of a graph.

There are few problems with this approach. First of all, if you drive from the parking lot to a road by a straight line, you will most likely end up in a ditch. Second, there is also a "last mile" problem, which is "drive there by a graph and then go to parking at your destination". Current implementation of our a-star navigation algorithm doesn't have this functionality. Third, we might want to drive along a highway, then do some cross-terrain driving, then highway again and so on, in other words, what if we need to randomly switch from graph based navigation to grid based navigation and back?

To illustrate this point I have created a wall on a keepout map. If only graph navigation algorithm is used, the robot gets to the nearest node on the road - right through the wall.

All these problems can be solved by allowing the Coordinator to decide when to use different algorithms, which means we have to make its finite automata just a bit more advanced.

As you can see on the image above, instead of going to the right, where the closest node was, the robot went down, along the "drivable" keepout level.

## Using multiple colors for Path

This is not part of this section's objective, but at some point I have realized that if we have multiple waypoints, it would be convenient to draw corresponding fragments of a path between them using different colors. Of course, as we may have MANY waypoints, using too many colors is probably a bad idea, so I chose to simply switch colors. Each robot (currently, max is 2, but it is fully adjustable) has 2 colors. Path to first waypoint is painted with color_0, then color_1, then color_0 again and so on.

An obvious problem with this approach (you can see it on an image below) is that if we go to waypoint (first color) and then at least part of the way back, the second color will overwrite the first one. It can be solved by using semi-transparent colors and at some point I will probably do it.

Fragments of the path for a second robot will have different color, which helps us to distinguish them.

```
if(self.strRobotName == ""):
self.path_color_0 = (0, 255, 0, 128)          # bgra, green
self.path_color_1 = (255, 0, 0, 128)          # bgra, blue
self.real_path_color = (0, 0, 255, 255)       # bgra, red
elif(self.strRobotName == "robot1"):
self.path_color_0 = (255, 255, 0, 128)          # bgra, cyan
self.path_color_1 = (255, 0, 255, 128)          # bgra, magenta
self.real_path_color = (0, 255, 255, 255)     # bgra, yellow

# All other robots, add more colors if necessary
else:
self.path_color_0 = (0, 255, 0, 128)          # bgra, green
self.path_color_1 = (255, 0, 0, 128)          # bgra, blue
self.real_path_color = (0, 0, 255, 255)       # bgra, red

```

## Coordinator.py

There are additional changes, compared to previous section. Now (and this is probably the way it should be) the Coordinator class has two planners: grid planner and graph planner. It is not user right now, but will be used soon: the idea is to use grid planner to get to the road, if the initial robot position is off the graph. Obviously, using grid planner is better than going by straight line (most likely, to the nearest ditch).

If grid planner is set to None, the straight line approach will be used.

```
# (c) robotics.snowcron.com

# Import necessary modules:

import rclpy
...

class Coordinator(Node):
# Note the change: we are now passing two planners
def __init__(self, grid_planner, graph_planner, path_follower, robot_name):

...

# Ignore it for now: I am going to use it in the next section
self.bFirstMileProcessed = False

# Here we assign not one, but two planners

self.grid_planner = grid_planner
self.graph_planner = graph_planner

...

# Names of Finite Automata states changed

def coordinator_callback(self):
if(self.status == "not_started" and self.path_follower.initial_pose_received == True):

self.publish_image("/images/background")

if(self.grid_planner is not None):
print("Grid planner mode:", self.grid_planner.strMode)
if(self.graph_planner is not None):
print("Graph planner mode:", self.graph_planner.strMode)

# Commented out code can be used instead - it is just a demo
if(self.grid_planner is not None and self.grid_planner.strMode == "straight_line"):
self.goThroughPoses()
# #     goal = PoseStamped()
# #     goal.pose.position.x = 4.0
# #     goal.pose.position.y = 4.0
# #     self.goToPose(goal)

elif(self.grid_planner is not None and self.grid_planner.strMode == "a_star_grid"):
self.goThroughPoses()
# #     goal = PoseStamped()
# #     goal.pose.position.x = -5.22
# #     goal.pose.position.y = -4.75
# #     self.goToPose(goal)

elif(self.graph_planner is not None and self.graph_planner.strMode == "a_star_graph"):
# self.goToNode(None, 10, True)

self.goThroughNodes()

self.status = "started"

pass
pass

# ---

...

# This is a demo function

def main(args=None):
print("*** Coordinator - main()")

try:
rclpy.init(args=args)

arrRobotNames = [""]
#arrRobotNames = ["robot1", "robot2"]

# Uncomment one of three lines below, to see corresponding functionality
# strPlannerName = "straight_line_planner"
# strPlannerName = "a_star_grid_planner"
strPlannerName = "a_star_graph_planner"

...

img_layer_manager = ImgLayersManager([], "/images/ground_plane", 640, 640, 4, 5)

strTopic = '/images/background'

for robot_name in arrRobotNames:
grid_planner = None
graph_planner = None

arrWaypoints = []
arrWaypointNodes = []
if(strPlannerName == "straight_line_planner"):
...

elif(strPlannerName == "a_star_grid_planner"):
grid_planner = GlobalPlannerAStarGrid(robot_name)

# We pass size (num. of cells) of the grid for a-star
grid_planner.initMap("../../maps/keepout_world_01.yaml", 64, 64)

if(robot_name == "" or robot_name == "robot1"):

# Poses are in ROS2 coords (meters)

wp_1 = PoseStamped()
wp_1.pose.position.x = -5.8
wp_1.pose.position.y = 5.5
arrWaypoints.append(wp_1)
else:
wp_1 = PoseStamped()
wp_1.pose.position.x = -5.22
wp_1.pose.position.y = -4.75
arrWaypoints.append(wp_1)

wp_2 = PoseStamped()
wp_2.pose.position.x = 6.05
wp_2.pose.position.y = -2.47
arrWaypoints.append(wp_2)

elif(strPlannerName == "a_star_graph_planner"):
...

# Create Coordinator, pass both planners etc

path_follower = PathFollower(robot_name)
coordinator = Coordinator(grid_planner, graph_planner, path_follower, robot_name)

coordinator.arrWaypoints = arrWaypoints
coordinator.arrWaypointNodes = arrWaypointNodes

# Note that planner can be None

if(grid_planner is not None):
if(graph_planner is not None):

strTopic = coordinator.strSlashRobotName + '/images/robot_path_markers'

try:
executor.spin()
finally:
executor.shutdown()
#planner.destroy_node()
#path_follower.destroy_node()

finally:
# Shutdown
rclpy.shutdown()

# ---

if __name__ == '__main__':
main()

```

## GlobalPlannerAStarGrid.py

This is a planner that uses a-star grid algorithm. It is derived from AStar class (see earlier sections). Now, it DEFINITELY has to be also derived from GlobalPlannerStraightLine class, as it uses straight line functions. I will do it later.

Also, as we have two planners now, GlobalPlannerAStar.py is renamed to GlobalPlannerAStarGraph.py

```
# (c) robotics.snowcron.com

# Import the necessary modules:

import rclpy
...

# ROS2 Node based class derives also from our AStar class for grid path planning
class GlobalPlannerAStarGrid(Node, AStar):

def __init__(self, robot_name):

print("*** GlobalPlannerAStarGrid.init()... ")

# This is init for AStar parent. See below for init of second parent
AStar.__init__(self)

# ---

self.strMode = "a_star_grid"

# Same as earlier, it all boils down to straight line planning
self.dSegmentLen = 0.1

if(robot_name != ""):
Node.__init__(self, robot_name + '_GlobalPlannerAStarGrid')
self.strSlashRobotName = "/" + robot_name
self.strRobotNameSlash = robot_name + "/"
else:
Node.__init__(self, 'GlobalPlannerAStarGrid')
self.strSlashRobotName = ""
self.strRobotNameSlash = ""

self.strRobotName = robot_name

# File name for keepout map YAML
self.strYamlImageFilePath = None

# Dimensions of a grid
self.nGridSizeX = None
self.nGridSizeY = None

# Grid itself
self.arrGrid = None

# Data that we read from keepout YAML file
self.dictMapData = None

# This code wasn't changed

self.plan_publisher = self.create_publisher(Path, self.strSlashRobotName + "/plan", 1)

self.make_plan_service = self.create_service(GetPlan, self.strSlashRobotName + "/plan",
self.make_plan_callback)

self.get_plan_client = self.create_client(GetPlan, self.strSlashRobotName + "/plan")
while not self.get_plan_client.wait_for_service(timeout_sec=1.0):
print("\t*** Service not available, waiting...")

print("*** GlobalPlannerAStarGrid.init() done")

# ---
# Grid is 64x64, origin at top-left.
# ROS2 coordinates are in meters, origin at bottom-left.
# So yes, we need a separate function to convert

def poseToGrid(self, p):
x = (int)((p.pose.position.x - self.dictMapData['originMetersX']) / self.dictMapData['gridCellSizeX'])

nImgSizeY = self.dictMapData['img'].size[1]
dOriginY = self.dictMapData['originMetersY']
dResolution = self.dictMapData['resolution']

y = (int)((nImgSizeY * dResolution - (p.pose.position.y +  - self.dictMapData['originMetersY'])) / self.dictMapData['gridCellSizeY'])

return (y, x)

# Conversion back from grid to pose

def gridToPose(self, node):
x = (node[1] + 0.5) * self.dictMapData['gridCellSizeX'] + self.dictMapData['originMetersX']
y = (self.nGridSizeY - node[0]  + 0.5) * self.dictMapData['gridCellSizeY'] + self.dictMapData['originMetersY']

return (x, y)

# This is a finite automata, caled by timer
def make_plan_callback(self, request, response):

print("*** GlobalPlannerAStarGrid.make_plan_callback()", flush=True)

# request contains start and goal points, unlike with graph
# path planner, those are just poses, nothing fancy

start = request.start
goal = request.goal

# Path to return
path = Path()

# Call AStar class to get path
# TBD: 32 is hardcoded
arrGridPath = self.get_astar_grid_path(self.poseToGrid(start), self.poseToGrid(goal), 32)
if(arrGridPath is None):
print(">>> Unable to find path")
else:
# Note: at this point, path contains cells of a grid through which
# the path goes: y, x = node[0], node[1]
# We need to convert it to Poses

# TBD: nNegate currently not used

start_pose = request.start

# Copy first point to a path
path.poses.append(start_pose)

for node in arrGridPath:
# first pose of the next segment is always the
# last pose of the previous one
start_pose = path.poses[-1]

x, y = self.gridToPose(node)

# lines petween poses

# Calculate delta x and y
dx = x - start_pose.pose.position.x
dy = y - start_pose.pose.position.y

# Calculate distance between start and goal
distance = math.sqrt(dx**2 + dy**2)

# Number of points to include in path
num_points = int(distance / self.dSegmentLen) # 0.1m resolution

for i in range(1, num_points + 1):
pose_prev = path.poses[-1]

pose = PoseStamped()
pose.pose.position.x = start_pose.pose.position.x + i * dx / num_points
pose.pose.position.y = start_pose.pose.position.y + i * dy / num_points

path.poses.append(pose)

# Publish the path
self.plan_publisher.publish(path)

response.plan = path
return response

# This function packs request same way it was done for the
# straight line planner (as they both are grid planners) and
# differently from graph planner.

def getPath(self, start, goal):
print("*** GlobalPlannerAStarGrid.getPath()")

print("*** Waiting for 'GlobalPlannerAStarGrid' service")
while not self.get_plan_client.wait_for_service(timeout_sec=1.0):
print("*** 'GlobalPlannerAStarGrid' service not available, waiting...", flush = True)

print("*** Calculating path from (", start.pose.position.x, ',', start.pose.position.y,
') to (',
goal.pose.position.x, ',', goal.pose.position.y, ")", flush = True)

request = GetPlan.Request()
request.start = start
request.goal = goal

future = self.get_plan_client.call_async(request)
rclpy.spin_until_future_complete(self, future)

if future.result() is not None:
path = future.result().plan

return path

else:
print('*** Failed to get plan')
return None

# This function reads keepout map YAML

# TBD: handle return if can not open file
with open(self.strYamlImageFilePath, mode="r", encoding="utf-8") as input_file:

arrLines = text.splitlines()
strMapFileName = self.strYamlImageFilePath[:-4] + "pgm"

dOriginMetersX = 0
dOriginMetersY = 0
for str in arrLines:
key, value = str.split(":")
value = value.strip()
if(key == "mode"):
strMode = value
elif(key == "resolution"):
dResolution = float(value)
elif(key == "origin"):
value = value.replace("[", "")
value = value.replace("]", "")
arrOrigin = value.split(", ")

# Note: if "origin" not provided, default is (0,0)
dOriginMetersX = float(arrOrigin[0].strip())
dOriginMetersY = float(arrOrigin[1].strip())

elif(key == "negate"):
nNegate = int(value)
elif(key == "occupied_thresh"):
nOccupiedThresh = float(value)
elif(key == "free_thresh"):
nFreeThresh = float(value)
elif(key == "image"):
strMapFileName = "../../maps/" + value

img = Image.open(strMapFileName)

# ---

# Coordinates of corners of the map we just loaded in meters
# Note: origin is left-bottom
dLeftMeters = dOriginMetersX
dTopMeters = dOriginMetersY + img.size[1] * dResolution
dRightMeters = dOriginMetersX + img.size[0] * dResolution
dBottomMeters = dOriginMetersY

dGridCellSizeX = img.size[0] * dResolution / self.nGridSizeX
dGridCellSizeY = img.size[1] * dResolution / self.nGridSizeY

# ---

img_array = np.array(img).astype(np.int)

# TBD: not the best approach
img_array = img_array - 127

self.arrGrid = 255 * np.ones((self.nGridSizeX, self.nGridSizeY), dtype=np.int)
for i in range(self.nGridSizeY):
for j in range(self.nGridSizeX):
cropped = img_array[i*10:(i+1)*10, j*10:(j+1)*10]
self.arrGrid[i, j] = np.amax(cropped)

# ---

self.dictMapData = {'grid': self.arrGrid, 'img': img,
'originMetersX': dOriginMetersX, 'originMetersY': dOriginMetersY,
'gridCellSizeX': dGridCellSizeX, 'gridCellSizeY': dGridCellSizeY,
'mode': strMode, 'resolution': dResolution, 'nNegate': nNegate,
'nOccupiedThresh': nOccupiedThresh, 'nFreeThresh': nFreeThresh,
}

# This function is separate from __init__, so we can call it
# any time, not just at start

def initMap(self, strYamlImageFilePath, nGridSizeX, nGridSizeY):
self.strYamlImageFilePath = strYamlImageFilePath
self.nGridSizeX = nGridSizeX
self.nGridSizeY = nGridSizeY

self.setMap(self.arrGrid)

# This is a demo function

def main(args=None):
print("*** GlobalPlannerAStarGrid - main()", flush=True)

rclpy.init(args=args)
planner = GlobalPlannerAStarGrid("")
planner.initMap("../../maps/keepout_world_01.yaml", 64, 64)

start = PoseStamped()
start.pose.position.x = 5.93
start.pose.position.y = 1.15

goal = PoseStamped()
goal.pose.position.x = 3.62
goal.pose.position.y = 1.15

planner.getPath(start, goal)

planner.destroy_node()
rclpy.shutdown()

# ---

if __name__ == '__main__':
main()

# ---

```

## PathFollower.py

The path follower file didn't change at all, compared to earlier section.

```

```