moveit2
The MoveIt Motion Planning Framework for ROS 2.
Loading...
Searching...
No Matches
rosout_publish_test.py
Go to the documentation of this file.
1# Software License Agreement (BSD License)
2#
3# Copyright (c) 2023, PickNik Robotics Inc.
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9#
10# # Redistributions of source code must retain the above copyright
11# notice, this list of conditions and the following disclaimer.
12# # Redistributions in binary form must reproduce the above
13# copyright notice, this list of conditions and the following
14# disclaimer in the documentation and/or other materials provided
15# with the distribution.
16# # Neither the name of Willow Garage nor the names of its
17# contributors may be used to endorse or promote products derived
18# from this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31# POSSIBILITY OF SUCH DAMAGE.
32
33# Author: Tyler Weaver
34
35import unittest
36from threading import Event
37from threading import Thread
38
39import launch
40import launch_ros
41import launch_testing
42import rclpy
43from rclpy.node import Node
44from rcl_interfaces.msg import Log
45
46import pytest
47
48
49@pytest.mark.launch_test
51 # dut: device under test
52 dut_process = launch_ros.actions.Node(
53 package="moveit_core",
54 executable=launch.substitutions.LaunchConfiguration("dut"),
55 output="screen",
56 )
57
58 return launch.LaunchDescription(
59 [
60 launch.actions.DeclareLaunchArgument(
61 "dut",
62 description="Executable to launch",
63 ),
64 dut_process,
65 # Start tests right away - no need to wait for anything
66 launch_testing.actions.ReadyToTest(),
67 ]
68 ), {"dut_process": dut_process}
69
70
72 def __init__(self, name="rosout_observer_node"):
73 super().__init__(name)
74 self.msg_event_object = Event()
75
77 # Create a subscriber
78 self.subscription = self.create_subscription(
80 )
81
82 # Add a spin thread
83 self.ros_spin_thread = Thread(
84 target=lambda node: rclpy.spin(node), args=(self,)
85 )
86 self.ros_spin_thread.start()
87
88 def subscriber_callback(self, data):
89 self.msg_event_object.set()
90
91
92# These tests will run concurrently with the dut process. After all these tests are done,
93# the launch system will shut down the processes that it started up
94class TestFixture(unittest.TestCase):
95 def test_rosout_msgs_published(self, proc_output):
96 rclpy.init()
97 try:
99 node.start_subscriber()
100 msgs_received_flag = node.msg_event_object.wait(timeout=5.0)
101 assert msgs_received_flag, "Did not receive msgs !"
102 finally:
103 rclpy.shutdown()
104
105
106@launch_testing.post_shutdown_test()
107class TestProcessOutput(unittest.TestCase):
108 def test_exit_code(self, proc_info):
109 # Check that all processes in the launch (in this case, there's just one) exit
110 # with code 0
111 launch_testing.asserts.assertExitCodes(proc_info)
__init__(self, name="rosout_observer_node")
test_rosout_msgs_published(self, proc_output)