From 73904d14039b34568c07ba17f43fa2cdca60db21 Mon Sep 17 00:00:00 2001 From: David Conner Date: Wed, 25 Sep 2024 21:49:19 -0400 Subject: [PATCH] rename to flexbe_status_listener with heartbeat and sync updates; use latched state_map topic --- .../flexbe_onboard/flexbe_onboard.py | 4 +- flexbe_widget/CMakeLists.txt | 2 +- flexbe_widget/bin/flexbe_outcome_listener | 6 -- flexbe_widget/bin/flexbe_status_listener | 6 ++ .../flexbe_widget/behavior_launcher.py | 6 +- ..._listener.py => flexbe_status_listener.py} | 88 +++++++++++++------ 6 files changed, 75 insertions(+), 37 deletions(-) delete mode 100644 flexbe_widget/bin/flexbe_outcome_listener create mode 100755 flexbe_widget/bin/flexbe_status_listener rename flexbe_widget/flexbe_widget/{flexbe_outcome_listener.py => flexbe_status_listener.py} (57%) diff --git a/flexbe_onboard/flexbe_onboard/flexbe_onboard.py b/flexbe_onboard/flexbe_onboard/flexbe_onboard.py index d6172f1..0508b7e 100644 --- a/flexbe_onboard/flexbe_onboard/flexbe_onboard.py +++ b/flexbe_onboard/flexbe_onboard/flexbe_onboard.py @@ -93,9 +93,11 @@ def __init__(self): # only at onboard level self._heartbeat_pub = self.create_publisher(BehaviorSync, Topics._ONBOARD_HEARTBEAT_TOPIC, 10) self._status_pub = self.create_publisher(BEStatus, Topics._ONBOARD_STATUS_TOPIC, 10) - self._state_map_pub = self.create_publisher(StateMapMsg, Topics._STATE_MAP_TOPIC, 2) + # Latch state map so we can retreive later if desired latching_qos = QoSProfile(depth=1, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL) + self._state_map_pub = self.create_publisher(StateMapMsg, Topics._STATE_MAP_TOPIC, qos_profile=latching_qos) + self._version_sub = self.create_subscription(String, Topics._UI_VERSION_TOPIC, self._version_callback, qos_profile=latching_qos) diff --git a/flexbe_widget/CMakeLists.txt b/flexbe_widget/CMakeLists.txt index f216901..1dcc9dc 100644 --- a/flexbe_widget/CMakeLists.txt +++ b/flexbe_widget/CMakeLists.txt @@ -11,7 +11,7 @@ install(PROGRAMS bin/breakpoint bin/create_repo bin/evaluate_logs - bin/flexbe_outcome_listener + bin/flexbe_status_listener DESTINATION lib/${PROJECT_NAME} ) diff --git a/flexbe_widget/bin/flexbe_outcome_listener b/flexbe_widget/bin/flexbe_outcome_listener deleted file mode 100644 index 34e9ec2..0000000 --- a/flexbe_widget/bin/flexbe_outcome_listener +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python3 - -from flexbe_widget.flexbe_outcome_listener import flexbe_outcome_listener_main - -if __name__ == '__main__': - flexbe_outcome_listener_main() diff --git a/flexbe_widget/bin/flexbe_status_listener b/flexbe_widget/bin/flexbe_status_listener new file mode 100755 index 0000000..8e7abc3 --- /dev/null +++ b/flexbe_widget/bin/flexbe_status_listener @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from flexbe_widget.flexbe_status_listener import flexbe_status_listener_main + +if __name__ == '__main__': + flexbe_status_listener_main() diff --git a/flexbe_widget/flexbe_widget/behavior_launcher.py b/flexbe_widget/flexbe_widget/behavior_launcher.py index 082e88b..eacfffc 100644 --- a/flexbe_widget/flexbe_widget/behavior_launcher.py +++ b/flexbe_widget/flexbe_widget/behavior_launcher.py @@ -50,6 +50,7 @@ import rclpy from rclpy.node import Node +from rclpy.qos import QoSDurabilityPolicy, QoSProfile from rosidl_runtime_py import get_interface_path @@ -78,7 +79,10 @@ def __init__(self): self._status_pub = self.create_publisher(BEStatus, Topics._ONBOARD_STATUS_TOPIC, 100) self._mirror_pub = self.create_publisher(ContainerStructure, Topics._MIRROR_STRUCTURE_TOPIC, 100) self._heartbeat_pub = self.create_publisher(Int32, Topics._LAUNCHER_HEARTBEAT_TOPIC, 2) - self._state_map_pub = self.create_publisher(StateMapMsg, Topics._STATE_MAP_OCS_TOPIC, 2) + + # Latch state map so we can retreive later if desired + latching_qos = QoSProfile(depth=1, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL) + self._state_map_pub = self.create_publisher(StateMapMsg, Topics._STATE_MAP_OCS_TOPIC, latching_qos) self._behavior_lib = BehaviorLibrary(self) diff --git a/flexbe_widget/flexbe_widget/flexbe_outcome_listener.py b/flexbe_widget/flexbe_widget/flexbe_status_listener.py similarity index 57% rename from flexbe_widget/flexbe_widget/flexbe_outcome_listener.py rename to flexbe_widget/flexbe_widget/flexbe_status_listener.py index a9d3190..424f126 100644 --- a/flexbe_widget/flexbe_widget/flexbe_outcome_listener.py +++ b/flexbe_widget/flexbe_widget/flexbe_status_listener.py @@ -28,52 +28,83 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. -import rclpy -from rclpy.node import Node - from flexbe_core.core import StateMap, Topics -class FlexbeOutcomeListener(Node): - """Simple lister to echo FlexBE state machine information.""" - def __init__(self): - super().__init__('flexbe_outcome_listener') - +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSDurabilityPolicy, QoSProfile - # Suppress the node name and severity information (ChatGPT led me astray) - # import logging - # formatter = logging.Formatter('[%(asctime)s] %(message)s') - # console_handler = logging.StreamHandler() - # console_handler.setFormatter(formatter) - # self.get_logger().handlers.clear() - # self.get_logger().addHandler(console_handler) +class FlexbeStatusListener(Node): + """Simple listener to echo FlexBE state machine information.""" + def __init__(self): + super().__init__('flexbe_status_listener') + latching_qos = QoSProfile(depth=1, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL) self._state_map = None self._state_map_sub = self.create_subscription( + Topics._topic_types[Topics._STATE_MAP_OCS_TOPIC], + Topics._STATE_MAP_OCS_TOPIC, + self._state_map_callback, + latching_qos) + self._state_map_sub # prevent unused variable warning + + self._outcome_sub = self.create_subscription( Topics._topic_types[Topics._OUTCOME_TOPIC], Topics._OUTCOME_TOPIC, self._outcome_callback, 10) - self._state_map_sub # prevent unused variable warning + self._outcome_sub # prevent unused variable warning - self._state_map_sub = self.create_subscription( - Topics._topic_types[Topics._STATE_MAP_OCS_TOPIC], - Topics._STATE_MAP_OCS_TOPIC, - self._state_map_callback, + self._heartbeat_sub = self.create_subscription( + Topics._topic_types[Topics._ONBOARD_HEARTBEAT_TOPIC], + Topics._ONBOARD_HEARTBEAT_TOPIC, + self._heartbeat_callback, 10) - self._state_map_sub # prevent unused variable warning + self._heartbeat_sub # prevent unused variable warning + + self._sync_sub = self.create_subscription( + Topics._topic_types[Topics._MIRROR_SYNC_TOPIC], + Topics._MIRROR_SYNC_TOPIC, + self._sync_callback, + 10) + self._sync_sub # prevent unused variable warning + + def _heartbeat_callback(self, msg): + if self._state_map is None: + self.get_logger().info(f'Onboard heartbeat {msg}') + return + + states = [] + for checksum in msg.current_state_checksums: + state_id, _ = StateMap.unhash(checksum) + states.append(self._state_map.get(state_id, 'unknown')) + + self.get_logger().info(f'Onboard heartbeat {msg.behavior_id:10d} : {states}') + + def _sync_callback(self, msg): + if self._state_map is None: + self.get_logger().info(f'Synchronize mirror {msg}') + return + + states = [] + for checksum in msg.current_state_checksums: + state_id, _ = StateMap.unhash(checksum) + states.append(self._state_map.get(state_id, 'unknown')) + + self.get_logger().info(f'Synchronize mirror {msg.behavior_id:10d} : {states}') def _outcome_callback(self, msg): if self._state_map is None: - self.get_logger().info(f"Outcome msg hash value {msg.data:11d}") + self.get_logger().info(f'Outcome msg hash value {msg.data:11d}') return state_id, outcome = StateMap.unhash(msg.data) path = f"'{self._state_map.get(state_id, 'unknown')}'" - self.get_logger().info(f"Outcome {outcome:2d} from {state_id:11d} {path:60s}") + self.get_logger().info(f'Outcome {outcome:2d} from {state_id:11d} {path:60s}') def _state_map_callback(self, msg): - self.get_logger().info(f"New state map received for {msg.behavior_id}") + self.get_logger().info(f'New state map received for {msg.behavior_id}') state_map = {} for id, path in zip(msg.state_ids, msg.state_paths): @@ -82,13 +113,14 @@ def _state_map_callback(self, msg): self._state_map = state_map -def flexbe_outcome_listener_main(args=None): +def flexbe_status_listener_main(args=None): + """Run the status listener to echo state info given state id in messages.""" print('Suggest: export RCUTILS_CONSOLE_OUTPUT_FORMAT="[{time}] {message}" in terminal') - print(' Run this before starting behavior to get full state path informatio from state map.') + print(' Run this before starting behavior to get full state path information from state map.') rclpy.init(args=args) - outcomes = FlexbeOutcomeListener() + outcomes = FlexbeStatusListener() rclpy.spin(outcomes) @@ -100,4 +132,4 @@ def flexbe_outcome_listener_main(args=None): if __name__ == '__main__': - flexbe_outcome_listener_main() \ No newline at end of file + flexbe_status_listener_main()