36Definition of an abstract base class for policy deployment.
37For now, this policy only supports moveit_servo command interfaces and Image sensors.
40from abc
import ABC, abstractmethod
42from rclpy.node
import Node
43from rclpy.qos
import QoSProfile
45from control_msgs.msg
import JointJog
46from geometry_msgs.msg
import PoseStamped, Twist
47from sensor_msgs.msg
import Image
49from std_srvs.srv
import SetBool
55 """An abstract base class for deploying learnt policies."""
57 def __init__(self, params, node_name="policy_node"):
58 """Initialise the policy."""
82 """Returns True if the policy is active."""
87 """Sets the policy to active state via Python API."""
91 """Sets the policy to active state via ROS interface."""
96 """Returns the ROS 2 message type for a given sensor type."""
97 if msg_type ==
"sensor_msgs/Image":
100 raise ValueError(f
"Sensor type {msg_type} not supported.")
103 """Returns the ROS 2 message type for a given command type."""
104 if msg_type ==
"geometry_msgs/PoseStamped":
106 elif msg_type ==
"geometry_msgs/Twist":
108 elif msg_type ==
"control_msgs/JointJog":
111 raise ValueError(f
"Command type {msg_type} not supported.")
114 """Register the topics to listen to for sensor data."""
118 for sensor_idx
in range(self.
params.num_sensors):
119 sensor_params = self.get_parameters_by_prefix(f
"sensor{sensor_idx + 1}")
121 message_filters.Subscriber(
124 str(sensor_params[
"topic"].value),
125 qos_profile=QoSProfile(depth=sensor_params[
"qos"].value),
140 """Register the topic to publish actions to."""
143 self.
params.command.topic,
144 QoSProfile(depth=self.
params.command.qos),
149 """Perform a forward pass of the policy."""
activate_policy(self, request, response)
get_command_msg_type(self, msg_type)
__init__(self, params, node_name="policy_node")
get_sensor_msg_type(self, msg_type)