This is a simple implementation of a Path Follower algorithm. The onlyaddition I made is about reducing speed when turning.
# (c) robotics.snowcron.com # Use: MIT license # import required modules ... class PathFollower(): def __init__(self, robot_node): print("*** PathFollower.init()") # This class has the owner (of Robot class) that is a ROS2 node self.robot_node = robot_node # In a single robot scenario, robot name is set to "", in 2 robots # case it is robot1 and robot2 if(self.robot_node.strRobotName == ""): self.path_color_0 = (0, 255, 0, 128) # bgra, green self.path_color_1 = (255, 0, 0, 128) # bgra, blue self.real_path_color = (0, 0, 255, 255) # bgra, red elif(self.robot_node.strRobotName == "robot1"): self.path_color_0 = (255, 255, 0, 128) # bgra, cyan self.path_color_1 = (255, 0, 255, 128) # bgra, magenta self.real_path_color = (0, 255, 255, 255) # bgra, yellow else: self.path_color_0 = (0, 255, 0, 128) # bgra, green self.path_color_1 = (255, 0, 0, 128) # bgra, blue self.real_path_color = (0, 0, 255, 255) # bgra, red # Markers are a temporary imperfect solution to show path in Gazebo. # Marker is a little sphere and to not create a clogged presentation where # robot moves slowly, we only show new marker if it is at least 1m away # from the previous one self.dDistBetweenRobotMarkers = 1.0 self.keepout_yaml = robot_node.keepout_yaml img, dOriginMetersX, dOriginMetersY, strMode, dResolution, nNegate, nOccupiedThresh, nFreeThresh = self.keepout_yaml # Dimensions of our map, pixels self.nCanvasWidth, self.nCanvasHeight = img.shape # TBD: here we assume grid to be square self.nGridSizeMeters = self.nCanvasWidth * dResolution # We publish map and path for ImageLayersManager to handle, if necessary nChannels = 4 self.imgCanvas = np.zeros((self.nCanvasHeight, self.nCanvasWidth, nChannels), dtype=np.uint8) self.bridge = CvBridge() self.frame_num = 0 # Used to put marker number in a header of a ROS2 message self.canvas_publisher = self.robot_node.create_publisher( Image, self.robot_node.strSlashRobotName + '/images/robot_path_markers', 10) # --- self.nCurrentSegment = 0 # 0-based number of path segment self.nTotalSegments = -1 # Path length, segments self.maxLookaheadDistance = 5.0 # How far ahead should algorithm look self.minLookaheadDistance = 2.0 # Min distance, user when turning # As we turn, we reduce lookahead distance self.dAdjustedLookaheadDistance = self.maxLookaheadDistance # Give or take 1 meter self.dLookaheadAccuracy = 1.0 # When we are approaching the end of a path, lookahead distance can be too much. self.bNearFinish = False # Pose the algorithm is "aiming" at self.target_pose = Pose() self.dDistanceToFinish = float("inf") self.angular_speed = 1.0 self.linear_speed = 2.0 self.dSpeedCoef = 1.0 # --- self.bPathSet = False # If path was already provided, if not do nothing # Publish the desired linear and angular velocity of the robot (in the # robot chassis coordinate frame) to the /cmd_vel_unstamped topic. # The diff_drive controller that our robot uses will read this topic # and move the robot accordingly. self.robot_node.publisherCmdVel = self.robot_node.create_publisher( Twist, self.robot_node.strSlashRobotName + "/cmd_vel", 10) # --- State of a finite automata # Note that owner node has to make sure self.waitForNodesToStart() was called and # finished with success status self.navStatus = "idle" # --- self.prev_time = self.robot_node.get_clock().now() # --- print("*** PathFollower.init(): done", flush=True) # --- def command(self, strCommand): # "start", "stop" #, "reset", "revert", set speed and other params... self.navStatus = strCommand # --- def setCurrentPose(self, x, y, z, roll, pitch, yaw): self.kalman_pose = Pose() self.kalman_pose.position.x = x self.kalman_pose.position.y = y self.kalman_pose.position.z = 0.0 qx, qy, qz, qw = euler_to_quaternion(roll, pitch, yaw) self.kalman_pose.orientation.x = qx self.kalman_pose.orientation.y = qy self.kalman_pose.orientation.z = qz self.kalman_pose.orientation.w = qw # --- def setCurrentPoseNoTransform(self, x, y, z, qx, qy, qz, qw): self.kalman_pose = Pose() self.kalman_pose.position.x = x self.kalman_pose.position.y = y self.kalman_pose.position.z = 0.0 self.kalman_pose.orientation.x = qx self.kalman_pose.orientation.y = qy self.kalman_pose.orientation.z = qz self.kalman_pose.orientation.w = qw # --- # Publish message for dif. drive of a robot def drive(self, linear_speed, angular_speed, dAngleToTurnRobot): msg = Twist() msg.linear.x = float(linear_speed) msg.linear.y = 0.0 msg.linear.z = 0.0 msg.angular.x = 0.0 msg.angular.y = 0.0 if(dAngleToTurnRobot < 0): angular_speed *= -1 msg.angular.z = float(angular_speed) self.robot_node.publisherCmdVel.publish(msg) # --- # Assign a path for robot to follow def setPath(self, path): # Stop current execution self.command("idle") self.path = path # Initialize progress self.nTotalSegments = len(self.path.poses) self.bPathSet = True self.bNearFinish = False # ------ def closest_point_on_segment(self, x1, y1, x2, y2, x, y): pt = Pose() if x1 == x2 and y1 == y2: pt.position.x = x1 pt.position.y = y1 return pt # Both endpoints are the same # Calculate the direction vector of the segment dx = x2 - x1 dy = y2 - y1 # Calculate the length of the segment segment_length = dx * dx + dy * dy # Handle degenerate case where the segment has zero length if segment_length == 0: return x1, y1 # Calculate the vector from (x1, y1) to the point (x, y) vx = x - x1 vy = y - y1 # Calculate the dot product of the segment and the vector to the point dot_product = (vx * dx + vy * dy) / segment_length # Clamp the dot product to the range [0, 1] dot_product = max(0, min(1, dot_product)) # Calculate the closest point on the segment closest_x = x1 + dot_product * dx closest_y = y1 + dot_product * dy pt.position.x = closest_x pt.position.y = closest_y return pt # --- # This function implements a simple finite automata # Robot calculates the point to go to and linear / angular speed, and when point # is close enough, repeats the calculation for the next point, until end of path is # reached. def onTimer(self): now = self.robot_node.get_clock().now() duration = now - self.prev_time delta_ms = duration.to_msg().sec * 1000 + duration.to_msg().nanosec / 1e6 # Only used to print "Status changed" at the bottom of this function bStatusChanged = False # We do nothing until Localization provides initial position if(self.robot_node.bInitialPoseReceived == False): return if(self.navStatus == "stop"): self.drive(0, 0, 0) # stop self.navStatus = "idle" bStatusChanged = True elif(self.navStatus == "idle"): # Just to show we are alive if delta_ms >= 1000: print("*** PathFollower.onTimer() *** Status: %s" % (self.navStatus), flush=True) elif(self.navStatus == "start"): if(self.bPathSet == False): print("Path not set, see set_path_callback()", flush=True) self.navStatus = "idle" else: self.prev_pose = self.kalman_pose self.markerArray = self.publish_path_markers(self.path.poses) self.bNearFinish = False self.target_pose = Pose() self.dDistanceToFinish = float("inf") self.navStatus = "calculate" bStatusChanged = True elif(self.navStatus == "calculate"): # 1. Find closest segment end nClosestSegmentEnd = self.nCurrentSegment + 1 # 2. From closest segment end, scan for first segment ending outside of lookahead range if(nClosestSegmentEnd + self.maxLookaheadDistance < self.nTotalSegments): nFirstSegmentEndingOutsideOfLookaheadRangeIdx = int(nClosestSegmentEnd + self.maxLookaheadDistance - 1) self.bNearFinish = False else: nFirstSegmentEndingOutsideOfLookaheadRangeIdx = None self.bNearFinish = True # --- if(self.bNearFinish == False): self.dAdjustedLookaheadDistance = self.maxLookaheadDistance for nIdx in range(nClosestSegmentEnd, nFirstSegmentEndingOutsideOfLookaheadRangeIdx): pt0 = self.path.poses[nIdx - 1].pose pt1 = self.path.poses[nIdx].pose pt2 = self.path.poses[nIdx + 1].pose dAngleToTurnRobot = angle_between_vectors([pt0.position.x, pt0.position.y], [pt1.position.x, pt1.position.y], [pt2.position.x, pt2.position.y]) # A simple way of reducing lookahead distance depending on turn angle if(abs(dAngleToTurnRobot) >= math.pi / 2.0): self.dAdjustedLookaheadDistance = self.minLookaheadDistance elif(abs(dAngleToTurnRobot) >= math.pi / 4.0): self.dAdjustedLookaheadDistance = self.minLookaheadDistance + (self.maxLookaheadDistance - self.minLookaheadDistance) / 4 elif(abs(dAngleToTurnRobot) >= math.pi / 6.0): self.dAdjustedLookaheadDistance = self.minLookaheadDistance + (self.maxLookaheadDistance - self.minLookaheadDistance) / 2 else: self.dAdjustedLookaheadDistance = self.maxLookaheadDistance else: # If near finish, aim for finish self.dAdjustedLookaheadDistance = self.nTotalSegments - nClosestSegmentEnd # Works as distance between markers is 1 nIdx = self.nCurrentSegment + int(self.dAdjustedLookaheadDistance) if(self.nCurrentSegment == self.nTotalSegments - 1): print(">>>>> STOP! <<<<<", flush=True) self.navStatus = "stop" bStatusChanged = True else: self.target_pose = self.path.poses[nIdx].pose # 4. Turn the robot towards target point alpha = math.atan2((self.target_pose.position.y - self.kalman_pose.position.y), (self.target_pose.position.x - self.kalman_pose.position.x)) _, _, beta = euler_from_quaternion(self.kalman_pose.orientation.x, self.kalman_pose.orientation.y, self.kalman_pose.orientation.z, self.kalman_pose.orientation.w) dAngleToTurnRobot = alpha - beta nSign = 1 if dAngleToTurnRobot >= 0 else -1 if(dAngleToTurnRobot > math.pi or dAngleToTurnRobot < -math.pi): dAngleToTurnRobot = -1 * nSign * (2 * math.pi - abs(dAngleToTurnRobot)) # At this point, dDistance holds value from cycle above: current to target dSuggestedLinearSpeed = self.linear_speed dSuggestedAngularSpeed = self.angular_speed # Adjust speed depending on turn angle if(abs(dAngleToTurnRobot) >= math.pi/2): dSpeedCoef = 0.0 elif(abs(dAngleToTurnRobot) >= math.pi/3): dSpeedCoef = 0.1 elif(abs(dAngleToTurnRobot) >= math.pi/4): dSpeedCoef = 0.25 elif(abs(dAngleToTurnRobot) >= math.pi/6): dSpeedCoef = 0.5 elif(abs(dAngleToTurnRobot) >= math.pi/8): dSpeedCoef = 0.7 else: dSpeedCoef = 1.0 # We want speed to change in a smooth way if(self.dSpeedCoef < dSpeedCoef): self.dSpeedCoef = min(1, self.dSpeedCoef + 0.1) elif(self.dSpeedCoef > dSpeedCoef): self.dSpeedCoef = max(0, self.dSpeedCoef - 0.1) # else: pass dSuggestedLinearSpeed = self.linear_speed * self.dSpeedCoef # Now adjust angular speed based on estimated time to turn. If we # do not do it, robot will draw sinusoid... if(dSuggestedLinearSpeed == 0): dSuggestedAngularSpeed = self.angular_speed else: dTimeEstimate = 1.0 / dSuggestedLinearSpeed dSuggestedAngularSpeed = min(self.angular_speed, 2 * abs(dAngleToTurnRobot) / dTimeEstimate) self.drive(dSuggestedLinearSpeed, dSuggestedAngularSpeed, dAngleToTurnRobot) self.navStatus = "driving" bStatusChanged = True print(">>>>>>>>>>>>>>>>>>> Changing status to driving", flush=True) elif(self.navStatus == "driving"): if(self.nCurrentSegment == self.nTotalSegments - 1): print(">>>>> STOP! <<<<<", flush=True) self.navStatus = "stop" bStatusChanged = True # Check if we need to increase self.nCurrentSegment dDistance_0 = getDistanceBetweenPoses(self.kalman_pose, self.path.poses[self.nCurrentSegment].pose) dDistance_1 = getDistanceBetweenPoses(self.kalman_pose, self.path.poses[self.nCurrentSegment + 1].pose) if(dDistance_1 < dDistance_0): self.nCurrentSegment += 1 # --- if(self.bNearFinish == False): self.navStatus = "calculate" bStatusChanged = True elif(self.bNearFinish == True): self.navStatus = "calculate" bStatusChanged = True dDistanceToFinish = self.nTotalSegments - self.nCurrentSegment if(dDistanceToFinish <= self.dDistanceToFinish): self.dDistanceToFinish = dDistanceToFinish else: print(">>>>> STOP! <<<<<", flush=True) self.navStatus = "stop" bStatusChanged = True # This is for presentation only: draw robot's path on a canvas and publish it for ImageLayersManager to use if(self.navStatus == "driving" or self.navStatus == "calculate"): dRobotShift = getDistanceBetweenPoses(self.prev_pose, self.kalman_pose) if(dRobotShift >= self.dDistBetweenRobotMarkers): prev_x = int(self.nCanvasWidth * (self.prev_pose.position.x + self.nGridSizeMeters/2) / self.nGridSizeMeters) prev_y = int(self.nCanvasHeight * (self.nGridSizeMeters/2 - self.prev_pose.position.y) / self.nGridSizeMeters) x = int(self.nCanvasWidth * (self.kalman_pose.position.x + self.nGridSizeMeters/2) / self.nGridSizeMeters) y = int(self.nCanvasHeight * (self.nGridSizeMeters/2 - self.kalman_pose.position.y) / self.nGridSizeMeters) cv2.line(self.imgCanvas, (prev_x, prev_y), (x, y), self.real_path_color, 2) self.prev_pose = self.kalman_pose msg = self.bridge.cv2_to_imgmsg(self.imgCanvas, "bgra8") msg.header.frame_id = str(self.robot_node.strSlashRobotName + '/images/robot_path_markers') self.frame_num += 1 self.canvas_publisher.publish(msg) # --- if(bStatusChanged): print("*** PathFollower.onTimer() *** Status changed: %s" % (self.navStatus), flush=True) if delta_ms >= 1000: self.prev_time = now # --- Marker is just a sphere def create_path_marker(self, x, y, z, r, g, b, a=1.0): marker = Marker() marker.header.frame_id = "world" marker.type = Marker.SPHERE marker.action = Marker.ADD marker.lifetime = Duration().to_msg() # forever marker.pose.position.x = x marker.pose.position.y = y marker.pose.position.z = z marker.scale.x = 0.2 marker.scale.y = 0.2 marker.scale.z = 0.2 marker.color.r = r / 255.0 marker.color.g = g / 255.0 marker.color.b = b / 255.0 marker.color.a = a return marker # ------ def publish_path_markers(self, path_points): markers = MarkerArray() for i, point in enumerate(path_points): marker = self.create_path_marker( point.pose.position.x, point.pose.position.y, point.pose.position.z, 0.0, 255., 0., 0.5) marker.id = i markers.markers.append(marker) x = int(self.nCanvasWidth * (point.pose.position.x + self.nGridSizeMeters/2) / self.nGridSizeMeters) y = int(self.nCanvasHeight * (self.nGridSizeMeters/2 - point.pose.position.y) / self.nGridSizeMeters) if(i > 0): if(self.robot_node.strRobotName == "" or self.robot_node.strRobotName == "robot1"): color = self.path_color_0 else: color = self.path_color_1 cv2.line(self.imgCanvas, (prev_x, prev_y), (x, y), color, 5) prev_x = x prev_y = y msg = self.bridge.cv2_to_imgmsg(self.imgCanvas, "rgba8") msg.header.frame_id = str(self.frame_num) # 'msg' is a ROS2 sensor_msgs/Image.msg self.frame_num += 1 self.canvas_publisher.publish(msg)