This class wraps some or the real robot's functionality and adds the ROS2 functionality. Strictly speaking, more of it should be encapsulated here (and removed from launch file), so I will definitely revisit this class in future.
# (c) robotics.snowcron.com # Use: MIT license ... from nav25d_07.utils import * from nav25d_07.PlannerStraightLine import * from nav25d_07.PlannerAStarGrid import * from nav25d_07.PlannerAStarGridDynamic import * from nav25d_07.PlannerAStarGraph import * class Robot(Node): def __init__(self, robot_name): print("*** Robot.init(): started, robot_name: '", robot_name, "'", flush=True) # Get decorated robot name used to create distinck ROS2 topic names for different robots self.strRobotName, self.strSlashRobotName, self.strRobotNameSlash, self.strDecoratedNodeName = initSuperNodeAndGetDecoratedRobotNames(robot_name, "Robot") super().__init__(self.strDecoratedNodeName) # --- Create all types of path planners self.globalPathPlannerStraightLine = PlannerStraightLine() strPlanTopic = "/plan/global_planner_straight_line" self.straight_line_plan_publisher = self.create_publisher(Path, self.strSlashRobotName + strPlanTopic, 1) # --- dGridCellSize = 10 self.globalPlannerGrid = PlannerAStarGrid(dGridCellSize, nResizeTo=1000) self.globalPlannerGrid.loadKeepoutGrid("./../harsh/src/maps/perfect_map_01.yaml") strPlanTopic = "/plan/global_planner_a_star_grid" self.grid_plan_publisher = self.create_publisher(Path, self.strSlashRobotName + strPlanTopic, 1) # --- ranges = [24, 49, 74, 99, 124, 149, 174, 199, 224, 255] self.globalPlannerGridDynamic = PlannerAStarGridDynamic(ranges, max_cell_size=256, min_cell_size=8, nResizeTo=1000) # As we pass start and goal, which we do not have here, we should call it elsewhere # self.globalPlannerGridDynamic.loadKeepoutGrid("./../harsh/src/maps/perfect_map_01.yaml", ranges, start_point=start_meters, goal_point=goal_meters) strPlanTopic = "/plan/global_planner_a_star_grid_dynamic" self.grid_dynamic_plan_publisher = self.create_publisher(Path, self.strSlashRobotName + strPlanTopic, 1) # --- self.globalPlannerGraph = PlannerAStarGraph(nResizeTo=1000) strPlanTopic = "/plan/global_planner_a_star_graph" self.graph_plan_publisher = self.create_publisher(Path, self.strSlashRobotName + strPlanTopic, 1) strRoadsGraphFileName = "./../harsh/src/utils_commercial/graphs/perfect_topo_color_05.json" strYamlImageFilePath = "./../harsh/src/maps/perfect_map_01.yaml" self.globalPlannerGraph.loadGraph(strRoadsGraphFileName, strYamlImageFilePath) # Last path that was calculated self.path = None # --- qos_profile = QoSProfile( reliability=QoSReliabilityPolicy.BEST_EFFORT, history=QoSHistoryPolicy.KEEP_LAST, depth=10, # Adjust the depth according to your needs durability=QoSDurabilityPolicy.VOLATILE # You can use TRANSIENT_LOCAL for durability ) # --- Subscribe to Localization info: used to find the position FROM which we have to # build a path self.arrKalmanState = None self.kalman_subscriber = self.create_subscription( Float32MultiArray, self.strSlashRobotName + '/kalman_pose', self.kalman_callback, qos_profile = qos_profile ) # --- Subscribing to commands Coordinator class is sending self.command_subscriber = self.create_subscription( String, self.strSlashRobotName + "/coordinator/command", self.command_callback, 10) # --- Run robot activities by timer timer_period = 0.1 self.timer = self.create_timer(timer_period, self.timer_callback) self.prev_time = self.get_clock().now() # --- print("*** Robot.init(): done; robot_name: '", robot_name, "'", flush=True) # --- # Path planners create path between centers of cells (grid) or between nodes (graph). # These points are far from each other, while we want path points to be dense, for example, it is required by path follower # algorithm. So we use straight line path planner to fill the gaps. def fillPosesLinear(self, start_pose, arrPath, path): pose_prev = start_pose for pt in arrPath: pose = PoseStamped() pose.pose.position.x = float(pt[0]) pose.pose.position.y = float(pt[1]) dx = pose.pose.position.x - pose_prev.pose.position.x dy = pose.pose.position.y - pose_prev.pose.position.y path = self.globalPathPlannerStraightLine.generateStraightLinePath(dx, dy, path, pose_prev) pose_prev = pose # --- Depending on the strPathPlanner parameter, this function uses the # corresponding path planner. Note that the structure of the code is pretty much thre # same for all planners. # Also note that we pass coordinates (in meters) for start and goal poses. # Accordingly, in case of a graph algorithm, we use dynamic grid algorithm to drive to a # closest graph node, and only then use graph algorithm to navigate on the road. # After we arrive, we use dynamic grid again to get from the road to the destination. def getPath(self, start_pose, goal_pose, strPathPlanner, bVisualize, yaml): print("*** getPath():", strPathPlanner, flush=True) path = Path() if(strPathPlanner == "global_planner_straight_line"): path.poses.append(start_pose) dx = goal_pose.pose.position.x - start_pose.pose.position.x dy = goal_pose.pose.position.y - start_pose.pose.position.y path = self.globalPathPlannerStraightLine.generateStraightLinePath(dx, dy, path, start_pose) if(bVisualize): drawPath(path, "./../harsh/src/maps/perfect_map_01.yaml", yaml, None, None) self.path = path self.straight_line_plan_publisher.publish(path) elif(strPathPlanner == "global_planner_a_star_grid"): path.poses.append(start_pose) arrPath = self.globalPlannerGrid.get_astar_grid_path((start_pose.pose.position.x, start_pose.pose.position.y), (goal_pose.pose.position.x, goal_pose.pose.position.y), 32) self.fillPosesLinear(start_pose, arrPath, path) if(bVisualize): drawPath(path, "./../harsh/src/maps/perfect_map_01.yaml", yaml, self.globalPlannerGrid.drawDecorations, None) self.path = path self.grid_plan_publisher.publish(path) elif(strPathPlanner == "global_planner_a_star_grid_dynamic"): path.poses.append(start_pose) self.globalPlannerGridDynamic.bProfiling = True arrPath = self.globalPlannerGridDynamic.get_astar_grid_path_dynamic( (start_pose.pose.position.x, -start_pose.pose.position.y), (goal_pose.pose.position.x, goal_pose.pose.position.y)) self.fillPosesLinear(start_pose, arrPath, path) if(bVisualize): drawPath(path, "./../harsh/src/maps/perfect_map_01.yaml", yaml, self.globalPlannerGridDynamic.drawDecorations, self.globalPlannerGridDynamic.drawTopDecorations) self.path = path self.grid_dynamic_plan_publisher.publish(path) elif(strPathPlanner == "global_planner_a_star_graph"): path.poses.append(start_pose) nStartNodeIdx = self.globalPlannerGraph.getClosestNode(start_pose) print(">>> Node closest to start pose:", nStartNodeIdx, flush=True) nGoalNodeIdx = self.globalPlannerGraph.getClosestNode(goal_pose) print(">>> Node closest to goal pose:", nGoalNodeIdx, flush=True) # --- arrPath = [] # --- bPlanLastMile = True if(bPlanLastMile): start_node_pt = self.globalPlannerGraph.nodes[nStartNodeIdx] pose = Pose() pose.position.x = start_node_pt[0] pose.position.y = start_node_pt[1] pose.position.z = 0. d = getDistanceBetweenPoses(start_pose.pose, pose) if(d > 2.0): arrPath = self.globalPlannerGridDynamic.get_astar_grid_path_dynamic( (start_pose.pose.position.x, -start_pose.pose.position.y), (pose.position.x, -pose.position.y)) # --- arrNodes = self.globalPlannerGraph.get_astar_graph_path (nStartNodeIdx, nGoalNodeIdx, treshold=63) for i in range(len(arrNodes)): p_node = self.globalPlannerGraph.nodes[arrNodes[i]] arrPath.append(p_node) # --- bPlanLastMile = True if(bPlanLastMile): goal_node_pt = self.globalPlannerGraph.nodes[nGoalNodeIdx] pose = Pose() pose.position.x = goal_node_pt[0] pose.position.y = goal_node_pt[1] pose.position.z = 0. d = getDistanceBetweenPoses(goal_pose.pose, pose) if(d > 2.0): arrPath = arrPath + self.globalPlannerGridDynamic.get_astar_grid_path_dynamic((pose.position.x, -pose.position.y), (goal_pose.pose.position.x, -goal_pose.pose.position.y)) # --- self.fillPosesLinear(start_pose, arrPath, path) if(bVisualize): drawPath(path, "./../harsh/src/maps/perfect_map_01.yaml", yaml, self.globalPlannerGraph.drawDecorations, self.globalPlannerGraph.drawTopDecorations) self.path = path self.graph_plan_publisher.publish(path) return path # --- Here we do not do anything, this is just something for the future. def plan_callback(self, msg: Path): self.path = msg # print("*** Received straight line path: ", flush=True) # i = 0 # for pose in self.path.poses: # print(f"\t*** {i}. x: {pose.pose.position.x} y: {pose.pose.position.y}", flush=True) # i += 1 # # Access path data # frame_id = msg.header.frame_id # poses = msg.poses # List of PoseStamped # for pose in poses: # x = pose.pose.position.x # y = pose.pose.position.y # z = pose.pose.position.z # # Access orientation if needed # qx = pose.pose.orientation.x # qy = pose.pose.orientation.y # qz = pose.pose.orientation.z # qw = pose.pose.orientation.w # --- As we receive a command, plan the path from current position to the goal def command_callback(self, msg): command_data = json.loads(msg.data) strCommand = command_data["command"] ptGoal = command_data["pose"] strPathPlanner = command_data["planner"] if(strCommand == "goToPose"): bVisualize = True if(strPathPlanner == "global_planner_straight_line" or strPathPlanner == "global_planner_a_star_grid" or strPathPlanner == "global_planner_a_star_grid_dynamic" or strPathPlanner == "global_planner_a_star_graph"): start = PoseStamped() start.pose.position.x = self.arrKalmanState[0] start.pose.position.y = self.arrKalmanState[1] goal = PoseStamped() goal.pose.position.x = ptGoal[0] goal.pose.position.y = ptGoal[1] self.getPath(start, goal, strPathPlanner, bVisualize, None) # --- Store data from Localization message def kalman_callback(self, msg): self.arrKalmanState = msg.data # print(f"Position: x={x:.2f}, y={y:.2f}, z={z:.2f}, roll={roll:.2f}, pitch={pitch:.2f}, yaw={yaw:.2f}", flush=True) # --- Again, just something to use in future, if necessary def timer_callback(self): pass # ------ def main(args=None): print("*** Robot - main()", flush=True) robot_name = "" for i, arg in enumerate(sys.argv): if(arg == 'robot_name'): try: robot_name = sys.argv[i + 1] except IndexError: print("Error: robot_name argument requires a value", flush=True) return elif(arg == 'global_path_planner'): try: strGlobalPathPlanner = sys.argv[i + 1] except IndexError: print("Error: global_path_planner argument requires a value", flush=True) return rclpy.init(args=args) robot = Robot(robot_name) executor = rclpy.executors.MultiThreadedExecutor() executor.add_node(robot) try: executor.spin() except KeyboardInterrupt: pass finally: executor.shutdown() robot.destroy_node() rclpy.shutdown() # --- if __name__ == '__main__': main()
With Robot class, we can use ROS2 and Gazebo simulator. Here is a result for one of the planners: