moveit2
The MoveIt Motion Planning Framework for ROS 2.
Loading...
Searching...
No Matches
test_trajectory_cache.py
Go to the documentation of this file.
1# Copyright 2024 Intrinsic Innovation LLC.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# Author: methylDragon
16
17from ament_index_python.packages import get_package_share_directory
18from launch import LaunchDescription
19from launch.actions import ExecuteProcess
20from launch_pytest.tools import process as process_tools
21from launch_ros.actions import Node
22
23from moveit_configs_utils import MoveItConfigsBuilder
24
25import launch_pytest
26
27import pytest
28import os
29
30# This would have been a gtest, but we need a move_group instance, which
31# requires some parameters loaded and a separate node started.
32
33
34@pytest.fixture
36 return (
37 MoveItConfigsBuilder("moveit_resources_panda")
38 .robot_description(file_path="config/panda.urdf.xacro")
39 .planning_pipelines("ompl", ["ompl"])
40 .trajectory_execution(file_path="config/gripper_moveit_controllers.yaml")
41 .to_moveit_configs()
42 )
43
44
45# We need this so the move_group has something to interact with
46@pytest.fixture
47def robot_fixture(moveit_config):
48 run_move_group_node = Node(
49 package="moveit_ros_move_group",
50 executable="move_group",
51 output="log",
52 parameters=[moveit_config.to_dict()],
53 )
54
55 static_tf = Node(
56 package="tf2_ros",
57 executable="static_transform_publisher",
58 output="log",
59 arguments=["0.0", "0.0", "0.0", "0.0", "0.0", "0.0", "world", "panda_link0"],
60 )
61
62 robot_state_publisher = Node(
63 package="robot_state_publisher",
64 executable="robot_state_publisher",
65 name="robot_state_publisher",
66 output="log",
67 parameters=[moveit_config.robot_description],
68 )
69
70 # ros2_control using FakeSystem as hardware
71 ros2_controllers_path = os.path.join(
72 get_package_share_directory("moveit_resources_panda_moveit_config"),
73 "config",
74 "ros2_controllers.yaml",
75 )
76 ros2_control_node = Node(
77 package="controller_manager",
78 executable="ros2_control_node",
79 parameters=[moveit_config.robot_description, ros2_controllers_path],
80 output="log",
81 )
82
83 load_controllers = []
84 for controller in [
85 "panda_arm_controller",
86 "panda_hand_controller",
87 "joint_state_broadcaster",
88 ]:
89 load_controllers += [
90 ExecuteProcess(
91 cmd=["ros2 run controller_manager spawner {}".format(controller)],
92 shell=True,
93 output="log",
94 )
95 ]
96
97 return [
98 run_move_group_node,
99 static_tf,
100 robot_state_publisher,
101 ros2_control_node,
102 *load_controllers,
103 ]
104
105
106@pytest.fixture
108 return Node(
109 package="moveit_ros_trajectory_cache",
110 executable="test_trajectory_cache",
111 name="test_trajectory_cache_node",
112 output="screen",
113 cached_output=True,
114 parameters=[moveit_config.to_dict()],
115 )
116
117
118@launch_pytest.fixture
119def launch_description(trajectory_cache_test_runner_node, robot_fixture):
120 return LaunchDescription(
121 robot_fixture
122 + [trajectory_cache_test_runner_node, launch_pytest.actions.ReadyToTest()]
123 )
124
125
126def validate_stream(expected):
127 def wrapped(output):
128 assert (
129 expected in output.splitlines()
130 ), f"Did not get expected: {expected} in output:\n\n{output}"
131
132 return wrapped
133
134
135@pytest.mark.launch(fixture=launch_description)
136def test_all_tests_pass(trajectory_cache_test_runner_node, launch_context):
137 # Check for occurrences of [PASS] in output
138 assert process_tools.wait_for_output_sync(
139 launch_context,
140 trajectory_cache_test_runner_node,
141 lambda x: x.count("[PASS]") == 169, # All test cases passed.
142 timeout=30,
143 )
144
145 # Check no occurrences of [FAIL] in output
146 assert not process_tools.wait_for_output_sync(
147 launch_context,
148 trajectory_cache_test_runner_node,
149 lambda x: "[FAIL]" in x,
150 timeout=10,
151 )
152
153 yield
trajectory_cache_test_runner_node(moveit_config)
launch_description(trajectory_cache_test_runner_node, robot_fixture)
test_all_tests_pass(trajectory_cache_test_runner_node, launch_context)