In the "Links" section, you can see some Internet gurus implementing one of the possible approaches. Let's take a closer look at what aurtors are doing, as well as at possible alternatives.
The code uses the BasicNavigator class (same as in Waypoint Follower example in one of the earlier sections). In addition to BasicNavigator class functionality, we need to be able to run an infinite cycle (navigate through the list of waypoints one time after another) and to switch to battery charging and back.
To do it, we can create another class that has the "patrolling" (wrapping waypoint follower into an infinite loop) and "charging" functionality, and has a BasicNavigator class as a member variable:
class ChargerNavigator(Node): def __init__(self): super().__init__('ChargerNavigator') ... self.navigator = BasicNavigator()
Then we need to run the infinite loop, that handles all the navigation,
and recreates route when goal is reached. In the "waypoint follower"
example earlier, we have handled it by simply placing the "while" cycle
in the main function.
waypoint_follower.py:
def main(): rclpy.init() navigator = BasicNavigator() navigator.waitUntilNav2Active() # set our demo's goal poses to follow goal_poses = [] goal_pose1 = PoseStamped() goal_pose1.header.frame_id = 'map' goal_pose1.header.stamp = navigator.get_clock().now().to_msg() goal_pose1.pose.position.x = 1.0 goal_pose1.pose.position.y = 2.15 goal_pose1.pose.orientation.w = 1.0 goal_pose1.pose.orientation.z = 0.0 goal_poses.append(goal_pose1) # additional goals can be appended goal_pose2 = PoseStamped() goal_pose2.header.frame_id = 'map' goal_pose2.header.stamp = navigator.get_clock().now().to_msg() goal_pose2.pose.position.x = 1.5 goal_pose2.pose.position.y = -3.75 goal_pose2.pose.orientation.w = 0.707 goal_pose2.pose.orientation.z = 0.707 goal_poses.append(goal_pose2) ... navigator.followWaypoints(goal_poses) while not navigator.isNavComplete(): ...
There was nothing wrong with this approach earlier, but now we need to perform an extra step: we need to subscribe to a ROS2 battery level message and change code's behavior when battery is discharged. Here is the problem.
See, an infinite loop running in the main thread is a blocking thing and a second function, one that handles subscription to battery info, will never be called.
To overcome this obstacle, people usually use the MultiThreadedExecutor. First, we need to create a second class (that is also node-based):
import time # Time library from rclpy.duration import Duration import rclpy from rclpy.node import Node from rclpy.executors import MultiThreadedExecutor from basic_navigator import BasicNavigator, NavigationResult from geometry_msgs.msg import PoseStamped from geometry_msgs.msg import Twist from sensor_msgs.msg import BatteryState import time class BatteryStatusPublisher(Node): def __init__(self): super().__init__('BatteryStatusPublisher') self.subscription_battery_status_not_used_except_for_scope = self.create_subscription( BatteryState, '/battery_status', self.get_battery_status, 10) def get_battery_status(self, msg: BatteryState): self.nBatteryLevel = msg.percentage print("Battery at", self.nBatteryLevel, "%")
Let me also provide the complete ChargerNavigator code (though I never tested it properly, so do not be surprised if you get a negative battery level). This code is mostly copied from "Links" below, it should work, but I do not recommend this approach. Later I will explain why.
To call this code, we can use the following "main" function:
import time # Time library from geometry_msgs.msg import PoseStamped # Pose with ref frame and timestamp from rclpy.duration import Duration # Handles time for ROS 2 import rclpy # Python client library for ROS 2 from basic_navigator import BasicNavigator, NavigationResult # Helper module from charger_navigator import ChargerNavigator #, BatteryStatusPublisher #from rclpy.executors import MultiThreadedExecutor from rclpy.executors import SingleThreadedExecutor def main(args=None): try: rclpy.init(args=args) navigator = ChargerNavigator() batteryStatusPublisher = BatteryStatusPublisher() executor = MultiThreadedExecutor(num_threads=4) executor.add_node(navigator) executor.add_node(batteryStatusPublisher) try: executor.spin() finally: executor.shutdown() navigator.destroy_node() finally: rclpy.shutdown() if __name__ == '__main__': main()
Let me repeat: this code works. But... what is it doing? We (well, not we - they!) create a ChargerNavigator class, that has a timer in its init() function. Then we use a blocking call, and to avoid consequences, we start another node in another thread! Ugly. And not just ugly: if something goes wrong, it is going to be very difficult to control resources leaking.