CreOS SDK
The CreOS SDK allows you to interact with Avular robots
 
Loading...
Searching...
No Matches
Hello World (ROS2 Python)

Hello World example using ROS2 in Python to communicate with the CreOS SDK ROS2 node.

This example demonstrates how to use ROS2 in Python to communicate with the CreOS SDK ROS2 node. Below you can find the full source code. You can also find a more detailed explanation of the code.

Source code

main.py

1
2import rclpy
3from rclpy.node import Node
4from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy
5from geometry_msgs.msg import Twist
6from sensor_msgs.msg import NavSatFix, NavSatStatus
7from creos_sdk_msgs.srv import EnterLowPowerMode
8
10class HelloWorldNode(Node):
11 def __init__(self):
12 super().__init__('hello_world_node')
13
15 self.publisher = self.create_publisher(Twist, 'cmd_vel', QoSProfile(
16 depth=10,
17 reliability=ReliabilityPolicy.BEST_EFFORT,
18 durability=DurabilityPolicy.VOLATILE))
19 self.publish_timer = self.create_timer(1, self.publish_callback)
20
22 self.subscription = self.create_subscription(NavSatFix, 'gnss', self.gps_callback, QoSProfile(
23 depth=10,
24 reliability=ReliabilityPolicy.BEST_EFFORT,
25 durability=DurabilityPolicy.VOLATILE))
26
28 self.client = self.create_client(EnterLowPowerMode, 'enter_low_power_mode')
29
30 powermode_request = EnterLowPowerMode.Request()
31
32 powermode_future = self.client.call_async(powermode_request)
33
34 powermode_future.add_done_callback(self.powermode_callback)
35
37 def publish_callback(self):
38 msg = Twist()
39 msg.linear.x = 0.5
40 msg.angular.z = 0.1
41 self.publisher.publish(msg)
42
44 def gps_callback(self, msg: NavSatFix):
45 status = 'UNKNOWN'
46 match msg.status.status:
47 case NavSatStatus.STATUS_NO_FIX:
48 status = 'NO_FIX'
49 case NavSatStatus.STATUS_FIX:
50 status = 'FIX'
51 case NavSatStatus.STATUS_SBAS_FIX:
52 status = 'SBAS_FIX'
53 case NavSatStatus.STATUS_GBAS_FIX:
54 status = 'GBAS_FIX'
55 case _:
56 status = 'UNKNOWN'
57 self.get_logger().info("Received GNSS message with status: %s" % status)
58
60 def powermode_callback(self, future):
61 try:
62 response = future.result()
63 self.get_logger().info("Received response from EnterLowPowerMode service: %s" % response.success)
64 except Exception as e:
65 self.get_logger().error("Service call failed %r" % (e,))
66
68def main():
69 rclpy.init()
70 node = HelloWorldNode()
71 rclpy.spin(node)
72
73
74if __name__ == '__main__':
75 main()
76
service definitions of the creos_sdk_msgs package
message definitions of the geometry_msgs package
Definition accel.hpp:8
message definitions of the sensor_msgs package
Definition battery_state.hpp:8

Explanation

At the top of the main.py file, we import the necessary modules for the example. Specifically, we import some rclpy modules to interact with ROS2, and the necessary messages packages to communicate with the CreOS SDK ROS2 node. These also include the EnterLowPowerMode service type from the creos_sdk_msgs package.

2import rclpy
3from rclpy.node import Node
4from rclpy.qos import QoSProfile, ReliabilityPolicy, DurabilityPolicy
5from geometry_msgs.msg import Twist
6from sensor_msgs.msg import NavSatFix, NavSatStatus
7from creos_sdk_msgs.srv import EnterLowPowerMode

For this example, we create a HelloWorldNode class that extends from rclpy.node.Node. Most of our implementation will be inside this class' constructor.

10class HelloWorldNode(Node):
11 def __init__(self):
12 super().__init__('hello_world_node')

We create a publisher that publishes geometry_msgs.msg.Twist messages to the cmd_vel topic. This message contains the linear and angular velocity of the robot. We also create a timer that triggers every second. Note that we match the Quality of Service settings with the settings used by the CreOS SDK ROS2 node, as specified in the Input Topics section.

15 self.publisher = self.create_publisher(Twist, 'cmd_vel', QoSProfile(
16 depth=10,
17 reliability=ReliabilityPolicy.BEST_EFFORT,
18 durability=DurabilityPolicy.VOLATILE))
19 self.publish_timer = self.create_timer(1, self.publish_callback)

In the callback of the timer, we create a geometry_msgs.msg.Twist message and publish it.

37 def publish_callback(self):
38 msg = Twist()
39 msg.linear.x = 0.5
40 msg.angular.z = 0.1
41 self.publisher.publish(msg)

Next, we create a subscriber that subscribes to the gnss topic. Note that we also match the Quality of Service settings here.

22 self.subscription = self.create_subscription(NavSatFix, 'gnss', self.gps_callback, QoSProfile(
23 depth=10,
24 reliability=ReliabilityPolicy.BEST_EFFORT,
25 durability=DurabilityPolicy.VOLATILE))

In the callback of the subscriber, we extract the NavSatStatus from the received message, and print the status in a human-readable format.

44 def gps_callback(self, msg: NavSatFix):
45 status = 'UNKNOWN'
46 match msg.status.status:
47 case NavSatStatus.STATUS_NO_FIX:
48 status = 'NO_FIX'
49 case NavSatStatus.STATUS_FIX:
50 status = 'FIX'
51 case NavSatStatus.STATUS_SBAS_FIX:
52 status = 'SBAS_FIX'
53 case NavSatStatus.STATUS_GBAS_FIX:
54 status = 'GBAS_FIX'
55 case _:
56 status = 'UNKNOWN'
57 self.get_logger().info("Received GNSS message with status: %s" % status)

We also create a service client that calls the enter_low_power_mode service. We immediately use this client to send an asynchronous request to the service, and add a done callback to returned future.

28 self.client = self.create_client(EnterLowPowerMode, 'enter_low_power_mode')
29
30 powermode_request = EnterLowPowerMode.Request()
31
32 powermode_future = self.client.call_async(powermode_request)
33
34 powermode_future.add_done_callback(self.powermode_callback)

In the done callback, we print whether the transition to low-power mode was successful. If any exception has happened during the request, we print the failure to the console as well.

60 def powermode_callback(self, future):
61 try:
62 response = future.result()
63 self.get_logger().info("Received response from EnterLowPowerMode service: %s" % response.success)
64 except Exception as e:
65 self.get_logger().error("Service call failed %r" % (e,))

In the main function, we simply create a new instance of our HelloWorldNode class and spin the node.

68def main():
69 rclpy.init()
70 node = HelloWorldNode()
71 rclpy.spin(node)
72
73
74if __name__ == '__main__':
75 main()