moveit2
The MoveIt Motion Planning Framework for ROS 2.
Loading...
Searching...
No Matches
policy.py
Go to the documentation of this file.
1# Software License Agreement (BSD License)
2#
3# Copyright (c) 2023, Peter David Fagan
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions
8# are met:
9#
10# * Redistributions of source code must retain the above copyright
11# notice, this list of conditions and the following disclaimer.
12# * Redistributions in binary form must reproduce the above
13# copyright notice, this list of conditions and the following
14# disclaimer in the documentation and/or other materials provided
15# with the distribution.
16# * Neither the name of Willow Garage, Inc. nor the names of its
17# contributors may be used to endorse or promote products derived
18# from this software without specific prior written permission.
19#
20# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
24# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
26# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
28# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31# POSSIBILITY OF SUCH DAMAGE.
32#
33# Author: Peter David Fagan
34
35"""
36Definition of an abstract base class for policy deployment.
37For now, this policy only supports moveit_servo command interfaces and Image sensors.
38"""
39
40from abc import ABC, abstractmethod
41
42from rclpy.node import Node
43from rclpy.qos import QoSProfile
44
45from control_msgs.msg import JointJog
46from geometry_msgs.msg import PoseStamped, Twist
47from sensor_msgs.msg import Image
48
49from std_srvs.srv import SetBool
50
51import message_filters
52
53
54class Policy(ABC, Node):
55 """An abstract base class for deploying learnt policies."""
56
57 def __init__(self, params, node_name="policy_node"):
58 """Initialise the policy."""
59 super().__init__(node_name)
60 self.logger = self.get_logger()
61
62 # parse parameters
63 self.param_listener = params.ParamListener(self)
64 self.params = self.param_listener.get_params()
65
66 # set policy to inactive by default
67 self._is_active = False
68 self.activate_policy_service = self.create_service(
69 SetBool,
70 "activate_policy",
72 )
73
74 # register sensor topics
75 self.register_sensors()
76
77 # register servo command topic
78 self.register_command()
79
80 @property
81 def is_active(self):
82 """Returns True if the policy is active."""
83 return self._is_active
84
85 @is_active.setter
86 def active(self, value):
87 """Sets the policy to active state via Python API."""
88 self._is_active = value
89
90 def activate_policy(self, request, response):
91 """Sets the policy to active state via ROS interface."""
92 self._is_active = request.data
93 return response
94
95 def get_sensor_msg_type(self, msg_type):
96 """Returns the ROS 2 message type for a given sensor type."""
97 if msg_type == "sensor_msgs/Image":
98 return Image
99 else:
100 raise ValueError(f"Sensor type {msg_type} not supported.")
101
102 def get_command_msg_type(self, msg_type):
103 """Returns the ROS 2 message type for a given command type."""
104 if msg_type == "geometry_msgs/PoseStamped":
105 return PoseStamped
106 elif msg_type == "geometry_msgs/Twist":
107 return Twist
108 elif msg_type == "control_msgs/JointJog":
109 return JointJog
110 else:
111 raise ValueError(f"Command type {msg_type} not supported.")
112
114 """Register the topics to listen to for sensor data."""
115 self.sensor_subs = []
116 # TODO: refactor this section
117 # Related Issue: https://github.com/PickNikRobotics/generate_parameter_library/issues/155
118 for sensor_idx in range(self.params.num_sensors):
119 sensor_params = self.get_parameters_by_prefix(f"sensor{sensor_idx + 1}")
120 self.sensor_subs.append(
121 message_filters.Subscriber(
122 self,
123 self.get_sensor_msg_type(sensor_params["type"].value),
124 str(sensor_params["topic"].value),
125 qos_profile=QoSProfile(depth=sensor_params["qos"].value),
126 )
127 )
128
129 # create a synchronizer for the sensor topics
130 self.sensor_sync = message_filters.ApproximateTimeSynchronizer(
131 self.sensor_subs,
132 self.params.sensor_queue,
133 self.params.sensor_slop,
134 )
135
136 # register model forward pass as callback
137 self.sensor_sync.registerCallback(self.forwardforward)
138
140 """Register the topic to publish actions to."""
141 self.command_pub = self.create_publisher(
142 self.get_command_msg_type(self.params.command.type),
143 self.params.command.topic,
144 QoSProfile(depth=self.params.command.qos),
145 )
146
147 @abstractmethod
148 def forward():
149 """Perform a forward pass of the policy."""
150 pass
activate_policy(self, request, response)
Definition policy.py:90
get_command_msg_type(self, msg_type)
Definition policy.py:102
__init__(self, params, node_name="policy_node")
Definition policy.py:57
get_sensor_msg_type(self, msg_type)
Definition policy.py:95