Go to battery charger

Quick and (very) dirty solution

In the "Links" section, you can see some Internet gurus implementing one of the possible approaches. Let's take a closer look at what aurtors are doing, as well as at possible alternatives.

The code uses the BasicNavigator class (same as in Waypoint Follower example in one of the earlier sections). In addition to BasicNavigator class functionality, we need to be able to run an infinite cycle (navigate through the list of waypoints one time after another) and to switch to battery charging and back.

To do it, we can create another class that has the "patrolling" (wrapping waypoint follower into an infinite loop) and "charging" functionality, and has a BasicNavigator class as a member variable:

                    class ChargerNavigator(Node):
                        def __init__(self):
                            super().__init__('ChargerNavigator')
                            ...
                            self.navigator = BasicNavigator()
                

Then we need to run the infinite loop, that handles all the navigation, and recreates route when goal is reached. In the "waypoint follower" example earlier, we have handled it by simply placing the "while" cycle in the main function.
waypoint_follower.py:

                    def main():
                        rclpy.init()
                
                        navigator = BasicNavigator()
                
                        navigator.waitUntilNav2Active()
                
                        # set our demo's goal poses to follow
                        goal_poses = []
                        goal_pose1 = PoseStamped()
                        goal_pose1.header.frame_id = 'map'
                        goal_pose1.header.stamp = navigator.get_clock().now().to_msg()
                        goal_pose1.pose.position.x = 1.0
                        goal_pose1.pose.position.y = 2.15
                        goal_pose1.pose.orientation.w = 1.0
                        goal_pose1.pose.orientation.z = 0.0
                        goal_poses.append(goal_pose1)
                
                        # additional goals can be appended
                        goal_pose2 = PoseStamped()
                        goal_pose2.header.frame_id = 'map'
                        goal_pose2.header.stamp = navigator.get_clock().now().to_msg()
                        goal_pose2.pose.position.x = 1.5
                        goal_pose2.pose.position.y = -3.75
                        goal_pose2.pose.orientation.w = 0.707
                        goal_pose2.pose.orientation.z = 0.707
                        goal_poses.append(goal_pose2)

                        ...
                
                        navigator.followWaypoints(goal_poses)
    
                        while not navigator.isNavComplete():
                            ...
                

There was nothing wrong with this approach earlier, but now we need to perform an extra step: we need to subscribe to a ROS2 battery level message and change code's behavior when battery is discharged. Here is the problem.

See, an infinite loop running in the main thread is a blocking thing and a second function, one that handles subscription to battery info, will never be called.

To overcome this obstacle, people usually use the MultiThreadedExecutor. First, we need to create a second class (that is also node-based):

                    import time  # Time library
    
                    from rclpy.duration import Duration
                    import rclpy                             
                    from rclpy.node import Node
                    from rclpy.executors import MultiThreadedExecutor
                    from basic_navigator import BasicNavigator, NavigationResult
                    from geometry_msgs.msg import PoseStamped
                    from geometry_msgs.msg import Twist      
                    from sensor_msgs.msg import BatteryState 
                    import time
                
                    class BatteryStatusPublisher(Node):
                        def __init__(self):
                            super().__init__('BatteryStatusPublisher')
                            self.subscription_battery_status_not_used_except_for_scope = self.create_subscription(
                                BatteryState,
                                '/battery_status',
                                self.get_battery_status,
                                10)
                    
                        def get_battery_status(self, msg: BatteryState):
                            self.nBatteryLevel = msg.percentage
                            print("Battery at", self.nBatteryLevel, "%")

                

Let me also provide the complete ChargerNavigator code (though I never tested it properly, so do not be surprised if you get a negative battery level). This code is mostly copied from "Links" below, it should work, but I do not recommend this approach. Later I will explain why.

                    
                

To call this code, we can use the following "main" function:

                    import time # Time library
                    
                    from geometry_msgs.msg import PoseStamped # Pose with ref frame and timestamp
                    from rclpy.duration import Duration # Handles time for ROS 2
                    import rclpy # Python client library for ROS 2
                    
                    from basic_navigator import BasicNavigator, NavigationResult # Helper module
                    from charger_navigator import ChargerNavigator #, BatteryStatusPublisher
                    
                    #from rclpy.executors import MultiThreadedExecutor
                    from rclpy.executors import SingleThreadedExecutor
                    
                    def main(args=None):
                    
                        try:  
                            rclpy.init(args=args)
                            
                            navigator = ChargerNavigator()
                            batteryStatusPublisher = BatteryStatusPublisher()
                    
                            executor = MultiThreadedExecutor(num_threads=4)
                            executor.add_node(navigator)
                            executor.add_node(batteryStatusPublisher)
                    
                            try:
                                executor.spin()
                            finally:
                                executor.shutdown()
                                navigator.destroy_node()
                    
                        finally:
                            rclpy.shutdown()    
                    
                    
                    if __name__ == '__main__':
                    main()
                

Let me repeat: this code works. But... what is it doing? We (well, not we - they!) create a ChargerNavigator class, that has a timer in its init() function. Then we use a blocking call, and to avoid consequences, we start another node in another thread! Ugly. And not just ugly: if something goes wrong, it is going to be very difficult to control resources leaking.

(C) snowcron.com, all rights reserved

Please read the disclaimer