moveit2
The MoveIt Motion Planning Framework for ROS 2.
Loading...
Searching...
No Matches
srdf_publisher_test.py
Go to the documentation of this file.
1# Software License Agreement (BSD License)
2#
3# Copyright (c) 2024, PickNik Robotics Inc.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9#
10# # Redistributions of source code must retain the above copyright
11# notice, this list of conditions and the following disclaimer.
12# # Redistributions in binary form must reproduce the above
13# copyright notice, this list of conditions and the following
14# disclaimer in the documentation and/or other materials provided
15# with the distribution.
16# # Neither the name of PickNik Robotics Inc. nor the names of its
17# contributors may be used to endorse or promote products derived
18# from this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31# POSSIBILITY OF SUCH DAMAGE.
32
33# Author: Tyler Weaver
34
35import unittest
36from threading import Event
37from threading import Thread
38
39import launch
40import launch_ros
41import launch_testing
42import rclpy
43from rclpy.node import Node
44from rclpy.qos import (
45 QoSProfile,
46 QoSReliabilityPolicy,
47 QoSDurabilityPolicy,
48)
49from std_msgs.msg import String
50
51import pytest
52
53
54@pytest.mark.launch_test
56 srdf_publisher = launch_ros.actions.Node(
57 package="moveit_ros_planning",
58 parameters=[{"robot_description_semantic": "test value"}],
59 executable="srdf_publisher",
60 output="screen",
61 )
62
63 return launch.LaunchDescription(
64 [
65 srdf_publisher,
66 # Start tests right away - no need to wait for anything
67 launch_testing.actions.ReadyToTest(),
68 ]
69 ), {"srdf_publisher": srdf_publisher}
70
71
72class SrdfObserverNode(Node):
73 def __init__(self, name="srdf_observer_node"):
74 super().__init__(name)
75 self.msg_event_object = Event()
76 self.srdf = ""
77
79 # Create a subscriber
80 self.subscription = self.create_subscription(
81 String,
82 "robot_description_semantic",
84 qos_profile=QoSProfile(
85 reliability=QoSReliabilityPolicy.RELIABLE,
86 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
87 depth=1,
88 ),
89 )
90
91 # Add a spin thread
92 self.ros_spin_thread = Thread(
93 target=lambda node: rclpy.spin(node), args=(self,)
94 )
95 self.ros_spin_thread.start()
96
97 def subscriber_callback(self, msg):
98 self.srdf = msg.data
99 self.msg_event_object.set()
100
101
102# These tests will run concurrently with the dut process. After all these tests are done,
103# the launch system will shut down the processes that it started up
104class TestFixture(unittest.TestCase):
105 def test_rosout_msgs_published(self, proc_output):
106 rclpy.init()
107 try:
108 node = SrdfObserverNode()
109 node.start_subscriber()
110 msgs_received_flag = node.msg_event_object.wait(timeout=5.0)
111 assert (
112 msgs_received_flag
113 ), "Did not receive message on `/robot_description_semantic`!"
114 assert node.srdf == "test value"
115 finally:
116 rclpy.shutdown()
117
118
119@launch_testing.post_shutdown_test()
120class TestProcessOutput(unittest.TestCase):
121 def test_exit_code(self, proc_info):
122 # Check that all processes in the launch (in this case, there's just one) exit
123 # with code 0
124 launch_testing.asserts.assertExitCodes(proc_info)
__init__(self, name="srdf_observer_node")
test_rosout_msgs_published(self, proc_output)